Bernhard B 07260c0811 fixed bug in json-rpc webhook
* only post messages that were received via 'receive'

see #813
2026-03-14 15:54:49 +01:00

293 lines
7.2 KiB
Go

package client
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"net"
"net/http"
"strconv"
"sync"
"time"
"github.com/bbernhard/signal-cli-rest-api/utils"
uuid "github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
"github.com/tidwall/sjson"
)
type Error struct {
Code int `json:"code"`
Message string `json:"message"`
Data json.RawMessage `json:"data"`
}
type JsonRpc2MessageResponse struct {
Id string `json:"id"`
Result json.RawMessage `json:"result"`
Err Error `json:"error"`
}
type JsonRpc2ReceivedMessage struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
Err Error `json:"error"`
}
type RateLimitMessage struct {
Response RateLimitResponse `json:"response"`
}
type RateLimitResponse struct {
Results []RateLimitResult `json:"results"`
}
type RateLimitResult struct {
Token string `json:"token"`
}
type RateLimitErrorType struct {
ChallengeTokens []string
Err error
}
func (r *RateLimitErrorType) Error() string {
return r.Err.Error()
}
type JsonRpc2Client struct {
conn net.Conn
receivedResponsesById map[string]chan JsonRpc2MessageResponse
receivedMessagesChannels map[string]chan JsonRpc2ReceivedMessage
signalCliApiConfig *utils.SignalCliApiConfig
number string
receivedMessagesMutex sync.Mutex
receivedResponsesMutex sync.Mutex
address string
}
func NewJsonRpc2Client(signalCliApiConfig *utils.SignalCliApiConfig, number string) *JsonRpc2Client {
return &JsonRpc2Client{
signalCliApiConfig: signalCliApiConfig,
number: number,
receivedResponsesById: make(map[string]chan JsonRpc2MessageResponse),
receivedMessagesChannels: make(map[string]chan JsonRpc2ReceivedMessage),
}
}
func (r *JsonRpc2Client) Dial(address string, maxRetries int) error {
var err error
r.address = address
connected := false
for i := 0; i < maxRetries; i++ {
r.conn, err = net.Dial("tcp", address)
if err != nil {
log.Info("Waiting for signal-cli to start up in daemon mode...")
time.Sleep(2 * time.Second)
continue
}
connected = true
log.Info("Successfully connected to signal-cli in daemon mode")
break
}
if !connected {
return err
}
return nil
}
func (r *JsonRpc2Client) getRaw(command string, account *string, args interface{}) (string, error) {
type Request struct {
JsonRpc string `json:"jsonrpc"`
Method string `json:"method"`
Id string `json:"id"`
Params interface{} `json:"params,omitempty"`
}
trustModeStr := ""
trustMode, err := r.signalCliApiConfig.GetTrustModeForNumber(r.number)
if err == nil {
trustModeStr, err = utils.TrustModeToString(trustMode)
if err != nil {
trustModeStr = ""
log.Error("Invalid trust mode: ", trustModeStr)
}
}
u, err := uuid.NewV4()
if err != nil {
return "", err
}
fullCommand := Request{JsonRpc: "2.0", Method: command, Id: u.String()}
if args != nil {
fullCommand.Params = args
}
fullCommandBytes, err := json.Marshal(fullCommand)
if err != nil {
return "", err
}
if trustModeStr != "" {
fullCommandBytes, err = sjson.SetBytes(fullCommandBytes, "params.trustNewIdentities", trustModeStr)
if err != nil {
return "", err
}
}
if account != nil {
fullCommandBytes, err = sjson.SetBytes(fullCommandBytes, "params.account", account)
if err != nil {
return "", err
}
}
log.Debug("json-rpc command: ", string(fullCommandBytes))
_, err = r.conn.Write([]byte(string(fullCommandBytes) + "\n"))
if err != nil {
return "", err
}
responseChan := make(chan JsonRpc2MessageResponse)
r.receivedResponsesMutex.Lock()
r.receivedResponsesById[u.String()] = responseChan
r.receivedResponsesMutex.Unlock()
var resp JsonRpc2MessageResponse
resp = <-responseChan
r.receivedResponsesMutex.Lock()
delete(r.receivedResponsesById, u.String())
r.receivedResponsesMutex.Unlock()
log.Debug("json-rpc command response message: ", string(resp.Result))
log.Debug("json-rpc response error: ", string(resp.Err.Message))
if resp.Err.Code != 0 {
log.Debug("json-rpc command error code: ", resp.Err.Code)
if resp.Err.Code == -5 {
var rateLimitMessage RateLimitMessage
err = json.Unmarshal(resp.Err.Data, &rateLimitMessage)
if err != nil {
return "", errors.New(resp.Err.Message + " (Couldn't parse JSON for more details")
}
challengeTokens := []string{}
for _, rateLimitResult := range rateLimitMessage.Response.Results {
challengeTokens = append(challengeTokens, rateLimitResult.Token)
}
return "", &RateLimitErrorType{
ChallengeTokens: challengeTokens,
Err: errors.New(resp.Err.Message),
}
}
return "", errors.New(resp.Err.Message)
}
return string(resp.Result), nil
}
func postMessageToWebhook(webhookUrl string, data []byte) error {
r, err := http.NewRequest("POST", webhookUrl, bytes.NewBuffer(data))
if err != nil {
return err
}
r.Header.Add("Content-Type", "application/json")
client := &http.Client{}
res, err := client.Do(r)
if err != nil {
return err
}
defer res.Body.Close()
log.Info(res.StatusCode)
if res.StatusCode != 200 && res.StatusCode != 201 {
return errors.New("Unexpected status code returned (" + strconv.Itoa(res.StatusCode) + ")")
}
return nil
}
func (r *JsonRpc2Client) ReceiveData(number string, receiveWebhookUrl string) {
connbuf := bufio.NewReader(r.conn)
for {
str, err := connbuf.ReadString('\n')
if err != nil {
log.Error("Lost connection to signal-cli...attempting to reconnect (", err.Error(), ")")
r.conn.Close()
err = r.Dial(r.address, 15)
if err != nil {
log.Fatal("Unable to reconnect to signal-cli: ", err.Error(), "...aborting")
}
connbuf = bufio.NewReader(r.conn)
log.Info("Successfully reconnected to signal-cli")
continue
}
log.Debug("json-rpc received data: ", str)
var resp1 JsonRpc2ReceivedMessage
json.Unmarshal([]byte(str), &resp1)
if resp1.Method == "receive" {
r.receivedMessagesMutex.Lock()
for _, c := range r.receivedMessagesChannels {
select {
case c <- resp1:
log.Debug("Message sent to golang channel")
default:
log.Debug("Couldn't send message to golang channel, as there's no receiver")
}
continue
}
r.receivedMessagesMutex.Unlock()
if receiveWebhookUrl != "" {
err = postMessageToWebhook(receiveWebhookUrl, []byte(str))
if err != nil {
log.Error("Couldn't post data to webhook: ", err)
}
}
}
var resp2 JsonRpc2MessageResponse
err = json.Unmarshal([]byte(str), &resp2)
if err == nil {
if resp2.Id != "" {
if responseChan, ok := r.receivedResponsesById[resp2.Id]; ok {
responseChan <- resp2
}
}
} else {
log.Warn("Received unparsable message: ", str)
}
}
}
func (r *JsonRpc2Client) GetReceiveChannel() (chan JsonRpc2ReceivedMessage, string, error) {
c := make(chan JsonRpc2ReceivedMessage)
channelUuid, err := uuid.NewV4()
if err != nil {
return c, "", err
}
r.receivedMessagesMutex.Lock()
r.receivedMessagesChannels[channelUuid.String()] = c
r.receivedMessagesMutex.Unlock()
return c, channelUuid.String(), nil
}
func (r *JsonRpc2Client) RemoveReceiveChannel(channelUuid string) {
r.receivedMessagesMutex.Lock()
delete(r.receivedMessagesChannels, channelUuid)
r.receivedMessagesMutex.Unlock()
}