mirror of
https://github.com/navidrome/navidrome.git
synced 2026-03-04 06:35:52 +00:00
Move scheduler capability check from runtime (when callback fires) to load-time validation in ValidateWithCapabilities. This ensures plugins declaring the scheduler permission must export the nd_scheduler_callback function, failing fast with a clear error instead of silently skipping callbacks at runtime.
210 lines
5.9 KiB
Go
210 lines
5.9 KiB
Go
package plugins
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"maps"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/model/id"
|
|
"github.com/navidrome/navidrome/plugins/capabilities"
|
|
"github.com/navidrome/navidrome/plugins/host"
|
|
"github.com/navidrome/navidrome/scheduler"
|
|
)
|
|
|
|
// CapabilityScheduler indicates the plugin can receive scheduled event callbacks.
|
|
// Detected when the plugin exports the scheduler callback function.
|
|
const CapabilityScheduler Capability = "Scheduler"
|
|
|
|
const FuncSchedulerCallback = "nd_scheduler_callback"
|
|
|
|
func init() {
|
|
registerCapability(
|
|
CapabilityScheduler,
|
|
FuncSchedulerCallback,
|
|
)
|
|
}
|
|
|
|
// timeAfterFunc is a variable for time.AfterFunc, allowing tests to override it.
|
|
var timeAfterFunc = time.AfterFunc
|
|
|
|
// scheduleEntry stores metadata about a scheduled task.
|
|
type scheduleEntry struct {
|
|
pluginName string
|
|
payload string
|
|
isRecurring bool
|
|
entryID int // Internal scheduler entry ID (for recurring tasks)
|
|
timer *time.Timer // Timer for one-time tasks (nil for recurring)
|
|
}
|
|
|
|
// schedulerServiceImpl implements host.SchedulerService.
|
|
// It provides plugins with scheduling capabilities and invokes callbacks when schedules fire.
|
|
type schedulerServiceImpl struct {
|
|
pluginName string
|
|
manager *Manager
|
|
scheduler scheduler.Scheduler
|
|
|
|
mu sync.Mutex
|
|
schedules map[string]*scheduleEntry
|
|
}
|
|
|
|
// newSchedulerService creates a new SchedulerService for a plugin.
|
|
func newSchedulerService(pluginName string, manager *Manager, sched scheduler.Scheduler) *schedulerServiceImpl {
|
|
return &schedulerServiceImpl{
|
|
pluginName: pluginName,
|
|
manager: manager,
|
|
scheduler: sched,
|
|
schedules: make(map[string]*scheduleEntry),
|
|
}
|
|
}
|
|
|
|
func (s *schedulerServiceImpl) ScheduleOneTime(ctx context.Context, delaySeconds int32, payload string, scheduleID string) (string, error) {
|
|
if scheduleID == "" {
|
|
scheduleID = id.NewRandom()
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, exists := s.schedules[scheduleID]; exists {
|
|
return "", fmt.Errorf("schedule ID %q already exists", scheduleID)
|
|
}
|
|
|
|
capturedID := scheduleID
|
|
timer := timeAfterFunc(time.Duration(delaySeconds)*time.Second, func() {
|
|
s.invokeCallback(context.Background(), capturedID)
|
|
// Clean up the entry after firing
|
|
s.mu.Lock()
|
|
delete(s.schedules, capturedID)
|
|
s.mu.Unlock()
|
|
})
|
|
|
|
s.schedules[scheduleID] = &scheduleEntry{
|
|
pluginName: s.pluginName,
|
|
payload: payload,
|
|
isRecurring: false,
|
|
timer: timer,
|
|
}
|
|
|
|
log.Debug(ctx, "Scheduled one-time task", "plugin", s.pluginName, "scheduleID", scheduleID, "delaySeconds", delaySeconds)
|
|
return scheduleID, nil
|
|
}
|
|
|
|
func (s *schedulerServiceImpl) ScheduleRecurring(ctx context.Context, cronExpression string, payload string, scheduleID string) (string, error) {
|
|
if scheduleID == "" {
|
|
scheduleID = id.NewRandom()
|
|
}
|
|
|
|
capturedID := scheduleID
|
|
callback := func() {
|
|
s.invokeCallback(context.Background(), capturedID)
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, exists := s.schedules[scheduleID]; exists {
|
|
return "", fmt.Errorf("schedule ID %q already exists", scheduleID)
|
|
}
|
|
|
|
entryID, err := s.scheduler.Add(cronExpression, callback)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to schedule task: %w", err)
|
|
}
|
|
|
|
s.schedules[scheduleID] = &scheduleEntry{
|
|
pluginName: s.pluginName,
|
|
payload: payload,
|
|
isRecurring: true,
|
|
entryID: entryID,
|
|
}
|
|
|
|
log.Debug(ctx, "Scheduled recurring task", "plugin", s.pluginName, "scheduleID", scheduleID, "cron", cronExpression)
|
|
return scheduleID, nil
|
|
}
|
|
|
|
func (s *schedulerServiceImpl) CancelSchedule(ctx context.Context, scheduleID string) error {
|
|
s.mu.Lock()
|
|
entry, exists := s.schedules[scheduleID]
|
|
if !exists {
|
|
s.mu.Unlock()
|
|
return fmt.Errorf("schedule ID %q not found", scheduleID)
|
|
}
|
|
delete(s.schedules, scheduleID)
|
|
s.mu.Unlock()
|
|
|
|
if entry.timer != nil {
|
|
entry.timer.Stop()
|
|
} else {
|
|
s.scheduler.Remove(entry.entryID)
|
|
}
|
|
log.Debug(ctx, "Cancelled schedule", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
return nil
|
|
}
|
|
|
|
// Close cancels all schedules for this plugin.
|
|
// This is called when the plugin is unloaded.
|
|
func (s *schedulerServiceImpl) Close() error {
|
|
s.mu.Lock()
|
|
schedules := maps.Clone(s.schedules)
|
|
s.schedules = make(map[string]*scheduleEntry)
|
|
s.mu.Unlock()
|
|
|
|
for scheduleID, entry := range schedules {
|
|
if entry.timer != nil {
|
|
entry.timer.Stop()
|
|
} else {
|
|
s.scheduler.Remove(entry.entryID)
|
|
}
|
|
log.Debug("Cancelled schedule on plugin unload", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// invokeCallback calls the plugin's nd_scheduler_callback function.
|
|
func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID string) {
|
|
log.Debug(ctx, "Scheduler callback invoked", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
|
|
s.mu.Lock()
|
|
entry, exists := s.schedules[scheduleID]
|
|
if !exists {
|
|
s.mu.Unlock()
|
|
log.Warn(ctx, "Schedule entry not found during callback", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
return
|
|
}
|
|
payload := entry.payload
|
|
isRecurring := entry.isRecurring
|
|
s.mu.Unlock()
|
|
|
|
// Get the plugin instance from the manager
|
|
s.manager.mu.RLock()
|
|
instance, ok := s.manager.plugins[s.pluginName]
|
|
s.manager.mu.RUnlock()
|
|
|
|
if !ok {
|
|
log.Warn(ctx, "Plugin not loaded when scheduler callback fired", "plugin", s.pluginName, "scheduleID", scheduleID)
|
|
return
|
|
}
|
|
|
|
// Prepare callback input
|
|
input := capabilities.SchedulerCallbackRequest{
|
|
ScheduleID: scheduleID,
|
|
Payload: payload,
|
|
IsRecurring: isRecurring,
|
|
}
|
|
|
|
start := time.Now()
|
|
err := callPluginFunctionNoOutput(ctx, instance, FuncSchedulerCallback, input)
|
|
if err != nil {
|
|
log.Error(ctx, "Scheduler callback failed", "plugin", s.pluginName, "scheduleID", scheduleID, "duration", time.Since(start), err)
|
|
return
|
|
}
|
|
|
|
log.Debug(ctx, "Scheduler callback completed", "plugin", s.pluginName, "scheduleID", scheduleID, "duration", time.Since(start))
|
|
}
|
|
|
|
// Verify interface implementation
|
|
var _ host.SchedulerService = (*schedulerServiceImpl)(nil)
|