diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index dac629e7..1a38ded3 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -407,6 +407,10 @@ public interface Manager extends Closeable { void addClosedListener(Runnable listener); + void addUnidentifiedKeepAlive(String token); + + void removeUnidentifiedKeepAlive(String token); + InputStream retrieveAttachment(final String id) throws IOException; InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException; diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java index 8577d7cf..31f1c04d 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -101,9 +101,6 @@ public class ReceiveHelper { signalWebSocket.connect(); signalWebSocket.registerKeepAliveToken("receive"); - final var unauthenticatedSignalWebSocket = dependencies.getUnauthenticatedSignalWebSocket(); - unauthenticatedSignalWebSocket.registerKeepAliveToken("receive"); - try { receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions); } finally { @@ -112,7 +109,6 @@ public class ReceiveHelper { queuedActions.clear(); signalWebSocket.removeKeepAliveToken("receive"); signalWebSocket.disconnect(); - unauthenticatedSignalWebSocket.removeKeepAliveToken("receive"); webSocketStateDisposable.dispose(); shouldStop = false; } diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index 394fa894..f9cd3e70 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -1708,6 +1708,16 @@ public class ManagerImpl implements Manager { } } + @Override + public void addUnidentifiedKeepAlive(final String token) { + dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token); + } + + @Override + public void removeUnidentifiedKeepAlive(final String token) { + dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token); + } + @Override public void addCallEventListener(final CallEventListener listener) { context.getCallManager().addCallEventListener(listener); diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index 45b937ae..4db02b18 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -915,6 +915,14 @@ public class DbusManagerImpl implements Manager { } } + @Override + public void addUnidentifiedKeepAlive(final String token) { + } + + @Override + public void removeUnidentifiedKeepAlive(final String token) { + } + @Override public void addCallEventListener(final CallEventListener listener) { // Not supported over DBus diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java index a9983caa..175d6eb4 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java @@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable { public void initObjects() { exportObjects(); + m.addUnidentifiedKeepAlive("dbus"); if (!noReceiveOnStart) { subscribeReceive(); } @@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable { @Override public void close() { + m.removeUnidentifiedKeepAlive("dbus"); if (dbusMessageHandler != null) { m.removeReceiveHandler(dbusMessageHandler); dbusMessageHandler = null; diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 166c822a..a8ecf29a 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -43,8 +44,11 @@ public class SignalJsonRpcDispatcherHandler { private final JsonRpcReader jsonRpcReader; private final boolean noReceiveOnStart; - private final Map>> receiveHandlers = new HashMap<>(); - private final Map>> callEventHandlers = new HashMap<>(); + private final Map>> receiveHandlers = new HashMap<>(); + private final Map>> callEventHandlers = new HashMap<>(); + private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID(); + private final List keepAliveManagers = new ArrayList<>(); + private boolean connectionActive = true; private SignalJsonRpcCommandHandler commandHandler; public SignalJsonRpcDispatcherHandler( @@ -71,6 +75,10 @@ public class SignalJsonRpcDispatcherHandler { c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add( createCallEventHandler(m, subscriptionId)))); + c.getManagers().forEach(this::registerKeepAlive); + c.addOnManagerAddedHandler(this::registerKeepAlive); + c.addOnManagerRemovedHandler(this::unregisterKeepAlive); + handleConnection(); } @@ -84,6 +92,8 @@ public class SignalJsonRpcDispatcherHandler { final var currentThread = Thread.currentThread(); m.addClosedListener(currentThread::interrupt); + registerKeepAlive(m); + handleConnection(); } @@ -204,14 +214,29 @@ public class SignalJsonRpcDispatcherHandler { subscriptionId.ifPresent(this::unsubscribeReceive); } + private void registerKeepAlive(final Manager m) { + if (!connectionActive) return; + m.addUnidentifiedKeepAlive(connectionKeepAliveToken); + keepAliveManagers.add(m); + } + + private void unregisterKeepAlive(final Manager m) { + if (!connectionActive) return; + m.removeUnidentifiedKeepAlive(connectionKeepAliveToken); + keepAliveManagers.remove(m); + } + private void handleConnection() { try { jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params), response -> logger.debug("Received unexpected response for id {}", response.getId())); } finally { + connectionActive = false; receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler)); receiveHandlers.clear(); unsubscribeAllCallEvents(); + keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken)); + keepAliveManagers.clear(); } } diff --git a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java index bfc56ed3..bbb18b68 100644 --- a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java +++ b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java @@ -495,6 +495,14 @@ class SubscribeCallEventsTest { public void addClosedListener(Runnable l) { } + @Override + public void addUnidentifiedKeepAlive(String token) { + } + + @Override + public void removeUnidentifiedKeepAlive(String token) { + } + @Override public InputStream retrieveAttachment(String id) { return null; @@ -740,8 +748,8 @@ class SubscribeCallEventsTest { assertEquals(1, manager1.addCount.get(), "manager1 should have one listener"); assertEquals(1, manager2.addCount.get(), "manager2 should have one listener"); - // Also registers an onManagerAdded handler for receive and one for call events - assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers"); + // Registers onManagerAdded handlers for receive, call events, and keep-alive + assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers"); } @Test