Merge 2cbff3a4c1d96da9a7f755da5be9603e026ebb9b into 4601e601182c466d60a6f8c88d4bbd0150b6bf9d

This commit is contained in:
Stefan Meinecke 2026-05-17 12:32:40 +00:00 committed by GitHub
commit 8a8c495865
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 69 additions and 4 deletions

View File

@ -407,6 +407,10 @@ public interface Manager extends Closeable {
void addClosedListener(Runnable listener);
void addUnidentifiedKeepAlive(String token);
void removeUnidentifiedKeepAlive(String token);
InputStream retrieveAttachment(final String id) throws IOException;
InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException;

View File

@ -1708,6 +1708,16 @@ public class ManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token);
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token);
}
@Override
public void addCallEventListener(final CallEventListener listener) {
context.getCallManager().addCallEventListener(listener);

View File

@ -915,6 +915,14 @@ public class DbusManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
}
@Override
public void addCallEventListener(final CallEventListener listener) {
// Not supported over DBus

View File

@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
public void initObjects() {
exportObjects();
m.addUnidentifiedKeepAlive("dbus");
if (!noReceiveOnStart) {
subscribeReceive();
}
@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
@Override
public void close() {
m.removeUnidentifiedKeepAlive("dbus");
if (dbusMessageHandler != null) {
m.removeReceiveHandler(dbusMessageHandler);
dbusMessageHandler = null;

View File

@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -43,8 +44,11 @@ public class SignalJsonRpcDispatcherHandler {
private final JsonRpcReader jsonRpcReader;
private final boolean noReceiveOnStart;
private final Map<Integer, ArrayList<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final Map<Integer, ArrayList<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID();
private final List<Manager> keepAliveManagers = new ArrayList<>();
private boolean connectionActive = true;
private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler(
@ -71,6 +75,10 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createCallEventHandler(m, subscriptionId))));
c.getManagers().forEach(this::registerKeepAlive);
c.addOnManagerAddedHandler(this::registerKeepAlive);
c.addOnManagerRemovedHandler(this::unregisterKeepAlive);
handleConnection();
}
@ -84,6 +92,8 @@ public class SignalJsonRpcDispatcherHandler {
final var currentThread = Thread.currentThread();
m.addClosedListener(currentThread::interrupt);
registerKeepAlive(m);
handleConnection();
}
@ -204,14 +214,29 @@ public class SignalJsonRpcDispatcherHandler {
subscriptionId.ifPresent(this::unsubscribeReceive);
}
private void registerKeepAlive(final Manager m) {
if (!connectionActive) return;
m.addUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.add(m);
}
private void unregisterKeepAlive(final Manager m) {
if (!connectionActive) return;
m.removeUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.remove(m);
}
private void handleConnection() {
try {
jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId()));
} finally {
connectionActive = false;
receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear();
unsubscribeAllCallEvents();
keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken));
keepAliveManagers.clear();
}
}

View File

@ -447,6 +447,14 @@ class SseInitialFlushTest {
public void addClosedListener(Runnable listener) {
}
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override
public InputStream retrieveAttachment(String id) {
return null;

View File

@ -495,6 +495,14 @@ class SubscribeCallEventsTest {
public void addClosedListener(Runnable l) {
}
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override
public InputStream retrieveAttachment(String id) {
return null;
@ -740,8 +748,8 @@ class SubscribeCallEventsTest {
assertEquals(1, manager1.addCount.get(), "manager1 should have one listener");
assertEquals(1, manager2.addCount.get(), "manager2 should have one listener");
// Also registers an onManagerAdded handler for receive and one for call events
assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers");
// Registers onManagerAdded handlers for receive, call events, and keep-alive
assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers");
}
@Test