navidrome/plugins/manager.go
Deluan 7b1126201e feat: enhance plugin manager to support metrics recording
Signed-off-by: Deluan <deluan@navidrome.org>
2026-01-02 00:41:33 -05:00

443 lines
12 KiB
Go

package plugins
import (
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
extism "github.com/extism/go-sdk"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/core/agents"
"github.com/navidrome/navidrome/core/scrobbler"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/server/events"
"github.com/navidrome/navidrome/utils/singleton"
"github.com/rjeczalik/notify"
"github.com/tetratelabs/wazero"
)
// Package-level tunables for plugin calls and plugin startup.
const (
// defaultTimeout is the default timeout for plugin function calls.
defaultTimeout = 30 * time.Second
// maxPluginLoadConcurrency is the maximum number of plugins that can be
// compiled/loaded in parallel during startup.
maxPluginLoadConcurrency = 3
)
// SubsonicRouter is an http.Handler that serves Subsonic API requests.
// It is declared as a type alias (not a new type), so any http.Handler can be
// passed wherever a SubsonicRouter is expected.
type SubsonicRouter = http.Handler
// PluginMetricsRecorder is an interface for recording plugin metrics.
// This is satisfied by core/metrics.Metrics but defined here to avoid import cycles.
type PluginMetricsRecorder interface {
// RecordPluginRequest records one plugin call: the plugin it targeted, the
// method invoked, whether it succeeded (ok) and how long it took (elapsed).
// NOTE(review): the unit of elapsed is not visible in this file - confirm
// against the implementation before relying on it.
RecordPluginRequest(ctx context.Context, plugin, method string, ok bool, elapsed int64)
}
// Manager manages loading and lifecycle of WebAssembly plugins.
// It implements both agents.PluginLoader and scrobbler.PluginLoader interfaces.
type Manager struct {
// mu guards the plugins map; readers take RLock, load/unload/Stop take Lock.
mu sync.RWMutex
// plugins holds the currently loaded plugins, keyed by plugin name.
plugins map[string]*plugin
// ctx and cancel are created by Start (context.WithCancel) and cancel is
// invoked by Stop. Both are nil until Start has run with plugins enabled.
ctx context.Context
cancel context.CancelFunc
// cache is the wazero compilation cache created by Start; Stop closes it
// and resets it to nil.
cache wazero.CompilationCache
stopped atomic.Bool // Set to true when Stop() is called
loadWg sync.WaitGroup // Tracks in-flight plugin load operations
// File watcher fields (used when AutoReload is enabled)
watcherEvents chan notify.EventInfo
watcherDone chan struct{}
debounceTimers map[string]*time.Timer
// NOTE(review): presumably guards debounceTimers - confirm in the watcher code.
debounceMu sync.Mutex
// SubsonicAPI host function dependencies (set once before Start, not modified after)
// subsonicRouter is assigned through SetSubsonicRouter; ds, broker and
// metrics are assigned when the manager is constructed in GetManager.
subsonicRouter SubsonicRouter
ds model.DataStore
broker events.Broker
metrics PluginMetricsRecorder
}
// GetManager returns the shared plugin manager instance, obtained through
// singleton.GetInstance. The constructor wires in the datastore, the event
// broker and the metrics recorder and starts with an empty plugin map.
// NOTE(review): presumably the constructor only runs on the first call, so
// arguments passed on later calls would be ignored - confirm in utils/singleton.
//
// The manager is not started automatically; call Start() to begin loading plugins.
func GetManager(ds model.DataStore, broker events.Broker, m PluginMetricsRecorder) *Manager {
	newManager := func() *Manager {
		mgr := &Manager{plugins: make(map[string]*plugin)}
		mgr.ds = ds
		mgr.broker = broker
		mgr.metrics = m
		return mgr
	}
	return singleton.GetInstance(newManager)
}
// sendPluginRefreshEvent broadcasts a RefreshResource event for the "plugin"
// resource, carrying the given plugin IDs, so that connected clients can
// refresh their plugin data. It does nothing when no broker is configured.
func (m *Manager) sendPluginRefreshEvent(ctx context.Context, pluginIDs ...string) {
	if broker := m.broker; broker != nil {
		refresh := &events.RefreshResource{}
		broker.SendBroadcastMessage(ctx, refresh.With("plugin", pluginIDs...))
	}
}
// SetSubsonicRouter sets the Subsonic router for SubsonicAPI host functions.
// This should be called after the subsonic router is created but before plugins
// that require SubsonicAPI access are loaded.
//
// When plugins are enabled, Start treats a nil router as a fatal configuration
// error, so this must be called before Start. The assignment is a plain field
// write with no locking.
func (m *Manager) SetSubsonicRouter(router SubsonicRouter) {
m.subsonicRouter = router
}
// Start initializes the plugin manager and loads plugins from the configured folder.
// It should be called once during application startup when plugins are enabled.
//
// The startup flow is:
//  1. Sync plugins folder with DB (discover new, update changed, remove deleted)
//  2. Load only enabled plugins from DB
//
// Start returns nil without doing anything when the plugin system is disabled,
// and returns nil after creating the compilation cache when no plugins folder
// is configured. It returns an error when the compilation cache or the plugins
// folder cannot be created, or when loading the enabled plugins fails. Errors
// from the folder/DB sync and from starting the file watcher are logged but do
// not abort startup.
func (m *Manager) Start(ctx context.Context) error {
	if !conf.Server.Plugins.Enabled {
		log.Debug(ctx, "Plugin system is disabled")
		return nil
	}

	// The Subsonic router must have been injected via SetSubsonicRouter before
	// Start. The message names the dependency that is actually checked here.
	if m.subsonicRouter == nil {
		log.Fatal(ctx, "Plugin manager requires SubsonicRouter to be configured")
	}

	// Set extism log level based on plugin-specific config or global log level
	pluginLogLevel := conf.Server.Plugins.LogLevel
	if pluginLogLevel == "" {
		pluginLogLevel = conf.Server.LogLevel
	}
	extism.SetLogLevel(toExtismLogLevel(log.ParseLogLevel(pluginLogLevel)))

	// Manager-scoped context; Stop cancels it to signal shutdown.
	m.ctx, m.cancel = context.WithCancel(ctx)

	// Initialize wazero compilation cache for better performance
	cacheDir := filepath.Join(conf.Server.CacheFolder, "plugins")
	purgeCacheBySize(ctx, cacheDir, conf.Server.Plugins.CacheSize)

	var err error
	m.cache, err = wazero.NewCompilationCacheWithDir(cacheDir)
	if err != nil {
		log.Error(ctx, "Failed to create wazero compilation cache", err)
		return fmt.Errorf("creating wazero compilation cache: %w", err)
	}

	folder := conf.Server.Plugins.Folder
	if folder == "" {
		log.Debug(ctx, "No plugins folder configured")
		return nil
	}

	// Create plugins folder if it doesn't exist
	if err := os.MkdirAll(folder, 0755); err != nil {
		log.Error(ctx, "Failed to create plugins folder", "folder", folder, err)
		return fmt.Errorf("creating plugins folder: %w", err)
	}

	log.Info(ctx, "Starting plugin manager", "folder", folder)

	// Sync plugins folder with DB
	if err := m.syncPlugins(ctx, folder); err != nil {
		log.Error(ctx, "Error syncing plugins with DB", err)
		// Continue - we can still try to load plugins
	}

	// Load enabled plugins from DB
	if err := m.loadEnabledPlugins(ctx); err != nil {
		log.Error(ctx, "Error loading enabled plugins", err)
		return fmt.Errorf("loading enabled plugins: %w", err)
	}

	// Start file watcher if auto-reload is enabled
	if conf.Server.Plugins.AutoReload {
		if err := m.startWatcher(); err != nil {
			log.Error(ctx, "Failed to start plugin file watcher", err)
			// Non-fatal - plugins are still loaded, just no auto-reload
		}
	}

	return nil
}
// Stop shuts down the plugin manager and releases all resources.
//
// The shutdown order is: flag the manager as stopped, cancel the manager
// context (if one was created), stop the file watcher, wait for in-flight
// plugin loads, then - holding the write lock - close every loaded plugin and
// finally the compilation cache. Cleanup errors are logged, never returned;
// Stop always returns nil.
func (m *Manager) Stop() error {
	// Flag first so that no new operations begin while we tear down.
	m.stopped.Store(true)

	// Signal every goroutine bound to the manager context.
	if cancel := m.cancel; cancel != nil {
		cancel()
	}

	m.stopWatcher()

	// In-flight loads must finish before the cache is closed below,
	// otherwise they would race with cache.Close().
	m.loadWg.Wait()

	m.mu.Lock()
	defer m.mu.Unlock()

	bg := context.Background()
	for id, p := range m.plugins {
		if err := p.Close(); err != nil {
			log.Error("Error during plugin cleanup", "plugin", id, err)
		}
		if p.compiled == nil {
			continue
		}
		if err := p.compiled.Close(bg); err != nil {
			log.Error("Error closing plugin", "plugin", id, err)
		}
	}
	m.plugins = make(map[string]*plugin)

	// Nothing more to release when no compilation cache was created.
	if m.cache == nil {
		return nil
	}
	if err := m.cache.Close(bg); err != nil {
		log.Error("Error closing wazero cache", err)
	}
	m.cache = nil
	return nil
}
// PluginNames returns the names of all loaded plugins that have the given
// capability. It is used by both the agents and scrobbler systems to discover
// available plugins.
//
// The result is nil when no plugin matches, and its order is unspecified
// because it follows Go map iteration order.
func (m *Manager) PluginNames(capability string) []string {
	wanted := Capability(capability)

	m.mu.RLock()
	defer m.mu.RUnlock()

	var matches []string
	for id, p := range m.plugins {
		if !hasCapability(p.capabilities, wanted) {
			continue
		}
		matches = append(matches, id)
	}
	return matches
}
// LoadMediaAgent looks up a loaded plugin by name and wraps it in a new
// MetadataAgent adapter.
// The boolean result is false when no plugin with that name is loaded or when
// the plugin lacks the MetadataAgent capability.
func (m *Manager) LoadMediaAgent(name string) (agents.Interface, bool) {
	// Hold the read lock only for the map lookup.
	m.mu.RLock()
	p, found := m.plugins[name]
	m.mu.RUnlock()

	if !found {
		return nil, false
	}
	if !hasCapability(p.capabilities, CapabilityMetadataAgent) {
		return nil, false
	}

	// Each call hands out a fresh adapter around the same plugin.
	agent := &MetadataAgent{name: p.name, plugin: p}
	return agent, true
}
// LoadScrobbler looks up a loaded plugin by name and wraps it in a new
// ScrobblerPlugin adapter.
// The boolean result is false when no plugin with that name is loaded or when
// the plugin lacks the Scrobbler capability.
func (m *Manager) LoadScrobbler(name string) (scrobbler.Scrobbler, bool) {
	// Hold the read lock only for the map lookup.
	m.mu.RLock()
	p, found := m.plugins[name]
	m.mu.RUnlock()

	if !found {
		return nil, false
	}
	if !hasCapability(p.capabilities, CapabilityScrobbler) {
		return nil, false
	}

	// Each call hands out a fresh adapter around the same plugin.
	adapter := &ScrobblerPlugin{name: p.name, plugin: p}
	return adapter, true
}
// PluginInfo contains basic information about a plugin for metrics/insights.
// GetPluginInfo fills both fields from the plugin's manifest.
type PluginInfo struct {
// Name is the plugin name as declared in its manifest.
Name string
// Version is the plugin version as declared in its manifest.
Version string
}
// GetPluginInfo returns a snapshot of name and version for every loaded
// plugin, keyed by the name the plugin is registered under in the manager.
// The returned map is freshly allocated and never nil.
func (m *Manager) GetPluginInfo() map[string]PluginInfo {
	m.mu.RLock()
	defer m.mu.RUnlock()

	result := make(map[string]PluginInfo, len(m.plugins))
	for id, p := range m.plugins {
		var entry PluginInfo
		entry.Name = p.manifest.Name
		entry.Version = p.manifest.Version
		result[id] = entry
	}
	return result
}
// EnablePlugin enables a plugin by loading it and updating the DB.
//
// It is a no-op returning nil when the DB record is already marked enabled.
// When loading fails, the error text is stored in the record's LastError and
// the load error is returned. When the final DB update fails, the freshly
// loaded plugin is unloaded again so memory and DB stay consistent.
//
// Returns an error if the datastore is not configured, the plugin is not found
// in DB, the plugin fails to load, or the DB update fails. On success a
// refresh event for the plugin is broadcast.
func (m *Manager) EnablePlugin(ctx context.Context, id string) error {
	if m.ds == nil {
		return fmt.Errorf("datastore not configured")
	}

	adminCtx := adminContext(ctx)
	repo := m.ds.Plugin(adminCtx)

	plugin, err := repo.Get(id)
	if err != nil {
		return fmt.Errorf("getting plugin from DB: %w", err)
	}
	if plugin.Enabled {
		return nil // Already enabled
	}

	// Try to load the plugin
	if err := m.loadPluginWithConfig(plugin.ID, plugin.Path, plugin.Config); err != nil {
		// Persist the failure on a best-effort basis. The load error is what
		// the caller gets back, but a failed save is logged, not dropped.
		plugin.LastError = err.Error()
		plugin.UpdatedAt = time.Now()
		if putErr := repo.Put(plugin); putErr != nil {
			log.Error(ctx, "Error saving plugin load failure to DB", "plugin", id, putErr)
		}
		return fmt.Errorf("loading plugin: %w", err)
	}

	// Update DB
	plugin.Enabled = true
	plugin.LastError = ""
	plugin.UpdatedAt = time.Now()
	if err := repo.Put(plugin); err != nil {
		// Unload since we couldn't update DB. The DB error is returned; a
		// failed rollback is logged so it does not go unnoticed.
		if unloadErr := m.unloadPlugin(id); unloadErr != nil {
			log.Error(ctx, "Error unloading plugin after failed DB update", "plugin", id, unloadErr)
		}
		return fmt.Errorf("updating plugin in DB: %w", err)
	}

	log.Info(ctx, "Enabled plugin", "plugin", id)
	m.sendPluginRefreshEvent(ctx, id)
	return nil
}
// DisablePlugin disables a plugin by unloading it and marking it disabled in
// the DB.
//
// It is a no-op returning nil when the DB record is already disabled. A plugin
// that is enabled in the DB but not currently loaded is not an error: the
// failed unload is only logged at debug level and the DB is still updated.
//
// Returns an error if the datastore is not configured, the plugin is not found
// in DB, or the DB update fails. On success a refresh event is broadcast.
func (m *Manager) DisablePlugin(ctx context.Context, id string) error {
	if m.ds == nil {
		return fmt.Errorf("datastore not configured")
	}

	repo := m.ds.Plugin(adminContext(ctx))
	record, err := repo.Get(id)
	switch {
	case err != nil:
		return fmt.Errorf("getting plugin from DB: %w", err)
	case !record.Enabled:
		// Already disabled
		return nil
	}

	// Drop the in-memory instance first, then persist the new state.
	if unloadErr := m.unloadPlugin(id); unloadErr != nil {
		log.Debug(ctx, "Plugin was not loaded", "plugin", id)
	}

	record.Enabled = false
	record.UpdatedAt = time.Now()
	if err = repo.Put(record); err != nil {
		return fmt.Errorf("updating plugin in DB: %w", err)
	}

	log.Info(ctx, "Disabled plugin", "plugin", id)
	m.sendPluginRefreshEvent(ctx, id)
	return nil
}
// UpdatePluginConfig updates the configuration for a plugin.
//
// The new config is written to the DB first. If the plugin was enabled, it is
// then unloaded and loaded again with the new config. When that reload fails,
// the record is marked disabled with the error stored in LastError, and the
// reload error is returned.
//
// Returns an error if the datastore is not configured, the plugin is not found
// in DB, the DB update fails, or the reload fails. On success a refresh event
// for the plugin is broadcast.
func (m *Manager) UpdatePluginConfig(ctx context.Context, id, configJSON string) error {
	if m.ds == nil {
		return fmt.Errorf("datastore not configured")
	}

	adminCtx := adminContext(ctx)
	repo := m.ds.Plugin(adminCtx)

	plugin, err := repo.Get(id)
	if err != nil {
		return fmt.Errorf("getting plugin from DB: %w", err)
	}
	wasEnabled := plugin.Enabled

	// Update config in DB
	plugin.Config = configJSON
	plugin.UpdatedAt = time.Now()
	if err := repo.Put(plugin); err != nil {
		return fmt.Errorf("updating plugin config in DB: %w", err)
	}

	// Reload if enabled
	if wasEnabled {
		if err := m.unloadPlugin(id); err != nil {
			log.Debug(ctx, "Plugin was not loaded", "plugin", id)
		}
		if err := m.loadPluginWithConfig(plugin.ID, plugin.Path, configJSON); err != nil {
			// Persist the failure on a best-effort basis. The reload error is
			// what the caller gets back, but a failed save is logged.
			plugin.LastError = err.Error()
			plugin.Enabled = false
			if putErr := repo.Put(plugin); putErr != nil {
				log.Error(ctx, "Error saving plugin reload failure to DB", "plugin", id, putErr)
			}
			return fmt.Errorf("reloading plugin with new config: %w", err)
		}
	}

	log.Info(ctx, "Updated plugin config", "plugin", id)
	m.sendPluginRefreshEvent(ctx, id)
	return nil
}
// unloadPlugin removes a plugin from the manager and closes its resources.
//
// The plugin is removed from the map under the write lock; all cleanup then
// runs outside the lock. Cleanup failures are logged but not returned.
// Returns an error only if no plugin with that name is loaded.
func (m *Manager) unloadPlugin(name string) error {
	// Detach the plugin from the registry while holding the lock.
	m.mu.Lock()
	p, found := m.plugins[name]
	if found {
		delete(m.plugins, name)
	}
	m.mu.Unlock()

	if !found {
		return fmt.Errorf("plugin %q not found", name)
	}

	// Run cleanup functions
	if err := p.Close(); err != nil {
		log.Error("Error during plugin cleanup", "plugin", name, err)
	}

	// Close the compiled module outside the lock, bounded by a short timeout.
	if compiled := p.compiled; compiled != nil {
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := compiled.Close(closeCtx); err != nil {
			log.Error("Error closing plugin during unload", "plugin", name, err)
		}
	}

	// Explicit collection after the plugin's resources have been closed.
	runtime.GC()
	log.Info(m.ctx, "Unloaded plugin", "plugin", name)
	return nil
}