Compare commits

...

12 Commits

Author SHA1 Message Date
Stefan Meinecke
8a8c495865
Merge 2cbff3a4c1d96da9a7f755da5be9603e026ebb9b into 4601e601182c466d60a6f8c88d4bbd0150b6bf9d 2026-05-17 12:32:40 +00:00
Stefan Meinecke
2cbff3a4c1 Add missing unidentified keep-alive methods to test stub
Implement addUnidentifiedKeepAlive and removeUnidentifiedKeepAlive in SseInitialFlushTest's Manager stub to match the updated interface.
2026-05-17 14:32:37 +02:00
Stefan Meinecke
540cfbd977 Tie unauthenticated WebSocket keep-alive to active client connections
Keep the unidentified socket alive while a JSON-RPC connection is open (including stdio mode) and while a D-Bus object is exported, instead of for the lifetime of the receive loop. The receive loop does not use the unauthenticated socket, so keeping it alive there was semantically wrong.

This also covers --receive-mode=manual, where no receive loop runs butclients still send messages.
2026-05-17 14:16:42 +02:00
Stefan Meinecke
97f77b1f69 Keep unauthenticated WebSocket alive during daemon receive loop
The unauthenticated (sealed sender) socket had no keep-alive token
registered, causing SignalWebSocket's DelayedDisconnectThread to tear
down the connection ~10s after each send. Every subsequent group message
then had to re-establish a fresh TLS connection (~6s delay).

The authenticated socket avoids this by registering a "receive" keep-alive
token for the lifetime of the receive loop. Apply the same pattern to the
unauthenticated socket: register the token alongside the authenticated one
and remove it in the same finally block.

This keeps the unidentified connection alive in daemon mode, matching the
behaviour of Signal mobile clients.
2026-05-17 14:16:01 +02:00
AsamK
4601e60118 Adapt containerfile to older apt versions 2026-05-16 11:14:17 +02:00
AsamK
fcf82b9318 Adapt containerfile to older apt versions 2026-05-16 10:48:12 +02:00
AsamK
9c8137fafa Update dependencies 2026-05-15 17:06:21 +02:00
AsamK
0a1531dcce Improve destination/source checks for incoming messages 2026-05-13 18:29:26 +02:00
AsamK
c10f618a3e Update gradle 2026-05-13 18:26:12 +02:00
AsamK
4a3d9d90a6 Update libsignal-service 2026-05-13 18:25:35 +02:00
AsamK
b4275414e1 Pass correct serviceId to SignalServiceCipher
Fixes #2036
2026-05-13 17:39:44 +02:00
Connor Lanigan
5f94b7b6d1
fix: Attempted immutable list modification causes runtime exception (#2038) 2026-05-05 10:15:21 +02:00
18 changed files with 170 additions and 95 deletions

View File

@ -15,7 +15,7 @@ jobs:
matrix:
# java="25" is the LTS Java version used in reproducible builds script (default in Containerfile).
# More Java versions can be added to test compatibility, eg. "26".
java: ["25"]
java: ["25", "26"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6

View File

@ -1,9 +1,9 @@
[versions]
slf4j = "2.0.17"
slf4j = "2.0.18"
junit = "6.0.3"
micronaut-json-schema = "2.0.0-M8"
micronaut-core = "4.9.3"
signal-service = "2.15.3_unofficial_145"
micronaut-json-schema = "2.0.0"
micronaut-core = "5.0.0"
signal-service = "2.15.3_unofficial_146"
[libraries]
bouncycastle = "org.bouncycastle:bcprov-jdk18on:1.84"
@ -20,7 +20,7 @@ slf4j-jul = { module = "org.slf4j:jul-to-slf4j", version.ref = "slf4j" }
logback = "ch.qos.logback:logback-classic:1.5.32"
signalnetwork = { module = "com.github.turasa:signal-network", version.ref = "signal-service" }
sqlite = "org.xerial:sqlite-jdbc:3.53.0.0"
sqlite = "org.xerial:sqlite-jdbc:3.53.1.0"
hikari = "com.zaxxer:HikariCP:7.0.2"
junit-jupiter-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit" }

Binary file not shown.

View File

@ -1,7 +1,9 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.1-bin.zip
networkTimeout=10000
retries=0
retryBackOffMs=500
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

31
gradlew.bat vendored
View File

@ -23,8 +23,8 @@
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
@rem Set local scope for the variables, and ensure extensions are enabled
setlocal EnableExtensions
set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
@ -51,7 +51,7 @@ echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
"%COMSPEC%" /c exit 1
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
@ -65,7 +65,7 @@ echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
"%COMSPEC%" /c exit 1
:execute
@rem Setup the command line
@ -73,21 +73,10 @@ goto fail
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
@rem endlocal doesn't take effect until after the line is parsed and variables are expanded
@rem which allows us to clear the local environment before executing the java command
endlocal & "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* & call :exitWithErrorLevel
:end
@rem End local scope for the variables with windows NT shell
if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
set EXIT_CODE=%ERRORLEVEL%
if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
:exitWithErrorLevel
@rem Use "%COMSPEC%" /c exit to allow operators to work properly in scripts
"%COMSPEC%" /c exit %ERRORLEVEL%

View File

@ -259,7 +259,7 @@ public interface Manager extends Closeable {
RecipientIdentifier.Single recipient
) throws IOException;
SendMessageResults sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) throws IOException;
void sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) throws IOException;
SendMessageResults sendMessageRequestResponse(
MessageEnvelope.Sync.MessageRequestResponse.Type type,
@ -407,6 +407,10 @@ public interface Manager extends Closeable {
void addClosedListener(Runnable listener);
void addUnidentifiedKeepAlive(String token);
void removeUnidentifiedKeepAlive(String token);
InputStream retrieveAttachment(final String id) throws IOException;
InputStream retrieveContactAvatar(final RecipientIdentifier.Single recipient) throws IOException, UnregisteredRecipientException;

View File

@ -157,7 +157,7 @@ public record MessageEnvelope(
dataMessage.getExpiresInSeconds(),
dataMessage.isExpirationUpdate(),
dataMessage.isViewOnce(),
dataMessage.isEndSession(),
false,
dataMessage.isProfileKeyUpdate(),
dataMessage.getProfileKey().isPresent(),
dataMessage.getReaction().map(r -> Reaction.from(r, recipientResolver, addressResolver)),
@ -1028,7 +1028,7 @@ public record MessageEnvelope(
final AttachmentFileProvider fileProvider,
Exception exception
) {
final var serviceId = envelope.getSourceServiceId().map(ServiceId::parseOrNull).orElse(null);
final var serviceId = envelope.getSourceServiceId();
final var source = !envelope.isUnidentifiedSender() && serviceId != null
? recipientResolver.resolveRecipient(serviceId)
: envelope.isUnidentifiedSender() && content != null

View File

@ -109,8 +109,8 @@ public final class IncomingMessageHandler {
SignalServiceContent content = null;
if (!envelope.isReceipt()) {
account.getIdentityKeyStore().setRetryingDecryption(true);
final var destination = getDestination(envelope).serviceId();
try {
final var destination = getDestination(envelope).serviceId();
final var cipherResult = dependencies.getCipher(destination == null
|| destination.equals(account.getAci()) ? ServiceIdType.ACI : ServiceIdType.PNI)
.decrypt(envelope.getProto(), envelope.getServerDeliveredTimestamp());
@ -140,15 +140,30 @@ public final class IncomingMessageHandler {
final Manager.ReceiveMessageHandler handler
) {
final var actions = new ArrayList<HandleAction>();
if (envelope.isPreKeySignalMessage()) {
actions.add(RefreshPreKeysAction.create());
}
SignalServiceContent content = null;
Exception exception = null;
envelope.getSourceServiceId().map(ServiceId::parseOrNull)
if (envelope.getSourceServiceId() != null) {
// Store uuid if we don't have it already
// uuid in envelope is sent by server
.ifPresent(serviceId -> account.getRecipientResolver().resolveRecipient(serviceId));
account.getRecipientResolver().resolveRecipient(envelope.getSourceServiceId());
}
if (!envelope.isReceipt()) {
final var destination = getDestination(envelope).serviceId();
try {
final var destination = getDestination(envelope).serviceId();
if (destination == account.getPni() && envelope.getSourceServiceId() == null) {
throw new InvalidMessageException(
"Got a sealed sender message to our PNI? Invalid message, ignoring.");
}
if (envelope.getSourceServiceId() instanceof ServiceId.PNI
&& envelope.getProto().type != Envelope.Type.SERVER_DELIVERY_RECEIPT) {
throw new InvalidMessageException("Got a message from a PNI that was not a SERVER_DELIVERY_RECEIPT.");
}
final var cipherResult = dependencies.getCipher(destination == null
|| destination.equals(account.getAci()) ? ServiceIdType.ACI : ServiceIdType.PNI)
.decrypt(envelope.getProto(), envelope.getServerDeliveredTimestamp());
@ -173,7 +188,13 @@ public final class IncomingMessageHandler {
logger.debug("Received invalid message from blocked contact, ignoring.");
} else {
var serviceId = ServiceId.parseOrNull(e.getSender());
if (serviceId != null) {
ServiceId destination;
try {
destination = getDestination(envelope).serviceId();
} catch (InvalidMessageException ex) {
destination = null;
}
if (serviceId != null && destination != null) {
final var isSelf = sender.equals(account.getSelfRecipientId())
&& e.getSenderDevice() == account.getDeviceId();
logger.debug("Received invalid message, queuing renew session action.");
@ -311,7 +332,12 @@ public final class IncomingMessageHandler {
final var sender = senderDeviceAddress.recipientId();
final var senderServiceId = senderDeviceAddress.serviceId();
final var senderDeviceId = senderDeviceAddress.deviceId();
final var destination = getDestination(envelope);
final DeviceAddress destination;
try {
destination = getDestination(envelope);
} catch (InvalidMessageException e) {
throw new AssertionError(e);
}
if (account.getPni().equals(destination.serviceId)) {
account.getRecipientStore().markNeedsPniSignature(destination.recipientId, true);
@ -874,11 +900,6 @@ public final class IncomingMessageHandler {
final var selfAddress = isSync ? source : destination;
final var conversationPartnerAddress = isSync ? destination : source;
if (conversationPartnerAddress != null && message.isEndSession()) {
account.getAccountData(selfAddress.serviceId())
.getSessionStore()
.deleteAllSessions(conversationPartnerAddress.serviceId());
}
if (message.isExpirationUpdate() || message.getBody().isPresent()) {
if (message.getGroupContext().isPresent()) {
final var groupContext = message.getGroupContext().get();
@ -1047,7 +1068,7 @@ public final class IncomingMessageHandler {
}
private SignalServiceAddress getSenderAddress(SignalServiceEnvelope envelope, SignalServiceContent content) {
final var serviceId = envelope.getSourceServiceId().map(ServiceId::parseOrNull).orElse(null);
final var serviceId = envelope.getSourceServiceId();
if (!envelope.isUnidentifiedSender() && serviceId != null) {
return new SignalServiceAddress(serviceId);
} else if (content != null) {
@ -1058,7 +1079,7 @@ public final class IncomingMessageHandler {
}
private DeviceAddress getSender(SignalServiceEnvelope envelope, SignalServiceContent content) {
final var serviceId = envelope.getSourceServiceId().map(ServiceId::parseOrNull).orElse(null);
final var serviceId = envelope.getSourceServiceId();
if (!envelope.isUnidentifiedSender() && serviceId != null) {
return new DeviceAddress(account.getRecipientResolver().resolveRecipient(serviceId),
serviceId,
@ -1070,10 +1091,13 @@ public final class IncomingMessageHandler {
}
}
private DeviceAddress getDestination(SignalServiceEnvelope envelope) {
private DeviceAddress getDestination(SignalServiceEnvelope envelope) throws InvalidMessageException {
final var destination = envelope.getDestinationServiceId();
if (destination == null || destination.isUnknown()) {
return new DeviceAddress(account.getSelfRecipientId(), account.getAci(), account.getDeviceId());
throw new InvalidMessageException("Missing destination");
}
if (!account.getAci().equals(destination) && !account.getPni().equals(destination)) {
throw new InvalidMessageException("Message not intended for this account");
}
return new DeviceAddress(account.getRecipientResolver().resolveRecipient(destination),
destination,

View File

@ -9,7 +9,6 @@ import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.signal.core.models.ServiceId;
import org.signal.core.models.ServiceId.ACI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -150,10 +149,10 @@ public class ReceiveHelper {
for (final var it : batch) {
SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
it.getServerDeliveredTimestamp());
final var recipientId = envelope1.getSourceServiceId()
.map(ServiceId::parseOrNull)
.map(s -> account.getRecipientResolver().resolveRecipient(s))
.orElse(null);
final var sourceServiceId = envelope1.getSourceServiceId();
final var recipientId = sourceServiceId == null
? null
: account.getRecipientResolver().resolveRecipient(sourceServiceId);
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
@ -238,7 +237,7 @@ public class ReceiveHelper {
if (exception instanceof UntrustedIdentityException) {
logger.debug("Keeping message with untrusted identity in message cache");
final var address = ((UntrustedIdentityException) exception).getSender();
if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
if (envelope.getSourceServiceId() == null && address.aci().isPresent()) {
final var recipientId = account.getRecipientResolver()
.resolveRecipient(ACI.parseOrThrow(address.aci().get()));
try {
@ -292,7 +291,7 @@ public class ReceiveHelper {
cachedMessage.delete();
return null;
}
if (envelope.getSourceServiceId().isEmpty()) {
if (envelope.getSourceServiceId() == null) {
final var identifier = ((UntrustedIdentityException) exception).getSender();
final var recipientId = account.getRecipientResolver()
.resolveRecipient(new RecipientAddress(identifier));

View File

@ -1091,16 +1091,7 @@ public class ManagerImpl implements Manager {
}
@Override
public SendMessageResults sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) throws IOException {
var messageBuilder = SignalServiceDataMessage.newBuilder().asEndSessionMessage();
try {
return sendMessage(messageBuilder,
recipients.stream().map(RecipientIdentifier.class::cast).collect(Collectors.toSet()),
false);
} catch (GroupNotFoundException | NotAGroupMemberException | GroupSendingNotAllowedException e) {
throw new AssertionError(e);
} finally {
public void sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) throws IOException {
for (var recipient : recipients) {
final RecipientId recipientId;
try {
@ -1108,13 +1099,18 @@ public class ManagerImpl implements Manager {
} catch (UnregisteredRecipientException e) {
continue;
}
final var serviceId = context.getAccount()
final var recipientAddress = context.getAccount()
.getRecipientAddressResolver()
.resolveRecipientAddress(recipientId)
.serviceId();
if (serviceId.isPresent()) {
account.getAccountData(ServiceIdType.ACI).getSessionStore().deleteAllSessions(serviceId.get());
.resolveRecipientAddress(recipientId);
final var aciSessionStore = account.getAccountData(ServiceIdType.ACI).getSessionStore();
final var pniSessionStore = account.getAccountData(ServiceIdType.PNI).getSessionStore();
if (recipientAddress.aci().isPresent()) {
aciSessionStore.archiveSessions(recipientAddress.aci().get());
pniSessionStore.archiveSessions(recipientAddress.aci().get());
}
if (recipientAddress.pni().isPresent()) {
aciSessionStore.archiveSessions(recipientAddress.pni().get());
pniSessionStore.archiveSessions(recipientAddress.pni().get());
}
}
}
@ -1712,6 +1708,16 @@ public class ManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().registerKeepAliveToken(token);
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
dependencies.getUnauthenticatedSignalWebSocket().removeKeepAliveToken(token);
}
@Override
public void addCallEventListener(final CallEventListener listener) {
context.getCallManager().addCallEventListener(listener);

View File

@ -368,7 +368,8 @@ public class SignalDependencies {
public SignalServiceCipher getCipher(ServiceIdType serviceIdType) {
final var certificateValidator = new CertificateValidator(serviceEnvironmentConfig.unidentifiedSenderTrustRoots());
final var address = new SignalServiceAddress(credentialsProvider.getAci(), credentialsProvider.getE164());
final var serviceId = serviceIdType == ServiceIdType.ACI ? credentialsProvider.getAci() : credentialsProvider.getPni();
final var address = new SignalServiceAddress(serviceId, credentialsProvider.getE164());
final var deviceId = credentialsProvider.getDeviceId();
return new SignalServiceCipher(address,
deviceId,

View File

@ -6,7 +6,7 @@ ENV SOURCE_DATE_EPOCH=$SOURCE_DATE_EPOCH
ENV LANG=C.UTF-8
ENV LC_CTYPE=en_US.UTF-8
RUN SNAPSHOT="$(date -u -d "@$SOURCE_DATE_EPOCH" +%Y%m%dT%H%M%SZ)" \
&& apt install -y make asciidoc-base --update --snapshot "$SNAPSHOT" --no-install-recommends --no-install-suggests
&& sed -i 's/^deb /deb [snapshot=yes] /' /etc/apt/sources.list && apt update --snapshot "$SNAPSHOT" && apt install -y make asciidoc-base --snapshot "$SNAPSHOT" --no-install-recommends --no-install-suggests
COPY --chmod=0700 reproducible-builds/entrypoint.sh /usr/local/bin/entrypoint.sh
WORKDIR /signal-cli
ENTRYPOINT [ "/usr/local/bin/entrypoint.sh", "build" ]

View File

@ -144,8 +144,7 @@ public class SendCommand implements JsonRpcLocalCommand {
}
try {
final var results = m.sendEndSessionMessage(singleRecipients);
outputResult(outputWriter, results);
m.sendEndSessionMessage(singleRecipients);
return;
} catch (IOException e) {
throw new UnexpectedErrorException("Failed to send message: " + e.getMessage() + " (" + e.getClass()

View File

@ -543,9 +543,8 @@ public class DbusManagerImpl implements Manager {
}
@Override
public SendMessageResults sendEndSessionMessage(final Set<RecipientIdentifier.Single> recipients) throws IOException {
public void sendEndSessionMessage(final Set<RecipientIdentifier.Single> recipients) throws IOException {
signal.sendEndSessionMessage(recipients.stream().map(RecipientIdentifier.Single::getIdentifier).toList());
return new SendMessageResults(0, Map.of());
}
@Override
@ -916,6 +915,14 @@ public class DbusManagerImpl implements Manager {
}
}
@Override
public void addUnidentifiedKeepAlive(final String token) {
}
@Override
public void removeUnidentifiedKeepAlive(final String token) {
}
@Override
public void addCallEventListener(final CallEventListener listener) {
// Not supported over DBus

View File

@ -100,6 +100,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
public void initObjects() {
exportObjects();
m.addUnidentifiedKeepAlive("dbus");
if (!noReceiveOnStart) {
subscribeReceive();
}
@ -116,6 +117,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
@Override
public void close() {
m.removeUnidentifiedKeepAlive("dbus");
if (dbusMessageHandler != null) {
m.removeReceiveHandler(dbusMessageHandler);
dbusMessageHandler = null;
@ -430,8 +432,7 @@ public class DbusSignalImpl implements Signal, AutoCloseable {
@Override
public void sendEndSessionMessage(final List<String> recipients) {
try {
final var results = m.sendEndSessionMessage(getSingleRecipientIdentifiers(recipients, m.getSelfNumber()));
checkSendMessageResults(results);
m.sendEndSessionMessage(getSingleRecipientIdentifiers(recipients, m.getSelfNumber()));
} catch (IOException e) {
throw new Error.Failure(e.getMessage());
}

View File

@ -25,12 +25,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class SignalJsonRpcDispatcherHandler {
@ -43,6 +46,9 @@ public class SignalJsonRpcDispatcherHandler {
private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
private final Map<Integer, List<Pair<Manager, Manager.CallEventListener>>> callEventHandlers = new HashMap<>();
private final String connectionKeepAliveToken = "jsonrpc-" + UUID.randomUUID();
private final List<Manager> keepAliveManagers = new ArrayList<>();
private boolean connectionActive = true;
private SignalJsonRpcCommandHandler commandHandler;
public SignalJsonRpcDispatcherHandler(
@ -69,6 +75,10 @@ public class SignalJsonRpcDispatcherHandler {
c.addOnManagerAddedHandler(m -> callEventHandlers.forEach((subscriptionId, handlers) -> handlers.add(
createCallEventHandler(m, subscriptionId))));
c.getManagers().forEach(this::registerKeepAlive);
c.addOnManagerAddedHandler(this::registerKeepAlive);
c.addOnManagerRemovedHandler(this::unregisterKeepAlive);
handleConnection();
}
@ -82,6 +92,8 @@ public class SignalJsonRpcDispatcherHandler {
final var currentThread = Thread.currentThread();
m.addClosedListener(currentThread::interrupt);
registerKeepAlive(m);
handleConnection();
}
@ -91,7 +103,9 @@ public class SignalJsonRpcDispatcherHandler {
private int subscribeCallEvents(final Collection<Manager> managers) {
final var subscriptionId = nextSubscriptionId.getAndIncrement();
final var listeners = managers.stream().map(m -> createCallEventHandler(m, subscriptionId)).toList();
final var listeners = managers.stream()
.map(m -> createCallEventHandler(m, subscriptionId))
.collect(Collectors.toCollection(ArrayList::new));
callEventHandlers.put(subscriptionId, listeners);
return subscriptionId;
}
@ -146,7 +160,7 @@ public class SignalJsonRpcDispatcherHandler {
final var subscriptionId = nextSubscriptionId.getAndIncrement();
final var handlers = managers.stream()
.map(m -> createReceiveHandler(m, subscriptionId, internalSubscription))
.toList();
.collect(Collectors.toCollection(ArrayList::new));
receiveHandlers.put(subscriptionId, handlers);
return subscriptionId;
@ -200,14 +214,29 @@ public class SignalJsonRpcDispatcherHandler {
subscriptionId.ifPresent(this::unsubscribeReceive);
}
private void registerKeepAlive(final Manager m) {
if (!connectionActive) return;
m.addUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.add(m);
}
private void unregisterKeepAlive(final Manager m) {
if (!connectionActive) return;
m.removeUnidentifiedKeepAlive(connectionKeepAliveToken);
keepAliveManagers.remove(m);
}
private void handleConnection() {
try {
jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
response -> logger.debug("Received unexpected response for id {}", response.getId()));
} finally {
connectionActive = false;
receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
receiveHandlers.clear();
unsubscribeAllCallEvents();
keepAliveManagers.forEach(m -> m.removeUnidentifiedKeepAlive(connectionKeepAliveToken));
keepAliveManagers.clear();
}
}

View File

@ -326,8 +326,7 @@ class SseInitialFlushTest {
}
@Override
public SendMessageResults sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) {
return null;
public void sendEndSessionMessage(Set<RecipientIdentifier.Single> recipients) {
}
@Override
@ -448,6 +447,14 @@ class SseInitialFlushTest {
public void addClosedListener(Runnable listener) {
}
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override
public InputStream retrieveAttachment(String id) {
return null;

View File

@ -328,8 +328,7 @@ class SubscribeCallEventsTest {
}
@Override
public SendMessageResults sendEndSessionMessage(Set<RecipientIdentifier.Single> r) {
return null;
public void sendEndSessionMessage(Set<RecipientIdentifier.Single> r) {
}
@Override
@ -496,6 +495,14 @@ class SubscribeCallEventsTest {
public void addClosedListener(Runnable l) {
}
@Override
public void addUnidentifiedKeepAlive(String token) {
}
@Override
public void removeUnidentifiedKeepAlive(String token) {
}
@Override
public InputStream retrieveAttachment(String id) {
return null;
@ -741,8 +748,8 @@ class SubscribeCallEventsTest {
assertEquals(1, manager1.addCount.get(), "manager1 should have one listener");
assertEquals(1, manager2.addCount.get(), "manager2 should have one listener");
// Also registers an onManagerAdded handler for receive and one for call events
assertEquals(2, multi.addedHandlers.size(), "should register onManagerAdded handlers");
// Registers onManagerAdded handlers for receive, call events, and keep-alive
assertEquals(3, multi.addedHandlers.size(), "should register onManagerAdded handlers");
}
@Test