diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index fda3a323..4cc8351e 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -407,6 +407,10 @@ public interface Manager extends Closeable { void addClosedListener(Runnable listener); + void addUnidentifiedKeepAlive(String token); + + void removeUnidentifiedKeepAlive(String token); + InputStream retrieveAttachment(final String id) throws IOException; InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException; diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java index 9d7d477d..73291d9e 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -102,9 +102,6 @@ public class ReceiveHelper { signalWebSocket.connect(); signalWebSocket.registerKeepAliveToken("receive"); - final var unauthenticatedSignalWebSocket = dependencies.getUnauthenticatedSignalWebSocket(); - unauthenticatedSignalWebSocket.registerKeepAliveToken("receive"); - try { receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions); } finally { @@ -113,7 +110,6 @@ public class ReceiveHelper { queuedActions.clear(); signalWebSocket.removeKeepAliveToken("receive"); signalWebSocket.disconnect(); - unauthenticatedSignalWebSocket.removeKeepAliveToken("receive"); webSocketStateDisposable.dispose(); shouldStop = false; } diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index 08214d01..2bf55e9c 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -1712,6 +1712,16 @@ public class ManagerImpl implements Manager { } } + @Override + public void addUnidentifiedKeepAlive(final String token) { + dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token); + } + + @Override + public void removeUnidentifiedKeepAlive(final String token) { + dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token); + } + @Override public void addCallEventListener(final CallEventListener listener) { context.getCallManager().addCallEventListener(listener); diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index faf099e4..d00ca0f8 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -916,6 +916,14 @@ public class DbusManagerImpl implements Manager { } } + @Override + public void addUnidentifiedKeepAlive(final String token) { + } + + @Override + public void removeUnidentifiedKeepAlive(final String token) { + } + @Override public void addCallEventListener(final CallEventListener listener) { // Not supported over DBus diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java index 2f4f881a..69a5924b 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java @@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable { public void initObjects() { exportObjects(); + m.addUnidentifiedKeepAlive("dbus"); if (!noReceiveOnStart) { subscribeReceive(); } @@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable { @Override public void close() { + m.removeUnidentifiedKeepAlive("dbus"); if (dbusMessageHandler != null) { m.removeReceiveHandler(dbusMessageHandler); dbusMessageHandler = null; diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 14ff0076..b6c7c0b7 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -25,10 +25,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -43,6 +45,9 @@ public class SignalJsonRpcDispatcherHandler { private final Map>> receiveHandlers = new HashMap<>(); private final Map>> callEventHandlers = new HashMap<>(); + private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID(); + private final List keepAliveManagers = new ArrayList<>(); + private boolean connectionActive = true; private SignalJsonRpcCommandHandler commandHandler; public SignalJsonRpcDispatcherHandler( @@ -69,6 +74,10 @@ public class SignalJsonRpcDispatcherHandler { c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add( createCallEventHandler(m, subscriptionId)))); + c.getManagers().forEach(this::registerKeepAlive); + c.addOnManagerAddedHandler(this::registerKeepAlive); + c.addOnManagerRemovedHandler(this::unregisterKeepAlive); + handleConnection(); } @@ -82,6 +91,8 @@ public class SignalJsonRpcDispatcherHandler { final var currentThread = Thread.currentThread(); m.addClosedListener(currentThread::interrupt); + registerKeepAlive(m); + handleConnection(); } @@ -200,14 +211,29 @@ public class SignalJsonRpcDispatcherHandler { subscriptionId.ifPresent(this::unsubscribeReceive); } + private void registerKeepAlive(final Manager m) { + if (!connectionActive) return; + m.addUnidentifiedKeepAlive(connectionKeepAliveToken); + keepAliveManagers.add(m); + } + + private void unregisterKeepAlive(final Manager m) { + if (!connectionActive) return; + m.removeUnidentifiedKeepAlive(connectionKeepAliveToken); + keepAliveManagers.remove(m); + } + private void handleConnection() { try { jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params), response -> logger.debug("Received unexpected response for id {}", response.getId())); } finally { + connectionActive = false; receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler)); receiveHandlers.clear(); unsubscribeAllCallEvents(); + keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken)); + keepAliveManagers.clear(); } } diff --git a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java index 37c00829..bbabf73a 100644 --- a/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java +++ b/src/test/java/org/asamk/signal/jsonrpc/SubscribeCallEventsTest.java @@ -496,6 +496,14 @@ class SubscribeCallEventsTest { public void addClosedListener(Runnable l) { } + @Override + public void addUnidentifiedKeepAlive(String token) { + } + + @Override + public void removeUnidentifiedKeepAlive(String token) { + } + @Override public InputStream retrieveAttachment(String id) { return null; @@ -741,8 +749,8 @@ class SubscribeCallEventsTest { assertEquals(1, manager1.addCount.get(), "manager1 should have one listener"); assertEquals(1, manager2.addCount.get(), "manager2 should have one listener"); - // Also registers an onManagerAdded handler for receive and one for call events - assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers"); + // Registers onManagerAdded handlers for receive, call events, and keep-alive + assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers"); } @Test