mirror of
https://github.com/navidrome/navidrome.git
synced 2026-05-03 06:51:16 +00:00
fix: address race conditions and design issues in PlaylistGenerator
Rename PlaylistInfo.OwnerUserID to OwnerUsername so plugins provide usernames instead of internal user IDs, with server-side resolution via FindByUsername. Fix race condition on playlistGenerators map by using write lock in startPlaylistGenerators and moving map access inside the lock in unloadPlugin/Stop. Add context and WaitGroup to the orchestrator for proper cancellation and goroutine tracking. Include owner_id in the plugin playlist unique index. Use SetTracks instead of direct assignment to refresh playlist duration/size/song count stats.
This commit is contained in:
parent
188584e3fb
commit
a8a336de33
@ -12,15 +12,15 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func upAddPluginPlaylistFields(ctx context.Context, tx *sql.Tx) error {
|
func upAddPluginPlaylistFields(ctx context.Context, tx *sql.Tx) error {
|
||||||
_, err := tx.ExecContext(ctx, `ALTER TABLE playlist ADD COLUMN plugin_id VARCHAR(255) DEFAULT '';`)
|
_, err := tx.ExecContext(ctx, `ALTER TABLE playlist ADD COLUMN plugin_id VARCHAR(255) NOT NULL DEFAULT '';`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(ctx, `ALTER TABLE playlist ADD COLUMN plugin_playlist_id VARCHAR(255) DEFAULT '';`)
|
_, err = tx.ExecContext(ctx, `ALTER TABLE playlist ADD COLUMN plugin_playlist_id VARCHAR(255) NOT NULL DEFAULT '';`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_playlist_plugin ON playlist(plugin_id, plugin_playlist_id) WHERE plugin_id != '';`)
|
_, err = tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_playlist_plugin ON playlist(plugin_id, plugin_playlist_id, owner_id) WHERE plugin_id != '';`)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -32,8 +32,8 @@ type GetPlaylistsResponse struct {
|
|||||||
type PlaylistInfo struct {
|
type PlaylistInfo struct {
|
||||||
// ID is the plugin-scoped unique identifier for this playlist.
|
// ID is the plugin-scoped unique identifier for this playlist.
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
// OwnerUserID is the Navidrome user ID that owns this playlist.
|
// OwnerUsername is the Navidrome username that owns this playlist.
|
||||||
OwnerUserID string `json:"ownerUserId"`
|
OwnerUsername string `json:"ownerUsername"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPlaylistRequest is the request for GetPlaylist.
|
// GetPlaylistRequest is the request for GetPlaylist.
|
||||||
|
|||||||
@ -79,12 +79,12 @@ components:
|
|||||||
id:
|
id:
|
||||||
type: string
|
type: string
|
||||||
description: ID is the plugin-scoped unique identifier for this playlist.
|
description: ID is the plugin-scoped unique identifier for this playlist.
|
||||||
ownerUserId:
|
ownerUsername:
|
||||||
type: string
|
type: string
|
||||||
description: OwnerUserID is the Navidrome user ID that owns this playlist.
|
description: OwnerUsername is the Navidrome username that owns this playlist.
|
||||||
required:
|
required:
|
||||||
- id
|
- id
|
||||||
- ownerUserId
|
- ownerUsername
|
||||||
SongRef:
|
SongRef:
|
||||||
description: SongRef is a reference to a song with metadata for matching.
|
description: SongRef is a reference to a song with metadata for matching.
|
||||||
properties:
|
properties:
|
||||||
|
|||||||
@ -185,16 +185,16 @@ func (m *Manager) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
// startPlaylistGenerators starts orchestrators for all plugins with the PlaylistGenerator capability.
|
// startPlaylistGenerators starts orchestrators for all plugins with the PlaylistGenerator capability.
|
||||||
func (m *Manager) startPlaylistGenerators(ctx context.Context) {
|
func (m *Manager) startPlaylistGenerators(ctx context.Context) {
|
||||||
m.mu.RLock()
|
m.mu.Lock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
for name, p := range m.plugins {
|
for name, p := range m.plugins {
|
||||||
if !hasCapability(p.capabilities, CapabilityPlaylistGenerator) {
|
if !hasCapability(p.capabilities, CapabilityPlaylistGenerator) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
orch := newPlaylistGeneratorOrchestrator(name, p, m.ds)
|
orch := newPlaylistGeneratorOrchestrator(name, p, m.ds, ctx)
|
||||||
m.playlistGenerators[name] = orch
|
m.playlistGenerators[name] = orch
|
||||||
go orch.discoverAndSync(ctx)
|
orch.wg.Go(func() { orch.discoverAndSync(orch.ctx) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,9 +209,12 @@ func (m *Manager) Stop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop all playlist generator orchestrators
|
// Stop all playlist generator orchestrators
|
||||||
for name, orch := range m.playlistGenerators {
|
m.mu.Lock()
|
||||||
|
orchestrators := m.playlistGenerators
|
||||||
|
m.playlistGenerators = make(map[string]*playlistGeneratorOrchestrator)
|
||||||
|
m.mu.Unlock()
|
||||||
|
for _, orch := range orchestrators {
|
||||||
orch.stop()
|
orch.stop()
|
||||||
delete(m.playlistGenerators, name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop file watcher
|
// Stop file watcher
|
||||||
@ -585,12 +588,15 @@ func (m *Manager) unloadPlugin(name string) error {
|
|||||||
return fmt.Errorf("plugin %q not found", name)
|
return fmt.Errorf("plugin %q not found", name)
|
||||||
}
|
}
|
||||||
delete(m.plugins, name)
|
delete(m.plugins, name)
|
||||||
|
|
||||||
|
// Extract playlist generator orchestrator under lock
|
||||||
|
orch := m.playlistGenerators[name]
|
||||||
|
delete(m.playlistGenerators, name)
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
|
|
||||||
// Stop playlist generator orchestrator if any
|
// Stop playlist generator orchestrator outside lock
|
||||||
if orch, ok := m.playlistGenerators[name]; ok {
|
if orch != nil {
|
||||||
orch.stop()
|
orch.stop()
|
||||||
delete(m.playlistGenerators, name)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run cleanup functions
|
// Run cleanup functions
|
||||||
|
|||||||
@ -49,8 +49,8 @@ type GetPlaylistsResponse struct {
|
|||||||
type PlaylistInfo struct {
|
type PlaylistInfo struct {
|
||||||
// ID is the plugin-scoped unique identifier for this playlist.
|
// ID is the plugin-scoped unique identifier for this playlist.
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
// OwnerUserID is the Navidrome user ID that owns this playlist.
|
// OwnerUsername is the Navidrome username that owns this playlist.
|
||||||
OwnerUserID string `json:"ownerUserId"`
|
OwnerUsername string `json:"ownerUsername"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SongRef is a reference to a song with metadata for matching.
|
// SongRef is a reference to a song with metadata for matching.
|
||||||
|
|||||||
@ -46,8 +46,8 @@ type GetPlaylistsResponse struct {
|
|||||||
type PlaylistInfo struct {
|
type PlaylistInfo struct {
|
||||||
// ID is the plugin-scoped unique identifier for this playlist.
|
// ID is the plugin-scoped unique identifier for this playlist.
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
// OwnerUserID is the Navidrome user ID that owns this playlist.
|
// OwnerUsername is the Navidrome username that owns this playlist.
|
||||||
OwnerUserID string `json:"ownerUserId"`
|
OwnerUsername string `json:"ownerUsername"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SongRef is a reference to a song with metadata for matching.
|
// SongRef is a reference to a song with metadata for matching.
|
||||||
|
|||||||
@ -71,9 +71,9 @@ pub struct PlaylistInfo {
|
|||||||
/// ID is the plugin-scoped unique identifier for this playlist.
|
/// ID is the plugin-scoped unique identifier for this playlist.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub id: String,
|
pub id: String,
|
||||||
/// OwnerUserID is the Navidrome user ID that owns this playlist.
|
/// OwnerUsername is the Navidrome username that owns this playlist.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub owner_user_id: String,
|
pub owner_username: String,
|
||||||
}
|
}
|
||||||
/// SongRef is a reference to a song with metadata for matching.
|
/// SongRef is a reference to a song with metadata for matching.
|
||||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package plugins
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/core/matcher"
|
"github.com/navidrome/navidrome/core/matcher"
|
||||||
@ -33,16 +34,22 @@ type playlistGeneratorOrchestrator struct {
|
|||||||
plugin *plugin
|
plugin *plugin
|
||||||
ds model.DataStore
|
ds model.DataStore
|
||||||
matcher *matcher.Matcher
|
matcher *matcher.Matcher
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg sync.WaitGroup
|
||||||
refreshTimers map[string]*time.Timer // keyed by playlist DB ID
|
refreshTimers map[string]*time.Timer // keyed by playlist DB ID
|
||||||
discoveryTimer *time.Timer
|
discoveryTimer *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.DataStore) *playlistGeneratorOrchestrator {
|
func newPlaylistGeneratorOrchestrator(pluginName string, p *plugin, ds model.DataStore, parentCtx context.Context) *playlistGeneratorOrchestrator {
|
||||||
|
ctx, cancel := context.WithCancel(parentCtx)
|
||||||
return &playlistGeneratorOrchestrator{
|
return &playlistGeneratorOrchestrator{
|
||||||
pluginName: pluginName,
|
pluginName: pluginName,
|
||||||
plugin: p,
|
plugin: p,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
matcher: matcher.New(ds),
|
matcher: matcher.New(ds),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
refreshTimers: make(map[string]*time.Timer),
|
refreshTimers: make(map[string]*time.Timer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -58,8 +65,16 @@ func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range resp.Playlists {
|
for _, info := range resp.Playlists {
|
||||||
dbID := id.NewHash(o.pluginName, info.ID, info.OwnerUserID)
|
// Resolve username to user ID
|
||||||
o.syncPlaylist(ctx, info, dbID)
|
user, err := o.ds.User(adminContext(ctx)).FindByUsername(info.OwnerUsername)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx, "Failed to resolve playlist owner", "plugin", o.pluginName,
|
||||||
|
"playlistID", info.ID, "username", info.OwnerUsername, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ownerID := user.ID
|
||||||
|
dbID := id.NewHash(o.pluginName, info.ID, ownerID)
|
||||||
|
o.syncPlaylist(ctx, info, dbID, ownerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule re-discovery if RefreshInterval > 0
|
// Schedule re-discovery if RefreshInterval > 0
|
||||||
@ -69,7 +84,7 @@ func (o *playlistGeneratorOrchestrator) discoverAndSync(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
|
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
|
||||||
func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info capabilities.PlaylistInfo, dbID string) {
|
func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string) {
|
||||||
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
|
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
|
||||||
ctx, o.plugin, FuncPlaylistGeneratorGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
|
ctx, o.plugin, FuncPlaylistGeneratorGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
|
||||||
)
|
)
|
||||||
@ -91,7 +106,7 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
|
|||||||
ID: dbID,
|
ID: dbID,
|
||||||
Name: resp.Name,
|
Name: resp.Name,
|
||||||
Comment: resp.Description,
|
Comment: resp.Description,
|
||||||
OwnerID: info.OwnerUserID,
|
OwnerID: ownerID,
|
||||||
Public: false,
|
Public: false,
|
||||||
ExternalImageURL: resp.CoverArtURL,
|
ExternalImageURL: resp.CoverArtURL,
|
||||||
PluginID: o.pluginName,
|
PluginID: o.pluginName,
|
||||||
@ -108,7 +123,7 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
|
|||||||
MediaFile: mf,
|
MediaFile: mf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pls.Tracks = tracks
|
pls.SetTracks(tracks)
|
||||||
|
|
||||||
// Upsert via repository
|
// Upsert via repository
|
||||||
plsRepo := o.ds.Playlist(ctx)
|
plsRepo := o.ds.Playlist(ctx)
|
||||||
@ -118,7 +133,7 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info(ctx, "Synced plugin playlist", "plugin", o.pluginName, "playlistID", info.ID,
|
log.Info(ctx, "Synced plugin playlist", "plugin", o.pluginName, "playlistID", info.ID,
|
||||||
"name", resp.Name, "tracks", len(matched), "owner", info.OwnerUserID)
|
"name", resp.Name, "tracks", len(matched), "owner", ownerID)
|
||||||
|
|
||||||
// Schedule refresh if ValidUntil > 0
|
// Schedule refresh if ValidUntil > 0
|
||||||
if resp.ValidUntil > 0 {
|
if resp.ValidUntil > 0 {
|
||||||
@ -127,17 +142,17 @@ func (o *playlistGeneratorOrchestrator) syncPlaylist(ctx context.Context, info c
|
|||||||
if delay <= 0 {
|
if delay <= 0 {
|
||||||
delay = 1 * time.Second // Already expired, refresh soon
|
delay = 1 * time.Second // Already expired, refresh soon
|
||||||
}
|
}
|
||||||
o.schedulePlaylistRefresh(ctx, info, dbID, delay)
|
o.schedulePlaylistRefresh(ctx, info, dbID, ownerID, delay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(_ context.Context, info capabilities.PlaylistInfo, dbID string, delay time.Duration) {
|
func (o *playlistGeneratorOrchestrator) schedulePlaylistRefresh(_ context.Context, info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
|
||||||
// Cancel existing timer if any
|
// Cancel existing timer if any
|
||||||
if timer, ok := o.refreshTimers[dbID]; ok {
|
if timer, ok := o.refreshTimers[dbID]; ok {
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
}
|
}
|
||||||
o.refreshTimers[dbID] = time.AfterFunc(delay, func() {
|
o.refreshTimers[dbID] = time.AfterFunc(delay, func() {
|
||||||
o.syncPlaylist(context.Background(), info, dbID)
|
o.wg.Go(func() { o.syncPlaylist(o.ctx, info, dbID, ownerID) })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,16 +161,18 @@ func (o *playlistGeneratorOrchestrator) scheduleDiscovery(_ context.Context, del
|
|||||||
o.discoveryTimer.Stop()
|
o.discoveryTimer.Stop()
|
||||||
}
|
}
|
||||||
o.discoveryTimer = time.AfterFunc(delay, func() {
|
o.discoveryTimer = time.AfterFunc(delay, func() {
|
||||||
o.discoverAndSync(context.Background())
|
o.wg.Go(func() { o.discoverAndSync(o.ctx) })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop cancels all timers.
|
// stop cancels the context, stops all timers, and waits for in-flight goroutines.
|
||||||
func (o *playlistGeneratorOrchestrator) stop() {
|
func (o *playlistGeneratorOrchestrator) stop() {
|
||||||
|
o.cancel()
|
||||||
if o.discoveryTimer != nil {
|
if o.discoveryTimer != nil {
|
||||||
o.discoveryTimer.Stop()
|
o.discoveryTimer.Stop()
|
||||||
}
|
}
|
||||||
for _, timer := range o.refreshTimers {
|
for _, timer := range o.refreshTimers {
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
}
|
}
|
||||||
|
o.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -133,7 +133,13 @@ func createTestManagerWithPluginsAndMetrics(pluginConfig map[string]map[string]s
|
|||||||
mockPluginRepo := tests.CreateMockPluginRepo()
|
mockPluginRepo := tests.CreateMockPluginRepo()
|
||||||
mockPluginRepo.Permitted = true
|
mockPluginRepo.Permitted = true
|
||||||
mockPluginRepo.SetData(enabledPlugins)
|
mockPluginRepo.SetData(enabledPlugins)
|
||||||
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo}
|
|
||||||
|
// Pre-seed a mock user repo with a default user so that
|
||||||
|
// PlaylistGenerator's discoverAndSync can resolve usernames.
|
||||||
|
mockUserRepo := tests.CreateMockUserRepo()
|
||||||
|
_ = mockUserRepo.Put(&model.User{ID: "user-1", UserName: "admin"})
|
||||||
|
|
||||||
|
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo, MockedUser: mockUserRepo}
|
||||||
|
|
||||||
// Create and start manager
|
// Create and start manager
|
||||||
manager := &Manager{
|
manager := &Manager{
|
||||||
|
|||||||
12
plugins/testdata/test-playlist-generator/main.go
vendored
12
plugins/testdata/test-playlist-generator/main.go
vendored
@ -21,16 +21,16 @@ func (t *testPlaylistGenerator) GetPlaylists(_ pg.GetPlaylistsRequest) (pg.GetPl
|
|||||||
return pg.GetPlaylistsResponse{}, fmt.Errorf("%s", errMsg)
|
return pg.GetPlaylistsResponse{}, fmt.Errorf("%s", errMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the owner user ID from config (defaults to "user-1")
|
// Get the owner username from config (defaults to "admin")
|
||||||
ownerID := "user-1"
|
ownerUsername := "admin"
|
||||||
if id, ok := pdk.GetConfig("owner_id"); ok && id != "" {
|
if u, ok := pdk.GetConfig("owner_username"); ok && u != "" {
|
||||||
ownerID = id
|
ownerUsername = u
|
||||||
}
|
}
|
||||||
|
|
||||||
return pg.GetPlaylistsResponse{
|
return pg.GetPlaylistsResponse{
|
||||||
Playlists: []pg.PlaylistInfo{
|
Playlists: []pg.PlaylistInfo{
|
||||||
{ID: "daily-mix-1", OwnerUserID: ownerID},
|
{ID: "daily-mix-1", OwnerUsername: ownerUsername},
|
||||||
{ID: "daily-mix-2", OwnerUserID: ownerID},
|
{ID: "daily-mix-2", OwnerUsername: ownerUsername},
|
||||||
},
|
},
|
||||||
RefreshInterval: 0, // No re-discovery in tests
|
RefreshInterval: 0, // No re-discovery in tests
|
||||||
}, nil
|
}, nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user