Merge 67105e14c19d9abf09a3ab133bd43da13662a7d6 into dc43e4402050fc774ef2f420e1f7578da0290e17

This commit is contained in:
Stefan Meinecke 2026-04-29 22:53:07 +02:00 committed by GitHub
commit c1c827d2d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 60 additions and 2 deletions

View File

@ -407,6 +407,10 @@ public interface Manager extends Closeable {
void addClosedListener(Runnable listener);
void addUnidentifiedKeepAlive(String token);
void removeUnidentifiedKeepAlive(String token);
InputStream retrieveAttachment(final String id) throws IOException;
InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException;

View File

@ -1712,6 +1712,16 @@ public class ManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token);
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token);
}
@Override
public void addCallEventListener(final CallEventListener listener) {
context.getCallManager().addCallEventListener(listener);

View File

@ -916,6 +916,14 @@ public class DbusManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
}
@Override
public void addCallEventListener(final CallEventListener listener) {
// Not supported over DBus

View File

@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
public void initObjects() {
exportObjects();
m.addUnidentifiedKeepAlive("dbus");
if (!noReceiveOnStart) {
subscribeReceive();
}
@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
@Override
public void close() {
m.removeUnidentifiedKeepAlive("dbus");
if (dbusMessageHandler != null) {
m.removeReceiveHandler(dbusMessageHandler);
dbusMessageHandler = null;

View File

@ -25,10 +25,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -43,6 +45,9 @@ public class SignalJsonRpcDispatcherHandler {
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID();
private final List<Manager> keepAliveManagers = new ArrayList<>();
private boolean connectionActive = true;
private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler(
@ -69,6 +74,10 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createCallEventHandler(m, subscriptionId))));
c.getManagers().forEach(this::registerKeepAlive);
c.addOnManagerAddedHandler(this::registerKeepAlive);
c.addOnManagerRemovedHandler(this::unregisterKeepAlive);
handleConnection();
}
@ -82,6 +91,8 @@ public class SignalJsonRpcDispatcherHandler {
final var currentThread = Thread.currentThread();
m.addClosedListener(currentThread::interrupt);
registerKeepAlive(m);
handleConnection();
}
@ -200,14 +211,29 @@ public class SignalJsonRpcDispatcherHandler {
subscriptionId.ifPresent(this::unsubscribeReceive);
}
private void registerKeepAlive(final Manager m) {
if (!connectionActive) return;
m.addUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.add(m);
}
private void unregisterKeepAlive(final Manager m) {
if (!connectionActive) return;
m.removeUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.remove(m);
}
private void handleConnection() {
try {
jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId()));
} finally {
connectionActive = false;
receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear();
unsubscribeAllCallEvents();
keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken));
keepAliveManagers.clear();
}
}

View File

@ -496,6 +496,14 @@ class SubscribeCallEventsTest {
public void addClosedListener(Runnable l) {
}
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override
public InputStream retrieveAttachment(String id) {
return null;
@ -741,8 +749,8 @@ class SubscribeCallEventsTest {
assertEquals(1, manager1.addCount.get(), "manager1 should have one listener");
assertEquals(1, manager2.addCount.get(), "manager2 should have one listener");
// Also registers an onManagerAdded handler for receive and one for call events
assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers");
// Registers onManagerAdded handlers for receive, call events, and keep-alive
assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers");
}
@Test