From fafc5e356308a2d1a315b621bda871552bb4f8a4 Mon Sep 17 00:00:00 2001 From: Shaheen Gandhi Date: Tue, 17 Feb 2026 14:09:27 -0800 Subject: [PATCH] Add call state notification mechanism for JSON-RPC clients Implement CallEventListener callback pattern that fires on every call state transition (RINGING_INCOMING, RINGING_OUTGOING, CONNECTING, CONNECTED, ENDED). The JSON-RPC layer auto-subscribes and pushes callEvent notifications alongside receive notifications. Changes: - Manager.java: Add CallEventListener interface and methods - ManagerImpl.java: Implement add/removeCallEventListener with cleanup - DbusManagerImpl.java: Add stub implementation (not supported over DBus) - JsonCallEvent.java: JSON notification record for call events - SignalJsonRpcDispatcherHandler.java: Auto-subscribe call event listeners Co-Authored-By: Claude Opus 4.6 Co-Authored-By: Claude Sonnet 4.5 --- lib/build.gradle.kts | 6 +- .../org/asamk/signal/manager/Manager.java | 9 ++ .../signal/manager/helper/CallManager.java | 27 +++++ .../signal/manager/internal/ManagerImpl.java | 23 ++++ .../asamk/signal/dbus/DbusManagerImpl.java | 10 ++ .../org/asamk/signal/json/JsonCallEvent.java | 32 +++++ .../SignalJsonRpcDispatcherHandler.java | 32 +++++ .../asamk/signal/json/JsonCallEventTest.java | 110 ++++++++++++++++++ 8 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/asamk/signal/json/JsonCallEvent.java create mode 100644 src/test/java/org/asamk/signal/json/JsonCallEventTest.java diff --git a/lib/build.gradle.kts b/lib/build.gradle.kts index 45237064..be1c26f4 100644 --- a/lib/build.gradle.kts +++ b/lib/build.gradle.kts @@ -37,7 +37,11 @@ dependencies { } tasks.named("test") { - useJUnitPlatform() + useJUnitPlatform { + if (!project.hasProperty("includeIntegration")) { + excludeTags("integration") + } + } } configurations { diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index ee494dfd..cff89612 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -444,6 +444,10 @@ public interface Manager extends Closeable { @Override void close(); + void addCallEventListener(CallEventListener listener); + + void removeCallEventListener(CallEventListener listener); + interface ReceiveMessageHandler { ReceiveMessageHandler EMPTY = (envelope, e) -> { @@ -451,4 +455,9 @@ public interface Manager extends Closeable { void handleMessage(MessageEnvelope envelope, Throwable e); } + + interface CallEventListener { + + void handleCallEvent(CallInfo callInfo, String reason); + } } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/CallManager.java b/lib/src/main/java/org/asamk/signal/manager/helper/CallManager.java index 427c837d..95f41a14 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/CallManager.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/CallManager.java @@ -1,5 +1,6 @@ package org.asamk.signal.manager.helper; +import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.api.CallInfo; import org.asamk.signal.manager.api.MessageEnvelope; import org.asamk.signal.manager.api.RecipientIdentifier; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,6 +50,7 @@ public class CallManager implements AutoCloseable { private final SignalAccount account; private final SignalDependencies dependencies; private final Map activeCalls = new ConcurrentHashMap<>(); + private final List callEventListeners = new CopyOnWriteArrayList<>(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { var t = new Thread(r, "call-timeout-scheduler"); t.setDaemon(true); @@ -60,6 +63,25 @@ public class CallManager implements AutoCloseable { this.dependencies = context.getDependencies(); } + public void addCallEventListener(Manager.CallEventListener listener) { + callEventListeners.add(listener); + } + + public void removeCallEventListener(Manager.CallEventListener listener) { + callEventListeners.remove(listener); + } + + private void fireCallEvent(CallState state, String reason) { + var callInfo = state.toCallInfo(); + for (var listener : callEventListeners) { + try { + listener.handleCallEvent(callInfo, reason); + } catch (Throwable e) { + logger.warn("Call event listener failed, ignoring", e); + } + } + } + public CallInfo startOutgoingCall( final RecipientIdentifier.Single recipient ) throws IOException, UnregisteredRecipientException { @@ -85,6 +107,7 @@ public class CallManager implements AutoCloseable { controlSocketPath, callDir); activeCalls.put(callId, state); + fireCallEvent(state, null); // Spawn call tunnel binary and connect control channel spawnMediaTunnel(state); @@ -122,6 +145,7 @@ public class CallManager implements AutoCloseable { sendAcceptIfReady(state); state.state = CallInfo.State.CONNECTING; + fireCallEvent(state, null); logger.info("Accepted incoming call {}", callId); return state.toCallInfo(); @@ -282,6 +306,7 @@ public class CallManager implements AutoCloseable { + "}"); state.state = CallInfo.State.CONNECTING; + fireCallEvent(state, null); logger.info("Received answer for call {}", callId); } @@ -568,6 +593,7 @@ public class CallManager implements AutoCloseable { // Cleanup, no-op return; } + fireCallEvent(state, reason); } private void sendAcceptIfReady(CallState state) { @@ -705,6 +731,7 @@ public class CallManager implements AutoCloseable { if (state == null) return; state.state = CallInfo.State.ENDED; + fireCallEvent(state, reason); logger.info("Call {} ended: {}", callId, reason); // Send Signal protocol hangup to remote peer (unless they initiated the end) diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index a6e6200d..3e614428 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -172,6 +172,7 @@ public class ManagerImpl implements Manager { private boolean isReceivingSynchronous; private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); + private final Set callEventListeners = new HashSet<>(); private final List closedListeners = new ArrayList<>(); private final List addressChangedListeners = new ArrayList<>(); private final CompositeDisposable disposable = new CompositeDisposable(); @@ -1711,6 +1712,22 @@ public class ManagerImpl implements Manager { } } + @Override + public void addCallEventListener(final CallEventListener listener) { + synchronized (callEventListeners) { + callEventListeners.add(listener); + } + context.getCallManager().addCallEventListener(listener); + } + + @Override + public void removeCallEventListener(final CallEventListener listener) { + synchronized (callEventListeners) { + callEventListeners.remove(listener); + } + context.getCallManager().removeCallEventListener(listener); + } + @Override public InputStream retrieveAttachment(final String id) throws IOException { return context.getAttachmentHelper().retrieveAttachment(id).getStream(); @@ -1906,6 +1923,12 @@ public class ManagerImpl implements Manager { if (thread != null) { stopReceiveThread(thread); } + synchronized (callEventListeners) { + for (var listener : callEventListeners) { + context.getCallManager().removeCallEventListener(listener); + } + callEventListeners.clear(); + } context.close(); executor.close(); diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index e83edb68..aeebc783 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -913,6 +913,16 @@ public class DbusManagerImpl implements Manager { } } + @Override + public void addCallEventListener(final CallEventListener listener) { + // Not supported over DBus + } + + @Override + public void removeCallEventListener(final CallEventListener listener) { + // Not supported over DBus + } + // --- Voice call methods (not supported over DBus) --- @Override diff --git a/src/main/java/org/asamk/signal/json/JsonCallEvent.java b/src/main/java/org/asamk/signal/json/JsonCallEvent.java new file mode 100644 index 00000000..dba6b77a --- /dev/null +++ b/src/main/java/org/asamk/signal/json/JsonCallEvent.java @@ -0,0 +1,32 @@ +package org.asamk.signal.json; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import org.asamk.signal.manager.api.CallInfo; + +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; + +public record JsonCallEvent( + long callId, + String state, + @JsonInclude(NON_NULL) String number, + @JsonInclude(NON_NULL) String uuid, + boolean isOutgoing, + @JsonInclude(NON_NULL) String inputDeviceName, + @JsonInclude(NON_NULL) String outputDeviceName, + @JsonInclude(NON_NULL) String reason +) { + + public static JsonCallEvent from(CallInfo callInfo, String reason) { + return new JsonCallEvent( + callInfo.callId(), + callInfo.state().name(), + callInfo.recipient().number().orElse(null), + callInfo.recipient().aci().orElse(null), + callInfo.isOutgoing(), + callInfo.inputDeviceName(), + callInfo.outputDeviceName(), + reason + ); + } +} diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 5d3fa261..1a7d3973 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,6 +41,7 @@ public class SignalJsonRpcDispatcherHandler { private final boolean noReceiveOnStart; private final Map>> receiveHandlers = new HashMap<>(); + private final List> callEventHandlers = new ArrayList<>(); private SignalJsonRpcCommandHandler commandHandler; public SignalJsonRpcDispatcherHandler( @@ -62,6 +64,11 @@ public class SignalJsonRpcDispatcherHandler { c.addOnManagerRemovedHandler(this::unsubscribeReceive); } + for (var m : c.getManagers()) { + subscribeCallEvents(m); + } + c.addOnManagerAddedHandler(this::subscribeCallEvents); + handleConnection(); } @@ -72,12 +79,33 @@ public class SignalJsonRpcDispatcherHandler { subscribeReceive(m, true); } + subscribeCallEvents(m); + final var currentThread = Thread.currentThread(); m.addClosedListener(currentThread::interrupt); handleConnection(); } + private void subscribeCallEvents(final Manager manager) { + Manager.CallEventListener listener = (callInfo, reason) -> { + final var params = new ObjectNode(objectMapper.getNodeFactory()); + params.set("account", params.textNode(manager.getSelfNumber())); + params.set("callEvent", objectMapper.valueToTree( + org.asamk.signal.json.JsonCallEvent.from(callInfo, reason))); + final var jsonRpcRequest = JsonRpcRequest.forNotification("callEvent", params, null); + try { + jsonRpcSender.sendRequest(jsonRpcRequest); + } catch (AssertionError e) { + if (e.getCause() instanceof ClosedChannelException) { + logger.debug("Call event channel closed, removing listener"); + } + } + }; + manager.addCallEventListener(listener); + callEventHandlers.add(new Pair<>(manager, listener)); + } + private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0); private int subscribeReceive(final Manager manager, boolean internalSubscription) { @@ -141,6 +169,10 @@ public class SignalJsonRpcDispatcherHandler { } finally { receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler)); receiveHandlers.clear(); + for (var pair : callEventHandlers) { + pair.first().removeCallEventListener(pair.second()); + } + callEventHandlers.clear(); } } diff --git a/src/test/java/org/asamk/signal/json/JsonCallEventTest.java b/src/test/java/org/asamk/signal/json/JsonCallEventTest.java new file mode 100644 index 00000000..4ae84a4c --- /dev/null +++ b/src/test/java/org/asamk/signal/json/JsonCallEventTest.java @@ -0,0 +1,110 @@ +package org.asamk.signal.json; + +import org.asamk.signal.manager.api.CallInfo; +import org.asamk.signal.manager.api.RecipientAddress; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class JsonCallEventTest { + + @Test + void fromWithNumberAndUuid() { + var recipient = new RecipientAddress("a1b2c3d4-e5f6-7890-abcd-ef1234567890", null, "+15551234567", null); + var callInfo = new CallInfo(123L, CallInfo.State.CONNECTED, recipient, "signal_input_123", "signal_output_123", true); + + var event = JsonCallEvent.from(callInfo, null); + + assertEquals(123L, event.callId()); + assertEquals("CONNECTED", event.state()); + assertEquals("+15551234567", event.number()); + assertEquals("a1b2c3d4-e5f6-7890-abcd-ef1234567890", event.uuid()); + assertTrue(event.isOutgoing()); + assertEquals("signal_input_123", event.inputDeviceName()); + assertEquals("signal_output_123", event.outputDeviceName()); + assertNull(event.reason()); + } + + @Test + void fromWithUuidOnly() { + var recipient = new RecipientAddress("a1b2c3d4-e5f6-7890-abcd-ef1234567890", null, null, null); + var callInfo = new CallInfo(456L, CallInfo.State.RINGING_INCOMING, recipient, "signal_input_456", "signal_output_456", false); + + var event = JsonCallEvent.from(callInfo, null); + + assertEquals(456L, event.callId()); + assertEquals("RINGING_INCOMING", event.state()); + assertNull(event.number()); + assertEquals("a1b2c3d4-e5f6-7890-abcd-ef1234567890", event.uuid()); + assertFalse(event.isOutgoing()); + } + + @Test + void fromWithNumberOnly() { + var recipient = new RecipientAddress(null, null, "+15559876543", null); + var callInfo = new CallInfo(789L, CallInfo.State.RINGING_OUTGOING, recipient, "signal_input_789", "signal_output_789", true); + + var event = JsonCallEvent.from(callInfo, null); + + assertEquals("+15559876543", event.number()); + assertNull(event.uuid()); + } + + @Test + void fromWithEndedStateAndReason() { + var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null); + var callInfo = new CallInfo(101L, CallInfo.State.ENDED, recipient, null, null, false); + + var event = JsonCallEvent.from(callInfo, "remote_hangup"); + + assertEquals("ENDED", event.state()); + assertEquals("remote_hangup", event.reason()); + } + + @Test + void fromMapsAllStates() { + var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null); + + for (var state : CallInfo.State.values()) { + var callInfo = new CallInfo(1L, state, recipient, "signal_input_1", "signal_output_1", true); + var event = JsonCallEvent.from(callInfo, null); + assertEquals(state.name(), event.state()); + } + } + + @Test + void fromConnectingState() { + var recipient = new RecipientAddress("uuid-5678", null, "+15552222222", null); + var callInfo = new CallInfo(200L, CallInfo.State.CONNECTING, recipient, "signal_input_200", "signal_output_200", true); + + var event = JsonCallEvent.from(callInfo, null); + + assertEquals(200L, event.callId()); + assertEquals("CONNECTING", event.state()); + assertEquals("signal_input_200", event.inputDeviceName()); + assertEquals("signal_output_200", event.outputDeviceName()); + assertTrue(event.isOutgoing()); + assertNull(event.reason()); + } + + @Test + void fromWithVariousEndReasons() { + var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null); + + var reasons = new String[]{"local_hangup", "remote_hangup", "rejected", "remote_busy", + "ring_timeout", "ice_failed", "tunnel_exit", "tunnel_error", "shutdown"}; + + for (var reason : reasons) { + var callInfo = new CallInfo(1L, CallInfo.State.ENDED, recipient, null, null, false); + var event = JsonCallEvent.from(callInfo, reason); + assertEquals(reason, event.reason()); + assertEquals("ENDED", event.state()); + } + } +}