refactor: move subrequests to classes and fix updateMessages

Branch: pull/2963/head
Author: Audric Ackermann (1 year ago)
Parent: 6094e725fb
Commit: 5509dc74c5
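
At a glance, this commit swaps hand-built `{ method, params }` subrequest objects for dedicated subrequest classes, and adds `doUnsignedSnodeBatchRequest`, which signs raw subrequests before dispatching them. A minimal sketch of the resulting call pattern, using names from the hunks below (the constructor options mirror this diff; the `max_size` value and function are illustrative only, not part of the commit):

import { Snode } from '../../../data/data';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { SnodeNamespaces } from './namespaces';
import { RetrieveUserSubRequest } from './SnodeRequestTypes';

async function pollOurDefaultNamespace(targetNode: Snode, lastHash: string, ourPubkey: string) {
  // Before this commit, callers hand-built a { method: 'retrieve', params: { ..., signature } }
  // object and signed it inline before calling doSnodeBatchRequest.
  // Now they construct a subrequest class and let doUnsignedSnodeBatchRequest sign and send it.
  const request = new RetrieveUserSubRequest({
    last_hash: lastHash,
    max_size: -1, // placeholder; the real callers derive this from SnodeNamespace.maxSizeMap
    namespace: SnodeNamespaces.Default,
  });
  return doUnsignedSnodeBatchRequest([request], targetNode, 4000, ourPubkey);
}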

@ -42,12 +42,12 @@ export type GuardNode = {
ed25519PubKey: string;
};
export interface Snode {
export type Snode = {
ip: string;
port: number;
pubkey_x25519: string;
pubkey_ed25519: string;
}
};
export type SwarmNode = Snode & {
address: string;
@ -227,12 +227,12 @@ async function cleanLastHashes(): Promise<void> {
await channels.cleanLastHashes();
}
async function saveSeenMessageHashes(
data: Array<{
expiresAt: number;
hash: string;
}>
): Promise<void> {
export type SeenMessageHashes = {
expiresAt: number;
hash: string;
};
async function saveSeenMessageHashes(data: Array<SeenMessageHashes>): Promise<void> {
await channels.saveSeenMessageHashes(cleanData(data));
}

@ -31,7 +31,7 @@ import { HexString } from '../node/hexStrings';
import {
SnodeNamespace,
SnodeNamespaces,
UserConfigNamespaces,
SnodeNamespacesUserConfig,
} from '../session/apis/snode_api/namespaces';
import { RetrieveMessageItemWithNamespace } from '../session/apis/snode_api/types';
import { ClosedGroup, GroupInfo } from '../session/group/closed-group';
@ -64,12 +64,12 @@ type IncomingUserResult = {
needsDump: boolean;
publicKey: string;
latestEnvelopeTimestamp: number;
namespace: UserConfigNamespaces;
namespace: SnodeNamespacesUserConfig;
};
function byUserNamespace(incomingConfigs: Array<RetrieveMessageItemWithNamespace>) {
const groupedByVariant: Map<
UserConfigNamespaces,
SnodeNamespacesUserConfig,
Array<RetrieveMessageItemWithNamespace>
> = new Map();

@ -169,7 +169,7 @@ export async function handleSwarmDataMessage({
if (cleanDataMessage.groupUpdateMessage) {
await GroupV2Receiver.handleGroupUpdateMessage({
envelopeTimestamp: sentAtTimestamp,
signatureTimestamp: sentAtTimestamp,
updateMessage: rawDataMessage.groupUpdateMessage as SignalService.GroupUpdateMessage,
source: envelope.source,
senderIdentity: envelope.senderIdentity,

@ -27,7 +27,7 @@ import {
UserGroupsWrapperActions,
} from '../../webworker/workers/browser/libsession_worker_interface';
type WithEnvelopeTimestamp = { envelopeTimestamp: number };
type WithSignatureTimestamp = { signatureTimestamp: number };
type WithAuthor = { author: PubkeyType };
type WithUncheckedSource = { source: string };
@ -35,16 +35,16 @@ type WithUncheckedSenderIdentity = { senderIdentity: string };
type GroupInviteDetails = {
inviteMessage: SignalService.GroupUpdateInviteMessage;
} & WithEnvelopeTimestamp &
} & WithSignatureTimestamp &
WithAuthor;
type GroupUpdateGeneric<T> = { change: Omit<T, 'toJSON'> } & WithEnvelopeTimestamp &
type GroupUpdateGeneric<T> = { change: Omit<T, 'toJSON'> } & WithSignatureTimestamp &
WithGroupPubkey &
WithAuthor;
type GroupUpdateDetails = {
updateMessage: SignalService.GroupUpdateMessage;
} & WithEnvelopeTimestamp;
} & WithSignatureTimestamp;
/**
* Send the invite response to the group's swarm. An admin will handle it and update our invite pending state to not pending.
@ -68,7 +68,7 @@ async function sendInviteResponseToGroup({ groupPk }: { groupPk: GroupPubkeyType
async function handleGroupInviteMessage({
inviteMessage,
author,
envelopeTimestamp,
signatureTimestamp,
}: GroupInviteDetails) {
const groupPk = inviteMessage.groupSessionId;
if (!PubKey.is03Pubkey(groupPk)) {
@ -89,7 +89,7 @@ async function handleGroupInviteMessage({
const sigValid = await verifySig({
pubKey: HexString.fromHexStringNoPrefix(groupPk),
signature: inviteMessage.adminSignature,
data: stringToUint8Array(`INVITE${UserUtils.getOurPubKeyStrFromCache()}${envelopeTimestamp}`),
data: stringToUint8Array(`INVITE${UserUtils.getOurPubKeyStrFromCache()}${signatureTimestamp}`),
});
if (!sigValid) {
@ -101,7 +101,7 @@ async function handleGroupInviteMessage({
const convo = await ConvoHub.use().getOrCreateAndWait(groupPk, ConversationTypeEnum.GROUPV2);
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
didApproveMe: true,
conversationIdOrigin: author,
});
@ -180,13 +180,13 @@ async function verifySig({
async function handleGroupInfoChangeMessage({
change,
groupPk,
envelopeTimestamp,
signatureTimestamp,
author,
}: GroupUpdateGeneric<SignalService.GroupUpdateInfoChangeMessage>) {
const sigValid = await verifySig({
pubKey: HexString.fromHexStringNoPrefix(groupPk),
signature: change.adminSignature,
data: stringToUint8Array(`INFO_CHANGE${change.type}${envelopeTimestamp}`),
data: stringToUint8Array(`INFO_CHANGE${change.type}${signatureTimestamp}`),
});
if (!sigValid) {
window.log.warn('received group info change with invalid signature. dropping');
@ -203,7 +203,7 @@ async function handleGroupInfoChangeMessage({
convo,
diff: { type: 'name', newName: change.updatedName },
sender: author,
sentAt: envelopeTimestamp,
sentAt: signatureTimestamp,
expireUpdate: null,
});
@ -214,7 +214,7 @@ async function handleGroupInfoChangeMessage({
convo,
diff: { type: 'avatarChange' },
sender: author,
sentAt: envelopeTimestamp,
sentAt: signatureTimestamp,
expireUpdate: null,
});
break;
@ -230,13 +230,13 @@ async function handleGroupInfoChangeMessage({
convo,
diff: { type: 'name', newName: change.updatedName },
sender: author,
sentAt: envelopeTimestamp,
sentAt: signatureTimestamp,
expireUpdate: null,
});
await convo.updateExpireTimer({
providedExpireTimer: change.updatedExpiration,
providedSource: author,
receivedAt: envelopeTimestamp,
receivedAt: signatureTimestamp,
fromCurrentDevice: false,
fromSync: false,
fromConfigMessage: false,
@ -249,14 +249,14 @@ async function handleGroupInfoChangeMessage({
}
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
});
}
async function handleGroupMemberChangeMessage({
change,
groupPk,
envelopeTimestamp,
signatureTimestamp,
author,
}: GroupUpdateGeneric<SignalService.GroupUpdateMemberChangeMessage>) {
const convo = ConvoHub.use().get(groupPk);
@ -267,7 +267,7 @@ async function handleGroupMemberChangeMessage({
const sigValid = await verifySig({
pubKey: HexString.fromHexStringNoPrefix(groupPk),
signature: change.adminSignature,
data: stringToUint8Array(`MEMBER_CHANGE${change.type}${envelopeTimestamp}`),
data: stringToUint8Array(`MEMBER_CHANGE${change.type}${signatureTimestamp}`),
});
if (!sigValid) {
window.log.warn('received group member change with invalid signature. dropping');
@ -280,7 +280,12 @@ async function handleGroupMemberChangeMessage({
return;
}
const sharedDetails = { convo, sender: author, sentAt: envelopeTimestamp, expireUpdate: null };
const sharedDetails = {
convo,
sender: author,
sentAt: signatureTimestamp,
expireUpdate: null,
};
switch (change.type) {
case SignalService.GroupUpdateMemberChangeMessage.Type.ADDED: {
@ -310,13 +315,13 @@ async function handleGroupMemberChangeMessage({
}
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
});
}
async function handleGroupMemberLeftMessage({
groupPk,
envelopeTimestamp,
signatureTimestamp,
author,
}: GroupUpdateGeneric<SignalService.GroupUpdateMemberLeftMessage>) {
// No need to verify sig, the author is already verified with the libsession.decrypt()
@ -337,19 +342,19 @@ async function handleGroupMemberLeftMessage({
convo,
diff: { type: 'left', left: [author] },
sender: author,
sentAt: envelopeTimestamp,
sentAt: signatureTimestamp,
expireUpdate: null,
});
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
});
// debugger TODO We should process this message type even if the sender is blocked
}
async function handleGroupDeleteMemberContentMessage({
groupPk,
envelopeTimestamp,
signatureTimestamp,
change,
}: GroupUpdateGeneric<SignalService.GroupUpdateDeleteMemberContentMessage>) {
const convo = ConvoHub.use().get(groupPk);
@ -361,7 +366,7 @@ async function handleGroupDeleteMemberContentMessage({
pubKey: HexString.fromHexStringNoPrefix(groupPk),
signature: change.adminSignature,
data: stringToUint8Array(
`DELETE_CONTENT${envelopeTimestamp}${change.memberSessionIds.join()}${change.messageHashes.join()}`
`DELETE_CONTENT${signatureTimestamp}${change.memberSessionIds.join()}${change.messageHashes.join()}`
),
});
@ -372,14 +377,14 @@ async function handleGroupDeleteMemberContentMessage({
// TODO we should process this message type even if the sender is blocked
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
});
throw new Error('Not implemented');
}
async function handleGroupUpdateDeleteMessage({
groupPk,
envelopeTimestamp,
signatureTimestamp,
change,
}: GroupUpdateGeneric<SignalService.GroupUpdateDeleteMessage>) {
// TODO verify sig?
@ -390,7 +395,7 @@ async function handleGroupUpdateDeleteMessage({
const sigValid = await verifySig({
pubKey: HexString.fromHexStringNoPrefix(groupPk),
signature: change.adminSignature,
data: stringToUint8Array(`DELETE${envelopeTimestamp}${change.memberSessionIds.join()}`),
data: stringToUint8Array(`DELETE${signatureTimestamp}${change.memberSessionIds.join()}`),
});
if (!sigValid) {
@ -398,7 +403,7 @@ async function handleGroupUpdateDeleteMessage({
return;
}
convo.set({
active_at: envelopeTimestamp,
active_at: signatureTimestamp,
});
throw new Error('Not implemented');
// TODO We should process this message type even if the sender is blocked
@ -408,7 +413,7 @@ async function handleGroupUpdateInviteResponseMessage({
groupPk,
change,
author,
}: Omit<GroupUpdateGeneric<SignalService.GroupUpdateInviteResponseMessage>, 'envelopeTimestamp'>) {
}: Omit<GroupUpdateGeneric<SignalService.GroupUpdateInviteResponseMessage>, 'signatureTimestamp'>) {
// no sig verify for this type of message
const convo = ConvoHub.use().get(groupPk);
if (!convo) {

@ -5,6 +5,7 @@ import { OpenGroupData } from '../../../../data/opengroups';
import { assertUnreachable, roomHasBlindEnabled } from '../../../../types/sqlSharedTypes';
import { Reactions } from '../../../../util/reactions';
import { OnionSending, OnionV4JSONSnodeResponse } from '../../../onions/onionSend';
import { MethodBatchType } from '../../snode_api/SnodeRequestTypes';
import {
OpenGroupPollingUtils,
OpenGroupRequestHeaders,
@ -55,7 +56,7 @@ export const sogsBatchSend = async (
roomInfos: Set<string>,
abortSignal: AbortSignal,
batchRequestOptions: Array<OpenGroupBatchRow>,
batchType: 'batch' | 'sequence'
batchType: MethodBatchType
): Promise<BatchSogsReponse | null> => {
// getting server pk for room
const [roomId] = roomInfos;
@ -356,9 +357,9 @@ const getBatchRequest = async (
serverPublicKey: string,
batchOptions: Array<OpenGroupBatchRow>,
requireBlinding: boolean,
batchType: 'batch' | 'sequence'
batchType: MethodBatchType
): Promise<BatchRequest | undefined> => {
const batchEndpoint = batchType === 'sequence' ? '/sequence' : '/batch';
const batchEndpoint = `/${batchType}` as const;
const batchMethod = 'POST';
if (!batchOptions || isEmpty(batchOptions)) {
return undefined;
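
Since `MethodBatchType` is the `'batch' | 'sequence'` union already taken by `doSnodeBatchRequest`, the endpoint can be derived directly from the value instead of via a ternary. A standalone sketch of the idea (the union is restated here only for illustration; the real type lives in SnodeRequestTypes.ts):

type MethodBatchType = 'batch' | 'sequence';

function batchEndpointFor(batchType: MethodBatchType): '/batch' | '/sequence' {
  // The template literal type keeps the result narrowed to the two valid endpoints.
  return `/${batchType}` as const;
}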

@ -1,19 +1,19 @@
/* eslint-disable no-prototype-builtins */
/* eslint-disable no-restricted-syntax */
import { compact, sample } from 'lodash';
import { compact } from 'lodash';
import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { getSodiumRenderer } from '../../crypto';
import { ed25519Str } from '../../onions/onionPath';
import { StringUtils, UserUtils } from '../../utils';
import { fromBase64ToArray, fromHexToArray } from '../../utils/String';
import { doSnodeBatchRequest } from './batchRequest';
import { SnodeSignature } from './signature/snodeSignatures';
import { getSwarmFor } from './snodePool';
import { getNodeFromSwarmOrThrow } from './snodePool';
export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.';
// TODOLATER we should merge those two functions together as they are almost exactly the same
// TODO make this function use doUnsignedBatchRequest but we need to merge the verify logic into it
const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
const sodium = await getSodiumRenderer();
const usPk = UserUtils.getOurPubKeyStrFromCache();
@ -30,13 +30,7 @@ const forceNetworkDeletion = async (): Promise<Array<string> | null> => {
try {
const maliciousSnodes = await pRetry(
async () => {
const userSwarm = await getSwarmFor(usPk);
const snodeToMakeRequestTo: Snode | undefined = sample(userSwarm);
if (!snodeToMakeRequestTo) {
window?.log?.warn('Cannot forceNetworkDeletion, without a valid swarm node.');
return null;
}
const snodeToMakeRequestTo = await getNodeFromSwarmOrThrow(usPk);
return pRetry(
async () => {
@ -196,13 +190,7 @@ const networkDeleteMessages = async (hashes: Array<string>): Promise<Array<strin
try {
const maliciousSnodes = await pRetry(
async () => {
const userSwarm = await getSwarmFor(userX25519PublicKey);
const snodeToMakeRequestTo: Snode | undefined = sample(userSwarm);
if (!snodeToMakeRequestTo) {
window?.log?.warn('Cannot networkDeleteMessages, without a valid swarm node.');
return null;
}
const snodeToMakeRequestTo = await getNodeFromSwarmOrThrow(userX25519PublicKey);
return pRetry(
async () => {

File diff suppressed because it is too large

@ -1,20 +1,19 @@
import { isArray } from 'lodash';
import { Snode } from '../../../data/data';
import { MessageSender } from '../../sending';
import { processOnionRequestErrorAtDestination, SnodeResponse } from './onions';
import { snodeRpc } from './sessionRpc';
import {
builtRequestToLoggingId,
BuiltSnodeSubRequests,
MAX_SUBREQUESTS_COUNT,
MethodBatchType,
NotEmptyArrayOfBatchResults,
SnodeApiSubRequests,
RawSnodeSubRequests,
} from './SnodeRequestTypes';
function logSubRequests(_requests: Array<SnodeApiSubRequests>) {
return 'logSubRequests to do';
// return requests.map(m =>
// m.method === 'retrieve' || m.method === 'store'
// ? `${m.method}-${SnodeNamespace.toRoles(m.params.namespace)}`
// : m.method
// );
function logSubRequests(requests: Array<BuiltSnodeSubRequests>) {
return `[${requests.map(builtRequestToLoggingId).join(', ')}]`;
}
/**
@ -28,13 +27,14 @@ function logSubRequests(_requests: Array<SnodeApiSubRequests>) {
* @param method can be either batch or sequence. A batch call will run all calls even if one of them fails. A sequence call will stop as soon as the first one fails
*/
export async function doSnodeBatchRequest(
subRequests: Array<SnodeApiSubRequests>,
subRequests: Array<BuiltSnodeSubRequests>,
targetNode: Snode,
timeout: number,
associatedWith: string | null,
method: 'batch' | 'sequence' = 'batch'
method: MethodBatchType = 'batch'
): Promise<NotEmptyArrayOfBatchResults> {
window.log.debug(`doSnodeBatchRequest "${method}":`, JSON.stringify(logSubRequests(subRequests)));
if (subRequests.length > MAX_SUBREQUESTS_COUNT) {
window.log.error(
`batch subRequests count cannot be more than ${MAX_SUBREQUESTS_COUNT}. Got ${subRequests.length}`
@ -76,6 +76,17 @@ export async function doSnodeBatchRequest(
return decoded;
}
export async function doUnsignedSnodeBatchRequest(
unsignedSubRequests: Array<RawSnodeSubRequests>,
targetNode: Snode,
timeout: number,
associatedWith: string | null,
method: MethodBatchType = 'batch'
): Promise<NotEmptyArrayOfBatchResults> {
const signedSubRequests = await MessageSender.signSubRequests(unsignedSubRequests);
return doSnodeBatchRequest(signedSubRequests, targetNode, timeout, associatedWith, method);
}
/**
* Make sure the global batch status code is 200, parse the content as json and return it
*/
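
With the signing folded into the new helper, call sites shrink to constructing the raw subrequest and handing it over, as at the `SwarmForSubRequest` call site later in this diff. A hedged usage sketch (parameter types simplified):

import { Snode } from '../../../data/data';
import { SwarmForSubRequest } from './SnodeRequestTypes';
import { doUnsignedSnodeBatchRequest } from './batchRequest';

async function fetchSwarmOf(pubkey: string, targetNode: Snode) {
  const subrequest = new SwarmForSubRequest(pubkey);
  // doUnsignedSnodeBatchRequest signs the subrequests (MessageSender.signSubRequests)
  // and forwards them to doSnodeBatchRequest.
  return doUnsignedSnodeBatchRequest([subrequest], targetNode, 4000, pubkey);
}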

@ -1,21 +1,10 @@
/* eslint-disable no-restricted-syntax */
import {
chunk,
compact,
difference,
flatten,
isArray,
isEmpty,
isNumber,
sample,
uniqBy,
} from 'lodash';
import { chunk, compact, difference, flatten, isArray, isEmpty, isNumber, uniqBy } from 'lodash';
import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { getSodiumRenderer } from '../../crypto';
import { StringUtils, UserUtils } from '../../utils';
import { fromBase64ToArray, fromHexToArray } from '../../utils/String';
import { EmptySwarmError } from '../../utils/errors';
import { SeedNodeAPI } from '../seed_node_api';
import {
MAX_SUBREQUESTS_COUNT,
@ -23,9 +12,8 @@ import {
WithShortenOrExtend,
fakeHash,
} from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { SnodeSignature } from './signature/snodeSignatures';
import { getSwarmFor } from './snodePool';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { getNodeFromSwarmOrThrow } from './snodePool';
import { ExpireMessageResultItem, ExpireMessagesResultsContent } from './types';
export type verifyExpireMsgsResponseSignatureProps = ExpireMessageResultItem & {
@ -158,7 +146,13 @@ async function updateExpiryOnNodes(
expireRequests: Array<UpdateExpiryOnNodeUserSubRequest>
): Promise<Array<UpdatedExpiryWithHash>> {
try {
const result = await doSnodeBatchRequest(expireRequests, targetNode, 4000, ourPubKey, 'batch');
const result = await doUnsignedSnodeBatchRequest(
expireRequests,
targetNode,
4000,
ourPubKey,
'batch'
);
if (!result || result.length !== expireRequests.length) {
window.log.error(
@ -189,7 +183,7 @@ async function updateExpiryOnNodes(
ourPubKey,
targetNode,
bodyIndex as ExpireMessagesResultsContent,
request.params.messages
request.messageHashes
);
})
);
@ -225,7 +219,7 @@ async function updateExpiryOnNodes(
}
const hashesRequestedButNotInResults = difference(
flatten(expireRequests.map(m => m.params.messages)),
flatten(expireRequests.map(m => m.messageHashes)),
[...flatten(changesValid.map(c => c.messageHashes)), fakeHash]
);
if (!isEmpty(hashesRequestedButNotInResults)) {
@ -300,30 +294,11 @@ export async function buildExpireRequestSingleExpiry(
// NOTE for shortenOrExtend, '' means we want to hardcode the expiry to a TTL value, otherwise it's a shorten or extension of the TTL
const signResult = await SnodeSignature.generateUpdateExpiryOurSignature({
shortenOrExtend,
timestamp: expiryMs,
return new UpdateExpiryOnNodeUserSubRequest({
expiryMs,
messagesHashes: messageHashes,
shortenOrExtend,
});
if (!signResult) {
window.log.error(
`[buildExpireRequestSingleExpiry] SnodeSignature.generateUpdateExpirySignature returned an empty result`
);
return null;
}
return {
method: 'expire' as const,
params: {
pubkey: ourPubKey,
pubkey_ed25519: signResult.pubkey.toUpperCase(),
messages: messageHashes,
expiry: expiryMs,
extend: shortenOrExtend === 'extend' || undefined,
shorten: shortenOrExtend === 'shorten' || undefined,
signature: signResult?.signature,
},
};
}
type GroupedBySameExpiry = Record<string, Array<string>>;
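
The class-based construction shown above in `buildExpireRequestSingleExpiry` is sent through the same unsigned batch helper elsewhere in this file. A sketch of that flow, with the 4000 ms timeout and the 'extend' mode as illustrative values only:

import { Snode } from '../../../data/data';
import { UpdateExpiryOnNodeUserSubRequest } from './SnodeRequestTypes';
import { doUnsignedSnodeBatchRequest } from './batchRequest';

async function extendExpiryOfHashes(
  targetNode: Snode,
  ourPubKey: string,
  messagesHashes: Array<string>,
  expiryMs: number
) {
  const request = new UpdateExpiryOnNodeUserSubRequest({
    expiryMs,
    messagesHashes,
    shortenOrExtend: 'extend',
  });
  // Signing happens inside doUnsignedSnodeBatchRequest, as with the other subrequest classes.
  return doUnsignedSnodeBatchRequest([request], targetNode, 4000, ourPubKey, 'batch');
}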
@ -402,8 +377,6 @@ export async function expireMessagesOnSnode(
throw new Error('[expireMessagesOnSnode] No pubkey found');
}
let snode: Snode | undefined;
try {
// key is a string even if it is really a number because Object.keys only knows strings...
const groupedBySameExpiry = groupMsgByExpiry(expiringDetails);
@ -432,12 +405,9 @@ export async function expireMessagesOnSnode(
expireRequestsParams.map(chunkRequest =>
pRetry(
async () => {
const swarm = await getSwarmFor(ourPubKey);
snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
}
return updateExpiryOnNodes(snode, ourPubKey, chunkRequest);
const targetNode = await getNodeFromSwarmOrThrow(ourPubKey);
return updateExpiryOnNodes(targetNode, ourPubKey, chunkRequest);
},
{
retries: 3,
@ -455,12 +425,7 @@ export async function expireMessagesOnSnode(
return flatten(compact(allSettled.map(m => (m.status === 'fulfilled' ? m.value : null))));
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`[expireMessagesOnSnode] ${e.code || ''}${
e.message || e
} by ${ourPubKey} via snode:${snodeStr}`
);
window?.log?.warn(`[expireMessagesOnSnode] ${e.code || ''}${e.message || e} by ${ourPubKey}`);
throw e;
}
}

@ -1,14 +1,13 @@
/* eslint-disable no-restricted-syntax */
import { PubkeyType } from 'libsession_util_nodejs';
import { isFinite, isNil, isNumber, sample } from 'lodash';
import { isFinite, isNil, isNumber } from 'lodash';
import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { UserUtils } from '../../utils';
import { EmptySwarmError } from '../../utils/errors';
import { SeedNodeAPI } from '../seed_node_api';
import { GetExpiriesFromNodeSubRequest, fakeHash } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { getSwarmFor } from './snodePool';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { getNodeFromSwarmOrThrow } from './snodePool';
import { GetExpiriesResultsContent, WithMessagesHashes } from './types';
export type GetExpiriesRequestResponseResults = Record<string, number>;
@ -47,8 +46,13 @@ async function getExpiriesFromNodes(
) {
try {
const expireRequest = new GetExpiriesFromNodeSubRequest({ messagesHashes: messageHashes });
const signed = await expireRequest.buildAndSignParameters();
const result = await doSnodeBatchRequest([signed], targetNode, 4000, associatedWith, 'batch');
const result = await doUnsignedSnodeBatchRequest(
[expireRequest],
targetNode,
4000,
associatedWith,
'batch'
);
if (!result || result.length !== 1) {
throw Error(
@ -113,17 +117,12 @@ export async function getExpiriesFromSnode({ messagesHashes }: WithMessagesHashe
return [];
}
let snode: Snode | undefined;
try {
const fetchedExpiries = await pRetry(
async () => {
const swarm = await getSwarmFor(ourPubKey);
snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(ourPubKey, 'Ran out of swarm nodes to query');
}
return getExpiriesFromNodes(snode, messagesHashes, ourPubKey);
const targetNode = await getNodeFromSwarmOrThrow(ourPubKey);
return getExpiriesFromNodes(targetNode, messagesHashes, ourPubKey);
},
{
retries: 3,
@ -139,11 +138,10 @@ export async function getExpiriesFromSnode({ messagesHashes }: WithMessagesHashe
return fetchedExpiries;
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
`[getExpiriesFromSnode] ${e.code ? `${e.code} ` : ''}${
e.message || e
} by ${ourPubKey} for ${messagesHashes} via snode:${snodeStr}`
} by ${ourPubKey} for ${messagesHashes}`
);
throw e;
}

@ -7,12 +7,12 @@
import { isNumber } from 'lodash';
import { Snode } from '../../../data/data';
import { NetworkTimeSubRequest } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
const getNetworkTime = async (snode: Snode): Promise<string | number> => {
const subrequest = new NetworkTimeSubRequest();
const result = await doSnodeBatchRequest([subrequest.build()], snode, 4000, null);
const result = await doUnsignedSnodeBatchRequest([subrequest], snode, 4000, null);
if (!result || !result.length) {
window?.log?.warn(`getNetworkTime on ${snode.ip}:${snode.port} returned falsish value`, result);
throw new Error('getNetworkTime: Invalid result');

@ -2,7 +2,7 @@ import { compact, intersectionWith, sampleSize } from 'lodash';
import { SnodePool } from '.';
import { Snode } from '../../../data/data';
import { GetServiceNodesSubRequest } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { minSnodePoolCount, requiredSnodesForAgreement } from './snodePool';
@ -14,7 +14,7 @@ import { minSnodePoolCount, requiredSnodesForAgreement } from './snodePool';
async function getSnodePoolFromSnode(targetNode: Snode): Promise<Array<Snode>> {
const subrequest = new GetServiceNodesSubRequest();
const results = await doSnodeBatchRequest([subrequest.build()], targetNode, 4000, null);
const results = await doUnsignedSnodeBatchRequest([subrequest], targetNode, 4000, null);
const firstResult = results[0];

@ -3,7 +3,7 @@ import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { PubKey } from '../../types';
import { SwarmForSubRequest } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { getRandomSnode } from './snodePool';
@ -19,7 +19,7 @@ async function requestSnodesForPubkeyWithTargetNodeRetryable(
}
const subrequest = new SwarmForSubRequest(pubkey);
const result = await doSnodeBatchRequest([subrequest.build()], targetNode, 4000, pubkey);
const result = await doUnsignedSnodeBatchRequest([subrequest], targetNode, 4000, pubkey);
if (!result || !result.length) {
window?.log?.warn(

@ -1,4 +1,4 @@
import { isNumber, last, orderBy } from 'lodash';
import { last, orderBy } from 'lodash';
import { PickEnum } from '../../../types/Enums';
import { assertUnreachable } from '../../../types/sqlSharedTypes';
@ -79,7 +79,7 @@ export type SnodeNamespacesGroup =
export type SnodeNamespacesUser = PickEnum<SnodeNamespaces, SnodeNamespaces.Default>;
export type UserConfigNamespaces = PickEnum<
export type SnodeNamespacesUserConfig = PickEnum<
SnodeNamespaces,
| SnodeNamespaces.UserProfile
| SnodeNamespaces.UserContacts
@ -91,7 +91,7 @@ export type UserConfigNamespaces = PickEnum<
* Returns true if that namespace is associated with the config of a user (not his messages, only configs)
*/
// eslint-disable-next-line consistent-return
function isUserConfigNamespace(namespace: SnodeNamespaces): namespace is UserConfigNamespaces {
function isUserConfigNamespace(namespace: SnodeNamespaces): namespace is SnodeNamespacesUserConfig {
switch (namespace) {
case SnodeNamespaces.UserProfile:
case SnodeNamespaces.UserContacts:
@ -262,10 +262,7 @@ function toRole(namespace: number) {
}
}
function toRoles(namespace: number | Array<number>) {
if (isNumber(namespace)) {
return [namespace].map(toRole);
}
function toRoles(namespace: Array<number>) {
return namespace.map(toRole);
}
@ -275,4 +272,5 @@ export const SnodeNamespace = {
isGroupNamespace,
maxSizeMap,
toRoles,
toRole,
};

@ -7,7 +7,7 @@ import {
toHex,
} from '../../utils/String';
import { OnsResolveSubRequest } from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { getRandomSnode } from './snodePool';
@ -29,7 +29,7 @@ async function getSessionIDForOnsName(onsNameCase: string) {
const promises = range(0, validationCount).map(async () => {
const targetNode = await getRandomSnode();
const results = await doSnodeBatchRequest([subRequest.build()], targetNode, 4000, null);
const results = await doUnsignedSnodeBatchRequest([subRequest], targetNode, 4000, null);
const firstResult = results[0];
if (!firstResult || firstResult.code !== 200 || !firstResult.body) {
throw new Error('ONSresolve:Failed to resolve ONS');

@ -1,25 +1,21 @@
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { omit } from 'lodash';
import { Snode } from '../../../data/data';
import { updateIsOnline } from '../../../state/ducks/onion';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { SnodeNamespace, SnodeNamespaces, SnodeNamespacesGroup } from './namespaces';
import { UserGroupsWrapperActions } from '../../../webworker/workers/browser/libsession_worker_interface';
import { TTL_DEFAULT } from '../../constants';
import { ed25519Str } from '../../onions/onionPath';
import { PubKey } from '../../types';
import { UserUtils } from '../../utils';
import {
RetrieveGroupAdminSubRequestType,
RetrieveGroupSubAccountSubRequestType,
RetrieveLegacyClosedGroupSubRequestType,
RetrieveSubRequestType,
RetrieveGroupSubRequest,
RetrieveLegacyClosedGroupSubRequest,
RetrieveUserSubRequest,
UpdateExpiryOnNodeGroupSubRequest,
UpdateExpiryOnNodeUserSubRequest,
} from './SnodeRequestTypes';
import { SnodeGroupSignature } from './signature/groupSignature';
import { SnodeSignature } from './signature/snodeSignatures';
import { doUnsignedSnodeBatchRequest } from './batchRequest';
import { RetrieveMessagesResultsBatched, RetrieveMessagesResultsContent } from './types';
type RetrieveParams = {
@ -31,23 +27,19 @@ type RetrieveParams = {
async function retrieveRequestForUs({
namespace,
ourPubkey,
retrieveParam,
}: {
ourPubkey: string;
namespace: SnodeNamespaces;
retrieveParam: RetrieveParams;
}) {
if (!SnodeNamespace.isUserConfigNamespace(namespace) && namespace !== SnodeNamespaces.Default) {
throw new Error(`retrieveRequestForUs not a valid namespace to retrieve as us:${namespace}`);
}
const signatureArgs = { ...retrieveParam, namespace, method: 'retrieve' as const, ourPubkey };
const signatureBuilt = await SnodeSignature.getSnodeSignatureParamsUs(signatureArgs);
const retrieveForUS: RetrieveSubRequestType = {
method: 'retrieve',
params: { ...retrieveParam, namespace, ...signatureBuilt },
};
return retrieveForUS;
return new RetrieveUserSubRequest({
last_hash: retrieveParam.last_hash,
max_size: retrieveParam.max_size,
namespace,
});
}
/**
@ -64,7 +56,7 @@ function retrieveRequestForLegacyGroup({
ourPubkey: string;
retrieveParam: RetrieveParams;
}) {
if (pubkey === ourPubkey || !pubkey.startsWith('05')) {
if (pubkey === ourPubkey || !PubKey.is05Pubkey(pubkey)) {
throw new Error(
'namespace -10 can only be used to retrieve messages from a legacy closed group (prefix 05)'
);
@ -72,16 +64,13 @@ function retrieveRequestForLegacyGroup({
if (namespace !== SnodeNamespaces.LegacyClosedGroup) {
throw new Error(`retrieveRequestForLegacyGroup namespace can only be -10`);
}
const retrieveLegacyClosedGroup = {
...retrieveParam,
namespace,
};
const retrieveParamsLegacy: RetrieveLegacyClosedGroupSubRequestType = {
method: 'retrieve',
params: omit(retrieveLegacyClosedGroup, 'timestamp'), // if we give a timestamp, a signature will be required by the service node, and we don't want to provide one as this is an unauthenticated namespace
};
return retrieveParamsLegacy;
// if we give a timestamp, a signature will be required by the service node, and we don't want to provide one as this is an unauthenticated namespace
return new RetrieveLegacyClosedGroupSubRequest({
last_hash: retrieveParam.last_hash,
max_size: retrieveParam.max_size,
legacyGroupPk: pubkey,
});
}
/**
@ -104,34 +93,28 @@ async function retrieveRequestForGroup({
}
const group = await UserGroupsWrapperActions.getGroup(groupPk);
const sigResult = await SnodeGroupSignature.getSnodeGroupSignature({
method: 'retrieve',
return new RetrieveGroupSubRequest({
last_hash: retrieveParam.last_hash,
namespace,
group,
max_size: retrieveParam.max_size,
groupDetailsNeededForSignature: group,
});
const retrieveParamsGroup:
| RetrieveGroupSubAccountSubRequestType
| RetrieveGroupAdminSubRequestType = {
method: 'retrieve',
params: {
...retrieveParam,
...sigResult,
namespace,
},
};
return retrieveParamsGroup;
}
type RetrieveSubRequestType =
| RetrieveLegacyClosedGroupSubRequest
| RetrieveUserSubRequest
| RetrieveGroupSubRequest
| UpdateExpiryOnNodeUserSubRequest
| UpdateExpiryOnNodeGroupSubRequest;
async function buildRetrieveRequest(
lastHashes: Array<string>,
pubkey: string,
namespaces: Array<SnodeNamespaces>,
ourPubkey: string,
configHashesToBump: Array<string> | null
): Promise<Array<RetrieveSubRequestType>> {
) {
const isUs = pubkey === ourPubkey;
const maxSizeMap = SnodeNamespace.maxSizeMap(namespaces);
const now = GetNetworkTime.now();
@ -160,52 +143,42 @@ async function buildRetrieveRequest(
// all legacy closed group retrieves are unauthenticated and run above.
// if we get here, this can only be a retrieve for our own swarm, which must be authenticated
return retrieveRequestForUs({ namespace, ourPubkey, retrieveParam });
return retrieveRequestForUs({ namespace, retrieveParam });
})
);
if (configHashesToBump?.length) {
const expiry = GetNetworkTime.now() + TTL_DEFAULT.CONFIG_MESSAGE;
if (isUs) {
const signResult = await SnodeSignature.generateUpdateExpiryOurSignature({
shortenOrExtend: '',
timestamp: expiry,
messagesHashes: configHashesToBump,
});
const expireParams: UpdateExpiryOnNodeUserSubRequest = {
method: 'expire',
params: {
messages: configHashesToBump,
pubkey: UserUtils.getOurPubKeyStrFromCache(),
expiry,
signature: signResult.signature,
pubkey_ed25519: signResult.pubkey,
},
};
retrieveRequestsParams.push(expireParams);
} else if (PubKey.is03Pubkey(pubkey)) {
const group = await UserGroupsWrapperActions.getGroup(pubkey);
const expiryMs = GetNetworkTime.now() + TTL_DEFAULT.CONFIG_MESSAGE;
const signResult = await SnodeGroupSignature.generateUpdateExpiryGroupSignature({
shortenOrExtend: '',
timestamp: expiry,
messagesHashes: configHashesToBump,
group,
});
if (configHashesToBump?.length && isUs) {
const request = new UpdateExpiryOnNodeUserSubRequest({
expiryMs,
messagesHashes: configHashesToBump,
shortenOrExtend: '',
});
retrieveRequestsParams.push(request);
return retrieveRequestsParams;
}
const expireParams: UpdateExpiryOnNodeGroupSubRequest = {
method: 'expire',
params: {
messages: configHashesToBump,
expiry,
...omit(signResult, 'timestamp'),
pubkey,
},
};
if (configHashesToBump?.length && PubKey.is03Pubkey(pubkey)) {
const group = await UserGroupsWrapperActions.getGroup(pubkey);
retrieveRequestsParams.push(expireParams);
if (!group) {
window.log.warn(
`trying to retrieve for group ${ed25519Str(
pubkey
)} but we are missing the details in the usergroup wrapper`
);
throw new Error('retrieve request is missing group details');
}
retrieveRequestsParams.push(
new UpdateExpiryOnNodeGroupSubRequest({
expiryMs,
messagesHashes: configHashesToBump,
shortenOrExtend: '',
groupDetailsNeededForSignature: group,
})
);
}
return retrieveRequestsParams;
}
@ -222,22 +195,18 @@ async function retrieveNextMessages(
throw new Error('namespaces and lasthashes does not match');
}
const retrieveRequestsParams = await buildRetrieveRequest(
const rawRequests = await buildRetrieveRequest(
lastHashes,
associatedWith,
namespaces,
ourPubkey,
configHashesToBump
);
// let exceptions bubble up
// no retry for this one as this a call we do every few seconds while polling for messages
const results = await doSnodeBatchRequest(
retrieveRequestsParams,
targetNode,
4000,
associatedWith
);
const results = await doUnsignedSnodeBatchRequest(rawRequests, targetNode, 4000, associatedWith);
if (!results || !results.length) {
window?.log?.warn(
`_retrieveNextMessages - sessionRpc could not talk to ${targetNode.ip}:${targetNode.port}`

@ -1,8 +1,6 @@
import { GroupPubkeyType } from 'libsession_util_nodejs';
import { PubKey } from '../../types';
import { SubaccountRevokeSubRequest, SubaccountUnrevokeSubRequest } from './SnodeRequestTypes';
import { GetNetworkTime } from './getNetworkTime';
export type RevokeChanges = Array<{
action: 'revoke_subaccount' | 'unrevoke_subaccount';
@ -11,36 +9,35 @@ export type RevokeChanges = Array<{
async function getRevokeSubaccountParams(
groupPk: GroupPubkeyType,
secretKey: Uint8Array,
{
revokeChanges,
unrevokeChanges,
}: { revokeChanges: RevokeChanges; unrevokeChanges: RevokeChanges }
_secretKey: Uint8Array,
_opts: { revokeChanges: RevokeChanges; unrevokeChanges: RevokeChanges }
) {
if (!PubKey.is03Pubkey(groupPk)) {
throw new Error('revokeSubaccountForGroup: not a 03 group');
}
const revokeSubRequest = revokeChanges
? new SubaccountRevokeSubRequest({
groupPk,
revokeTokenHex: revokeChanges.map(m => m.tokenToRevokeHex),
timestamp: GetNetworkTime.now(),
secretKey,
})
: null;
const unrevokeSubRequest = unrevokeChanges.length
? new SubaccountUnrevokeSubRequest({
groupPk,
revokeTokenHex: unrevokeChanges.map(m => m.tokenToRevokeHex),
timestamp: GetNetworkTime.now(),
secretKey,
})
: null;
window.log.warn('getRevokeSubaccountParams to enable once multisig is done'); // TODO audric debugger
// const revokeSubRequest = revokeChanges.length
// ? new SubaccountRevokeSubRequest({
// groupPk,
// revokeTokenHex: revokeChanges.map(m => m.tokenToRevokeHex),
// timestamp: GetNetworkTime.now(),
// secretKey,
// })
// : null;
// const unrevokeSubRequest = unrevokeChanges.length
// ? new SubaccountUnrevokeSubRequest({
// groupPk,
// revokeTokenHex: unrevokeChanges.map(m => m.tokenToRevokeHex),
// timestamp: GetNetworkTime.now(),
// secretKey,
// })
// : null;
return {
revokeSubRequest,
unrevokeSubRequest,
revokeSubRequest: null,
unrevokeSubRequest: null,
};
}

@ -4,6 +4,7 @@ import {
Uint8ArrayLen100,
Uint8ArrayLen64,
UserGroupsGet,
WithGroupPubkey,
} from 'libsession_util_nodejs';
import { isEmpty, isString } from 'lodash';
import {
@ -18,12 +19,7 @@ import { fromUInt8ArrayToBase64, stringToUint8Array } from '../../../utils/Strin
import { PreConditionFailed } from '../../../utils/errors';
import { GetNetworkTime } from '../getNetworkTime';
import { SnodeNamespacesGroup } from '../namespaces';
import {
SignedGroupHashesParams,
WithMessagesHashes,
WithShortenOrExtend,
WithTimestamp,
} from '../types';
import { SignedGroupHashesParams, WithMessagesHashes, WithShortenOrExtend } from '../types';
import { SignatureShared } from './signatureShared';
import { SnodeSignatureResult } from './snodeSignatures';
@ -112,25 +108,12 @@ export type SigResultSubAccount = SigResultAdmin & {
subaccount_sig: string;
};
async function getSnodeGroupSignatureParams(params: SigParamsAdmin): Promise<SigResultAdmin>;
async function getSnodeGroupSignatureParams(
params: SigParamsSubaccount
): Promise<SigResultSubAccount>;
async function getSnodeGroupSignatureParams(
params: SigParamsAdmin | SigParamsSubaccount
): Promise<SigResultSubAccount | SigResultAdmin> {
if ('groupIdentityPrivKey' in params) {
return getSnodeGroupAdminSignatureParams(params);
}
return getSnodeGroupSubAccountSignatureParams(params);
}
async function getSnodeGroupSubAccountSignatureParams(
params: SigParamsSubaccount
): Promise<SigResultSubAccount> {
const { signatureTimestamp, toSign } =
SignatureShared.getVerificationDataForStoreRetrieve(params);
const sigResult = await MetaGroupWrapperActions.swarmSubaccountSign(
params.groupPk,
toSign,
@ -153,7 +136,10 @@ async function getSnodeGroupAdminSignatureParams(params: SigParamsAdmin): Promis
return { ...sigData, pubkey: params.groupPk };
}
type GroupDetailsNeededForSignature = Pick<UserGroupsGet, 'pubkeyHex' | 'authData' | 'secretKey'>;
export type GroupDetailsNeededForSignature = Pick<
UserGroupsGet,
'pubkeyHex' | 'authData' | 'secretKey'
>;
async function getSnodeGroupSignature({
group,
@ -163,7 +149,7 @@ async function getSnodeGroupSignature({
group: GroupDetailsNeededForSignature | null;
method: 'store' | 'retrieve';
namespace: SnodeNamespacesGroup;
}) {
}): Promise<SigResultSubAccount | SigResultAdmin> {
if (!group) {
throw new Error(`getSnodeGroupSignature: did not find group in wrapper`);
}
@ -173,7 +159,7 @@ async function getSnodeGroupSignature({
const groupAuthData = authData && !isEmpty(authData) ? authData : null;
if (groupSecretKey) {
return getSnodeGroupSignatureParams({
return getSnodeGroupAdminSignatureParams({
method,
namespace,
groupPk,
@ -181,7 +167,7 @@ async function getSnodeGroupSignature({
});
}
if (groupAuthData) {
const subAccountSign = await getSnodeGroupSignatureParams({
const subAccountSign = await getSnodeGroupSubAccountSignatureParams({
groupPk,
method,
namespace,
@ -220,20 +206,20 @@ async function signDataWithAdminSecret(
// this is kind of duplicated with `generateUpdateExpirySignature`, but needs to use the authData when secretKey is not available
async function generateUpdateExpiryGroupSignature({
shortenOrExtend,
timestamp,
expiryMs,
messagesHashes,
group,
}: WithMessagesHashes &
WithShortenOrExtend &
WithTimestamp & {
WithShortenOrExtend & {
group: GroupDetailsNeededForSignature | null;
expiryMs: number;
}) {
if (!group || isEmpty(group.pubkeyHex)) {
throw new PreConditionFailed('generateUpdateExpiryGroupSignature groupPk is empty');
}
// "expire" || ShortenOrExtend || expiry || messages[0] || ... || messages[N]
const verificationString = `expire${shortenOrExtend}${timestamp}${messagesHashes.join('')}`;
const verificationString = `expire${shortenOrExtend}${expiryMs}${messagesHashes.join('')}`;
const verificationData = StringUtils.encode(verificationString, 'utf8');
const message = new Uint8Array(verificationData);
@ -249,7 +235,7 @@ async function generateUpdateExpiryGroupSignature({
}
const sodium = await getSodiumRenderer();
const shared = { timestamp, pubkey: groupPk };
const shared = { expiry: expiryMs, pubkey: groupPk }; // expiry and the other fields come from what the expire endpoint expects
if (groupSecretKey) {
return {
@ -257,36 +243,37 @@ async function generateUpdateExpiryGroupSignature({
...shared,
};
}
if (groupAuthData) {
const subaccountSign = await MetaGroupWrapperActions.swarmSubaccountSign(
groupPk,
message,
groupAuthData
if (!groupAuthData) {
// typescript should see this already but doesn't, so let's enforce it.
throw new Error(
`retrieveRequestForGroup: needs either groupSecretKey or authData but both are empty`
);
return {
...subaccountSign,
...shared,
};
}
throw new Error(`generateUpdateExpiryGroupSignature: needs either groupSecretKey or authData`);
const subaccountSign = await MetaGroupWrapperActions.swarmSubaccountSign(
groupPk,
message,
groupAuthData
);
return {
...subaccountSign,
...shared,
};
}
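
For reference, the payload signed by `generateUpdateExpiryGroupSignature` is now built from `expiryMs` rather than the request timestamp; per the comment above it concatenates as follows (a standalone restatement for clarity, not code added by the commit):

// "expire" || ShortenOrExtend || expiry || messages[0] || ... || messages[N]
function expireVerificationString(
  shortenOrExtend: '' | 'shorten' | 'extend',
  expiryMs: number,
  messagesHashes: Array<string>
): string {
  return `expire${shortenOrExtend}${expiryMs}${messagesHashes.join('')}`;
}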
async function getGroupSignatureByHashesParams({
messagesHashes,
method,
pubkey,
}: WithMessagesHashes & {
pubkey: GroupPubkeyType;
method: 'delete';
}): Promise<SignedGroupHashesParams> {
groupPk,
}: WithMessagesHashes &
WithGroupPubkey & {
method: 'delete';
}): Promise<SignedGroupHashesParams> {
const verificationData = StringUtils.encode(`${method}${messagesHashes.join('')}`, 'utf8');
const message = new Uint8Array(verificationData);
const sodium = await getSodiumRenderer();
try {
const group = await UserGroupsWrapperActions.getGroup(pubkey);
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group || !group.secretKey || isEmpty(group.secretKey)) {
throw new Error('getSnodeGroupSignatureByHashesParams needs admin secretKey');
}
@ -295,7 +282,7 @@ async function getGroupSignatureByHashesParams({
return {
signature: signatureBase64,
pubkey,
pubkey: groupPk,
messages: messagesHashes,
};
} catch (e) {

@ -28,10 +28,10 @@ export type SnodeSigParamsUs = SnodeSigParamsShared & {
function getVerificationDataForStoreRetrieve(params: SnodeSigParamsShared) {
const signatureTimestamp = GetNetworkTime.now();
const verificationData = StringUtils.encode(
`${params.method}${params.namespace === 0 ? '' : params.namespace}${signatureTimestamp}`,
'utf8'
);
const verificationString = `${params.method}${
params.namespace === 0 ? '' : params.namespace
}${signatureTimestamp}`;
const verificationData = StringUtils.encode(verificationString, 'utf8');
return {
toSign: new Uint8Array(verificationData),
signatureTimestamp,

@ -1,14 +1,14 @@
import _, { shuffle } from 'lodash';
import _, { isEmpty, sample, shuffle } from 'lodash';
import pRetry from 'p-retry';
import { Data, Snode } from '../../../data/data';
import { ed25519Str } from '../../onions/onionPath';
import { OnionPaths } from '../../onions';
import { Onions, SnodePool } from '.';
import { OnionPaths } from '../../onions';
import { ed25519Str } from '../../onions/onionPath';
import { SeedNodeAPI } from '../seed_node_api';
import { requestSnodesForPubkeyFromNetwork } from './getSwarmFor';
import { ServiceNodesList } from './getServiceNodesList';
import { requestSnodesForPubkeyFromNetwork } from './getSwarmFor';
/**
* If we get less than this snode in a swarm, we fetch new snodes for this pubkey
@ -316,6 +316,20 @@ export async function getSwarmFor(pubkey: string): Promise<Array<Snode>> {
return getSwarmFromNetworkAndSave(pubkey);
}
export async function getNodeFromSwarmOrThrow(pubkey: string): Promise<Snode> {
const swarm = await getSwarmFor(pubkey);
if (!isEmpty(swarm)) {
const node = sample(swarm);
if (node) {
return node;
}
}
window.log.warn(
`getNodeFromSwarmOrThrow: could not get one random node for pk ${ed25519Str(pubkey)}`
);
throw new Error(`getNodeFromSwarmOrThrow: could not get one random node`);
}
/**
* Force a request to be made to the network to fetch the swarm of the specified pubkey, and cache the result.
* Note: should not be called directly unless you know what you are doing. Use the cached `getSwarmFor()` function instead
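
The `getNodeFromSwarmOrThrow` helper added above replaces the repeated `getSwarmFor()` + `sample()` + null-check blocks removed elsewhere in this diff; callers now either get a node or an exception that pRetry can catch. A minimal usage sketch (the wrapper function is hypothetical):

import pRetry from 'p-retry';
import { Snode } from '../../../data/data';
import { getNodeFromSwarmOrThrow } from './snodePool';

async function withRetriedSwarmNode<T>(
  pubkey: string,
  run: (node: Snode) => Promise<T>
): Promise<T> {
  // getNodeFromSwarmOrThrow throws when no node is available, so pRetry simply re-attempts.
  return pRetry(
    async () => {
      const targetNode = await getNodeFromSwarmOrThrow(pubkey);
      return run(targetNode);
    },
    { retries: 3 }
  );
}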

@ -1,91 +0,0 @@
import { Snode } from '../../../data/data';
import {
BatchStoreWithExtraParams,
NotEmptyArrayOfBatchResults,
SnodeApiSubRequests,
StoreOnNodeSubRequest,
SubaccountRevokeSubRequest,
SubaccountUnrevokeSubRequest,
isDeleteByHashesParams,
} from './SnodeRequestTypes';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
async function buildStoreRequests(
params: Array<BatchStoreWithExtraParams>
): Promise<Array<SnodeApiSubRequests>> {
const storeRequests = await Promise.all(
params.map(p => {
if (isDeleteByHashesParams(p)) {
return {
method: 'delete' as const,
params: p,
};
}
// those requests are already fully contained.
if (p instanceof SubaccountRevokeSubRequest || p instanceof SubaccountUnrevokeSubRequest) {
return p.buildAndSignParameters();
}
const storeRequest: StoreOnNodeSubRequest = {
method: 'store',
params: p,
};
return storeRequest;
})
);
return storeRequests;
}
/**
* Send a 'store' request to the specified targetNode, using params as argument
* @returns the Array of stored hashes if it is a success, or null
*/
async function batchStoreOnNode(
targetNode: Snode,
params: Array<BatchStoreWithExtraParams>,
method: 'batch' | 'sequence'
): Promise<NotEmptyArrayOfBatchResults> {
try {
const subRequests = await buildStoreRequests(params);
const asssociatedWith = (params[0] as any)?.pubkey as string | undefined;
if (!asssociatedWith) {
// not ideal
throw new Error('batchStoreOnNode first subrequest pubkey needs to be set');
}
const result = await doSnodeBatchRequest(
subRequests,
targetNode,
4000,
asssociatedWith,
method
);
if (!result || !result.length) {
window?.log?.warn(
`SessionSnodeAPI::requestSnodesForPubkeyWithTargetNodeRetryable - sessionRpc on ${targetNode.ip}:${targetNode.port} returned falsish value`,
result
);
throw new Error('requestSnodesForPubkeyWithTargetNodeRetryable: Invalid result');
}
const firstResult = result[0];
if (firstResult.code !== 200) {
window?.log?.warn('first result status is not 200 for storeOnNode but: ', firstResult.code);
throw new Error('storeOnNode: Invalid status code');
}
GetNetworkTime.handleTimestampOffsetFromNetwork('store', firstResult.body.t);
return result;
} catch (e) {
window?.log?.warn('store - send error:', e, `destination ${targetNode.ip}:${targetNode.port}`);
throw e;
}
}
export const SnodeAPIStore = { batchStoreOnNode };

@ -26,6 +26,7 @@ import * as snodePool from './snodePool';
import { ConversationModel } from '../../../models/conversation';
import { ConversationTypeEnum } from '../../../models/conversationAttributes';
import { EnvelopePlus } from '../../../receiver/types';
import { updateIsOnline } from '../../../state/ducks/onion';
import { assertUnreachable } from '../../../types/sqlSharedTypes';
import {
@ -40,7 +41,7 @@ import { StringUtils, UserUtils } from '../../utils';
import { sleepFor } from '../../utils/Promise';
import { PreConditionFailed } from '../../utils/errors';
import { LibSessionUtil } from '../../utils/libsession/libsession_utils';
import { SnodeNamespace, SnodeNamespaces, UserConfigNamespaces } from './namespaces';
import { SnodeNamespace, SnodeNamespaces, SnodeNamespacesUserConfig } from './namespaces';
import { PollForGroup, PollForLegacy, PollForUs } from './pollingTypes';
import { SnodeAPIRetrieve } from './retrieveRequest';
import { SwarmPollingGroupConfig } from './swarm_polling_config/SwarmPollingGroupConfig';
@ -596,11 +597,8 @@ export class SwarmPolling {
const closedGroupsOnly = convos.filter(
(c: ConversationModel) =>
(c.isClosedGroupV2() &&
!c.isBlocked() &&
!c.isKickedFromGroup() &&
c.isApproved()) ||
(c.isClosedGroup() && !c.isBlocked() && !c.isKickedFromGroup() )
(c.isClosedGroupV2() && !c.isBlocked() && !c.isKickedFromGroup() && c.isApproved()) ||
(c.isClosedGroup() && !c.isBlocked() && !c.isKickedFromGroup())
);
closedGroupsOnly.forEach(c => {
@ -637,7 +635,7 @@ export class SwarmPolling {
// eslint-disable-next-line consistent-return
public getNamespacesToPollFrom(type: ConversationTypeEnum) {
if (type === ConversationTypeEnum.PRIVATE) {
const toRet: Array<UserConfigNamespaces | SnodeNamespaces.Default> = [
const toRet: Array<SnodeNamespacesUserConfig | SnodeNamespaces.Default> = [
SnodeNamespaces.Default,
SnodeNamespaces.UserProfile,
SnodeNamespaces.UserContacts,
@ -796,7 +794,10 @@ function filterMessagesPerTypeOfConvo<T extends ConversationTypeEnum>(
}
}
async function decryptForGroupV2(retrieveResult: { groupPk: string; content: Uint8Array }) {
async function decryptForGroupV2(retrieveResult: {
groupPk: string;
content: Uint8Array;
}): Promise<EnvelopePlus | null> {
window?.log?.info('received closed group message v2');
try {
const groupPk = retrieveResult.groupPk;
@ -822,7 +823,6 @@ async function decryptForGroupV2(retrieveResult: { groupPk: string; content: Uin
timestamp: parsedEnvelope.timestamp,
};
} catch (e) {
debugger;
window.log.warn('failed to decrypt message with error: ', e.message);
return null;
}
@ -844,6 +844,7 @@ async function handleMessagesForGroupV2(
throw new Error('decryptForGroupV2 returned empty envelope');
}
console.warn('envelopePlus', envelopePlus);
// this is the processing of the message itself, which can be long.
// We allow 1 minute per message at most, which should be plenty
await Receiver.handleSwarmContentDecryptedWithTimeout({

@ -1,5 +1,4 @@
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { PubKey } from '../../types';
import { SnodeNamespaces } from './namespaces';
import { SubaccountRevokeSubRequest, SubaccountUnrevokeSubRequest } from './SnodeRequestTypes';
@ -29,16 +28,6 @@ export type RetrieveRequestResult = {
};
export type WithMessagesHashes = { messagesHashes: Array<string> };
export type DeleteMessageByHashesGroupSubRequest = WithMessagesHashes & {
pubkey: GroupPubkeyType;
method: 'delete';
};
export type DeleteMessageByHashesUserSubRequest = WithMessagesHashes & {
pubkey: PubkeyType;
method: 'delete';
};
export type RetrieveMessagesResultsBatched = Array<RetrieveRequestResult>;
export type WithTimestamp = { timestamp: number };
@ -51,12 +40,6 @@ export type WithRevokeSubRequest = {
revokeSubRequest: SubaccountRevokeSubRequest | null;
unrevokeSubRequest: SubaccountUnrevokeSubRequest | null;
};
export type WithMessagesToDeleteSubRequest = {
messagesToDelete:
| DeleteMessageByHashesUserSubRequest
| DeleteMessageByHashesGroupSubRequest
| null;
};
export type SignedHashesParams = WithSignature & {
pubkey: PubkeyType;
@ -69,12 +52,6 @@ export type SignedGroupHashesParams = WithSignature & {
messages: Array<string>;
};
export function isDeleteByHashesGroup(
request: DeleteMessageByHashesUserSubRequest | DeleteMessageByHashesGroupSubRequest
): request is DeleteMessageByHashesGroupSubRequest {
return PubKey.is03Pubkey(request.pubkey);
}
/** inherits from https://api.oxen.io/storage-rpc/#/recursive?id=recursive but we only care about these values */
export type ExpireMessageResultItem = WithSignature & {
/** the expiry timestamp that was applied (which might be different from the request expiry */

@ -43,16 +43,6 @@ import { GroupUpdateInviteMessage } from '../messages/outgoing/controlMessage/gr
import { GroupUpdatePromoteMessage } from '../messages/outgoing/controlMessage/group_v2/to_user/GroupUpdatePromoteMessage';
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
type ClosedGroupMessageType =
| ClosedGroupVisibleMessage
| ClosedGroupAddedMembersMessage
| ClosedGroupRemovedMembersMessage
| ClosedGroupNameChangeMessage
| ClosedGroupMemberLeftMessage
| ExpirationTimerUpdateMessage
| ClosedGroupEncryptionPairMessage
| UnsendMessage;
// ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group.
export class MessageQueue {
@ -96,7 +86,7 @@ export class MessageQueue {
blinded: boolean;
filesToLink: Array<number>;
}) {
// Skipping the queue for Open Groups v2; the message is sent directly
// Skipping the MessageQueue for Open Groups v2; the message is sent directly
try {
// NOTE Reactions are handled separately
@ -132,7 +122,7 @@ export class MessageQueue {
`Failed to send message to open group: ${roomInfos.serverUrl}:${roomInfos.roomId}:`,
e
);
await MessageSentHandler.handleMessageSentFailure(
await MessageSentHandler.handlePublicMessageSentFailure(
message,
e || new Error('Failed to send message to open group.')
);
@ -172,7 +162,7 @@ export class MessageQueue {
`Failed to send message to open group: ${roomInfos.serverUrl}:${roomInfos.roomId}:`,
e.message
);
await MessageSentHandler.handleMessageSentFailure(
await MessageSentHandler.handlePublicMessageSentFailure(
message,
e || new Error('Failed to send message to open group.')
);
@ -189,7 +179,15 @@ export class MessageQueue {
groupPubKey,
sentCb,
}: {
message: ClosedGroupMessageType;
message:
| ClosedGroupVisibleMessage
| ClosedGroupAddedMembersMessage
| ClosedGroupRemovedMembersMessage
| ClosedGroupNameChangeMessage
| ClosedGroupMemberLeftMessage
| ExpirationTimerUpdateMessage
| ClosedGroupEncryptionPairMessage
| UnsendMessage;
namespace: SnodeNamespacesLegacyGroup;
sentCb?: (message: OutgoingRawMessage) => Promise<void>;
groupPubKey?: PubKey;
@ -251,6 +249,7 @@ export class MessageQueue {
message,
namespace: message.namespace,
pubkey: PubKey.cast(message.destination),
isSyncMessage: false,
});
}
@ -271,6 +270,7 @@ export class MessageQueue {
message,
namespace,
pubkey: PubKey.cast(destination),
isSyncMessage: false,
});
}
@ -314,7 +314,7 @@ export class MessageQueue {
| GroupUpdatePromoteMessage;
namespace: SnodeNamespaces.Default;
}): Promise<number | null> {
return this.sendToPubKeyNonDurably({ message, namespace, pubkey });
return this.sendToPubKeyNonDurably({ message, namespace, pubkey, isSyncMessage: false });
}
/**
@ -326,30 +326,55 @@ export class MessageQueue {
namespace,
message,
pubkey,
isSyncMessage,
}: {
pubkey: PubKey;
message: ContentMessage;
namespace: SnodeNamespaces;
isSyncMessage: boolean;
}): Promise<number | null> {
let rawMessage;
const rawMessage = await MessageUtils.toRawMessage(pubkey, message, namespace);
return this.sendSingleMessageAndHandleResult({ rawMessage, isSyncMessage });
}
private async sendSingleMessageAndHandleResult({
rawMessage,
isSyncMessage,
}: {
rawMessage: OutgoingRawMessage;
isSyncMessage: boolean;
}) {
try {
rawMessage = await MessageUtils.toRawMessage(pubkey, message, namespace);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send({
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.sendSingleMessage({
message: rawMessage,
isSyncMessage: false,
isSyncMessage,
});
await MessageSentHandler.handleMessageSentSuccess(
await MessageSentHandler.handleSwarmMessageSentSuccess(
rawMessage,
effectiveTimestamp,
wrappedEnvelope
);
const cb = this.pendingMessageCache.callbacks.get(rawMessage.identifier);
if (cb) {
await cb(rawMessage);
}
this.pendingMessageCache.callbacks.delete(rawMessage.identifier);
return effectiveTimestamp;
} catch (error) {
window.log.error('failed to send message with: ', error.message);
window.log.error(
'sendSingleMessageAndHandleResult: failed to send message with: ',
error.message
);
if (rawMessage) {
await MessageSentHandler.handleMessageSentFailure(rawMessage, error);
await MessageSentHandler.handleSwarmMessageSentFailure(rawMessage, error);
}
return null;
} finally {
// Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(rawMessage);
}
}
@ -368,30 +393,7 @@ export class MessageQueue {
if (!jobQueue.has(messageId)) {
// We put the event handling inside this job to avoid sending duplicate events
const job = async () => {
try {
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send({
message,
isSyncMessage,
});
await MessageSentHandler.handleMessageSentSuccess(
message,
effectiveTimestamp,
wrappedEnvelope
);
const cb = this.pendingMessageCache.callbacks.get(message.identifier);
if (cb) {
await cb(message);
}
this.pendingMessageCache.callbacks.delete(message.identifier);
} catch (error) {
void MessageSentHandler.handleMessageSentFailure(message, error);
} finally {
// Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message);
}
await this.sendSingleMessageAndHandleResult({ rawMessage: message, isSyncMessage });
};
await jobQueue.addWithId(messageId, job);
}
@ -420,9 +422,8 @@ export class MessageQueue {
isGroup = false
): Promise<void> {
// Don't send to ourselves
const us = UserUtils.getOurPubKeyFromCache();
let isSyncMessage = false;
if (us && destinationPk.isEqual(us)) {
if (UserUtils.isUsFromCache(destinationPk)) {
// We allow a message for ourselves only if it's a ClosedGroupNewMessage,
// or a message with a syncTarget set.

@ -1,12 +1,12 @@
// REMOVE COMMENT AFTER: This can just export pure functions as it doesn't need state
import { AbortController } from 'abort-controller';
import ByteBuffer from 'bytebuffer';
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { compact, isEmpty, isNumber, isString, sample } from 'lodash';
import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
import pRetry from 'p-retry';
import { Data } from '../../data/data';
import { Data, SeenMessageHashes } from '../../data/data';
import { SignalService } from '../../protobuf';
import { assertUnreachable } from '../../types/sqlSharedTypes';
import { UserGroupsWrapperActions } from '../../webworker/workers/browser/libsession_worker_interface';
import { OpenGroupRequestCommonType } from '../apis/open_group_api/opengroupV2/ApiUtil';
import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2';
@ -15,11 +15,31 @@ import {
sendSogsMessageOnionV4,
} from '../apis/open_group_api/sogsv3/sogsV3SendMessage';
import {
BuiltSnodeSubRequests,
DeleteAllFromUserNodeSubRequest,
DeleteHashesFromGroupNodeSubRequest,
DeleteHashesFromUserNodeSubRequest,
GetExpiriesFromNodeSubRequest,
GetServiceNodesSubRequest,
MethodBatchType,
NetworkTimeSubRequest,
NotEmptyArrayOfBatchResults,
StoreOnNodeData,
StoreOnNodeParams,
StoreOnNodeParamsNoSig,
OnsResolveSubRequest,
RawSnodeSubRequests,
RetrieveGroupSubRequest,
RetrieveLegacyClosedGroupSubRequest,
RetrieveUserSubRequest,
StoreGroupConfigOrMessageSubRequest,
StoreLegacyGroupMessageSubRequest,
StoreUserConfigSubRequest,
StoreUserMessageSubRequest,
SubaccountRevokeSubRequest,
SubaccountUnrevokeSubRequest,
SwarmForSubRequest,
UpdateExpiryOnNodeGroupSubRequest,
UpdateExpiryOnNodeUserSubRequest,
} from '../apis/snode_api/SnodeRequestTypes';
import { doUnsignedSnodeBatchRequest } from '../apis/snode_api/batchRequest';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { SnodeNamespace, SnodeNamespaces } from '../apis/snode_api/namespaces';
import {
@ -28,8 +48,7 @@ import {
SnodeGroupSignature,
} from '../apis/snode_api/signature/groupSignature';
import { SnodeSignature, SnodeSignatureResult } from '../apis/snode_api/signature/snodeSignatures';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { getNodeFromSwarmOrThrow } from '../apis/snode_api/snodePool';
import { WithMessagesHashes, WithRevokeSubRequest } from '../apis/snode_api/types';
import { TTL_DEFAULT } from '../constants';
import { ConvoHub } from '../conversations';
@ -44,7 +63,6 @@ import { PubKey } from '../types';
import { OutgoingRawMessage } from '../types/RawMessage';
import { UserUtils } from '../utils';
import { fromUInt8ArrayToBase64 } from '../utils/String';
import { EmptySwarmError } from '../utils/errors';
// ================ SNODE STORE ================
@ -70,7 +88,7 @@ function isContentSyncMessage(message: ContentMessage) {
* @param attempts The number of times to attempt sending. Minimum value is 1.
*/
async function send({
async function sendSingleMessage({
message,
retryMinTimeout = 100,
attempts = 3,
@ -111,9 +129,6 @@ async function send({
found.set({ sent_at: encryptedAndWrapped.networkTimestamp });
await found.commit();
}
let foundMessage = encryptedAndWrapped.identifier
? await Data.getMessageById(encryptedAndWrapped.identifier)
: null;
const isSyncedDeleteAfterReadMessage =
found &&
@ -129,63 +144,74 @@ async function send({
overridenTtl = asMs;
}
const batchResult = await MessageSender.sendMessagesDataToSnode(
[
{
pubkey: destination,
data64: encryptedAndWrapped.data64,
ttl: overridenTtl,
timestamp: encryptedAndWrapped.networkTimestamp,
const subRequests: Array<RawSnodeSubRequests> = [];
if (
SnodeNamespace.isUserConfigNamespace(encryptedAndWrapped.namespace) &&
PubKey.is05Pubkey(destination)
) {
subRequests.push(
new StoreUserConfigSubRequest({
encryptedData: encryptedAndWrapped.encryptedAndWrappedData,
namespace: encryptedAndWrapped.namespace,
},
],
destination,
{ messagesHashes: [], revokeSubRequest: null, unrevokeSubRequest: null },
'batch'
);
ttlMs: overridenTtl,
})
);
} else if (
encryptedAndWrapped.namespace === SnodeNamespaces.Default &&
PubKey.is05Pubkey(destination)
) {
subRequests.push(
new StoreUserMessageSubRequest({
encryptedData: encryptedAndWrapped.encryptedAndWrappedData,
dbMessageIdentifier: encryptedAndWrapped.identifier || null,
ttlMs: overridenTtl,
destination,
})
);
} else if (
SnodeNamespace.isGroupConfigNamespace(encryptedAndWrapped.namespace) &&
PubKey.is03Pubkey(destination)
) {
subRequests.push(
new StoreGroupConfigOrMessageSubRequest({
encryptedData: encryptedAndWrapped.encryptedAndWrappedData,
namespace: encryptedAndWrapped.namespace,
ttlMs: overridenTtl,
groupPk: destination,
dbMessageIdentifier: encryptedAndWrapped.identifier || null,
})
);
} else if (
encryptedAndWrapped.namespace === SnodeNamespaces.ClosedGroupMessages &&
PubKey.is03Pubkey(destination)
) {
subRequests.push(
new StoreGroupConfigOrMessageSubRequest({
encryptedData: encryptedAndWrapped.encryptedAndWrappedData,
namespace: encryptedAndWrapped.namespace,
ttlMs: overridenTtl,
groupPk: destination,
dbMessageIdentifier: encryptedAndWrapped.identifier || null,
})
);
} else {
window.log.error('unhandled sendSingleMessage case');
throw new Error('unhandled sendSingleMessage case');
}
const isDestinationClosedGroup = ConvoHub.use().get(recipient.key)?.isClosedGroup();
const storedAt = batchResult?.[0]?.body?.t;
const storedHash = batchResult?.[0]?.body?.hash;
const targetNode = await getNodeFromSwarmOrThrow(destination);
if (
batchResult &&
!isEmpty(batchResult) &&
batchResult[0].code === 200 &&
!isEmpty(storedHash) &&
isString(storedHash) &&
isNumber(storedAt)
) {
// TODO: the expiration is due to be returned by the storage server on "store" soon, we will then be able to use it instead of doing the storedAt + ttl logic below
// if we have a hash and a storedAt, mark it as seen so we don't reprocess it on the next retrieve
await Data.saveSeenMessageHashes([
{
expiresAt: encryptedAndWrapped.networkTimestamp + TTL_DEFAULT.CONTENT_MESSAGE, // non config msg expire at TTL_MAX at most
hash: storedHash,
},
]);
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
const batchResult = await doUnsignedSnodeBatchRequest(
subRequests,
targetNode,
6000,
destination
);
if (
encryptedAndWrapped.identifier &&
(encryptedAndWrapped.isSyncMessage || isDestinationClosedGroup)
) {
// get a fresh copy of the message from the DB
foundMessage = await Data.getMessageById(encryptedAndWrapped.identifier);
if (foundMessage) {
await foundMessage.updateMessageHash(storedHash);
await foundMessage.commit();
window?.log?.info(
`updated message ${foundMessage.get('id')} with hash: ${foundMessage.get(
'messageHash'
)}`
);
}
}
}
await handleBatchResultWithSubRequests({ batchResult, subRequests, destination });
return {
wrappedEnvelope: encryptedAndWrapped.data,
wrappedEnvelope: encryptedAndWrapped.encryptedAndWrappedData,
effectiveTimestamp: encryptedAndWrapped.networkTimestamp,
};
},
@ -198,11 +224,11 @@ async function send({
}
async function getSignatureParamsFromNamespace(
item: StoreOnNodeParamsNoSig,
{ namespace }: { namespace: SnodeNamespaces },
destination: string
): Promise<SigResultSubAccount | SigResultAdmin | SnodeSignatureResult | object> {
const store = 'store' as const;
if (SnodeNamespace.isUserConfigNamespace(item.namespace)) {
if (SnodeNamespace.isUserConfigNamespace(namespace)) {
const ourPrivKey = (await UserUtils.getUserED25519KeyPairBytes())?.privKeyBytes;
if (!ourPrivKey) {
throw new Error(
@ -211,14 +237,14 @@ async function getSignatureParamsFromNamespace(
}
return SnodeSignature.getSnodeSignatureParamsUs({
method: store,
namespace: item.namespace,
namespace,
});
}
if (
SnodeNamespace.isGroupConfigNamespace(item.namespace) ||
item.namespace === SnodeNamespaces.ClosedGroupMessages ||
item.namespace === SnodeNamespaces.ClosedGroupRevokedRetrievableMessages
SnodeNamespace.isGroupConfigNamespace(namespace) ||
namespace === SnodeNamespaces.ClosedGroupMessages ||
namespace === SnodeNamespaces.ClosedGroupRevokedRetrievableMessages
) {
if (!PubKey.is03Pubkey(destination)) {
throw new Error(
@ -228,7 +254,7 @@ async function getSignatureParamsFromNamespace(
const found = await UserGroupsWrapperActions.getGroup(destination);
return SnodeGroupSignature.getSnodeGroupSignature({
method: store,
namespace: item.namespace,
namespace,
group: found,
});
}
@ -236,97 +262,138 @@ async function getSignatureParamsFromNamespace(
return {};
}
async function signDeleteHashesRequest(
destination: PubkeyType | GroupPubkeyType,
messagesHashes: Array<string>
) {
if (isEmpty(messagesHashes)) {
return null;
}
const signedRequest = messagesHashes
? PubKey.is03Pubkey(destination)
? await SnodeGroupSignature.getGroupSignatureByHashesParams({
messagesHashes,
pubkey: destination,
method: 'delete',
})
: await SnodeSignature.getSnodeSignatureByHashesParams({
messagesHashes,
pubkey: destination,
method: 'delete',
})
: null;
return signedRequest || null;
async function signSubRequests(
params: Array<RawSnodeSubRequests>
): Promise<Array<BuiltSnodeSubRequests>> {
const signedRequests: Array<BuiltSnodeSubRequests> = await Promise.all(
params.map(p => {
if (
p instanceof SubaccountRevokeSubRequest ||
p instanceof SubaccountUnrevokeSubRequest ||
p instanceof DeleteHashesFromUserNodeSubRequest ||
p instanceof DeleteHashesFromGroupNodeSubRequest ||
p instanceof DeleteAllFromUserNodeSubRequest ||
p instanceof StoreGroupConfigOrMessageSubRequest ||
p instanceof StoreLegacyGroupMessageSubRequest ||
p instanceof StoreUserConfigSubRequest ||
p instanceof StoreUserMessageSubRequest ||
p instanceof RetrieveUserSubRequest ||
p instanceof RetrieveGroupSubRequest ||
p instanceof UpdateExpiryOnNodeUserSubRequest ||
p instanceof UpdateExpiryOnNodeGroupSubRequest ||
p instanceof GetExpiriesFromNodeSubRequest
) {
return p.buildAndSignParameters();
}
if (
p instanceof RetrieveLegacyClosedGroupSubRequest ||
p instanceof SwarmForSubRequest ||
p instanceof OnsResolveSubRequest ||
p instanceof GetServiceNodesSubRequest ||
p instanceof NetworkTimeSubRequest
) {
return p.build();
}
assertUnreachable(
p,
'If you see this error, you need to add the handling of the rawRequest above'
);
throw new Error(
'If you see this error, you need to add the handling of the rawRequest above'
);
})
);
return signedRequests;
}
async function sendMessagesDataToSnode(
params: Array<StoreOnNodeParamsNoSig>,
destination: PubkeyType | GroupPubkeyType,
storeRequests: Array<
| StoreGroupConfigOrMessageSubRequest
| StoreUserConfigSubRequest
| StoreUserMessageSubRequest
| StoreLegacyGroupMessageSubRequest
>,
asssociatedWith: PubkeyType | GroupPubkeyType,
{
messagesHashes: messagesToDelete,
revokeSubRequest,
unrevokeSubRequest,
}: WithMessagesHashes & WithRevokeSubRequest,
method: 'batch' | 'sequence'
method: MethodBatchType
): Promise<NotEmptyArrayOfBatchResults> {
const rightDestination = params.filter(m => m.pubkey === destination);
const swarm = await getSwarmFor(destination);
const withSigWhenRequired: Array<StoreOnNodeParams> = await Promise.all(
rightDestination.map(async item => {
// some namespaces require a signature to be added
const signOpts = await getSignatureParamsFromNamespace(item, destination);
const store: StoreOnNodeParams = {
data: item.data64,
namespace: item.namespace,
pubkey: item.pubkey,
timestamp: item.timestamp, // sig_timestamp is unused and uneeded
ttl: item.ttl,
...signOpts,
};
return store;
})
);
if (!asssociatedWith) {
throw new Error('sendMessagesDataToSnode first subrequest pubkey needs to be set');
}
const snode = sample(swarm);
if (!snode) {
throw new EmptySwarmError(destination, 'Ran out of swarm nodes to query');
const deleteHashesSubRequest = !messagesToDelete.length
? null
: PubKey.is05Pubkey(asssociatedWith)
? new DeleteHashesFromUserNodeSubRequest({ messagesHashes: messagesToDelete })
: new DeleteHashesFromGroupNodeSubRequest({
messagesHashes: messagesToDelete,
groupPk: asssociatedWith,
});
if (storeRequests.some(m => m.destination !== asssociatedWith)) {
throw new Error(
'sendMessagesDataToSnode tried to send batchrequest containing subrequest not for the right destination'
);
}
const signedDeleteHashesRequest = await signDeleteHashesRequest(destination, messagesToDelete);
const rawRequests = compact([
...storeRequests,
deleteHashesSubRequest,
revokeSubRequest,
unrevokeSubRequest,
]);
try {
// No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch.
const storeResults = await SnodeAPIStore.batchStoreOnNode(
snode,
compact([
...withSigWhenRequired,
signedDeleteHashesRequest,
revokeSubRequest,
unrevokeSubRequest,
]),
const targetNode = await getNodeFromSwarmOrThrow(asssociatedWith);
try {
const storeResults = await doUnsignedSnodeBatchRequest(
rawRequests,
targetNode,
4000,
asssociatedWith,
method
);
if (!storeResults || !storeResults.length) {
window?.log?.warn(
`SessionSnodeAPI::doSnodeBatchRequest on ${targetNode.ip}:${targetNode.port} returned falsish value`,
storeResults
);
throw new Error('doSnodeBatchRequest: Invalid result');
}
const firstResult = storeResults[0];
if (firstResult.code !== 200) {
window?.log?.warn(
'first result status is not 200 for sendMessagesDataToSnode but: ',
firstResult.code
);
throw new Error('sendMessagesDataToSnode: Invalid status code');
}
GetNetworkTime.handleTimestampOffsetFromNetwork('store', firstResult.body.t);
if (!isEmpty(storeResults)) {
window?.log?.info(
`sendMessagesDataToSnode - Successfully stored messages to ${ed25519Str(destination)} via ${
snode.ip
}:${snode.port} on namespaces: ${SnodeNamespace.toRoles(
rightDestination.map(m => m.namespace)
).join(',')}`
`sendMessagesDataToSnode - Successfully stored messages to ${ed25519Str(
asssociatedWith
)} via ${targetNode.ip}:${targetNode.port}`
);
}
return storeResults;
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
const snodeStr = targetNode ? `${targetNode.ip}:${targetNode.port}` : 'null';
window?.log?.warn(
`sendMessagesDataToSnode - "${e.code}:${e.message}" to ${destination} via snode:${snodeStr}`
`sendMessagesDataToSnode - "${e.code}:${e.message}" to ${asssociatedWith} via snode:${snodeStr}`
);
throw e;
}
@ -353,9 +420,8 @@ type EncryptAndWrapMessage = {
} & SharedEncryptAndWrap;
type EncryptAndWrapMessageResults = {
data64: string;
networkTimestamp: number;
data: Uint8Array;
encryptedAndWrappedData: Uint8Array;
namespace: number;
} & SharedEncryptAndWrap;
@ -374,7 +440,7 @@ async function encryptForGroupV2(
networkTimestamp,
} = params;
const envelope = await wrapContentIntoEnvelope(
const envelope = wrapContentIntoEnvelope(
SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE,
destination,
networkTimestamp,
@ -389,12 +455,9 @@ async function encryptForGroupV2(
encryptionBasedOnConversation(recipient)
);
const data64 = ByteBuffer.wrap(cipherText).toString('base64');
return {
data64,
networkTimestamp,
data: cipherText,
encryptedAndWrappedData: cipherText,
namespace,
ttl,
identifier,
@ -429,19 +492,17 @@ async function encryptMessageAndWrap(
encryptionBasedOnConversation(recipient)
);
const envelope = await wrapContentIntoEnvelope(
const envelope = wrapContentIntoEnvelope(
envelopeType,
recipient.key,
networkTimestamp,
cipherText
);
const data = wrapEnvelopeInWebSocketMessage(envelope);
const data64 = ByteBuffer.wrap(data).toString('base64');
return {
data64,
encryptedAndWrappedData: data,
networkTimestamp,
data,
namespace,
ttl,
identifier,
@ -457,7 +518,6 @@ async function encryptMessagesAndWrap(
/**
* Send an array of preencrypted data to the corresponding swarm.
* Used currently only for sending libsession GroupInfo, GroupMembers and groupKeys config updates.
*
* @param params the data to deposit
* @param destination the pubkey we should deposit those message to
@ -465,12 +525,12 @@ async function encryptMessagesAndWrap(
*/
async function sendEncryptedDataToSnode({
destination,
encryptedData,
storeRequests,
messagesHashesToDelete,
revokeSubRequest,
unrevokeSubRequest,
}: WithRevokeSubRequest & {
encryptedData: Array<StoreOnNodeData>;
storeRequests: Array<StoreGroupConfigOrMessageSubRequest | StoreUserConfigSubRequest>;
destination: GroupPubkeyType | PubkeyType;
messagesHashesToDelete: Set<string> | null;
}): Promise<NotEmptyArrayOfBatchResults | null> {
@ -478,13 +538,7 @@ async function sendEncryptedDataToSnode({
const batchResults = await pRetry(
async () => {
return MessageSender.sendMessagesDataToSnode(
encryptedData.map(content => ({
pubkey: destination,
data64: ByteBuffer.wrap(content.data).toString('base64'),
ttl: content.ttl,
timestamp: content.networkTimestamp,
namespace: content.namespace,
})),
storeRequests,
destination,
{
messagesHashes: [...(messagesHashesToDelete || [])],
@ -513,12 +567,12 @@ async function sendEncryptedDataToSnode({
}
}
async function wrapContentIntoEnvelope(
function wrapContentIntoEnvelope(
type: SignalService.Envelope.Type,
sskSource: string | undefined,
timestamp: number,
content: Uint8Array
): Promise<SignalService.Envelope> {
): SignalService.Envelope {
let source: string | undefined;
if (type === SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE) {
@ -612,7 +666,76 @@ export const MessageSender = {
sendEncryptedDataToSnode,
getMinRetryTimeout,
sendToOpenGroupV2,
send,
sendSingleMessage,
isContentSyncMessage,
wrapContentIntoEnvelope,
getSignatureParamsFromNamespace,
signSubRequests,
};
async function handleBatchResultWithSubRequests({
batchResult,
destination,
subRequests,
}: {
batchResult: NotEmptyArrayOfBatchResults;
subRequests: Array<RawSnodeSubRequests>;
destination: string;
}) {
const isDestinationClosedGroup = ConvoHub.use().get(destination)?.isClosedGroup();
if (!batchResult || !isArray(batchResult) || isEmpty(batchResult)) {
window.log.error('handleBatchResultWithSubRequests: invalid batch result ');
return;
}
const us = UserUtils.getOurPubKeyStrFromCache();
const seenHashes: Array<SeenMessageHashes> = [];
for (let index = 0; index < subRequests.length; index++) {
const subRequest = subRequests[index];
// there is some extra work to do when storing a message (for a group/legacy group or user, but not for config messages)
if (
subRequest instanceof StoreGroupConfigOrMessageSubRequest ||
subRequest instanceof StoreLegacyGroupMessageSubRequest ||
subRequest instanceof StoreUserMessageSubRequest
) {
const storedAt = batchResult?.[index]?.body?.t;
const storedHash = batchResult?.[index]?.body?.hash;
const subRequestStatusCode = batchResult?.[index]?.code;
// TODO: the expiration is due to be returned by the storage server on "store" soon, we will then be able to use it instead of doing the storedAt + ttl logic below
// if we have a hash and a storedAt, mark it as seen so we don't reprocess it on the next retrieve
if (
subRequestStatusCode === 200 &&
!isEmpty(storedHash) &&
isString(storedHash) &&
isNumber(storedAt)
) {
seenHashes.push({
expiresAt: GetNetworkTime.now() + TTL_DEFAULT.CONTENT_MESSAGE, // non-config messages expire after TTL_DEFAULT.CONTENT_MESSAGE at most
hash: storedHash,
});
// For a 1o1, we need to store the hash of our synced message (as this is the one stored on our swarm).
// For groups, we can just store that hash directly as the group's swarm is hosting all of the group messages
if (
subRequest.dbMessageIdentifier &&
(subRequest.destination === us || isDestinationClosedGroup)
) {
// get a fresh copy of the message from the DB
/* eslint-disable no-await-in-loop */
const foundMessage = await Data.getMessageById(subRequest.dbMessageIdentifier);
if (foundMessage) {
await foundMessage.updateMessageHash(storedHash);
await foundMessage.commit();
window?.log?.info(`updated message ${foundMessage.get('id')} with hash: ${storedHash}`);
}
/* eslint-enable no-await-in-loop */
}
}
}
}
await Data.saveSeenMessageHashes(seenHashes);
}

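To make the index-based bookkeeping in handleBatchResultWithSubRequests concrete, here is a hedged sketch (not part of the commit) of the invariant it relies on: the nth entry of batchResult answers the nth subrequest handed to doUnsignedSnodeBatchRequest, so a successful store result carries the hash that ends up in Data.saveSeenMessageHashes and, for 1o1 sync messages and closed groups, on the corresponding DB message. The encryptedData, msgId and us variables are placeholders.

// Illustrative only: one store subrequest, one result, matched by index.
const storeRequest = new StoreUserMessageSubRequest({
  encryptedData, // Uint8Array produced by the encrypt-and-wrap step
  dbMessageIdentifier: msgId, // or null when nothing is persisted in the DB
  ttlMs: TTL_DEFAULT.CONTENT_MESSAGE,
  destination: us, // our 05-prefixed pubkey
});
const targetNode = await getNodeFromSwarmOrThrow(us);
const batchResult = await doUnsignedSnodeBatchRequest([storeRequest], targetNode, 6000, us);
// batchResult[0] answers storeRequest; on a 200 its body.hash / body.t are the
// values recorded as a seen hash and written back onto the DB message.
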
@ -1,4 +1,4 @@
import _ from 'lodash';
import { union } from 'lodash';
import { Data } from '../../data/data';
import { SignalService } from '../../protobuf';
import { PnServer } from '../apis/push_notification_api';
@ -41,7 +41,27 @@ async function handlePublicMessageSentSuccess(
}
}
async function handleMessageSentSuccess(
async function handlePublicMessageSentFailure(sentMessage: OpenGroupVisibleMessage, error: any) {
const fetchedMessage = await fetchHandleMessageSentData(sentMessage.identifier);
if (!fetchedMessage) {
return;
}
if (error instanceof Error) {
await fetchedMessage.saveErrors(error);
}
// always mark the message as sent.
// whether the send had errors is tracked separately via saveErrors()
fetchedMessage.set({
sent: true,
});
await fetchedMessage.commit();
await fetchedMessage.getConversation()?.updateLastMessage();
}
async function handleSwarmMessageSentSuccess(
sentMessage: OutgoingRawMessage,
effectiveTimestamp: number,
wrappedEnvelope?: Uint8Array
@ -86,16 +106,10 @@ async function handleMessageSentSuccess(
const hasBodyOrAttachments = Boolean(
dataMessage && (dataMessage.body || (dataMessage.attachments && dataMessage.attachments.length))
);
const shouldNotifyPushServer = hasBodyOrAttachments && !isOurDevice;
if (shouldNotifyPushServer) {
// notify the push notification server if needed
if (!wrappedEnvelope) {
window?.log?.warn('Should send PN notify but no wrapped envelope set.');
} else {
// we do not really care about the result, neither of waiting for it
void PnServer.notifyPnServer(wrappedEnvelope, sentMessage.device);
}
if (hasBodyOrAttachments && !isOurDevice && wrappedEnvelope) {
// we do not really care about the result, nor about waiting for it
void PnServer.notifyPnServer(wrappedEnvelope, sentMessage.device);
}
// Handle the sync logic here
@ -119,7 +133,7 @@ async function handleMessageSentSuccess(
fetchedMessage.set({ synced: true });
}
sentTo = _.union(sentTo, [sentMessage.device]);
sentTo = union(sentTo, [sentMessage.device]);
fetchedMessage.set({
sent_to: sentTo,
@ -133,10 +147,7 @@ async function handleMessageSentSuccess(
fetchedMessage.getConversation()?.updateLastMessage();
}
async function handleMessageSentFailure(
sentMessage: OutgoingRawMessage | OpenGroupVisibleMessage,
error: any
) {
async function handleSwarmMessageSentFailure(sentMessage: OutgoingRawMessage, error: any) {
const fetchedMessage = await fetchHandleMessageSentData(sentMessage.identifier);
if (!fetchedMessage) {
return;
@ -168,7 +179,7 @@ async function handleMessageSentFailure(
expirationStartTimestamp: undefined,
});
window.log.warn(
`[handleMessageSentFailure] Stopping a message from disppearing until we retry the send operation. messageId: ${fetchedMessage.get(
`[handleSwarmMessageSentFailure] Stopping a message from disappearing until we retry the send operation. messageId: ${fetchedMessage.get(
'id'
)}`
);
@ -198,6 +209,7 @@ async function fetchHandleMessageSentData(messageIdentifier: string) {
export const MessageSentHandler = {
handlePublicMessageSentSuccess,
handleMessageSentSuccess,
handleMessageSentFailure,
handlePublicMessageSentFailure,
handleSwarmMessageSentFailure,
handleSwarmMessageSentSuccess,
};

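The failure handlers are now split by transport: OutgoingRawMessage results go through handleSwarmMessageSentFailure (as wired into the MessageQueue above), while open-group sends get the new handlePublicMessageSentFailure, which persists the error but still flags the message as sent. A hedged sketch of the expected pairing; the surrounding send call is an assumption, not an API from this commit.

// Illustrative pairing only.
try {
  await sendSogsVisibleMessage(sogsMessage); // hypothetical wrapper around the SOGS send
} catch (e) {
  // the error is saved on the model, the message is still marked as sent, and the
  // conversation's last message is refreshed
  await MessageSentHandler.handlePublicMessageSentFailure(sogsMessage, e);
}
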
@ -537,13 +537,13 @@ export async function USER_callRecipient(recipient: string) {
// Note: we do the sending of the pre-offer manually as sendTo1o1NonDurably relies on having a message saved to the db for MessageSentSuccess,
// which is not the case for a pre-offer message (the message only exists in memory)
const rawMessage = await MessageUtils.toRawMessage(
const rawPreOffer = await MessageUtils.toRawMessage(
PubKey.cast(recipient),
preOfferMsg,
SnodeNamespaces.Default
);
const { wrappedEnvelope } = await MessageSender.send({
message: rawMessage,
const { wrappedEnvelope } = await MessageSender.sendSingleMessage({
message: rawPreOffer,
isSyncMessage: false,
});
void PnServer.notifyPnServer(wrappedEnvelope, recipient);

@ -6,7 +6,10 @@ import { SignalService } from '../../../../protobuf';
import { assertUnreachable } from '../../../../types/sqlSharedTypes';
import { isSignInByLinking } from '../../../../util/storage';
import { MetaGroupWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { StoreOnNodeData } from '../../../apis/snode_api/SnodeRequestTypes';
import {
StoreGroupConfigOrMessageSubRequest,
StoreGroupExtraData,
} from '../../../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces';
import { WithRevokeSubRequest } from '../../../apis/snode_api/types';
@ -70,29 +73,100 @@ async function confirmPushedAndDump(
return LibSessionUtil.saveDumpsToDb(groupPk);
}
async function storeGroupUpdateMessages({
updateMessages,
groupPk,
}: WithGroupPubkey & {
updateMessages: Array<GroupUpdateMemberChangeMessage | GroupUpdateInfoChangeMessage>;
}) {
if (!updateMessages.length) {
return true;
}
const updateMessagesToEncrypt: Array<StoreGroupExtraData> = updateMessages.map(updateMessage => {
const wrapped = MessageSender.wrapContentIntoEnvelope(
SignalService.Envelope.Type.SESSION_MESSAGE,
undefined,
updateMessage.createAtNetworkTimestamp, // message is signed with this timestamp
updateMessage.plainTextBuffer()
);
return {
namespace: SnodeNamespaces.ClosedGroupMessages,
pubkey: groupPk,
ttl: TTL_DEFAULT.CONTENT_MESSAGE,
networkTimestamp: updateMessage.createAtNetworkTimestamp,
data: SignalService.Envelope.encode(wrapped).finish(),
dbMessageIdentifier: updateMessage.identifier,
};
});
const encryptedUpdate = updateMessagesToEncrypt
? await MetaGroupWrapperActions.encryptMessages(
groupPk,
updateMessagesToEncrypt.map(m => m.data)
)
: [];
const updateMessagesEncrypted = updateMessagesToEncrypt.map((requestDetails, index) => ({
...requestDetails,
data: encryptedUpdate[index],
}));
const updateMessagesRequests = updateMessagesEncrypted.map(m => {
return new StoreGroupConfigOrMessageSubRequest({
encryptedData: m.data,
groupPk,
namespace: m.namespace,
ttlMs: m.ttl,
dbMessageIdentifier: m.dbMessageIdentifier,
});
});
const result = await MessageSender.sendEncryptedDataToSnode({
storeRequests: [...updateMessagesRequests],
destination: groupPk,
messagesHashesToDelete: null,
revokeSubRequest: null,
unrevokeSubRequest: null,
});
const expectedReplyLength = updateMessagesRequests.length; // each of those messages is sent as a subrequest
// we do a sequence call here. If we do not have the right expected number of results, consider it a failure
if (!isArray(result) || result.length !== expectedReplyLength) {
window.log.info(
`GroupSyncJob: unexpected result length: expected ${expectedReplyLength} but got ${result?.length}`
);
// this might be a 421 error (already handled) so let's retry this request a little bit later
return false;
}
return true;
}
async function pushChangesToGroupSwarmIfNeeded({
revokeSubRequest,
unrevokeSubRequest,
updateMessages,
groupPk,
supplementKeys,
}: WithGroupPubkey &
WithRevokeSubRequest & {
supplementKeys: Array<Uint8Array>;
updateMessages: Array<GroupUpdateMemberChangeMessage | GroupUpdateInfoChangeMessage>;
}): Promise<RunJobResult> {
// save the dumps to DB even before trying to push them, so at least we have up-to-date dumps in the DB in case of a crash, no network, etc.
await LibSessionUtil.saveDumpsToDb(groupPk);
const { allOldHashes, messages } = await LibSessionUtil.pendingChangesForGroup(groupPk);
const { allOldHashes, messages: pendingConfigData } =
await LibSessionUtil.pendingChangesForGroup(groupPk);
// If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't schedule another run in this case)
if (isEmpty(messages) && !supplementKeys.length) {
if (isEmpty(pendingConfigData) && !supplementKeys.length) {
return RunJobResult.Success;
}
const networkTimestamp = GetNetworkTime.now();
const encryptedMessage: Array<StoreOnNodeData> = messages.map(item => {
const pendingConfigMsgs = pendingConfigData.map(item => {
return {
namespace: item.namespace,
pubkey: groupPk,
@ -102,49 +176,58 @@ async function pushChangesToGroupSwarmIfNeeded({
};
});
const extraMessagesToEncrypt: Array<StoreOnNodeData> = [];
if (supplementKeys.length) {
supplementKeys.forEach(key =>
extraMessagesToEncrypt.push({
namespace: SnodeNamespaces.ClosedGroupKeys,
pubkey: groupPk,
ttl: TTL_DEFAULT.CONFIG_MESSAGE,
networkTimestamp,
data: key,
})
);
}
for (let index = 0; index < updateMessages.length; index++) {
const updateMessage = updateMessages[index];
const wrapped = await MessageSender.wrapContentIntoEnvelope(
SignalService.Envelope.Type.SESSION_MESSAGE,
undefined,
networkTimestamp,
updateMessage.plainTextBuffer()
);
extraMessagesToEncrypt.push({
namespace: SnodeNamespaces.ClosedGroupMessages,
pubkey: groupPk,
ttl: TTL_DEFAULT.CONTENT_MESSAGE,
networkTimestamp,
data: SignalService.Envelope.encode(wrapped).finish(),
});
}
const keysMessagesToEncrypt: Array<StoreGroupExtraData> = supplementKeys.map(key => ({
namespace: SnodeNamespaces.ClosedGroupKeys,
pubkey: groupPk,
ttl: TTL_DEFAULT.CONFIG_MESSAGE,
networkTimestamp,
data: key,
dbMessageIdentifier: null,
}));
const encryptedData = await MetaGroupWrapperActions.encryptMessages(
groupPk,
extraMessagesToEncrypt.map(m => m.data)
);
const keysEncrypted = keysMessagesToEncrypt
? await MetaGroupWrapperActions.encryptMessages(
groupPk,
keysMessagesToEncrypt.map(m => m.data)
)
: [];
const extraMessagesEncrypted = extraMessagesToEncrypt.map((requestDetails, index) => ({
const keysEncryptedmessage = keysMessagesToEncrypt.map((requestDetails, index) => ({
...requestDetails,
data: encryptedData[index],
data: keysEncrypted[index],
}));
const pendingConfigRequests = pendingConfigMsgs.map(m => {
return new StoreGroupConfigOrMessageSubRequest({
encryptedData: m.data,
groupPk,
namespace: m.namespace,
ttlMs: m.ttl,
dbMessageIdentifier: null, // those are config messages only, they have no dbMessageIdentifier
});
});
const keysEncryptedRequests = keysEncryptedmessage.map(m => {
return new StoreGroupConfigOrMessageSubRequest({
encryptedData: m.data,
groupPk,
namespace: m.namespace,
ttlMs: m.ttl,
dbMessageIdentifier: null, // those are supplemental keys messages only, they have no dbMessageIdentifier
});
});
if (
revokeSubRequest?.revokeTokenHex.length === 0 ||
unrevokeSubRequest?.revokeTokenHex.length === 0
) {
throw new Error(
'revokeSubRequest and unrevoke request must be null when not doing token change'
);
}
const result = await MessageSender.sendEncryptedDataToSnode({
encryptedData: [...encryptedMessage, ...extraMessagesEncrypted],
storeRequests: [...pendingConfigRequests, ...keysEncryptedRequests],
destination: groupPk,
messagesHashesToDelete: allOldHashes,
revokeSubRequest,
@ -152,11 +235,11 @@ async function pushChangesToGroupSwarmIfNeeded({
});
const expectedReplyLength =
messages.length + // each of those messages are sent as a subrequest
extraMessagesEncrypted.length + // each of those messages are sent as a subrequest
pendingConfigRequests.length + // each of those messages is sent as a subrequest
keysEncryptedRequests.length + // each of those messages is sent as a subrequest
(allOldHashes.size ? 1 : 0) + // we are sending all hashes changes as a single request
(revokeSubRequest?.revokeTokenHex.length ? 1 : 0) + // we are sending all revoke updates as a single request
(unrevokeSubRequest?.revokeTokenHex.length ? 1 : 0); // we are sending all revoke updates as a single request
(revokeSubRequest ? 1 : 0) + // we are sending all revoke updates as a single request
(unrevokeSubRequest ? 1 : 0); // we are sending all revoke updates as a single request
// we do a sequence call here. If we do not have the right expected number of results, consider it a failure
if (!isArray(result) || result.length !== expectedReplyLength) {
@ -170,8 +253,9 @@ async function pushChangesToGroupSwarmIfNeeded({
const changes = LibSessionUtil.batchResultsToGroupSuccessfulChange(result, {
allOldHashes,
messages,
messages: pendingConfigData,
});
if (isEmpty(changes)) {
return RunJobResult.RetryJobIfPossible;
}
@ -228,7 +312,6 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
revokeSubRequest: null,
unrevokeSubRequest: null,
supplementKeys: [],
updateMessages: [],
});
// eslint-disable-next-line no-useless-catch
@ -306,6 +389,7 @@ async function queueNewJobIfNeeded(groupPk: GroupPubkeyType) {
export const GroupSync = {
GroupSyncJob,
pushChangesToGroupSwarmIfNeeded,
storeGroupUpdateMessages,
queueNewJobIfNeeded: (groupPk: GroupPubkeyType) =>
allowOnlyOneAtATime(`GroupSyncJob-oneAtAtTime-${groupPk}`, () => queueNewJobIfNeeded(groupPk)),
};

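For reference, a minimal hedged sketch of how the new GroupSync.storeGroupUpdateMessages entry point is meant to be driven: each update message carries the db identifier of the row it mirrors, and a false return value means the push should be retried later (for instance after a 421). The infoChangeMsg variable is a placeholder for a GroupUpdateInfoChangeMessage built as in handleNameChangeFromUI further down.

// Illustrative caller only.
const pushed = await GroupSync.storeGroupUpdateMessages({
  groupPk,
  updateMessages: [infoChangeMsg],
});
if (!pushed) {
  // the swarm returned an unexpected number of results; callers are expected
  // to schedule a retry rather than treat the messages as stored
}
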
@ -7,8 +7,7 @@ import { ConfigDumpData } from '../../../../data/configDump/configDump';
import { UserSyncJobDone } from '../../../../shims/events';
import { isSignInByLinking } from '../../../../util/storage';
import { GenericWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { StoreOnNodeData } from '../../../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../../../apis/snode_api/getNetworkTime';
import { StoreUserConfigSubRequest } from '../../../apis/snode_api/SnodeRequestTypes';
import { TTL_DEFAULT } from '../../../constants';
import { ConvoHub } from '../../../conversations';
import { MessageSender } from '../../../sending/MessageSender';
@ -86,18 +85,17 @@ async function pushChangesToUserSwarmIfNeeded() {
triggerConfSyncJobDone();
return RunJobResult.Success;
}
const msgs: Array<StoreOnNodeData> = changesToPush.messages.map(item => {
return {
namespace: item.namespace,
pubkey: us,
networkTimestamp: GetNetworkTime.now(),
ttl: TTL_DEFAULT.CONFIG_MESSAGE,
data: item.ciphertext,
};
const storeRequests = changesToPush.messages.map(m => {
return new StoreUserConfigSubRequest({
encryptedData: m.ciphertext,
namespace: m.namespace,
ttlMs: TTL_DEFAULT.CONFIG_MESSAGE,
});
});
const result = await MessageSender.sendEncryptedDataToSnode({
encryptedData: msgs,
storeRequests,
destination: us,
messagesHashesToDelete: changesToPush.allOldHashes,
revokeSubRequest: null,

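To spell out what the user-config push above turns into on the wire, here is a hedged sketch of the batch that sendEncryptedDataToSnode ends up assembling: one StoreUserConfigSubRequest per config message plus, when old hashes exist, a single DeleteHashesFromUserNodeSubRequest, all sent to one node of our own swarm. The ciphertexts, namespaces, oldHashes and us variables are placeholders, and the 'sequence' batch type is assumed to mirror what the group sync job does.

// Roughly the batch built for a user config push (illustrative).
const storeRequests = ciphertexts.map(
  (ciphertext, i) =>
    new StoreUserConfigSubRequest({
      encryptedData: ciphertext,
      namespace: namespaces[i], // e.g. SnodeNamespaces.UserProfile
      ttlMs: TTL_DEFAULT.CONFIG_MESSAGE,
    })
);
const deleteOldHashes = oldHashes.length
  ? new DeleteHashesFromUserNodeSubRequest({ messagesHashes: oldHashes })
  : null;
const targetNode = await getNodeFromSwarmOrThrow(us);
const results = await doUnsignedSnodeBatchRequest(
  compact([...storeRequests, deleteOldHashes]),
  targetNode,
  4000,
  us,
  'sequence' // one result per subrequest, in order, is what the sync jobs rely on
);
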
@ -20,7 +20,7 @@ import {
BatchResultEntry,
NotEmptyArrayOfBatchResults,
} from '../../apis/snode_api/SnodeRequestTypes';
import { SnodeNamespaces, UserConfigNamespaces } from '../../apis/snode_api/namespaces';
import { SnodeNamespaces, SnodeNamespacesUserConfig } from '../../apis/snode_api/namespaces';
import { ed25519Str } from '../../onions/onionPath';
import { PubKey } from '../../types';
import { UserSync } from '../job_runners/jobs/UserSyncJob';
@ -105,7 +105,7 @@ type PendingChangesShared = {
export type PendingChangesForUs = PendingChangesShared & {
seqno: Long;
namespace: UserConfigNamespaces;
namespace: SnodeNamespacesUserConfig;
};
type PendingChangesForGroupNonKey = PendingChangesShared & {
@ -232,7 +232,7 @@ async function pendingChangesForGroup(groupPk: GroupPubkeyType): Promise<GroupDe
* Return the wrapperId associated with a specific namespace.
* WrapperIds are what we use in the database and with the libsession workers calls, and namespace is what we push to.
*/
function userNamespaceToVariant(namespace: UserConfigNamespaces) {
function userNamespaceToVariant(namespace: SnodeNamespacesUserConfig) {
// TODO Might be worth migrating them to use the namespaces directly?
switch (namespace) {
case SnodeNamespaces.UserProfile:

@ -11,7 +11,6 @@ import {
} from 'libsession_util_nodejs';
import { base64_variants, from_base64 } from 'libsodium-wrappers-sumo';
import { intersection, isEmpty, uniq } from 'lodash';
import { v4 } from 'uuid';
import { ConfigDumpData } from '../../data/configDump/configDump';
import { ConversationModel } from '../../models/conversation';
import { ConversationTypeEnum } from '../../models/conversationAttributes';
@ -55,7 +54,6 @@ import { resetOverlayMode } from './section';
type WithAddWithoutHistoryMembers = { withoutHistory: Array<PubkeyType> };
type WithAddWithHistoryMembers = { withHistory: Array<PubkeyType> };
type WithRemoveMembers = { removed: Array<PubkeyType> };
type WithFromCurrentDevice = { fromCurrentDevice: boolean }; // there are some changes we want to do only when the current user do the change, and not when a network change triggers it.
type WithFromMemberLeftMessage = { fromMemberLeftMessage: boolean }; // there are some changes we want to skip when doing changes triggered from a memberLeft message.
export type GroupState = {
@ -186,7 +184,6 @@ const initNewGroupInWrapper = createAsyncThunk(
revokeSubRequest: null,
unrevokeSubRequest: null,
supplementKeys: [],
updateMessages: [],
});
if (result !== RunJobResult.Success) {
window.log.warn('GroupSync.pushChangesToGroupSwarmIfNeeded during create failed');
@ -511,13 +508,9 @@ async function handleRemoveMembersAndRekey({
groupPk,
removed,
secretKey,
fromCurrentDevice,
fromMemberLeftMessage,
}: WithGroupPubkey &
WithRemoveMembers &
WithFromCurrentDevice &
WithFromMemberLeftMessage & { secretKey: Uint8Array }) {
if (!fromCurrentDevice || !removed.length) {
}: WithGroupPubkey & WithRemoveMembers & WithFromMemberLeftMessage & { secretKey: Uint8Array }) {
if (!removed.length) {
return;
}
const createAtNetworkTimestamp = GetNetworkTime.now();
@ -609,91 +602,112 @@ function getConvoExpireDetailsForMsg(convo: ConversationModel) {
* Those are not going to change the state, they are just here as a "notification".
* i.e. "Alice was removed from the group"
*/
async function getUpdateMessagesToPush({
async function getRemovedControlMessage({
convo,
withHistory,
withoutHistory,
fromCurrentDevice,
groupPk,
removed,
adminSecretKey,
createAtNetworkTimestamp,
fromMemberLeftMessage,
}: WithAddWithHistoryMembers &
WithAddWithoutHistoryMembers &
WithFromMemberLeftMessage &
dbMsgIdentifier,
}: WithFromMemberLeftMessage &
WithRemoveMembers &
WithFromCurrentDevice &
WithGroupPubkey & {
convo: ConversationModel;
adminSecretKey: Uint8ArrayLen64;
createAtNetworkTimestamp: number;
dbMsgIdentifier: string;
}) {
const sodium = await getSodiumRenderer();
const updateMessages: Array<GroupUpdateMemberChangeMessage> = [];
if (!fromCurrentDevice || fromMemberLeftMessage) {
return updateMessages;
if (fromMemberLeftMessage || !removed.length) {
return null;
}
if (withoutHistory.length) {
updateMessages.push(
new GroupUpdateMemberChangeMessage({
identifier: v4(),
added: withoutHistory,
groupPk,
typeOfChange: 'added',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
})
);
}
if (withHistory.length) {
updateMessages.push(
new GroupUpdateMemberChangeMessage({
identifier: v4(),
added: withHistory,
groupPk,
typeOfChange: 'addedWithHistory',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
})
);
return new GroupUpdateMemberChangeMessage({
identifier: dbMsgIdentifier,
removed,
groupPk,
typeOfChange: 'removed',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
});
}
async function getWithoutHistoryControlMessage({
convo,
withoutHistory,
groupPk,
adminSecretKey,
createAtNetworkTimestamp,
dbMsgIdentifier,
}: WithAddWithoutHistoryMembers &
WithGroupPubkey & {
dbMsgIdentifier: string;
convo: ConversationModel;
adminSecretKey: Uint8ArrayLen64;
createAtNetworkTimestamp: number;
}) {
const sodium = await getSodiumRenderer();
if (!withoutHistory.length) {
return null;
}
if (removed.length) {
updateMessages.push(
new GroupUpdateMemberChangeMessage({
identifier: v4(),
removed,
groupPk,
typeOfChange: 'removed',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
})
);
return new GroupUpdateMemberChangeMessage({
identifier: dbMsgIdentifier,
added: withoutHistory,
groupPk,
typeOfChange: 'added',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
});
}
async function getWithHistoryControlMessage({
convo,
withHistory,
groupPk,
adminSecretKey,
createAtNetworkTimestamp,
dbMsgIdentifier,
}: WithAddWithHistoryMembers &
WithGroupPubkey & {
dbMsgIdentifier: string;
convo: ConversationModel;
adminSecretKey: Uint8ArrayLen64;
createAtNetworkTimestamp: number;
}) {
const sodium = await getSodiumRenderer();
if (!withHistory.length) {
return null;
}
// TODO might need to add the promote case here
return updateMessages;
return new GroupUpdateMemberChangeMessage({
identifier: dbMsgIdentifier,
added: withHistory,
groupPk,
typeOfChange: 'addedWithHistory',
createAtNetworkTimestamp,
secretKey: adminSecretKey,
sodium,
...getConvoExpireDetailsForMsg(convo),
});
}
async function handleMemberAddedFromUIOrNot({
async function handleMemberAddedFromUI({
addMembersWithHistory,
addMembersWithoutHistory,
groupPk,
fromCurrentDevice,
}: WithFromCurrentDevice &
WithGroupPubkey & {
addMembersWithHistory: Array<PubkeyType>;
addMembersWithoutHistory: Array<PubkeyType>;
}) {
}: WithGroupPubkey & {
addMembersWithHistory: Array<PubkeyType>;
addMembersWithoutHistory: Array<PubkeyType>;
}) {
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group || !group.secretKey || isEmpty(group.secretKey)) {
throw new Error('tried to make change to group but we do not have the admin secret key');
@ -724,25 +738,12 @@ async function handleMemberAddedFromUIOrNot({
await handleWithoutHistoryMembers({ groupPk, withoutHistory });
const createAtNetworkTimestamp = GetNetworkTime.now();
const updateMessages = await getUpdateMessagesToPush({
adminSecretKey: group.secretKey,
convo,
fromCurrentDevice,
groupPk,
removed: [],
withHistory,
withoutHistory,
createAtNetworkTimestamp,
fromMemberLeftMessage: false,
});
await LibSessionUtil.saveDumpsToDb(groupPk);
// push new members & key supplement in a single batch call
const sequenceResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementKeys,
updateMessages,
...revokeUnrevokeParams,
});
if (sequenceResult !== RunJobResult.Success) {
@ -774,22 +775,46 @@ async function handleMemberAddedFromUIOrNot({
: null,
},
};
const additionsWithoutHistory = updateMessages.find(m => m.typeOfChange === 'added');
const additionsWithHistory = updateMessages.find(m => m.typeOfChange === 'addedWithHistory');
if (additionsWithoutHistory) {
await ClosedGroup.addUpdateMessage({
diff: { type: 'add', added: additionsWithoutHistory.memberSessionIds, withHistory: false },
const updateMessagesToPush: Array<GroupUpdateMemberChangeMessage> = [];
if (withHistory.length) {
const msgModel = await ClosedGroup.addUpdateMessage({
diff: { type: 'add', added: withHistory, withHistory: true },
...shared,
});
const groupChange = await getWithHistoryControlMessage({
adminSecretKey: group.secretKey,
convo,
groupPk,
withHistory,
createAtNetworkTimestamp,
dbMsgIdentifier: msgModel.id,
});
if (groupChange) {
updateMessagesToPush.push(groupChange);
}
}
if (additionsWithHistory) {
await ClosedGroup.addUpdateMessage({
diff: { type: 'add', added: additionsWithHistory.memberSessionIds, withHistory: true },
if (withoutHistory.length) {
const msgModel = await ClosedGroup.addUpdateMessage({
diff: { type: 'add', added: withoutHistory, withHistory: false },
...shared,
});
const groupChange = await getWithoutHistoryControlMessage({
adminSecretKey: group.secretKey,
convo,
groupPk,
withoutHistory,
createAtNetworkTimestamp,
dbMsgIdentifier: msgModel.id,
});
if (groupChange) {
updateMessagesToPush.push(groupChange);
}
console.warn(`diff: { type: ' should add case for addWithHistory here ?`);
}
await convo.commit();
await GroupSync.storeGroupUpdateMessages({ groupPk, updateMessages: updateMessagesToPush });
}
/**
@ -797,13 +822,11 @@ async function handleMemberAddedFromUIOrNot({
* - to update the state when kicking a member from the group from the UI
* - to update the state when handling a MEMBER_LEFT message
*/
async function handleMemberRemovedFromUIOrNot({
async function handleMemberRemovedFromUI({
groupPk,
removeMembers,
fromCurrentDevice,
fromMemberLeftMessage,
}: WithFromCurrentDevice &
WithFromMemberLeftMessage &
}: WithFromMemberLeftMessage &
WithGroupPubkey & {
removeMembers: Array<PubkeyType>;
}) {
@ -812,7 +835,11 @@ async function handleMemberRemovedFromUIOrNot({
throw new Error('tried to make change to group but we do not have the admin secret key');
}
await checkWeAreAdminOrThrow(groupPk, 'handleMemberRemovedFromUIOrNot');
await checkWeAreAdminOrThrow(groupPk, 'handleMemberRemovedFromUI');
if (removeMembers.length === 0) {
return;
}
const { removed, convo, us } = validateMemberRemoveChange({
groupPk,
@ -833,30 +860,16 @@ async function handleMemberRemovedFromUIOrNot({
groupPk,
removed,
secretKey: group.secretKey,
fromCurrentDevice,
fromMemberLeftMessage,
});
const createAtNetworkTimestamp = GetNetworkTime.now();
const updateMessages = await getUpdateMessagesToPush({
adminSecretKey: group.secretKey,
convo,
fromCurrentDevice,
groupPk,
removed,
withHistory: [],
withoutHistory: [],
createAtNetworkTimestamp,
fromMemberLeftMessage,
});
await LibSessionUtil.saveDumpsToDb(groupPk);
// revoked pubkeys, update messages, and libsession groups config in a single batchcall
const sequenceResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
updateMessages,
supplementKeys: [],
...revokeUnrevokeParams,
});
@ -887,26 +900,37 @@ async function handleMemberRemovedFromUIOrNot({
: null,
},
};
await convo.commit();
const removals = updateMessages.find(m => m.typeOfChange === 'removed');
if (removals) {
await ClosedGroup.addUpdateMessage({
if (removed.length) {
const msgModel = await ClosedGroup.addUpdateMessage({
diff: { type: 'kicked', kicked: removed },
...shared,
});
const removedControlMessage = await getRemovedControlMessage({
adminSecretKey: group.secretKey,
convo,
groupPk,
removed,
createAtNetworkTimestamp,
fromMemberLeftMessage,
dbMsgIdentifier: msgModel.id,
});
if (removedControlMessage) {
await GroupSync.storeGroupUpdateMessages({
groupPk,
updateMessages: [removedControlMessage],
});
}
}
await convo.commit();
}
async function handleNameChangeFromUI({
groupPk,
newName: uncheckedName,
fromCurrentDevice,
}: WithFromCurrentDevice &
WithGroupPubkey & {
newName: string;
}) {
}: WithGroupPubkey & {
newName: string;
}) {
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group || !group.secretKey || isEmpty(group.secretKey)) {
throw new Error('tried to make change to group but we do not have the admin secret key');
@ -931,7 +955,6 @@ async function handleNameChangeFromUI({
await MetaGroupWrapperActions.infoSet(groupPk, infos);
const createAtNetworkTimestamp = GetNetworkTime.now();
const updateMessages: Array<GroupUpdateInfoChangeMessage> = [];
// we want to add an update message even if the change was done remotely
const msg = await ClosedGroup.addUpdateMessage({
convo,
@ -942,27 +965,22 @@ async function handleNameChangeFromUI({
});
// we want to send an update only if the change was made locally.
if (fromCurrentDevice) {
updateMessages.push(
new GroupUpdateInfoChangeMessage({
groupPk,
typeOfChange: SignalService.GroupUpdateInfoChangeMessage.Type.NAME,
updatedName: newName,
identifier: msg.id,
createAtNetworkTimestamp,
secretKey: group.secretKey,
sodium: await getSodiumRenderer(),
...getConvoExpireDetailsForMsg(convo),
})
);
}
const nameChangeMsg = new GroupUpdateInfoChangeMessage({
groupPk,
typeOfChange: SignalService.GroupUpdateInfoChangeMessage.Type.NAME,
updatedName: newName,
identifier: msg.id,
createAtNetworkTimestamp,
secretKey: group.secretKey,
sodium: await getSodiumRenderer(),
...getConvoExpireDetailsForMsg(convo),
});
const batchResult = await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk,
supplementKeys: [],
revokeSubRequest: null,
unrevokeSubRequest: null,
updateMessages,
});
if (batchResult !== RunJobResult.Success) {
@ -972,6 +990,7 @@ async function handleNameChangeFromUI({
}
await UserSync.queueNewJobIfNeeded();
await GroupSync.storeGroupUpdateMessages({ groupPk, updateMessages: [nameChangeMsg] });
convo.set({
active_at: createAtNetworkTimestamp,
@ -1005,16 +1024,14 @@ const currentDeviceGroupMembersChange = createAsyncThunk(
);
}
await handleMemberRemovedFromUIOrNot({
await handleMemberRemovedFromUI({
groupPk,
removeMembers: args.removeMembers,
fromCurrentDevice: true,
fromMemberLeftMessage: false,
});
await handleMemberAddedFromUIOrNot({
await handleMemberAddedFromUI({
groupPk,
fromCurrentDevice: true,
addMembersWithHistory: args.addMembersWithHistory,
addMembersWithoutHistory: args.addMembersWithoutHistory,
});
@ -1051,10 +1068,9 @@ const handleMemberLeftMessage = createAsyncThunk(
);
}
await handleMemberRemovedFromUIOrNot({
await handleMemberRemovedFromUI({
groupPk,
removeMembers: [memberLeft],
fromCurrentDevice: true,
fromMemberLeftMessage: true,
});
@ -1160,7 +1176,7 @@ const currentDeviceGroupNameChange = createAsyncThunk(
}
await checkWeAreAdminOrThrow(groupPk, 'currentDeviceGroupNameChange');
await handleNameChangeFromUI({ groupPk, ...args, fromCurrentDevice: true });
await handleNameChangeFromUI({ groupPk, ...args });
return {
groupPk,

@ -261,7 +261,7 @@ describe('SnodeSignature', () => {
group: { pubkeyHex: null as any, secretKey: privKeyUint, authData: null },
messagesHashes: ['[;p['],
shortenOrExtend: '',
timestamp: hardcodedTimestamp,
expiryMs: hardcodedTimestamp,
});
};
await expect(func()).to.be.rejectedWith(
@ -280,7 +280,7 @@ describe('SnodeSignature', () => {
messagesHashes: ['[;p['],
shortenOrExtend: '',
timestamp: hardcodedTimestamp,
expiryMs: hardcodedTimestamp,
});
};
await expect(func()).to.be.rejectedWith(
@ -290,74 +290,74 @@ describe('SnodeSignature', () => {
it('works with valid pubkey and privkey', async () => {
const hashes = ['hash4321', 'hash4221'];
const timestamp = hardcodedTimestamp;
const expiryMs = hardcodedTimestamp;
const shortenOrExtend = '';
const ret = await SnodeGroupSignature.generateUpdateExpiryGroupSignature({
group: { pubkeyHex: validGroupPk, secretKey: privKeyUint, authData: null },
messagesHashes: hashes,
shortenOrExtend: '',
timestamp,
expiryMs,
});
expect(ret.pubkey).to.be.eq(validGroupPk);
const verificationData = `expire${shortenOrExtend}${timestamp}${hashes.join('')}`;
const verificationData = `expire${shortenOrExtend}${expiryMs}${hashes.join('')}`;
await verifySig(ret, verificationData);
});
it('fails with invalid timestamp', async () => {
const hashes = ['hash4321', 'hash4221'];
const timestamp = hardcodedTimestamp;
const expiryMs = hardcodedTimestamp;
const shortenOrExtend = '';
const ret = await SnodeGroupSignature.generateUpdateExpiryGroupSignature({
group: { pubkeyHex: validGroupPk, secretKey: privKeyUint, authData: null },
messagesHashes: hashes,
shortenOrExtend: '',
timestamp,
expiryMs,
});
expect(ret.pubkey).to.be.eq(validGroupPk);
const verificationData = `expire${shortenOrExtend}${timestamp}1${hashes.join('')}`;
const verificationData = `expire${shortenOrExtend}${expiryMs}1${hashes.join('')}`;
const func = async () => verifySig(ret, verificationData);
await expect(func()).rejectedWith('sig failed to be verified');
});
it('fails with invalid hashes', async () => {
const hashes = ['hash4321', 'hash4221'];
const timestamp = hardcodedTimestamp;
const expiryMs = hardcodedTimestamp;
const shortenOrExtend = '';
const ret = await SnodeGroupSignature.generateUpdateExpiryGroupSignature({
group: { pubkeyHex: validGroupPk, secretKey: privKeyUint, authData: null },
messagesHashes: hashes,
shortenOrExtend: '',
timestamp,
expiryMs,
});
expect(ret.pubkey).to.be.eq(validGroupPk);
const overridenHash = hashes.slice();
overridenHash[0] = '1111';
const verificationData = `expire${shortenOrExtend}${timestamp}${overridenHash.join('')}`;
const verificationData = `expire${shortenOrExtend}${expiryMs}${overridenHash.join('')}`;
const func = async () => verifySig(ret, verificationData);
await expect(func()).rejectedWith('sig failed to be verified');
});
it('fails with invalid number of hashes', async () => {
const hashes = ['hash4321', 'hash4221'];
const timestamp = hardcodedTimestamp;
const expiryMs = hardcodedTimestamp;
const shortenOrExtend = '';
const ret = await SnodeGroupSignature.generateUpdateExpiryGroupSignature({
group: { pubkeyHex: validGroupPk, secretKey: privKeyUint, authData: null },
messagesHashes: hashes,
shortenOrExtend: '',
timestamp,
expiryMs,
});
expect(ret.pubkey).to.be.eq(validGroupPk);
const overridenHash = [hashes[0]];
const verificationData = `expire${shortenOrExtend}${timestamp}${overridenHash.join('')}`;
const verificationData = `expire${shortenOrExtend}${expiryMs}${overridenHash.join('')}`;
const func = async () => verifySig(ret, verificationData);
await expect(func()).rejectedWith('sig failed to be verified');
});

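As a concrete reading of the assertions above, the data signed for an expiry update is simply the concatenation of the literal 'expire', the shorten/extend flag, the renamed expiryMs value and the message hashes. A tiny illustrative reconstruction with made-up values:

// Illustrative values only; mirrors the verificationData built in the tests above.
const expiryMs = 1690000000000;
const shortenOrExtend = ''; // assumed variants: '', 'shorten', 'extend'
const hashes = ['hash4321', 'hash4221'];
const verificationData = `expire${shortenOrExtend}${expiryMs}${hashes.join('')}`;
// -> 'expire1690000000000hash4321hash4221'
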
@ -55,18 +55,20 @@ describe('ExpireRequest', () => {
throw Error('nothing was returned when building the request');
}
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages, 'messageHash should be in messages array').to.deep.equal(
const signedReq = await request.buildAndSignParameters();
expect(signedReq, "method should be 'expire'").to.have.property('method', 'expire');
expect(signedReq.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(signedReq.params.messages, 'messageHash should be in messages array').to.deep.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
signedReq.params.expiry && isValidUnixTimestamp(signedReq.params.expiry),
'expiry should be a valid unix timestamp'
).to.be.true;
expect(request.params.extend, 'extend should be undefined').to.be.undefined;
expect(request.params.shorten, 'shorten should be undefined').to.be.undefined;
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
expect(signedReq.params.extend, 'extend should be undefined').to.be.undefined;
expect(signedReq.params.shorten, 'shorten should be undefined').to.be.undefined;
expect(signedReq.params.signature, 'signature should not be empty').to.not.be.empty;
});
it('builds a request with extend enabled', async () => {
const request: UpdateExpiryOnNodeUserSubRequest | null = await buildExpireRequestSingleExpiry(
@ -81,19 +83,20 @@ describe('ExpireRequest', () => {
if (!request) {
throw Error('nothing was returned when building the request');
}
const signedReq = await request.buildAndSignParameters();
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
expect(signedReq, "method should be 'expire'").to.have.property('method', 'expire');
expect(signedReq.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(signedReq.params.messages, 'messageHash should be in messages array').to.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
signedReq.params.expiry && isValidUnixTimestamp(signedReq?.params.expiry),
'expiry should be a valid unix timestamp'
).to.be.true;
expect(request.params.extend, 'extend should be true').to.be.true;
expect(request.params.shorten, 'shorten should be undefined').to.be.undefined;
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
expect(signedReq.params.extend, 'extend should be true').to.be.true;
expect(signedReq.params.shorten, 'shorten should be undefined').to.be.undefined;
expect(signedReq.params.signature, 'signature should not be empty').to.not.be.empty;
});
it('builds a request with shorten enabled', async () => {
const request: UpdateExpiryOnNodeUserSubRequest | null = await buildExpireRequestSingleExpiry(
@ -108,19 +111,20 @@ describe('ExpireRequest', () => {
if (!request) {
throw Error('nothing was returned when building the request');
}
const signedReq = await request.buildAndSignParameters();
expect(request, "method should be 'expire'").to.have.property('method', 'expire');
expect(request.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(request.params.messages, 'messageHash should be in messages array').to.equal(
expect(signedReq, "method should be 'expire'").to.have.property('method', 'expire');
expect(signedReq.params.pubkey, 'should have a matching pubkey').to.equal(ourNumber);
expect(signedReq.params.messages, 'messageHash should be in messages array').to.equal(
props.messageHashes
);
expect(
request.params.expiry && isValidUnixTimestamp(request?.params.expiry),
signedReq.params.expiry && isValidUnixTimestamp(signedReq?.params.expiry),
'expiry should be a valid unix timestamp'
).to.be.true;
expect(request.params.extend, 'extend should be undefined').to.be.undefined;
expect(request.params.shorten, 'shorten should be true').to.be.true;
expect(request.params.signature, 'signature should not be empty').to.not.be.empty;
expect(signedReq.params.extend, 'extend should be undefined').to.be.undefined;
expect(signedReq.params.shorten, 'shorten should be true').to.be.true;
expect(signedReq.params.signature, 'signature should not be empty').to.not.be.empty;
});
});

@ -52,14 +52,14 @@ describe('MessageQueue', () => {
Sinon.stub(UserUtils, 'getOurPubKeyStrFromCache').returns(ourNumber);
// Message Sender Stubs
sendStub = Sinon.stub(MessageSender, 'send');
sendStub = Sinon.stub(MessageSender, 'sendSingleMessage');
messageSentHandlerFailedStub = Sinon.stub(
MessageSentHandler,
'handleMessageSentFailure'
'handleSwarmMessageSentFailure'
).resolves();
messageSentHandlerSuccessStub = Sinon.stub(
MessageSentHandler,
'handleMessageSentSuccess'
'handleSwarmMessageSentSuccess'
).resolves();
messageSentPublicHandlerSuccessStub = Sinon.stub(
MessageSentHandler,

@ -69,7 +69,7 @@ describe('MessageSender', () => {
it('should not retry if an error occurred during encryption', async () => {
encryptStub.throws(new Error('Failed to encrypt.'));
const promise = MessageSender.send({
const promise = MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
@ -80,7 +80,7 @@ describe('MessageSender', () => {
});
it('should only call lokiMessageAPI once if no errors occured', async () => {
await MessageSender.send({
await MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
@ -92,7 +92,7 @@ describe('MessageSender', () => {
it('should only retry the specified amount of times before throwing', async () => {
sessionMessageAPISendStub.throws(new Error('API error'));
const attempts = 2;
const promise = MessageSender.send({
const promise = MessageSender.sendSingleMessage({
message: rawMessage,
attempts,
retryMinTimeout: 10,
@ -104,7 +104,7 @@ describe('MessageSender', () => {
it('should not throw error if successful send occurs within the retry limit', async () => {
sessionMessageAPISendStub.onFirstCall().throws(new Error('API error'));
await MessageSender.send({
await MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
@ -135,7 +135,7 @@ describe('MessageSender', () => {
SnodeNamespaces.Default
);
await MessageSender.send({
await MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
@ -166,7 +166,7 @@ describe('MessageSender', () => {
);
const offset = 200000;
Sinon.stub(GetNetworkTime, 'getLatestTimestampOffset').returns(offset);
await MessageSender.send({
await MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
@ -225,7 +225,7 @@ describe('MessageSender', () => {
visibleMessage,
SnodeNamespaces.Default
);
await MessageSender.send({
await MessageSender.sendSingleMessage({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,

@ -276,7 +276,6 @@ describe('GroupSyncJob pushChangesToGroupSwarmIfNeeded', () => {
revokeSubRequest: null,
unrevokeSubRequest: null,
supplementKeys: [],
updateMessages: [],
});
pendingChangesForGroupStub.resolves(undefined);
expect(result).to.be.eq(RunJobResult.Success);
@ -301,7 +300,6 @@ describe('GroupSyncJob pushChangesToGroupSwarmIfNeeded', () => {
revokeSubRequest: null,
unrevokeSubRequest: null,
supplementKeys: [],
updateMessages: [],
});
sendStub.resolves(undefined);
@ -366,7 +364,6 @@ describe('GroupSyncJob pushChangesToGroupSwarmIfNeeded', () => {
revokeSubRequest: null,
unrevokeSubRequest: null,
supplementKeys: [],
updateMessages: [],
});
expect(sendStub.callCount).to.be.eq(1);

@ -8,7 +8,7 @@ import { NotEmptyArrayOfBatchResults } from '../../../../../../session/apis/snod
import { GetNetworkTime } from '../../../../../../session/apis/snode_api/getNetworkTime';
import {
SnodeNamespaces,
UserConfigNamespaces,
SnodeNamespacesUserConfig,
} from '../../../../../../session/apis/snode_api/namespaces';
import { TTL_DEFAULT } from '../../../../../../session/constants';
import { ConvoHub } from '../../../../../../session/conversations';
@ -29,7 +29,7 @@ import { TypedStub, stubConfigDumpData } from '../../../../../test-utils/utils';
function userChange(
sodium: LibSodiumWrappers,
namespace: UserConfigNamespaces,
namespace: SnodeNamespacesUserConfig,
seqno: number
): PendingChangesForUs {
return {

@ -24,6 +24,8 @@ export type AsyncWrapper<T extends (...args: any) => any> = (
...args: Parameters<T>
) => Promise<ReturnType<T>>;
export type AwaitedReturn<T extends (...args: any) => any> = Awaited<ReturnType<T>>;
/**
 * This type is used to build, from an object type filled with functions, a new object type where all the functions are replaced by their async equivalents
*/
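The definition this comment introduces is cut off by the hunk, but the intent reads as a mapped type built on the `AsyncWrapper` above. A hypothetical sketch of that shape, where the name and constraint are assumptions rather than the code in this PR:
// Hypothetical sketch only: maps every function property of an object type to its async wrapper.
type AsyncObjectWrapperSketch<Type extends Record<string, (...args: any) => any>> = {
  [Key in keyof Type]: AsyncWrapper<Type[Key]>;
};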
