navidrome/plugins/playlist_provider.go
Deluan ae8263671a fix: address PR review comments for playlist provider capability
Fix timer lifecycle bugs in the playlist syncer: always store
RetryInterval (including 0 to disable retries), cancel discovery timers
when RefreshInterval becomes 0, and cancel stale refresh timers when
ValidUntil becomes 0. Extract cancelRefreshTimer helper to deduplicate
the timer cleanup pattern. Improve plugin playlist update restrictions
in both the Subsonic and REST API paths to compare actual values instead
of just checking pointer presence or field inclusion, so passing
unchanged name/comment no longer triggers a false rejection.

Signed-off-by: Deluan <deluan@navidrome.org>
2026-04-12 17:38:21 -04:00

286 lines
9.1 KiB
Go

package plugins
import (
"context"
"slices"
"strings"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/core/matcher"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
)
const (
CapabilityPlaylistProvider Capability = "PlaylistProvider"
FuncPlaylistProviderGetAvailablePlaylists = "nd_playlist_provider_get_available_playlists"
FuncPlaylistProviderGetPlaylist = "nd_playlist_provider_get_playlist"
// workChCapacity is the buffer size for the work channel.
workChCapacity = 64
// discoveryRetryDelay is how long to wait before retrying a failed GetAvailablePlaylists call.
discoveryRetryDelay = 5 * time.Minute
)
func init() {
registerCapability(
CapabilityPlaylistProvider,
FuncPlaylistProviderGetAvailablePlaylists,
FuncPlaylistProviderGetPlaylist,
)
}
type workType int
const (
workDiscover workType = iota // run discoverAndSync
workSync // run syncPlaylist for a single playlist
)
type workItem struct {
typ workType
info capabilities.PlaylistInfo // only for workSync
dbID string // only for workSync
ownerID string // only for workSync
}
// playlistSyncer manages playlist synchronization for a single plugin.
// All mutable state (refreshTimers, discoveryTimer) is owned exclusively by the
// worker goroutine — no synchronization needed. The retryInterval and
// refreshTimerCount fields use atomics so tests can observe them race-free.
type playlistSyncer struct {
pluginName string
plugin *plugin
ds model.DataStore
matcher *matcher.Matcher
ctx context.Context
cancel context.CancelFunc
workCh chan workItem // serialized work queue
refreshTimers map[string]*time.Timer // keyed by playlist DB ID — worker-only
discoveryTimer *time.Timer // worker-only
retryInterval atomic.Int64 // nanoseconds; from last GetAvailablePlaylists response
refreshTimerCount atomic.Int32 // number of active refresh timers
done chan struct{} // closed when worker exits
}
func newPlaylistSyncer(parentCtx context.Context, pluginName string, p *plugin, ds model.DataStore, m *matcher.Matcher) *playlistSyncer {
ctx, cancel := context.WithCancel(parentCtx)
return &playlistSyncer{
pluginName: pluginName,
plugin: p,
ds: ds,
matcher: m,
ctx: ctx,
cancel: cancel,
workCh: make(chan workItem, workChCapacity),
refreshTimers: make(map[string]*time.Timer),
done: make(chan struct{}),
}
}
// run is the single worker goroutine that processes all work items sequentially.
// It performs an initial discovery before entering the main loop.
func (p *playlistSyncer) run() {
defer close(p.done)
// Run initial discovery before entering the loop
p.discoverAndSync()
for {
select {
case <-p.ctx.Done():
p.stopAllTimers()
return
case item := <-p.workCh:
switch item.typ {
case workDiscover:
p.discoverAndSync()
case workSync:
p.syncPlaylist(item.info, item.dbID, item.ownerID)
}
}
}
}
// discoverAndSync calls GetAvailablePlaylists, then GetPlaylist for each, matches tracks, and upserts.
func (p *playlistSyncer) discoverAndSync() {
ctx := p.ctx
resp, err := callPluginFunction[capabilities.GetAvailablePlaylistsRequest, capabilities.GetAvailablePlaylistsResponse](
ctx, p.plugin, FuncPlaylistProviderGetAvailablePlaylists, capabilities.GetAvailablePlaylistsRequest{},
)
if err != nil {
log.Error(ctx, "Failed to call GetAvailablePlaylists, retrying later", "plugin", p.pluginName, err)
p.scheduleDiscovery(discoveryRetryDelay)
return
}
// Store retry interval from response (including 0, which disables retries)
p.retryInterval.Store(int64(time.Duration(resp.RetryInterval) * time.Second))
resolvedUsers := map[string]string{} // username -> userID cache
for _, info := range resp.Playlists {
// Resolve username to user ID (cached)
ownerID, ok := resolvedUsers[info.OwnerUsername]
if !ok {
user, err := p.ds.User(adminContext(ctx)).FindByUsername(info.OwnerUsername)
if err != nil {
log.Error(ctx, "Failed to resolve playlist owner", "plugin", p.pluginName,
"playlistID", info.ID, "username", info.OwnerUsername, err)
continue
}
ownerID = user.ID
resolvedUsers[info.OwnerUsername] = ownerID
}
// Validate that the plugin is permitted to create playlists for this user
if !p.plugin.allUsers && !slices.Contains(p.plugin.allowedUserIDs, ownerID) {
log.Error(ctx, "Plugin not permitted to create playlists for user", "plugin", p.pluginName,
"playlistID", info.ID, "username", info.OwnerUsername)
continue
}
dbID := id.NewHash(p.pluginName, info.ID, ownerID)
p.syncPlaylist(info, dbID, ownerID)
}
// Schedule re-discovery if RefreshInterval > 0, otherwise cancel any existing timer
if resp.RefreshInterval > 0 {
p.scheduleDiscovery(time.Duration(resp.RefreshInterval) * time.Second)
} else if p.discoveryTimer != nil {
p.discoveryTimer.Stop()
p.discoveryTimer = nil
}
}
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
func (p *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID string, ownerID string) {
ctx := p.ctx
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
ctx, p.plugin, FuncPlaylistProviderGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
)
if err != nil {
if isPlaylistNotFoundError(err) {
log.Info(ctx, "Playlist not found, skipping", "plugin", p.pluginName, "playlistID", info.ID)
p.cancelRefreshTimer(dbID)
return
}
log.Warn(ctx, "Failed to call GetPlaylist", "plugin", p.pluginName, "playlistID", info.ID, err)
// Schedule retry for transient errors if retryInterval is configured
if ri := time.Duration(p.retryInterval.Load()); ri > 0 {
p.schedulePlaylistRefresh(info, dbID, ownerID, ri)
}
return
}
// Convert SongRef → agents.Song and match against library
songs := songRefsToAgentSongs(resp.Tracks)
matched, err := p.matcher.MatchSongsToLibrary(ctx, songs, len(songs))
if err != nil {
log.Error(ctx, "Failed to match songs to library", "plugin", p.pluginName, "playlistID", info.ID, err)
return
}
// Build playlist model
pls := &model.Playlist{
ID: dbID,
Name: resp.Name,
Comment: resp.Description,
OwnerID: ownerID,
Public: false,
ExternalImageURL: resp.CoverArtURL,
PluginID: p.pluginName,
PluginPlaylistID: info.ID,
}
if resp.ValidUntil > 0 {
t := time.Unix(resp.ValidUntil, 0)
pls.ValidUntil = &t
}
// Set tracks from matched media files
pls.AddMediaFiles(matched)
// Upsert via repository
plsRepo := p.ds.Playlist(ctx)
if err := plsRepo.Put(pls); err != nil {
log.Error(ctx, "Failed to upsert plugin playlist", "plugin", p.pluginName, "playlistID", info.ID, err)
return
}
log.Info(ctx, "Synced plugin playlist", "plugin", p.pluginName, "playlistID", info.ID,
"name", resp.Name, "tracks", len(matched), "owner", ownerID)
// Schedule refresh if ValidUntil > 0, otherwise cancel any stale timer
if resp.ValidUntil > 0 {
validUntil := time.Unix(resp.ValidUntil, 0)
delay := time.Until(validUntil)
if delay <= 0 {
delay = 1 * time.Second // Already expired, refresh soon
}
p.schedulePlaylistRefresh(info, dbID, ownerID, delay)
} else {
p.cancelRefreshTimer(dbID)
}
}
// cancelRefreshTimer stops and removes the refresh timer for the given playlist DB ID, if any.
func (p *playlistSyncer) cancelRefreshTimer(dbID string) {
if timer, ok := p.refreshTimers[dbID]; ok {
timer.Stop()
delete(p.refreshTimers, dbID)
p.refreshTimerCount.Store(int32(len(p.refreshTimers)))
}
}
func (p *playlistSyncer) schedulePlaylistRefresh(info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
// Cancel existing timer if any
if timer, ok := p.refreshTimers[dbID]; ok {
timer.Stop()
}
p.refreshTimers[dbID] = time.AfterFunc(delay, func() {
select {
case p.workCh <- workItem{typ: workSync, info: info, dbID: dbID, ownerID: ownerID}:
case <-p.ctx.Done():
}
})
p.refreshTimerCount.Store(int32(len(p.refreshTimers)))
}
func (p *playlistSyncer) scheduleDiscovery(delay time.Duration) {
if p.discoveryTimer != nil {
p.discoveryTimer.Stop()
}
p.discoveryTimer = time.AfterFunc(delay, func() {
select {
case p.workCh <- workItem{typ: workDiscover}:
case <-p.ctx.Done():
}
})
}
// isPlaylistNotFoundError checks if the error contains a NotFound sentinel from the plugin.
func isPlaylistNotFoundError(err error) bool {
return err != nil && strings.Contains(err.Error(), capabilities.PlaylistProviderErrorNotFound.Error())
}
// stopAllTimers stops the discovery timer and all refresh timers.
func (p *playlistSyncer) stopAllTimers() {
if p.discoveryTimer != nil {
p.discoveryTimer.Stop()
}
for _, timer := range p.refreshTimers {
timer.Stop()
}
}
// Close cancels the context and waits for the worker goroutine to finish.
func (p *playlistSyncer) Close() error {
p.cancel()
<-p.done
return nil
}