fix: add namespace to all sending message calls

pull/2620/head
Audric Ackermann 2 years ago
parent 0f58e11a17
commit 6d1b406c85

@ -13,6 +13,7 @@ import { resetSelectedMessageIds } from '../../state/ducks/conversations';
import { updateConfirmModal } from '../../state/ducks/modalDialog';
import { SessionButtonColor } from '../../components/basic/SessionButton';
import { deleteSogsMessageByServerIds } from '../../session/apis/open_group_api/sogsv3/sogsV3DeleteMessages';
import { SnodeNamespaces } from '../../session/apis/snode_api/namespaces';
/**
* Deletes messages for everyone in a 1-1 or everyone in a closed group conversation.
@ -38,14 +39,14 @@ async function unsendMessagesForEveryone(
await Promise.all(
unsendMsgObjects.map(unsendObject =>
getMessageQueue()
.sendToPubKey(new PubKey(destinationId), unsendObject)
.sendToPubKey(new PubKey(destinationId), unsendObject, SnodeNamespaces.UserMessages)
.catch(window?.log?.error)
)
);
await Promise.all(
unsendMsgObjects.map(unsendObject =>
getMessageQueue()
.sendSyncMessage(unsendObject)
.sendSyncMessage({ namespace: SnodeNamespaces.UserMessages, message: unsendObject })
.catch(window?.log?.error)
)
);
@ -54,7 +55,11 @@ async function unsendMessagesForEveryone(
await Promise.all(
unsendMsgObjects.map(unsendObject => {
getMessageQueue()
.sendToGroup(unsendObject, undefined, new PubKey(destinationId))
.sendToGroup({
message: unsendObject,
namespace: SnodeNamespaces.ClosedGroupMessage,
groupPubKey: new PubKey(destinationId),
})
.catch(window?.log?.error);
})
);
@ -222,7 +227,7 @@ async function unsendMessageJustForThisUser(
await Promise.all(
unsendMsgObjects.map(unsendObject =>
getMessageQueue()
.sendSyncMessage(unsendObject)
.sendSyncMessage({ namespace: SnodeNamespaces.UserMessages, message: unsendObject })
.catch(window?.log?.error)
)
);

@ -98,6 +98,7 @@ import { sogsV3FetchPreviewAndSaveIt } from '../session/apis/open_group_api/sogs
import { Reaction } from '../types/Reaction';
import { Reactions } from '../util/reactions';
import { GetNetworkTime } from '../session/apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../session/apis/snode_api/namespaces';
export class ConversationModel extends Backbone.Model<ConversationAttributes> {
public updateLastMessage: () => any;
@ -609,12 +610,12 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}
const openGroup = OpenGroupData.getV2OpenGroupRoom(this.id);
// send with blinding if we need to
await getMessageQueue().sendToOpenGroupV2(
chatMessageOpenGroupV2,
await getMessageQueue().sendToOpenGroupV2({
message: chatMessageOpenGroupV2,
roomInfos,
Boolean(roomHasBlindEnabled(openGroup)),
fileIdsToLink
);
blinded: Boolean(roomHasBlindEnabled(openGroup)),
filesToLink: fileIdsToLink,
});
return;
}
@ -625,7 +626,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
chatMessageParams.syncTarget = this.id;
const chatMessageMe = new VisibleMessage(chatMessageParams);
await getMessageQueue().sendSyncMessage(chatMessageMe);
await getMessageQueue().sendSyncMessage({
namespace: SnodeNamespaces.UserMessages,
message: chatMessageMe,
});
return;
}
@ -639,12 +643,20 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
expireTimer: this.get('expireTimer'),
});
// we need the return await so that errors are caught in the catch {}
await getMessageQueue().sendToPubKey(destinationPubkey, groupInvitMessage);
await getMessageQueue().sendToPubKey(
destinationPubkey,
groupInvitMessage,
SnodeNamespaces.UserMessages
);
return;
}
const chatMessagePrivate = new VisibleMessage(chatMessageParams);
await getMessageQueue().sendToPubKey(destinationPubkey, chatMessagePrivate);
await getMessageQueue().sendToPubKey(
destinationPubkey,
chatMessagePrivate,
SnodeNamespaces.UserMessages
);
return;
}
@ -656,7 +668,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
});
// we need the return await so that errors are caught in the catch {}
await getMessageQueue().sendToGroup(closedGroupVisibleMessage);
await getMessageQueue().sendToGroup({
message: closedGroupVisibleMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
return;
}
@ -729,7 +744,12 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const blinded = Boolean(roomHasBlindEnabled(openGroup));
// send with blinding if we need to
await getMessageQueue().sendToOpenGroupV2(chatMessageOpenGroupV2, roomInfos, blinded, []);
await getMessageQueue().sendToOpenGroupV2({
message: chatMessageOpenGroupV2,
roomInfos,
blinded,
filesToLink: [],
});
return;
}
@ -740,10 +760,17 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
...chatMessageParams,
syncTarget: this.id,
});
await getMessageQueue().sendSyncMessage(chatMessageMe);
await getMessageQueue().sendSyncMessage({
namespace: SnodeNamespaces.UserMessages,
message: chatMessageMe,
});
const chatMessagePrivate = new VisibleMessage(chatMessageParams);
await getMessageQueue().sendToPubKey(destinationPubkey, chatMessagePrivate);
await getMessageQueue().sendToPubKey(
destinationPubkey,
chatMessagePrivate,
SnodeNamespaces.UserMessages
);
await Reactions.handleMessageReaction({
reaction,
sender: UserUtils.getOurPubKeyStrFromCache(),
@ -760,7 +787,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
groupId: destination,
});
// we need the return await so that errors are caught in the catch {}
await getMessageQueue().sendToGroup(closedGroupVisibleMessage);
await getMessageQueue().sendToGroup({
message: closedGroupVisibleMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
await Reactions.handleMessageReaction({
reaction,
sender: UserUtils.getOurPubKeyStrFromCache(),
@ -893,12 +923,12 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
this.set({ active_at: Date.now(), isApproved: true });
await getMessageQueue().sendToOpenGroupV2BlindedRequest(
encryptedMsg,
roomInfo,
sogsVisibleMessage,
this.id
);
await getMessageQueue().sendToOpenGroupV2BlindedRequest({
encryptedContent: encryptedMsg,
roomInfos: roomInfo,
message: sogsVisibleMessage,
recipientBlindedId: this.id,
});
}
/**
@ -920,7 +950,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const messageRequestResponse = new MessageRequestResponse(messageRequestResponseParams);
const pubkeyForSending = new PubKey(this.id);
await getMessageQueue()
.sendToPubKey(pubkeyForSending, messageRequestResponse)
.sendToPubKey(pubkeyForSending, messageRequestResponse, SnodeNamespaces.UserMessages)
.catch(window?.log?.error);
}
@ -1118,7 +1148,11 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
if (this.isPrivate()) {
const expirationTimerMessage = new ExpirationTimerUpdateMessage(expireUpdate);
const pubkey = new PubKey(this.get('id'));
await getMessageQueue().sendToPubKey(pubkey, expirationTimerMessage);
await getMessageQueue().sendToPubKey(
pubkey,
expirationTimerMessage,
SnodeNamespaces.UserMessages
);
} else {
window?.log?.warn('TODO: Expiration update for closed groups are to be updated');
const expireUpdateForGroup = {
@ -1128,7 +1162,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const expirationTimerMessage = new ExpirationTimerUpdateMessage(expireUpdateForGroup);
await getMessageQueue().sendToGroup(expirationTimerMessage);
await getMessageQueue().sendToGroup({
message: expirationTimerMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
}
return;
}
@ -1365,7 +1402,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
});
const device = new PubKey(this.id);
await getMessageQueue().sendToPubKey(device, receiptMessage);
await getMessageQueue().sendToPubKey(device, receiptMessage, SnodeNamespaces.UserMessages);
}
}
@ -2045,7 +2082,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
// send the message to a single recipient if this is a session chat
const device = new PubKey(recipientId);
getMessageQueue()
.sendToPubKey(device, typingMessage)
.sendToPubKey(device, typingMessage, SnodeNamespaces.UserMessages)
.catch(window?.log?.error);
}

@ -95,6 +95,7 @@ import { QUOTED_TEXT_MAX_LENGTH } from '../session/constants';
import { ReactionList } from '../types/Reaction';
import { getAttachmentMetadata } from '../types/message/initializeAttachmentMetadata';
import { GetNetworkTime } from '../session/apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../session/apis/snode_api/namespaces';
// tslint:disable: cyclomatic-complexity
/**
@ -905,12 +906,12 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
const openGroupMessage = new OpenGroupVisibleMessage(openGroupParams);
const openGroup = OpenGroupData.getV2OpenGroupRoom(conversation.id);
return getMessageQueue().sendToOpenGroupV2(
openGroupMessage,
return getMessageQueue().sendToOpenGroupV2({
message: openGroupMessage,
roomInfos,
roomHasBlindEnabled(openGroup),
fileIdsToLink
);
blinded: roomHasBlindEnabled(openGroup),
filesToLink: fileIdsToLink,
});
}
const chatParams = {
@ -936,7 +937,11 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
}
if (conversation.isPrivate()) {
return getMessageQueue().sendToPubKey(PubKey.cast(conversation.id), chatMessage);
return getMessageQueue().sendToPubKey(
PubKey.cast(conversation.id),
chatMessage,
SnodeNamespaces.UserMessages
);
}
// Here, the convo is neither an open group, a private convo or ourself. It can only be a medium group.
@ -954,7 +959,10 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
groupId: this.get('conversationId'),
});
return getMessageQueue().sendToGroup(closedGroupVisibleMessage);
return getMessageQueue().sendToGroup({
message: closedGroupVisibleMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
} catch (e) {
await this.saveErrors(e);
return null;
@ -1060,7 +1068,10 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
throw new Error('Cannot trigger syncMessage with unknown convo.');
}
const syncMessage = buildSyncMessage(this.id, dataMessage, conversation.id, sentTimestamp);
await getMessageQueue().sendSyncMessage(syncMessage);
await getMessageQueue().sendSyncMessage({
namespace: SnodeNamespaces.UserMessages,
message: syncMessage,
});
}
this.set({ sentSync: true });
await this.commit();

@ -19,6 +19,7 @@ import { queueAllCachedFromSource } from './receiver';
import { getSwarmPollingInstance } from '../session/apis/snode_api';
import { perfEnd, perfStart } from '../session/utils/Performance';
import { ConversationTypeEnum } from '../models/conversationAttributes';
import { SnodeNamespaces } from '../session/apis/snode_api/namespaces';
export const distributingClosedGroupEncryptionKeyPairs = new Map<string, ECKeyPair>();
@ -906,7 +907,11 @@ async function sendLatestKeyPairToUsers(
});
// the encryption keypair is sent using established channels
await getMessageQueue().sendToPubKey(PubKey.cast(member), keypairsMessage);
await getMessageQueue().sendToPubKey(
PubKey.cast(member),
keypairsMessage,
SnodeNamespaces.UserMessages
);
})
);
}

@ -1,3 +1,5 @@
import { SnodeNamespaces } from './namespaces';
export type SwarmForSubRequest = { method: 'get_swarm'; params: { pubkey: string } };
type RetrieveMaxCountSize = { max_count?: number; max_size?: number };
@ -29,7 +31,7 @@ export type RetrievePubkeySubRequestType = {
export type RetrieveLegacyClosedGroupSubRequestType = {
method: 'retrieve';
params: {
namespace: -10; // legacy closed groups retrieve are not authenticated because the clients do not have a shared key
namespace: SnodeNamespaces.ClosedGroupMessage; // legacy closed groups retrieve are not authenticated because the clients do not have a shared key
} & RetrieveAlwaysNeeded &
RetrieveMaxCountSize;
};

@ -44,7 +44,7 @@ export async function doSnodeBatchRequest(
*/
function decodeBatchRequest(snodeResponse: SnodeResponse): NotEmptyArrayOfBatchResults {
try {
console.warn('decodeBatch: ', snodeResponse);
// console.warn('decodeBatch: ', snodeResponse);
if (snodeResponse.status !== 200) {
throw new Error(`decodeBatchRequest invalid status code: ${snodeResponse.status}`);
}

@ -1,9 +1,4 @@
export enum SnodeNamespaces {
/**
* The messages sent to a closed group are sent and polled from this namespace
*/
ClosedGroupMessages = -10,
/**
* This is the namespace anyone can deposit a message for us
*/
@ -13,14 +8,51 @@ export enum SnodeNamespaces {
* This is the namespace used to sync our profile
*/
UserProfile = 2,
/**
* This is the namespace used to sync our contacts
*/
UserContacts = 3,
/**
* The messages sent to a closed group are sent and polled from this namespace
*/
ClosedGroupMessage = -10,
/**
* This is the namespace used to sync the closed group details for each of the closed groups we are polling
*/
ClosedGroupInfo = 11,
ClosedGroupInfo = 1,
}
type PickEnum<T, K extends T> = {
[P in keyof K]: P extends K ? P : never;
};
export type SnodeNamespacesGroup = PickEnum<
SnodeNamespaces,
SnodeNamespaces.ClosedGroupInfo | SnodeNamespaces.ClosedGroupMessage
>;
export type SnodeNamespacesUser = PickEnum<
SnodeNamespaces,
SnodeNamespaces.UserContacts | SnodeNamespaces.UserProfile | SnodeNamespaces.UserMessages
>;
/**
* Returns true if that namespace is associated with the config of a user (not his messages, only configs)
*/
function isUserConfigNamespace(namespace: SnodeNamespaces) {
return namespace === SnodeNamespaces.UserContacts || namespace === SnodeNamespaces.UserProfile;
}
/**
* Returns true if that namespace is associated with the config of a closed group (not its messages, only configs)
*/
function isGroupConfigNamespace(namespace: SnodeNamespaces) {
return namespace === SnodeNamespaces.ClosedGroupInfo;
}
export const SnodeNamespace = {
isUserConfigNamespace,
isGroupConfigNamespace,
};

@ -6,10 +6,12 @@ import { fromHexToArray, fromUInt8ArrayToBase64 } from '../../utils/String';
import { doSnodeBatchRequest } from './batchRequest';
import { GetNetworkTime } from './getNetworkTime';
import { SnodeNamespaces } from './namespaces';
import {
RetrieveLegacyClosedGroupSubRequestType,
RetrieveSubRequestType,
} from './SnodeRequestTypes';
import { RetrieveMessagesResultsBatched, RetrieveMessagesResultsContent } from './types';
async function getRetrieveSignatureParams(params: {
pubkey: string;
@ -71,15 +73,15 @@ async function buildRetrieveRequest(
timestamp: GetNetworkTime.getNowWithNetworkOffset(),
};
if (namespace === SnodeNamespaces.ClosedGroupMessages) {
if (namespace === SnodeNamespaces.ClosedGroupMessage) {
if (pubkey === ourPubkey || !pubkey.startsWith('05')) {
throw new Error(
'namespace -10 can only be used to retrieve messages from a legacy closed group'
'namespace -10 can only be used to retrieve messages from a legacy closed group (prefix 05)'
);
}
const retrieveLegacyClosedGroup = {
...retrieveParam,
namespace: namespace as -10,
namespace,
};
const retrieveParamsLegacy: RetrieveLegacyClosedGroupSubRequestType = {
method: 'retrieve',
@ -99,7 +101,7 @@ async function buildRetrieveRequest(
throw new Error('not a legacy closed group. namespace can only be 0');
}
if (pubkey !== ourPubkey) {
throw new Error('not a legacy closed group. pubkey can only be our number');
throw new Error('not a legacy closed group. pubkey can only be ours');
}
const signatureArgs = { ...retrieveParam, ourPubkey };
const signatureBuilt = await getRetrieveSignatureParams(signatureArgs);
@ -121,7 +123,7 @@ async function retrieveNextMessages(
associatedWith: string,
namespaces: Array<SnodeNamespaces>,
ourPubkey: string
): Promise<Array<{ code: number; messages: Array<Record<string, any>> }>> {
): Promise<RetrieveMessagesResultsBatched> {
if (namespaces.length !== lastHashes.length) {
throw new Error('namespaces and lasthashes does not match');
}
@ -135,7 +137,7 @@ async function retrieveNextMessages(
// let exceptions bubble up
// no retry for this one as this a call we do every few seconds while polling for messages
console.warn('retrieveRequestsParams', retrieveRequestsParams);
console.warn(`fetching messages associatedWith:${associatedWith} namespaces:${namespaces}`);
const results = await doSnodeBatchRequest(retrieveRequestsParams, targetNode, 4000);
if (!results || !results.length) {
@ -153,11 +155,11 @@ async function retrieveNextMessages(
);
}
if (namespaces.length > 1) {
throw new Error('multiple namespace polling todo');
}
const firstResult = results[0];
// TODO we should probably check for status code of all the results (when polling for a few namespaces at a time)
console.warn('what should we do if we dont get a 200 on any of those fetches?');
if (firstResult.code !== 200) {
window?.log?.warn(`retrieveNextMessages result is not 200 but ${firstResult.code}`);
throw new Error(
@ -165,21 +167,19 @@ async function retrieveNextMessages(
);
}
console.warn('what should we do if we dont get a 200 on any of those fetches?');
try {
// we rely on the code of the
// we rely on the code of the first one to check for online status
const bodyFirstResult = firstResult.body;
if (!window.inboxStore?.getState().onionPaths.isOnline) {
window.inboxStore?.dispatch(updateIsOnline(true));
}
GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t);
// merge results with their corresponding namespaces
// merge results with their corresponding namespaces
return results.map((result, index) => ({
code: result.code,
messages: result.body as Array<any>,
messages: result.body as RetrieveMessagesResultsContent,
namespace: namespaces[index],
}));
} catch (e) {

@ -129,8 +129,6 @@ export async function snodeRpc(
params: clone(params),
};
console.warn('snodeRPC', body);
const fetchOptions: LokiFetchOptions = {
method: 'POST',
body: JSON.stringify(body),

@ -3,7 +3,7 @@ import * as snodePool from './snodePool';
import { ERROR_CODE_NO_CONNECT } from './SNodeAPI';
import { SignalService } from '../../../protobuf';
import * as Receiver from '../../../receiver/receiver';
import _, { concat, last } from 'lodash';
import _, { compact, concat, difference, flatten, last, sample, uniqBy } from 'lodash';
import { Data, Snode } from '../../../data/data';
import { StringUtils, UserUtils } from '../../utils';
@ -15,18 +15,8 @@ import { ed25519Str } from '../../onions/onionPath';
import { updateIsOnline } from '../../../state/ducks/onion';
import pRetry from 'p-retry';
import { SnodeAPIRetrieve } from './retrieveRequest';
import { SnodeNamespaces } from './namespaces';
interface Message {
hash: string;
expiration: number;
data: string;
}
export type RetrieveMessagesResults = Array<{
code: number;
messages: Record<string, any>[];
}>;
import { SnodeNamespace, SnodeNamespaces } from './namespaces';
import { RetrieveMessageItem, RetrieveMessagesResultsBatched } from './types';
// Some websocket nonsense
export function processMessage(message: string, options: any = {}, messageHash: string) {
@ -172,7 +162,7 @@ export class SwarmPolling {
`Polling for ${loggingId}; timeout: ${convoPollingTimeout}; diff: ${diff} `
);
return this.pollOnceForKey(group.pubkey, true, [SnodeNamespaces.ClosedGroupMessages]);
return this.pollOnceForKey(group.pubkey, true, [SnodeNamespaces.ClosedGroupMessage]);
}
window?.log?.info(
`Not polling for ${loggingId}; timeout: ${convoPollingTimeout} ; diff: ${diff}`
@ -208,27 +198,54 @@ export class SwarmPolling {
// If we need more nodes, select randomly from the remaining nodes:
if (!toPollFrom) {
const notPolled = _.difference(swarmSnodes, alreadyPolled);
toPollFrom = _.sample(notPolled) as Snode;
const notPolled = difference(swarmSnodes, alreadyPolled);
toPollFrom = sample(notPolled) as Snode;
}
let resultsFromAllNamespaces: RetrieveMessagesResults | null;
let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null;
try {
resultsFromAllNamespaces = await this.pollNodeForKey(toPollFrom, pubkey, namespaces);
} catch (e) {
window.log.warn('pollNodeForKey failed with: ', e.message);
window.log.warn(
`pollNodeForKey of ${pubkey} namespaces: ${namespaces} failed with: ${e.message}`
);
resultsFromAllNamespaces = null;
}
// filter out null (exception thrown)
const arrayOfResults = resultsFromAllNamespaces?.length
? _.compact(resultsFromAllNamespaces)
: null;
let userConfigMessagesMerged: Array<RetrieveMessageItem> = [];
let allNamespacesWithoutUserConfigIfNeeded: Array<RetrieveMessageItem> = [];
// check if we just fetched the details from the config namespaces.
// If yes, merge them together and exclude them from the rest of the messages.
if (window.sessionFeatureFlags.useSharedUtilForUserConfig && resultsFromAllNamespaces) {
const userConfigMessages = resultsFromAllNamespaces
.filter(m => SnodeNamespace.isUserConfigNamespace(m.namespace))
.map(r => r.messages.messages);
allNamespacesWithoutUserConfigIfNeeded = flatten(
compact(
resultsFromAllNamespaces
.filter(m => !SnodeNamespace.isUserConfigNamespace(m.namespace))
.map(r => r.messages.messages)
)
);
userConfigMessagesMerged = flatten(compact(userConfigMessages));
} else {
allNamespacesWithoutUserConfigIfNeeded = flatten(
compact(resultsFromAllNamespaces?.map(m => m.messages.messages))
);
}
console.warn(`received userConfigMessagesMerged: ${userConfigMessagesMerged.length}`);
console.warn(
`received allNamespacesWithoutUserConfigIfNeeded: ${allNamespacesWithoutUserConfigIfNeeded.length}`
);
// Merge results into one list of unique messages
const messages = _.uniqBy(_.flatten(arrayOfResults), (x: any) => x.hash);
const messages = uniqBy(allNamespacesWithoutUserConfigIfNeeded, x => x.hash);
// if all snodes returned an error (null), no need to update the lastPolledTimestamp
if (isGroup && arrayOfResults?.length) {
if (isGroup && allNamespacesWithoutUserConfigIfNeeded?.length) {
window?.log?.info(
`Polled for group(${ed25519Str(pubkey.key)}):, got ${messages.length} messages back.`
);
@ -263,10 +280,7 @@ export class SwarmPolling {
perfEnd(`handleSeenMessages-${pkStr}`, 'handleSeenMessages');
if (window.sessionFeatureFlags.useSharedUtilForUserConfig) {
}
newMessages.forEach((m: Message) => {
newMessages.forEach((m: RetrieveMessageItem) => {
const options = isGroup ? { conversationId: pkStr } : {};
processMessage(m.data, options, m.hash);
});
@ -278,7 +292,7 @@ export class SwarmPolling {
node: Snode,
pubkey: PubKey,
namespaces: Array<SnodeNamespaces>
): Promise<RetrieveMessagesResults | null> {
): Promise<RetrieveMessagesResultsBatched | null> {
const namespaceLength = namespaces.length;
if (namespaceLength > 3 || namespaceLength <= 0) {
throw new Error('pollNodeForKey needs 1 or 2 namespaces to be given at all times');
@ -313,7 +327,7 @@ export class SwarmPolling {
}
const lastMessages = results.map(r => {
return last(r.messages);
return last(r.messages.messages);
});
await Promise.all(
@ -336,7 +350,6 @@ export class SwarmPolling {
{
minTimeout: 100,
retries: 1,
onFailedAttempt: e => {
window?.log?.warn(
`retrieveNextMessages attempt #${e.attemptNumber} failed. ${e.retriesLeft} retries left... ${e.name}`
@ -375,18 +388,20 @@ export class SwarmPolling {
});
}
private async handleSeenMessages(messages: Array<Message>): Promise<Array<Message>> {
private async handleSeenMessages(
messages: Array<RetrieveMessageItem>
): Promise<Array<RetrieveMessageItem>> {
if (!messages.length) {
return [];
}
const incomingHashes = messages.map((m: Message) => m.hash);
const incomingHashes = messages.map((m: RetrieveMessageItem) => m.hash);
const dupHashes = await Data.getSeenMessagesByHashList(incomingHashes);
const newMessages = messages.filter((m: Message) => !dupHashes.includes(m.hash));
const newMessages = messages.filter((m: RetrieveMessageItem) => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map((m: Message) => ({
const newHashes = newMessages.map((m: RetrieveMessageItem) => ({
expiresAt: m.expiration,
hash: m.hash,
}));

@ -0,0 +1,23 @@
import { SnodeNamespaces } from './namespaces';
export type RetrieveMessageItem = {
hash: string;
expiration: number;
data: string; // base64 encrypted content of the emssage
timestamp: number;
};
export type RetrieveMessagesResultsContent = {
hf?: Array<number>;
messages?: Array<RetrieveMessageItem>;
more: boolean;
t: number;
};
export type RetrieveRequestResult = {
code: number;
messages: RetrieveMessagesResultsContent;
namespace: SnodeNamespaces;
};
export type RetrieveMessagesResultsBatched = Array<RetrieveRequestResult>;

@ -7,6 +7,7 @@ import { ECKeyPair } from '../../receiver/keypairs';
import { openConversationWithMessages } from '../../state/ducks/conversations';
import { updateConfirmModal } from '../../state/ducks/modalDialog';
import { getSwarmPollingInstance } from '../apis/snode_api';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
import {
generateClosedGroupPublicKey,
generateCurve25519KeyPairWithoutPrefix,
@ -230,6 +231,10 @@ function createInvitePromises(
expireTimer: existingExpireTimer,
};
const message = new ClosedGroupNewMessage(messageParams);
return getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(m), message);
return getMessageQueue().sendToPubKeyNonDurably({
pubkey: PubKey.cast(m),
message,
namespace: SnodeNamespaces.UserMessages,
});
});
}

@ -29,6 +29,7 @@ import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMe
import { getSwarmPollingInstance } from '../apis/snode_api';
import { ConversationAttributes, ConversationTypeEnum } from '../../models/conversationAttributes';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
export type GroupInfo = {
id: string;
@ -338,11 +339,15 @@ export async function leaveClosedGroup(groupId: string) {
window?.log?.info(`We are leaving the group ${groupId}. Sending our leaving message.`);
// sent the message to the group and once done, remove everything related to this group
getSwarmPollingInstance().removePubkey(groupId);
await getMessageQueue().sendToGroup(ourLeavingMessage, async () => {
window?.log?.info(
`Leaving message sent ${groupId}. Removing everything related to this group.`
);
await markGroupAsLeftOrKicked(groupId, convo, false);
await getMessageQueue().sendToGroup({
message: ourLeavingMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
sentCb: async () => {
window?.log?.info(
`Leaving message sent ${groupId}. Removing everything related to this group.`
);
await markGroupAsLeftOrKicked(groupId, convo, false);
},
});
}
@ -361,7 +366,10 @@ async function sendNewName(convo: ConversationModel, name: string, messageId: st
identifier: messageId,
name,
});
await getMessageQueue().sendToGroup(nameChangeMessage);
await getMessageQueue().sendToGroup({
message: nameChangeMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
}
async function sendAddedMembers(
@ -393,7 +401,10 @@ async function sendAddedMembers(
addedMembers,
identifier: messageId,
});
await getMessageQueue().sendToGroup(closedGroupControlMessage);
await getMessageQueue().sendToGroup({
message: closedGroupControlMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
// Send closed group update messages to any new members individually
const newClosedGroupUpdate = new ClosedGroupNewMessage({
@ -410,7 +421,11 @@ async function sendAddedMembers(
const promises = addedMembers.map(async m => {
await getConversationController().getOrCreateAndWait(m, ConversationTypeEnum.PRIVATE);
const memberPubKey = PubKey.cast(m);
await getMessageQueue().sendToPubKey(memberPubKey, newClosedGroupUpdate);
await getMessageQueue().sendToPubKey(
memberPubKey,
newClosedGroupUpdate,
SnodeNamespaces.ClosedGroupMessage
);
});
await Promise.all(promises);
}
@ -445,15 +460,19 @@ export async function sendRemovedMembers(
identifier: messageId,
});
// Send the group update, and only once sent, generate and distribute a new encryption key pair if needed
await getMessageQueue().sendToGroup(mainClosedGroupControlMessage, async () => {
if (isCurrentUserAdmin) {
// we send the new encryption key only to members already here before the update
window?.log?.info(
`Sending group update: A user was removed from ${groupId} and we are the admin. Generating and sending a new EncryptionKeyPair`
);
await generateAndSendNewEncryptionKeyPair(groupId, stillMembers);
}
await getMessageQueue().sendToGroup({
message: mainClosedGroupControlMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
sentCb: async () => {
if (isCurrentUserAdmin) {
// we send the new encryption key only to members already here before the update
window?.log?.info(
`Sending group update: A user was removed from ${groupId} and we are the admin. Generating and sending a new EncryptionKeyPair`
);
await generateAndSendNewEncryptionKeyPair(groupId, stillMembers);
}
},
});
}
@ -513,7 +532,11 @@ async function generateAndSendNewEncryptionKeyPair(
await addKeyPairToCacheAndDBIfNeeded(toHex(groupId), newKeyPair.toHexKeyPair());
};
// this is to be sent to the group pubkey adress
await getMessageQueue().sendToGroup(keypairsMessage, messageSentCallback);
await getMessageQueue().sendToGroup({
message: keypairsMessage,
namespace: SnodeNamespaces.ClosedGroupMessage,
sentCb: messageSentCallback,
});
}
export async function buildEncryptionKeyPairWrappers(

@ -8,6 +8,7 @@ import { getConversationController } from '../../../conversations';
import { UserUtils } from '../../../utils';
import { SettingsKey } from '../../../../data/settings-key';
import { Storage } from '../../../../util/storage';
import { SnodeNamespaces } from '../../../apis/snode_api/namespaces';
interface DataExtractionNotificationMessageParams extends MessageParams {
referencedAttachmentTimestamp: number;
}
@ -73,7 +74,7 @@ export const sendDataExtractionNotification = async (
);
try {
await getMessageQueue().sendToPubKey(pubkey, dataExtractionNotificationMessage);
await getMessageQueue().sendToPubKey(pubkey, dataExtractionNotificationMessage, SnodeNamespaces.UserMessages);
} catch (e) {
window.log.warn('failed to send data extraction notification', e);
}

@ -24,6 +24,11 @@ import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage';
import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import { AbortController } from 'abort-controller';
import { sendSogsReactionOnionV4 } from '../apis/open_group_api/sogsv3/sogsV3SendReaction';
import {
SnodeNamespaces,
SnodeNamespacesGroup,
SnodeNamespacesUser,
} from '../apis/snode_api/namespaces';
type ClosedGroupMessageType =
| ClosedGroupVisibleMessage
@ -49,13 +54,14 @@ export class MessageQueue {
public async sendToPubKey(
destinationPubKey: PubKey,
message: ContentMessage,
namespace: SnodeNamespaces,
sentCb?: (message: RawMessage) => Promise<void>,
isGroup = false
): Promise<void> {
if (message instanceof ConfigurationMessage || !!(message as any).syncTarget) {
throw new Error('SyncMessage needs to be sent with sendSyncMessage');
}
await this.process(destinationPubKey, message, sentCb, isGroup);
await this.process(destinationPubKey, message, namespace, sentCb, isGroup);
}
/**
@ -66,12 +72,17 @@ export class MessageQueue {
*
* fileIds is the array of ids this message is linked to. If we upload files as part of a message but do not link them with this, the files will be deleted much sooner
*/
public async sendToOpenGroupV2(
message: OpenGroupVisibleMessage,
roomInfos: OpenGroupRequestCommonType,
blinded: boolean,
filesToLink: Array<number>
) {
public async sendToOpenGroupV2({
blinded,
filesToLink,
message,
roomInfos,
}: {
message: OpenGroupVisibleMessage;
roomInfos: OpenGroupRequestCommonType;
blinded: boolean;
filesToLink: Array<number>;
}) {
// Skipping the queue for Open Groups v2; the message is sent directly
try {
@ -115,12 +126,17 @@ export class MessageQueue {
}
}
public async sendToOpenGroupV2BlindedRequest(
encryptedContent: Uint8Array,
roomInfos: OpenGroupRequestCommonType,
message: OpenGroupVisibleMessage,
recipientBlindedId: string
) {
public async sendToOpenGroupV2BlindedRequest({
encryptedContent,
message,
recipientBlindedId,
roomInfos,
}: {
encryptedContent: Uint8Array;
roomInfos: OpenGroupRequestCommonType;
message: OpenGroupVisibleMessage;
recipientBlindedId: string;
}) {
try {
if (!PubKey.hasBlindedPrefix(recipientBlindedId)) {
throw new Error('sendToOpenGroupV2BlindedRequest needs a blindedId');
@ -153,11 +169,17 @@ export class MessageQueue {
*
* @param sentCb currently only called for medium groups sent message
*/
public async sendToGroup(
message: ClosedGroupMessageType,
sentCb?: (message: RawMessage) => Promise<void>,
groupPubKey?: PubKey
): Promise<void> {
public async sendToGroup({
message,
namespace,
groupPubKey,
sentCb,
}: {
message: ClosedGroupMessageType;
namespace: SnodeNamespacesGroup;
sentCb?: (message: RawMessage) => Promise<void>;
groupPubKey?: PubKey;
}): Promise<void> {
let destinationPubKey: PubKey | undefined = groupPubKey;
if (message instanceof ExpirationTimerUpdateMessage || message instanceof ClosedGroupMessage) {
destinationPubKey = groupPubKey ? groupPubKey : message.groupId;
@ -168,13 +190,18 @@ export class MessageQueue {
}
// if groupId is set here, it means it's for a medium group. So send it as it
return this.sendToPubKey(PubKey.cast(destinationPubKey), message, sentCb, true);
return this.sendToPubKey(PubKey.cast(destinationPubKey), message, namespace, sentCb, true);
}
public async sendSyncMessage(
message?: SyncMessageType,
sentCb?: (message: RawMessage) => Promise<void>
): Promise<void> {
public async sendSyncMessage({
namespace,
message,
sentCb,
}: {
namespace: SnodeNamespacesUser;
message?: SyncMessageType;
sentCb?: (message: RawMessage) => Promise<void>;
}): Promise<void> {
if (!message) {
return;
}
@ -188,7 +215,7 @@ export class MessageQueue {
const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
await this.process(PubKey.cast(ourPubKey), message, sentCb);
await this.process(PubKey.cast(ourPubKey), message, namespace, sentCb);
}
/**
@ -196,13 +223,18 @@ export class MessageQueue {
* @param user user pub key to send to
* @param message Message to be sent
*/
public async sendToPubKeyNonDurably(
user: PubKey,
message: ClosedGroupNewMessage | CallMessage
): Promise<boolean | number> {
public async sendToPubKeyNonDurably({
message,
namespace,
pubkey,
}: {
pubkey: PubKey;
message: ClosedGroupNewMessage | CallMessage;
namespace: SnodeNamespaces;
}): Promise<boolean | number> {
let rawMessage;
try {
rawMessage = await MessageUtils.toRawMessage(user, message);
rawMessage = await MessageUtils.toRawMessage(pubkey, message, namespace);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(rawMessage);
await MessageSentHandler.handleMessageSentSuccess(
rawMessage,
@ -281,6 +313,7 @@ export class MessageQueue {
private async process(
destinationPk: PubKey,
message: ContentMessage,
namespace: SnodeNamespaces,
sentCb?: (message: RawMessage) => Promise<void>,
isGroup = false
): Promise<void> {
@ -305,7 +338,7 @@ export class MessageQueue {
}
}
await this.pendingMessageCache.add(destinationPk, message, sentCb, isGroup);
await this.pendingMessageCache.add(destinationPk, message, namespace, sentCb, isGroup);
void this.processPending(destinationPk, isSyncMessage);
}

@ -12,7 +12,6 @@ import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/Ope
import { addMessagePadding } from '../crypto/BufferPadding';
import _ from 'lodash';
import { getSwarmFor } from '../apis/snode_api/snodePool';
import { firstTrue } from '../utils/Promise';
import { MessageSender } from '.';
import { Data, Snode } from '../../../ts/data/data';
import { getConversationController } from '../conversations';
@ -27,8 +26,7 @@ import { AbortController } from 'abort-controller';
import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { StoreOnNodeParams } from '../apis/snode_api/SnodeRequestTypes';
import { GetNetworkTime } from '../apis/snode_api/getNetworkTime';
const DEFAULT_CONNECTIONS = 1;
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
// ================ SNODE STORE ================
@ -83,6 +81,7 @@ export async function send(
async () => {
const recipient = PubKey.cast(message.device);
const { encryption, ttl } = message;
let namespace = message.namespace;
const {
overRiddenTimestampBuffer,
@ -113,14 +112,25 @@ export async function send(
found.set({ sent_at: networkTimestamp });
await found.commit();
}
await MessageSender.sendMessageToSnode(
recipient.key,
// right when we upgrade from not having namespaces stored in the outgoing cached messages our messages won't have a namespace associated.
// So we need to keep doing the lookup of where they should go if the namespace is not set.
if (namespace === null || namespace === undefined) {
namespace = getConversationController()
.get(recipient.key)
?.isClosedGroup()
? SnodeNamespaces.ClosedGroupMessage
: SnodeNamespaces.UserMessages;
}
await MessageSender.sendMessageToSnode({
pubKey: recipient.key,
data,
ttl,
networkTimestamp,
timestamp: networkTimestamp,
isSyncMessage,
message.identifier
);
messageId: message.identifier,
namespace,
});
return { wrappedEnvelope: data, effectiveTimestamp: networkTimestamp };
},
{
@ -132,22 +142,29 @@ export async function send(
}
// tslint:disable-next-line: function-name
export async function sendMessageToSnode(
pubKey: string,
data: Uint8Array,
ttl: number,
timestamp: number,
isSyncMessage?: boolean,
messageId?: string
): Promise<void> {
export async function sendMessageToSnode({
data,
namespace,
pubKey,
timestamp,
ttl,
isSyncMessage,
messageId,
}: {
pubKey: string;
data: Uint8Array;
ttl: number;
timestamp: number;
namespace: SnodeNamespaces;
isSyncMessage?: boolean;
messageId?: string;
}): Promise<void> {
const data64 = ByteBuffer.wrap(data).toString('base64');
const swarm = await getSwarmFor(pubKey);
const conversation = getConversationController().get(pubKey);
const isClosedGroup = conversation?.isClosedGroup();
const namespace = isClosedGroup ? -10 : 0;
// send parameters
const params: StoreOnNodeParams = {
pubkey: pubKey,
@ -157,32 +174,27 @@ export async function sendMessageToSnode(
namespace,
};
const usedNodes = _.slice(swarm, 0, DEFAULT_CONNECTIONS);
const usedNodes = _.slice(swarm, 0, 1);
if (!usedNodes || usedNodes.length === 0) {
throw new EmptySwarmError(pubKey, 'Ran out of swarm nodes to query');
}
let successfulSendHash: string | undefined;
const promises = usedNodes.map(async usedNode => {
let snode: Snode | undefined;
try {
const snodeTried = usedNodes[0];
// No pRetry here as if this is a bad path it will be handled and retried in lokiOnionFetch.
// the only case we could care about a retry would be when the usedNode is not correct,
// but considering we trigger this request with a few snode in //, this should be fine.
const successfulSend = await SnodeAPIStore.storeOnNode(usedNode, params);
const successfulSend = await SnodeAPIStore.storeOnNode(snodeTried, params);
if (successfulSend) {
if (_.isString(successfulSend)) {
successfulSendHash = successfulSend;
}
return usedNode;
snode = snodeTried;
}
// should we mark snode as bad if it can't store our message?
return undefined;
});
let snode: Snode | undefined;
try {
const firstSuccessSnode = await firstTrue(promises);
snode = firstSuccessSnode;
} catch (e) {
const snodeStr = snode ? `${snode.ip}:${snode.port}` : 'null';
window?.log?.warn(
@ -203,11 +215,13 @@ export async function sendMessageToSnode(
}
}
window?.log?.info(
`loki_message:::sendMessage - Successfully stored message to ${ed25519Str(pubKey)} via ${
snode.ip
}:${snode.port}`
);
if (snode) {
window?.log?.info(
`loki_message:::sendMessage - Successfully stored message to ${ed25519Str(pubKey)} via ${
snode.ip
}:${snode.port}`
);
}
}
async function buildEnvelope(

@ -4,6 +4,7 @@ import { PartialRawMessage, RawMessage } from '../types/RawMessage';
import { ContentMessage } from '../messages/outgoing';
import { PubKey } from '../types';
import { MessageUtils } from '../utils';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
// This is an abstraction for storing pending messages.
// Ideally we want to store pending messages in the database so that
@ -41,11 +42,17 @@ export class PendingMessageCache {
public async add(
destinationPubKey: PubKey,
message: ContentMessage,
namespace: SnodeNamespaces,
sentCb?: (message: any) => Promise<void>,
isGroup = false
): Promise<RawMessage> {
await this.loadFromDBIfNeeded();
const rawMessage = await MessageUtils.toRawMessage(destinationPubKey, message, isGroup);
const rawMessage = await MessageUtils.toRawMessage(
destinationPubKey,
message,
namespace,
isGroup
);
// Does it exist in cache already?
if (this.find(rawMessage)) {

@ -1,4 +1,5 @@
import { SignalService } from '../../protobuf';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
export type RawMessage = {
identifier: string;
@ -6,6 +7,7 @@ export type RawMessage = {
device: string;
ttl: number;
encryption: SignalService.Envelope.Type;
namespace: SnodeNamespaces | null; // allowing null as when we upgrade, we might have messages awaiting sending which won't have a namespace
};
// For building RawMessages from JSON

@ -7,6 +7,7 @@ import { ClosedGroupEncryptionPairReplyMessage } from '../messages/outgoing/cont
import { ContentMessage } from '../messages/outgoing';
import { ExpirationTimerUpdateMessage } from '../messages/outgoing/controlMessage/ExpirationTimerUpdateMessage';
import { SignalService } from '../../protobuf';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
function getEncryptionTypeFromMessageType(
message: ContentMessage,
@ -36,6 +37,7 @@ function getEncryptionTypeFromMessageType(
export async function toRawMessage(
destinationPubKey: PubKey,
message: ContentMessage,
namespace: SnodeNamespaces,
isGroup = false
): Promise<RawMessage> {
const ttl = message.ttl();
@ -50,6 +52,7 @@ export async function toRawMessage(
device: destinationPubKey.key,
ttl,
encryption,
namespace,
};
return rawMessage;

@ -28,6 +28,7 @@ import { getCallMediaPermissionsSettings } from '../../../components/settings/Se
import { PnServer } from '../../apis/push_notification_api';
import { approveConvoAndSendResponse } from '../../../interactions/conversationInteractions';
import { GetNetworkTime } from '../../apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../apis/snode_api/namespaces';
// tslint:disable: function-name
@ -414,10 +415,11 @@ async function createOfferAndSendIt(recipient: string) {
});
window.log.info(`sending '${offer.type}'' with callUUID: ${currentCallUUID}`);
const negotiationOfferSendResult = await getMessageQueue().sendToPubKeyNonDurably(
PubKey.cast(recipient),
offerMessage
);
const negotiationOfferSendResult = await getMessageQueue().sendToPubKeyNonDurably({
pubkey: PubKey.cast(recipient),
message: offerMessage,
namespace: SnodeNamespaces.UserMessages,
});
if (typeof negotiationOfferSendResult === 'number') {
// window.log?.warn('setting last sent timestamp');
lastOutgoingOfferTimestamp = negotiationOfferSendResult;
@ -514,7 +516,11 @@ export async function USER_callRecipient(recipient: string) {
// we do it manually as the sendToPubkeyNonDurably rely on having a message saved to the db for MessageSentSuccess
// which is not the case for a pre offer message (the message only exists in memory)
const rawMessage = await MessageUtils.toRawMessage(PubKey.cast(recipient), preOfferMsg);
const rawMessage = await MessageUtils.toRawMessage(
PubKey.cast(recipient),
preOfferMsg,
SnodeNamespaces.UserMessages
);
const { wrappedEnvelope } = await MessageSender.send(rawMessage);
void PnServer.notifyPnServer(wrappedEnvelope, recipient);
@ -572,7 +578,11 @@ const iceSenderDebouncer = _.debounce(async (recipient: string) => {
`sending ICE CANDIDATES MESSAGE to ${ed25519Str(recipient)} about call ${currentCallUUID}`
);
await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callIceCandicates);
await getMessageQueue().sendToPubKeyNonDurably({
pubkey: PubKey.cast(recipient),
message: callIceCandicates,
namespace: SnodeNamespaces.UserMessages,
});
}, 2000);
const findLastMessageTypeFromSender = (sender: string, msgType: SignalService.CallMessage.Type) => {
@ -903,8 +913,16 @@ export async function USER_rejectIncomingCallRequest(fromSender: string) {
async function sendCallMessageAndSync(callmessage: CallMessage, user: string) {
await Promise.all([
getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(user), callmessage),
getMessageQueue().sendToPubKeyNonDurably(UserUtils.getOurPubKeyFromCache(), callmessage),
getMessageQueue().sendToPubKeyNonDurably({
pubkey: PubKey.cast(user),
message: callmessage,
namespace: SnodeNamespaces.UserMessages,
}),
getMessageQueue().sendToPubKeyNonDurably({
pubkey: UserUtils.getOurPubKeyFromCache(),
message: callmessage,
namespace: SnodeNamespaces.UserMessages,
}),
]);
}
@ -921,7 +939,11 @@ export async function USER_hangup(fromSender: string) {
timestamp: Date.now(),
uuid: currentCallUUID,
});
void getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), endCallMessage);
void getMessageQueue().sendToPubKeyNonDurably({
pubkey: PubKey.cast(fromSender),
message: endCallMessage,
namespace: SnodeNamespaces.UserMessages,
});
}
window.inboxStore?.dispatch(endCall());

@ -26,6 +26,7 @@ import { DURATION } from '../constants';
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
import { MessageRequestResponse } from '../messages/outgoing/controlMessage/MessageRequestResponse';
import { PubKey } from '../types';
import { SnodeNamespaces } from '../apis/snode_api/namespaces';
const ITEM_ID_LAST_SYNC_TIMESTAMP = 'lastSyncedTimestamp';
@ -53,7 +54,10 @@ export const syncConfigurationIfNeeded = async () => {
try {
// window?.log?.info('syncConfigurationIfNeeded with', configMessage);
await getMessageQueue().sendSyncMessage(configMessage);
await getMessageQueue().sendSyncMessage({
namespace: SnodeNamespaces.UserMessages,
message: configMessage,
});
} catch (e) {
window?.log?.warn('Caught an error while sending our ConfigurationMessage:', e);
// we do return early so that next time we use the old timestamp again
@ -81,7 +85,11 @@ export const forceSyncConfigurationNowIfNeeded = async (waitForMessageSent = fal
resolve(true);
}
: undefined;
void getMessageQueue().sendSyncMessage(configMessage, callback as any);
void getMessageQueue().sendSyncMessage({
namespace: SnodeNamespaces.UserMessages,
message: configMessage,
sentCb: callback as any,
});
// either we resolve from the callback if we need to wait for it,
// or we don't want to wait, we resolve it here.
if (!waitForMessageSent) {

@ -17,6 +17,7 @@ import { ClosedGroupMessage } from '../../../../session/messages/outgoing/contro
import chaiAsPromised from 'chai-as-promised';
import { MessageSentHandler } from '../../../../session/sending/MessageSentHandler';
import { stubData } from '../../../test-utils/utils';
import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces';
chai.use(chaiAsPromised as any);
chai.should();
@ -102,7 +103,11 @@ describe('MessageQueue', () => {
}
const device = TestUtils.generateFakePubKey();
await pendingMessageCache.add(device, TestUtils.generateVisibleMessage());
await pendingMessageCache.add(
device,
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
const initialMessages = await pendingMessageCache.getForDevice(device);
expect(initialMessages).to.have.length(1);
@ -139,7 +144,7 @@ describe('MessageQueue', () => {
});
void pendingMessageCache
.add(device, message, waitForMessageSentEvent)
.add(device, message, SnodeNamespaces.UserMessages, waitForMessageSentEvent)
.then(() => messageQueueStub.processPending(device));
});
@ -149,7 +154,7 @@ describe('MessageQueue', () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
void pendingMessageCache
.add(device, message)
.add(device, message, SnodeNamespaces.UserMessages)
.then(() => messageQueueStub.processPending(device));
// The cb is only invoke is all reties fails. Here we poll until the messageSentHandlerFailed was invoked as this is what we want to do
@ -177,7 +182,7 @@ describe('MessageQueue', () => {
const stub = Sinon.stub(messageQueueStub as any, 'process').resolves();
const message = TestUtils.generateVisibleMessage();
await messageQueueStub.sendToPubKey(device, message);
await messageQueueStub.sendToPubKey(device, message, SnodeNamespaces.UserMessages);
const args = stub.lastCall.args as [Array<PubKey>, ContentMessage];
expect(args[0]).to.be.equal(device);
@ -188,9 +193,12 @@ describe('MessageQueue', () => {
describe('sendToGroup', () => {
it('should throw an error if invalid non-group message was passed', async () => {
const chatMessage = TestUtils.generateVisibleMessage();
return expect(messageQueueStub.sendToGroup(chatMessage as any)).to.be.rejectedWith(
'Invalid group message passed in sendToGroup.'
);
return expect(
messageQueueStub.sendToGroup({
message: chatMessage as any,
namespace: SnodeNamespaces.ClosedGroupMessage,
})
).to.be.rejectedWith('Invalid group message passed in sendToGroup.');
});
describe('closed groups', () => {
@ -201,7 +209,10 @@ describe('MessageQueue', () => {
const send = Sinon.stub(messageQueueStub, 'sendToPubKey').resolves();
const message = TestUtils.generateClosedGroupMessage();
await messageQueueStub.sendToGroup(message);
await messageQueueStub.sendToGroup({
message,
namespace: SnodeNamespaces.ClosedGroupMessage,
});
expect(send.callCount).to.equal(1);
const arg = send.getCall(0).args;
@ -223,7 +234,12 @@ describe('MessageQueue', () => {
const message = TestUtils.generateOpenGroupVisibleMessage();
const roomInfos = TestUtils.generateOpenGroupV2RoomInfos();
await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []);
await messageQueueStub.sendToOpenGroupV2({
message,
roomInfos,
blinded: false,
filesToLink: [],
});
expect(sendToOpenGroupV2Stub.callCount).to.equal(1);
});
@ -235,7 +251,12 @@ describe('MessageQueue', () => {
const message = TestUtils.generateOpenGroupVisibleMessage();
const roomInfos = TestUtils.generateOpenGroupV2RoomInfos();
await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []);
await messageQueueStub.sendToOpenGroupV2({
message,
roomInfos,
blinded: false,
filesToLink: [],
});
expect(messageSentPublicHandlerSuccessStub.callCount).to.equal(1);
expect(messageSentPublicHandlerSuccessStub.lastCall.args[0]).to.equal(message.identifier);
@ -250,7 +271,12 @@ describe('MessageQueue', () => {
const message = TestUtils.generateOpenGroupVisibleMessage();
const roomInfos = TestUtils.generateOpenGroupV2RoomInfos();
await messageQueueStub.sendToOpenGroupV2(message, roomInfos, false, []);
await messageQueueStub.sendToOpenGroupV2({
message,
roomInfos,
blinded: false,
filesToLink: [],
});
expect(messageSentHandlerFailedStub.callCount).to.equal(1);
expect(messageSentHandlerFailedStub.lastCall.args[0].identifier).to.equal(
message.identifier

@ -17,6 +17,7 @@ import { OnionV4 } from '../../../../session/onions/onionv4';
import { OnionSending } from '../../../../session/onions/onionSend';
import { OpenGroupMessageV2 } from '../../../../session/apis/open_group_api/opengroupV2/OpenGroupMessageV2';
import { GetNetworkTime } from '../../../../session/apis/snode_api/getNetworkTime';
import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces';
describe('MessageSender', () => {
afterEach(() => {
@ -52,7 +53,8 @@ describe('MessageSender', () => {
beforeEach(async () => {
rawMessage = await MessageUtils.toRawMessage(
TestUtils.generateFakePubKey(),
TestUtils.generateVisibleMessage()
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
});
@ -100,7 +102,11 @@ describe('MessageSender', () => {
const device = TestUtils.generateFakePubKey();
const visibleMessage = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
const rawMessage = await MessageUtils.toRawMessage(
device,
visibleMessage,
SnodeNamespaces.UserMessages
);
await MessageSender.send(rawMessage, 3, 10);
@ -117,7 +123,11 @@ describe('MessageSender', () => {
const device = TestUtils.generateFakePubKey();
const visibleMessage = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
const rawMessage = await MessageUtils.toRawMessage(
device,
visibleMessage,
SnodeNamespaces.UserMessages
);
const offset = 200000;
Sinon.stub(GetNetworkTime, 'getLatestTimestampOffset').returns(offset);
await MessageSender.send(rawMessage, 3, 10);
@ -149,7 +159,11 @@ describe('MessageSender', () => {
const visibleMessageExpected = TestUtils.generateVisibleMessage({
timestamp: decodedTimestampFromSending,
});
const rawMessageExpected = await MessageUtils.toRawMessage(device, visibleMessageExpected);
const rawMessageExpected = await MessageUtils.toRawMessage(
device,
visibleMessageExpected,
0
);
expect(envelope.content).to.deep.equal(rawMessageExpected.plainTextBuffer);
});
@ -162,7 +176,11 @@ describe('MessageSender', () => {
const device = TestUtils.generateFakePubKey();
const visibleMessage = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, visibleMessage);
const rawMessage = await MessageUtils.toRawMessage(
device,
visibleMessage,
SnodeNamespaces.UserMessages
);
await MessageSender.send(rawMessage, 3, 10);
const data = sessionMessageAPISendStub.getCall(0).args[1];

@ -5,6 +5,7 @@ import * as _ from 'lodash';
import { MessageUtils } from '../../../../session/utils';
import { TestUtils } from '../../../../test/test-utils';
import { PendingMessageCache } from '../../../../session/sending/PendingMessageCache';
import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces';
// Equivalent to Data.StorageItem
interface StorageItem {
@ -55,9 +56,13 @@ describe('PendingMessageCache', () => {
it('can add to cache', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
await pendingMessageCacheStub.add(device, message);
await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages);
// Verify that the message is in the cache
const finalCache = await pendingMessageCacheStub.getAllPending();
@ -71,12 +76,24 @@ describe('PendingMessageCache', () => {
it('can add multiple messages belonging to the same user', async () => {
const device = TestUtils.generateFakePubKey();
await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage());
await pendingMessageCacheStub.add(
device,
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
// We have to timeout here otherwise it's processed too fast and messages start having the same timestamp
await TestUtils.timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage());
await pendingMessageCacheStub.add(
device,
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
await TestUtils.timeout(5);
await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage());
await pendingMessageCacheStub.add(
device,
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
// Verify that the message is in the cache
const finalCache = await pendingMessageCacheStub.getAllPending();
@ -87,9 +104,13 @@ describe('PendingMessageCache', () => {
it('can remove from cache', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
await pendingMessageCacheStub.add(device, message);
await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages);
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(1);
@ -106,12 +127,24 @@ describe('PendingMessageCache', () => {
it('should only remove messages with different identifier and device', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
await pendingMessageCacheStub.add(device, message);
await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages);
await TestUtils.timeout(5);
const one = await pendingMessageCacheStub.add(device, TestUtils.generateVisibleMessage());
const two = await pendingMessageCacheStub.add(TestUtils.generateFakePubKey(), message);
const one = await pendingMessageCacheStub.add(
device,
TestUtils.generateVisibleMessage(),
SnodeNamespaces.UserMessages
);
const two = await pendingMessageCacheStub.add(
TestUtils.generateFakePubKey(),
message,
SnodeNamespaces.UserMessages
);
const initialCache = await pendingMessageCacheStub.getAllPending();
expect(initialCache).to.have.length(3);
@ -143,7 +176,7 @@ describe('PendingMessageCache', () => {
];
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages);
}
const cache = await pendingMessageCacheStub.getAllPending();
@ -171,7 +204,7 @@ describe('PendingMessageCache', () => {
];
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages);
}
const initialCache = await pendingMessageCacheStub.getAllPending();
@ -188,7 +221,11 @@ describe('PendingMessageCache', () => {
it('can find nothing when empty', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
const foundMessage = pendingMessageCacheStub.find(rawMessage);
expect(foundMessage, 'a message was found in empty cache').to.be.undefined;
@ -197,9 +234,13 @@ describe('PendingMessageCache', () => {
it('can find message in cache', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
await pendingMessageCacheStub.add(device, message);
await pendingMessageCacheStub.add(device, message, SnodeNamespaces.UserMessages);
const finalCache = await pendingMessageCacheStub.getAllPending();
expect(finalCache).to.have.length(1);
@ -226,7 +267,7 @@ describe('PendingMessageCache', () => {
];
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages);
}
const initialCache = await pendingMessageCacheStub.getAllPending();
@ -256,7 +297,7 @@ describe('PendingMessageCache', () => {
];
for (const item of cacheItems) {
await pendingMessageCacheStub.add(item.device, item.message);
await pendingMessageCacheStub.add(item.device, item.message, SnodeNamespaces.UserMessages);
}
const addedMessages = await pendingMessageCacheStub.getAllPending();

@ -25,6 +25,7 @@ import { ConversationTypeEnum } from '../../../../models/conversationAttributes'
import { getOpenGroupV2ConversationId } from '../../../../session/apis/open_group_api/utils/OpenGroupUtils';
import { beforeEach } from 'mocha';
import { OpenGroupData, OpenGroupV2Room } from '../../../../data/opengroups';
import { SnodeNamespaces } from '../../../../session/apis/snode_api/namespaces';
const { expect } = chai;
@ -39,7 +40,11 @@ describe('Message Utils', () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserContacts
);
expect(Object.keys(rawMessage)).to.have.length(5);
@ -56,13 +61,18 @@ describe('Message Utils', () => {
expect(rawMessage.device).to.equal(device.key);
expect(rawMessage.plainTextBuffer).to.deep.equal(message.plainTextBuffer());
expect(rawMessage.ttl).to.equal(message.ttl());
expect(rawMessage.namespace).to.equal(3);
});
it('should generate valid plainTextBuffer', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
const rawBuffer = rawMessage.plainTextBuffer;
const rawBufferJSON = JSON.stringify(rawBuffer);
@ -82,7 +92,11 @@ describe('Message Utils', () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
const derivedPubKey = PubKey.from(rawMessage.device);
expect(derivedPubKey).to.not.be.eq(undefined, 'should maintain pubkey');
@ -98,14 +112,22 @@ describe('Message Utils', () => {
const chatMessage = TestUtils.generateVisibleMessage();
const message = new ClosedGroupVisibleMessage({ chatMessage, groupId });
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE);
});
it('should set encryption to Fallback on other messages', async () => {
const device = TestUtils.generateFakePubKey();
const message = TestUtils.generateVisibleMessage();
const rawMessage = await MessageUtils.toRawMessage(device, message);
const rawMessage = await MessageUtils.toRawMessage(
device,
message,
SnodeNamespaces.UserMessages
);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE);
});
@ -123,7 +145,7 @@ describe('Message Utils', () => {
keypair: TestUtils.generateFakeECKeyPair(),
expireTimer: 0,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE);
});
@ -135,7 +157,7 @@ describe('Message Utils', () => {
name: 'df',
groupId: TestUtils.generateFakePubKey().key,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE);
});
@ -147,7 +169,7 @@ describe('Message Utils', () => {
addedMembers: [TestUtils.generateFakePubKey().key],
groupId: TestUtils.generateFakePubKey().key,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE);
});
@ -159,7 +181,7 @@ describe('Message Utils', () => {
removedMembers: [TestUtils.generateFakePubKey().key],
groupId: TestUtils.generateFakePubKey().key,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE);
});
@ -180,7 +202,7 @@ describe('Message Utils', () => {
groupId: TestUtils.generateFakePubKey().key,
encryptedKeyPairs: fakeWrappers,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.CLOSED_GROUP_MESSAGE);
});
@ -201,7 +223,7 @@ describe('Message Utils', () => {
groupId: TestUtils.generateFakePubKey().key,
encryptedKeyPairs: fakeWrappers,
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE);
});
@ -215,7 +237,7 @@ describe('Message Utils', () => {
displayName: 'displayName',
contacts: [],
});
const rawMessage = await MessageUtils.toRawMessage(device, msg);
const rawMessage = await MessageUtils.toRawMessage(device, msg, SnodeNamespaces.UserMessages);
expect(rawMessage.encryption).to.equal(SignalService.Envelope.Type.SESSION_MESSAGE);
});
});

Loading…
Cancel
Save