From b76c57d2c44b42e73d1cb7b0b009860d7beb0e8a Mon Sep 17 00:00:00 2001 From: Deluan Date: Thu, 26 Feb 2026 15:08:11 -0500 Subject: [PATCH] feat(plugins): define TaskQueue host service interface Add the TaskQueueService interface with CreateQueue, Enqueue, GetTaskStatus, and CancelTask methods plus QueueConfig struct. --- plugins/host/taskqueue.go | 57 +++++ plugins/host/taskqueue_gen.go | 220 ++++++++++++++++++ plugins/pdk/go/host/nd_host_taskqueue.go | 219 +++++++++++++++++ plugins/pdk/go/host/nd_host_taskqueue_stub.go | 84 +++++++ plugins/pdk/python/host/nd_host_taskqueue.py | 153 ++++++++++++ .../rust/nd-pdk-host/src/nd_host_taskqueue.rs | 184 +++++++++++++++ 6 files changed, 917 insertions(+) create mode 100644 plugins/host/taskqueue.go create mode 100644 plugins/host/taskqueue_gen.go create mode 100644 plugins/pdk/go/host/nd_host_taskqueue.go create mode 100644 plugins/pdk/go/host/nd_host_taskqueue_stub.go create mode 100644 plugins/pdk/python/host/nd_host_taskqueue.py create mode 100644 plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs diff --git a/plugins/host/taskqueue.go b/plugins/host/taskqueue.go new file mode 100644 index 000000000..63df8713b --- /dev/null +++ b/plugins/host/taskqueue.go @@ -0,0 +1,57 @@ +package host + +import "context" + +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + // Concurrency is the max number of parallel workers. Default: 1. + // Capped by the plugin's manifest maxConcurrency. + Concurrency int32 `json:"concurrency"` + + // MaxRetries is the number of times to retry a failed task. Default: 0. + MaxRetries int32 `json:"maxRetries"` + + // BackoffMs is the initial backoff between retries in milliseconds. + // Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000. + BackoffMs int64 `json:"backoffMs"` + + // DelayMs is the minimum delay between starting consecutive tasks + // in milliseconds. Useful for rate limiting. Default: 0. + DelayMs int64 `json:"delayMs"` + + // RetentionMs is how long completed/failed/cancelled tasks are kept + // in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w). + RetentionMs int64 `json:"retentionMs"` +} + +// TaskQueueService provides persistent task queues for plugins. +// +// This service allows plugins to create named queues with configurable concurrency, +// retry policies, and rate limiting. Tasks are persisted to SQLite and survive +// server restarts. When a task is ready to execute, the host calls the plugin's +// nd_task_execute callback function. +// +//nd:hostservice name=TaskQueue permission=taskqueue +type TaskQueueService interface { + // CreateQueue creates a named task queue with the given configuration. + // Zero-value fields in config use sensible defaults. + // If a queue with the same name already exists, returns an error. + // On startup, this also recovers any stale "running" tasks from a previous crash. + //nd:hostfunc + CreateQueue(ctx context.Context, name string, config QueueConfig) error + + // Enqueue adds a task to the named queue. Returns the task ID. + // payload is opaque bytes passed back to the plugin on execution. + //nd:hostfunc + Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) + + // GetTaskStatus returns the status of a task: "pending", "running", + // "completed", "failed", or "cancelled". + //nd:hostfunc + GetTaskStatus(ctx context.Context, taskID string) (string, error) + + // CancelTask cancels a pending task. Returns error if already + // running, completed, or failed. + //nd:hostfunc + CancelTask(ctx context.Context, taskID string) error +} diff --git a/plugins/host/taskqueue_gen.go b/plugins/host/taskqueue_gen.go new file mode 100644 index 000000000..8edf6967c --- /dev/null +++ b/plugins/host/taskqueue_gen.go @@ -0,0 +1,220 @@ +// Code generated by ndpgen. DO NOT EDIT. + +package host + +import ( + "context" + "encoding/json" + + extism "github.com/extism/go-sdk" +) + +// TaskQueueCreateQueueRequest is the request type for TaskQueue.CreateQueue. +type TaskQueueCreateQueueRequest struct { + Name string `json:"name"` + Config QueueConfig `json:"config"` +} + +// TaskQueueCreateQueueResponse is the response type for TaskQueue.CreateQueue. +type TaskQueueCreateQueueResponse struct { + Error string `json:"error,omitempty"` +} + +// TaskQueueEnqueueRequest is the request type for TaskQueue.Enqueue. +type TaskQueueEnqueueRequest struct { + QueueName string `json:"queueName"` + Payload []byte `json:"payload"` +} + +// TaskQueueEnqueueResponse is the response type for TaskQueue.Enqueue. +type TaskQueueEnqueueResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// TaskQueueGetTaskStatusRequest is the request type for TaskQueue.GetTaskStatus. +type TaskQueueGetTaskStatusRequest struct { + TaskID string `json:"taskId"` +} + +// TaskQueueGetTaskStatusResponse is the response type for TaskQueue.GetTaskStatus. +type TaskQueueGetTaskStatusResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// TaskQueueCancelTaskRequest is the request type for TaskQueue.CancelTask. +type TaskQueueCancelTaskRequest struct { + TaskID string `json:"taskId"` +} + +// TaskQueueCancelTaskResponse is the response type for TaskQueue.CancelTask. +type TaskQueueCancelTaskResponse struct { + Error string `json:"error,omitempty"` +} + +// RegisterTaskQueueHostFunctions registers TaskQueue service host functions. +// The returned host functions should be added to the plugin's configuration. +func RegisterTaskQueueHostFunctions(service TaskQueueService) []extism.HostFunction { + return []extism.HostFunction{ + newTaskQueueCreateQueueHostFunction(service), + newTaskQueueEnqueueHostFunction(service), + newTaskQueueGetTaskStatusHostFunction(service), + newTaskQueueCancelTaskHostFunction(service), + } +} + +func newTaskQueueCreateQueueHostFunction(service TaskQueueService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "taskqueue_createqueue", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskqueueWriteError(p, stack, err) + return + } + var req TaskQueueCreateQueueRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskqueueWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil { + taskqueueWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskQueueCreateQueueResponse{} + taskqueueWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskQueueEnqueueHostFunction(service TaskQueueService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "taskqueue_enqueue", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskqueueWriteError(p, stack, err) + return + } + var req TaskQueueEnqueueRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskqueueWriteError(p, stack, err) + return + } + + // Call the service method + result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload) + if svcErr != nil { + taskqueueWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskQueueEnqueueResponse{ + Result: result, + } + taskqueueWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskQueueGetTaskStatusHostFunction(service TaskQueueService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "taskqueue_gettaskstatus", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskqueueWriteError(p, stack, err) + return + } + var req TaskQueueGetTaskStatusRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskqueueWriteError(p, stack, err) + return + } + + // Call the service method + result, svcErr := service.GetTaskStatus(ctx, req.TaskID) + if svcErr != nil { + taskqueueWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskQueueGetTaskStatusResponse{ + Result: result, + } + taskqueueWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +func newTaskQueueCancelTaskHostFunction(service TaskQueueService) extism.HostFunction { + return extism.NewHostFunctionWithStack( + "taskqueue_canceltask", + func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) { + // Read JSON request from plugin memory + reqBytes, err := p.ReadBytes(stack[0]) + if err != nil { + taskqueueWriteError(p, stack, err) + return + } + var req TaskQueueCancelTaskRequest + if err := json.Unmarshal(reqBytes, &req); err != nil { + taskqueueWriteError(p, stack, err) + return + } + + // Call the service method + if svcErr := service.CancelTask(ctx, req.TaskID); svcErr != nil { + taskqueueWriteError(p, stack, svcErr) + return + } + + // Write JSON response to plugin memory + resp := TaskQueueCancelTaskResponse{} + taskqueueWriteResponse(p, stack, resp) + }, + []extism.ValueType{extism.ValueTypePTR}, + []extism.ValueType{extism.ValueTypePTR}, + ) +} + +// taskqueueWriteResponse writes a JSON response to plugin memory. +func taskqueueWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) { + respBytes, err := json.Marshal(resp) + if err != nil { + taskqueueWriteError(p, stack, err) + return + } + respPtr, err := p.WriteBytes(respBytes) + if err != nil { + stack[0] = 0 + return + } + stack[0] = respPtr +} + +// taskqueueWriteError writes an error response to plugin memory. +func taskqueueWriteError(p *extism.CurrentPlugin, stack []uint64, err error) { + errResp := struct { + Error string `json:"error"` + }{Error: err.Error()} + respBytes, _ := json.Marshal(errResp) + respPtr, _ := p.WriteBytes(respBytes) + stack[0] = respPtr +} diff --git a/plugins/pdk/go/host/nd_host_taskqueue.go b/plugins/pdk/go/host/nd_host_taskqueue.go new file mode 100644 index 000000000..b21fdac42 --- /dev/null +++ b/plugins/pdk/go/host/nd_host_taskqueue.go @@ -0,0 +1,219 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains client wrappers for the TaskQueue host service. +// It is intended for use in Navidrome plugins built with TinyGo. +// +//go:build wasip1 + +package host + +import ( + "encoding/json" + "errors" + + "github.com/navidrome/navidrome/plugins/pdk/go/pdk" +) + +// QueueConfig represents the QueueConfig data structure. +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + Concurrency int32 `json:"concurrency"` + MaxRetries int32 `json:"maxRetries"` + BackoffMs int64 `json:"backoffMs"` + DelayMs int64 `json:"delayMs"` + RetentionMs int64 `json:"retentionMs"` +} + +// taskqueue_createqueue is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user taskqueue_createqueue +func taskqueue_createqueue(uint64) uint64 + +// taskqueue_enqueue is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user taskqueue_enqueue +func taskqueue_enqueue(uint64) uint64 + +// taskqueue_gettaskstatus is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user taskqueue_gettaskstatus +func taskqueue_gettaskstatus(uint64) uint64 + +// taskqueue_canceltask is the host function provided by Navidrome. +// +//go:wasmimport extism:host/user taskqueue_canceltask +func taskqueue_canceltask(uint64) uint64 + +type taskQueueCreateQueueRequest struct { + Name string `json:"name"` + Config QueueConfig `json:"config"` +} + +type taskQueueEnqueueRequest struct { + QueueName string `json:"queueName"` + Payload []byte `json:"payload"` +} + +type taskQueueEnqueueResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +type taskQueueGetTaskStatusRequest struct { + TaskID string `json:"taskId"` +} + +type taskQueueGetTaskStatusResponse struct { + Result string `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +type taskQueueCancelTaskRequest struct { + TaskID string `json:"taskId"` +} + +// TaskQueueCreateQueue calls the taskqueue_createqueue host function. +// CreateQueue creates a named task queue with the given configuration. +// Zero-value fields in config use sensible defaults. +// If a queue with the same name already exists, returns an error. +// On startup, this also recovers any stale "running" tasks from a previous crash. +func TaskQueueCreateQueue(name string, config QueueConfig) error { + // Marshal request to JSON + req := taskQueueCreateQueueRequest{ + Name: name, + Config: config, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := taskqueue_createqueue(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} + +// TaskQueueEnqueue calls the taskqueue_enqueue host function. +// Enqueue adds a task to the named queue. Returns the task ID. +// payload is opaque bytes passed back to the plugin on execution. +func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { + // Marshal request to JSON + req := taskQueueEnqueueRequest{ + QueueName: queueName, + Payload: payload, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return "", err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := taskqueue_enqueue(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response taskQueueEnqueueResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return "", err + } + + // Convert Error field to Go error + if response.Error != "" { + return "", errors.New(response.Error) + } + + return response.Result, nil +} + +// TaskQueueGetTaskStatus calls the taskqueue_gettaskstatus host function. +// GetTaskStatus returns the status of a task: "pending", "running", +// "completed", "failed", or "cancelled". +func TaskQueueGetTaskStatus(taskID string) (string, error) { + // Marshal request to JSON + req := taskQueueGetTaskStatusRequest{ + TaskID: taskID, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return "", err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := taskqueue_gettaskstatus(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse the response + var response taskQueueGetTaskStatusResponse + if err := json.Unmarshal(responseBytes, &response); err != nil { + return "", err + } + + // Convert Error field to Go error + if response.Error != "" { + return "", errors.New(response.Error) + } + + return response.Result, nil +} + +// TaskQueueCancelTask calls the taskqueue_canceltask host function. +// CancelTask cancels a pending task. Returns error if already +// running, completed, or failed. +func TaskQueueCancelTask(taskID string) error { + // Marshal request to JSON + req := taskQueueCancelTaskRequest{ + TaskID: taskID, + } + reqBytes, err := json.Marshal(req) + if err != nil { + return err + } + reqMem := pdk.AllocateBytes(reqBytes) + defer reqMem.Free() + + // Call the host function + responsePtr := taskqueue_canceltask(reqMem.Offset()) + + // Read the response from memory + responseMem := pdk.FindMemory(responsePtr) + responseBytes := responseMem.ReadBytes() + + // Parse error-only response + var response struct { + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(responseBytes, &response); err != nil { + return err + } + if response.Error != "" { + return errors.New(response.Error) + } + return nil +} diff --git a/plugins/pdk/go/host/nd_host_taskqueue_stub.go b/plugins/pdk/go/host/nd_host_taskqueue_stub.go new file mode 100644 index 000000000..4ebc36085 --- /dev/null +++ b/plugins/pdk/go/host/nd_host_taskqueue_stub.go @@ -0,0 +1,84 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains mock implementations for non-WASM builds. +// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms. +// Plugin authors can use the exported mock instances to set expectations in tests. +// +//go:build !wasip1 + +package host + +import "github.com/stretchr/testify/mock" + +// QueueConfig represents the QueueConfig data structure. +// QueueConfig holds configuration for a task queue. +type QueueConfig struct { + Concurrency int32 `json:"concurrency"` + MaxRetries int32 `json:"maxRetries"` + BackoffMs int64 `json:"backoffMs"` + DelayMs int64 `json:"delayMs"` + RetentionMs int64 `json:"retentionMs"` +} + +// mockTaskQueueService is the mock implementation for testing. +type mockTaskQueueService struct { + mock.Mock +} + +// TaskQueueMock is the auto-instantiated mock instance for testing. +// Use this to set expectations: host.TaskQueueMock.On("MethodName", args...).Return(values...) +var TaskQueueMock = &mockTaskQueueService{} + +// CreateQueue is the mock method for TaskQueueCreateQueue. +func (m *mockTaskQueueService) CreateQueue(name string, config QueueConfig) error { + args := m.Called(name, config) + return args.Error(0) +} + +// TaskQueueCreateQueue delegates to the mock instance. +// CreateQueue creates a named task queue with the given configuration. +// Zero-value fields in config use sensible defaults. +// If a queue with the same name already exists, returns an error. +// On startup, this also recovers any stale "running" tasks from a previous crash. +func TaskQueueCreateQueue(name string, config QueueConfig) error { + return TaskQueueMock.CreateQueue(name, config) +} + +// Enqueue is the mock method for TaskQueueEnqueue. +func (m *mockTaskQueueService) Enqueue(queueName string, payload []byte) (string, error) { + args := m.Called(queueName, payload) + return args.String(0), args.Error(1) +} + +// TaskQueueEnqueue delegates to the mock instance. +// Enqueue adds a task to the named queue. Returns the task ID. +// payload is opaque bytes passed back to the plugin on execution. +func TaskQueueEnqueue(queueName string, payload []byte) (string, error) { + return TaskQueueMock.Enqueue(queueName, payload) +} + +// GetTaskStatus is the mock method for TaskQueueGetTaskStatus. +func (m *mockTaskQueueService) GetTaskStatus(taskID string) (string, error) { + args := m.Called(taskID) + return args.String(0), args.Error(1) +} + +// TaskQueueGetTaskStatus delegates to the mock instance. +// GetTaskStatus returns the status of a task: "pending", "running", +// "completed", "failed", or "cancelled". +func TaskQueueGetTaskStatus(taskID string) (string, error) { + return TaskQueueMock.GetTaskStatus(taskID) +} + +// CancelTask is the mock method for TaskQueueCancelTask. +func (m *mockTaskQueueService) CancelTask(taskID string) error { + args := m.Called(taskID) + return args.Error(0) +} + +// TaskQueueCancelTask delegates to the mock instance. +// CancelTask cancels a pending task. Returns error if already +// running, completed, or failed. +func TaskQueueCancelTask(taskID string) error { + return TaskQueueMock.CancelTask(taskID) +} diff --git a/plugins/pdk/python/host/nd_host_taskqueue.py b/plugins/pdk/python/host/nd_host_taskqueue.py new file mode 100644 index 000000000..5b7471b5d --- /dev/null +++ b/plugins/pdk/python/host/nd_host_taskqueue.py @@ -0,0 +1,153 @@ +# Code generated by ndpgen. DO NOT EDIT. +# +# This file contains client wrappers for the TaskQueue host service. +# It is intended for use in Navidrome plugins built with extism-py. +# +# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly. +# The @extism.import_fn decorators are only detected when defined in the plugin's +# main __init__.py file. Copy the needed functions from this file into your plugin. + +from dataclasses import dataclass +from typing import Any + +import extism +import json + + +class HostFunctionError(Exception): + """Raised when a host function returns an error.""" + pass + + +@extism.import_fn("extism:host/user", "taskqueue_createqueue") +def _taskqueue_createqueue(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "taskqueue_enqueue") +def _taskqueue_enqueue(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "taskqueue_gettaskstatus") +def _taskqueue_gettaskstatus(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +@extism.import_fn("extism:host/user", "taskqueue_canceltask") +def _taskqueue_canceltask(offset: int) -> int: + """Raw host function - do not call directly.""" + ... + + +def taskqueue_create_queue(name: str, config: Any) -> None: + """CreateQueue creates a named task queue with the given configuration. +Zero-value fields in config use sensible defaults. +If a queue with the same name already exists, returns an error. +On startup, this also recovers any stale "running" tasks from a previous crash. + + Args: + name: str parameter. + config: Any parameter. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "name": name, + "config": config, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _taskqueue_createqueue(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + + +def taskqueue_enqueue(queue_name: str, payload: bytes) -> str: + """Enqueue adds a task to the named queue. Returns the task ID. +payload is opaque bytes passed back to the plugin on execution. + + Args: + queue_name: str parameter. + payload: bytes parameter. + + Returns: + str: The result value. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "queueName": queue_name, + "payload": payload, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _taskqueue_enqueue(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("result", "") + + +def taskqueue_get_task_status(task_id: str) -> str: + """GetTaskStatus returns the status of a task: "pending", "running", +"completed", "failed", or "cancelled". + + Args: + task_id: str parameter. + + Returns: + str: The result value. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "taskId": task_id, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _taskqueue_gettaskstatus(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + + return response.get("result", "") + + +def taskqueue_cancel_task(task_id: str) -> None: + """CancelTask cancels a pending task. Returns error if already +running, completed, or failed. + + Args: + task_id: str parameter. + + Raises: + HostFunctionError: If the host function returns an error. + """ + request = { + "taskId": task_id, + } + request_bytes = json.dumps(request).encode("utf-8") + request_mem = extism.memory.alloc(request_bytes) + response_offset = _taskqueue_canceltask(request_mem.offset) + response_mem = extism.memory.find(response_offset) + response = json.loads(extism.memory.string(response_mem)) + + if response.get("error"): + raise HostFunctionError(response["error"]) + diff --git a/plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs b/plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs new file mode 100644 index 000000000..ea2194f2f --- /dev/null +++ b/plugins/pdk/rust/nd-pdk-host/src/nd_host_taskqueue.rs @@ -0,0 +1,184 @@ +// Code generated by ndpgen. DO NOT EDIT. +// +// This file contains client wrappers for the TaskQueue host service. +// It is intended for use in Navidrome plugins built with extism-pdk. + +use extism_pdk::*; +use serde::{Deserialize, Serialize}; + +/// QueueConfig holds configuration for a task queue. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct QueueConfig { + pub concurrency: i32, + pub max_retries: i32, + pub backoff_ms: i64, + pub delay_ms: i64, + pub retention_ms: i64, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueCreateQueueRequest { + name: String, + config: QueueConfig, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueCreateQueueResponse { + #[serde(default)] + error: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueEnqueueRequest { + queue_name: String, + payload: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueEnqueueResponse { + #[serde(default)] + result: String, + #[serde(default)] + error: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueGetTaskStatusRequest { + task_id: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueGetTaskStatusResponse { + #[serde(default)] + result: String, + #[serde(default)] + error: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueCancelTaskRequest { + task_id: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct TaskQueueCancelTaskResponse { + #[serde(default)] + error: Option, +} + +#[host_fn] +extern "ExtismHost" { + fn taskqueue_createqueue(input: Json) -> Json; + fn taskqueue_enqueue(input: Json) -> Json; + fn taskqueue_gettaskstatus(input: Json) -> Json; + fn taskqueue_canceltask(input: Json) -> Json; +} + +/// CreateQueue creates a named task queue with the given configuration. +/// Zero-value fields in config use sensible defaults. +/// If a queue with the same name already exists, returns an error. +/// On startup, this also recovers any stale "running" tasks from a previous crash. +/// +/// # Arguments +/// * `name` - String parameter. +/// * `config` - QueueConfig parameter. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> { + let response = unsafe { + taskqueue_createqueue(Json(TaskQueueCreateQueueRequest { + name: name.to_owned(), + config: config, + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(()) +} + +/// Enqueue adds a task to the named queue. Returns the task ID. +/// payload is opaque bytes passed back to the plugin on execution. +/// +/// # Arguments +/// * `queue_name` - String parameter. +/// * `payload` - Vec parameter. +/// +/// # Returns +/// The result value. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn enqueue(queue_name: &str, payload: Vec) -> Result { + let response = unsafe { + taskqueue_enqueue(Json(TaskQueueEnqueueRequest { + queue_name: queue_name.to_owned(), + payload: payload, + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(response.0.result) +} + +/// GetTaskStatus returns the status of a task: "pending", "running", +/// "completed", "failed", or "cancelled". +/// +/// # Arguments +/// * `task_id` - String parameter. +/// +/// # Returns +/// The result value. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn get_task_status(task_id: &str) -> Result { + let response = unsafe { + taskqueue_gettaskstatus(Json(TaskQueueGetTaskStatusRequest { + task_id: task_id.to_owned(), + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(response.0.result) +} + +/// CancelTask cancels a pending task. Returns error if already +/// running, completed, or failed. +/// +/// # Arguments +/// * `task_id` - String parameter. +/// +/// # Errors +/// Returns an error if the host function call fails. +pub fn cancel_task(task_id: &str) -> Result<(), Error> { + let response = unsafe { + taskqueue_canceltask(Json(TaskQueueCancelTaskRequest { + task_id: task_id.to_owned(), + }))? + }; + + if let Some(err) = response.0.error { + return Err(Error::msg(err)); + } + + Ok(()) +}