From 21a546d2bc5ad452d7bf0839a4d15fd081e185c2 Mon Sep 17 00:00:00 2001 From: AsamK Date: Thu, 18 Sep 2025 21:43:54 +0200 Subject: [PATCH] Refactor timeout handling in receive helper --- .../signal/manager/helper/ReceiveHelper.java | 27 ++++++++++++------- .../signal/manager/internal/ManagerImpl.java | 7 +++-- .../storage/keyValue/KeyValueStore.java | 1 - 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java index b12a9bdf..e326aea9 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -20,11 +20,13 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeoutException; @@ -73,7 +75,7 @@ public class ReceiveHelper { public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { while (!shouldStop) { try { - receiveMessages(Duration.ofMinutes(1), false, null, handler); + receiveMessages(Optional.empty(), null, handler); break; } catch (IOException e) { logger.warn("Receiving messages failed, retrying", e); @@ -82,8 +84,7 @@ public class ReceiveHelper { } public void receiveMessages( - Duration timeout, - boolean returnOnTimeout, + Optional timeout, Integer maxMessages, Manager.ReceiveMessageHandler handler ) throws IOException { @@ -103,7 +104,7 @@ public class ReceiveHelper { signalWebSocket.registerKeepAliveToken("receive"); try { - receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions); + receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions); } finally { hasCaughtUpWithOldMessages = false; handleQueuedActions(queuedActions.keySet()); @@ -117,16 +118,17 @@ public class ReceiveHelper { private void receiveMessagesInternal( final SignalWebSocket.AuthenticatedWebSocket signalWebSocket, - Duration timeout, - boolean returnOnTimeout, + Optional timeout, Integer maxMessages, Manager.ReceiveMessageHandler handler, final Map queuedActions ) throws IOException { + final var timeoutInstant = timeout.map(t -> Instant.now().plus(t)); int remainingMessages = maxMessages == null ? -1 : maxMessages; var backOffCounter = 0; isWaitingForMessage = false; + logger.debug("Start receiving messages"); while (!shouldStop && remainingMessages != 0) { if (account.getNeedsToRetryFailedMessages()) { retryFailedReceivedMessages(handler); @@ -137,10 +139,15 @@ public class ReceiveHelper { if (nowMillis - account.getLastReceiveTimestamp() > 4 * 60 * 60 * 1000) { account.setLastReceiveTimestamp(nowMillis); } - logger.debug("Checking for new message from server"); + logger.trace("Checking for new message from server"); try { isWaitingForMessage = true; - var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> { + final var timeoutMs = timeoutInstant.isPresent() ? Math.min(10_000, + Duration.between(Instant.now(), timeoutInstant.get()).toMillis()) : 10_000L; + if (timeoutMs <= 0L) { + return; + } + var queueNotEmpty = signalWebSocket.readMessageBatch(timeoutMs, 1, batch -> { logger.debug("Retrieved {} envelopes!", batch.size()); isWaitingForMessage = false; for (final var it : batch) { @@ -205,7 +212,9 @@ public class ReceiveHelper { throw e; } catch (TimeoutException e) { backOffCounter = 0; - if (returnOnTimeout) return; + if (timeoutInstant.isPresent() && timeoutInstant.get().isBefore(Instant.now())) { + return; + } continue; } catch (Exception e) { logger.error("Unknown error when receiving messages", e); diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index 47b44b16..31394cd5 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -1339,7 +1339,7 @@ public class ManagerImpl implements Manager { Optional maxMessages, ReceiveMessageHandler handler ) throws IOException, AlreadyReceivingException { - receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); + receiveMessages(timeout, maxMessages.orElse(null), handler); } @Override @@ -1357,8 +1357,7 @@ public class ManagerImpl implements Manager { } private void receiveMessages( - Duration timeout, - boolean returnOnTimeout, + Optional timeout, Integer maxMessages, ReceiveMessageHandler handler ) throws IOException, AlreadyReceivingException { @@ -1370,7 +1369,7 @@ public class ManagerImpl implements Manager { receiveThread = Thread.currentThread(); } try { - context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> { + context.getReceiveHelper().receiveMessages(timeout, maxMessages, (envelope, e) -> { passReceivedMessageToHandlers(envelope, e); handler.handleMessage(envelope, e); }); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java index 2f59c9d2..113aecd5 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java @@ -42,7 +42,6 @@ public class KeyValueStore { public T getEntry(KeyValueEntry key) { synchronized (cache) { if (cache.containsKey(key)) { - logger.trace("Got entry for key {} from cache", key.key()); return (T) cache.get(key); } }