feat(plugins): define TaskQueue host service interface

Add the TaskQueueService interface with CreateQueue, Enqueue,
GetTaskStatus, and CancelTask methods plus QueueConfig struct.
This commit is contained in:
Deluan 2026-02-26 15:08:11 -05:00 committed by Deluan Quintão
parent acd69f6a4f
commit b76c57d2c4
6 changed files with 917 additions and 0 deletions

57
plugins/host/taskqueue.go Normal file
View File

@ -0,0 +1,57 @@
package host
import "context"
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskQueueService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=TaskQueue permission=taskqueue
type TaskQueueService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
//nd:hostfunc
GetTaskStatus(ctx context.Context, taskID string) (string, error)
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
CancelTask(ctx context.Context, taskID string) error
}

View File

@ -0,0 +1,220 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskQueueCreateQueueRequest is the request type for TaskQueue.CreateQueue.
type TaskQueueCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskQueueCreateQueueResponse is the response type for TaskQueue.CreateQueue.
type TaskQueueCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskQueueEnqueueRequest is the request type for TaskQueue.Enqueue.
type TaskQueueEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskQueueEnqueueResponse is the response type for TaskQueue.Enqueue.
type TaskQueueEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskQueueGetTaskStatusRequest is the request type for TaskQueue.GetTaskStatus.
type TaskQueueGetTaskStatusRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueGetTaskStatusResponse is the response type for TaskQueue.GetTaskStatus.
type TaskQueueGetTaskStatusResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskQueueCancelTaskRequest is the request type for TaskQueue.CancelTask.
type TaskQueueCancelTaskRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueCancelTaskResponse is the response type for TaskQueue.CancelTask.
type TaskQueueCancelTaskResponse struct {
Error string `json:"error,omitempty"`
}
// RegisterTaskQueueHostFunctions registers TaskQueue service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskQueueHostFunctions(service TaskQueueService) []extism.HostFunction {
return []extism.HostFunction{
newTaskQueueCreateQueueHostFunction(service),
newTaskQueueEnqueueHostFunction(service),
newTaskQueueGetTaskStatusHostFunction(service),
newTaskQueueCancelTaskHostFunction(service),
}
}
func newTaskQueueCreateQueueHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueCreateQueueResponse{}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueEnqueueHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueEnqueueResponse{
Result: result,
}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueGetTaskStatusHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_gettaskstatus",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueGetTaskStatusRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.GetTaskStatus(ctx, req.TaskID)
if svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueGetTaskStatusResponse{
Result: result,
}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueCancelTaskHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_canceltask",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueCancelTaskRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CancelTask(ctx, req.TaskID); svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueCancelTaskResponse{}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskqueueWriteResponse writes a JSON response to plugin memory.
func taskqueueWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskqueueWriteError writes an error response to plugin memory.
func taskqueueWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

View File

@ -0,0 +1,219 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the TaskQueue host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// taskqueue_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_createqueue
func taskqueue_createqueue(uint64) uint64
// taskqueue_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_enqueue
func taskqueue_enqueue(uint64) uint64
// taskqueue_gettaskstatus is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_gettaskstatus
func taskqueue_gettaskstatus(uint64) uint64
// taskqueue_canceltask is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_canceltask
func taskqueue_canceltask(uint64) uint64
type taskQueueCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskQueueEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskQueueEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskQueueGetTaskStatusRequest struct {
TaskID string `json:"taskId"`
}
type taskQueueGetTaskStatusResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskQueueCancelTaskRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueCreateQueue calls the taskqueue_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskQueueCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskQueueCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskQueueEnqueue calls the taskqueue_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskQueueEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskQueueEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskQueueGetTaskStatus calls the taskqueue_gettaskstatus host function.
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
func TaskQueueGetTaskStatus(taskID string) (string, error) {
// Marshal request to JSON
req := taskQueueGetTaskStatusRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_gettaskstatus(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskQueueGetTaskStatusResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskQueueCancelTask calls the taskqueue_canceltask host function.
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskQueueCancelTask(taskID string) error {
// Marshal request to JSON
req := taskQueueCancelTaskRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_canceltask(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}

View File

@ -0,0 +1,84 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// mockTaskQueueService is the mock implementation for testing.
type mockTaskQueueService struct {
mock.Mock
}
// TaskQueueMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskQueueMock.On("MethodName", args...).Return(values...)
var TaskQueueMock = &mockTaskQueueService{}
// CreateQueue is the mock method for TaskQueueCreateQueue.
func (m *mockTaskQueueService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskQueueCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskQueueCreateQueue(name string, config QueueConfig) error {
return TaskQueueMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskQueueEnqueue.
func (m *mockTaskQueueService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskQueueEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
return TaskQueueMock.Enqueue(queueName, payload)
}
// GetTaskStatus is the mock method for TaskQueueGetTaskStatus.
func (m *mockTaskQueueService) GetTaskStatus(taskID string) (string, error) {
args := m.Called(taskID)
return args.String(0), args.Error(1)
}
// TaskQueueGetTaskStatus delegates to the mock instance.
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
func TaskQueueGetTaskStatus(taskID string) (string, error) {
return TaskQueueMock.GetTaskStatus(taskID)
}
// CancelTask is the mock method for TaskQueueCancelTask.
func (m *mockTaskQueueService) CancelTask(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskQueueCancelTask delegates to the mock instance.
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskQueueCancelTask(taskID string) error {
return TaskQueueMock.CancelTask(taskID)
}

View File

@ -0,0 +1,153 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the TaskQueue host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "taskqueue_createqueue")
def _taskqueue_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_enqueue")
def _taskqueue_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_gettaskstatus")
def _taskqueue_gettaskstatus(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_canceltask")
def _taskqueue_canceltask(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def taskqueue_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def taskqueue_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": payload,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def taskqueue_get_task_status(task_id: str) -> str:
"""GetTaskStatus returns the status of a task: "pending", "running",
"completed", "failed", or "cancelled".
Args:
task_id: str parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_gettaskstatus(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def taskqueue_cancel_task(task_id: str) -> None:
"""CancelTask cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_canceltask(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])

View File

@ -0,0 +1,184 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the TaskQueue host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueEnqueueRequest {
queue_name: String,
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueGetTaskStatusRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueGetTaskStatusResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCancelTaskRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCancelTaskResponse {
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn taskqueue_createqueue(input: Json<TaskQueueCreateQueueRequest>) -> Json<TaskQueueCreateQueueResponse>;
fn taskqueue_enqueue(input: Json<TaskQueueEnqueueRequest>) -> Json<TaskQueueEnqueueResponse>;
fn taskqueue_gettaskstatus(input: Json<TaskQueueGetTaskStatusRequest>) -> Json<TaskQueueGetTaskStatusResponse>;
fn taskqueue_canceltask(input: Json<TaskQueueCancelTaskRequest>) -> Json<TaskQueueCancelTaskResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
taskqueue_createqueue(Json(TaskQueueCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
taskqueue_enqueue(Json(TaskQueueEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// GetTaskStatus returns the status of a task: "pending", "running",
/// "completed", "failed", or "cancelled".
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get_task_status(task_id: &str) -> Result<String, Error> {
let response = unsafe {
taskqueue_gettaskstatus(Json(TaskQueueGetTaskStatusRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// CancelTask cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel_task(task_id: &str) -> Result<(), Error> {
let response = unsafe {
taskqueue_canceltask(Json(TaskQueueCancelTaskRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}