Tie unauthenticated WebSocket keep-alive to active client connections

Keep the unidentified socket alive while a JSON-RPC connection is open (including stdio mode) and while a D-Bus object is exported, instead of for the lifetime of the receive loop. The receive loop does not use the unauthenticated socket, so keeping it alive there was semantically wrong.

This also covers --receive-mode=manual, where no receive loop runs butclients still send messages.
This commit is contained in:
Stefan Meinecke 2026-04-17 11:49:05 +02:00
parent 72332750a8
commit 0006fd0dc0
7 changed files with 60 additions and 6 deletions

View File

@ -407,6 +407,10 @@ public interface Manager extends Closeable {
void addClosedListener(Runnable listener); void addClosedListener(Runnable listener);
void addUnidentifiedKeepAlive(String token);
void removeUnidentifiedKeepAlive(String token);
InputStream retrieveAttachment(final String id) throws IOException; InputStream retrieveAttachment(final String id) throws IOException;
InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException; InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException;

View File

@ -102,9 +102,6 @@ public class ReceiveHelper {
signalWebSocket.connect(); signalWebSocket.connect();
signalWebSocket.registerKeepAliveToken("receive"); signalWebSocket.registerKeepAliveToken("receive");
final var unauthenticatedSignalWebSocket = dependencies.getUnauthenticatedSignalWebSocket();
unauthenticatedSignalWebSocket.registerKeepAliveToken("receive");
try { try {
receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions); receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions);
} finally { } finally {
@ -113,7 +110,6 @@ public class ReceiveHelper {
queuedActions.clear(); queuedActions.clear();
signalWebSocket.removeKeepAliveToken("receive"); signalWebSocket.removeKeepAliveToken("receive");
signalWebSocket.disconnect(); signalWebSocket.disconnect();
unauthenticatedSignalWebSocket.removeKeepAliveToken("receive");
webSocketStateDisposable.dispose(); webSocketStateDisposable.dispose();
shouldStop = false; shouldStop = false;
} }

View File

@ -1712,6 +1712,16 @@ public class ManagerImpl implements Manager {
} }
} }
@Override
public void addUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token);
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token);
}
@Override @Override
public void addCallEventListener(final CallEventListener listener) { public void addCallEventListener(final CallEventListener listener) {
context.getCallManager().addCallEventListener(listener); context.getCallManager().addCallEventListener(listener);

View File

@ -916,6 +916,14 @@ public class DbusManagerImpl implements Manager {
} }
} }
@Override
public void addUnidentifiedKeepAlive(final String token) {
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
}
@Override @Override
public void addCallEventListener(final CallEventListener listener) { public void addCallEventListener(final CallEventListener listener) {
// Not supported over DBus // Not supported over DBus

View File

@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
public void initObjects() { public void initObjects() {
exportObjects(); exportObjects();
m.addUnidentifiedKeepAlive("dbus");
if (!noReceiveOnStart) { if (!noReceiveOnStart) {
subscribeReceive(); subscribeReceive();
} }
@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
@Override @Override
public void close() { public void close() {
m.removeUnidentifiedKeepAlive("dbus");
if (dbusMessageHandler != null) { if (dbusMessageHandler != null) {
m.removeReceiveHandler(dbusMessageHandler); m.removeReceiveHandler(dbusMessageHandler);
dbusMessageHandler = null; dbusMessageHandler = null;

View File

@ -25,10 +25,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -43,6 +45,9 @@ public class SignalJsonRpcDispatcherHandler {
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>(); private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>(); private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID();
private final List<Manager> keepAliveManagers = new ArrayList<>();
private boolean connectionActive = true;
private SignalJsonRpcCommandHandler commandHandler; private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler( public SignalJsonRpcDispatcherHandler(
@ -69,6 +74,10 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add( c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createCallEventHandler(m, subscriptionId)))); createCallEventHandler(m, subscriptionId))));
c.getManagers().forEach(this::registerKeepAlive);
c.addOnManagerAddedHandler(this::registerKeepAlive);
c.addOnManagerRemovedHandler(this::unregisterKeepAlive);
handleConnection(); handleConnection();
} }
@ -82,6 +91,8 @@ public class SignalJsonRpcDispatcherHandler {
final var currentThread = Thread.currentThread(); final var currentThread = Thread.currentThread();
m.addClosedListener(currentThread::interrupt); m.addClosedListener(currentThread::interrupt);
registerKeepAlive(m);
handleConnection(); handleConnection();
} }
@ -200,14 +211,29 @@ public class SignalJsonRpcDispatcherHandler {
subscriptionId.ifPresent(this::unsubscribeReceive); subscriptionId.ifPresent(this::unsubscribeReceive);
} }
private void registerKeepAlive(final Manager m) {
if (!connectionActive) return;
m.addUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.add(m);
}
private void unregisterKeepAlive(final Manager m) {
if (!connectionActive) return;
m.removeUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.remove(m);
}
private void handleConnection() { private void handleConnection() {
try { try {
jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params), jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId())); response -> logger.debug("Received unexpected response for id {}", response.getId()));
} finally { } finally {
connectionActive = false;
receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler)); receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear(); receiveHandlers.clear();
unsubscribeAllCallEvents(); unsubscribeAllCallEvents();
keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken));
keepAliveManagers.clear();
} }
} }

View File

@ -496,6 +496,14 @@ class SubscribeCallEventsTest {
public void addClosedListener(Runnable l) { public void addClosedListener(Runnable l) {
} }
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override @Override
public InputStream retrieveAttachment(String id) { public InputStream retrieveAttachment(String id) {
return null; return null;
@ -741,8 +749,8 @@ class SubscribeCallEventsTest {
assertEquals(1, manager1.addCount.get(), "manager1 should have one listener"); assertEquals(1, manager1.addCount.get(), "manager1 should have one listener");
assertEquals(1, manager2.addCount.get(), "manager2 should have one listener"); assertEquals(1, manager2.addCount.get(), "manager2 should have one listener");
// Also registers an onManagerAdded handler for receive and one for call events // Registers onManagerAdded handlers for receive, call events, and keep-alive
assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers"); assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers");
} }
@Test @Test