diff --git a/plugins/host/scheduler.go b/plugins/host/scheduler.go index cddac94cf..114489722 100644 --- a/plugins/host/scheduler.go +++ b/plugins/host/scheduler.go @@ -39,4 +39,9 @@ type SchedulerService interface { // Returns an error if the schedule ID is not found or if cancellation fails. //nd:hostfunc CancelSchedule(ctx context.Context, scheduleID string) error + + // Close cleans up any resources used by the SchedulerService. + // + // This should be called when the plugin is unloaded to ensure proper cleanup. + Close() error } diff --git a/plugins/host_scheduler.go b/plugins/host_scheduler.go index 6b9ccf9fb..40b5cd255 100644 --- a/plugins/host_scheduler.go +++ b/plugins/host_scheduler.go @@ -61,7 +61,7 @@ func (s *schedulerServiceImpl) ScheduleOneTime(ctx context.Context, delaySeconds capturedID := scheduleID timer := timeAfterFunc(time.Duration(delaySeconds)*time.Second, func() { - s.invokeCallback(capturedID) + s.invokeCallback(ctx, capturedID) // Clean up the entry after firing s.mu.Lock() delete(s.schedules, capturedID) @@ -86,7 +86,7 @@ func (s *schedulerServiceImpl) ScheduleRecurring(ctx context.Context, cronExpres capturedID := scheduleID callback := func() { - s.invokeCallback(capturedID) + s.invokeCallback(ctx, capturedID) } s.mu.Lock() @@ -131,9 +131,9 @@ func (s *schedulerServiceImpl) CancelSchedule(ctx context.Context, scheduleID st return nil } -// CancelAllForPlugin cancels all schedules for this plugin. +// Close cancels all schedules for this plugin. // This is called when the plugin is unloaded. -func (s *schedulerServiceImpl) CancelAllForPlugin() { +func (s *schedulerServiceImpl) Close() error { s.mu.Lock() schedules := make(map[string]*scheduleEntry, len(s.schedules)) for k, v := range s.schedules { @@ -148,8 +148,9 @@ func (s *schedulerServiceImpl) CancelAllForPlugin() { } else { s.scheduler.Remove(entry.entryID) } - log.Debug(context.Background(), "Cancelled schedule on plugin unload", "plugin", s.pluginName, "scheduleID", scheduleID) + log.Debug("Cancelled schedule on plugin unload", "plugin", s.pluginName, "scheduleID", scheduleID) } + return nil } // schedulerCallbackInput is the input format for the nd_scheduler_callback function. @@ -165,8 +166,7 @@ type schedulerCallbackOutput struct { } // invokeCallback calls the plugin's nd_scheduler_callback function. -func (s *schedulerServiceImpl) invokeCallback(scheduleID string) { - ctx := context.Background() +func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID string) { log.Debug(ctx, "Scheduler callback invoked", "plugin", s.pluginName, "scheduleID", scheduleID) s.mu.Lock() @@ -220,41 +220,3 @@ func (s *schedulerServiceImpl) invokeCallback(scheduleID string) { // Verify interface implementation var _ host.SchedulerService = (*schedulerServiceImpl)(nil) - -// schedulerServiceRegistry keeps track of scheduler services per plugin for cleanup. -type schedulerServiceRegistry struct { - mu sync.RWMutex - services map[string]*schedulerServiceImpl -} - -var schedulerRegistry = &schedulerServiceRegistry{ - services: make(map[string]*schedulerServiceImpl), -} - -// registerSchedulerService registers a scheduler service for a plugin. -func registerSchedulerService(pluginName string, service *schedulerServiceImpl) { - schedulerRegistry.mu.Lock() - defer schedulerRegistry.mu.Unlock() - schedulerRegistry.services[pluginName] = service -} - -// unregisterSchedulerService unregisters and cancels all schedules for a plugin. -func unregisterSchedulerService(pluginName string) { - schedulerRegistry.mu.Lock() - service, exists := schedulerRegistry.services[pluginName] - if exists { - delete(schedulerRegistry.services, pluginName) - } - schedulerRegistry.mu.Unlock() - - if exists && service != nil { - service.CancelAllForPlugin() - } -} - -// getSchedulerService returns the scheduler service for a plugin (used by tests). -func getSchedulerService(pluginName string) *schedulerServiceImpl { - schedulerRegistry.mu.RLock() - defer schedulerRegistry.mu.RUnlock() - return schedulerRegistry.services[pluginName] -} diff --git a/plugins/host_scheduler_test.go b/plugins/host_scheduler_test.go index b4f56164c..e152f7bec 100644 --- a/plugins/host_scheduler_test.go +++ b/plugins/host_scheduler_test.go @@ -61,8 +61,8 @@ var _ = Describe("SchedulerService", Ordered, func() { err = manager.Start(GinkgoT().Context()) Expect(err).ToNot(HaveOccurred()) - // Wrap the scheduler service and replace the scheduler with our mock - service := getSchedulerService("fake-scheduler") + // Get scheduler service from plugin's closers and wrap it for testing + service := findSchedulerService(manager, "fake-scheduler") Expect(service).ToNot(BeNil()) testService = &testableSchedulerService{schedulerServiceImpl: service} testService.scheduler = mockSched @@ -87,7 +87,7 @@ var _ = Describe("SchedulerService", Ordered, func() { }) It("should register scheduler service for plugin", func() { - service := getSchedulerService("fake-scheduler") + service := findSchedulerService(manager, "fake-scheduler") Expect(service).ToNot(BeNil()) }) }) @@ -245,7 +245,7 @@ var _ = Describe("SchedulerService", Ordered, func() { err = manager.UnloadPlugin("fake-scheduler") Expect(err).ToNot(HaveOccurred()) - Expect(getSchedulerService("fake-scheduler")).To(BeNil()) + Expect(findSchedulerService(manager, "fake-scheduler")).To(BeNil()) Expect(mockSched.GetCallbackCount()).To(Equal(0)) // Recurring task removed }) }) @@ -386,3 +386,19 @@ func (r *mockTimerRegistry) Reset() { r.callbacks = make([]func(), 0) r.timers = make([]*time.Timer, 0) } + +// findSchedulerService finds the scheduler service from a plugin's closers. +func findSchedulerService(m *Manager, pluginName string) *schedulerServiceImpl { + m.mu.RLock() + instance, ok := m.plugins[pluginName] + m.mu.RUnlock() + if !ok { + return nil + } + for _, closer := range instance.closers { + if svc, ok := closer.(*schedulerServiceImpl); ok { + return svc + } + } + return nil +} diff --git a/plugins/manager.go b/plugins/manager.go index 9afdfa746..c6f4aae99 100644 --- a/plugins/manager.go +++ b/plugins/manager.go @@ -4,7 +4,9 @@ import ( "context" "crypto/rand" "encoding/json" + "errors" "fmt" + "io" "net/http" "os" "path/filepath" @@ -67,6 +69,7 @@ type pluginInstance struct { manifest *Manifest compiled *extism.CompiledPlugin capabilities []Capability // Auto-detected capabilities based on exported functions + closers []io.Closer // Cleanup functions to call on unload } func (p *pluginInstance) create() (*extism.Plugin, error) { @@ -80,6 +83,17 @@ func (p *pluginInstance) create() (*extism.Plugin, error) { return plugin, nil } +func (p *pluginInstance) Close() error { + var errs []error + for _, f := range p.closers { + err := f.Close() + if err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + // GetManager returns a singleton instance of the plugin manager. // The manager is not started automatically; call Start() to begin loading plugins. func GetManager() *Manager { @@ -384,6 +398,7 @@ func (m *Manager) loadPlugin(name, wasmPath string) error { // Check if recompilation is needed (AllowedHosts or SubsonicAPI permission) needsRecompile := false var hostFunctions []extism.HostFunction + var closers []io.Closer if hosts := manifest.AllowedHosts(); len(hosts) > 0 { pluginManifest.AllowedHosts = hosts @@ -405,8 +420,8 @@ func (m *Manager) loadPlugin(name, wasmPath string) error { // Register Scheduler host functions if permission is granted if manifest.Permissions != nil && manifest.Permissions.Scheduler != nil { service := newSchedulerService(name, m, scheduler.GetInstance()) + closers = append(closers, service) hostFunctions = append(hostFunctions, host.RegisterSchedulerHostFunctions(service)...) - registerSchedulerService(name, service.(*schedulerServiceImpl)) needsRecompile = true } @@ -426,6 +441,7 @@ func (m *Manager) loadPlugin(name, wasmPath string) error { manifest: &manifest, compiled: compiled, capabilities: capabilities, + closers: closers, } m.mu.Unlock() return nil @@ -451,8 +467,11 @@ func (m *Manager) UnloadPlugin(name string) error { delete(m.plugins, name) m.mu.Unlock() - // Cancel all scheduled tasks for this plugin - unregisterSchedulerService(name) + // Run cleanup functions + err := instance.Close() + if err != nil { + log.Error("Error during plugin cleanup", "plugin", name, err) + } // Close the compiled plugin outside the lock with a grace period // to allow in-flight requests to complete