Merge e98c03e702efaed305be0d1a5379ea1b3a8d2f2f into db63fd15e0bc2e2de4cff0b1969b12c23508a8d7

This commit is contained in:
Stephan Richter 2026-05-14 13:20:56 +08:00 committed by GitHub
commit 39c850bcdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 151 additions and 16 deletions

View File

@ -160,3 +160,4 @@ There are a bunch of environmental variables that can be set inside the docker c
* `JSON_RPC_IGNORE_AVATARS`: When set to `true`, avatars are not automatically downloaded in json-rpc mode (default: `false`) * `JSON_RPC_IGNORE_AVATARS`: When set to `true`, avatars are not automatically downloaded in json-rpc mode (default: `false`)
* `JSON_RPC_IGNORE_STICKERS`: When set to `true`, sticker packs are not automatically downloaded in json-rpc mode (default: `false`) * `JSON_RPC_IGNORE_STICKERS`: When set to `true`, sticker packs are not automatically downloaded in json-rpc mode (default: `false`)
* `JSON_RPC_TRUST_NEW_IDENTITIES`: Choose how to trust new identities in json-rpc mode. Supported values: `on-first-use`, `always`, `never`. (default: `on-first-use`) * `JSON_RPC_TRUST_NEW_IDENTITIES`: Choose how to trust new identities in json-rpc mode. Supported values: `on-first-use`, `always`, `never`. (default: `on-first-use`)
* `JSON_RPC_RECEIVE_MODE`: Controls when signal-cli pulls inbound messages from the Signal servers in json-rpc mode. Supported values: `on-start` (default), `manual`. In the default `on-start` mode, signal-cli auto-receives messages and pushes them as JSON-RPC notifications regardless of whether any websocket subscriber is currently attached to `/v1/receive/{number}` — messages arriving while no subscriber is attached (e.g. during a brief subscriber redeploy) are silently dropped. In `manual` mode signal-cli only fetches messages while at least one websocket subscriber is attached; while no subscriber is attached, messages remain queued server-side under Signal's normal retention rules and are delivered on the next subscriber re-attach. Recommended for any deployment where the websocket consumer is restarted or scaled.

View File

@ -578,7 +578,7 @@ func (a *Api) SendV2(c *gin.Context) {
} }
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) { func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) {
receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel() receiveChannel, channelUuid, err := a.signalClient.GetReceiveChannel(number)
if err != nil { if err != nil {
log.Error("Couldn't get receive channel: ", err.Error()) log.Error("Couldn't get receive channel: ", err.Error())
return return

View File

@ -1067,12 +1067,12 @@ func (s *SignalClient) Receive(number string, timeout int64, ignoreAttachments b
} }
} }
func (s *SignalClient) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, string, error) {
jsonRpc2Client, err := s.getJsonRpc2Client() jsonRpc2Client, err := s.getJsonRpc2Client()
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
return jsonRpc2Client.GetReceiveChannel() return jsonRpc2Client.GetReceiveChannel(number)
} }
func (s *SignalClient) RemoveReceiveChannel(channelUuid string) { func (s *SignalClient) RemoveReceiveChannel(channelUuid string) {

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
@ -56,14 +57,25 @@ func (r *RateLimitErrorType) Error() string {
return r.Err.Error() return r.Err.Error()
} }
// receiveSubscription tracks the state of a manual-mode signal-cli
// subscribeReceive call: the subscription id assigned by signal-cli and
// a refcount of websocket subscribers attached to that account.
type receiveSubscription struct {
id int64
refcount int
}
type JsonRpc2Client struct { type JsonRpc2Client struct {
conn net.Conn conn net.Conn
receivedResponsesById map[string]chan JsonRpc2MessageResponse receivedResponsesById map[string]chan JsonRpc2MessageResponse
receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage
receiveSubscriptions map[string]*receiveSubscription // account -> sub state
channelAccountByUuid map[string]string // channelUuid -> account
signalCliApiConfig *utils.SignalCliApiConfig signalCliApiConfig *utils.SignalCliApiConfig
number string number string
receivedMessagesMutex sync.Mutex receivedMessagesMutex sync.Mutex
receivedResponsesMutex sync.Mutex receivedResponsesMutex sync.Mutex
receiveSubscriptionsMutex sync.Mutex
address string address string
} }
@ -73,6 +85,8 @@ func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number stri
number: number, number: number,
receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse), receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse),
receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage), receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage),
receiveSubscriptions: make(map[string]*receiveSubscription),
channelAccountByUuid: make(map[string]string),
} }
} }
@ -236,6 +250,19 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
var resp1 JsonRpc2ReceivedMessage var resp1 JsonRpc2ReceivedMessage
json.Unmarshal([]byte(str), &resp1) json.Unmarshal([]byte(str), &resp1)
if resp1.Method == "receive" { if resp1.Method == "receive" {
// In manual receive-mode signal-cli wraps the envelope in
// {"subscription":N,"result":{...}}; in auto mode it sends
// the envelope directly. Unwrap so the broadcast format is
// the same in both modes and downstream consumers (e.g. the
// websocket handler) don't have to know which mode is in use.
var manualWrapper struct {
Subscription int64 `json:"subscription"`
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(resp1.Params, &manualWrapper); err == nil && len(manualWrapper.Result) > 0 {
resp1.Params = manualWrapper.Result
}
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
for _, c := range r.receivedMessagesChannels { for _, c := range r.receivedMessagesChannels {
select { select {
@ -244,7 +271,6 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
default: default:
log.Debug("Couldn't send message to golang channel, as there's no receiver") log.Debug("Couldn't send message to golang channel, as there's no receiver")
} }
continue
} }
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
@ -270,16 +296,102 @@ func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
} }
} }
func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) { // subscribeReceive starts receiving messages for an account by calling
c := make(chan JsonRpc2ReceivedMessage) // signal-cli's subscribeReceive JSON-RPC method. Only relevant when
// signal-cli was launched with --receive-mode=manual; in auto mode the
// daemon pushes notifications without an explicit subscribe call.
// Returns the subscription id assigned by signal-cli.
func (r *JsonRpc2Client) subscribeReceive(account string) (int64, error) {
type subscribeReceiveArgs struct{}
resultStr, err := r.getRaw("subscribeReceive", &account, subscribeReceiveArgs{})
if err != nil {
return 0, err
}
var subscriptionId int64
if err := json.Unmarshal([]byte(resultStr), &subscriptionId); err != nil {
return 0, fmt.Errorf("subscribeReceive: couldn't parse subscription id from %q: %w", resultStr, err)
}
return subscriptionId, nil
}
// unsubscribeReceive cancels a manual-mode subscription previously
// returned by subscribeReceive.
func (r *JsonRpc2Client) unsubscribeReceive(account string, subscriptionId int64) error {
type unsubscribeReceiveArgs struct {
Subscription int64 `json:"subscription"`
}
_, err := r.getRaw("unsubscribeReceive", &account, unsubscribeReceiveArgs{Subscription: subscriptionId})
return err
}
// acquireReceiveSubscription ensures an active manual-mode subscription
// exists for the given account, refcounting concurrent websocket
// subscribers. The first caller for an account triggers a real
// subscribeReceive RPC; subsequent callers just bump the refcount.
func (r *JsonRpc2Client) acquireReceiveSubscription(account string) error {
r.receiveSubscriptionsMutex.Lock()
defer r.receiveSubscriptionsMutex.Unlock()
if sub, ok := r.receiveSubscriptions[account]; ok {
sub.refcount++
return nil
}
id, err := r.subscribeReceive(account)
if err != nil {
return err
}
r.receiveSubscriptions[account] = &receiveSubscription{id: id, refcount: 1}
log.Infof("Subscribed to receive notifications for account %s (subscription=%d)", account, id)
return nil
}
// releaseReceiveSubscription decrements the per-account refcount and
// cancels the subscription with signal-cli if it drops to zero.
func (r *JsonRpc2Client) releaseReceiveSubscription(account string) {
r.receiveSubscriptionsMutex.Lock()
defer r.receiveSubscriptionsMutex.Unlock()
sub, ok := r.receiveSubscriptions[account]
if !ok {
return
}
sub.refcount--
if sub.refcount > 0 {
return
}
if err := r.unsubscribeReceive(account, sub.id); err != nil {
log.Warnf("unsubscribeReceive failed for account %s (subscription=%d): %s", account, sub.id, err.Error())
} else {
log.Infof("Unsubscribed from receive notifications for account %s (subscription=%d)", account, sub.id)
}
delete(r.receiveSubscriptions, account)
}
// GetReceiveChannel returns a channel that will receive messages for the
// given account. If signal-cli is in manual receive-mode, it also acquires
// a subscription so that signal-cli starts forwarding messages for the
// account; in auto mode, account is unused (notifications flow regardless).
//
// account may be empty when the caller does not need a subscription
// (e.g. legacy callers in auto mode); in that case no subscribeReceive
// RPC is issued.
func (r *JsonRpc2Client) GetReceiveChannel(account string) (chan JsonRpc2ReceivedMessage, string, error) {
c := make(chan JsonRpc2ReceivedMessage, 64)
channelUuid, err := uuid.NewV4() channelUuid, err := uuid.NewV4()
if err != nil { if err != nil {
return c, "", err return c, "", err
} }
if account != "" {
if err := r.acquireReceiveSubscription(account); err != nil {
return c, "", fmt.Errorf("subscribeReceive failed for account %s: %w", account, err)
}
}
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
r.receivedMessagesChannels[channelUuid.String()] = c r.receivedMessagesChannels[channelUuid.String()] = c
r.channelAccountByUuid[channelUuid.String()] = account
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
return c, channelUuid.String(), nil return c, channelUuid.String(), nil
@ -288,5 +400,11 @@ func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, stri
func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) { func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) {
r.receivedMessagesMutex.Lock() r.receivedMessagesMutex.Lock()
delete(r.receivedMessagesChannels, channelUuid) delete(r.receivedMessagesChannels, channelUuid)
account := r.channelAccountByUuid[channelUuid]
delete(r.channelAccountByUuid, channelUuid)
r.receivedMessagesMutex.Unlock() r.receivedMessagesMutex.Unlock()
if account != "" {
r.releaseReceiveSubscription(account)
}
} }

View File

@ -13,7 +13,7 @@ import (
const supervisorctlConfigTemplate = ` const supervisorctlConfigTemplate = `
[program:%s] [program:%s]
process_name=%s process_name=%s
command=%s --output=json --config %s%s daemon %s%s%s%s --tcp 127.0.0.1:%d command=%s --output=json --config %s%s daemon%s %s%s%s%s --tcp 127.0.0.1:%d
autostart=true autostart=true
autorestart=true autorestart=true
startretries=10 startretries=10
@ -75,6 +75,22 @@ func main() {
signalCliIgnoreStickers = " --ignore-stickers" signalCliIgnoreStickers = " --ignore-stickers"
} }
// Receive mode: by default signal-cli auto-receives messages on the
// daemon and pushes them as JSON-RPC notifications, regardless of
// whether any websocket subscriber is connected. In a deploy where
// the subscribing client briefly disconnects, those notifications
// have nowhere to go and are silently dropped. With manual receive
// mode, signal-cli only fetches messages when explicitly requested
// via subscribeReceive — see jsonrpc2.go — so messages stay buffered
// on the Signal servers until a subscriber re-attaches.
signalCliReceiveMode := ""
receiveMode := utils.GetEnv("JSON_RPC_RECEIVE_MODE", "")
if receiveMode == "manual" {
signalCliReceiveMode = " --receive-mode=manual"
} else if receiveMode != "" && receiveMode != "on-start" {
log.Fatal("Invalid JSON_RPC_RECEIVE_MODE environment variable set! Must be 'manual' or 'on-start'.")
}
supervisorctlProgramName := "signal-cli-json-rpc-1" supervisorctlProgramName := "signal-cli-json-rpc-1"
supervisorctlLogFolder := "/var/log/" + supervisorctlProgramName supervisorctlLogFolder := "/var/log/" + supervisorctlProgramName
_, err := exec.Command("mkdir", "-p", supervisorctlLogFolder).Output() _, err := exec.Command("mkdir", "-p", supervisorctlLogFolder).Output()
@ -100,7 +116,7 @@ func main() {
supervisorctlConfigFilename := "/etc/supervisor/conf.d/" + "signal-cli-json-rpc-1.conf" supervisorctlConfigFilename := "/etc/supervisor/conf.d/" + "signal-cli-json-rpc-1.conf"
supervisorctlConfig := fmt.Sprintf(supervisorctlConfigTemplate, supervisorctlProgramName, supervisorctlProgramName, signalCliBinary, supervisorctlConfig := fmt.Sprintf(supervisorctlConfigTemplate, supervisorctlProgramName, supervisorctlProgramName, signalCliBinary,
signalCliConfigDir, trustNewIdentities, signalCliIgnoreAttachments, signalCliIgnoreStories, signalCliConfigDir, trustNewIdentities, signalCliReceiveMode, signalCliIgnoreAttachments, signalCliIgnoreStories,
signalCliIgnoreAvatars, signalCliIgnoreStickers, tcpPort, signalCliIgnoreAvatars, signalCliIgnoreStickers, tcpPort,
supervisorctlProgramName, supervisorctlProgramName) supervisorctlProgramName, supervisorctlProgramName)