mirror of
https://github.com/navidrome/navidrome.git
synced 2026-03-04 06:35:52 +00:00
* feat(plugins): define TaskQueue host service interface Add the TaskQueueService interface with CreateQueue, Enqueue, GetTaskStatus, and CancelTask methods plus QueueConfig struct. * feat(plugins): define TaskWorker capability for task execution callbacks * feat(plugins): add taskqueue permission to manifest schema Add TaskQueuePermission with maxConcurrency option. * feat(plugins): implement TaskQueue service with SQLite persistence and workers Per-plugin SQLite database with queues and tasks tables. Worker goroutines dequeue tasks and invoke nd_task_execute callback. Exponential backoff retries, rate limiting via delayMs, automatic cleanup of terminal tasks. * feat(plugins): require TaskWorker capability for taskqueue permission * feat(plugins): register TaskQueue host service in manager * feat(plugins): add test-taskqueue plugin for integration testing * feat(plugins): add integration tests for TaskQueue host service * docs: document TaskQueue module for persistent task queues Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): harden TaskQueue host service with validation and safety improvements Add input validation (queue name length, payload size limits), extract status string constants to eliminate raw SQL literals, make CreateQueue idempotent via upsert for crash recovery, fix RetentionMs default check for negative values, cap exponential backoff at 1 hour to prevent overflow, and replace manual mutex-based delay enforcement with rate.Limiter from golang.org/x/time/rate for correct concurrent worker serialization. * refactor(plugins): remove capability check for TaskWorker in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): use context-aware database execution in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): streamline task queue configuration and error handling Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): simplify goroutine management in task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): update TaskWorker interface to return status messages and refactor task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): add ClearQueue function to remove pending tasks from a specified queue Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): use migrateDB for task queue schema and fix constant name collision Replaced the raw db.Exec call in createTaskQueueSchema with migrateDB, matching the pattern used by createKVStoreSchema. This enables version-tracked schema migrations via SQLite's PRAGMA user_version, allowing future schema changes to be appended incrementally. Also renamed cleanupInterval to taskCleanupInterval to resolve a redeclaration conflict with host_kvstore.go. * regenerate PDKs Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
278 lines
6.9 KiB
Go
278 lines
6.9 KiB
Go
// Code generated by ndpgen. DO NOT EDIT.
|
|
//
|
|
// This file contains client wrappers for the Task host service.
|
|
// It is intended for use in Navidrome plugins built with TinyGo.
|
|
//
|
|
//go:build wasip1
|
|
|
|
package host
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
|
|
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
|
)
|
|
|
|
// QueueConfig represents the QueueConfig data structure.
|
|
// QueueConfig holds configuration for a task queue.
|
|
type QueueConfig struct {
|
|
Concurrency int32 `json:"concurrency"`
|
|
MaxRetries int32 `json:"maxRetries"`
|
|
BackoffMs int64 `json:"backoffMs"`
|
|
DelayMs int64 `json:"delayMs"`
|
|
RetentionMs int64 `json:"retentionMs"`
|
|
}
|
|
|
|
// TaskInfo represents the TaskInfo data structure.
|
|
// TaskInfo holds the current state of a task.
|
|
type TaskInfo struct {
|
|
Status string `json:"status"`
|
|
Message string `json:"message"`
|
|
Attempt int32 `json:"attempt"`
|
|
}
|
|
|
|
// task_createqueue is the host function provided by Navidrome.
|
|
//
|
|
//go:wasmimport extism:host/user task_createqueue
|
|
func task_createqueue(uint64) uint64
|
|
|
|
// task_enqueue is the host function provided by Navidrome.
|
|
//
|
|
//go:wasmimport extism:host/user task_enqueue
|
|
func task_enqueue(uint64) uint64
|
|
|
|
// task_get is the host function provided by Navidrome.
|
|
//
|
|
//go:wasmimport extism:host/user task_get
|
|
func task_get(uint64) uint64
|
|
|
|
// task_cancel is the host function provided by Navidrome.
|
|
//
|
|
//go:wasmimport extism:host/user task_cancel
|
|
func task_cancel(uint64) uint64
|
|
|
|
// task_clearqueue is the host function provided by Navidrome.
|
|
//
|
|
//go:wasmimport extism:host/user task_clearqueue
|
|
func task_clearqueue(uint64) uint64
|
|
|
|
type taskCreateQueueRequest struct {
|
|
Name string `json:"name"`
|
|
Config QueueConfig `json:"config"`
|
|
}
|
|
|
|
type taskEnqueueRequest struct {
|
|
QueueName string `json:"queueName"`
|
|
Payload []byte `json:"payload"`
|
|
}
|
|
|
|
type taskEnqueueResponse struct {
|
|
Result string `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
type taskGetRequest struct {
|
|
TaskID string `json:"taskId"`
|
|
}
|
|
|
|
type taskGetResponse struct {
|
|
Result *TaskInfo `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
type taskCancelRequest struct {
|
|
TaskID string `json:"taskId"`
|
|
}
|
|
|
|
type taskClearQueueRequest struct {
|
|
QueueName string `json:"queueName"`
|
|
}
|
|
|
|
type taskClearQueueResponse struct {
|
|
Result int64 `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// TaskCreateQueue calls the task_createqueue host function.
|
|
// CreateQueue creates a named task queue with the given configuration.
|
|
// Zero-value fields in config use sensible defaults.
|
|
// If a queue with the same name already exists, returns an error.
|
|
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
func TaskCreateQueue(name string, config QueueConfig) error {
|
|
// Marshal request to JSON
|
|
req := taskCreateQueueRequest{
|
|
Name: name,
|
|
Config: config,
|
|
}
|
|
reqBytes, err := json.Marshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reqMem := pdk.AllocateBytes(reqBytes)
|
|
defer reqMem.Free()
|
|
|
|
// Call the host function
|
|
responsePtr := task_createqueue(reqMem.Offset())
|
|
|
|
// Read the response from memory
|
|
responseMem := pdk.FindMemory(responsePtr)
|
|
responseBytes := responseMem.ReadBytes()
|
|
|
|
// Parse error-only response
|
|
var response struct {
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
return err
|
|
}
|
|
if response.Error != "" {
|
|
return errors.New(response.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskEnqueue calls the task_enqueue host function.
|
|
// Enqueue adds a task to the named queue. Returns the task ID.
|
|
// payload is opaque bytes passed back to the plugin on execution.
|
|
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
|
// Marshal request to JSON
|
|
req := taskEnqueueRequest{
|
|
QueueName: queueName,
|
|
Payload: payload,
|
|
}
|
|
reqBytes, err := json.Marshal(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
reqMem := pdk.AllocateBytes(reqBytes)
|
|
defer reqMem.Free()
|
|
|
|
// Call the host function
|
|
responsePtr := task_enqueue(reqMem.Offset())
|
|
|
|
// Read the response from memory
|
|
responseMem := pdk.FindMemory(responsePtr)
|
|
responseBytes := responseMem.ReadBytes()
|
|
|
|
// Parse the response
|
|
var response taskEnqueueResponse
|
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Convert Error field to Go error
|
|
if response.Error != "" {
|
|
return "", errors.New(response.Error)
|
|
}
|
|
|
|
return response.Result, nil
|
|
}
|
|
|
|
// TaskGet calls the task_get host function.
|
|
// Get returns the current state of a task including its status,
|
|
// message, and attempt count.
|
|
func TaskGet(taskID string) (*TaskInfo, error) {
|
|
// Marshal request to JSON
|
|
req := taskGetRequest{
|
|
TaskID: taskID,
|
|
}
|
|
reqBytes, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reqMem := pdk.AllocateBytes(reqBytes)
|
|
defer reqMem.Free()
|
|
|
|
// Call the host function
|
|
responsePtr := task_get(reqMem.Offset())
|
|
|
|
// Read the response from memory
|
|
responseMem := pdk.FindMemory(responsePtr)
|
|
responseBytes := responseMem.ReadBytes()
|
|
|
|
// Parse the response
|
|
var response taskGetResponse
|
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Convert Error field to Go error
|
|
if response.Error != "" {
|
|
return nil, errors.New(response.Error)
|
|
}
|
|
|
|
return response.Result, nil
|
|
}
|
|
|
|
// TaskCancel calls the task_cancel host function.
|
|
// Cancel cancels a pending task. Returns error if already
|
|
// running, completed, or failed.
|
|
func TaskCancel(taskID string) error {
|
|
// Marshal request to JSON
|
|
req := taskCancelRequest{
|
|
TaskID: taskID,
|
|
}
|
|
reqBytes, err := json.Marshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reqMem := pdk.AllocateBytes(reqBytes)
|
|
defer reqMem.Free()
|
|
|
|
// Call the host function
|
|
responsePtr := task_cancel(reqMem.Offset())
|
|
|
|
// Read the response from memory
|
|
responseMem := pdk.FindMemory(responsePtr)
|
|
responseBytes := responseMem.ReadBytes()
|
|
|
|
// Parse error-only response
|
|
var response struct {
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
return err
|
|
}
|
|
if response.Error != "" {
|
|
return errors.New(response.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TaskClearQueue calls the task_clearqueue host function.
|
|
// ClearQueue removes all pending tasks from the named queue.
|
|
// Running tasks are not affected. Returns the number of tasks removed.
|
|
func TaskClearQueue(queueName string) (int64, error) {
|
|
// Marshal request to JSON
|
|
req := taskClearQueueRequest{
|
|
QueueName: queueName,
|
|
}
|
|
reqBytes, err := json.Marshal(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
reqMem := pdk.AllocateBytes(reqBytes)
|
|
defer reqMem.Free()
|
|
|
|
// Call the host function
|
|
responsePtr := task_clearqueue(reqMem.Offset())
|
|
|
|
// Read the response from memory
|
|
responseMem := pdk.FindMemory(responsePtr)
|
|
responseBytes := responseMem.ReadBytes()
|
|
|
|
// Parse the response
|
|
var response taskClearQueueResponse
|
|
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Convert Error field to Go error
|
|
if response.Error != "" {
|
|
return 0, errors.New(response.Error)
|
|
}
|
|
|
|
return response.Result, nil
|
|
}
|