mirror of
https://github.com/navidrome/navidrome.git
synced 2026-03-04 06:35:52 +00:00
* feat(plugins): define TaskQueue host service interface Add the TaskQueueService interface with CreateQueue, Enqueue, GetTaskStatus, and CancelTask methods plus QueueConfig struct. * feat(plugins): define TaskWorker capability for task execution callbacks * feat(plugins): add taskqueue permission to manifest schema Add TaskQueuePermission with maxConcurrency option. * feat(plugins): implement TaskQueue service with SQLite persistence and workers Per-plugin SQLite database with queues and tasks tables. Worker goroutines dequeue tasks and invoke nd_task_execute callback. Exponential backoff retries, rate limiting via delayMs, automatic cleanup of terminal tasks. * feat(plugins): require TaskWorker capability for taskqueue permission * feat(plugins): register TaskQueue host service in manager * feat(plugins): add test-taskqueue plugin for integration testing * feat(plugins): add integration tests for TaskQueue host service * docs: document TaskQueue module for persistent task queues Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): harden TaskQueue host service with validation and safety improvements Add input validation (queue name length, payload size limits), extract status string constants to eliminate raw SQL literals, make CreateQueue idempotent via upsert for crash recovery, fix RetentionMs default check for negative values, cap exponential backoff at 1 hour to prevent overflow, and replace manual mutex-based delay enforcement with rate.Limiter from golang.org/x/time/rate for correct concurrent worker serialization. 
* refactor(plugins): remove capability check for TaskWorker in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): use context-aware database execution in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): streamline task queue configuration and error handling Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): simplify goroutine management in task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): update TaskWorker interface to return status messages and refactor task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): add ClearQueue function to remove pending tasks from a specified queue Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): use migrateDB for task queue schema and fix constant name collision Replaced the raw db.Exec call in createTaskQueueSchema with migrateDB, matching the pattern used by createKVStoreSchema. This enables version-tracked schema migrations via SQLite's PRAGMA user_version, allowing future schema changes to be appended incrementally. Also renamed cleanupInterval to taskCleanupInterval to resolve a redeclaration conflict with host_kvstore.go. * regenerate PDKs Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
451 lines
14 KiB
Go
451 lines
14 KiB
Go
package plugins
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
extism "github.com/extism/go-sdk"
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/model"
|
|
"github.com/navidrome/navidrome/plugins/host"
|
|
"github.com/navidrome/navidrome/scheduler"
|
|
"github.com/tetratelabs/wazero"
|
|
"github.com/tetratelabs/wazero/api"
|
|
"github.com/tetratelabs/wazero/experimental"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// serviceContext provides dependencies needed by host service factories.
|
|
type serviceContext struct {
|
|
pluginName string
|
|
manager *Manager
|
|
permissions *Permissions
|
|
config map[string]string
|
|
allowedUsers []string // User IDs this plugin can access
|
|
allUsers bool // If true, plugin can access all users
|
|
allowedLibraries []int // Library IDs this plugin can access
|
|
allLibraries bool // If true, plugin can access all libraries
|
|
}
|
|
|
|
// hostServiceEntry defines a host service for table-driven registration.
|
|
type hostServiceEntry struct {
|
|
name string
|
|
hasPermission func(*Permissions) bool
|
|
create func(*serviceContext) ([]extism.HostFunction, io.Closer)
|
|
}
|
|
|
|
// hostServices defines all available host services.
|
|
// Adding a new host service only requires adding an entry here.
|
|
var hostServices = []hostServiceEntry{
|
|
{
|
|
name: "Config",
|
|
hasPermission: func(p *Permissions) bool { return true }, // Always available, no permission required
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newConfigService(ctx.pluginName, ctx.config)
|
|
return host.RegisterConfigHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "SubsonicAPI",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Subsonicapi != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newSubsonicAPIService(ctx.pluginName, ctx.manager.subsonicRouter, ctx.manager.ds, ctx.allowedUsers, ctx.allUsers)
|
|
return host.RegisterSubsonicAPIHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "Scheduler",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Scheduler != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newSchedulerService(ctx.pluginName, ctx.manager, scheduler.GetInstance())
|
|
return host.RegisterSchedulerHostFunctions(service), service
|
|
},
|
|
},
|
|
{
|
|
name: "WebSocket",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Websocket != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
perm := ctx.permissions.Websocket
|
|
service := newWebSocketService(ctx.pluginName, ctx.manager, perm)
|
|
return host.RegisterWebSocketHostFunctions(service), service
|
|
},
|
|
},
|
|
{
|
|
name: "Artwork",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Artwork != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newArtworkService()
|
|
return host.RegisterArtworkHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "Cache",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Cache != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newCacheService(ctx.pluginName)
|
|
return host.RegisterCacheHostFunctions(service), service
|
|
},
|
|
},
|
|
{
|
|
name: "Library",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Library != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
perm := ctx.permissions.Library
|
|
service := newLibraryService(ctx.manager.ds, perm, ctx.allowedLibraries, ctx.allLibraries)
|
|
return host.RegisterLibraryHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "KVStore",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Kvstore != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
perm := ctx.permissions.Kvstore
|
|
service, err := newKVStoreService(ctx.manager.ctx, ctx.pluginName, perm)
|
|
if err != nil {
|
|
log.Error("Failed to create KVStore service", "plugin", ctx.pluginName, err)
|
|
return nil, nil
|
|
}
|
|
return host.RegisterKVStoreHostFunctions(service), service
|
|
},
|
|
},
|
|
{
|
|
name: "Users",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Users != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
service := newUsersService(ctx.manager.ds, ctx.allowedUsers, ctx.allUsers)
|
|
return host.RegisterUsersHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "HTTP",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Http != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
perm := ctx.permissions.Http
|
|
service := newHTTPService(ctx.pluginName, perm)
|
|
return host.RegisterHTTPHostFunctions(service), nil
|
|
},
|
|
},
|
|
{
|
|
name: "Task",
|
|
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
|
|
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
perm := ctx.permissions.Taskqueue
|
|
maxConcurrency := int32(1)
|
|
if perm.MaxConcurrency > 0 {
|
|
maxConcurrency = int32(perm.MaxConcurrency)
|
|
}
|
|
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
|
|
if err != nil {
|
|
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
|
|
return nil, nil
|
|
}
|
|
return host.RegisterTaskHostFunctions(service), service
|
|
},
|
|
},
|
|
}
|
|
|
|
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
|
// This is a lightweight operation used for plugin discovery and change detection.
|
|
// Unlike the old implementation, this does NOT compile the wasm - just reads the manifest JSON.
|
|
func (m *Manager) extractManifest(ndpPath string) (*PluginMetadata, error) {
|
|
if m.stopped.Load() {
|
|
return nil, fmt.Errorf("manager is stopped")
|
|
}
|
|
|
|
manifest, err := readManifest(ndpPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sha256Hash, err := computeFileSHA256(ndpPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("computing hash: %w", err)
|
|
}
|
|
|
|
return &PluginMetadata{
|
|
Manifest: manifest,
|
|
SHA256: sha256Hash,
|
|
}, nil
|
|
}
|
|
|
|
// loadEnabledPlugins loads all enabled plugins from the database.
|
|
func (m *Manager) loadEnabledPlugins(ctx context.Context) error {
|
|
if m.ds == nil {
|
|
return fmt.Errorf("datastore not configured")
|
|
}
|
|
|
|
adminCtx := adminContext(ctx)
|
|
repo := m.ds.Plugin(adminCtx)
|
|
|
|
plugins, err := repo.GetAll()
|
|
if err != nil {
|
|
return fmt.Errorf("reading plugins from DB: %w", err)
|
|
}
|
|
|
|
g := errgroup.Group{}
|
|
g.SetLimit(maxPluginLoadConcurrency)
|
|
|
|
for _, p := range plugins {
|
|
if !p.Enabled {
|
|
continue
|
|
}
|
|
|
|
plugin := p // Capture for goroutine
|
|
g.Go(func() error {
|
|
start := time.Now()
|
|
log.Debug(ctx, "Loading enabled plugin", "plugin", plugin.ID, "path", plugin.Path)
|
|
|
|
// Panic recovery
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Error(ctx, "Panic while loading plugin", "plugin", plugin.ID, "panic", r)
|
|
}
|
|
}()
|
|
|
|
if err := m.loadPluginWithConfig(&plugin); err != nil {
|
|
// Store error in DB
|
|
plugin.LastError = err.Error()
|
|
plugin.Enabled = false
|
|
plugin.UpdatedAt = time.Now()
|
|
if putErr := repo.Put(&plugin); putErr != nil {
|
|
log.Error(ctx, "Failed to update plugin error in DB", "plugin", plugin.ID, putErr)
|
|
}
|
|
log.Error(ctx, "Failed to load plugin", "plugin", plugin.ID, err)
|
|
return nil
|
|
}
|
|
|
|
// Clear any previous error
|
|
if plugin.LastError != "" {
|
|
plugin.LastError = ""
|
|
plugin.UpdatedAt = time.Now()
|
|
if putErr := repo.Put(&plugin); putErr != nil {
|
|
log.Error(ctx, "Failed to clear plugin error in DB", "plugin", plugin.ID, putErr)
|
|
}
|
|
}
|
|
|
|
m.mu.RLock()
|
|
loadedPlugin := m.plugins[plugin.ID]
|
|
m.mu.RUnlock()
|
|
if loadedPlugin != nil {
|
|
log.Info(ctx, "Loaded plugin", "plugin", plugin.ID, "manifest", loadedPlugin.manifest.Name,
|
|
"capabilities", loadedPlugin.capabilities, "duration", time.Since(start))
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return g.Wait()
|
|
}
|
|
|
|
// loadPluginWithConfig loads a plugin with configuration from DB.
|
|
// The p.Path should point to an .ndp package file.
|
|
func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
|
ctx := log.NewContext(m.ctx, "plugin", p.ID)
|
|
|
|
if m.stopped.Load() {
|
|
return fmt.Errorf("manager is stopped")
|
|
}
|
|
|
|
// Track this operation
|
|
m.loadWg.Add(1)
|
|
defer m.loadWg.Done()
|
|
|
|
if m.stopped.Load() {
|
|
return fmt.Errorf("manager is stopped")
|
|
}
|
|
|
|
// Parse config from JSON
|
|
pluginConfig, err := parsePluginConfig(p.Config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Parse users from JSON
|
|
var allowedUsers []string
|
|
if p.Users != "" {
|
|
if err := json.Unmarshal([]byte(p.Users), &allowedUsers); err != nil {
|
|
return fmt.Errorf("parsing plugin users: %w", err)
|
|
}
|
|
}
|
|
|
|
// Parse libraries from JSON
|
|
var allowedLibraries []int
|
|
if p.Libraries != "" {
|
|
if err := json.Unmarshal([]byte(p.Libraries), &allowedLibraries); err != nil {
|
|
return fmt.Errorf("parsing plugin libraries: %w", err)
|
|
}
|
|
}
|
|
|
|
// Open the .ndp package to get manifest and wasm bytes
|
|
pkg, err := openPackage(p.Path)
|
|
if err != nil {
|
|
return fmt.Errorf("opening package: %w", err)
|
|
}
|
|
|
|
// Build extism manifest
|
|
pluginManifest := extism.Manifest{
|
|
Wasm: []extism.Wasm{
|
|
extism.WasmData{Data: pkg.WasmBytes, Name: "main"},
|
|
},
|
|
Config: pluginConfig,
|
|
Timeout: uint64(defaultTimeout.Milliseconds()),
|
|
}
|
|
|
|
if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Http != nil {
|
|
if hosts := pkg.Manifest.Permissions.Http.RequiredHosts; len(hosts) > 0 {
|
|
pluginManifest.AllowedHosts = hosts
|
|
}
|
|
}
|
|
|
|
// Configure filesystem access for library permission
|
|
if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Library != nil && pkg.Manifest.Permissions.Library.Filesystem {
|
|
adminCtx := adminContext(ctx)
|
|
libraries, err := m.ds.Library(adminCtx).GetAll()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get libraries for filesystem access: %w", err)
|
|
}
|
|
|
|
allowedPaths := buildAllowedPaths(ctx, libraries, allowedLibraries, p.AllLibraries, p.AllowWriteAccess)
|
|
pluginManifest.AllowedPaths = allowedPaths
|
|
}
|
|
|
|
// Build host functions based on permissions from manifest
|
|
var hostFunctions []extism.HostFunction
|
|
var closers []io.Closer
|
|
|
|
svcCtx := &serviceContext{
|
|
pluginName: p.ID,
|
|
manager: m,
|
|
permissions: pkg.Manifest.Permissions,
|
|
config: pluginConfig,
|
|
allowedUsers: allowedUsers,
|
|
allUsers: p.AllUsers,
|
|
allowedLibraries: allowedLibraries,
|
|
allLibraries: p.AllLibraries,
|
|
}
|
|
for _, entry := range hostServices {
|
|
if entry.hasPermission(pkg.Manifest.Permissions) {
|
|
funcs, closer := entry.create(svcCtx)
|
|
hostFunctions = append(hostFunctions, funcs...)
|
|
if closer != nil {
|
|
closers = append(closers, closer)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Compile the plugin with all host functions
|
|
runtimeConfig := wazero.NewRuntimeConfig().
|
|
WithCompilationCache(m.cache).
|
|
WithCloseOnContextDone(true)
|
|
|
|
// Enable experimental threads if requested in manifest
|
|
if pkg.Manifest.HasExperimentalThreads() {
|
|
runtimeConfig = runtimeConfig.WithCoreFeatures(api.CoreFeaturesV2 | experimental.CoreFeaturesThreads)
|
|
log.Debug(ctx, "Enabling experimental threads support")
|
|
}
|
|
|
|
extismConfig := extism.PluginConfig{
|
|
EnableWasi: true,
|
|
RuntimeConfig: runtimeConfig,
|
|
EnableHttpResponseHeaders: true,
|
|
}
|
|
compiled, err := extism.NewCompiledPlugin(ctx, pluginManifest, extismConfig, hostFunctions)
|
|
if err != nil {
|
|
return fmt.Errorf("compiling plugin: %w", err)
|
|
}
|
|
|
|
// Create instance to detect capabilities
|
|
instance, err := compiled.Instance(ctx, extism.PluginInstanceConfig{})
|
|
if err != nil {
|
|
compiled.Close(ctx)
|
|
return fmt.Errorf("creating instance: %w", err)
|
|
}
|
|
instance.SetLogger(extismLogger(p.ID))
|
|
capabilities := detectCapabilities(instance)
|
|
instance.Close(ctx)
|
|
|
|
// Validate manifest against detected capabilities
|
|
if err := ValidateWithCapabilities(pkg.Manifest, capabilities); err != nil {
|
|
compiled.Close(ctx)
|
|
return fmt.Errorf("manifest validation: %w", err)
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.plugins[p.ID] = &plugin{
|
|
name: p.ID,
|
|
path: p.Path,
|
|
manifest: pkg.Manifest,
|
|
compiled: compiled,
|
|
capabilities: capabilities,
|
|
closers: closers,
|
|
metrics: m.metrics,
|
|
allowedUserIDs: allowedUsers,
|
|
allUsers: p.AllUsers,
|
|
}
|
|
m.mu.Unlock()
|
|
|
|
// Call plugin init function
|
|
callPluginInit(ctx, m.plugins[p.ID])
|
|
|
|
return nil
|
|
}
|
|
|
|
// parsePluginConfig parses a JSON config string into a map of string values.
// For Extism, all config values must be strings, so non-string values are serialized as JSON.
//
// An empty configJSON yields (nil, nil). For a JSON object:
//   - string values are stored unquoted;
//   - top-level integer literals are copied verbatim, so integers that do not fit a
//     float64 exactly (e.g. 64-bit IDs) keep every digit;
//   - every other value is decoded and re-encoded with encoding/json (compact form,
//     object keys sorted). Numbers nested inside arrays or objects still pass through
//     float64 on that path.
//
// An error is returned when configJSON is not a JSON object (or null) or when a value
// cannot be re-serialized.
func parsePluginConfig(configJSON string) (map[string]string, error) {
	if configJSON == "" {
		return nil, nil
	}

	// Keep each value as raw bytes so integer literals can be passed through untouched.
	var rawConfig map[string]json.RawMessage
	if err := json.Unmarshal([]byte(configJSON), &rawConfig); err != nil {
		return nil, fmt.Errorf("parsing plugin config: %w", err)
	}

	pluginConfig := make(map[string]string, len(rawConfig))
	for key, raw := range rawConfig {
		if isJSONIntegerLiteral(raw) {
			pluginConfig[key] = string(raw)
			continue
		}

		var value any
		if err := json.Unmarshal(raw, &value); err != nil {
			return nil, fmt.Errorf("parsing config value %q: %w", key, err)
		}
		if s, ok := value.(string); ok {
			pluginConfig[key] = s
			continue
		}

		// Serialize non-string values as JSON
		jsonBytes, err := json.Marshal(value)
		if err != nil {
			return nil, fmt.Errorf("serializing config value %q: %w", key, err)
		}
		pluginConfig[key] = string(jsonBytes)
	}
	return pluginConfig, nil
}

// isJSONIntegerLiteral reports whether raw consists solely of an optional leading '-'
// followed by decimal digits. Any other content (fraction, exponent, whitespace,
// non-numeric values) makes it return false, which sends the caller down the generic
// decode/re-encode path.
func isJSONIntegerLiteral(raw []byte) bool {
	digits := raw
	if len(digits) > 0 && digits[0] == '-' {
		digits = digits[1:]
	}
	if len(digits) == 0 {
		return false
	}
	for _, c := range digits {
		if c < '0' || c > '9' {
			return false
		}
	}
	return true
}
|
|
|
|
// buildAllowedPaths constructs the extism AllowedPaths map for filesystem access.
|
|
// When allowWriteAccess is false (default), paths are prefixed with "ro:" for read-only.
|
|
// Only libraries that match the allowed set (or all libraries if allLibraries is true) are included.
|
|
func buildAllowedPaths(ctx context.Context, libraries model.Libraries, allowedLibraryIDs []int, allLibraries, allowWriteAccess bool) map[string]string {
|
|
allowedLibrarySet := make(map[int]struct{}, len(allowedLibraryIDs))
|
|
for _, id := range allowedLibraryIDs {
|
|
allowedLibrarySet[id] = struct{}{}
|
|
}
|
|
allowedPaths := make(map[string]string)
|
|
for _, lib := range libraries {
|
|
_, allowed := allowedLibrarySet[lib.ID]
|
|
if allLibraries || allowed {
|
|
mountPoint := toPluginMountPoint(int32(lib.ID))
|
|
hostPath := lib.Path
|
|
if !allowWriteAccess {
|
|
hostPath = "ro:" + hostPath
|
|
}
|
|
allowedPaths[hostPath] = mountPoint
|
|
log.Trace(ctx, "Added library to allowed paths", "libraryID", lib.ID, "mountPoint", mountPoint, "writeAccess", allowWriteAccess, "hostPath", hostPath)
|
|
}
|
|
}
|
|
if allowWriteAccess {
|
|
log.Info(ctx, "Granting read-write filesystem access to libraries", "libraryCount", len(allowedPaths), "allLibraries", allLibraries)
|
|
} else {
|
|
log.Debug(ctx, "Granting read-only filesystem access to libraries", "libraryCount", len(allowedPaths), "allLibraries", allLibraries)
|
|
}
|
|
return allowedPaths
|
|
}
|