feat(jsonrpc2): add manual receive mode to prevent message loss

In the default 'on-start' receive mode, signal-cli's daemon
auto-pulls inbound messages and pushes them to its JSON-RPC clients
the moment they arrive from the Signal servers. If no websocket
subscriber is currently attached to /v1/receive/{number} — for
example during a brief subscriber redeploy — the receiver loop in
signal-cli-rest-api drops the message via a non-blocking channel
send (see the 'no receiver' debug log in ReceiveData). signal-cli
has already acknowledged the message to Signal, so it is never
redelivered.

Add support for signal-cli's --receive-mode=manual, opt-in via the
new JSON_RPC_RECEIVE_MODE=manual environment variable. In manual
mode signal-cli does NOT auto-receive; signal-cli-rest-api now
issues subscribeReceive when a websocket subscriber attaches and
unsubscribeReceive when the last subscriber detaches. While no
subscriber is attached, signal-cli does not pull messages from the
Signal servers, so they remain buffered server-side under Signal's
normal retention rules and are delivered to the next subscriber
that attaches.

Implementation:

* jsonrpc2-helper.go: thread JSON_RPC_RECEIVE_MODE through to the
  daemon launch command line.
* jsonrpc2.go: add per-account subscription tracking with
  refcounting; subscribeReceive/unsubscribeReceive helpers; an
  unwrap step in ReceiveData so manual-mode notifications
  ({subscription,result}) and auto-mode notifications (envelope
  directly) reach downstream consumers in the same shape.
* client.go, api.go: thread the account number through
  GetReceiveChannel so the JsonRpc2Client can attach the right
  subscription.

The change is fully backward compatible: when JSON_RPC_RECEIVE_MODE
is unset or set to 'on-start', signal-cli runs in auto mode and the
new subscribeReceive code path is bypassed (account is empty, no
RPC issued).

Closes #255 (the same root cause: no subscriber → 'no receiver'
drop).
This commit is contained in:
Stephan Richter 2026-04-08 16:16:55 -04:00
parent 2a776618e9
commit ad0c231032
4 changed files with 150 additions and 16 deletions

View File

@ -572,7 +572,7 @@ func (a *Api) SendV2(c *gin.Context) {
} }
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) {
receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel() receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel(number)
if err != nil { if err != nil {
log.Error("Couldn't get receive channel: ", err.Error()) log.Error("Couldn't get receive channel: ", err.Error())
return return

View File

@ -1059,12 +1059,12 @@ func (s *SignalClient) Receive(number string, timeout int64, ignoreAttachments b
} }
} }
func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, string, error) {
jsonRpc2Client, err := s.getJsonRpc2Client() jsonRpc2Client, err := s.getJsonRpc2Client()
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
return jsonRpc2Client.GetReceiveChannel() return jsonRpc2Client.GetReceiveChannel(number)
} }
func (s *SignalClient) RemoveReceiveChannel(channelUuid string) { func (s *SignalClient) RemoveReceiveChannel(channelUuid string) {

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
@ -56,15 +57,26 @@ func (r *RateLimitErrorType) Error() string {
return r.Err.Error() return r.Err.Error()
} }
// receiveSubscription tracks the state of a manual-mode signal-cli
// subscribeReceive call: the subscription id assigned by signal-cli and
// a refcount of websocket subscribers attached to that account.
type receiveSubscription struct {
id int64
refcount int
}
type JsonRpc2Client struct { type JsonRpc2Client struct {
conn net.Conn conn net.Conn
receivedResponsesById map[string]chan JsonRpc2MessageResponse receivedResponsesById map[string]chan JsonRpc2MessageResponse
receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage
signalCliApiConfig *utils.SignalCliApiConfig receiveSubscriptions map[string]*receiveSubscription // account -> sub state
number string channelAccountByUuid map[string]string // channelUuid -> account
receivedMessagesMutex sync.Mutex signalCliApiConfig *utils.SignalCliApiConfig
receivedResponsesMutex sync.Mutex number string
address string receivedMessagesMutex sync.Mutex
receivedResponsesMutex sync.Mutex
receiveSubscriptionsMutex sync.Mutex
address string
} }
func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number string) *JsonRpc2Client { func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number string) *JsonRpc2Client {
@ -73,6 +85,8 @@ func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number stri
number: number, number: number,
receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse), receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse),
receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage), receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage),
receiveSubscriptions: make(map[string]*receiveSubscription),
channelAccountByUuid: make(map[string]string),
} }
} }
@ -236,6 +250,19 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
var resp1 JsonRpc2ReceivedMessage var resp1 JsonRpc2ReceivedMessage
json.Unmarshal([]byte(str), &resp1) json.Unmarshal([]byte(str), &resp1)
if resp1.Method == "receive" { if resp1.Method == "receive" {
// In manual receive-mode signal-cli wraps the envelope in
// {"subscription":N,"result":{...}}; in auto mode it sends
// the envelope directly. Unwrap so the broadcast format is
// the same in both modes and downstream consumers (e.g. the
// websocket handler) don't have to know which mode is in use.
var manualWrapper struct {
Subscription int64 `json:"subscription"`
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(resp1.Params, &manualWrapper); err == nil && len(manualWrapper.Result) > 0 {
resp1.Params = manualWrapper.Result
}
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
for _, c := range r.receivedMessagesChannels { for _, c := range r.receivedMessagesChannels {
select { select {
@ -244,7 +271,6 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
default: default:
log.Debug("Couldn't send message to golang channel, as there's no receiver") log.Debug("Couldn't send message to golang channel, as there's no receiver")
} }
continue
} }
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
@ -270,16 +296,102 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
} }
} }
func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { // subscribeReceive starts receiving messages for an account by calling
c := make(chan JsonRpc2ReceivedMessage) // signal-cli's subscribeReceive JSON-RPC method. Only relevant when
// signal-cli was launched with --receive-mode=manual; in auto mode the
// daemon pushes notifications without an explicit subscribe call.
// Returns the subscription id assigned by signal-cli.
func (r *JsonRpc2Client) subscribeReceive(account string) (int64, error) {
type subscribeReceiveArgs struct{}
resultStr, err := r.getRaw("subscribeReceive", &account, subscribeReceiveArgs{})
if err != nil {
return 0, err
}
var subscriptionId int64
if err := json.Unmarshal([]byte(resultStr), &subscriptionId); err != nil {
return 0, fmt.Errorf("subscribeReceive: couldn't parse subscription id from %q: %w", resultStr, err)
}
return subscriptionId, nil
}
// unsubscribeReceive cancels a manual-mode subscription previously
// returned by subscribeReceive.
func (r *JsonRpc2Client) unsubscribeReceive(account string, subscriptionId int64) error {
type unsubscribeReceiveArgs struct {
Subscription int64 `json:"subscription"`
}
_, err := r.getRaw("unsubscribeReceive", &account, unsubscribeReceiveArgs{Subscription: subscriptionId})
return err
}
// acquireReceiveSubscription ensures an active manual-mode subscription
// exists for the given account, refcounting concurrent websocket
// subscribers. The first caller for an account triggers a real
// subscribeReceive RPC; subsequent callers just bump the refcount.
func (r *JsonRpc2Client) acquireReceiveSubscription(account string) error {
r.receiveSubscriptionsMutex.Lock()
defer r.receiveSubscriptionsMutex.Unlock()
if sub, ok := r.receiveSubscriptions[account]; ok {
sub.refcount++
return nil
}
id, err := r.subscribeReceive(account)
if err != nil {
return err
}
r.receiveSubscriptions[account] = &receiveSubscription{id: id, refcount: 1}
log.Infof("Subscribed to receive notifications for account %s (subscription=%d)", account, id)
return nil
}
// releaseReceiveSubscription decrements the per-account refcount and
// cancels the subscription with signal-cli if it drops to zero.
func (r *JsonRpc2Client) releaseReceiveSubscription(account string) {
r.receiveSubscriptionsMutex.Lock()
defer r.receiveSubscriptionsMutex.Unlock()
sub, ok := r.receiveSubscriptions[account]
if !ok {
return
}
sub.refcount--
if sub.refcount > 0 {
return
}
if err := r.unsubscribeReceive(account, sub.id); err != nil {
log.Warnf("unsubscribeReceive failed for account %s (subscription=%d): %s", account, sub.id, err.Error())
} else {
log.Infof("Unsubscribed from receive notifications for account %s (subscription=%d)", account, sub.id)
}
delete(r.receiveSubscriptions, account)
}
// GetReceiveChannel returns a channel that will receive messages for the
// given account. If signal-cli is in manual receive-mode, it also acquires
// a subscription so that signal-cli starts forwarding messages for the
// account; in auto mode, account is unused (notifications flow regardless).
//
// account may be empty when the caller does not need a subscription
// (e.g. legacy callers in auto mode); in that case no subscribeReceive
// RPC is issued.
func (r *JsonRpc2Client) GetReceiveChannel(account string) (chan JsonRpc2ReceivedMessage, string, error) {
c := make(chan JsonRpc2ReceivedMessage, 64)
channelUuid, err := uuid.NewV4() channelUuid, err := uuid.NewV4()
if err != nil { if err != nil {
return c, "", err return c, "", err
} }
if account != "" {
if err := r.acquireReceiveSubscription(account); err != nil {
return c, "", fmt.Errorf("subscribeReceive failed for account %s: %w", account, err)
}
}
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
r.receivedMessagesChannels[channelUuid.String()] = c r.receivedMessagesChannels[channelUuid.String()] = c
r.channelAccountByUuid[channelUuid.String()] = account
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
return c, channelUuid.String(), nil return c, channelUuid.String(), nil
@ -288,5 +400,11 @@ func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, stri
func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) { func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) {
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
delete(r.receivedMessagesChannels, channelUuid) delete(r.receivedMessagesChannels, channelUuid)
account := r.channelAccountByUuid[channelUuid]
delete(r.channelAccountByUuid, channelUuid)
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
if account != "" {
r.releaseReceiveSubscription(account)
}
} }

View File

@ -13,7 +13,7 @@ import (
const supervisorctlConfigTemplate = ` const supervisorctlConfigTemplate = `
[program:%s] [program:%s]
process_name=%s process_name=%s
command=%s --output=json --config %s%s daemon %s%s%s%s --tcp 127.0.0.1:%d command=%s --output=json --config %s%s daemon%s %s%s%s%s --tcp 127.0.0.1:%d
autostart=true autostart=true
autorestart=true autorestart=true
startretries=10 startretries=10
@ -75,6 +75,22 @@ func main() {
signalCliIgnoreStickers = " --ignore-stickers" signalCliIgnoreStickers = " --ignore-stickers"
} }
// Receive mode: by default signal-cli auto-receives messages on the
// daemon and pushes them as JSON-RPC notifications, regardless of
// whether any websocket subscriber is connected. In a deploy where
// the subscribing client briefly disconnects, those notifications
// have nowhere to go and are silently dropped. With manual receive
// mode, signal-cli only fetches messages when explicitly requested
// via subscribeReceive — see jsonrpc2.go — so messages stay buffered
// on the Signal servers until a subscriber re-attaches.
signalCliReceiveMode := ""
receiveMode := utils.GetEnv("JSON_RPC_RECEIVE_MODE", "")
if receiveMode == "manual" {
signalCliReceiveMode = " --receive-mode=manual"
} else if receiveMode != "" && receiveMode != "on-start" {
log.Fatal("Invalid JSON_RPC_RECEIVE_MODE environment variable set! Must be 'manual' or 'on-start'.")
}
supervisorctlProgramName := "signal-cli-json-rpc-1" supervisorctlProgramName := "signal-cli-json-rpc-1"
supervisorctlLogFolder := "/var/log/" + supervisorctlProgramName supervisorctlLogFolder := "/var/log/" + supervisorctlProgramName
_, err := exec.Command("mkdir", "-p", supervisorctlLogFolder).Output() _, err := exec.Command("mkdir", "-p", supervisorctlLogFolder).Output()
@ -100,7 +116,7 @@ func main() {
supervisorctlConfigFilename := "/etc/supervisor/conf.d/" + "signal-cli-json-rpc-1.conf" supervisorctlConfigFilename := "/etc/supervisor/conf.d/" + "signal-cli-json-rpc-1.conf"
supervisorctlConfig := fmt.Sprintf(supervisorctlConfigTemplate, supervisorctlProgramName, supervisorctlProgramName, signalCliBinary, supervisorctlConfig := fmt.Sprintf(supervisorctlConfigTemplate, supervisorctlProgramName, supervisorctlProgramName, signalCliBinary,
signalCliConfigDir, trustNewIdentities, signalCliIgnoreAttachments, signalCliIgnoreStories, signalCliConfigDir, trustNewIdentities, signalCliReceiveMode, signalCliIgnoreAttachments, signalCliIgnoreStories,
signalCliIgnoreAvatars, signalCliIgnoreStickers, tcpPort, signalCliIgnoreAvatars, signalCliIgnoreStickers, tcpPort,
supervisorctlProgramName, supervisorctlProgramName) supervisorctlProgramName, supervisorctlProgramName)