diff --git a/plugins/manager.go b/plugins/manager.go index 8cee839e2..4bf44928a 100644 --- a/plugins/manager.go +++ b/plugins/manager.go @@ -192,9 +192,9 @@ func (m *Manager) startPlaylistGenerators(ctx context.Context) { if !hasCapability(p.capabilities, CapabilityPlaylistGenerator) { continue } - orch := newPlaylistGeneratorOrchestrator(name, p, m.ds, ctx) + orch := newPlaylistGeneratorOrchestrator(ctx, name, p, m.ds) m.playlistGenerators[name] = orch - orch.wg.Go(func() { orch.discoverAndSync(orch.ctx) }) + go orch.run() } } diff --git a/plugins/playlist_generator.go b/plugins/playlist_generator.go index 687bc092b..c933f961a 100644 --- a/plugins/playlist_generator.go +++ b/plugins/playlist_generator.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "strings" - "sync" + "sync/atomic" "time" "github.com/navidrome/navidrome/core/matcher" @@ -19,6 +19,12 @@ const ( FuncPlaylistGeneratorGetAvailablePlaylists = "nd_playlist_generator_get_available_playlists" FuncPlaylistGeneratorGetPlaylist = "nd_playlist_generator_get_playlist" + + // workChCapacity is the buffer size for the work channel. + workChCapacity = 16 + + // discoveryRetryDelay is how long to wait before retrying a failed GetAvailablePlaylists call. + discoveryRetryDelay = 5 * time.Minute ) func init() { @@ -29,21 +35,40 @@ func init() { ) } -// playlistGeneratorOrchestrator manages playlist generation for a single plugin. -type playlistGeneratorOrchestrator struct { - pluginName string - plugin *plugin - ds model.DataStore - matcher *matcher.Matcher - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - refreshTimers map[string]*time.Timer // keyed by playlist DB ID - discoveryTimer *time.Timer - retryInterval time.Duration // from last GetAvailablePlaylists response +type workType int + +const ( + workDiscover workType = iota // run discoverAndSync + workSync // run syncPlaylist for a single playlist +) + +type workItem struct { + typ workType + info capabilities.PlaylistInfo // only for workSync + dbID string // only for workSync + ownerID string // only for workSync } -func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.DataStore, parentCtx context.Context) *playlistGeneratorOrchestrator { +// playlistGeneratorOrchestrator manages playlist generation for a single plugin. +// All mutable state (refreshTimers, discoveryTimer) is owned exclusively by the +// worker goroutine — no synchronization needed. The retryInterval and +// refreshTimerCount fields use atomics so tests can observe them race-free. +type playlistGeneratorOrchestrator struct { + pluginName string + plugin *plugin + ds model.DataStore + matcher *matcher.Matcher + ctx context.Context + cancel context.CancelFunc + workCh chan workItem // serialized work queue + refreshTimers map[string]*time.Timer // keyed by playlist DB ID — worker-only + discoveryTimer *time.Timer // worker-only + retryInterval atomic.Int64 // nanoseconds; from last GetAvailablePlaylists response + refreshTimerCount atomic.Int32 // number of active refresh timers + done chan struct{} // closed when worker exits +} + +func newPlaylistGeneratorOrchestrator(parentCtx context.Context, pluginName string, p *plugin, ds model.DataStore) *playlistGeneratorOrchestrator { ctx, cancel := context.WithCancel(parentCtx) return &playlistGeneratorOrchestrator{ pluginName: pluginName, @@ -52,23 +77,51 @@ func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.Dat matcher: matcher.New(ds), ctx: ctx, cancel: cancel, + workCh: make(chan workItem, workChCapacity), refreshTimers: make(map[string]*time.Timer), + done: make(chan struct{}), + } +} + +// run is the single worker goroutine that processes all work items sequentially. +// It performs an initial discovery before entering the main loop. +func (o *playlistGeneratorOrchestrator) run() { + defer close(o.done) + + // Run initial discovery before entering the loop + o.discoverAndSync() + + for { + select { + case <-o.ctx.Done(): + o.stopAllTimers() + return + case item := <-o.workCh: + switch item.typ { + case workDiscover: + o.discoverAndSync() + case workSync: + o.syncPlaylist(item.info, item.dbID, item.ownerID) + } + } } } // discoverAndSync calls GetAvailablePlaylists, then GetPlaylist for each, matches tracks, and upserts. -func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) { +func (o *playlistGeneratorOrchestrator) discoverAndSync() { + ctx := o.ctx resp, err := callPluginFunction[capabilities.GetAvailablePlaylistsRequest, capabilities.GetAvailablePlaylistsResponse]( ctx, o.plugin, FuncPlaylistGeneratorGetAvailablePlaylists, capabilities.GetAvailablePlaylistsRequest{}, ) if err != nil { - log.Error(ctx, "Failed to call GetAvailablePlaylists", "plugin", o.pluginName, err) + log.Error(ctx, "Failed to call GetAvailablePlaylists, retrying later", "plugin", o.pluginName, err) + o.scheduleDiscovery(discoveryRetryDelay) return } // Store retry interval from response if resp.RetryInterval > 0 { - o.retryInterval = time.Duration(resp.RetryInterval) * time.Second + o.retryInterval.Store(int64(time.Duration(resp.RetryInterval) * time.Second)) } for _, info := range resp.Playlists { @@ -81,17 +134,18 @@ func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) { } ownerID := user.ID dbID := id.NewHash(o.pluginName, info.ID, ownerID) - o.syncPlaylist(ctx, info, dbID, ownerID) + o.syncPlaylist(info, dbID, ownerID) } // Schedule re-discovery if RefreshInterval > 0 if resp.RefreshInterval > 0 { - o.scheduleDiscovery(ctx, time.Duration(resp.RefreshInterval)*time.Second) + o.scheduleDiscovery(time.Duration(resp.RefreshInterval) * time.Second) } } // syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB. -func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string) { +func (o *playlistGeneratorOrchestrator) syncPlaylist(info capabilities.PlaylistInfo, dbID string, ownerID string) { + ctx := o.ctx resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse]( ctx, o.plugin, FuncPlaylistGeneratorGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID}, ) @@ -102,13 +156,14 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c if timer, ok := o.refreshTimers[dbID]; ok { timer.Stop() delete(o.refreshTimers, dbID) + o.refreshTimerCount.Store(int32(len(o.refreshTimers))) } return } log.Warn(ctx, "Failed to call GetPlaylist", "plugin", o.pluginName, "playlistID", info.ID, err) // Schedule retry for transient errors if retryInterval is configured - if o.retryInterval > 0 { - o.schedulePlaylistRefresh(ctx, info, dbID, ownerID, o.retryInterval) + if ri := time.Duration(o.retryInterval.Load()); ri > 0 { + o.schedulePlaylistRefresh(info, dbID, ownerID, ri) } return } @@ -162,26 +217,33 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c if delay <= 0 { delay = 1 * time.Second // Already expired, refresh soon } - o.schedulePlaylistRefresh(ctx, info, dbID, ownerID, delay) + o.schedulePlaylistRefresh(info, dbID, ownerID, delay) } } -func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(_ context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) { +func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) { // Cancel existing timer if any if timer, ok := o.refreshTimers[dbID]; ok { timer.Stop() } o.refreshTimers[dbID] = time.AfterFunc(delay, func() { - o.wg.Go(func() { o.syncPlaylist(o.ctx, info, dbID, ownerID) }) + select { + case o.workCh <- workItem{typ: workSync, info: info, dbID: dbID, ownerID: ownerID}: + case <-o.ctx.Done(): + } }) + o.refreshTimerCount.Store(int32(len(o.refreshTimers))) } -func (o *playlistGeneratorOrchestrator) scheduleDiscovery(_ context.Context, delay time.Duration) { +func (o *playlistGeneratorOrchestrator) scheduleDiscovery(delay time.Duration) { if o.discoveryTimer != nil { o.discoveryTimer.Stop() } o.discoveryTimer = time.AfterFunc(delay, func() { - o.wg.Go(func() { o.discoverAndSync(o.ctx) }) + select { + case o.workCh <- workItem{typ: workDiscover}: + case <-o.ctx.Done(): + } }) } @@ -190,14 +252,18 @@ func isPlaylistNotFoundError(err error) bool { return err != nil && strings.Contains(err.Error(), capabilities.PlaylistGeneratorErrorNotFound.Error()) } -// stop cancels the context, stops all timers, and waits for in-flight goroutines. -func (o *playlistGeneratorOrchestrator) stop() { - o.cancel() +// stopAllTimers stops the discovery timer and all refresh timers. +func (o *playlistGeneratorOrchestrator) stopAllTimers() { if o.discoveryTimer != nil { o.discoveryTimer.Stop() } for _, timer := range o.refreshTimers { timer.Stop() } - o.wg.Wait() +} + +// stop cancels the context and waits for the worker goroutine to finish. +func (o *playlistGeneratorOrchestrator) stop() { + o.cancel() + <-o.done } diff --git a/plugins/playlist_generator_test.go b/plugins/playlist_generator_test.go index d4b5c4f7c..dc4187952 100644 --- a/plugins/playlist_generator_test.go +++ b/plugins/playlist_generator_test.go @@ -5,7 +5,6 @@ package plugins import ( "time" - "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/id" "github.com/navidrome/navidrome/plugins/capabilities" "github.com/navidrome/navidrome/tests" @@ -24,13 +23,7 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { "test-playlist-generator"+PackageExtension, ) - // Pre-initialize the mock playlist repo to avoid a race with the - // discoverAndSync goroutine that is launched during Start(). - mockDS := pgManager.ds.(*tests.MockDataStore) - if mockDS.MockedPlaylist == nil { - mockDS.MockedPlaylist = tests.CreateMockPlaylistRepo() - } - mockPlsRepo = mockDS.MockedPlaylist.(*tests.MockPlaylistRepo) + mockPlsRepo = pgManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo) }) Describe("capability detection", func() { @@ -49,30 +42,16 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { // The orchestrator runs discoverAndSync in a goroutine on Start(). // Give it a moment to complete. Eventually(func() int { - return len(mockPlsRepo.Data) + return mockPlsRepo.Len() }).Should(BeNumerically(">=", 2)) }) It("creates playlists with correct fields", func() { - // Check that playlists have the correct plugin fields Eventually(func() bool { - for _, pls := range mockPlsRepo.Data { - if pls.PluginID == "test-playlist-generator" && pls.PluginPlaylistID == "daily-mix-1" { - return true - } - } - return false + return mockPlsRepo.FindByPluginPlaylistID("daily-mix-1") != nil }).Should(BeTrue()) - // Find the daily-mix-1 playlist and verify its fields - var dailyMix1 *model.Playlist - for _, pls := range mockPlsRepo.Data { - if pls.PluginPlaylistID == "daily-mix-1" { - dailyMix1 = pls - break - } - } - Expect(dailyMix1).ToNot(BeNil()) + dailyMix1 := mockPlsRepo.FindByPluginPlaylistID("daily-mix-1") Expect(dailyMix1.Name).To(Equal("Daily Mix 1")) Expect(dailyMix1.Comment).To(Equal("Your personalized daily mix")) Expect(dailyMix1.ExternalImageURL).To(Equal("https://example.com/cover1.jpg")) @@ -85,7 +64,7 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { It("generates deterministic playlist IDs", func() { expectedID := id.NewHash("test-playlist-generator", "daily-mix-1", "user-1") Eventually(func() bool { - _, exists := mockPlsRepo.Data[expectedID] + _, exists := mockPlsRepo.GetData(expectedID) return exists }).Should(BeTrue()) }) @@ -96,8 +75,8 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { Expect(id1).ToNot(Equal(id2)) Eventually(func() bool { - _, exists1 := mockPlsRepo.Data[id1] - _, exists2 := mockPlsRepo.Data[id2] + _, exists1 := mockPlsRepo.GetData(id1) + _, exists2 := mockPlsRepo.GetData(id2) return exists1 && exists2 }).Should(BeTrue()) }) @@ -113,15 +92,11 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { Expect(errManager.playlistGenerators).To(HaveKey("test-playlist-generator")) // But no playlists created - errDS := errManager.ds.(*tests.MockDataStore) - if errDS.MockedPlaylist == nil { - errDS.MockedPlaylist = tests.CreateMockPlaylistRepo() - } - errPlsRepo := errDS.MockedPlaylist.(*tests.MockPlaylistRepo) + errPlsRepo := errManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo) // The orchestrator was started but GetAvailablePlaylists returned error, // so no playlists should be created Consistently(func() int { - return len(errPlsRepo.Data) + return errPlsRepo.Len() }, "500ms").Should(Equal(0)) }) }) @@ -139,20 +114,16 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { Expect(notFoundManager.playlistGenerators).To(HaveKey("test-playlist-generator")) // No playlists should be created (all returned NotFound) - notFoundDS := notFoundManager.ds.(*tests.MockDataStore) - if notFoundDS.MockedPlaylist == nil { - notFoundDS.MockedPlaylist = tests.CreateMockPlaylistRepo() - } - notFoundPlsRepo := notFoundDS.MockedPlaylist.(*tests.MockPlaylistRepo) + notFoundPlsRepo := notFoundManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo) Consistently(func() int { - return len(notFoundPlsRepo.Data) + return notFoundPlsRepo.Len() }, "500ms").Should(Equal(0)) // No refresh timers should be scheduled for NotFound playlists orch := notFoundManager.playlistGenerators["test-playlist-generator"] - Eventually(func() int { - return len(orch.refreshTimers) - }).Should(Equal(0)) + Eventually(func() int32 { + return orch.refreshTimerCount.Load() + }).Should(Equal(int32(0))) }) }) @@ -170,23 +141,19 @@ var _ = Describe("PlaylistGenerator", Ordered, func() { // retryInterval should be stored from the response Eventually(func() time.Duration { - return orch.retryInterval + return time.Duration(orch.retryInterval.Load()) }).Should(Equal(60 * time.Second)) // No playlists should be created (GetPlaylist failed) - retryDS := retryManager.ds.(*tests.MockDataStore) - if retryDS.MockedPlaylist == nil { - retryDS.MockedPlaylist = tests.CreateMockPlaylistRepo() - } - retryPlsRepo := retryDS.MockedPlaylist.(*tests.MockPlaylistRepo) + retryPlsRepo := retryManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo) Consistently(func() int { - return len(retryPlsRepo.Data) + return retryPlsRepo.Len() }, "500ms").Should(Equal(0)) // Refresh timers should be scheduled for transient errors - Eventually(func() int { - return len(orch.refreshTimers) - }).Should(BeNumerically(">=", 1)) + Eventually(func() int32 { + return orch.refreshTimerCount.Load() + }).Should(BeNumerically(">=", int32(1))) }) }) diff --git a/plugins/plugins_suite_test.go b/plugins/plugins_suite_test.go index 4ab259173..1e53db664 100644 --- a/plugins/plugins_suite_test.go +++ b/plugins/plugins_suite_test.go @@ -139,7 +139,7 @@ func createTestManagerWithPluginsAndMetrics(pluginConfig map[string]map[string]s mockUserRepo := tests.CreateMockUserRepo() _ = mockUserRepo.Put(&model.User{ID: "user-1", UserName: "admin"}) - dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo, MockedUser: mockUserRepo} + dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo, MockedUser: mockUserRepo, MockedPlaylist: tests.CreateMockPlaylistRepo()} // Create and start manager manager := &Manager{ diff --git a/tests/mock_playlist_repo.go b/tests/mock_playlist_repo.go index 9bdc52152..0fa3b0eae 100644 --- a/tests/mock_playlist_repo.go +++ b/tests/mock_playlist_repo.go @@ -2,6 +2,7 @@ package tests import ( "errors" + "sync" "github.com/deluan/rest" "github.com/navidrome/navidrome/model" @@ -17,6 +18,7 @@ func CreateMockPlaylistRepo() *MockPlaylistRepo { type MockPlaylistRepo struct { model.PlaylistRepository + mu sync.RWMutex Data map[string]*model.Playlist // keyed by ID PathMap map[string]*model.Playlist // keyed by path Last *model.Playlist @@ -26,10 +28,14 @@ type MockPlaylistRepo struct { } func (m *MockPlaylistRepo) SetError(err bool) { + m.mu.Lock() + defer m.mu.Unlock() m.Err = err } func (m *MockPlaylistRepo) Get(id string) (*model.Playlist, error) { + m.mu.RLock() + defer m.mu.RUnlock() if m.Err { return nil, errors.New("error") } @@ -46,6 +52,8 @@ func (m *MockPlaylistRepo) GetWithTracks(id string, _, _ bool) (*model.Playlist, } func (m *MockPlaylistRepo) Put(pls *model.Playlist) error { + m.mu.Lock() + defer m.mu.Unlock() if m.Err { return errors.New("error") } @@ -60,6 +68,8 @@ func (m *MockPlaylistRepo) Put(pls *model.Playlist) error { } func (m *MockPlaylistRepo) FindByPath(path string) (*model.Playlist, error) { + m.mu.RLock() + defer m.mu.RUnlock() if m.Err { return nil, errors.New("error") } @@ -72,6 +82,8 @@ func (m *MockPlaylistRepo) FindByPath(path string) (*model.Playlist, error) { } func (m *MockPlaylistRepo) Delete(id string) error { + m.mu.Lock() + defer m.mu.Unlock() if m.Err { return errors.New("error") } @@ -80,10 +92,14 @@ func (m *MockPlaylistRepo) Delete(id string) error { } func (m *MockPlaylistRepo) Tracks(_ string, _ bool) model.PlaylistTrackRepository { + m.mu.RLock() + defer m.mu.RUnlock() return m.TracksRepo } func (m *MockPlaylistRepo) Exists(id string) (bool, error) { + m.mu.RLock() + defer m.mu.RUnlock() if m.Err { return false, errors.New("error") } @@ -95,6 +111,8 @@ func (m *MockPlaylistRepo) Exists(id string) (bool, error) { } func (m *MockPlaylistRepo) Count(_ ...rest.QueryOptions) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() if m.Err { return 0, errors.New("error") } @@ -102,10 +120,39 @@ func (m *MockPlaylistRepo) Count(_ ...rest.QueryOptions) (int64, error) { } func (m *MockPlaylistRepo) CountAll(_ ...model.QueryOptions) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() if m.Err { return 0, errors.New("error") } return int64(len(m.Data)), nil } +// Len returns the number of playlists in the repo in a thread-safe manner. +func (m *MockPlaylistRepo) Len() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.Data) +} + +// GetData returns a playlist by ID in a thread-safe manner. +func (m *MockPlaylistRepo) GetData(id string) (*model.Playlist, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + pls, ok := m.Data[id] + return pls, ok +} + +// FindByPluginPlaylistID returns the first playlist with the given plugin playlist ID. +func (m *MockPlaylistRepo) FindByPluginPlaylistID(pluginPlaylistID string) *model.Playlist { + m.mu.RLock() + defer m.mu.RUnlock() + for _, pls := range m.Data { + if pls.PluginPlaylistID == pluginPlaylistID { + return pls + } + } + return nil +} + var _ model.PlaylistRepository = (*MockPlaylistRepo)(nil)