diff --git a/ts/interactions/conversations/unsendingInteractions.ts b/ts/interactions/conversations/unsendingInteractions.ts index 4a713613e..97997fe00 100644 --- a/ts/interactions/conversations/unsendingInteractions.ts +++ b/ts/interactions/conversations/unsendingInteractions.ts @@ -13,6 +13,7 @@ import { resetSelectedMessageIds } from '../../state/ducks/conversations'; import { updateConfirmModal } from '../../state/ducks/modalDialog'; import { SessionButtonColor } from '../../components/basic/SessionButton'; import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages'; +import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces'; /** * Deletes messages for everyone in a 1-1 or everyone in a closed group conversation. @@ -38,14 +39,14 @@ async function unsendMessagesForEveryone( await Promise.all( unsendMsgObjects.map(unsendObject => getMessageQueue() - .sendToPubKey(new PubKey(destinationId), unsendObject) + .sendToPubKey(new PubKey(destinationId), unsendObject, SnodeNamespaces.UserMessages) .catch(window?.log?.error) ) ); await Promise.all( unsendMsgObjects.map(unsendObject => getMessageQueue() - .sendSyncMessage(unsendObject) + .sendSyncMessage({ namespace: SnodeNamespaces.UserMessages, message: unsendObject }) .catch(window?.log?.error) ) ); @@ -54,7 +55,11 @@ async function unsendMessagesForEveryone( await Promise.all( unsendMsgObjects.map(unsendObject => { getMessageQueue() - .sendToGroup(unsendObject, undefined, new PubKey(destinationId)) + .sendToGroup({ + message: unsendObject, + namespace: SnodeNamespaces.ClosedGroupMessage, + groupPubKey: new PubKey(destinationId), + }) .catch(window?.log?.error); }) ); @@ -222,7 +227,7 @@ async function unsendMessageJustForThisUser( await Promise.all( unsendMsgObjects.map(unsendObject => getMessageQueue() - .sendSyncMessage(unsendObject) + .sendSyncMessage({ namespace: SnodeNamespaces.UserMessages, message: unsendObject }) .catch(window?.log?.error) ) ); diff --git a/ts/models/conversation.ts b/ts/models/conversation.ts index ca16d6c73..748a155c7 100644 --- a/ts/models/conversation.ts +++ b/ts/models/conversation.ts @@ -98,6 +98,7 @@ import { sogsV3FetchPreviewAndSaveIt } from '../session/apis/open_group_api/sogs import { Reaction } from '../types/Reaction'; import { Reactions } from '../util/reactions'; import { GetNetworkTime } from '../session/apis/snode_api/getNetworkTime'; +import { SnodeNamespaces } from '../session/apis/snode_api/namespaces'; export class ConversationModel extends Backbone.Model { public updateLastMessage: () => any; @@ -609,12 +610,12 @@ export class ConversationModel extends Backbone.Model { } const openGroup = OpenGroupData.getV2OpenGroupRoom(this.id); // send with blinding if we need to - await getMessageQueue().sendToOpenGroupV2( - chatMessageOpenGroupV2, + await getMessageQueue().sendToOpenGroupV2({ + message: chatMessageOpenGroupV2, roomInfos, - Boolean(roomHasBlindEnabled(openGroup)), - fileIdsToLink - ); + blinded: Boolean(roomHasBlindEnabled(openGroup)), + filesToLink: fileIdsToLink, + }); return; } @@ -625,7 +626,10 @@ export class ConversationModel extends Backbone.Model { chatMessageParams.syncTarget = this.id; const chatMessageMe = new VisibleMessage(chatMessageParams); - await getMessageQueue().sendSyncMessage(chatMessageMe); + await getMessageQueue().sendSyncMessage({ + namespace: SnodeNamespaces.UserMessages, + message: chatMessageMe, + }); return; } @@ -639,12 +643,20 @@ export class ConversationModel extends Backbone.Model { expireTimer: this.get('expireTimer'), }); // we need the return await so that errors are caught in the catch {} - await getMessageQueue().sendToPubKey(destinationPubkey, groupInvitMessage); + await getMessageQueue().sendToPubKey( + destinationPubkey, + groupInvitMessage, + SnodeNamespaces.UserMessages + ); return; } const chatMessagePrivate = new VisibleMessage(chatMessageParams); - await getMessageQueue().sendToPubKey(destinationPubkey, chatMessagePrivate); + await getMessageQueue().sendToPubKey( + destinationPubkey, + chatMessagePrivate, + SnodeNamespaces.UserMessages + ); return; } @@ -656,7 +668,10 @@ export class ConversationModel extends Backbone.Model { }); // we need the return await so that errors are caught in the catch {} - await getMessageQueue().sendToGroup(closedGroupVisibleMessage); + await getMessageQueue().sendToGroup({ + message: closedGroupVisibleMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); return; } @@ -729,7 +744,12 @@ export class ConversationModel extends Backbone.Model { const blinded = Boolean(roomHasBlindEnabled(openGroup)); // send with blinding if we need to - await getMessageQueue().sendToOpenGroupV2(chatMessageOpenGroupV2, roomInfos, blinded, []); + await getMessageQueue().sendToOpenGroupV2({ + message: chatMessageOpenGroupV2, + roomInfos, + blinded, + filesToLink: [], + }); return; } @@ -740,10 +760,17 @@ export class ConversationModel extends Backbone.Model { ...chatMessageParams, syncTarget: this.id, }); - await getMessageQueue().sendSyncMessage(chatMessageMe); + await getMessageQueue().sendSyncMessage({ + namespace: SnodeNamespaces.UserMessages, + message: chatMessageMe, + }); const chatMessagePrivate = new VisibleMessage(chatMessageParams); - await getMessageQueue().sendToPubKey(destinationPubkey, chatMessagePrivate); + await getMessageQueue().sendToPubKey( + destinationPubkey, + chatMessagePrivate, + SnodeNamespaces.UserMessages + ); await Reactions.handleMessageReaction({ reaction, sender: UserUtils.getOurPubKeyStrFromCache(), @@ -760,7 +787,10 @@ export class ConversationModel extends Backbone.Model { groupId: destination, }); // we need the return await so that errors are caught in the catch {} - await getMessageQueue().sendToGroup(closedGroupVisibleMessage); + await getMessageQueue().sendToGroup({ + message: closedGroupVisibleMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); await Reactions.handleMessageReaction({ reaction, sender: UserUtils.getOurPubKeyStrFromCache(), @@ -893,12 +923,12 @@ export class ConversationModel extends Backbone.Model { this.set({ active_at: Date.now(), isApproved: true }); - await getMessageQueue().sendToOpenGroupV2BlindedRequest( - encryptedMsg, - roomInfo, - sogsVisibleMessage, - this.id - ); + await getMessageQueue().sendToOpenGroupV2BlindedRequest({ + encryptedContent: encryptedMsg, + roomInfos: roomInfo, + message: sogsVisibleMessage, + recipientBlindedId: this.id, + }); } /** @@ -920,7 +950,7 @@ export class ConversationModel extends Backbone.Model { const messageRequestResponse = new MessageRequestResponse(messageRequestResponseParams); const pubkeyForSending = new PubKey(this.id); await getMessageQueue() - .sendToPubKey(pubkeyForSending, messageRequestResponse) + .sendToPubKey(pubkeyForSending, messageRequestResponse, SnodeNamespaces.UserMessages) .catch(window?.log?.error); } @@ -1118,7 +1148,11 @@ export class ConversationModel extends Backbone.Model { if (this.isPrivate()) { const expirationTimerMessage = new ExpirationTimerUpdateMessage(expireUpdate); const pubkey = new PubKey(this.get('id')); - await getMessageQueue().sendToPubKey(pubkey, expirationTimerMessage); + await getMessageQueue().sendToPubKey( + pubkey, + expirationTimerMessage, + SnodeNamespaces.UserMessages + ); } else { window?.log?.warn('TODO: Expiration update for closed groups are to be updated'); const expireUpdateForGroup = { @@ -1128,7 +1162,10 @@ export class ConversationModel extends Backbone.Model { const expirationTimerMessage = new ExpirationTimerUpdateMessage(expireUpdateForGroup); - await getMessageQueue().sendToGroup(expirationTimerMessage); + await getMessageQueue().sendToGroup({ + message: expirationTimerMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); } return; } @@ -1365,7 +1402,7 @@ export class ConversationModel extends Backbone.Model { }); const device = new PubKey(this.id); - await getMessageQueue().sendToPubKey(device, receiptMessage); + await getMessageQueue().sendToPubKey(device, receiptMessage, SnodeNamespaces.UserMessages); } } @@ -2045,7 +2082,7 @@ export class ConversationModel extends Backbone.Model { // send the message to a single recipient if this is a session chat const device = new PubKey(recipientId); getMessageQueue() - .sendToPubKey(device, typingMessage) + .sendToPubKey(device, typingMessage, SnodeNamespaces.UserMessages) .catch(window?.log?.error); } diff --git a/ts/models/message.ts b/ts/models/message.ts index 0b66676d4..3bc5b709a 100644 --- a/ts/models/message.ts +++ b/ts/models/message.ts @@ -95,6 +95,7 @@ import { QUOTED_TEXT_MAX_LENGTH } from '../session/constants'; import { ReactionList } from '../types/Reaction'; import { getAttachmentMetadata } from '../types/message/initializeAttachmentMetadata'; import { GetNetworkTime } from '../session/apis/snode_api/getNetworkTime'; +import { SnodeNamespaces } from '../session/apis/snode_api/namespaces'; // tslint:disable: cyclomatic-complexity /** @@ -905,12 +906,12 @@ export class MessageModel extends Backbone.Model { const openGroupMessage = new OpenGroupVisibleMessage(openGroupParams); const openGroup = OpenGroupData.getV2OpenGroupRoom(conversation.id); - return getMessageQueue().sendToOpenGroupV2( - openGroupMessage, + return getMessageQueue().sendToOpenGroupV2({ + message: openGroupMessage, roomInfos, - roomHasBlindEnabled(openGroup), - fileIdsToLink - ); + blinded: roomHasBlindEnabled(openGroup), + filesToLink: fileIdsToLink, + }); } const chatParams = { @@ -936,7 +937,11 @@ export class MessageModel extends Backbone.Model { } if (conversation.isPrivate()) { - return getMessageQueue().sendToPubKey(PubKey.cast(conversation.id), chatMessage); + return getMessageQueue().sendToPubKey( + PubKey.cast(conversation.id), + chatMessage, + SnodeNamespaces.UserMessages + ); } // Here, the convo is neither an open group, a private convo or ourself. It can only be a medium group. @@ -954,7 +959,10 @@ export class MessageModel extends Backbone.Model { groupId: this.get('conversationId'), }); - return getMessageQueue().sendToGroup(closedGroupVisibleMessage); + return getMessageQueue().sendToGroup({ + message: closedGroupVisibleMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); } catch (e) { await this.saveErrors(e); return null; @@ -1060,7 +1068,10 @@ export class MessageModel extends Backbone.Model { throw new Error('Cannot trigger syncMessage with unknown convo.'); } const syncMessage = buildSyncMessage(this.id, dataMessage, conversation.id, sentTimestamp); - await getMessageQueue().sendSyncMessage(syncMessage); + await getMessageQueue().sendSyncMessage({ + namespace: SnodeNamespaces.UserMessages, + message: syncMessage, + }); } this.set({ sentSync: true }); await this.commit(); diff --git a/ts/receiver/closedGroups.ts b/ts/receiver/closedGroups.ts index a1c7d9c1a..64695f86b 100644 --- a/ts/receiver/closedGroups.ts +++ b/ts/receiver/closedGroups.ts @@ -19,6 +19,7 @@ import { queueAllCachedFromSource } from './receiver'; import { getSwarmPollingInstance } from '../session/apis/snode_api'; import { perfEnd, perfStart } from '../session/utils/Performance'; import { ConversationTypeEnum } from '../models/conversationAttributes'; +import { SnodeNamespaces } from '../session/apis/snode_api/namespaces'; export const distributingClosedGroupEncryptionKeyPairs = new Map(); @@ -906,7 +907,11 @@ async function sendLatestKeyPairToUsers( }); // the encryption keypair is sent using established channels - await getMessageQueue().sendToPubKey(PubKey.cast(member), keypairsMessage); + await getMessageQueue().sendToPubKey( + PubKey.cast(member), + keypairsMessage, + SnodeNamespaces.UserMessages + ); }) ); } diff --git a/ts/session/apis/snode_api/SnodeRequestTypes.ts b/ts/session/apis/snode_api/SnodeRequestTypes.ts index 80aa52689..7b8665d35 100644 --- a/ts/session/apis/snode_api/SnodeRequestTypes.ts +++ b/ts/session/apis/snode_api/SnodeRequestTypes.ts @@ -1,3 +1,5 @@ +import { SnodeNamespaces } from './namespaces'; + export type SwarmForSubRequest = { method: 'get_swarm'; params: { pubkey: string } }; type RetrieveMaxCountSize = { max_count?: number; max_size?: number }; @@ -29,7 +31,7 @@ export type RetrievePubkeySubRequestType = { export type RetrieveLegacyClosedGroupSubRequestType = { method: 'retrieve'; params: { - namespace: -10; // legacy closed groups retrieve are not authenticated because the clients do not have a shared key + namespace: SnodeNamespaces.ClosedGroupMessage; // legacy closed groups retrieve are not authenticated because the clients do not have a shared key } & RetrieveAlwaysNeeded & RetrieveMaxCountSize; }; diff --git a/ts/session/apis/snode_api/batchRequest.ts b/ts/session/apis/snode_api/batchRequest.ts index 7946ece0b..01f239bff 100644 --- a/ts/session/apis/snode_api/batchRequest.ts +++ b/ts/session/apis/snode_api/batchRequest.ts @@ -44,7 +44,7 @@ export async function doSnodeBatchRequest( */ function decodeBatchRequest(snodeResponse: SnodeResponse): NotEmptyArrayOfBatchResults { try { - console.warn('decodeBatch: ', snodeResponse); + // console.warn('decodeBatch: ', snodeResponse); if (snodeResponse.status !== 200) { throw new Error(`decodeBatchRequest invalid status code: ${snodeResponse.status}`); } diff --git a/ts/session/apis/snode_api/namespaces.ts b/ts/session/apis/snode_api/namespaces.ts index 40d154492..4b5234ed1 100644 --- a/ts/session/apis/snode_api/namespaces.ts +++ b/ts/session/apis/snode_api/namespaces.ts @@ -1,9 +1,4 @@ export enum SnodeNamespaces { - /** - * The messages sent to a closed group are sent and polled from this namespace - */ - ClosedGroupMessages = -10, - /** * This is the namespace anyone can deposit a message for us */ @@ -13,14 +8,51 @@ export enum SnodeNamespaces { * This is the namespace used to sync our profile */ UserProfile = 2, - /** * This is the namespace used to sync our contacts */ UserContacts = 3, + /** + * The messages sent to a closed group are sent and polled from this namespace + */ + ClosedGroupMessage = -10, + /** * This is the namespace used to sync the closed group details for each of the closed groups we are polling */ - ClosedGroupInfo = 11, + ClosedGroupInfo = 1, } + +type PickEnum = { + [P in keyof K]: P extends K ? P : never; +}; + +export type SnodeNamespacesGroup = PickEnum< + SnodeNamespaces, + SnodeNamespaces.ClosedGroupInfo | SnodeNamespaces.ClosedGroupMessage +>; + +export type SnodeNamespacesUser = PickEnum< + SnodeNamespaces, + SnodeNamespaces.UserContacts | SnodeNamespaces.UserProfile | SnodeNamespaces.UserMessages +>; + +/** + * Returns true if that namespace is associated with the config of a user (not his messages, only configs) + */ +function isUserConfigNamespace(namespace: SnodeNamespaces) { + return namespace === SnodeNamespaces.UserContacts || namespace === SnodeNamespaces.UserProfile; +} + +/** + * Returns true if that namespace is associated with the config of a closed group (not its messages, only configs) + */ +function isGroupConfigNamespace(namespace: SnodeNamespaces) { + return namespace === SnodeNamespaces.ClosedGroupInfo; +} + +export const SnodeNamespace = { + isUserConfigNamespace, + isGroupConfigNamespace, +}; diff --git a/ts/session/apis/snode_api/retrieveRequest.ts b/ts/session/apis/snode_api/retrieveRequest.ts index 6e5efa206..47c6361a0 100644 --- a/ts/session/apis/snode_api/retrieveRequest.ts +++ b/ts/session/apis/snode_api/retrieveRequest.ts @@ -6,10 +6,12 @@ import { fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String'; import { doSnodeBatchRequest } from './batchRequest'; import { GetNetworkTime } from './getNetworkTime'; import { SnodeNamespaces } from './namespaces'; + import { RetrieveLegacyClosedGroupSubRequestType, RetrieveSubRequestType, } from './SnodeRequestTypes'; +import { RetrieveMessagesResultsBatched, RetrieveMessagesResultsContent } from './types'; async function getRetrieveSignatureParams(params: { pubkey: string; @@ -71,15 +73,15 @@ async function buildRetrieveRequest( timestamp: GetNetworkTime.getNowWithNetworkOffset(), }; - if (namespace === SnodeNamespaces.ClosedGroupMessages) { + if (namespace === SnodeNamespaces.ClosedGroupMessage) { if (pubkey === ourPubkey || !pubkey.startsWith('05')) { throw new Error( - 'namespace -10 can only be used to retrieve messages from a legacy closed group' + 'namespace -10 can only be used to retrieve messages from a legacy closed group (prefix 05)' ); } const retrieveLegacyClosedGroup = { ...retrieveParam, - namespace: namespace as -10, + namespace, }; const retrieveParamsLegacy: RetrieveLegacyClosedGroupSubRequestType = { method: 'retrieve', @@ -99,7 +101,7 @@ async function buildRetrieveRequest( throw new Error('not a legacy closed group. namespace can only be 0'); } if (pubkey !== ourPubkey) { - throw new Error('not a legacy closed group. pubkey can only be our number'); + throw new Error('not a legacy closed group. pubkey can only be ours'); } const signatureArgs = { ...retrieveParam, ourPubkey }; const signatureBuilt = await getRetrieveSignatureParams(signatureArgs); @@ -121,7 +123,7 @@ async function retrieveNextMessages( associatedWith: string, namespaces: Array, ourPubkey: string -): Promise> }>> { +): Promise { if (namespaces.length !== lastHashes.length) { throw new Error('namespaces and lasthashes does not match'); } @@ -135,7 +137,7 @@ async function retrieveNextMessages( // let exceptions bubble up // no retry for this one as this a call we do every few seconds while polling for messages - console.warn('retrieveRequestsParams', retrieveRequestsParams); + console.warn(`fetching messages associatedWith:${associatedWith} namespaces:${namespaces}`); const results = await doSnodeBatchRequest(retrieveRequestsParams, targetNode, 4000); if (!results || !results.length) { @@ -153,11 +155,11 @@ async function retrieveNextMessages( ); } - if (namespaces.length > 1) { - throw new Error('multiple namespace polling todo'); - } const firstResult = results[0]; + // TODO we should probably check for status code of all the results (when polling for a few namespaces at a time) + console.warn('what should we do if we dont get a 200 on any of those fetches?'); + if (firstResult.code !== 200) { window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`); throw new Error( @@ -165,21 +167,19 @@ async function retrieveNextMessages( ); } - console.warn('what should we do if we dont get a 200 on any of those fetches?'); - try { - // we rely on the code of the + // we rely on the code of the first one to check for online status const bodyFirstResult = firstResult.body; if (!window.inboxStore?.getState().onionPaths.isOnline) { window.inboxStore?.dispatch(updateIsOnline(true)); } GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t); - // merge results with their corresponding namespaces + // merge results with their corresponding namespaces return results.map((result, index) => ({ code: result.code, - messages: result.body as Array, + messages: result.body as RetrieveMessagesResultsContent, namespace: namespaces[index], })); } catch (e) { diff --git a/ts/session/apis/snode_api/sessionRpc.ts b/ts/session/apis/snode_api/sessionRpc.ts index db6597167..197de119a 100644 --- a/ts/session/apis/snode_api/sessionRpc.ts +++ b/ts/session/apis/snode_api/sessionRpc.ts @@ -129,8 +129,6 @@ export async function snodeRpc( params: clone(params), }; - console.warn('snodeRPC', body); - const fetchOptions: LokiFetchOptions = { method: 'POST', body: JSON.stringify(body), diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index 2bcd641b5..e4ac9b8da 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -3,7 +3,7 @@ import * as snodePool from './snodePool'; import { ERROR_CODE_NO_CONNECT } from './SNodeAPI'; import { SignalService } from '../../../protobuf'; import * as Receiver from '../../../receiver/receiver'; -import _, { concat, last } from 'lodash'; +import _, { compact, concat, difference, flatten, last, sample, uniqBy } from 'lodash'; import { Data, Snode } from '../../../data/data'; import { StringUtils, UserUtils } from '../../utils'; @@ -15,18 +15,8 @@ import { ed25519Str } from '../../onions/onionPath'; import { updateIsOnline } from '../../../state/ducks/onion'; import pRetry from 'p-retry'; import { SnodeAPIRetrieve } from './retrieveRequest'; -import { SnodeNamespaces } from './namespaces'; - -interface Message { - hash: string; - expiration: number; - data: string; -} - -export type RetrieveMessagesResults = Array<{ - code: number; - messages: Record[]; -}>; +import { SnodeNamespace, SnodeNamespaces } from './namespaces'; +import { RetrieveMessageItem, RetrieveMessagesResultsBatched } from './types'; // Some websocket nonsense export function processMessage(message: string, options: any = {}, messageHash: string) { @@ -172,7 +162,7 @@ export class SwarmPolling { `Polling for ${loggingId}; timeout: ${convoPollingTimeout}; diff: ${diff} ` ); - return this.pollOnceForKey(group.pubkey, true, [SnodeNamespaces.ClosedGroupMessages]); + return this.pollOnceForKey(group.pubkey, true, [SnodeNamespaces.ClosedGroupMessage]); } window?.log?.info( `Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}` @@ -208,27 +198,54 @@ export class SwarmPolling { // If we need more nodes, select randomly from the remaining nodes: if (!toPollFrom) { - const notPolled = _.difference(swarmSnodes, alreadyPolled); - toPollFrom = _.sample(notPolled) as Snode; + const notPolled = difference(swarmSnodes, alreadyPolled); + toPollFrom = sample(notPolled) as Snode; } - let resultsFromAllNamespaces: RetrieveMessagesResults | null; + let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; try { resultsFromAllNamespaces = await this.pollNodeForKey(toPollFrom, pubkey, namespaces); } catch (e) { - window.log.warn('pollNodeForKey failed with: ', e.message); + window.log.warn( + `pollNodeForKey of ${pubkey} namespaces: ${namespaces} failed with: ${e.message}` + ); resultsFromAllNamespaces = null; } - // filter out null (exception thrown) - const arrayOfResults = resultsFromAllNamespaces?.length - ? _.compact(resultsFromAllNamespaces) - : null; + let userConfigMessagesMerged: Array = []; + let allNamespacesWithoutUserConfigIfNeeded: Array = []; + + // check if we just fetched the details from the config namespaces. + // If yes, merge them together and exclude them from the rest of the messages. + if (window.sessionFeatureFlags.useSharedUtilForUserConfig && resultsFromAllNamespaces) { + const userConfigMessages = resultsFromAllNamespaces + .filter(m => SnodeNamespace.isUserConfigNamespace(m.namespace)) + .map(r => r.messages.messages); + + allNamespacesWithoutUserConfigIfNeeded = flatten( + compact( + resultsFromAllNamespaces + .filter(m => !SnodeNamespace.isUserConfigNamespace(m.namespace)) + .map(r => r.messages.messages) + ) + ); + userConfigMessagesMerged = flatten(compact(userConfigMessages)); + } else { + allNamespacesWithoutUserConfigIfNeeded = flatten( + compact(resultsFromAllNamespaces?.map(m => m.messages.messages)) + ); + } + + console.warn(`received userConfigMessagesMerged: ${userConfigMessagesMerged.length}`); + console.warn( + `received allNamespacesWithoutUserConfigIfNeeded: ${allNamespacesWithoutUserConfigIfNeeded.length}` + ); + // Merge results into one list of unique messages - const messages = _.uniqBy(_.flatten(arrayOfResults), (x: any) => x.hash); + const messages = uniqBy(allNamespacesWithoutUserConfigIfNeeded, x => x.hash); // if all snodes returned an error (null), no need to update the lastPolledTimestamp - if (isGroup && arrayOfResults?.length) { + if (isGroup && allNamespacesWithoutUserConfigIfNeeded?.length) { window?.log?.info( `Polled for group(${ed25519Str(pubkey.key)}):, got ${messages.length} messages back.` ); @@ -263,10 +280,7 @@ export class SwarmPolling { perfEnd(`handleSeenMessages-${pkStr}`, 'handleSeenMessages'); - if (window.sessionFeatureFlags.useSharedUtilForUserConfig) { - } - - newMessages.forEach((m: Message) => { + newMessages.forEach((m: RetrieveMessageItem) => { const options = isGroup ? { conversationId: pkStr } : {}; processMessage(m.data, options, m.hash); }); @@ -278,7 +292,7 @@ export class SwarmPolling { node: Snode, pubkey: PubKey, namespaces: Array - ): Promise { + ): Promise { const namespaceLength = namespaces.length; if (namespaceLength > 3 || namespaceLength <= 0) { throw new Error('pollNodeForKey needs 1 or 2 namespaces to be given at all times'); @@ -313,7 +327,7 @@ export class SwarmPolling { } const lastMessages = results.map(r => { - return last(r.messages); + return last(r.messages.messages); }); await Promise.all( @@ -336,7 +350,6 @@ export class SwarmPolling { { minTimeout: 100, retries: 1, - onFailedAttempt: e => { window?.log?.warn( `retrieveNextMessages attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.name}` @@ -375,18 +388,20 @@ export class SwarmPolling { }); } - private async handleSeenMessages(messages: Array): Promise> { + private async handleSeenMessages( + messages: Array + ): Promise> { if (!messages.length) { return []; } - const incomingHashes = messages.map((m: Message) => m.hash); + const incomingHashes = messages.map((m: RetrieveMessageItem) => m.hash); const dupHashes = await Data.getSeenMessagesByHashList(incomingHashes); - const newMessages = messages.filter((m: Message) => !dupHashes.includes(m.hash)); + const newMessages = messages.filter((m: RetrieveMessageItem) => !dupHashes.includes(m.hash)); if (newMessages.length) { - const newHashes = newMessages.map((m: Message) => ({ + const newHashes = newMessages.map((m: RetrieveMessageItem) => ({ expiresAt: m.expiration, hash: m.hash, })); diff --git a/ts/session/apis/snode_api/types.ts b/ts/session/apis/snode_api/types.ts new file mode 100644 index 000000000..705c7b5fe --- /dev/null +++ b/ts/session/apis/snode_api/types.ts @@ -0,0 +1,23 @@ +import { SnodeNamespaces } from './namespaces'; + +export type RetrieveMessageItem = { + hash: string; + expiration: number; + data: string; // base64 encrypted content of the emssage + timestamp: number; +}; + +export type RetrieveMessagesResultsContent = { + hf?: Array; + messages?: Array; + more: boolean; + t: number; +}; + +export type RetrieveRequestResult = { + code: number; + messages: RetrieveMessagesResultsContent; + namespace: SnodeNamespaces; +}; + +export type RetrieveMessagesResultsBatched = Array; diff --git a/ts/session/conversations/createClosedGroup.ts b/ts/session/conversations/createClosedGroup.ts index 8973cd24a..51424e481 100644 --- a/ts/session/conversations/createClosedGroup.ts +++ b/ts/session/conversations/createClosedGroup.ts @@ -7,6 +7,7 @@ import { ECKeyPair } from '../../receiver/keypairs'; import { openConversationWithMessages } from '../../state/ducks/conversations'; import { updateConfirmModal } from '../../state/ducks/modalDialog'; import { getSwarmPollingInstance } from '../apis/snode_api'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; import { generateClosedGroupPublicKey, generateCurve25519KeyPairWithoutPrefix, @@ -230,6 +231,10 @@ function createInvitePromises( expireTimer: existingExpireTimer, }; const message = new ClosedGroupNewMessage(messageParams); - return getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(m), message); + return getMessageQueue().sendToPubKeyNonDurably({ + pubkey: PubKey.cast(m), + message, + namespace: SnodeNamespaces.UserMessages, + }); }); } diff --git a/ts/session/group/closed-group.ts b/ts/session/group/closed-group.ts index abd9379f5..d16c33a31 100644 --- a/ts/session/group/closed-group.ts +++ b/ts/session/group/closed-group.ts @@ -29,6 +29,7 @@ import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMe import { getSwarmPollingInstance } from '../apis/snode_api'; import { ConversationAttributes, ConversationTypeEnum } from '../../models/conversationAttributes'; import { GetNetworkTime } from '../apis/snode_api/getNetworkTime'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; export type GroupInfo = { id: string; @@ -338,11 +339,15 @@ export async function leaveClosedGroup(groupId: string) { window?.log?.info(`We are leaving the group ${groupId}. Sending our leaving message.`); // sent the message to the group and once done, remove everything related to this group getSwarmPollingInstance().removePubkey(groupId); - await getMessageQueue().sendToGroup(ourLeavingMessage, async () => { - window?.log?.info( - `Leaving message sent ${groupId}. Removing everything related to this group.` - ); - await markGroupAsLeftOrKicked(groupId, convo, false); + await getMessageQueue().sendToGroup({ + message: ourLeavingMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + sentCb: async () => { + window?.log?.info( + `Leaving message sent ${groupId}. Removing everything related to this group.` + ); + await markGroupAsLeftOrKicked(groupId, convo, false); + }, }); } @@ -361,7 +366,10 @@ async function sendNewName(convo: ConversationModel, name: string, messageId: st identifier: messageId, name, }); - await getMessageQueue().sendToGroup(nameChangeMessage); + await getMessageQueue().sendToGroup({ + message: nameChangeMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); } async function sendAddedMembers( @@ -393,7 +401,10 @@ async function sendAddedMembers( addedMembers, identifier: messageId, }); - await getMessageQueue().sendToGroup(closedGroupControlMessage); + await getMessageQueue().sendToGroup({ + message: closedGroupControlMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); // Send closed group update messages to any new members individually const newClosedGroupUpdate = new ClosedGroupNewMessage({ @@ -410,7 +421,11 @@ async function sendAddedMembers( const promises = addedMembers.map(async m => { await getConversationController().getOrCreateAndWait(m, ConversationTypeEnum.PRIVATE); const memberPubKey = PubKey.cast(m); - await getMessageQueue().sendToPubKey(memberPubKey, newClosedGroupUpdate); + await getMessageQueue().sendToPubKey( + memberPubKey, + newClosedGroupUpdate, + SnodeNamespaces.ClosedGroupMessage + ); }); await Promise.all(promises); } @@ -445,15 +460,19 @@ export async function sendRemovedMembers( identifier: messageId, }); // Send the group update, and only once sent, generate and distribute a new encryption key pair if needed - await getMessageQueue().sendToGroup(mainClosedGroupControlMessage, async () => { - if (isCurrentUserAdmin) { - // we send the new encryption key only to members already here before the update - window?.log?.info( - `Sending group update: A user was removed from ${groupId} and we are the admin. Generating and sending a new EncryptionKeyPair` - ); - - await generateAndSendNewEncryptionKeyPair(groupId, stillMembers); - } + await getMessageQueue().sendToGroup({ + message: mainClosedGroupControlMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + sentCb: async () => { + if (isCurrentUserAdmin) { + // we send the new encryption key only to members already here before the update + window?.log?.info( + `Sending group update: A user was removed from ${groupId} and we are the admin. Generating and sending a new EncryptionKeyPair` + ); + + await generateAndSendNewEncryptionKeyPair(groupId, stillMembers); + } + }, }); } @@ -513,7 +532,11 @@ async function generateAndSendNewEncryptionKeyPair( await addKeyPairToCacheAndDBIfNeeded(toHex(groupId), newKeyPair.toHexKeyPair()); }; // this is to be sent to the group pubkey adress - await getMessageQueue().sendToGroup(keypairsMessage, messageSentCallback); + await getMessageQueue().sendToGroup({ + message: keypairsMessage, + namespace: SnodeNamespaces.ClosedGroupMessage, + sentCb: messageSentCallback, + }); } export async function buildEncryptionKeyPairWrappers( diff --git a/ts/session/messages/outgoing/controlMessage/DataExtractionNotificationMessage.ts b/ts/session/messages/outgoing/controlMessage/DataExtractionNotificationMessage.ts index 04ab63910..1a9b8e955 100644 --- a/ts/session/messages/outgoing/controlMessage/DataExtractionNotificationMessage.ts +++ b/ts/session/messages/outgoing/controlMessage/DataExtractionNotificationMessage.ts @@ -8,6 +8,7 @@ import { getConversationController } from '../../../conversations'; import { UserUtils } from '../../../utils'; import { SettingsKey } from '../../../../data/settings-key'; import { Storage } from '../../../../util/storage'; +import { SnodeNamespaces } from '../../../apis/snode_api/namespaces'; interface DataExtractionNotificationMessageParams extends MessageParams { referencedAttachmentTimestamp: number; } @@ -73,7 +74,7 @@ export const sendDataExtractionNotification = async ( ); try { - await getMessageQueue().sendToPubKey(pubkey, dataExtractionNotificationMessage); + await getMessageQueue().sendToPubKey(pubkey, dataExtractionNotificationMessage, SnodeNamespaces.UserMessages); } catch (e) { window.log.warn('failed to send data extraction notification', e); } diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 0d323b8e5..ab4050991 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -24,6 +24,11 @@ import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage'; import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2'; import { AbortController } from 'abort-controller'; import { sendSogsReactionOnionV4 } from '../apis/open_group_api/sogsv3/sogsV3SendReaction'; +import { + SnodeNamespaces, + SnodeNamespacesGroup, + SnodeNamespacesUser, +} from '../apis/snode_api/namespaces'; type ClosedGroupMessageType = | ClosedGroupVisibleMessage @@ -49,13 +54,14 @@ export class MessageQueue { public async sendToPubKey( destinationPubKey: PubKey, message: ContentMessage, + namespace: SnodeNamespaces, sentCb?: (message: RawMessage) => Promise, isGroup = false ): Promise { if (message instanceof ConfigurationMessage || !!(message as any).syncTarget) { throw new Error('SyncMessage needs to be sent with sendSyncMessage'); } - await this.process(destinationPubKey, message, sentCb, isGroup); + await this.process(destinationPubKey, message, namespace, sentCb, isGroup); } /** @@ -66,12 +72,17 @@ export class MessageQueue { * * fileIds is the array of ids this message is linked to. If we upload files as part of a message but do not link them with this, the files will be deleted much sooner */ - public async sendToOpenGroupV2( - message: OpenGroupVisibleMessage, - roomInfos: OpenGroupRequestCommonType, - blinded: boolean, - filesToLink: Array - ) { + public async sendToOpenGroupV2({ + blinded, + filesToLink, + message, + roomInfos, + }: { + message: OpenGroupVisibleMessage; + roomInfos: OpenGroupRequestCommonType; + blinded: boolean; + filesToLink: Array; + }) { // Skipping the queue for Open Groups v2; the message is sent directly try { @@ -115,12 +126,17 @@ export class MessageQueue { } } - public async sendToOpenGroupV2BlindedRequest( - encryptedContent: Uint8Array, - roomInfos: OpenGroupRequestCommonType, - message: OpenGroupVisibleMessage, - recipientBlindedId: string - ) { + public async sendToOpenGroupV2BlindedRequest({ + encryptedContent, + message, + recipientBlindedId, + roomInfos, + }: { + encryptedContent: Uint8Array; + roomInfos: OpenGroupRequestCommonType; + message: OpenGroupVisibleMessage; + recipientBlindedId: string; + }) { try { if (!PubKey.hasBlindedPrefix(recipientBlindedId)) { throw new Error('sendToOpenGroupV2BlindedRequest needs a blindedId'); @@ -153,11 +169,17 @@ export class MessageQueue { * * @param sentCb currently only called for medium groups sent message */ - public async sendToGroup( - message: ClosedGroupMessageType, - sentCb?: (message: RawMessage) => Promise, - groupPubKey?: PubKey - ): Promise { + public async sendToGroup({ + message, + namespace, + groupPubKey, + sentCb, + }: { + message: ClosedGroupMessageType; + namespace: SnodeNamespacesGroup; + sentCb?: (message: RawMessage) => Promise; + groupPubKey?: PubKey; + }): Promise { let destinationPubKey: PubKey | undefined = groupPubKey; if (message instanceof ExpirationTimerUpdateMessage || message instanceof ClosedGroupMessage) { destinationPubKey = groupPubKey ? groupPubKey : message.groupId; @@ -168,13 +190,18 @@ export class MessageQueue { } // if groupId is set here, it means it's for a medium group. So send it as it - return this.sendToPubKey(PubKey.cast(destinationPubKey), message, sentCb, true); + return this.sendToPubKey(PubKey.cast(destinationPubKey), message, namespace, sentCb, true); } - public async sendSyncMessage( - message?: SyncMessageType, - sentCb?: (message: RawMessage) => Promise - ): Promise { + public async sendSyncMessage({ + namespace, + message, + sentCb, + }: { + namespace: SnodeNamespacesUser; + message?: SyncMessageType; + sentCb?: (message: RawMessage) => Promise; + }): Promise { if (!message) { return; } @@ -188,7 +215,7 @@ export class MessageQueue { const ourPubKey = UserUtils.getOurPubKeyStrFromCache(); - await this.process(PubKey.cast(ourPubKey), message, sentCb); + await this.process(PubKey.cast(ourPubKey), message, namespace, sentCb); } /** @@ -196,13 +223,18 @@ export class MessageQueue { * @param user user pub key to send to * @param message Message to be sent */ - public async sendToPubKeyNonDurably( - user: PubKey, - message: ClosedGroupNewMessage | CallMessage - ): Promise { + public async sendToPubKeyNonDurably({ + message, + namespace, + pubkey, + }: { + pubkey: PubKey; + message: ClosedGroupNewMessage | CallMessage; + namespace: SnodeNamespaces; + }): Promise { let rawMessage; try { - rawMessage = await MessageUtils.toRawMessage(user, message); + rawMessage = await MessageUtils.toRawMessage(pubkey, message, namespace); const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(rawMessage); await MessageSentHandler.handleMessageSentSuccess( rawMessage, @@ -281,6 +313,7 @@ export class MessageQueue { private async process( destinationPk: PubKey, message: ContentMessage, + namespace: SnodeNamespaces, sentCb?: (message: RawMessage) => Promise, isGroup = false ): Promise { @@ -305,7 +338,7 @@ export class MessageQueue { } } - await this.pendingMessageCache.add(destinationPk, message, sentCb, isGroup); + await this.pendingMessageCache.add(destinationPk, message, namespace, sentCb, isGroup); void this.processPending(destinationPk, isSyncMessage); } diff --git a/ts/session/sending/MessageSender.ts b/ts/session/sending/MessageSender.ts index 530a7a8df..78e944817 100644 --- a/ts/session/sending/MessageSender.ts +++ b/ts/session/sending/MessageSender.ts @@ -12,7 +12,6 @@ import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/Ope import { addMessagePadding } from '../crypto/BufferPadding'; import _ from 'lodash'; import { getSwarmFor } from '../apis/snode_api/snodePool'; -import { firstTrue } from '../utils/Promise'; import { MessageSender } from '.'; import { Data, Snode } from '../../../ts/data/data'; import { getConversationController } from '../conversations'; @@ -27,8 +26,7 @@ import { AbortController } from 'abort-controller'; import { SnodeAPIStore } from '../apis/snode_api/storeMessage'; import { StoreOnNodeParams } from '../apis/snode_api/SnodeRequestTypes'; import { GetNetworkTime } from '../apis/snode_api/getNetworkTime'; - -const DEFAULT_CONNECTIONS = 1; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; // ================ SNODE STORE ================ @@ -83,6 +81,7 @@ export async function send( async () => { const recipient = PubKey.cast(message.device); const { encryption, ttl } = message; + let namespace = message.namespace; const { overRiddenTimestampBuffer, @@ -113,14 +112,25 @@ export async function send( found.set({ sent_at: networkTimestamp }); await found.commit(); } - await MessageSender.sendMessageToSnode( - recipient.key, + + // right when we upgrade from not having namespaces stored in the outgoing cached messages our messages won't have a namespace associated. + // So we need to keep doing the lookup of where they should go if the namespace is not set. + if (namespace === null || namespace === undefined) { + namespace = getConversationController() + .get(recipient.key) + ?.isClosedGroup() + ? SnodeNamespaces.ClosedGroupMessage + : SnodeNamespaces.UserMessages; + } + await MessageSender.sendMessageToSnode({ + pubKey: recipient.key, data, ttl, - networkTimestamp, + timestamp: networkTimestamp, isSyncMessage, - message.identifier - ); + messageId: message.identifier, + namespace, + }); return { wrappedEnvelope: data, effectiveTimestamp: networkTimestamp }; }, { @@ -132,22 +142,29 @@ export async function send( } // tslint:disable-next-line: function-name -export async function sendMessageToSnode( - pubKey: string, - data: Uint8Array, - ttl: number, - timestamp: number, - isSyncMessage?: boolean, - messageId?: string -): Promise { +export async function sendMessageToSnode({ + data, + namespace, + pubKey, + timestamp, + ttl, + isSyncMessage, + messageId, +}: { + pubKey: string; + data: Uint8Array; + ttl: number; + timestamp: number; + namespace: SnodeNamespaces; + isSyncMessage?: boolean; + messageId?: string; +}): Promise { const data64 = ByteBuffer.wrap(data).toString('base64'); const swarm = await getSwarmFor(pubKey); const conversation = getConversationController().get(pubKey); const isClosedGroup = conversation?.isClosedGroup(); - const namespace = isClosedGroup ? -10 : 0; - // send parameters const params: StoreOnNodeParams = { pubkey: pubKey, @@ -157,32 +174,27 @@ export async function sendMessageToSnode( namespace, }; - const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS); + const usedNodes = _.slice(swarm, 0, 1); if (!usedNodes || usedNodes.length === 0) { throw new EmptySwarmError(pubKey, 'Ran out of swarm nodes to query'); } let successfulSendHash: string | undefined; - const promises = usedNodes.map(async usedNode => { + + let snode: Snode | undefined; + try { + const snodeTried = usedNodes[0]; // No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch. // the only case we could care about a retry would be when the usedNode is not correct, // but considering we trigger this request with a few snode in //, this should be fine. - const successfulSend = await SnodeAPIStore.storeOnNode(usedNode, params); + const successfulSend = await SnodeAPIStore.storeOnNode(snodeTried, params); if (successfulSend) { if (_.isString(successfulSend)) { successfulSendHash = successfulSend; } - return usedNode; + snode = snodeTried; } - // should we mark snode as bad if it can't store our message? - return undefined; - }); - - let snode: Snode | undefined; - try { - const firstSuccessSnode = await firstTrue(promises); - snode = firstSuccessSnode; } catch (e) { const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null'; window?.log?.warn( @@ -203,11 +215,13 @@ export async function sendMessageToSnode( } } - window?.log?.info( - `loki_message:::sendMessage - Successfully stored message to ${ed25519Str(pubKey)} via ${ - snode.ip - }:${snode.port}` - ); + if (snode) { + window?.log?.info( + `loki_message:::sendMessage - Successfully stored message to ${ed25519Str(pubKey)} via ${ + snode.ip + }:${snode.port}` + ); + } } async function buildEnvelope( diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index f3affb3b2..5ded9c477 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -4,6 +4,7 @@ import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; import { PubKey } from '../types'; import { MessageUtils } from '../utils'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; // This is an abstraction for storing pending messages. // Ideally we want to store pending messages in the database so that @@ -41,11 +42,17 @@ export class PendingMessageCache { public async add( destinationPubKey: PubKey, message: ContentMessage, + namespace: SnodeNamespaces, sentCb?: (message: any) => Promise, isGroup = false ): Promise { await this.loadFromDBIfNeeded(); - const rawMessage = await MessageUtils.toRawMessage(destinationPubKey, message, isGroup); + const rawMessage = await MessageUtils.toRawMessage( + destinationPubKey, + message, + namespace, + isGroup + ); // Does it exist in cache already? if (this.find(rawMessage)) { diff --git a/ts/session/types/RawMessage.ts b/ts/session/types/RawMessage.ts index 07a933b67..bebabc47c 100644 --- a/ts/session/types/RawMessage.ts +++ b/ts/session/types/RawMessage.ts @@ -1,4 +1,5 @@ import { SignalService } from '../../protobuf'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; export type RawMessage = { identifier: string; @@ -6,6 +7,7 @@ export type RawMessage = { device: string; ttl: number; encryption: SignalService.Envelope.Type; + namespace: SnodeNamespaces | null; // allowing null as when we upgrade, we might have messages awaiting sending which won't have a namespace }; // For building RawMessages from JSON diff --git a/ts/session/utils/Messages.ts b/ts/session/utils/Messages.ts index 7a42ca5c0..fe94a9b05 100644 --- a/ts/session/utils/Messages.ts +++ b/ts/session/utils/Messages.ts @@ -7,6 +7,7 @@ import { ClosedGroupEncryptionPairReplyMessage } from '../messages/outgoing/cont import { ContentMessage } from '../messages/outgoing'; import { ExpirationTimerUpdateMessage } from '../messages/outgoing/controlMessage/ExpirationTimerUpdateMessage'; import { SignalService } from '../../protobuf'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; function getEncryptionTypeFromMessageType( message: ContentMessage, @@ -36,6 +37,7 @@ function getEncryptionTypeFromMessageType( export async function toRawMessage( destinationPubKey: PubKey, message: ContentMessage, + namespace: SnodeNamespaces, isGroup = false ): Promise { const ttl = message.ttl(); @@ -50,6 +52,7 @@ export async function toRawMessage( device: destinationPubKey.key, ttl, encryption, + namespace, }; return rawMessage; diff --git a/ts/session/utils/calling/CallManager.ts b/ts/session/utils/calling/CallManager.ts index dddeef8f2..be04987af 100644 --- a/ts/session/utils/calling/CallManager.ts +++ b/ts/session/utils/calling/CallManager.ts @@ -28,6 +28,7 @@ import { getCallMediaPermissionsSettings } from '../../../components/settings/Se import { PnServer } from '../../apis/push_notification_api'; import { approveConvoAndSendResponse } from '../../../interactions/conversationInteractions'; import { GetNetworkTime } from '../../apis/snode_api/getNetworkTime'; +import { SnodeNamespaces } from '../../apis/snode_api/namespaces'; // tslint:disable: function-name @@ -414,10 +415,11 @@ async function createOfferAndSendIt(recipient: string) { }); window.log.info(`sending '${offer.type}'' with callUUID: ${currentCallUUID}`); - const negotiationOfferSendResult = await getMessageQueue().sendToPubKeyNonDurably( - PubKey.cast(recipient), - offerMessage - ); + const negotiationOfferSendResult = await getMessageQueue().sendToPubKeyNonDurably({ + pubkey: PubKey.cast(recipient), + message: offerMessage, + namespace: SnodeNamespaces.UserMessages, + }); if (typeof negotiationOfferSendResult === 'number') { // window.log?.warn('setting last sent timestamp'); lastOutgoingOfferTimestamp = negotiationOfferSendResult; @@ -514,7 +516,11 @@ export async function USER_callRecipient(recipient: string) { // we do it manually as the sendToPubkeyNonDurably rely on having a message saved to the db for MessageSentSuccess // which is not the case for a pre offer message (the message only exists in memory) - const rawMessage = await MessageUtils.toRawMessage(PubKey.cast(recipient), preOfferMsg); + const rawMessage = await MessageUtils.toRawMessage( + PubKey.cast(recipient), + preOfferMsg, + SnodeNamespaces.UserMessages + ); const { wrappedEnvelope } = await MessageSender.send(rawMessage); void PnServer.notifyPnServer(wrappedEnvelope, recipient); @@ -572,7 +578,11 @@ const iceSenderDebouncer = _.debounce(async (recipient: string) => { `sending ICE CANDIDATES MESSAGE to ${ed25519Str(recipient)} about call ${currentCallUUID}` ); - await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callIceCandicates); + await getMessageQueue().sendToPubKeyNonDurably({ + pubkey: PubKey.cast(recipient), + message: callIceCandicates, + namespace: SnodeNamespaces.UserMessages, + }); }, 2000); const findLastMessageTypeFromSender = (sender: string, msgType: SignalService.CallMessage.Type) => { @@ -903,8 +913,16 @@ export async function USER_rejectIncomingCallRequest(fromSender: string) { async function sendCallMessageAndSync(callmessage: CallMessage, user: string) { await Promise.all([ - getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(user), callmessage), - getMessageQueue().sendToPubKeyNonDurably(UserUtils.getOurPubKeyFromCache(), callmessage), + getMessageQueue().sendToPubKeyNonDurably({ + pubkey: PubKey.cast(user), + message: callmessage, + namespace: SnodeNamespaces.UserMessages, + }), + getMessageQueue().sendToPubKeyNonDurably({ + pubkey: UserUtils.getOurPubKeyFromCache(), + message: callmessage, + namespace: SnodeNamespaces.UserMessages, + }), ]); } @@ -921,7 +939,11 @@ export async function USER_hangup(fromSender: string) { timestamp: Date.now(), uuid: currentCallUUID, }); - void getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), endCallMessage); + void getMessageQueue().sendToPubKeyNonDurably({ + pubkey: PubKey.cast(fromSender), + message: endCallMessage, + namespace: SnodeNamespaces.UserMessages, + }); } window.inboxStore?.dispatch(endCall()); diff --git a/ts/session/utils/syncUtils.ts b/ts/session/utils/syncUtils.ts index cb41ff728..62b18628b 100644 --- a/ts/session/utils/syncUtils.ts +++ b/ts/session/utils/syncUtils.ts @@ -26,6 +26,7 @@ import { DURATION } from '../constants'; import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage'; import { MessageRequestResponse } from '../messages/outgoing/controlMessage/MessageRequestResponse'; import { PubKey } from '../types'; +import { SnodeNamespaces } from '../apis/snode_api/namespaces'; const ITEM_ID_LAST_SYNC_TIMESTAMP = 'lastSyncedTimestamp'; @@ -53,7 +54,10 @@ export const syncConfigurationIfNeeded = async () => { try { // window?.log?.info('syncConfigurationIfNeeded with', configMessage); - await getMessageQueue().sendSyncMessage(configMessage); + await getMessageQueue().sendSyncMessage({ + namespace: SnodeNamespaces.UserMessages, + message: configMessage, + }); } catch (e) { window?.log?.warn('Caught an error while sending our ConfigurationMessage:', e); // we do return early so that next time we use the old timestamp again @@ -81,7 +85,11 @@ export const forceSyncConfigurationNowIfNeeded = async (waitForMessageSent = fal resolve(true); } : undefined; - void getMessageQueue().sendSyncMessage(configMessage, callback as any); + void getMessageQueue().sendSyncMessage({ + namespace: SnodeNamespaces.UserMessages, + message: configMessage, + sentCb: callback as any, + }); // either we resolve from the callback if we need to wait for it, // or we don't want to wait, we resolve it here. if (!waitForMessageSent) { diff --git a/ts/test/session/unit/sending/MessageQueue_test.ts b/ts/test/session/unit/sending/MessageQueue_test.ts index 51892a99b..e07421342 100644 --- a/ts/test/session/unit/sending/MessageQueue_test.ts +++ b/ts/test/session/unit/sending/MessageQueue_test.ts @@ -17,6 +17,7 @@ import { ClosedGroupMessage } from '../../../../session/messages/outgoing/contro import chaiAsPromised from 'chai-as-promised'; import { MessageSentHandler } from '../../../../session/sending/MessageSentHandler'; import { stubData } from '../../../test-utils/utils'; +import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces'; chai.use(chaiAsPromised as any); chai.should(); @@ -102,7 +103,11 @@ describe('MessageQueue', () => { } const device = TestUtils.generateFakePubKey(); - await pendingMessageCache.add(device, TestUtils.generateVisibleMessage()); + await pendingMessageCache.add( + device, + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages + ); const initialMessages = await pendingMessageCache.getForDevice(device); expect(initialMessages).to.have.length(1); @@ -139,7 +144,7 @@ describe('MessageQueue', () => { }); void pendingMessageCache - .add(device, message, waitForMessageSentEvent) + .add(device, message, SnodeNamespaces.UserMessages, waitForMessageSentEvent) .then(() => messageQueueStub.processPending(device)); }); @@ -149,7 +154,7 @@ describe('MessageQueue', () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); void pendingMessageCache - .add(device, message) + .add(device, message, SnodeNamespaces.UserMessages) .then(() => messageQueueStub.processPending(device)); // The cb is only invoke is all reties fails. Here we poll until the messageSentHandlerFailed was invoked as this is what we want to do @@ -177,7 +182,7 @@ describe('MessageQueue', () => { const stub = Sinon.stub(messageQueueStub as any, 'process').resolves(); const message = TestUtils.generateVisibleMessage(); - await messageQueueStub.sendToPubKey(device, message); + await messageQueueStub.sendToPubKey(device, message, SnodeNamespaces.UserMessages); const args = stub.lastCall.args as [Array, ContentMessage]; expect(args[0]).to.be.equal(device); @@ -188,9 +193,12 @@ describe('MessageQueue', () => { describe('sendToGroup', () => { it('should throw an error if invalid non-group message was passed', async () => { const chatMessage = TestUtils.generateVisibleMessage(); - return expect(messageQueueStub.sendToGroup(chatMessage as any)).to.be.rejectedWith( - 'Invalid group message passed in sendToGroup.' - ); + return expect( + messageQueueStub.sendToGroup({ + message: chatMessage as any, + namespace: SnodeNamespaces.ClosedGroupMessage, + }) + ).to.be.rejectedWith('Invalid group message passed in sendToGroup.'); }); describe('closed groups', () => { @@ -201,7 +209,10 @@ describe('MessageQueue', () => { const send = Sinon.stub(messageQueueStub, 'sendToPubKey').resolves(); const message = TestUtils.generateClosedGroupMessage(); - await messageQueueStub.sendToGroup(message); + await messageQueueStub.sendToGroup({ + message, + namespace: SnodeNamespaces.ClosedGroupMessage, + }); expect(send.callCount).to.equal(1); const arg = send.getCall(0).args; @@ -223,7 +234,12 @@ describe('MessageQueue', () => { const message = TestUtils.generateOpenGroupVisibleMessage(); const roomInfos = TestUtils.generateOpenGroupV2RoomInfos(); - await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []); + await messageQueueStub.sendToOpenGroupV2({ + message, + roomInfos, + blinded: false, + filesToLink: [], + }); expect(sendToOpenGroupV2Stub.callCount).to.equal(1); }); @@ -235,7 +251,12 @@ describe('MessageQueue', () => { const message = TestUtils.generateOpenGroupVisibleMessage(); const roomInfos = TestUtils.generateOpenGroupV2RoomInfos(); - await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []); + await messageQueueStub.sendToOpenGroupV2({ + message, + roomInfos, + blinded: false, + filesToLink: [], + }); expect(messageSentPublicHandlerSuccessStub.callCount).to.equal(1); expect(messageSentPublicHandlerSuccessStub.lastCall.args[0]).to.equal(message.identifier); @@ -250,7 +271,12 @@ describe('MessageQueue', () => { const message = TestUtils.generateOpenGroupVisibleMessage(); const roomInfos = TestUtils.generateOpenGroupV2RoomInfos(); - await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []); + await messageQueueStub.sendToOpenGroupV2({ + message, + roomInfos, + blinded: false, + filesToLink: [], + }); expect(messageSentHandlerFailedStub.callCount).to.equal(1); expect(messageSentHandlerFailedStub.lastCall.args[0].identifier).to.equal( message.identifier diff --git a/ts/test/session/unit/sending/MessageSender_test.ts b/ts/test/session/unit/sending/MessageSender_test.ts index 7839abf6c..9db043876 100644 --- a/ts/test/session/unit/sending/MessageSender_test.ts +++ b/ts/test/session/unit/sending/MessageSender_test.ts @@ -17,6 +17,7 @@ import { OnionV4 } from '../../../../session/onions/onionv4'; import { OnionSending } from '../../../../session/onions/onionSend'; import { OpenGroupMessageV2 } from '../../../../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2'; import { GetNetworkTime } from '../../../../session/apis/snode_api/getNetworkTime'; +import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces'; describe('MessageSender', () => { afterEach(() => { @@ -52,7 +53,8 @@ describe('MessageSender', () => { beforeEach(async () => { rawMessage = await MessageUtils.toRawMessage( TestUtils.generateFakePubKey(), - TestUtils.generateVisibleMessage() + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages ); }); @@ -100,7 +102,11 @@ describe('MessageSender', () => { const device = TestUtils.generateFakePubKey(); const visibleMessage = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage); + const rawMessage = await MessageUtils.toRawMessage( + device, + visibleMessage, + SnodeNamespaces.UserMessages + ); await MessageSender.send(rawMessage, 3, 10); @@ -117,7 +123,11 @@ describe('MessageSender', () => { const device = TestUtils.generateFakePubKey(); const visibleMessage = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage); + const rawMessage = await MessageUtils.toRawMessage( + device, + visibleMessage, + SnodeNamespaces.UserMessages + ); const offset = 200000; Sinon.stub(GetNetworkTime, 'getLatestTimestampOffset').returns(offset); await MessageSender.send(rawMessage, 3, 10); @@ -149,7 +159,11 @@ describe('MessageSender', () => { const visibleMessageExpected = TestUtils.generateVisibleMessage({ timestamp: decodedTimestampFromSending, }); - const rawMessageExpected = await MessageUtils.toRawMessage(device, visibleMessageExpected); + const rawMessageExpected = await MessageUtils.toRawMessage( + device, + visibleMessageExpected, + 0 + ); expect(envelope.content).to.deep.equal(rawMessageExpected.plainTextBuffer); }); @@ -162,7 +176,11 @@ describe('MessageSender', () => { const device = TestUtils.generateFakePubKey(); const visibleMessage = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage); + const rawMessage = await MessageUtils.toRawMessage( + device, + visibleMessage, + SnodeNamespaces.UserMessages + ); await MessageSender.send(rawMessage, 3, 10); const data = sessionMessageAPISendStub.getCall(0).args[1]; diff --git a/ts/test/session/unit/sending/PendingMessageCache_test.ts b/ts/test/session/unit/sending/PendingMessageCache_test.ts index eb52b5210..4da6d7d28 100644 --- a/ts/test/session/unit/sending/PendingMessageCache_test.ts +++ b/ts/test/session/unit/sending/PendingMessageCache_test.ts @@ -5,6 +5,7 @@ import * as _ from 'lodash'; import { MessageUtils } from '../../../../session/utils'; import { TestUtils } from '../../../../test/test-utils'; import { PendingMessageCache } from '../../../../session/sending/PendingMessageCache'; +import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces'; // Equivalent to Data.StorageItem interface StorageItem { @@ -55,9 +56,13 @@ describe('PendingMessageCache', () => { it('can add to cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); - await pendingMessageCacheStub.add(device, message); + await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages); // Verify that the message is in the cache const finalCache = await pendingMessageCacheStub.getAllPending(); @@ -71,12 +76,24 @@ describe('PendingMessageCache', () => { it('can add multiple messages belonging to the same user', async () => { const device = TestUtils.generateFakePubKey(); - await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage()); + await pendingMessageCacheStub.add( + device, + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages + ); // We have to timeout here otherwise it's processed too fast and messages start having the same timestamp await TestUtils.timeout(5); - await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage()); + await pendingMessageCacheStub.add( + device, + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages + ); await TestUtils.timeout(5); - await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage()); + await pendingMessageCacheStub.add( + device, + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages + ); // Verify that the message is in the cache const finalCache = await pendingMessageCacheStub.getAllPending(); @@ -87,9 +104,13 @@ describe('PendingMessageCache', () => { it('can remove from cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); - await pendingMessageCacheStub.add(device, message); + await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages); const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(1); @@ -106,12 +127,24 @@ describe('PendingMessageCache', () => { it('should only remove messages with different identifier and device', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); - await pendingMessageCacheStub.add(device, message); + await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages); await TestUtils.timeout(5); - const one = await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage()); - const two = await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message); + const one = await pendingMessageCacheStub.add( + device, + TestUtils.generateVisibleMessage(), + SnodeNamespaces.UserMessages + ); + const two = await pendingMessageCacheStub.add( + TestUtils.generateFakePubKey(), + message, + SnodeNamespaces.UserMessages + ); const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(3); @@ -143,7 +176,7 @@ describe('PendingMessageCache', () => { ]; for (const item of cacheItems) { - await pendingMessageCacheStub.add(item.device, item.message); + await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages); } const cache = await pendingMessageCacheStub.getAllPending(); @@ -171,7 +204,7 @@ describe('PendingMessageCache', () => { ]; for (const item of cacheItems) { - await pendingMessageCacheStub.add(item.device, item.message); + await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages); } const initialCache = await pendingMessageCacheStub.getAllPending(); @@ -188,7 +221,11 @@ describe('PendingMessageCache', () => { it('can find nothing when empty', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); const foundMessage = pendingMessageCacheStub.find(rawMessage); expect(foundMessage, 'a message was found in empty cache').to.be.undefined; @@ -197,9 +234,13 @@ describe('PendingMessageCache', () => { it('can find message in cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); - await pendingMessageCacheStub.add(device, message); + await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages); const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); @@ -226,7 +267,7 @@ describe('PendingMessageCache', () => { ]; for (const item of cacheItems) { - await pendingMessageCacheStub.add(item.device, item.message); + await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages); } const initialCache = await pendingMessageCacheStub.getAllPending(); @@ -256,7 +297,7 @@ describe('PendingMessageCache', () => { ]; for (const item of cacheItems) { - await pendingMessageCacheStub.add(item.device, item.message); + await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages); } const addedMessages = await pendingMessageCacheStub.getAllPending(); diff --git a/ts/test/session/unit/utils/Messages_test.ts b/ts/test/session/unit/utils/Messages_test.ts index 1362158a5..b6835285e 100644 --- a/ts/test/session/unit/utils/Messages_test.ts +++ b/ts/test/session/unit/utils/Messages_test.ts @@ -25,6 +25,7 @@ import { ConversationTypeEnum } from '../../../../models/conversationAttributes' import { getOpenGroupV2ConversationId } from '../../../../session/apis/open_group_api/utils/OpenGroupUtils'; import { beforeEach } from 'mocha'; import { OpenGroupData, OpenGroupV2Room } from '../../../../data/opengroups'; +import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces'; const { expect } = chai; @@ -39,7 +40,11 @@ describe('Message Utils', () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserContacts + ); expect(Object.keys(rawMessage)).to.have.length(5); @@ -56,13 +61,18 @@ describe('Message Utils', () => { expect(rawMessage.device).to.equal(device.key); expect(rawMessage.plainTextBuffer).to.deep.equal(message.plainTextBuffer()); expect(rawMessage.ttl).to.equal(message.ttl()); + expect(rawMessage.namespace).to.equal(3); }); it('should generate valid plainTextBuffer', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); const rawBuffer = rawMessage.plainTextBuffer; const rawBufferJSON = JSON.stringify(rawBuffer); @@ -82,7 +92,11 @@ describe('Message Utils', () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); const derivedPubKey = PubKey.from(rawMessage.device); expect(derivedPubKey).to.not.be.eq(undefined, 'should maintain pubkey'); @@ -98,14 +112,22 @@ describe('Message Utils', () => { const chatMessage = TestUtils.generateVisibleMessage(); const message = new ClosedGroupVisibleMessage({ chatMessage, groupId }); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE); }); it('should set encryption to Fallback on other messages', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateVisibleMessage(); - const rawMessage = await MessageUtils.toRawMessage(device, message); + const rawMessage = await MessageUtils.toRawMessage( + device, + message, + SnodeNamespaces.UserMessages + ); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE); }); @@ -123,7 +145,7 @@ describe('Message Utils', () => { keypair: TestUtils.generateFakeECKeyPair(), expireTimer: 0, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE); }); @@ -135,7 +157,7 @@ describe('Message Utils', () => { name: 'df', groupId: TestUtils.generateFakePubKey().key, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE); }); @@ -147,7 +169,7 @@ describe('Message Utils', () => { addedMembers: [TestUtils.generateFakePubKey().key], groupId: TestUtils.generateFakePubKey().key, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE); }); @@ -159,7 +181,7 @@ describe('Message Utils', () => { removedMembers: [TestUtils.generateFakePubKey().key], groupId: TestUtils.generateFakePubKey().key, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE); }); @@ -180,7 +202,7 @@ describe('Message Utils', () => { groupId: TestUtils.generateFakePubKey().key, encryptedKeyPairs: fakeWrappers, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE); }); @@ -201,7 +223,7 @@ describe('Message Utils', () => { groupId: TestUtils.generateFakePubKey().key, encryptedKeyPairs: fakeWrappers, }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE); }); @@ -215,7 +237,7 @@ describe('Message Utils', () => { displayName: 'displayName', contacts: [], }); - const rawMessage = await MessageUtils.toRawMessage(device, msg); + const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages); expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE); }); });