Change subscribeCallEvents command to match subscribeReceive

This commit is contained in:
AsamK 2026-04-01 22:38:36 +02:00
parent 7a8a34f45e
commit 7919a0f4aa
2 changed files with 122 additions and 69 deletions

View File

@ -14,6 +14,7 @@ import org.asamk.signal.commands.JsonRpcMultiCommand;
import org.asamk.signal.commands.JsonRpcSingleCommand;
import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.UserErrorException;
import org.asamk.signal.json.JsonCallEvent;
import org.asamk.signal.json.JsonReceiveMessageHandler;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager;
@ -24,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -41,7 +42,7 @@ public class SignalJsonRpcDispatcherHandler {
private final boolean noReceiveOnStart;
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final List<Pair<Manager, Manager.CallEventListener>> callEventHandlers = new ArrayList<>();
private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler(
@ -63,6 +64,10 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerAddedHandler(m -> subscribeReceive(m, true));
c.addOnManagerRemovedHandler(this::unsubscribeReceive);
}
c.addOnManagerAddedHandler(m -> receiveHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createReceiveHandler(m, subscriptionId, false))));
c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createCallEventHandler(m, subscriptionId))));
handleConnection();
}
@ -80,47 +85,57 @@ public class SignalJsonRpcDispatcherHandler {
handleConnection();
}
private void subscribeCallEvents(final Manager manager) {
// Prevent duplicate subscriptions for the same manager
if (callEventHandlers.stream().anyMatch(p -> p.first().equals(manager))) {
return;
}
Manager.CallEventListener listener = (callInfo, reason) -> {
private int subscribeCallEvents(final Manager manager) {
return subscribeCallEvents(List.of(manager));
}
private int subscribeCallEvents(final Collection<Manager> managers) {
final var subscriptionId = nextSubscriptionId.getAndIncrement();
final var listeners = managers.stream().map(m -> createCallEventHandler(m, subscriptionId)).toList();
callEventHandlers.put(subscriptionId, listeners);
return subscriptionId;
}
private Pair<Manager, Manager.CallEventListener> createCallEventHandler(final Manager m, final int subscriptionId) {
final Manager.CallEventListener listener = (callInfo, reason) -> {
final var params = new ObjectNode(objectMapper.getNodeFactory());
params.set("account", params.textNode(manager.getSelfNumber()));
params.set("callEvent", objectMapper.valueToTree(
org.asamk.signal.json.JsonCallEvent.from(callInfo, reason)));
params.set("subscription", IntNode.valueOf(subscriptionId));
params.set("result", objectMapper.valueToTree(JsonCallEvent.from(callInfo, reason)));
final var jsonRpcRequest = JsonRpcRequest.forNotification("callEvent", params, null);
try {
jsonRpcSender.sendRequest(jsonRpcRequest);
} catch (AssertionError e) {
if (e.getCause() instanceof ClosedChannelException) {
logger.debug("Call event channel closed, removing listener");
unsubscribeReceive(subscriptionId);
}
}
};
manager.addCallEventListener(listener);
callEventHandlers.add(new Pair<>(manager, listener));
m.addCallEventListener(listener);
return new Pair<>(m, listener);
}
private void unsubscribeCallEvents(final Manager manager) {
var iterator = callEventHandlers.iterator();
while (iterator.hasNext()) {
var pair = iterator.next();
if (pair.first().equals(manager)) {
pair.first().removeCallEventListener(pair.second());
iterator.remove();
}
private boolean unsubscribeCallEvents(final int subscriptionId) {
final var handlers = callEventHandlers.remove(subscriptionId);
if (handlers == null) {
return false;
}
for (final var pair : handlers) {
unsubscribeCallEventHandler(pair);
}
return true;
}
private void unsubscribeAllCallEvents() {
for (var pair : callEventHandlers) {
pair.first().removeCallEventListener(pair.second());
}
callEventHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeCallEventHandler));
callEventHandlers.clear();
}
private void unsubscribeCallEventHandler(final Pair<Manager, Manager.CallEventListener> pair) {
final var m = pair.first();
final var handler = pair.second();
m.removeCallEventListener(handler);
}
private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
private int subscribeReceive(final Manager manager, boolean internalSubscription) {
@ -129,34 +144,42 @@ public class SignalJsonRpcDispatcherHandler {
private int subscribeReceive(final List<Manager> managers, boolean internalSubscription) {
final var subscriptionId = nextSubscriptionId.getAndIncrement();
final var handlers = managers.stream().map(m -> {
final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
ContainerNode<?> params;
if (internalSubscription) {
params = objectMapper.valueToTree(s);
} else {
final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
paramsNode.set("result", objectMapper.valueToTree(s));
params = paramsNode;
}
final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
try {
jsonRpcSender.sendRequest(jsonRpcRequest);
} catch (AssertionError e) {
if (e.getCause() instanceof ClosedChannelException) {
unsubscribeReceive(subscriptionId);
}
}
});
m.addReceiveHandler(receiveMessageHandler);
return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
}).toList();
final var handlers = managers.stream()
.map(m -> createReceiveHandler(m, subscriptionId, internalSubscription))
.toList();
receiveHandlers.put(subscriptionId, handlers);
return subscriptionId;
}
private Pair<Manager, Manager.ReceiveMessageHandler> createReceiveHandler(
final Manager m,
final int subscriptionId,
final boolean internalSubscription
) {
final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
ContainerNode<?> params;
if (internalSubscription) {
params = objectMapper.valueToTree(s);
} else {
final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
paramsNode.set("result", objectMapper.valueToTree(s));
params = paramsNode;
}
final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
try {
jsonRpcSender.sendRequest(jsonRpcRequest);
} catch (AssertionError e) {
if (e.getCause() instanceof ClosedChannelException) {
unsubscribeReceive(subscriptionId);
}
}
});
m.addReceiveHandler(receiveMessageHandler);
return new Pair<>(m, receiveMessageHandler);
}
private boolean unsubscribeReceive(final int subscriptionId) {
final var handlers = receiveHandlers.remove(subscriptionId);
if (handlers == null) {
@ -304,7 +327,8 @@ public class SignalJsonRpcDispatcherHandler {
final Manager m,
final JsonWriter jsonWriter
) throws CommandException {
subscribeCallEvents(m);
final var subscriptionId = subscribeCallEvents(m);
jsonWriter.write(subscriptionId);
}
@Override
@ -313,14 +337,12 @@ public class SignalJsonRpcDispatcherHandler {
final MultiAccountManager c,
final JsonWriter jsonWriter
) throws CommandException {
for (var m : c.getManagers()) {
subscribeCallEvents(m);
}
c.addOnManagerAddedHandler(SignalJsonRpcDispatcherHandler.this::subscribeCallEvents);
final var subscriptionId = subscribeCallEvents(c.getManagers());
jsonWriter.write(subscriptionId);
}
}
private class UnsubscribeCallEventsCommand implements JsonRpcSingleCommand<Void>, JsonRpcMultiCommand<Void> {
private class UnsubscribeCallEventsCommand implements JsonRpcSingleCommand<JsonNode>, JsonRpcMultiCommand<JsonNode> {
@Override
public String getName() {
@ -328,21 +350,48 @@ public class SignalJsonRpcDispatcherHandler {
}
@Override
public void handleCommand(
final Void request,
final Manager m,
final JsonWriter jsonWriter
) throws CommandException {
unsubscribeCallEvents(m);
public TypeReference<JsonNode> getRequestType() {
return new TypeReference<>() {};
}
@Override
public void handleCommand(
final Void request,
final JsonNode request,
final Manager m,
final JsonWriter jsonWriter
) throws CommandException {
final var subscriptionId = getSubscriptionId(request);
if (subscriptionId == null) {
throw new UserErrorException("Missing subscription parameter with subscription id");
} else {
if (!unsubscribeCallEvents(subscriptionId)) {
throw new UserErrorException("Unknown subscription id");
}
}
}
@Override
public void handleCommand(
final JsonNode request,
final MultiAccountManager c,
final JsonWriter jsonWriter
) throws CommandException {
unsubscribeAllCallEvents();
final var subscriptionId = getSubscriptionId(request);
if (subscriptionId == null) {
throw new UserErrorException("Missing subscription parameter with subscription id");
} else {
if (!unsubscribeCallEvents(subscriptionId)) {
throw new UserErrorException("Unknown subscription id");
}
}
}
private Integer getSubscriptionId(final JsonNode request) {
return switch (request) {
case ArrayNode req -> req.get(0).asInt();
case ObjectNode req -> req.get("subscription").asInt();
case null, default -> null;
};
}
}
}

View File

@ -635,6 +635,10 @@ class SubscribeCallEventsTest {
return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"" + method + "\"}";
}
private static String jsonRpcCall(int id, String method, String params) {
return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"" + method + "\",\"params\":" + params + "}";
}
// --- Single-account mode tests ---
@Test
@ -670,7 +674,7 @@ class SubscribeCallEventsTest {
}
@Test
void subscribeCallEventsIsIdempotent() {
void subscribeCallEventsCanBeCalledMultipleTimes() {
var manager = new StubManager("+15551234567");
var feeder = new LineFeeder();
var writer = new CapturingJsonWriter();
@ -681,8 +685,8 @@ class SubscribeCallEventsTest {
var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true);
handler.handleConnection(manager);
// Idempotent guard: second call should not add another listener
assertEquals(1, manager.addCount.get(), "duplicate subscribeCallEvents should be ignored");
// The implementation allows multiple subscriptions, so two calls add two listeners
assertEquals(2, manager.addCount.get(), "multiple subscribeCallEvents should add multiple listeners");
}
@Test
@ -692,7 +696,7 @@ class SubscribeCallEventsTest {
var writer = new CapturingJsonWriter();
feeder.addLine(jsonRpcCall(1, "subscribeCallEvents"));
feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents"));
feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents", "{\"subscription\":0}"));
var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true);
handler.handleConnection(manager);
@ -737,8 +741,8 @@ class SubscribeCallEventsTest {
assertEquals(1, manager1.addCount.get(), "manager1 should have one listener");
assertEquals(1, manager2.addCount.get(), "manager2 should have one listener");
// Also registers an onManagerAdded handler
assertEquals(1, multi.addedHandlers.size(), "should register onManagerAdded handler");
// Also registers an onManagerAdded handler for receive and one for call events
assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers");
}
@Test
@ -751,7 +755,7 @@ class SubscribeCallEventsTest {
var writer = new CapturingJsonWriter();
feeder.addLine(jsonRpcCall(1, "subscribeCallEvents"));
feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents"));
feeder.addLine(jsonRpcCall(2, "unsubscribeCallEvents", "{\"subscription\":0}"));
var handler = new SignalJsonRpcDispatcherHandler(writer, feeder::getLine, true);
handler.handleConnection(multi);