navidrome/plugins/manager_loader.go
Deluan Quintão 668869b6c7
feat(plugins): add TaskQueue host service for persistent background task queues (#5116)
* feat(plugins): define TaskQueue host service interface

Add the TaskQueueService interface with CreateQueue, Enqueue,
GetTaskStatus, and CancelTask methods plus QueueConfig struct.

* feat(plugins): define TaskWorker capability for task execution callbacks

* feat(plugins): add taskqueue permission to manifest schema

Add TaskQueuePermission with maxConcurrency option.

* feat(plugins): implement TaskQueue service with SQLite persistence and workers

Per-plugin SQLite database with queues and tasks tables. Worker goroutines
dequeue tasks and invoke nd_task_execute callback. Exponential backoff
retries, rate limiting via delayMs, automatic cleanup of terminal tasks.

* feat(plugins): require TaskWorker capability for taskqueue permission

* feat(plugins): register TaskQueue host service in manager

* feat(plugins): add test-taskqueue plugin for integration testing

* feat(plugins): add integration tests for TaskQueue host service

* docs: document TaskQueue module for persistent task queues

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): harden TaskQueue host service with validation and safety improvements

Add input validation (queue name length, payload size limits), extract
status string constants to eliminate raw SQL literals, make CreateQueue
idempotent via upsert for crash recovery, fix RetentionMs default check
for negative values, cap exponential backoff at 1 hour to prevent
overflow, and replace manual mutex-based delay enforcement with
rate.Limiter from golang.org/x/time/rate for correct concurrent worker
serialization.

* refactor(plugins): remove capability check for TaskWorker in TaskQueue host service

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(plugins): use context-aware database execution in TaskQueue host service

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(plugins): streamline task queue configuration and error handling

Signed-off-by: Deluan <deluan@navidrome.org>

* feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(plugins): simplify goroutine management in task queue service

Signed-off-by: Deluan <deluan@navidrome.org>

* feat(plugins): update TaskWorker interface to return status messages and refactor task queue service

Signed-off-by: Deluan <deluan@navidrome.org>

* feat(plugins): add ClearQueue function to remove pending tasks from a specified queue

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(plugins): use migrateDB for task queue schema and fix constant name collision

Replaced the raw db.Exec call in createTaskQueueSchema with migrateDB,
matching the pattern used by createKVStoreSchema. This enables version-tracked
schema migrations via SQLite's PRAGMA user_version, allowing future schema
changes to be appended incrementally. Also renamed cleanupInterval to
taskCleanupInterval to resolve a redeclaration conflict with host_kvstore.go.

* regenerate PDKs

Signed-off-by: Deluan <deluan@navidrome.org>

---------

Signed-off-by: Deluan <deluan@navidrome.org>
2026-03-03 13:48:49 -05:00

451 lines
14 KiB
Go

package plugins
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
extism "github.com/extism/go-sdk"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/plugins/host"
"github.com/navidrome/navidrome/scheduler"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/experimental"
"golang.org/x/sync/errgroup"
)
// serviceContext provides dependencies needed by host service factories.
// A single instance is built per plugin load and handed to the create function
// of every hostServices entry whose permission check passes.
type serviceContext struct {
// pluginName identifies the plugin the services are created for (populated from the plugin ID).
pluginName string
// manager is the owning plugin Manager; factories reach shared dependencies through it.
manager *Manager
// permissions are the permissions declared in the plugin manifest; may be nil.
permissions *Permissions
// config holds the plugin configuration as string key/value pairs.
config map[string]string
allowedUsers []string // User IDs this plugin can access
allUsers bool // If true, plugin can access all users
allowedLibraries []int // Library IDs this plugin can access
allLibraries bool // If true, plugin can access all libraries
}
// hostServiceEntry defines a host service for table-driven registration.
type hostServiceEntry struct {
// name is a human-readable label for the service.
name string
// hasPermission reports whether the given manifest permissions (possibly nil)
// entitle the plugin to this service.
hasPermission func(*Permissions) bool
// create builds the service and returns the host functions to expose to the plugin,
// plus an optional io.Closer (nil when the service holds nothing to release).
create func(*serviceContext) ([]extism.HostFunction, io.Closer)
}
// hostServices is the registration table of every host service a plugin can be granted.
// Each entry pairs a permission predicate with a factory, so supporting a new host
// service only requires adding one more entry here.
var hostServices = []hostServiceEntry{
	{
		name: "Config",
		// Always available, no permission required
		hasPermission: func(*Permissions) bool { return true },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			return host.RegisterConfigHostFunctions(newConfigService(sc.pluginName, sc.config)), nil
		},
	},
	{
		name:          "SubsonicAPI",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Subsonicapi != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			mgr := sc.manager
			svc := newSubsonicAPIService(sc.pluginName, mgr.subsonicRouter, mgr.ds, sc.allowedUsers, sc.allUsers)
			return host.RegisterSubsonicAPIHostFunctions(svc), nil
		},
	},
	{
		name:          "Scheduler",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Scheduler != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			// The service is also returned as the closer for this entry.
			svc := newSchedulerService(sc.pluginName, sc.manager, scheduler.GetInstance())
			return host.RegisterSchedulerHostFunctions(svc), svc
		},
	},
	{
		name:          "WebSocket",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Websocket != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc := newWebSocketService(sc.pluginName, sc.manager, sc.permissions.Websocket)
			return host.RegisterWebSocketHostFunctions(svc), svc
		},
	},
	{
		name:          "Artwork",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Artwork != nil },
		create: func(*serviceContext) ([]extism.HostFunction, io.Closer) {
			return host.RegisterArtworkHostFunctions(newArtworkService()), nil
		},
	},
	{
		name:          "Cache",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Cache != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc := newCacheService(sc.pluginName)
			return host.RegisterCacheHostFunctions(svc), svc
		},
	},
	{
		name:          "Library",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Library != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc := newLibraryService(sc.manager.ds, sc.permissions.Library, sc.allowedLibraries, sc.allLibraries)
			return host.RegisterLibraryHostFunctions(svc), nil
		},
	},
	{
		name:          "KVStore",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Kvstore != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc, err := newKVStoreService(sc.manager.ctx, sc.pluginName, sc.permissions.Kvstore)
			if err == nil {
				return host.RegisterKVStoreHostFunctions(svc), svc
			}
			// On failure the plugin simply gets no KVStore host functions.
			log.Error("Failed to create KVStore service", "plugin", sc.pluginName, err)
			return nil, nil
		},
	},
	{
		name:          "Users",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Users != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc := newUsersService(sc.manager.ds, sc.allowedUsers, sc.allUsers)
			return host.RegisterUsersHostFunctions(svc), nil
		},
	},
	{
		name:          "HTTP",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Http != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			svc := newHTTPService(sc.pluginName, sc.permissions.Http)
			return host.RegisterHTTPHostFunctions(svc), nil
		},
	},
	{
		name:          "Task",
		hasPermission: func(perms *Permissions) bool { return perms != nil && perms.Taskqueue != nil },
		create: func(sc *serviceContext) ([]extism.HostFunction, io.Closer) {
			// Fall back to a single worker unless the manifest asks for a positive limit.
			workers := int32(1)
			if limit := sc.permissions.Taskqueue.MaxConcurrency; limit > 0 {
				workers = int32(limit)
			}
			svc, err := newTaskQueueService(sc.pluginName, sc.manager, workers)
			if err == nil {
				return host.RegisterTaskHostFunctions(svc), svc
			}
			// On failure the plugin simply gets no Task host functions.
			log.Error("Failed to create Task service", "plugin", sc.pluginName, err)
			return nil, nil
		},
	},
}
// extractManifest returns the metadata of the .ndp package at ndpPath: its manifest
// together with the SHA-256 hash of the package file.
// It is meant to be cheap (plugin discovery, change detection): the wasm module is
// never compiled here, only the manifest JSON is read.
// It fails when the manager has been stopped, when the manifest cannot be read,
// or when the file hash cannot be computed.
func (m *Manager) extractManifest(ndpPath string) (*PluginMetadata, error) {
	if m.stopped.Load() {
		return nil, fmt.Errorf("manager is stopped")
	}

	parsed, readErr := readManifest(ndpPath)
	if readErr != nil {
		return nil, readErr
	}

	digest, hashErr := computeFileSHA256(ndpPath)
	if hashErr != nil {
		return nil, fmt.Errorf("computing hash: %w", hashErr)
	}

	meta := &PluginMetadata{
		Manifest: parsed,
		SHA256:   digest,
	}
	return meta, nil
}
// loadEnabledPlugins reads every plugin record from the database and loads the
// enabled ones concurrently, at most maxPluginLoadConcurrency at a time.
//
// A plugin that fails to load never fails the whole operation: the error is
// stored on its DB record, the plugin is disabled, and loading continues with
// the others. Only a missing datastore or a failure to list the plugins is
// returned as an error.
func (m *Manager) loadEnabledPlugins(ctx context.Context) error {
	if m.ds == nil {
		return fmt.Errorf("datastore not configured")
	}

	repo := m.ds.Plugin(adminContext(ctx))
	records, err := repo.GetAll()
	if err != nil {
		return fmt.Errorf("reading plugins from DB: %w", err)
	}

	var group errgroup.Group
	group.SetLimit(maxPluginLoadConcurrency)

	for _, p := range records {
		if !p.Enabled {
			continue
		}
		rec := p // Per-goroutine copy of the record
		group.Go(func() error {
			begin := time.Now()
			log.Debug(ctx, "Loading enabled plugin", "plugin", rec.ID, "path", rec.Path)

			// A panicking plugin must not take the server down with it.
			defer func() {
				if cause := recover(); cause != nil {
					log.Error(ctx, "Panic while loading plugin", "plugin", rec.ID, "panic", cause)
				}
			}()

			loadErr := m.loadPluginWithConfig(&rec)
			if loadErr != nil {
				// Persist the failure and disable the plugin
				rec.LastError = loadErr.Error()
				rec.Enabled = false
				rec.UpdatedAt = time.Now()
				if saveErr := repo.Put(&rec); saveErr != nil {
					log.Error(ctx, "Failed to update plugin error in DB", "plugin", rec.ID, saveErr)
				}
				log.Error(ctx, "Failed to load plugin", "plugin", rec.ID, loadErr)
				return nil
			}

			// A successful load wipes any error left over from a previous attempt.
			if rec.LastError != "" {
				rec.LastError = ""
				rec.UpdatedAt = time.Now()
				if saveErr := repo.Put(&rec); saveErr != nil {
					log.Error(ctx, "Failed to clear plugin error in DB", "plugin", rec.ID, saveErr)
				}
			}

			m.mu.RLock()
			active := m.plugins[rec.ID]
			m.mu.RUnlock()
			if active == nil {
				return nil
			}
			log.Info(ctx, "Loaded plugin", "plugin", rec.ID, "manifest", active.manifest.Name,
				"capabilities", active.capabilities, "duration", time.Since(begin))
			return nil
		})
	}

	return group.Wait()
}
// loadPluginWithConfig loads a plugin with configuration from DB.
// The p.Path should point to an .ndp package file.
//
// Steps: parse the stored config/users/libraries JSON, open the package, build the
// extism manifest (allowed hosts and library paths), create the host services the
// manifest permissions allow, compile the plugin, detect and validate its
// capabilities, register it in m.plugins, and finally call its init function.
//
// Host services created along the way are closed again if loading fails before the
// plugin is registered, so a failed load does not leak their resources.
//
// Returns an error when the manager is stopped or when any step before the
// registration fails.
func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
	ctx := log.NewContext(m.ctx, "plugin", p.ID)

	if m.stopped.Load() {
		return fmt.Errorf("manager is stopped")
	}

	// Track this operation
	m.loadWg.Add(1)
	defer m.loadWg.Done()

	// Checked again after joining loadWg.
	// NOTE(review): presumably covers a stop that happens between the first check and Add — confirm against the shutdown path.
	if m.stopped.Load() {
		return fmt.Errorf("manager is stopped")
	}

	// Parse config from JSON
	pluginConfig, err := parsePluginConfig(p.Config)
	if err != nil {
		return err
	}

	// Parse users from JSON
	var allowedUsers []string
	if p.Users != "" {
		if err := json.Unmarshal([]byte(p.Users), &allowedUsers); err != nil {
			return fmt.Errorf("parsing plugin users: %w", err)
		}
	}

	// Parse libraries from JSON
	var allowedLibraries []int
	if p.Libraries != "" {
		if err := json.Unmarshal([]byte(p.Libraries), &allowedLibraries); err != nil {
			return fmt.Errorf("parsing plugin libraries: %w", err)
		}
	}

	// Open the .ndp package to get manifest and wasm bytes
	pkg, err := openPackage(p.Path)
	if err != nil {
		return fmt.Errorf("opening package: %w", err)
	}

	// Build extism manifest
	pluginManifest := extism.Manifest{
		Wasm: []extism.Wasm{
			extism.WasmData{Data: pkg.WasmBytes, Name: "main"},
		},
		Config:  pluginConfig,
		Timeout: uint64(defaultTimeout.Milliseconds()),
	}

	if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Http != nil {
		if hosts := pkg.Manifest.Permissions.Http.RequiredHosts; len(hosts) > 0 {
			pluginManifest.AllowedHosts = hosts
		}
	}

	// Configure filesystem access for library permission
	if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Library != nil && pkg.Manifest.Permissions.Library.Filesystem {
		adminCtx := adminContext(ctx)
		libraries, err := m.ds.Library(adminCtx).GetAll()
		if err != nil {
			return fmt.Errorf("failed to get libraries for filesystem access: %w", err)
		}

		allowedPaths := buildAllowedPaths(ctx, libraries, allowedLibraries, p.AllLibraries, p.AllowWriteAccess)
		pluginManifest.AllowedPaths = allowedPaths
	}

	// Build host functions based on permissions from manifest
	var hostFunctions []extism.HostFunction
	var closers []io.Closer

	// Until the plugin is registered, this function owns the host services it
	// created. Close them on every failure path (including panics) so a failed
	// load does not leak whatever they hold.
	registered := false
	defer func() {
		if registered {
			return
		}
		for _, c := range closers {
			if closeErr := c.Close(); closeErr != nil {
				log.Error(ctx, "Failed to close host service after plugin load failure", closeErr)
			}
		}
	}()

	svcCtx := &serviceContext{
		pluginName:       p.ID,
		manager:          m,
		permissions:      pkg.Manifest.Permissions,
		config:           pluginConfig,
		allowedUsers:     allowedUsers,
		allUsers:         p.AllUsers,
		allowedLibraries: allowedLibraries,
		allLibraries:     p.AllLibraries,
	}
	for _, entry := range hostServices {
		if entry.hasPermission(pkg.Manifest.Permissions) {
			funcs, closer := entry.create(svcCtx)
			hostFunctions = append(hostFunctions, funcs...)
			if closer != nil {
				closers = append(closers, closer)
			}
		}
	}

	// Compile the plugin with all host functions
	runtimeConfig := wazero.NewRuntimeConfig().
		WithCompilationCache(m.cache).
		WithCloseOnContextDone(true)

	// Enable experimental threads if requested in manifest
	if pkg.Manifest.HasExperimentalThreads() {
		runtimeConfig = runtimeConfig.WithCoreFeatures(api.CoreFeaturesV2 | experimental.CoreFeaturesThreads)
		log.Debug(ctx, "Enabling experimental threads support")
	}

	extismConfig := extism.PluginConfig{
		EnableWasi:                true,
		RuntimeConfig:             runtimeConfig,
		EnableHttpResponseHeaders: true,
	}
	compiled, err := extism.NewCompiledPlugin(ctx, pluginManifest, extismConfig, hostFunctions)
	if err != nil {
		return fmt.Errorf("compiling plugin: %w", err)
	}

	// Create instance to detect capabilities
	instance, err := compiled.Instance(ctx, extism.PluginInstanceConfig{})
	if err != nil {
		compiled.Close(ctx)
		return fmt.Errorf("creating instance: %w", err)
	}
	instance.SetLogger(extismLogger(p.ID))
	capabilities := detectCapabilities(instance)
	instance.Close(ctx)

	// Validate manifest against detected capabilities
	if err := ValidateWithCapabilities(pkg.Manifest, capabilities); err != nil {
		compiled.Close(ctx)
		return fmt.Errorf("manifest validation: %w", err)
	}

	// Keep the new plugin in a local variable: m.plugins must only be touched
	// while holding m.mu, and other plugins may be loading concurrently.
	loaded := &plugin{
		name:           p.ID,
		path:           p.Path,
		manifest:       pkg.Manifest,
		compiled:       compiled,
		capabilities:   capabilities,
		closers:        closers,
		metrics:        m.metrics,
		allowedUserIDs: allowedUsers,
		allUsers:       p.AllUsers,
	}
	m.mu.Lock()
	m.plugins[p.ID] = loaded
	m.mu.Unlock()
	// From here on the registered plugin owns the closers.
	registered = true

	// Call plugin init function
	callPluginInit(ctx, loaded)

	return nil
}
// parsePluginConfig converts a JSON object stored as a string into the flat
// string-to-string map Extism expects for plugin configuration.
// String values are copied as-is; every other value (number, bool, null, array,
// object) is stored as its JSON encoding.
// An empty input yields a nil map and no error; malformed JSON yields an error.
func parsePluginConfig(configJSON string) (map[string]string, error) {
	if len(configJSON) == 0 {
		return nil, nil
	}

	var decoded map[string]any
	if err := json.Unmarshal([]byte(configJSON), &decoded); err != nil {
		return nil, fmt.Errorf("parsing plugin config: %w", err)
	}

	result := map[string]string{}
	for name, raw := range decoded {
		if text, isString := raw.(string); isString {
			result[name] = text
			continue
		}
		// Anything that is not already a string is kept as its JSON text
		encoded, err := json.Marshal(raw)
		if err != nil {
			return nil, fmt.Errorf("serializing config value %q: %w", name, err)
		}
		result[name] = string(encoded)
	}
	return result, nil
}
// buildAllowedPaths produces the extism AllowedPaths map (host path -> plugin mount
// point) used to give a plugin filesystem access to libraries.
// A library is included when allLibraries is true or its ID appears in
// allowedLibraryIDs. Unless allowWriteAccess is set, each host path is prefixed
// with "ro:" so it is mounted read-only.
func buildAllowedPaths(ctx context.Context, libraries model.Libraries, allowedLibraryIDs []int, allLibraries, allowWriteAccess bool) map[string]string {
	// Set of explicitly permitted library IDs for constant-time lookups.
	permitted := make(map[int]struct{}, len(allowedLibraryIDs))
	for i := range allowedLibraryIDs {
		permitted[allowedLibraryIDs[i]] = struct{}{}
	}

	// Read-only mounts carry the "ro:" prefix on the host path.
	prefix := "ro:"
	if allowWriteAccess {
		prefix = ""
	}

	result := map[string]string{}
	for _, lib := range libraries {
		if _, listed := permitted[lib.ID]; !allLibraries && !listed {
			continue
		}
		mountPoint := toPluginMountPoint(int32(lib.ID))
		hostPath := prefix + lib.Path
		result[hostPath] = mountPoint
		log.Trace(ctx, "Added library to allowed paths", "libraryID", lib.ID, "mountPoint", mountPoint, "writeAccess", allowWriteAccess, "hostPath", hostPath)
	}

	// Write access is logged at Info level, read-only access at Debug level.
	if !allowWriteAccess {
		log.Debug(ctx, "Granting read-only filesystem access to libraries", "libraryCount", len(result), "allLibraries", allLibraries)
		return result
	}
	log.Info(ctx, "Granting read-write filesystem access to libraries", "libraryCount", len(result), "allLibraries", allLibraries)
	return result
}