From 7919a0f4aa5ea689e5ee3174dd95a0d53db577c6 Mon Sep 17 00:00:00 2001 From: AsamK Date: Wed, 1 Apr 2026 22:38:36 +0200 Subject: [PATCH] Change subscribeCallEvents command to match subscribeReceive --- .../SignalJsonRpcDispatcherHandler.java | 173 +++++++++++------- .../jsonrpc/SubscribeCallEventsTest.java | 18 +- 2 files changed, 122 insertions(+), 69 deletions(-) diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 68a4ae21..14ff0076 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -14,6 +14,7 @@ import org.asamk.signal.commands.JsonRpcMultiCommand; import org.asamk.signal.commands.JsonRpcSingleCommand; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.UserErrorException; +import org.asamk.signal.json.JsonCallEvent; import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.MultiAccountManager; @@ -24,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,7 +42,7 @@ public class SignalJsonRpcDispatcherHandler { private final boolean noReceiveOnStart; private final Map>> receiveHandlers = new HashMap<>(); - private final List> callEventHandlers = new ArrayList<>(); + private final Map>> callEventHandlers = new HashMap<>(); private SignalJsonRpcCommandHandler commandHandler; public SignalJsonRpcDispatcherHandler( @@ -63,6 +64,10 @@ public class SignalJsonRpcDispatcherHandler { c.addOnManagerAddedHandler(m -> subscribeReceive(m, true)); c.addOnManagerRemovedHandler(this::unsubscribeReceive); } + c.addOnManagerAddedHandler(m -> receiveHandlers.forEach((subscriptionId, handlers) -> handlers.add( + createReceiveHandler(m, subscriptionId, false)))); + c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add( + createCallEventHandler(m, subscriptionId)))); handleConnection(); } @@ -80,47 +85,57 @@ public class SignalJsonRpcDispatcherHandler { handleConnection(); } - private void subscribeCallEvents(final Manager manager) { - // Prevent duplicate subscriptions for the same manager - if (callEventHandlers.stream().anyMatch(p -> p.first().equals(manager))) { - return; - } - Manager.CallEventListener listener = (callInfo, reason) -> { + private int subscribeCallEvents(final Manager manager) { + return subscribeCallEvents(List.of(manager)); + } + + private int subscribeCallEvents(final Collection managers) { + final var subscriptionId = nextSubscriptionId.getAndIncrement(); + final var listeners = managers.stream().map(m -> createCallEventHandler(m, subscriptionId)).toList(); + callEventHandlers.put(subscriptionId, listeners); + return subscriptionId; + } + + private Pair createCallEventHandler(final Manager m, final int subscriptionId) { + final Manager.CallEventListener listener = (callInfo, reason) -> { final var params = new ObjectNode(objectMapper.getNodeFactory()); - params.set("account", params.textNode(manager.getSelfNumber())); - params.set("callEvent", objectMapper.valueToTree( - org.asamk.signal.json.JsonCallEvent.from(callInfo, reason))); + params.set("subscription", IntNode.valueOf(subscriptionId)); + params.set("result", objectMapper.valueToTree(JsonCallEvent.from(callInfo, reason))); final var jsonRpcRequest = JsonRpcRequest.forNotification("callEvent", params, null); try { jsonRpcSender.sendRequest(jsonRpcRequest); } catch (AssertionError e) { if (e.getCause() instanceof ClosedChannelException) { - logger.debug("Call event channel closed, removing listener"); + unsubscribeReceive(subscriptionId); } } }; - manager.addCallEventListener(listener); - callEventHandlers.add(new Pair<>(manager, listener)); + m.addCallEventListener(listener); + return new Pair<>(m, listener); } - private void unsubscribeCallEvents(final Manager manager) { - var iterator = callEventHandlers.iterator(); - while (iterator.hasNext()) { - var pair = iterator.next(); - if (pair.first().equals(manager)) { - pair.first().removeCallEventListener(pair.second()); - iterator.remove(); - } + private boolean unsubscribeCallEvents(final int subscriptionId) { + final var handlers = callEventHandlers.remove(subscriptionId); + if (handlers == null) { + return false; } + for (final var pair : handlers) { + unsubscribeCallEventHandler(pair); + } + return true; } private void unsubscribeAllCallEvents() { - for (var pair : callEventHandlers) { - pair.first().removeCallEventListener(pair.second()); - } + callEventHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeCallEventHandler)); callEventHandlers.clear(); } + private void unsubscribeCallEventHandler(final Pair pair) { + final var m = pair.first(); + final var handler = pair.second(); + m.removeCallEventListener(handler); + } + private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0); private int subscribeReceive(final Manager manager, boolean internalSubscription) { @@ -129,34 +144,42 @@ public class SignalJsonRpcDispatcherHandler { private int subscribeReceive(final List managers, boolean internalSubscription) { final var subscriptionId = nextSubscriptionId.getAndIncrement(); - final var handlers = managers.stream().map(m -> { - final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> { - ContainerNode params; - if (internalSubscription) { - params = objectMapper.valueToTree(s); - } else { - final var paramsNode = new ObjectNode(objectMapper.getNodeFactory()); - paramsNode.set("subscription", IntNode.valueOf(subscriptionId)); - paramsNode.set("result", objectMapper.valueToTree(s)); - params = paramsNode; - } - final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null); - try { - jsonRpcSender.sendRequest(jsonRpcRequest); - } catch (AssertionError e) { - if (e.getCause() instanceof ClosedChannelException) { - unsubscribeReceive(subscriptionId); - } - } - }); - m.addReceiveHandler(receiveMessageHandler); - return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler); - }).toList(); + final var handlers = managers.stream() + .map(m -> createReceiveHandler(m, subscriptionId, internalSubscription)) + .toList(); receiveHandlers.put(subscriptionId, handlers); return subscriptionId; } + private Pair createReceiveHandler( + final Manager m, + final int subscriptionId, + final boolean internalSubscription + ) { + final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> { + ContainerNode params; + if (internalSubscription) { + params = objectMapper.valueToTree(s); + } else { + final var paramsNode = new ObjectNode(objectMapper.getNodeFactory()); + paramsNode.set("subscription", IntNode.valueOf(subscriptionId)); + paramsNode.set("result", objectMapper.valueToTree(s)); + params = paramsNode; + } + final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null); + try { + jsonRpcSender.sendRequest(jsonRpcRequest); + } catch (AssertionError e) { + if (e.getCause() instanceof ClosedChannelException) { + unsubscribeReceive(subscriptionId); + } + } + }); + m.addReceiveHandler(receiveMessageHandler); + return new Pair<>(m, receiveMessageHandler); + } + private boolean unsubscribeReceive(final int subscriptionId) { final var handlers = receiveHandlers.remove(subscriptionId); if (handlers == null) { @@ -304,7 +327,8 @@ public class SignalJsonRpcDispatcherHandler { final Manager m, final JsonWriter jsonWriter ) throws CommandException { - subscribeCallEvents(m); + final var subscriptionId = subscribeCallEvents(m); + jsonWriter.write(subscriptionId); } @Override @@ -313,14 +337,12 @@ public class SignalJsonRpcDispatcherHandler { final MultiAccountManager c, final JsonWriter jsonWriter ) throws CommandException { - for (var m : c.getManagers()) { - subscribeCallEvents(m); - } - c.addOnManagerAddedHandler(SignalJsonRpcDispatcherHandler.this::subscribeCallEvents); + final var subscriptionId = subscribeCallEvents(c.getManagers()); + jsonWriter.write(subscriptionId); } } - private class UnsubscribeCallEventsCommand implements JsonRpcSingleCommand, JsonRpcMultiCommand { + private class UnsubscribeCallEventsCommand implements JsonRpcSingleCommand, JsonRpcMultiCommand { @Override public String getName() { @@ -328,21 +350,48 @@ public class SignalJsonRpcDispatcherHandler { } @Override - public void handleCommand( - final Void request, - final Manager m, - final JsonWriter jsonWriter - ) throws CommandException { - unsubscribeCallEvents(m); + public TypeReference getRequestType() { + return new TypeReference<>() {}; } @Override public void handleCommand( - final Void request, + final JsonNode request, + final Manager m, + final JsonWriter jsonWriter + ) throws CommandException { + final var subscriptionId = getSubscriptionId(request); + if (subscriptionId == null) { + throw new UserErrorException("Missing subscription parameter with subscription id"); + } else { + if (!unsubscribeCallEvents(subscriptionId)) { + throw new UserErrorException("Unknown subscription id"); + } + } + } + + @Override + public void handleCommand( + final JsonNode request, final MultiAccountManager c, final JsonWriter jsonWriter ) throws CommandException { - unsubscribeAllCallEvents(); + final var subscriptionId = getSubscriptionId(request); + if (subscriptionId == null) { + throw new UserErrorException("Missing subscription parameter with subscription id"); + } else { + if (!unsubscribeCallEvents(subscriptionId)) { + throw new UserErrorException("Unknown subscription id"); + } + } + } + + private Integer getSubscriptionId(final JsonNode request) { + return switch (request) { + case ArrayNode req -> req.get(0).asInt(); + case ObjectNode req -> req.get("subscription").asInt(); + case null, default -> null; + }; } } } diff --git a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java index 48cbde22..37c00829 100644 --- a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java +++ b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java @@ -635,6 +635,10 @@ class SubscribeCallEventsTest { return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"" + method + "\"}"; } + private static String jsonRpcCall(int id, String method, String params) { + return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"" + method + "\",\"params\":" + params + "}"; + } + // --- Single-account mode tests --- @Test @@ -670,7 +674,7 @@ class SubscribeCallEventsTest { } @Test - void subscribeCallEventsIsIdempotent() { + void subscribeCallEventsCanBeCalledMultipleTimes() { var manager = new StubManager("+15551234567"); var feeder = new LineFeeder(); var writer = new CapturingJsonWriter(); @@ -681,8 +685,8 @@ class SubscribeCallEventsTest { var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true); handler.handleConnection(manager); - // Idempotent guard: second call should not add another listener - assertEquals(1, manager.addCount.get(), "duplicate subscribeCallEvents should be ignored"); + // The implementation allows multiple subscriptions, so two calls add two listeners + assertEquals(2, manager.addCount.get(), "multiple subscribeCallEvents should add multiple listeners"); } @Test @@ -692,7 +696,7 @@ class SubscribeCallEventsTest { var writer = new CapturingJsonWriter(); feeder.addLine(jsonRpcCall(1, "subscribeCallEvents")); - feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents")); + feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents", "{\"subscription\":0}")); var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true); handler.handleConnection(manager); @@ -737,8 +741,8 @@ class SubscribeCallEventsTest { assertEquals(1, manager1.addCount.get(), "manager1 should have one listener"); assertEquals(1, manager2.addCount.get(), "manager2 should have one listener"); - // Also registers an onManagerAdded handler - assertEquals(1, multi.addedHandlers.size(), "should register onManagerAdded handler"); + // Also registers an onManagerAdded handler for receive and one for call events + assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers"); } @Test @@ -751,7 +755,7 @@ class SubscribeCallEventsTest { var writer = new CapturingJsonWriter(); feeder.addLine(jsonRpcCall(1, "subscribeCallEvents")); - feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents")); + feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents", "{\"subscription\":0}")); var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true); handler.handleConnection(multi);