refactor(plugins): serialize PlaylistGenerator plugin calls via work queue

Replace the timer-per-playlist + WaitGroup.Go model with a single worker
goroutine processing work items from a buffered channel. This eliminates
race conditions from parallel plugin calls (rate limiting risk),
unsynchronized map/field access across goroutines, and overlapping
discovery and refresh timers. The worker owns all mutable state
(refreshTimers, discoveryTimer) exclusively, while retryInterval and
refreshTimerCount use atomics for safe test observability. Also adds
retry-on-failure for GetAvailablePlaylists (5 min delay) and makes
MockPlaylistRepo thread-safe with sync.RWMutex.
This commit is contained in:
Deluan 2026-03-05 14:44:53 -05:00
parent fde4517339
commit 9c516cf601
5 changed files with 167 additions and 87 deletions

View File

@ -192,9 +192,9 @@ func (m *Manager) startPlaylistGenerators(ctx context.Context) {
if !hasCapability(p.capabilities, CapabilityPlaylistGenerator) {
continue
}
orch := newPlaylistGeneratorOrchestrator(name, p, m.ds, ctx)
orch := newPlaylistGeneratorOrchestrator(ctx, name, p, m.ds)
m.playlistGenerators[name] = orch
orch.wg.Go(func() { orch.discoverAndSync(orch.ctx) })
go orch.run()
}
}

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/core/matcher"
@ -19,6 +19,12 @@ const (
FuncPlaylistGeneratorGetAvailablePlaylists = "nd_playlist_generator_get_available_playlists"
FuncPlaylistGeneratorGetPlaylist = "nd_playlist_generator_get_playlist"
// workChCapacity is the buffer size for the work channel.
workChCapacity = 16
// discoveryRetryDelay is how long to wait before retrying a failed GetAvailablePlaylists call.
discoveryRetryDelay = 5 * time.Minute
)
func init() {
@ -29,21 +35,40 @@ func init() {
)
}
// playlistGeneratorOrchestrator manages playlist generation for a single plugin.
type playlistGeneratorOrchestrator struct {
pluginName string
plugin *plugin
ds model.DataStore
matcher *matcher.Matcher
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
refreshTimers map[string]*time.Timer // keyed by playlist DB ID
discoveryTimer *time.Timer
retryInterval time.Duration // from last GetAvailablePlaylists response
type workType int
const (
workDiscover workType = iota // run discoverAndSync
workSync // run syncPlaylist for a single playlist
)
type workItem struct {
typ workType
info capabilities.PlaylistInfo // only for workSync
dbID string // only for workSync
ownerID string // only for workSync
}
func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.DataStore, parentCtx context.Context) *playlistGeneratorOrchestrator {
// playlistGeneratorOrchestrator manages playlist generation for a single plugin.
// All mutable state (refreshTimers, discoveryTimer) is owned exclusively by the
// worker goroutine — no synchronization needed. The retryInterval and
// refreshTimerCount fields use atomics so tests can observe them race-free.
type playlistGeneratorOrchestrator struct {
pluginName string
plugin *plugin
ds model.DataStore
matcher *matcher.Matcher
ctx context.Context
cancel context.CancelFunc
workCh chan workItem // serialized work queue
refreshTimers map[string]*time.Timer // keyed by playlist DB ID — worker-only
discoveryTimer *time.Timer // worker-only
retryInterval atomic.Int64 // nanoseconds; from last GetAvailablePlaylists response
refreshTimerCount atomic.Int32 // number of active refresh timers
done chan struct{} // closed when worker exits
}
func newPlaylistGeneratorOrchestrator(parentCtx context.Context, pluginName string, p *plugin, ds model.DataStore) *playlistGeneratorOrchestrator {
ctx, cancel := context.WithCancel(parentCtx)
return &playlistGeneratorOrchestrator{
pluginName: pluginName,
@ -52,23 +77,51 @@ func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.Dat
matcher: matcher.New(ds),
ctx: ctx,
cancel: cancel,
workCh: make(chan workItem, workChCapacity),
refreshTimers: make(map[string]*time.Timer),
done: make(chan struct{}),
}
}
// run is the single worker goroutine that processes all work items sequentially.
// It performs an initial discovery before entering the main loop.
func (o *playlistGeneratorOrchestrator) run() {
defer close(o.done)
// Run initial discovery before entering the loop
o.discoverAndSync()
for {
select {
case <-o.ctx.Done():
o.stopAllTimers()
return
case item := <-o.workCh:
switch item.typ {
case workDiscover:
o.discoverAndSync()
case workSync:
o.syncPlaylist(item.info, item.dbID, item.ownerID)
}
}
}
}
// discoverAndSync calls GetAvailablePlaylists, then GetPlaylist for each, matches tracks, and upserts.
func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) {
func (o *playlistGeneratorOrchestrator) discoverAndSync() {
ctx := o.ctx
resp, err := callPluginFunction[capabilities.GetAvailablePlaylistsRequest, capabilities.GetAvailablePlaylistsResponse](
ctx, o.plugin, FuncPlaylistGeneratorGetAvailablePlaylists, capabilities.GetAvailablePlaylistsRequest{},
)
if err != nil {
log.Error(ctx, "Failed to call GetAvailablePlaylists", "plugin", o.pluginName, err)
log.Error(ctx, "Failed to call GetAvailablePlaylists, retrying later", "plugin", o.pluginName, err)
o.scheduleDiscovery(discoveryRetryDelay)
return
}
// Store retry interval from response
if resp.RetryInterval > 0 {
o.retryInterval = time.Duration(resp.RetryInterval) * time.Second
o.retryInterval.Store(int64(time.Duration(resp.RetryInterval) * time.Second))
}
for _, info := range resp.Playlists {
@ -81,17 +134,18 @@ func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) {
}
ownerID := user.ID
dbID := id.NewHash(o.pluginName, info.ID, ownerID)
o.syncPlaylist(ctx, info, dbID, ownerID)
o.syncPlaylist(info, dbID, ownerID)
}
// Schedule re-discovery if RefreshInterval > 0
if resp.RefreshInterval > 0 {
o.scheduleDiscovery(ctx, time.Duration(resp.RefreshInterval)*time.Second)
o.scheduleDiscovery(time.Duration(resp.RefreshInterval) * time.Second)
}
}
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string) {
func (o *playlistGeneratorOrchestrator) syncPlaylist(info capabilities.PlaylistInfo, dbID string, ownerID string) {
ctx := o.ctx
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
ctx, o.plugin, FuncPlaylistGeneratorGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
)
@ -102,13 +156,14 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
if timer, ok := o.refreshTimers[dbID]; ok {
timer.Stop()
delete(o.refreshTimers, dbID)
o.refreshTimerCount.Store(int32(len(o.refreshTimers)))
}
return
}
log.Warn(ctx, "Failed to call GetPlaylist", "plugin", o.pluginName, "playlistID", info.ID, err)
// Schedule retry for transient errors if retryInterval is configured
if o.retryInterval > 0 {
o.schedulePlaylistRefresh(ctx, info, dbID, ownerID, o.retryInterval)
if ri := time.Duration(o.retryInterval.Load()); ri > 0 {
o.schedulePlaylistRefresh(info, dbID, ownerID, ri)
}
return
}
@ -162,26 +217,33 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
if delay <= 0 {
delay = 1 * time.Second // Already expired, refresh soon
}
o.schedulePlaylistRefresh(ctx, info, dbID, ownerID, delay)
o.schedulePlaylistRefresh(info, dbID, ownerID, delay)
}
}
func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(_ context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
// Cancel existing timer if any
if timer, ok := o.refreshTimers[dbID]; ok {
timer.Stop()
}
o.refreshTimers[dbID] = time.AfterFunc(delay, func() {
o.wg.Go(func() { o.syncPlaylist(o.ctx, info, dbID, ownerID) })
select {
case o.workCh <- workItem{typ: workSync, info: info, dbID: dbID, ownerID: ownerID}:
case <-o.ctx.Done():
}
})
o.refreshTimerCount.Store(int32(len(o.refreshTimers)))
}
func (o *playlistGeneratorOrchestrator) scheduleDiscovery(_ context.Context, delay time.Duration) {
func (o *playlistGeneratorOrchestrator) scheduleDiscovery(delay time.Duration) {
if o.discoveryTimer != nil {
o.discoveryTimer.Stop()
}
o.discoveryTimer = time.AfterFunc(delay, func() {
o.wg.Go(func() { o.discoverAndSync(o.ctx) })
select {
case o.workCh <- workItem{typ: workDiscover}:
case <-o.ctx.Done():
}
})
}
@ -190,14 +252,18 @@ func isPlaylistNotFoundError(err error) bool {
return err != nil && strings.Contains(err.Error(), capabilities.PlaylistGeneratorErrorNotFound.Error())
}
// stop cancels the context, stops all timers, and waits for in-flight goroutines.
func (o *playlistGeneratorOrchestrator) stop() {
o.cancel()
// stopAllTimers stops the discovery timer and all refresh timers.
func (o *playlistGeneratorOrchestrator) stopAllTimers() {
if o.discoveryTimer != nil {
o.discoveryTimer.Stop()
}
for _, timer := range o.refreshTimers {
timer.Stop()
}
o.wg.Wait()
}
// stop cancels the context and waits for the worker goroutine to finish.
func (o *playlistGeneratorOrchestrator) stop() {
o.cancel()
<-o.done
}

View File

@ -5,7 +5,6 @@ package plugins
import (
"time"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/tests"
@ -24,13 +23,7 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
"test-playlist-generator"+PackageExtension,
)
// Pre-initialize the mock playlist repo to avoid a race with the
// discoverAndSync goroutine that is launched during Start().
mockDS := pgManager.ds.(*tests.MockDataStore)
if mockDS.MockedPlaylist == nil {
mockDS.MockedPlaylist = tests.CreateMockPlaylistRepo()
}
mockPlsRepo = mockDS.MockedPlaylist.(*tests.MockPlaylistRepo)
mockPlsRepo = pgManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo)
})
Describe("capability detection", func() {
@ -49,30 +42,16 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
// The orchestrator runs discoverAndSync in a goroutine on Start().
// Give it a moment to complete.
Eventually(func() int {
return len(mockPlsRepo.Data)
return mockPlsRepo.Len()
}).Should(BeNumerically(">=", 2))
})
It("creates playlists with correct fields", func() {
// Check that playlists have the correct plugin fields
Eventually(func() bool {
for _, pls := range mockPlsRepo.Data {
if pls.PluginID == "test-playlist-generator" && pls.PluginPlaylistID == "daily-mix-1" {
return true
}
}
return false
return mockPlsRepo.FindByPluginPlaylistID("daily-mix-1") != nil
}).Should(BeTrue())
// Find the daily-mix-1 playlist and verify its fields
var dailyMix1 *model.Playlist
for _, pls := range mockPlsRepo.Data {
if pls.PluginPlaylistID == "daily-mix-1" {
dailyMix1 = pls
break
}
}
Expect(dailyMix1).ToNot(BeNil())
dailyMix1 := mockPlsRepo.FindByPluginPlaylistID("daily-mix-1")
Expect(dailyMix1.Name).To(Equal("Daily Mix 1"))
Expect(dailyMix1.Comment).To(Equal("Your personalized daily mix"))
Expect(dailyMix1.ExternalImageURL).To(Equal("https://example.com/cover1.jpg"))
@ -85,7 +64,7 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
It("generates deterministic playlist IDs", func() {
expectedID := id.NewHash("test-playlist-generator", "daily-mix-1", "user-1")
Eventually(func() bool {
_, exists := mockPlsRepo.Data[expectedID]
_, exists := mockPlsRepo.GetData(expectedID)
return exists
}).Should(BeTrue())
})
@ -96,8 +75,8 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
Expect(id1).ToNot(Equal(id2))
Eventually(func() bool {
_, exists1 := mockPlsRepo.Data[id1]
_, exists2 := mockPlsRepo.Data[id2]
_, exists1 := mockPlsRepo.GetData(id1)
_, exists2 := mockPlsRepo.GetData(id2)
return exists1 && exists2
}).Should(BeTrue())
})
@ -113,15 +92,11 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
Expect(errManager.playlistGenerators).To(HaveKey("test-playlist-generator"))
// But no playlists created
errDS := errManager.ds.(*tests.MockDataStore)
if errDS.MockedPlaylist == nil {
errDS.MockedPlaylist = tests.CreateMockPlaylistRepo()
}
errPlsRepo := errDS.MockedPlaylist.(*tests.MockPlaylistRepo)
errPlsRepo := errManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo)
// The orchestrator was started but GetAvailablePlaylists returned error,
// so no playlists should be created
Consistently(func() int {
return len(errPlsRepo.Data)
return errPlsRepo.Len()
}, "500ms").Should(Equal(0))
})
})
@ -139,20 +114,16 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
Expect(notFoundManager.playlistGenerators).To(HaveKey("test-playlist-generator"))
// No playlists should be created (all returned NotFound)
notFoundDS := notFoundManager.ds.(*tests.MockDataStore)
if notFoundDS.MockedPlaylist == nil {
notFoundDS.MockedPlaylist = tests.CreateMockPlaylistRepo()
}
notFoundPlsRepo := notFoundDS.MockedPlaylist.(*tests.MockPlaylistRepo)
notFoundPlsRepo := notFoundManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo)
Consistently(func() int {
return len(notFoundPlsRepo.Data)
return notFoundPlsRepo.Len()
}, "500ms").Should(Equal(0))
// No refresh timers should be scheduled for NotFound playlists
orch := notFoundManager.playlistGenerators["test-playlist-generator"]
Eventually(func() int {
return len(orch.refreshTimers)
}).Should(Equal(0))
Eventually(func() int32 {
return orch.refreshTimerCount.Load()
}).Should(Equal(int32(0)))
})
})
@ -170,23 +141,19 @@ var _ = Describe("PlaylistGenerator", Ordered, func() {
// retryInterval should be stored from the response
Eventually(func() time.Duration {
return orch.retryInterval
return time.Duration(orch.retryInterval.Load())
}).Should(Equal(60 * time.Second))
// No playlists should be created (GetPlaylist failed)
retryDS := retryManager.ds.(*tests.MockDataStore)
if retryDS.MockedPlaylist == nil {
retryDS.MockedPlaylist = tests.CreateMockPlaylistRepo()
}
retryPlsRepo := retryDS.MockedPlaylist.(*tests.MockPlaylistRepo)
retryPlsRepo := retryManager.ds.(*tests.MockDataStore).MockedPlaylist.(*tests.MockPlaylistRepo)
Consistently(func() int {
return len(retryPlsRepo.Data)
return retryPlsRepo.Len()
}, "500ms").Should(Equal(0))
// Refresh timers should be scheduled for transient errors
Eventually(func() int {
return len(orch.refreshTimers)
}).Should(BeNumerically(">=", 1))
Eventually(func() int32 {
return orch.refreshTimerCount.Load()
}).Should(BeNumerically(">=", int32(1)))
})
})

View File

@ -139,7 +139,7 @@ func createTestManagerWithPluginsAndMetrics(pluginConfig map[string]map[string]s
mockUserRepo := tests.CreateMockUserRepo()
_ = mockUserRepo.Put(&model.User{ID: "user-1", UserName: "admin"})
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo, MockedUser: mockUserRepo}
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo, MockedUser: mockUserRepo, MockedPlaylist: tests.CreateMockPlaylistRepo()}
// Create and start manager
manager := &Manager{

View File

@ -2,6 +2,7 @@ package tests
import (
"errors"
"sync"
"github.com/deluan/rest"
"github.com/navidrome/navidrome/model"
@ -17,6 +18,7 @@ func CreateMockPlaylistRepo() *MockPlaylistRepo {
type MockPlaylistRepo struct {
model.PlaylistRepository
mu sync.RWMutex
Data map[string]*model.Playlist // keyed by ID
PathMap map[string]*model.Playlist // keyed by path
Last *model.Playlist
@ -26,10 +28,14 @@ type MockPlaylistRepo struct {
}
func (m *MockPlaylistRepo) SetError(err bool) {
m.mu.Lock()
defer m.mu.Unlock()
m.Err = err
}
func (m *MockPlaylistRepo) Get(id string) (*model.Playlist, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if m.Err {
return nil, errors.New("error")
}
@ -46,6 +52,8 @@ func (m *MockPlaylistRepo) GetWithTracks(id string, _, _ bool) (*model.Playlist,
}
func (m *MockPlaylistRepo) Put(pls *model.Playlist) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.Err {
return errors.New("error")
}
@ -60,6 +68,8 @@ func (m *MockPlaylistRepo) Put(pls *model.Playlist) error {
}
func (m *MockPlaylistRepo) FindByPath(path string) (*model.Playlist, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if m.Err {
return nil, errors.New("error")
}
@ -72,6 +82,8 @@ func (m *MockPlaylistRepo) FindByPath(path string) (*model.Playlist, error) {
}
func (m *MockPlaylistRepo) Delete(id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.Err {
return errors.New("error")
}
@ -80,10 +92,14 @@ func (m *MockPlaylistRepo) Delete(id string) error {
}
func (m *MockPlaylistRepo) Tracks(_ string, _ bool) model.PlaylistTrackRepository {
m.mu.RLock()
defer m.mu.RUnlock()
return m.TracksRepo
}
func (m *MockPlaylistRepo) Exists(id string) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if m.Err {
return false, errors.New("error")
}
@ -95,6 +111,8 @@ func (m *MockPlaylistRepo) Exists(id string) (bool, error) {
}
func (m *MockPlaylistRepo) Count(_ ...rest.QueryOptions) (int64, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if m.Err {
return 0, errors.New("error")
}
@ -102,10 +120,39 @@ func (m *MockPlaylistRepo) Count(_ ...rest.QueryOptions) (int64, error) {
}
func (m *MockPlaylistRepo) CountAll(_ ...model.QueryOptions) (int64, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if m.Err {
return 0, errors.New("error")
}
return int64(len(m.Data)), nil
}
// Len returns the number of playlists in the repo in a thread-safe manner.
func (m *MockPlaylistRepo) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.Data)
}
// GetData returns a playlist by ID in a thread-safe manner.
func (m *MockPlaylistRepo) GetData(id string) (*model.Playlist, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
pls, ok := m.Data[id]
return pls, ok
}
// FindByPluginPlaylistID returns the first playlist with the given plugin playlist ID.
func (m *MockPlaylistRepo) FindByPluginPlaylistID(pluginPlaylistID string) *model.Playlist {
m.mu.RLock()
defer m.mu.RUnlock()
for _, pls := range m.Data {
if pls.PluginPlaylistID == pluginPlaylistID {
return pls
}
}
return nil
}
var _ model.PlaylistRepository = (*MockPlaylistRepo)(nil)