From ad0c2310325154b59b8bb5523e29a132a82326f8 Mon Sep 17 00:00:00 2001 From: Stephan Richter Date: Wed, 8 Apr 2026 16:16:55 -0400 Subject: [PATCH] feat(jsonrpc2): add manual receive mode to prevent message loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In the default 'on-start' receive mode, signal-cli's daemon auto-pulls inbound messages and pushes them to its JSON-RPC clients the moment they arrive from the Signal servers. If no websocket subscriber is currently attached to /v1/receive/{number} — for example during a brief subscriber redeploy — the receiver loop in signal-cli-rest-api drops the message via a non-blocking channel send (see the 'no receiver' debug log in ReceiveData). signal-cli has already acknowledged the message to Signal, so it is never redelivered. Add support for signal-cli's --receive-mode=manual, opt-in via the new JSON_RPC_RECEIVE_MODE=manual environment variable. In manual mode signal-cli does NOT auto-receive; signal-cli-rest-api now issues subscribeReceive when a websocket subscriber attaches and unsubscribeReceive when the last subscriber detaches. While no subscriber is attached, signal-cli does not pull messages from the Signal servers, so they remain buffered server-side under Signal's normal retention rules and are delivered to the next subscriber that attaches. Implementation: * jsonrpc2-helper.go: thread JSON_RPC_RECEIVE_MODE through to the daemon launch command line. * jsonrpc2.go: add per-account subscription tracking with refcounting; subscribeReceive/unsubscribeReceive helpers; an unwrap step in ReceiveData so manual-mode notifications ({subscription,result}) and auto-mode notifications (envelope directly) reach downstream consumers in the same shape. * client.go, api.go: thread the account number through GetReceiveChannel so the JsonRpc2Client can attach the right subscription. The change is fully backward compatible: when JSON_RPC_RECEIVE_MODE is unset or set to 'on-start', signal-cli runs in auto mode and the new subscribeReceive code path is bypassed (account is empty, no RPC issued). Closes #255 (the same root cause: no subscriber → 'no receiver' drop). --- src/api/api.go | 2 +- src/client/client.go | 4 +- src/client/jsonrpc2.go | 140 ++++++++++++++++++++++++++++++--- src/scripts/jsonrpc2-helper.go | 20 ++++- 4 files changed, 150 insertions(+), 16 deletions(-) diff --git a/src/api/api.go b/src/api/api.go index a0304fc..ce447eb 100644 --- a/src/api/api.go +++ b/src/api/api.go @@ -572,7 +572,7 @@ func (a *Api) SendV2(c *gin.Context) { } func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { - receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel() + receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel(number) if err != nil { log.Error("Couldn't get receive channel: ", err.Error()) return diff --git a/src/client/client.go b/src/client/client.go index d02eb36..5ba9fa5 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -1059,12 +1059,12 @@ func (s *SignalClient) Receive(number string, timeout int64, ignoreAttachments b } } -func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { +func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, string, error) { jsonRpc2Client, err := s.getJsonRpc2Client() if err != nil { return nil, "", err } - return jsonRpc2Client.GetReceiveChannel() + return jsonRpc2Client.GetReceiveChannel(number) } func (s *SignalClient) RemoveReceiveChannel(channelUuid string) { diff --git a/src/client/jsonrpc2.go b/src/client/jsonrpc2.go index ef00633..5d0d62f 100644 --- a/src/client/jsonrpc2.go +++ b/src/client/jsonrpc2.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "net" "net/http" "strconv" @@ -56,15 +57,26 @@ func (r *RateLimitErrorType) Error() string { return r.Err.Error() } +// receiveSubscription tracks the state of a manual-mode signal-cli +// subscribeReceive call: the subscription id assigned by signal-cli and +// a refcount of websocket subscribers attached to that account. +type receiveSubscription struct { + id int64 + refcount int +} + type JsonRpc2Client struct { - conn net.Conn - receivedResponsesById map[string]chan JsonRpc2MessageResponse - receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage - signalCliApiConfig *utils.SignalCliApiConfig - number string - receivedMessagesMutex sync.Mutex - receivedResponsesMutex sync.Mutex - address string + conn net.Conn + receivedResponsesById map[string]chan JsonRpc2MessageResponse + receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage + receiveSubscriptions map[string]*receiveSubscription // account -> sub state + channelAccountByUuid map[string]string // channelUuid -> account + signalCliApiConfig *utils.SignalCliApiConfig + number string + receivedMessagesMutex sync.Mutex + receivedResponsesMutex sync.Mutex + receiveSubscriptionsMutex sync.Mutex + address string } func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number string) *JsonRpc2Client { @@ -73,6 +85,8 @@ func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number stri number: number, receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse), receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage), + receiveSubscriptions: make(map[string]*receiveSubscription), + channelAccountByUuid: make(map[string]string), } } @@ -236,6 +250,19 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) { var resp1 JsonRpc2ReceivedMessage json.Unmarshal([]byte(str), &resp1) if resp1.Method == "receive" { + // In manual receive-mode signal-cli wraps the envelope in + // {"subscription":N,"result":{...}}; in auto mode it sends + // the envelope directly. Unwrap so the broadcast format is + // the same in both modes and downstream consumers (e.g. the + // websocket handler) don't have to know which mode is in use. + var manualWrapper struct { + Subscription int64 `json:"subscription"` + Result json.RawMessage `json:"result"` + } + if err := json.Unmarshal(resp1.Params, &manualWrapper); err == nil && len(manualWrapper.Result) > 0 { + resp1.Params = manualWrapper.Result + } + r.receivedMessagesMutex.Lock() for _, c := range r.receivedMessagesChannels { select { @@ -244,7 +271,6 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) { default: log.Debug("Couldn't send message to golang channel, as there's no receiver") } - continue } r.receivedMessagesMutex.Unlock() @@ -270,16 +296,102 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) { } } -func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { - c := make(chan JsonRpc2ReceivedMessage) +// subscribeReceive starts receiving messages for an account by calling +// signal-cli's subscribeReceive JSON-RPC method. Only relevant when +// signal-cli was launched with --receive-mode=manual; in auto mode the +// daemon pushes notifications without an explicit subscribe call. +// Returns the subscription id assigned by signal-cli. +func (r *JsonRpc2Client) subscribeReceive(account string) (int64, error) { + type subscribeReceiveArgs struct{} + resultStr, err := r.getRaw("subscribeReceive", &account, subscribeReceiveArgs{}) + if err != nil { + return 0, err + } + var subscriptionId int64 + if err := json.Unmarshal([]byte(resultStr), &subscriptionId); err != nil { + return 0, fmt.Errorf("subscribeReceive: couldn't parse subscription id from %q: %w", resultStr, err) + } + return subscriptionId, nil +} + +// unsubscribeReceive cancels a manual-mode subscription previously +// returned by subscribeReceive. +func (r *JsonRpc2Client) unsubscribeReceive(account string, subscriptionId int64) error { + type unsubscribeReceiveArgs struct { + Subscription int64 `json:"subscription"` + } + _, err := r.getRaw("unsubscribeReceive", &account, unsubscribeReceiveArgs{Subscription: subscriptionId}) + return err +} + +// acquireReceiveSubscription ensures an active manual-mode subscription +// exists for the given account, refcounting concurrent websocket +// subscribers. The first caller for an account triggers a real +// subscribeReceive RPC; subsequent callers just bump the refcount. +func (r *JsonRpc2Client) acquireReceiveSubscription(account string) error { + r.receiveSubscriptionsMutex.Lock() + defer r.receiveSubscriptionsMutex.Unlock() + + if sub, ok := r.receiveSubscriptions[account]; ok { + sub.refcount++ + return nil + } + id, err := r.subscribeReceive(account) + if err != nil { + return err + } + r.receiveSubscriptions[account] = &receiveSubscription{id: id, refcount: 1} + log.Infof("Subscribed to receive notifications for account %s (subscription=%d)", account, id) + return nil +} + +// releaseReceiveSubscription decrements the per-account refcount and +// cancels the subscription with signal-cli if it drops to zero. +func (r *JsonRpc2Client) releaseReceiveSubscription(account string) { + r.receiveSubscriptionsMutex.Lock() + defer r.receiveSubscriptionsMutex.Unlock() + + sub, ok := r.receiveSubscriptions[account] + if !ok { + return + } + sub.refcount-- + if sub.refcount > 0 { + return + } + if err := r.unsubscribeReceive(account, sub.id); err != nil { + log.Warnf("unsubscribeReceive failed for account %s (subscription=%d): %s", account, sub.id, err.Error()) + } else { + log.Infof("Unsubscribed from receive notifications for account %s (subscription=%d)", account, sub.id) + } + delete(r.receiveSubscriptions, account) +} + +// GetReceiveChannel returns a channel that will receive messages for the +// given account. If signal-cli is in manual receive-mode, it also acquires +// a subscription so that signal-cli starts forwarding messages for the +// account; in auto mode, account is unused (notifications flow regardless). +// +// account may be empty when the caller does not need a subscription +// (e.g. legacy callers in auto mode); in that case no subscribeReceive +// RPC is issued. +func (r *JsonRpc2Client) GetReceiveChannel(account string) (chan JsonRpc2ReceivedMessage, string, error) { + c := make(chan JsonRpc2ReceivedMessage, 64) channelUuid, err := uuid.NewV4() if err != nil { return c, "", err } + if account != "" { + if err := r.acquireReceiveSubscription(account); err != nil { + return c, "", fmt.Errorf("subscribeReceive failed for account %s: %w", account, err) + } + } + r.receivedMessagesMutex.Lock() r.receivedMessagesChannels[channelUuid.String()] = c + r.channelAccountByUuid[channelUuid.String()] = account r.receivedMessagesMutex.Unlock() return c, channelUuid.String(), nil @@ -288,5 +400,11 @@ func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, stri func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) { r.receivedMessagesMutex.Lock() delete(r.receivedMessagesChannels, channelUuid) + account := r.channelAccountByUuid[channelUuid] + delete(r.channelAccountByUuid, channelUuid) r.receivedMessagesMutex.Unlock() + + if account != "" { + r.releaseReceiveSubscription(account) + } } diff --git a/src/scripts/jsonrpc2-helper.go b/src/scripts/jsonrpc2-helper.go index 96a4c34..2c8a75b 100644 --- a/src/scripts/jsonrpc2-helper.go +++ b/src/scripts/jsonrpc2-helper.go @@ -13,7 +13,7 @@ import ( const supervisorctlConfigTemplate = ` [program:%s] process_name=%s -command=%s --output=json --config %s%s daemon %s%s%s%s --tcp 127.0.0.1:%d +command=%s --output=json --config %s%s daemon%s %s%s%s%s --tcp 127.0.0.1:%d autostart=true autorestart=true startretries=10 @@ -75,6 +75,22 @@ func main() { signalCliIgnoreStickers = " --ignore-stickers" } + // Receive mode: by default signal-cli auto-receives messages on the + // daemon and pushes them as JSON-RPC notifications, regardless of + // whether any websocket subscriber is connected. In a deploy where + // the subscribing client briefly disconnects, those notifications + // have nowhere to go and are silently dropped. With manual receive + // mode, signal-cli only fetches messages when explicitly requested + // via subscribeReceive — see jsonrpc2.go — so messages stay buffered + // on the Signal servers until a subscriber re-attaches. + signalCliReceiveMode := "" + receiveMode := utils.GetEnv("JSON_RPC_RECEIVE_MODE", "") + if receiveMode == "manual" { + signalCliReceiveMode = " --receive-mode=manual" + } else if receiveMode != "" && receiveMode != "on-start" { + log.Fatal("Invalid JSON_RPC_RECEIVE_MODE environment variable set! Must be 'manual' or 'on-start'.") + } + supervisorctlProgramName := "signal-cli-json-rpc-1" supervisorctlLogFolder := "/var/log/" + supervisorctlProgramName _, err := exec.Command("mkdir", "-p", supervisorctlLogFolder).Output() @@ -100,7 +116,7 @@ func main() { supervisorctlConfigFilename := "/etc/supervisor/conf.d/" + "signal-cli-json-rpc-1.conf" supervisorctlConfig := fmt.Sprintf(supervisorctlConfigTemplate, supervisorctlProgramName, supervisorctlProgramName, signalCliBinary, - signalCliConfigDir, trustNewIdentities, signalCliIgnoreAttachments, signalCliIgnoreStories, + signalCliConfigDir, trustNewIdentities, signalCliReceiveMode, signalCliIgnoreAttachments, signalCliIgnoreStories, signalCliIgnoreAvatars, signalCliIgnoreStickers, tcpPort, supervisorctlProgramName, supervisorctlProgramName)