Refactor timeout handling in receive helper

This commit is contained in:
AsamK 2025-09-18 21:43:54 +02:00
parent e65fee0efb
commit 21a546d2bc
3 changed files with 21 additions and 14 deletions

View File

@ -20,11 +20,13 @@ import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableExcept
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -73,7 +75,7 @@ public class ReceiveHelper {
public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
while (!shouldStop) { while (!shouldStop) {
try { try {
receiveMessages(Duration.ofMinutes(1), false, null, handler); receiveMessages(Optional.empty(), null, handler);
break; break;
} catch (IOException e) { } catch (IOException e) {
logger.warn("Receiving messages failed, retrying", e); logger.warn("Receiving messages failed, retrying", e);
@ -82,8 +84,7 @@ public class ReceiveHelper {
} }
public void receiveMessages( public void receiveMessages(
Duration timeout, Optional<Duration> timeout,
boolean returnOnTimeout,
Integer maxMessages, Integer maxMessages,
Manager.ReceiveMessageHandler handler Manager.ReceiveMessageHandler handler
) throws IOException { ) throws IOException {
@ -103,7 +104,7 @@ public class ReceiveHelper {
signalWebSocket.registerKeepAliveToken("receive"); signalWebSocket.registerKeepAliveToken("receive");
try { try {
receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions); receiveMessagesInternal(signalWebSocket, timeout, maxMessages, handler, queuedActions);
} finally { } finally {
hasCaughtUpWithOldMessages = false; hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet()); handleQueuedActions(queuedActions.keySet());
@ -117,16 +118,17 @@ public class ReceiveHelper {
private void receiveMessagesInternal( private void receiveMessagesInternal(
final SignalWebSocket.AuthenticatedWebSocket signalWebSocket, final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
Duration timeout, Optional<Duration> timeout,
boolean returnOnTimeout,
Integer maxMessages, Integer maxMessages,
Manager.ReceiveMessageHandler handler, Manager.ReceiveMessageHandler handler,
final Map<HandleAction, HandleAction> queuedActions final Map<HandleAction, HandleAction> queuedActions
) throws IOException { ) throws IOException {
final var timeoutInstant = timeout.map(t -> Instant.now().plus(t));
int remainingMessages = maxMessages == null ? -1 : maxMessages; int remainingMessages = maxMessages == null ? -1 : maxMessages;
var backOffCounter = 0; var backOffCounter = 0;
isWaitingForMessage = false; isWaitingForMessage = false;
logger.debug("Start receiving messages");
while (!shouldStop && remainingMessages != 0) { while (!shouldStop && remainingMessages != 0) {
if (account.getNeedsToRetryFailedMessages()) { if (account.getNeedsToRetryFailedMessages()) {
retryFailedReceivedMessages(handler); retryFailedReceivedMessages(handler);
@ -137,10 +139,15 @@ public class ReceiveHelper {
if (nowMillis - account.getLastReceiveTimestamp() > 4 * 60 * 60 * 1000) { if (nowMillis - account.getLastReceiveTimestamp() > 4 * 60 * 60 * 1000) {
account.setLastReceiveTimestamp(nowMillis); account.setLastReceiveTimestamp(nowMillis);
} }
logger.debug("Checking for new message from server"); logger.trace("Checking for new message from server");
try { try {
isWaitingForMessage = true; isWaitingForMessage = true;
var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> { final var timeoutMs = timeoutInstant.isPresent() ? Math.min(10_000,
Duration.between(Instant.now(), timeoutInstant.get()).toMillis()) : 10_000L;
if (timeoutMs <= 0L) {
return;
}
var queueNotEmpty = signalWebSocket.readMessageBatch(timeoutMs, 1, batch -> {
logger.debug("Retrieved {} envelopes!", batch.size()); logger.debug("Retrieved {} envelopes!", batch.size());
isWaitingForMessage = false; isWaitingForMessage = false;
for (final var it : batch) { for (final var it : batch) {
@ -205,7 +212,9 @@ public class ReceiveHelper {
throw e; throw e;
} catch (TimeoutException e) { } catch (TimeoutException e) {
backOffCounter = 0; backOffCounter = 0;
if (returnOnTimeout) return; if (timeoutInstant.isPresent() && timeoutInstant.get().isBefore(Instant.now())) {
return;
}
continue; continue;
} catch (Exception e) { } catch (Exception e) {
logger.error("Unknown error when receiving messages", e); logger.error("Unknown error when receiving messages", e);

View File

@ -1339,7 +1339,7 @@ public class ManagerImpl implements Manager {
Optional<Integer> maxMessages, Optional<Integer> maxMessages,
ReceiveMessageHandler handler ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException { ) throws IOException, AlreadyReceivingException {
receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); receiveMessages(timeout, maxMessages.orElse(null), handler);
} }
@Override @Override
@ -1357,8 +1357,7 @@ public class ManagerImpl implements Manager {
} }
private void receiveMessages( private void receiveMessages(
Duration timeout, Optional<Duration> timeout,
boolean returnOnTimeout,
Integer maxMessages, Integer maxMessages,
ReceiveMessageHandler handler ReceiveMessageHandler handler
) throws IOException, AlreadyReceivingException { ) throws IOException, AlreadyReceivingException {
@ -1370,7 +1369,7 @@ public class ManagerImpl implements Manager {
receiveThread = Thread.currentThread(); receiveThread = Thread.currentThread();
} }
try { try {
context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> { context.getReceiveHelper().receiveMessages(timeout, maxMessages, (envelope, e) -> {
passReceivedMessageToHandlers(envelope, e); passReceivedMessageToHandlers(envelope, e);
handler.handleMessage(envelope, e); handler.handleMessage(envelope, e);
}); });

View File

@ -42,7 +42,6 @@ public class KeyValueStore {
public <T> T getEntry(KeyValueEntry<T> key) { public <T> T getEntry(KeyValueEntry<T> key) {
synchronized (cache) { synchronized (cache) {
if (cache.containsKey(key)) { if (cache.containsKey(key)) {
logger.trace("Got entry for key {} from cache", key.key());
return (T) cache.get(key); return (T) cache.get(key);
} }
} }