refactor(scheduler): add Close method for resource cleanup on plugin unload

Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
Deluan 2025-12-24 09:59:34 -05:00
parent b94a214c91
commit 5e2e37bca7
4 changed files with 54 additions and 52 deletions

View File

@ -39,4 +39,9 @@ type SchedulerService interface {
// Returns an error if the schedule ID is not found or if cancellation fails. // Returns an error if the schedule ID is not found or if cancellation fails.
//nd:hostfunc //nd:hostfunc
CancelSchedule(ctx context.Context, scheduleID string) error CancelSchedule(ctx context.Context, scheduleID string) error
// Close cleans up any resources used by the SchedulerService.
//
// This should be called when the plugin is unloaded to ensure proper cleanup.
Close() error
} }

View File

@ -61,7 +61,7 @@ func (s *schedulerServiceImpl) ScheduleOneTime(ctx context.Context, delaySeconds
capturedID := scheduleID capturedID := scheduleID
timer := timeAfterFunc(time.Duration(delaySeconds)*time.Second, func() { timer := timeAfterFunc(time.Duration(delaySeconds)*time.Second, func() {
s.invokeCallback(capturedID) s.invokeCallback(ctx, capturedID)
// Clean up the entry after firing // Clean up the entry after firing
s.mu.Lock() s.mu.Lock()
delete(s.schedules, capturedID) delete(s.schedules, capturedID)
@ -86,7 +86,7 @@ func (s *schedulerServiceImpl) ScheduleRecurring(ctx context.Context, cronExpres
capturedID := scheduleID capturedID := scheduleID
callback := func() { callback := func() {
s.invokeCallback(capturedID) s.invokeCallback(ctx, capturedID)
} }
s.mu.Lock() s.mu.Lock()
@ -131,9 +131,9 @@ func (s *schedulerServiceImpl) CancelSchedule(ctx context.Context, scheduleID st
return nil return nil
} }
// CancelAllForPlugin cancels all schedules for this plugin. // Close cancels all schedules for this plugin.
// This is called when the plugin is unloaded. // This is called when the plugin is unloaded.
func (s *schedulerServiceImpl) CancelAllForPlugin() { func (s *schedulerServiceImpl) Close() error {
s.mu.Lock() s.mu.Lock()
schedules := make(map[string]*scheduleEntry, len(s.schedules)) schedules := make(map[string]*scheduleEntry, len(s.schedules))
for k, v := range s.schedules { for k, v := range s.schedules {
@ -148,8 +148,9 @@ func (s *schedulerServiceImpl) CancelAllForPlugin() {
} else { } else {
s.scheduler.Remove(entry.entryID) s.scheduler.Remove(entry.entryID)
} }
log.Debug(context.Background(), "Cancelled schedule on plugin unload", "plugin", s.pluginName, "scheduleID", scheduleID) log.Debug("Cancelled schedule on plugin unload", "plugin", s.pluginName, "scheduleID", scheduleID)
} }
return nil
} }
// schedulerCallbackInput is the input format for the nd_scheduler_callback function. // schedulerCallbackInput is the input format for the nd_scheduler_callback function.
@ -165,8 +166,7 @@ type schedulerCallbackOutput struct {
} }
// invokeCallback calls the plugin's nd_scheduler_callback function. // invokeCallback calls the plugin's nd_scheduler_callback function.
func (s *schedulerServiceImpl) invokeCallback(scheduleID string) { func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID string) {
ctx := context.Background()
log.Debug(ctx, "Scheduler callback invoked", "plugin", s.pluginName, "scheduleID", scheduleID) log.Debug(ctx, "Scheduler callback invoked", "plugin", s.pluginName, "scheduleID", scheduleID)
s.mu.Lock() s.mu.Lock()
@ -220,41 +220,3 @@ func (s *schedulerServiceImpl) invokeCallback(scheduleID string) {
// Verify interface implementation // Verify interface implementation
var _ host.SchedulerService = (*schedulerServiceImpl)(nil) var _ host.SchedulerService = (*schedulerServiceImpl)(nil)
// schedulerServiceRegistry keeps track of scheduler services per plugin for cleanup.
type schedulerServiceRegistry struct {
mu sync.RWMutex
services map[string]*schedulerServiceImpl
}
var schedulerRegistry = &schedulerServiceRegistry{
services: make(map[string]*schedulerServiceImpl),
}
// registerSchedulerService registers a scheduler service for a plugin.
func registerSchedulerService(pluginName string, service *schedulerServiceImpl) {
schedulerRegistry.mu.Lock()
defer schedulerRegistry.mu.Unlock()
schedulerRegistry.services[pluginName] = service
}
// unregisterSchedulerService unregisters and cancels all schedules for a plugin.
func unregisterSchedulerService(pluginName string) {
schedulerRegistry.mu.Lock()
service, exists := schedulerRegistry.services[pluginName]
if exists {
delete(schedulerRegistry.services, pluginName)
}
schedulerRegistry.mu.Unlock()
if exists && service != nil {
service.CancelAllForPlugin()
}
}
// getSchedulerService returns the scheduler service for a plugin (used by tests).
func getSchedulerService(pluginName string) *schedulerServiceImpl {
schedulerRegistry.mu.RLock()
defer schedulerRegistry.mu.RUnlock()
return schedulerRegistry.services[pluginName]
}

View File

@ -61,8 +61,8 @@ var _ = Describe("SchedulerService", Ordered, func() {
err = manager.Start(GinkgoT().Context()) err = manager.Start(GinkgoT().Context())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
// Wrap the scheduler service and replace the scheduler with our mock // Get scheduler service from plugin's closers and wrap it for testing
service := getSchedulerService("fake-scheduler") service := findSchedulerService(manager, "fake-scheduler")
Expect(service).ToNot(BeNil()) Expect(service).ToNot(BeNil())
testService = &testableSchedulerService{schedulerServiceImpl: service} testService = &testableSchedulerService{schedulerServiceImpl: service}
testService.scheduler = mockSched testService.scheduler = mockSched
@ -87,7 +87,7 @@ var _ = Describe("SchedulerService", Ordered, func() {
}) })
It("should register scheduler service for plugin", func() { It("should register scheduler service for plugin", func() {
service := getSchedulerService("fake-scheduler") service := findSchedulerService(manager, "fake-scheduler")
Expect(service).ToNot(BeNil()) Expect(service).ToNot(BeNil())
}) })
}) })
@ -245,7 +245,7 @@ var _ = Describe("SchedulerService", Ordered, func() {
err = manager.UnloadPlugin("fake-scheduler") err = manager.UnloadPlugin("fake-scheduler")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(getSchedulerService("fake-scheduler")).To(BeNil()) Expect(findSchedulerService(manager, "fake-scheduler")).To(BeNil())
Expect(mockSched.GetCallbackCount()).To(Equal(0)) // Recurring task removed Expect(mockSched.GetCallbackCount()).To(Equal(0)) // Recurring task removed
}) })
}) })
@ -386,3 +386,19 @@ func (r *mockTimerRegistry) Reset() {
r.callbacks = make([]func(), 0) r.callbacks = make([]func(), 0)
r.timers = make([]*time.Timer, 0) r.timers = make([]*time.Timer, 0)
} }
// findSchedulerService finds the scheduler service from a plugin's closers.
func findSchedulerService(m *Manager, pluginName string) *schedulerServiceImpl {
m.mu.RLock()
instance, ok := m.plugins[pluginName]
m.mu.RUnlock()
if !ok {
return nil
}
for _, closer := range instance.closers {
if svc, ok := closer.(*schedulerServiceImpl); ok {
return svc
}
}
return nil
}

View File

@ -4,7 +4,9 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -67,6 +69,7 @@ type pluginInstance struct {
manifest *Manifest manifest *Manifest
compiled *extism.CompiledPlugin compiled *extism.CompiledPlugin
capabilities []Capability // Auto-detected capabilities based on exported functions capabilities []Capability // Auto-detected capabilities based on exported functions
closers []io.Closer // Cleanup functions to call on unload
} }
func (p *pluginInstance) create() (*extism.Plugin, error) { func (p *pluginInstance) create() (*extism.Plugin, error) {
@ -80,6 +83,17 @@ func (p *pluginInstance) create() (*extism.Plugin, error) {
return plugin, nil return plugin, nil
} }
func (p *pluginInstance) Close() error {
var errs []error
for _, f := range p.closers {
err := f.Close()
if err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
// GetManager returns a singleton instance of the plugin manager. // GetManager returns a singleton instance of the plugin manager.
// The manager is not started automatically; call Start() to begin loading plugins. // The manager is not started automatically; call Start() to begin loading plugins.
func GetManager() *Manager { func GetManager() *Manager {
@ -384,6 +398,7 @@ func (m *Manager) loadPlugin(name, wasmPath string) error {
// Check if recompilation is needed (AllowedHosts or SubsonicAPI permission) // Check if recompilation is needed (AllowedHosts or SubsonicAPI permission)
needsRecompile := false needsRecompile := false
var hostFunctions []extism.HostFunction var hostFunctions []extism.HostFunction
var closers []io.Closer
if hosts := manifest.AllowedHosts(); len(hosts) > 0 { if hosts := manifest.AllowedHosts(); len(hosts) > 0 {
pluginManifest.AllowedHosts = hosts pluginManifest.AllowedHosts = hosts
@ -405,8 +420,8 @@ func (m *Manager) loadPlugin(name, wasmPath string) error {
// Register Scheduler host functions if permission is granted // Register Scheduler host functions if permission is granted
if manifest.Permissions != nil && manifest.Permissions.Scheduler != nil { if manifest.Permissions != nil && manifest.Permissions.Scheduler != nil {
service := newSchedulerService(name, m, scheduler.GetInstance()) service := newSchedulerService(name, m, scheduler.GetInstance())
closers = append(closers, service)
hostFunctions = append(hostFunctions, host.RegisterSchedulerHostFunctions(service)...) hostFunctions = append(hostFunctions, host.RegisterSchedulerHostFunctions(service)...)
registerSchedulerService(name, service.(*schedulerServiceImpl))
needsRecompile = true needsRecompile = true
} }
@ -426,6 +441,7 @@ func (m *Manager) loadPlugin(name, wasmPath string) error {
manifest: &manifest, manifest: &manifest,
compiled: compiled, compiled: compiled,
capabilities: capabilities, capabilities: capabilities,
closers: closers,
} }
m.mu.Unlock() m.mu.Unlock()
return nil return nil
@ -451,8 +467,11 @@ func (m *Manager) UnloadPlugin(name string) error {
delete(m.plugins, name) delete(m.plugins, name)
m.mu.Unlock() m.mu.Unlock()
// Cancel all scheduled tasks for this plugin // Run cleanup functions
unregisterSchedulerService(name) err := instance.Close()
if err != nil {
log.Error("Error during plugin cleanup", "plugin", name, err)
}
// Close the compiled plugin outside the lock with a grace period // Close the compiled plugin outside the lock with a grace period
// to allow in-flight requests to complete // to allow in-flight requests to complete