Refactor group send

This commit is contained in:
AsamK 2025-09-06 12:23:07 +02:00
parent c37d8c6ce1
commit 597f0368cf
3 changed files with 90 additions and 82 deletions

View File

@ -48,7 +48,6 @@ import org.whispersystems.signalservice.api.messages.SignalServiceAttachmentStre
import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
import org.whispersystems.signalservice.api.messages.SignalServiceGroup;
import org.whispersystems.signalservice.api.messages.SignalServiceGroupV2;
import org.whispersystems.signalservice.api.push.DistributionId;
import org.whispersystems.signalservice.api.push.ServiceId;
import org.whispersystems.signalservice.api.push.exceptions.ConflictException;
@ -238,7 +237,7 @@ public class GroupHelper {
final var result = sendGroupMessage(messageBuilder,
gv2.getMembersIncludingPendingWithout(selfRecipientId),
gv2.getDistributionId());
gv2);
context.getJobExecutor().enqueueJob(new SyncStorageJob());
return new Pair<>(gv2.getGroupId(), result);
}
@ -409,7 +408,7 @@ public class GroupHelper {
var messageBuilder = SignalServiceDataMessage.newBuilder().asGroupMessage(group.build());
// Send group info request message to the recipient who sent us a message with this groupId
return sendGroupMessage(messageBuilder, Set.of(recipientId), null);
return sendGroupMessage(messageBuilder, Set.of(recipientId), new GroupInfoV1(groupId));
}
public SendGroupMessageResults sendGroupInfoMessage(
@ -430,7 +429,7 @@ public class GroupHelper {
var messageBuilder = getGroupUpdateMessageBuilder(g);
// Send group message only to the recipient who requested it
return sendGroupMessage(messageBuilder, Set.of(recipientId), null);
return sendGroupMessage(messageBuilder, Set.of(recipientId), g);
}
private GroupInfo getGroup(GroupId groupId, boolean forceUpdate) {
@ -606,7 +605,7 @@ public class GroupHelper {
var messageBuilder = getGroupUpdateMessageBuilder(gv1);
return sendGroupMessage(messageBuilder,
gv1.getMembersIncludingPendingWithout(account.getSelfRecipientId()),
gv1.getDistributionId());
gv1);
}
private void updateGroupV1Details(
@ -842,7 +841,7 @@ public class GroupHelper {
account.getGroupStore().updateGroup(groupInfoV1);
return sendGroupMessage(messageBuilder,
groupInfoV1.getMembersIncludingPendingWithout(account.getSelfRecipientId()),
groupInfoV1.getDistributionId());
groupInfoV1);
}
private SendGroupMessageResults quitGroupV2(
@ -867,7 +866,7 @@ public class GroupHelper {
handleGroupChangeResponse(groupInfoV2, groupGroupChangePair.second()).encode());
return sendGroupMessage(messageBuilder,
groupInfoV2.getMembersIncludingPendingWithout(account.getSelfRecipientId()),
groupInfoV2.getDistributionId());
groupInfoV2);
}
private SignalServiceDataMessage.Builder getGroupUpdateMessageBuilder(GroupInfoV1 g) throws AttachmentInvalidException {
@ -912,17 +911,17 @@ public class GroupHelper {
account.getGroupStore().updateGroup(group);
final var messageBuilder = getGroupUpdateMessageBuilder(group, groupChange.encode());
return sendGroupMessage(messageBuilder, members, group.getDistributionId());
return sendGroupMessage(messageBuilder, members, group);
}
private SendGroupMessageResults sendGroupMessage(
final SignalServiceDataMessage.Builder messageBuilder,
final Set<RecipientId> members,
final DistributionId distributionId
final GroupInfo groupInfo
) throws IOException {
final var timestamp = System.currentTimeMillis();
messageBuilder.withTimestamp(timestamp);
final var results = context.getSendHelper().sendGroupMessage(messageBuilder.build(), members, distributionId);
final var results = context.getSendHelper().sendGroupMessage(messageBuilder.build(), members, groupInfo);
return new SendGroupMessageResults(timestamp,
results.stream()
.map(sendMessageResult -> SendMessageResult.from(sendMessageResult,

View File

@ -49,10 +49,12 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import okio.ByteString;
@ -119,9 +121,9 @@ public class SendHelper {
public List<SendMessageResult> sendGroupMessage(
final SignalServiceDataMessage message,
final Set<RecipientId> recipientIds,
final DistributionId distributionId
final GroupInfo groupInfo
) throws IOException {
return sendGroupMessage(message, recipientIds, distributionId, ContentHint.IMPLICIT, Optional.empty());
return sendGroupMessage(message, recipientIds, groupInfo, ContentHint.IMPLICIT, Optional.empty());
}
public SendMessageResult sendReceiptMessage(
@ -234,10 +236,9 @@ public class SendHelper {
if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) {
throw new GroupSendingNotAllowedException(groupId, g.getTitle());
}
final var distributionId = g.getDistributionId();
final var recipientIds = g.getMembersWithout(account.getSelfRecipientId());
return sendGroupTypingMessage(message, recipientIds, distributionId);
return sendGroupTypingMessage(message, recipientIds, g);
}
public SendMessageResult resendMessage(
@ -324,17 +325,13 @@ public class SendHelper {
}
}
return sendGroupMessage(message,
recipients,
g.getDistributionId(),
ContentHint.RESENDABLE,
editTargetTimestamp);
return sendGroupMessage(message, recipients, g, ContentHint.RESENDABLE, editTargetTimestamp);
}
private List<SendMessageResult> sendGroupMessage(
final SignalServiceDataMessage message,
final Set<RecipientId> recipientIds,
final DistributionId distributionId,
final GroupInfo groupInfo,
final ContentHint contentHint,
final Optional<Long> editTargetTimestamp
) throws IOException {
@ -415,7 +412,7 @@ public class SendHelper {
}
}
});
final var results = sendGroupMessageInternal(legacySender, senderKeySender, recipientIds, distributionId);
final var results = sendGroupMessageInternal(legacySender, senderKeySender, recipientIds, groupInfo, false);
for (var r : results) {
handleSendMessageResult(r);
@ -427,7 +424,7 @@ public class SendHelper {
private List<SendMessageResult> sendGroupTypingMessage(
final SignalServiceTypingMessage message,
final Set<RecipientId> recipientIds,
final DistributionId distributionId
final GroupInfo groupInfo
) throws IOException {
final var messageSender = dependencies.getMessageSender();
final var results = sendGroupMessageInternal((recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendTyping(
@ -442,7 +439,8 @@ public class SendHelper {
groupSendEndorsements,
message),
recipientIds,
distributionId);
groupInfo,
false);
for (var r : results) {
handleSendMessageResult(r);
@ -466,24 +464,42 @@ public class SendHelper {
return g;
}
/**
* @param isRecipientUpdate isRecipientUpdate is true if we've already sent this message to some recipients in the past, otherwise false.
*/
private List<SendMessageResult> sendGroupMessageInternal(
final LegacySenderHandler legacySender,
final SenderKeySenderHandler senderKeySender,
final Set<RecipientId> recipientIds,
final DistributionId distributionId
final GroupInfo groupInfo,
final boolean isRecipientUpdate
) throws IOException {
long startTime = System.currentTimeMillis();
// isRecipientUpdate is true if we've already sent this message to some recipients in the past, otherwise false.
final var isRecipientUpdate = false;
Set<RecipientId> senderKeyTargets = distributionId == null
? Set.of()
: getSenderKeyCapableRecipientIds(recipientIds);
final var allResults = new ArrayList<SendMessageResult>(recipientIds.size());
final var addressesMap = recipientIds.stream()
.collect(Collectors.toMap(id -> id, context.getRecipientHelper()::resolveSignalServiceAddress));
final var unidentifiedAccessesMap = context.getUnidentifiedAccessHelper().getAccessFor(recipientIds);
final GroupSendEndorsements groupSendEndorsements = null; //TODO
Set<RecipientId> senderKeyTargets = groupInfo.getDistributionId() == null
? Set.of()
: recipientIds.stream()
.filter(s -> this.isSenderKeyCapable(addressesMap.get(s), unidentifiedAccessesMap.get(s)))
.collect(Collectors.toSet());
if (senderKeyTargets.size() < 2) {
logger.debug("Too few sender-key-capable users ({}). Doing all legacy sends.", senderKeyTargets.size());
senderKeyTargets = Set.of();
} else {
logger.debug("Can use sender key for {}/{} recipients.", senderKeyTargets.size(), recipientIds.size());
}
final var allResults = new ArrayList<SendMessageResult>(recipientIds.size());
if (!senderKeyTargets.isEmpty()) {
final var results = sendGroupMessageInternalWithSenderKey(senderKeySender,
senderKeyTargets,
distributionId,
groupInfo.getDistributionId(),
senderKeyTargets.stream().map(addressesMap::get).toList(),
senderKeyTargets.stream().map(unidentifiedAccessesMap::get).toList(),
groupSendEndorsements,
isRecipientUpdate);
if (results == null) {
@ -513,8 +529,22 @@ public class SendHelper {
logger.debug("Need to do a legacy send to send a sync message for a group of only ourselves.");
}
final var addresses = legacyTargets.stream().map(addressesMap::get).toList();
final var unidentifiedAccess = legacyTargets.stream().map(unidentifiedAccessesMap::get).toList();
final var senderCertificate = unidentifiedAccess.stream()
.filter(Objects::nonNull)
.findFirst()
.map(UnidentifiedAccess::getUnidentifiedCertificate)
.orElse(null);
final var groupSendTokens = groupSendEndorsements != null
? groupSendEndorsements.forIndividuals(addresses)
: null;
final var sealedSenderAccesses = SealedSenderAccess.forFanOutGroupSend(groupSendTokens,
senderCertificate,
unidentifiedAccess);
final List<SendMessageResult> results = sendGroupMessageInternalWithLegacy(legacySender,
legacyTargets,
addresses,
sealedSenderAccesses,
isRecipientUpdate || !allResults.isEmpty());
allResults.addAll(results);
}
@ -523,55 +553,33 @@ public class SendHelper {
return allResults;
}
private Set<RecipientId> getSenderKeyCapableRecipientIds(final Set<RecipientId> recipientIds) {
final var senderKeyTargets = new HashSet<RecipientId>();
final var recipientList = new ArrayList<>(recipientIds);
for (final var recipientId : recipientList) {
final var access = context.getUnidentifiedAccessHelper().getSealedSenderAccessFor(recipientId);
if (access == null) {
continue;
}
final var serviceId = account.getRecipientAddressResolver()
.resolveRecipientAddress(recipientId)
.serviceId()
.orElse(null);
if (serviceId == null) {
continue;
}
final var identity = account.getIdentityKeyStore().getIdentityInfo(serviceId);
if (identity == null || !identity.getTrustLevel().isTrusted()) {
continue;
}
senderKeyTargets.add(recipientId);
private boolean isSenderKeyCapable(final SignalServiceAddress address, final UnidentifiedAccess access) {
if (access == null) {
return false;
}
if (senderKeyTargets.size() < 2) {
logger.debug("Too few sender-key-capable users ({}). Doing all legacy sends.", senderKeyTargets.size());
return Set.of();
if (!address.hasValidServiceId()) {
return false;
}
logger.debug("Can use sender key for {}/{} recipients.", senderKeyTargets.size(), recipientIds.size());
return senderKeyTargets;
final var identity = account.getIdentityKeyStore().getIdentityInfo(address.getServiceId());
if (identity == null || !identity.getTrustLevel().isTrusted()) {
return false;
}
return true;
}
private List<SendMessageResult> sendGroupMessageInternalWithLegacy(
final LegacySenderHandler sender,
final Set<RecipientId> recipientIds,
final List<SignalServiceAddress> addresses,
final List<SealedSenderAccess> unidentifiedAccesses,
final boolean isRecipientUpdate
) throws IOException {
final var recipientIdList = new ArrayList<>(recipientIds);
final var addresses = recipientIdList.stream()
.map(context.getRecipientHelper()::resolveSignalServiceAddress)
.toList();
final var unidentifiedAccesses = context.getUnidentifiedAccessHelper()
.getSealedSenderAccessFor(recipientIdList);
try {
final var results = sender.send(addresses, unidentifiedAccesses, isRecipientUpdate);
final var successCount = results.stream().filter(SendMessageResult::isSuccess).count();
logger.debug("Successfully sent using 1:1 to {}/{} legacy targets.", successCount, recipientIdList.size());
logger.debug("Successfully sent using 1:1 to {}/{} legacy targets.", successCount, addresses.size());
return results;
} catch (org.whispersystems.signalservice.api.crypto.UntrustedIdentityException e) {
return List.of();
@ -580,12 +588,12 @@ public class SendHelper {
private List<SendMessageResult> sendGroupMessageInternalWithSenderKey(
final SenderKeySenderHandler sender,
final Set<RecipientId> recipientIds,
final DistributionId distributionId,
final List<SignalServiceAddress> addresses,
final List<UnidentifiedAccess> unidentifiedAccesses,
final GroupSendEndorsements groupSendEndorsements,
final boolean isRecipientUpdate
) throws IOException {
final var recipientIdList = new ArrayList<>(recipientIds);
long keyCreateTime = account.getSenderKeyStore()
.getCreateTimeForOurKey(account.getAci(), account.getDeviceId(), distributionId);
long keyAge = System.currentTimeMillis() - keyCreateTime;
@ -599,15 +607,6 @@ public class SendHelper {
account.getSenderKeyStore().deleteOurKey(account.getAci(), distributionId);
}
List<SignalServiceAddress> addresses = recipientIdList.stream()
.map(context.getRecipientHelper()::resolveSignalServiceAddress)
.toList();
List<UnidentifiedAccess> unidentifiedAccesses = context.getUnidentifiedAccessHelper()
.getAccessFor(recipientIdList)
.stream()
.toList();
final GroupSendEndorsements groupSendEndorsements = null;//TODO
try {
List<SendMessageResult> results = sender.send(distributionId,
addresses,

View File

@ -15,7 +15,10 @@ import org.whispersystems.signalservice.api.crypto.SealedSenderAccess;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.asamk.signal.manager.util.Utils.handleResponseException;
@ -56,8 +59,15 @@ public class UnidentifiedAccessHelper {
return SealedSenderAccess.forIndividual(getAccessFor(recipient, noRefresh));
}
public List<UnidentifiedAccess> getAccessFor(List<RecipientId> recipients) {
return recipients.stream().map(this::getAccessFor).toList();
public Map<RecipientId, UnidentifiedAccess> getAccessFor(Collection<RecipientId> recipients) {
final var result = new HashMap<RecipientId, UnidentifiedAccess>();
for (final var recipient : recipients) {
final var access = this.getAccessFor(recipient);
if (access != null) {
result.put(recipient, access);
}
}
return result;
}
private @Nullable UnidentifiedAccess getAccessFor(RecipientId recipient) {