mirror of
https://github.com/navidrome/navidrome.git
synced 2026-03-04 06:35:52 +00:00
* feat(plugins): define TaskQueue host service interface Add the TaskQueueService interface with CreateQueue, Enqueue, GetTaskStatus, and CancelTask methods plus QueueConfig struct. * feat(plugins): define TaskWorker capability for task execution callbacks * feat(plugins): add taskqueue permission to manifest schema Add TaskQueuePermission with maxConcurrency option. * feat(plugins): implement TaskQueue service with SQLite persistence and workers Per-plugin SQLite database with queues and tasks tables. Worker goroutines dequeue tasks and invoke nd_task_execute callback. Exponential backoff retries, rate limiting via delayMs, automatic cleanup of terminal tasks. * feat(plugins): require TaskWorker capability for taskqueue permission * feat(plugins): register TaskQueue host service in manager * feat(plugins): add test-taskqueue plugin for integration testing * feat(plugins): add integration tests for TaskQueue host service * docs: document TaskQueue module for persistent task queues Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): harden TaskQueue host service with validation and safety improvements Add input validation (queue name length, payload size limits), extract status string constants to eliminate raw SQL literals, make CreateQueue idempotent via upsert for crash recovery, fix RetentionMs default check for negative values, cap exponential backoff at 1 hour to prevent overflow, and replace manual mutex-based delay enforcement with rate.Limiter from golang.org/x/time/rate for correct concurrent worker serialization. * refactor(plugins): remove capability check for TaskWorker in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): use context-aware database execution in TaskQueue host service Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): streamline task queue configuration and error handling Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): simplify goroutine management in task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): update TaskWorker interface to return status messages and refactor task queue service Signed-off-by: Deluan <deluan@navidrome.org> * feat(plugins): add ClearQueue function to remove pending tasks from a specified queue Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): use migrateDB for task queue schema and fix constant name collision Replaced the raw db.Exec call in createTaskQueueSchema with migrateDB, matching the pattern used by createKVStoreSchema. This enables version-tracked schema migrations via SQLite's PRAGMA user_version, allowing future schema changes to be appended incrementally. Also renamed cleanupInterval to taskCleanupInterval to resolve a redeclaration conflict with host_kvstore.go. * regenerate PDKs Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
267 lines
7.1 KiB
Go
267 lines
7.1 KiB
Go
// Code generated by ndpgen. DO NOT EDIT.
|
|
|
|
package host
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
extism "github.com/extism/go-sdk"
|
|
)
|
|
|
|
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
|
|
type TaskCreateQueueRequest struct {
|
|
Name string `json:"name"`
|
|
Config QueueConfig `json:"config"`
|
|
}
|
|
|
|
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
|
|
type TaskCreateQueueResponse struct {
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// TaskEnqueueRequest is the request type for Task.Enqueue.
|
|
type TaskEnqueueRequest struct {
|
|
QueueName string `json:"queueName"`
|
|
Payload []byte `json:"payload"`
|
|
}
|
|
|
|
// TaskEnqueueResponse is the response type for Task.Enqueue.
|
|
type TaskEnqueueResponse struct {
|
|
Result string `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// TaskGetRequest is the request type for Task.Get.
|
|
type TaskGetRequest struct {
|
|
TaskID string `json:"taskId"`
|
|
}
|
|
|
|
// TaskGetResponse is the response type for Task.Get.
|
|
type TaskGetResponse struct {
|
|
Result *TaskInfo `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// TaskCancelRequest is the request type for Task.Cancel.
|
|
type TaskCancelRequest struct {
|
|
TaskID string `json:"taskId"`
|
|
}
|
|
|
|
// TaskCancelResponse is the response type for Task.Cancel.
|
|
type TaskCancelResponse struct {
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// TaskClearQueueRequest is the request type for Task.ClearQueue.
|
|
type TaskClearQueueRequest struct {
|
|
QueueName string `json:"queueName"`
|
|
}
|
|
|
|
// TaskClearQueueResponse is the response type for Task.ClearQueue.
|
|
type TaskClearQueueResponse struct {
|
|
Result int64 `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// RegisterTaskHostFunctions registers Task service host functions.
|
|
// The returned host functions should be added to the plugin's configuration.
|
|
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
|
|
return []extism.HostFunction{
|
|
newTaskCreateQueueHostFunction(service),
|
|
newTaskEnqueueHostFunction(service),
|
|
newTaskGetHostFunction(service),
|
|
newTaskCancelHostFunction(service),
|
|
newTaskClearQueueHostFunction(service),
|
|
}
|
|
}
|
|
|
|
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
|
|
return extism.NewHostFunctionWithStack(
|
|
"task_createqueue",
|
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
// Read JSON request from plugin memory
|
|
reqBytes, err := p.ReadBytes(stack[0])
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
var req TaskCreateQueueRequest
|
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
|
|
// Call the service method
|
|
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
|
|
taskWriteError(p, stack, svcErr)
|
|
return
|
|
}
|
|
|
|
// Write JSON response to plugin memory
|
|
resp := TaskCreateQueueResponse{}
|
|
taskWriteResponse(p, stack, resp)
|
|
},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
)
|
|
}
|
|
|
|
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
|
|
return extism.NewHostFunctionWithStack(
|
|
"task_enqueue",
|
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
// Read JSON request from plugin memory
|
|
reqBytes, err := p.ReadBytes(stack[0])
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
var req TaskEnqueueRequest
|
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
|
|
// Call the service method
|
|
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
|
|
if svcErr != nil {
|
|
taskWriteError(p, stack, svcErr)
|
|
return
|
|
}
|
|
|
|
// Write JSON response to plugin memory
|
|
resp := TaskEnqueueResponse{
|
|
Result: result,
|
|
}
|
|
taskWriteResponse(p, stack, resp)
|
|
},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
)
|
|
}
|
|
|
|
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
|
|
return extism.NewHostFunctionWithStack(
|
|
"task_get",
|
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
// Read JSON request from plugin memory
|
|
reqBytes, err := p.ReadBytes(stack[0])
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
var req TaskGetRequest
|
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
|
|
// Call the service method
|
|
result, svcErr := service.Get(ctx, req.TaskID)
|
|
if svcErr != nil {
|
|
taskWriteError(p, stack, svcErr)
|
|
return
|
|
}
|
|
|
|
// Write JSON response to plugin memory
|
|
resp := TaskGetResponse{
|
|
Result: result,
|
|
}
|
|
taskWriteResponse(p, stack, resp)
|
|
},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
)
|
|
}
|
|
|
|
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
|
|
return extism.NewHostFunctionWithStack(
|
|
"task_cancel",
|
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
// Read JSON request from plugin memory
|
|
reqBytes, err := p.ReadBytes(stack[0])
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
var req TaskCancelRequest
|
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
|
|
// Call the service method
|
|
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
|
|
taskWriteError(p, stack, svcErr)
|
|
return
|
|
}
|
|
|
|
// Write JSON response to plugin memory
|
|
resp := TaskCancelResponse{}
|
|
taskWriteResponse(p, stack, resp)
|
|
},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
)
|
|
}
|
|
|
|
func newTaskClearQueueHostFunction(service TaskService) extism.HostFunction {
|
|
return extism.NewHostFunctionWithStack(
|
|
"task_clearqueue",
|
|
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
// Read JSON request from plugin memory
|
|
reqBytes, err := p.ReadBytes(stack[0])
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
var req TaskClearQueueRequest
|
|
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
|
|
// Call the service method
|
|
result, svcErr := service.ClearQueue(ctx, req.QueueName)
|
|
if svcErr != nil {
|
|
taskWriteError(p, stack, svcErr)
|
|
return
|
|
}
|
|
|
|
// Write JSON response to plugin memory
|
|
resp := TaskClearQueueResponse{
|
|
Result: result,
|
|
}
|
|
taskWriteResponse(p, stack, resp)
|
|
},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
[]extism.ValueType{extism.ValueTypePTR},
|
|
)
|
|
}
|
|
|
|
// taskWriteResponse writes a JSON response to plugin memory.
|
|
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
|
respBytes, err := json.Marshal(resp)
|
|
if err != nil {
|
|
taskWriteError(p, stack, err)
|
|
return
|
|
}
|
|
respPtr, err := p.WriteBytes(respBytes)
|
|
if err != nil {
|
|
stack[0] = 0
|
|
return
|
|
}
|
|
stack[0] = respPtr
|
|
}
|
|
|
|
// taskWriteError writes an error response to plugin memory.
|
|
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
|
errResp := struct {
|
|
Error string `json:"error"`
|
|
}{Error: err.Error()}
|
|
respBytes, _ := json.Marshal(errResp)
|
|
respPtr, _ := p.WriteBytes(respBytes)
|
|
stack[0] = respPtr
|
|
}
|