Add call state notification mechanism for JSON-RPC clients

Implement CallEventListener callback pattern that fires on every call
state transition (RINGING_INCOMING, RINGING_OUTGOING, CONNECTING,
CONNECTED, ENDED). The JSON-RPC layer auto-subscribes and pushes
callEvent notifications alongside receive notifications.

Changes:
- Manager.java: Add CallEventListener interface and methods
- ManagerImpl.java: Implement add/removeCallEventListener with cleanup
- DbusManagerImpl.java: Add stub implementation (not supported over DBus)
- JsonCallEvent.java: JSON notification record for call events
- SignalJsonRpcDispatcherHandler.java: Auto-subscribe call event listeners

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Shaheen Gandhi 2026-02-17 14:09:27 -08:00
parent fa010d03cf
commit fafc5e3563
8 changed files with 248 additions and 1 deletions

View File

@ -37,7 +37,11 @@ dependencies {
}
tasks.named<Test>("test") {
useJUnitPlatform()
useJUnitPlatform {
if (!project.hasProperty("includeIntegration")) {
excludeTags("integration")
}
}
}
configurations {

View File

@ -444,6 +444,10 @@ public interface Manager extends Closeable {
@Override
void close();
void addCallEventListener(CallEventListener listener);
void removeCallEventListener(CallEventListener listener);
interface ReceiveMessageHandler {
ReceiveMessageHandler EMPTY = (envelope, e) -> {
@ -451,4 +455,9 @@ public interface Manager extends Closeable {
void handleMessage(MessageEnvelope envelope, Throwable e);
}
interface CallEventListener {
void handleCallEvent(CallInfo callInfo, String reason);
}
}

View File

@ -1,5 +1,6 @@
package org.asamk.signal.manager.helper;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.api.CallInfo;
import org.asamk.signal.manager.api.MessageEnvelope;
import org.asamk.signal.manager.api.RecipientIdentifier;
@ -30,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -48,6 +50,7 @@ public class CallManager implements AutoCloseable {
private final SignalAccount account;
private final SignalDependencies dependencies;
private final Map<Long, CallState> activeCalls = new ConcurrentHashMap<>();
private final List<Manager.CallEventListener> callEventListeners = new CopyOnWriteArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
var t = new Thread(r, "call-timeout-scheduler");
t.setDaemon(true);
@ -60,6 +63,25 @@ public class CallManager implements AutoCloseable {
this.dependencies = context.getDependencies();
}
public void addCallEventListener(Manager.CallEventListener listener) {
callEventListeners.add(listener);
}
public void removeCallEventListener(Manager.CallEventListener listener) {
callEventListeners.remove(listener);
}
private void fireCallEvent(CallState state, String reason) {
var callInfo = state.toCallInfo();
for (var listener : callEventListeners) {
try {
listener.handleCallEvent(callInfo, reason);
} catch (Throwable e) {
logger.warn("Call event listener failed, ignoring", e);
}
}
}
public CallInfo startOutgoingCall(
final RecipientIdentifier.Single recipient
) throws IOException, UnregisteredRecipientException {
@ -85,6 +107,7 @@ public class CallManager implements AutoCloseable {
controlSocketPath,
callDir);
activeCalls.put(callId, state);
fireCallEvent(state, null);
// Spawn call tunnel binary and connect control channel
spawnMediaTunnel(state);
@ -122,6 +145,7 @@ public class CallManager implements AutoCloseable {
sendAcceptIfReady(state);
state.state = CallInfo.State.CONNECTING;
fireCallEvent(state, null);
logger.info("Accepted incoming call {}", callId);
return state.toCallInfo();
@ -282,6 +306,7 @@ public class CallManager implements AutoCloseable {
+ "}");
state.state = CallInfo.State.CONNECTING;
fireCallEvent(state, null);
logger.info("Received answer for call {}", callId);
}
@ -568,6 +593,7 @@ public class CallManager implements AutoCloseable {
// Cleanup, no-op
return;
}
fireCallEvent(state, reason);
}
private void sendAcceptIfReady(CallState state) {
@ -705,6 +731,7 @@ public class CallManager implements AutoCloseable {
if (state == null) return;
state.state = CallInfo.State.ENDED;
fireCallEvent(state, reason);
logger.info("Call {} ended: {}", callId, reason);
// Send Signal protocol hangup to remote peer (unless they initiated the end)

View File

@ -172,6 +172,7 @@ public class ManagerImpl implements Manager {
private boolean isReceivingSynchronous;
private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
private final Set<CallEventListener> callEventListeners = new HashSet<>();
private final List<Runnable> closedListeners = new ArrayList<>();
private final List<Runnable> addressChangedListeners = new ArrayList<>();
private final CompositeDisposable disposable = new CompositeDisposable();
@ -1711,6 +1712,22 @@ public class ManagerImpl implements Manager {
}
}
@Override
public void addCallEventListener(final CallEventListener listener) {
synchronized (callEventListeners) {
callEventListeners.add(listener);
}
context.getCallManager().addCallEventListener(listener);
}
@Override
public void removeCallEventListener(final CallEventListener listener) {
synchronized (callEventListeners) {
callEventListeners.remove(listener);
}
context.getCallManager().removeCallEventListener(listener);
}
@Override
public InputStream retrieveAttachment(final String id) throws IOException {
return context.getAttachmentHelper().retrieveAttachment(id).getStream();
@ -1906,6 +1923,12 @@ public class ManagerImpl implements Manager {
if (thread != null) {
stopReceiveThread(thread);
}
synchronized (callEventListeners) {
for (var listener : callEventListeners) {
context.getCallManager().removeCallEventListener(listener);
}
callEventListeners.clear();
}
context.close();
executor.close();

View File

@ -913,6 +913,16 @@ public class DbusManagerImpl implements Manager {
}
}
@Override
public void addCallEventListener(final CallEventListener listener) {
// Not supported over DBus
}
@Override
public void removeCallEventListener(final CallEventListener listener) {
// Not supported over DBus
}
// --- Voice call methods (not supported over DBus) ---
@Override

View File

@ -0,0 +1,32 @@
package org.asamk.signal.json;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.asamk.signal.manager.api.CallInfo;
import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
public record JsonCallEvent(
long callId,
String state,
@JsonInclude(NON_NULL) String number,
@JsonInclude(NON_NULL) String uuid,
boolean isOutgoing,
@JsonInclude(NON_NULL) String inputDeviceName,
@JsonInclude(NON_NULL) String outputDeviceName,
@JsonInclude(NON_NULL) String reason
) {
public static JsonCallEvent from(CallInfo callInfo, String reason) {
return new JsonCallEvent(
callInfo.callId(),
callInfo.state().name(),
callInfo.recipient().number().orElse(null),
callInfo.recipient().aci().orElse(null),
callInfo.isOutgoing(),
callInfo.inputDeviceName(),
callInfo.outputDeviceName(),
reason
);
}
}

View File

@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -40,6 +41,7 @@ public class SignalJsonRpcDispatcherHandler {
private final boolean noReceiveOnStart;
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final List<Pair<Manager, Manager.CallEventListener>> callEventHandlers = new ArrayList<>();
private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler(
@ -62,6 +64,11 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerRemovedHandler(this::unsubscribeReceive);
}
for (var m : c.getManagers()) {
subscribeCallEvents(m);
}
c.addOnManagerAddedHandler(this::subscribeCallEvents);
handleConnection();
}
@ -72,12 +79,33 @@ public class SignalJsonRpcDispatcherHandler {
subscribeReceive(m, true);
}
subscribeCallEvents(m);
final var currentThread = Thread.currentThread();
m.addClosedListener(currentThread::interrupt);
handleConnection();
}
private void subscribeCallEvents(final Manager manager) {
Manager.CallEventListener listener = (callInfo, reason) -> {
final var params = new ObjectNode(objectMapper.getNodeFactory());
params.set("account", params.textNode(manager.getSelfNumber()));
params.set("callEvent", objectMapper.valueToTree(
org.asamk.signal.json.JsonCallEvent.from(callInfo, reason)));
final var jsonRpcRequest = JsonRpcRequest.forNotification("callEvent", params, null);
try {
jsonRpcSender.sendRequest(jsonRpcRequest);
} catch (AssertionError e) {
if (e.getCause() instanceof ClosedChannelException) {
logger.debug("Call event channel closed, removing listener");
}
}
};
manager.addCallEventListener(listener);
callEventHandlers.add(new Pair<>(manager, listener));
}
private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
private int subscribeReceive(final Manager manager, boolean internalSubscription) {
@ -141,6 +169,10 @@ public class SignalJsonRpcDispatcherHandler {
} finally {
receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear();
for (var pair : callEventHandlers) {
pair.first().removeCallEventListener(pair.second());
}
callEventHandlers.clear();
}
}

View File

@ -0,0 +1,110 @@
package org.asamk.signal.json;
import org.asamk.signal.manager.api.CallInfo;
import org.asamk.signal.manager.api.RecipientAddress;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class JsonCallEventTest {
@Test
void fromWithNumberAndUuid() {
var recipient = new RecipientAddress("a1b2c3d4-e5f6-7890-abcd-ef1234567890", null, "+15551234567", null);
var callInfo = new CallInfo(123L, CallInfo.State.CONNECTED, recipient, "signal_input_123", "signal_output_123", true);
var event = JsonCallEvent.from(callInfo, null);
assertEquals(123L, event.callId());
assertEquals("CONNECTED", event.state());
assertEquals("+15551234567", event.number());
assertEquals("a1b2c3d4-e5f6-7890-abcd-ef1234567890", event.uuid());
assertTrue(event.isOutgoing());
assertEquals("signal_input_123", event.inputDeviceName());
assertEquals("signal_output_123", event.outputDeviceName());
assertNull(event.reason());
}
@Test
void fromWithUuidOnly() {
var recipient = new RecipientAddress("a1b2c3d4-e5f6-7890-abcd-ef1234567890", null, null, null);
var callInfo = new CallInfo(456L, CallInfo.State.RINGING_INCOMING, recipient, "signal_input_456", "signal_output_456", false);
var event = JsonCallEvent.from(callInfo, null);
assertEquals(456L, event.callId());
assertEquals("RINGING_INCOMING", event.state());
assertNull(event.number());
assertEquals("a1b2c3d4-e5f6-7890-abcd-ef1234567890", event.uuid());
assertFalse(event.isOutgoing());
}
@Test
void fromWithNumberOnly() {
var recipient = new RecipientAddress(null, null, "+15559876543", null);
var callInfo = new CallInfo(789L, CallInfo.State.RINGING_OUTGOING, recipient, "signal_input_789", "signal_output_789", true);
var event = JsonCallEvent.from(callInfo, null);
assertEquals("+15559876543", event.number());
assertNull(event.uuid());
}
@Test
void fromWithEndedStateAndReason() {
var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null);
var callInfo = new CallInfo(101L, CallInfo.State.ENDED, recipient, null, null, false);
var event = JsonCallEvent.from(callInfo, "remote_hangup");
assertEquals("ENDED", event.state());
assertEquals("remote_hangup", event.reason());
}
@Test
void fromMapsAllStates() {
var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null);
for (var state : CallInfo.State.values()) {
var callInfo = new CallInfo(1L, state, recipient, "signal_input_1", "signal_output_1", true);
var event = JsonCallEvent.from(callInfo, null);
assertEquals(state.name(), event.state());
}
}
@Test
void fromConnectingState() {
var recipient = new RecipientAddress("uuid-5678", null, "+15552222222", null);
var callInfo = new CallInfo(200L, CallInfo.State.CONNECTING, recipient, "signal_input_200", "signal_output_200", true);
var event = JsonCallEvent.from(callInfo, null);
assertEquals(200L, event.callId());
assertEquals("CONNECTING", event.state());
assertEquals("signal_input_200", event.inputDeviceName());
assertEquals("signal_output_200", event.outputDeviceName());
assertTrue(event.isOutgoing());
assertNull(event.reason());
}
@Test
void fromWithVariousEndReasons() {
var recipient = new RecipientAddress("uuid-1234", null, "+15551111111", null);
var reasons = new String[]{"local_hangup", "remote_hangup", "rejected", "remote_busy",
"ring_timeout", "ice_failed", "tunnel_exit", "tunnel_error", "shutdown"};
for (var reason : reasons) {
var callInfo = new CallInfo(1L, CallInfo.State.ENDED, recipient, null, null, false);
var event = JsonCallEvent.from(callInfo, reason);
assertEquals(reason, event.reason());
assertEquals("ENDED", event.state());
}
}
}