chore: cleaned up the batch sender

pull/3052/head
Audric Ackermann 11 months ago
parent 816f29d682
commit a49a65c92b

@ -189,7 +189,7 @@ export async function deleteMessagesFromSwarmOnly(
) {
const deletionMessageHashes = isStringArray(messages) ? messages : getMessageHashes(messages);
try {
if (messages.length === 0) {
if (isEmpty(messages)) {
return false;
}
@ -199,10 +199,11 @@ export async function deleteMessagesFromSwarmOnly(
);
return false;
}
const hashesAsSet = new Set(deletionMessageHashes);
if (PubKey.is03Pubkey(pubkey)) {
return await SnodeAPI.networkDeleteMessagesForGroup(deletionMessageHashes, pubkey);
return await SnodeAPI.networkDeleteMessagesForGroup(hashesAsSet, pubkey);
}
return await SnodeAPI.networkDeleteMessageOurSwarm(deletionMessageHashes, pubkey);
return await SnodeAPI.networkDeleteMessageOurSwarm(hashesAsSet, pubkey);
} catch (e) {
window.log?.error(
`deleteMessagesFromSwarmOnly: Error deleting message from swarm of ${ed25519Str(pubkey)}, hashes: ${deletionMessageHashes}`,

@ -1130,7 +1130,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk: this.id,
deleteAllMessagesSubRequest: null,
supplementalKeysSubRequest: [],
extraStoreRequests,
});

@ -3,18 +3,16 @@
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { compact, isEmpty } from 'lodash';
import pRetry from 'p-retry';
import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface';
import { getSodiumRenderer } from '../../crypto';
import { PubKey } from '../../types';
import {
DeleteAllFromUserNodeSubRequest,
DeleteHashesFromGroupNodeSubRequest,
DeleteHashesFromUserNodeSubRequest,
} from './SnodeRequestTypes';
import { BatchRequests } from './batchRequest';
import { SnodePool } from './snodePool';
import { StringUtils, UserUtils } from '../../utils';
import { ed25519Str, fromBase64ToArray, fromHexToArray } from '../../utils/String';
import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface';
import { DeleteAllFromUserNodeSubRequest } from './SnodeRequestTypes';
import { BatchRequests } from './batchRequest';
import { DeleteGroupHashesFactory } from './factories/DeleteGroupHashesRequestFactory';
import { DeleteUserHashesFactory } from './factories/DeleteUserHashesRequestFactory';
import { SnodePool } from './snodePool';
export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
@ -158,14 +156,22 @@ const TEST_getMinTimeout = () => 500;
* Note: legacy group did not support removing messages from the swarm.
*/
const networkDeleteMessageOurSwarm = async (
messagesHashes: Array<string>,
messagesHashes: Set<string>,
pubkey: PubkeyType
): Promise<boolean> => {
const sodium = await getSodiumRenderer();
if (!PubKey.is05Pubkey(pubkey) || pubkey !== UserUtils.getOurPubKeyStrFromCache()) {
throw new Error('networkDeleteMessageOurSwarm with 05 pk can only for our own swarm');
}
const request = new DeleteHashesFromUserNodeSubRequest({ messagesHashes });
if (isEmpty(messagesHashes)) {
window.log.info('networkDeleteMessageOurSwarm: messageHashes is empty');
return true;
}
const messageHashesArr = [...messagesHashes];
const request = DeleteUserHashesFactory.makeUserHashesToDeleteSubRequest({ messagesHashes });
if (!request) {
throw new Error('makeUserHashesToDeleteSubRequest returned invalid subrequest');
}
try {
const success = await pRetry(
@ -237,7 +243,7 @@ const networkDeleteMessageOurSwarm = async (
const responseHashes = snodeJson.deleted as Array<string>;
const signatureSnode = snodeJson.signature as string;
// The signature looks like ( PUBKEY_HEX || RMSG[0] || ... || RMSG[N] || DMSG[0] || ... || DMSG[M] )
const dataToVerify = `${request.pubkey}${messagesHashes.join(
const dataToVerify = `${request.pubkey}${messageHashesArr.join(
''
)}${responseHashes.join('')}`;
const dataToVerifyUtf8 = StringUtils.encode(dataToVerify, 'utf8');
@ -292,7 +298,7 @@ const networkDeleteMessageOurSwarm = async (
* - if the request failed too many times
*/
const networkDeleteMessagesForGroup = async (
messagesHashes: Array<string>,
messagesHashes: Set<string>,
groupPk: GroupPubkeyType
): Promise<boolean> => {
if (!PubKey.is03Pubkey(groupPk)) {
@ -301,17 +307,21 @@ const networkDeleteMessagesForGroup = async (
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group || !group.secretKey || isEmpty(group.secretKey)) {
window.log.warn(
`networkDeleteMessagesForGroup: not deleting from swarm of 03-group ${messagesHashes.length} hashes as we do not the adminKey`
`networkDeleteMessagesForGroup: not deleting from swarm of 03-group ${messagesHashes.size} hashes as we do not the adminKey`
);
return false;
}
try {
const request = new DeleteHashesFromGroupNodeSubRequest({
const request = DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest({
messagesHashes,
groupPk,
secretKey: group.secretKey,
group,
});
if (!request) {
throw new Error(
'DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest failed to build a request '
);
}
await pRetry(
async () => {

@ -39,7 +39,7 @@ abstract class SnodeAPISubRequest {
* Retrieve for legacy was not authenticated
*/
export class RetrieveLegacyClosedGroupSubRequest extends SnodeAPISubRequest {
public method = 'retrieve' as const;
method = 'retrieve' as const;
public readonly legacyGroupPk: PubkeyType;
public readonly last_hash: string;
public readonly max_size: number | undefined;

@ -4,32 +4,32 @@ import { ed25519Str } from '../../../utils/String';
import { DeleteHashesFromGroupNodeSubRequest } from '../SnodeRequestTypes';
function makeGroupHashesToDeleteSubRequest({
allOldHashes,
messagesHashes,
group,
}: {
group: Pick<UserGroupsGet, 'secretKey' | 'pubkeyHex'>;
allOldHashes: Set<string>;
messagesHashes: Set<string>;
}) {
const groupPk = group.pubkeyHex;
const allOldHashesArray = [...allOldHashes];
if (allOldHashesArray.length) {
const messagesHashesArr = [...messagesHashes];
if (messagesHashesArr.length) {
if (!group.secretKey || isEmpty(group.secretKey)) {
window.log.debug(
`makeGroupHashesToDeleteSubRequest: ${ed25519Str(groupPk)}: allOldHashesArray not empty but we do not have the secretKey`
`makeGroupHashesToDeleteSubRequest: ${ed25519Str(groupPk)}: messagesHashesArr not empty but we do not have the secretKey`
);
throw new Error(
'makeGroupHashesToDeleteSubRequest: allOldHashesArray not empty but we do not have the secretKey'
'makeGroupHashesToDeleteSubRequest: messagesHashesArr not empty but we do not have the secretKey'
);
}
return new DeleteHashesFromGroupNodeSubRequest({
messagesHashes: [...allOldHashes],
messagesHashes: messagesHashesArr,
groupPk,
secretKey: group.secretKey,
});
}
return null;
return undefined;
}
export const DeleteGroupHashesFactory = { makeGroupHashesToDeleteSubRequest };

@ -0,0 +1,13 @@
import { DeleteHashesFromUserNodeSubRequest } from '../SnodeRequestTypes';
function makeUserHashesToDeleteSubRequest({ messagesHashes }: { messagesHashes: Set<string> }) {
const messagesHashesArr = [...messagesHashes];
if (messagesHashesArr.length) {
return new DeleteHashesFromUserNodeSubRequest({
messagesHashes: messagesHashesArr,
});
}
return undefined;
}
export const DeleteUserHashesFactory = { makeUserHashesToDeleteSubRequest };

@ -321,7 +321,7 @@ class ConvoController {
groupPk,
secretKey,
})
: null;
: undefined;
// this marks the group info as deleted. We need to push those details
await MetaGroupWrapperActions.infoDestroy(groupPk);

@ -429,8 +429,8 @@ async function sendMessagesDataToSnode<T extends PubkeyType | GroupPubkeyType>(
deleteHashesSubRequest,
deleteAllMessagesSubRequest,
}: WithRevokeSubRequest & {
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null;
deleteHashesSubRequest: DeleteHashesRequestPerPubkey<T> | null;
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest;
deleteHashesSubRequest?: DeleteHashesRequestPerPubkey<T>;
},
method: MethodBatchType
): Promise<NotEmptyArrayOfBatchResults> {
@ -645,8 +645,8 @@ async function sendEncryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>(
}: WithRevokeSubRequest & {
storeRequests: StoreRequestsPerPubkey<T>; // keeping those as an array because the order needs to be enforced for some (groupkeys for instance)
destination: T;
deleteHashesSubRequest: DeleteHashesRequestPerPubkey<T> | null;
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null;
deleteHashesSubRequest?: DeleteHashesRequestPerPubkey<T>;
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest;
}): Promise<NotEmptyArrayOfBatchResults | null> {
try {
const batchResults = await pRetry(
@ -736,7 +736,6 @@ async function sendUnencryptedDataToSnode<T extends GroupPubkeyType | PubkeyType
return sendEncryptedDataToSnode({
destination,
deleteHashesSubRequest: null,
storeRequests,
});
}
@ -832,7 +831,8 @@ async function handleBatchResultWithSubRequests({
for (let index = 0; index < subRequests.length; index++) {
const subRequest = subRequests[index];
// there are some stuff we need to do when storing a message (for a group/legacy group or user, but no config messages)
// there are some things we need to do when storing messages
// for groups/legacy groups or user (but not for config messages)
if (
subRequest instanceof StoreGroupMessageSubRequest ||
subRequest instanceof StoreLegacyGroupMessageSubRequest ||
@ -862,9 +862,7 @@ async function handleBatchResultWithSubRequests({
await MessageSentHandler.handleSwarmMessageSentSuccess(
{
device: subRequest.destination,
encryption: isDestinationClosedGroup
? SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE
: SignalService.Envelope.Type.SESSION_MESSAGE,
isDestinationClosedGroup,
identifier: subRequest.dbMessageIdentifier,
plainTextBuffer:
subRequest instanceof StoreUserMessageSubRequest

@ -61,35 +61,39 @@ async function handlePublicMessageSentFailure(sentMessage: OpenGroupVisibleMessa
}
async function handleSwarmMessageSentSuccess(
sentMessage: Pick<OutgoingRawMessage, 'device' | 'encryption' | 'identifier'> & {
{
device: destination,
identifier,
isDestinationClosedGroup,
plainTextBuffer,
}: Pick<OutgoingRawMessage, 'device' | 'identifier'> & {
/**
* plainTextBuffer is only required when sending a message to a 1o1,
* as we need it to encrypt it again for our linked devices (synced messages)
*/
plainTextBuffer: Uint8Array | null;
/**
* We must not sync a message when it was sent to a closed group
*/
isDestinationClosedGroup: boolean;
},
effectiveTimestamp: number,
storedHash: string | null
) {
// The wrappedEnvelope will be set only if the message is not one of OpenGroupV2Message type.
let fetchedMessage = await fetchHandleMessageSentData(sentMessage.identifier);
let fetchedMessage = await fetchHandleMessageSentData(identifier);
if (!fetchedMessage) {
return;
}
let sentTo = fetchedMessage.get('sent_to') || [];
const isOurDevice = UserUtils.isUsFromCache(sentMessage.device);
const isOurDevice = UserUtils.isUsFromCache(destination);
// FIXME this is not correct and will cause issues with syncing
// At this point the only way to check for medium
// group is by comparing the encryption type
const isClosedGroupMessage =
sentMessage.encryption === SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE ||
PubKey.is03Pubkey(sentMessage.device);
const isClosedGroupMessage = isDestinationClosedGroup || PubKey.is03Pubkey(destination);
// We trigger a sync message only when the message is not to one of our devices, AND
// the message is not for an open group (there is no sync for opengroups, each device pulls all messages), AND
// the message is not for a group (there is no sync for groups, each device pulls all messages), AND
// if we did not sync or trigger a sync message for this specific message already
const shouldTriggerSyncMessage =
!isOurDevice &&
@ -100,16 +104,16 @@ async function handleSwarmMessageSentSuccess(
// A message is synced if we triggered a sync message (sentSync)
// and the current message was sent to our device (so a sync message)
const shouldMarkMessageAsSynced =
isOurDevice && fetchedMessage.get('sentSync') && isClosedGroupMessage;
(isOurDevice && fetchedMessage.get('sentSync')) || isClosedGroupMessage;
// Handle the sync logic here
if (shouldTriggerSyncMessage && sentMessage && sentMessage.plainTextBuffer) {
if (shouldTriggerSyncMessage && plainTextBuffer) {
try {
const contentDecoded = SignalService.Content.decode(sentMessage.plainTextBuffer);
const contentDecoded = SignalService.Content.decode(plainTextBuffer);
if (contentDecoded && contentDecoded.dataMessage) {
try {
await fetchedMessage.sendSyncMessage(contentDecoded, effectiveTimestamp);
const tempFetchMessage = await fetchHandleMessageSentData(sentMessage.identifier);
const tempFetchMessage = await fetchHandleMessageSentData(identifier);
if (!tempFetchMessage) {
window?.log?.warn(
'Got an error while trying to sendSyncMessage(): fetchedMessage is null'
@ -130,7 +134,7 @@ async function handleSwarmMessageSentSuccess(
fetchedMessage.set({ synced: true });
}
sentTo = union(sentTo, [sentMessage.device]);
sentTo = union(sentTo, [destination]);
if (storedHash) {
fetchedMessage.updateMessageHash(storedHash);
}

@ -161,7 +161,6 @@ class GroupPendingRemovalsJob extends PersistedJob<GroupPendingRemovalsPersisted
const result = await MessageSender.sendEncryptedDataToSnode({
storeRequests: [multiEncryptRequest],
destination: groupPk,
deleteHashesSubRequest: null,
...revokeUnrevokeParams,
});

@ -86,15 +86,16 @@ async function pushChangesToGroupSwarmIfNeeded({
}: WithGroupPubkey &
WithRevokeSubRequest & {
supplementalKeysSubRequest: Array<StoreGroupKeysSubRequest>;
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest | null;
deleteAllMessagesSubRequest?: DeleteAllFromGroupMsgNodeSubRequest;
extraStoreRequests: Array<StoreGroupMessageSubRequest>;
}): Promise<RunJobResult> {
// save the dumps to DB even before trying to push them, so at least we have an up to date dumps in the DB in case of crash, no network etc
await LibSessionUtil.saveDumpsToDb(groupPk);
const { allOldHashes, messages: pendingConfigData } =
await LibSessionUtil.pendingChangesForGroup(groupPk);
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't schedule another run in this case)
// If there are no pending changes nor any requests to be made,
// then the job can just complete (next time something is updated we want
// to try and run immediately so don't schedule another run in this case)
if (
isEmpty(pendingConfigData) &&
isEmpty(supplementalKeysSubRequest) &&
@ -128,12 +129,13 @@ async function pushChangesToGroupSwarmIfNeeded({
const deleteHashesSubRequest = DeleteGroupHashesFactory.makeGroupHashesToDeleteSubRequest({
group,
allOldHashes,
messagesHashes: allOldHashes,
});
const result = await MessageSender.sendEncryptedDataToSnode({
// Note: this is on purpose that supplementalKeysSubRequest is before pendingConfigRequests
// as this is to avoid a race condition where a device polls while we are posting the configs (already encrypted with the new keys)
// as this is to avoid a race condition where a device is polling right
// while we are posting the configs (already encrypted with the new keys)
storeRequests: [...supplementalKeysSubRequest, ...pendingConfigRequests, ...extraStoreRequests],
destination: groupPk,
deleteHashesSubRequest,

@ -113,7 +113,7 @@ async function pushChangesToUserSwarmIfNeeded() {
? new DeleteHashesFromUserNodeSubRequest({
messagesHashes: [...changesToPush.allOldHashes],
})
: null;
: undefined;
const result = await MessageSender.sendEncryptedDataToSnode({
storeRequests,

@ -210,7 +210,6 @@ const initNewGroupInWrapper = createAsyncThunk(
const result = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementalKeysSubRequest: [],
deleteAllMessagesSubRequest: null,
extraStoreRequests,
});
if (result !== RunJobResult.Success) {
@ -730,7 +729,6 @@ async function handleMemberAddedFromUI({
supplementalKeysSubRequest,
revokeSubRequest,
unrevokeSubRequest,
deleteAllMessagesSubRequest: null,
extraStoreRequests,
});
if (sequenceResult !== RunJobResult.Success) {
@ -834,7 +832,6 @@ async function handleMemberRemovedFromUI({
const sequenceResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementalKeysSubRequest: [],
deleteAllMessagesSubRequest: null,
extraStoreRequests,
});
if (sequenceResult !== RunJobResult.Success) {
@ -914,7 +911,6 @@ async function handleNameChangeFromUI({
const batchResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementalKeysSubRequest: [],
deleteAllMessagesSubRequest: null,
extraStoreRequests,
});
@ -1035,7 +1031,6 @@ const triggerFakeAvatarUpdate = createAsyncThunk(
const batchResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementalKeysSubRequest: [],
deleteAllMessagesSubRequest: null,
extraStoreRequests,
});
if (!batchResult) {

Loading…
Cancel
Save