mirror of
https://github.com/navidrome/navidrome.git
synced 2026-05-03 06:51:16 +00:00
refactor: update playlistSyncer methods to use consistent receiver naming
Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
parent
73203eeef0
commit
0a67142f74
@ -85,43 +85,43 @@ func newPlaylistSyncer(parentCtx context.Context, pluginName string, p *plugin,
|
|||||||
|
|
||||||
// run is the single worker goroutine that processes all work items sequentially.
|
// run is the single worker goroutine that processes all work items sequentially.
|
||||||
// It performs an initial discovery before entering the main loop.
|
// It performs an initial discovery before entering the main loop.
|
||||||
func (o *playlistSyncer) run() {
|
func (p *playlistSyncer) run() {
|
||||||
defer close(o.done)
|
defer close(p.done)
|
||||||
|
|
||||||
// Run initial discovery before entering the loop
|
// Run initial discovery before entering the loop
|
||||||
o.discoverAndSync()
|
p.discoverAndSync()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-o.ctx.Done():
|
case <-p.ctx.Done():
|
||||||
o.stopAllTimers()
|
p.stopAllTimers()
|
||||||
return
|
return
|
||||||
case item := <-o.workCh:
|
case item := <-p.workCh:
|
||||||
switch item.typ {
|
switch item.typ {
|
||||||
case workDiscover:
|
case workDiscover:
|
||||||
o.discoverAndSync()
|
p.discoverAndSync()
|
||||||
case workSync:
|
case workSync:
|
||||||
o.syncPlaylist(item.info, item.dbID, item.ownerID)
|
p.syncPlaylist(item.info, item.dbID, item.ownerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// discoverAndSync calls GetAvailablePlaylists, then GetPlaylist for each, matches tracks, and upserts.
|
// discoverAndSync calls GetAvailablePlaylists, then GetPlaylist for each, matches tracks, and upserts.
|
||||||
func (o *playlistSyncer) discoverAndSync() {
|
func (p *playlistSyncer) discoverAndSync() {
|
||||||
ctx := o.ctx
|
ctx := p.ctx
|
||||||
resp, err := callPluginFunction[capabilities.GetAvailablePlaylistsRequest, capabilities.GetAvailablePlaylistsResponse](
|
resp, err := callPluginFunction[capabilities.GetAvailablePlaylistsRequest, capabilities.GetAvailablePlaylistsResponse](
|
||||||
ctx, o.plugin, FuncPlaylistProviderGetAvailablePlaylists, capabilities.GetAvailablePlaylistsRequest{},
|
ctx, p.plugin, FuncPlaylistProviderGetAvailablePlaylists, capabilities.GetAvailablePlaylistsRequest{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Failed to call GetAvailablePlaylists, retrying later", "plugin", o.pluginName, err)
|
log.Error(ctx, "Failed to call GetAvailablePlaylists, retrying later", "plugin", p.pluginName, err)
|
||||||
o.scheduleDiscovery(discoveryRetryDelay)
|
p.scheduleDiscovery(discoveryRetryDelay)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store retry interval from response
|
// Store retry interval from response
|
||||||
if resp.RetryInterval > 0 {
|
if resp.RetryInterval > 0 {
|
||||||
o.retryInterval.Store(int64(time.Duration(resp.RetryInterval) * time.Second))
|
p.retryInterval.Store(int64(time.Duration(resp.RetryInterval) * time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
resolvedUsers := map[string]string{} // username -> userID cache
|
resolvedUsers := map[string]string{} // username -> userID cache
|
||||||
@ -129,9 +129,9 @@ func (o *playlistSyncer) discoverAndSync() {
|
|||||||
// Resolve username to user ID (cached)
|
// Resolve username to user ID (cached)
|
||||||
ownerID, ok := resolvedUsers[info.OwnerUsername]
|
ownerID, ok := resolvedUsers[info.OwnerUsername]
|
||||||
if !ok {
|
if !ok {
|
||||||
user, err := o.ds.User(adminContext(ctx)).FindByUsername(info.OwnerUsername)
|
user, err := p.ds.User(adminContext(ctx)).FindByUsername(info.OwnerUsername)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Failed to resolve playlist owner", "plugin", o.pluginName,
|
log.Error(ctx, "Failed to resolve playlist owner", "plugin", p.pluginName,
|
||||||
"playlistID", info.ID, "username", info.OwnerUsername, err)
|
"playlistID", info.ID, "username", info.OwnerUsername, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -140,52 +140,52 @@ func (o *playlistSyncer) discoverAndSync() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Validate that the plugin is permitted to create playlists for this user
|
// Validate that the plugin is permitted to create playlists for this user
|
||||||
if !o.plugin.allUsers && !slices.Contains(o.plugin.allowedUserIDs, ownerID) {
|
if !p.plugin.allUsers && !slices.Contains(p.plugin.allowedUserIDs, ownerID) {
|
||||||
log.Error(ctx, "Plugin not permitted to create playlists for user", "plugin", o.pluginName,
|
log.Error(ctx, "Plugin not permitted to create playlists for user", "plugin", p.pluginName,
|
||||||
"playlistID", info.ID, "username", info.OwnerUsername)
|
"playlistID", info.ID, "username", info.OwnerUsername)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
dbID := id.NewHash(o.pluginName, info.ID, ownerID)
|
dbID := id.NewHash(p.pluginName, info.ID, ownerID)
|
||||||
o.syncPlaylist(info, dbID, ownerID)
|
p.syncPlaylist(info, dbID, ownerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule re-discovery if RefreshInterval > 0
|
// Schedule re-discovery if RefreshInterval > 0
|
||||||
if resp.RefreshInterval > 0 {
|
if resp.RefreshInterval > 0 {
|
||||||
o.scheduleDiscovery(time.Duration(resp.RefreshInterval) * time.Second)
|
p.scheduleDiscovery(time.Duration(resp.RefreshInterval) * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
|
// syncPlaylist calls GetPlaylist, matches tracks, and upserts the playlist in the DB.
|
||||||
func (o *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID string, ownerID string) {
|
func (p *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID string, ownerID string) {
|
||||||
ctx := o.ctx
|
ctx := p.ctx
|
||||||
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
|
resp, err := callPluginFunction[capabilities.GetPlaylistRequest, capabilities.GetPlaylistResponse](
|
||||||
ctx, o.plugin, FuncPlaylistProviderGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
|
ctx, p.plugin, FuncPlaylistProviderGetPlaylist, capabilities.GetPlaylistRequest{ID: info.ID},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isPlaylistNotFoundError(err) {
|
if isPlaylistNotFoundError(err) {
|
||||||
log.Info(ctx, "Playlist not found, skipping", "plugin", o.pluginName, "playlistID", info.ID)
|
log.Info(ctx, "Playlist not found, skipping", "plugin", p.pluginName, "playlistID", info.ID)
|
||||||
// Stop any existing refresh timer for this playlist
|
// Stop any existing refresh timer for this playlist
|
||||||
if timer, ok := o.refreshTimers[dbID]; ok {
|
if timer, ok := p.refreshTimers[dbID]; ok {
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
delete(o.refreshTimers, dbID)
|
delete(p.refreshTimers, dbID)
|
||||||
o.refreshTimerCount.Store(int32(len(o.refreshTimers)))
|
p.refreshTimerCount.Store(int32(len(p.refreshTimers)))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Warn(ctx, "Failed to call GetPlaylist", "plugin", o.pluginName, "playlistID", info.ID, err)
|
log.Warn(ctx, "Failed to call GetPlaylist", "plugin", p.pluginName, "playlistID", info.ID, err)
|
||||||
// Schedule retry for transient errors if retryInterval is configured
|
// Schedule retry for transient errors if retryInterval is configured
|
||||||
if ri := time.Duration(o.retryInterval.Load()); ri > 0 {
|
if ri := time.Duration(p.retryInterval.Load()); ri > 0 {
|
||||||
o.schedulePlaylistRefresh(info, dbID, ownerID, ri)
|
p.schedulePlaylistRefresh(info, dbID, ownerID, ri)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert SongRef → agents.Song and match against library
|
// Convert SongRef → agents.Song and match against library
|
||||||
songs := songRefsToAgentSongs(resp.Tracks)
|
songs := songRefsToAgentSongs(resp.Tracks)
|
||||||
matched, err := o.matcher.MatchSongsToLibrary(ctx, songs, len(songs))
|
matched, err := p.matcher.MatchSongsToLibrary(ctx, songs, len(songs))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx, "Failed to match songs to library", "plugin", o.pluginName, "playlistID", info.ID, err)
|
log.Error(ctx, "Failed to match songs to library", "plugin", p.pluginName, "playlistID", info.ID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,7 +197,7 @@ func (o *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID strin
|
|||||||
OwnerID: ownerID,
|
OwnerID: ownerID,
|
||||||
Public: false,
|
Public: false,
|
||||||
ExternalImageURL: resp.CoverArtURL,
|
ExternalImageURL: resp.CoverArtURL,
|
||||||
PluginID: o.pluginName,
|
PluginID: p.pluginName,
|
||||||
PluginPlaylistID: info.ID,
|
PluginPlaylistID: info.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,13 +205,13 @@ func (o *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID strin
|
|||||||
pls.AddMediaFiles(matched)
|
pls.AddMediaFiles(matched)
|
||||||
|
|
||||||
// Upsert via repository
|
// Upsert via repository
|
||||||
plsRepo := o.ds.Playlist(ctx)
|
plsRepo := p.ds.Playlist(ctx)
|
||||||
if err := plsRepo.Put(pls); err != nil {
|
if err := plsRepo.Put(pls); err != nil {
|
||||||
log.Error(ctx, "Failed to upsert plugin playlist", "plugin", o.pluginName, "playlistID", info.ID, err)
|
log.Error(ctx, "Failed to upsert plugin playlist", "plugin", p.pluginName, "playlistID", info.ID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info(ctx, "Synced plugin playlist", "plugin", o.pluginName, "playlistID", info.ID,
|
log.Info(ctx, "Synced plugin playlist", "plugin", p.pluginName, "playlistID", info.ID,
|
||||||
"name", resp.Name, "tracks", len(matched), "owner", ownerID)
|
"name", resp.Name, "tracks", len(matched), "owner", ownerID)
|
||||||
|
|
||||||
// Schedule refresh if ValidUntil > 0
|
// Schedule refresh if ValidUntil > 0
|
||||||
@ -221,32 +221,32 @@ func (o *playlistSyncer) syncPlaylist(info capabilities.PlaylistInfo, dbID strin
|
|||||||
if delay <= 0 {
|
if delay <= 0 {
|
||||||
delay = 1 * time.Second // Already expired, refresh soon
|
delay = 1 * time.Second // Already expired, refresh soon
|
||||||
}
|
}
|
||||||
o.schedulePlaylistRefresh(info, dbID, ownerID, delay)
|
p.schedulePlaylistRefresh(info, dbID, ownerID, delay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *playlistSyncer) schedulePlaylistRefresh(info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
|
func (p *playlistSyncer) schedulePlaylistRefresh(info capabilities.PlaylistInfo, dbID string, ownerID string, delay time.Duration) {
|
||||||
// Cancel existing timer if any
|
// Cancel existing timer if any
|
||||||
if timer, ok := o.refreshTimers[dbID]; ok {
|
if timer, ok := p.refreshTimers[dbID]; ok {
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
}
|
}
|
||||||
o.refreshTimers[dbID] = time.AfterFunc(delay, func() {
|
p.refreshTimers[dbID] = time.AfterFunc(delay, func() {
|
||||||
select {
|
select {
|
||||||
case o.workCh <- workItem{typ: workSync, info: info, dbID: dbID, ownerID: ownerID}:
|
case p.workCh <- workItem{typ: workSync, info: info, dbID: dbID, ownerID: ownerID}:
|
||||||
case <-o.ctx.Done():
|
case <-p.ctx.Done():
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
o.refreshTimerCount.Store(int32(len(o.refreshTimers)))
|
p.refreshTimerCount.Store(int32(len(p.refreshTimers)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *playlistSyncer) scheduleDiscovery(delay time.Duration) {
|
func (p *playlistSyncer) scheduleDiscovery(delay time.Duration) {
|
||||||
if o.discoveryTimer != nil {
|
if p.discoveryTimer != nil {
|
||||||
o.discoveryTimer.Stop()
|
p.discoveryTimer.Stop()
|
||||||
}
|
}
|
||||||
o.discoveryTimer = time.AfterFunc(delay, func() {
|
p.discoveryTimer = time.AfterFunc(delay, func() {
|
||||||
select {
|
select {
|
||||||
case o.workCh <- workItem{typ: workDiscover}:
|
case p.workCh <- workItem{typ: workDiscover}:
|
||||||
case <-o.ctx.Done():
|
case <-p.ctx.Done():
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -257,18 +257,18 @@ func isPlaylistNotFoundError(err error) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// stopAllTimers stops the discovery timer and all refresh timers.
|
// stopAllTimers stops the discovery timer and all refresh timers.
|
||||||
func (o *playlistSyncer) stopAllTimers() {
|
func (p *playlistSyncer) stopAllTimers() {
|
||||||
if o.discoveryTimer != nil {
|
if p.discoveryTimer != nil {
|
||||||
o.discoveryTimer.Stop()
|
p.discoveryTimer.Stop()
|
||||||
}
|
}
|
||||||
for _, timer := range o.refreshTimers {
|
for _, timer := range p.refreshTimers {
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close cancels the context and waits for the worker goroutine to finish.
|
// Close cancels the context and waits for the worker goroutine to finish.
|
||||||
func (o *playlistSyncer) Close() error {
|
func (p *playlistSyncer) Close() error {
|
||||||
o.cancel()
|
p.cancel()
|
||||||
<-o.done
|
<-p.done
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user