diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 8795e50af..626aff7e7 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -408,11 +408,7 @@ class LokiSnodeAPI { }); } - // FIXME: need a lock because it is being called multiple times in parallel - async buildNewOnionPaths() { - // Note: this function may be called concurrently, so - // might consider blocking the other calls - + async buildNewOnionPathsWorker() { const _ = window.Lodash; log.info('LokiSnodeAPI::buildNewOnionPaths - building new onion paths'); @@ -490,6 +486,14 @@ class LokiSnodeAPI { log.info(`Built ${this.onionPaths.length} onion paths`, this.onionPaths); } + async buildNewOnionPaths() { + // this function may be called concurrently make sure we only have one inflight + return primitives.allowOnlyOneAtATime( + 'buildNewOnionPaths', + this.buildNewOnionPathsWorker + ); + } + async getRandomSnodeAddress() { // resolve random snode if (this.randomSnodePool.length === 0) { diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index 4aa7ebe3c..5169ce269 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -651,9 +651,7 @@ export async function handleMessageEvent(event: any): Promise { confirm(); return; } - } - - if (source !== ourNumber) { + } else if (source !== ourNumber) { // Ignore auth from our devices conversationId = primarySource.key; } diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index e4ea74ea0..c3e30465b 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -8,6 +8,7 @@ import { ContentMessage, OpenGroupMessage, SessionRequestMessage, + SyncMessage, } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; import { @@ -26,9 +27,9 @@ export class MessageQueue implements MessageQueueInterface { private readonly jobQueues: Map = new Map(); private readonly pendingMessageCache: PendingMessageCache; - constructor() { + constructor(cache?: PendingMessageCache) { this.events = new EventEmitter(); - this.pendingMessageCache = new PendingMessageCache(); + this.pendingMessageCache = cache ?? new PendingMessageCache(); void this.processAllPending(); } @@ -46,26 +47,24 @@ export class MessageQueue implements MessageQueueInterface { devices: Array, message: ContentMessage ) { - await this.pendingMessageCache.isReady; - let currentDevices = [...devices]; // Sync to our devices if syncable if (SyncMessageUtils.canSync(message)) { - const currentDevice = await UserUtil.getCurrentDevicePubKey(); - - if (currentDevice) { - const ourDevices = await MultiDeviceProtocol.getAllDevices( - currentDevice + const syncMessage = SyncMessageUtils.from(message); + if (!syncMessage) { + throw new Error( + 'MessageQueue internal error occured: failed to make sync message' ); + } - await this.sendSyncMessage(message, ourDevices); + await this.sendSyncMessage(syncMessage); - // Remove our devices from currentDevices - currentDevices = currentDevices.filter(device => - ourDevices.some(d => device.isEqual(d)) - ); - } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + // Remove our devices from currentDevices + currentDevices = currentDevices.filter( + device => !ourDevices.some(d => device.isEqual(d)) + ); } const promises = currentDevices.map(async device => { @@ -78,39 +77,45 @@ export class MessageQueue implements MessageQueueInterface { public async sendToGroup( message: OpenGroupMessage | ClosedGroupMessage ): Promise { - await this.pendingMessageCache.isReady; - // Closed groups if (message instanceof ClosedGroupMessage) { // Get devices in closed group - const groupPubKey = PubKey.from(message.groupId); - if (!groupPubKey) { + const recipients = await GroupUtils.getGroupMembers(message.groupId); + if (recipients.length === 0) { return false; } - const recipients = await GroupUtils.getGroupMembers(groupPubKey); - - if (recipients.length) { - await Promise.all( - recipients.map(async recipient => - this.sendUsingMultiDevice(recipient, message) - ) - ); + // Send to all devices of members + await Promise.all( + recipients.map(async recipient => + this.sendUsingMultiDevice(recipient, message) + ) + ); - return true; - } + return true; } // Open groups if (message instanceof OpenGroupMessage) { // No queue needed for Open Groups; send directly + const error = new Error('Failed to send message to open group.'); + + // This is absolutely yucky ... we need to make it not use Promise try { - await MessageSender.sendToOpenGroup(message); - this.events.emit('success', message); + const result = await MessageSender.sendToOpenGroup(message); + if (result) { + this.events.emit('success', message); + } else { + this.events.emit('fail', message, error); + } - return true; + return result; } catch (e) { - this.events.emit('fail', message, e); + console.warn( + `Failed to send message to open group: ${message.group.server}`, + e + ); + this.events.emit('fail', message, error); return false; } @@ -119,23 +124,22 @@ export class MessageQueue implements MessageQueueInterface { return false; } - public async sendSyncMessage(message: ContentMessage, sendTo: Array) { - await this.pendingMessageCache.isReady; - // Sync with our devices - const promises = sendTo.map(async device => { - const syncMessage = SyncMessageUtils.from(message); - - return this.process(device, syncMessage); - }); + public async sendSyncMessage(message: SyncMessage | undefined): Promise { + if (!message) { + return; + } + const ourDevices = await MultiDeviceProtocol.getOurDevices(); + const promises = ourDevices.map(async device => + this.process(device, message) + ); return Promise.all(promises); } public async processPending(device: PubKey) { - await this.pendingMessageCache.isReady; - const messages = this.pendingMessageCache.getForDevice(device); + const messages = await this.pendingMessageCache.getForDevice(device); - const isMediumGroup = GroupUtils.isMediumGroup(device); + const isMediumGroup = GroupUtils.isMediumGroup(device.key); const hasSession = await SessionProtocol.hasSession(device); if (!isMediumGroup && !hasSession) { @@ -153,23 +157,28 @@ export class MessageQueue implements MessageQueueInterface { await jobQueue.addWithId(messageId, async () => MessageSender.send(message) ); - void this.pendingMessageCache.remove(message); this.events.emit('success', message); } catch (e) { this.events.emit('fail', message, e); + } finally { + // Remove from the cache because retrying is done in the sender + void this.pendingMessageCache.remove(message); } } }); } private async processAllPending() { - const devices = this.pendingMessageCache.getDevices(); + const devices = await this.pendingMessageCache.getDevices(); const promises = devices.map(async device => this.processPending(device)); return Promise.all(promises); } - private async process(device: PubKey, message?: ContentMessage) { + private async process( + device: PubKey, + message?: ContentMessage + ): Promise { // Don't send to ourselves const currentDevice = await UserUtil.getCurrentDevicePubKey(); if (!message || (currentDevice && device.isEqual(currentDevice))) { @@ -181,7 +190,7 @@ export class MessageQueue implements MessageQueueInterface { } await this.pendingMessageCache.add(device, message); - await this.processPending(device); + void this.processPending(device); } private getJobQueue(device: PubKey): JobQueue { diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index c3ee606aa..5bed428ca 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -2,6 +2,7 @@ import { ClosedGroupMessage, ContentMessage, OpenGroupMessage, + SyncMessage, } from '../messages/outgoing'; import { RawMessage } from '../types/RawMessage'; import { TypedEventEmitter } from '../utils'; @@ -19,8 +20,5 @@ export interface MessageQueueInterface { sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; - sendSyncMessage( - message: ContentMessage, - sendTo: Array - ): Promise>; + sendSyncMessage(message: SyncMessage | undefined): Promise; } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 0a4037c1c..a26cea8c6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -1,3 +1,4 @@ +import _ from 'lodash'; import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; @@ -12,33 +13,25 @@ import { MessageUtils } from '../utils'; // memory and sync its state with the database on modification (add or remove). export class PendingMessageCache { - public readonly isReady: Promise; - private cache: Array; + protected loadPromise: Promise | undefined; + protected cache: Array = []; - constructor() { - // Load pending messages from the database - // You should await isReady on making a new PendingMessageCache - // if you'd like to have instant access to the cache - this.cache = []; - - this.isReady = new Promise(async resolve => { - await this.loadFromDB(); - resolve(true); - }); - } - - public getAllPending(): Array { + public async getAllPending(): Promise> { + await this.loadFromDBIfNeeded(); // Get all pending from cache, sorted with oldest first return [...this.cache].sort((a, b) => a.timestamp - b.timestamp); } - public getForDevice(device: PubKey): Array { - return this.getAllPending().filter(m => m.device === device.key); + public async getForDevice(device: PubKey): Promise> { + const pending = await this.getAllPending(); + return pending.filter(m => m.device === device.key); } - public getDevices(): Array { + public async getDevices(): Promise> { + await this.loadFromDBIfNeeded(); + // Gets all unique devices with pending messages - const pubkeyStrings = [...new Set(this.cache.map(m => m.device))]; + const pubkeyStrings = _.uniq(this.cache.map(m => m.device)); return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k); } @@ -47,6 +40,7 @@ export class PendingMessageCache { device: PubKey, message: ContentMessage ): Promise { + await this.loadFromDBIfNeeded(); const rawMessage = MessageUtils.toRawMessage(device, message); // Does it exist in cache already? @@ -63,6 +57,7 @@ export class PendingMessageCache { public async remove( message: RawMessage ): Promise | undefined> { + await this.loadFromDBIfNeeded(); // Should only be called after message is processed // Return if message doesn't exist in cache @@ -72,7 +67,11 @@ export class PendingMessageCache { // Remove item from cache and sync with database const updatedCache = this.cache.filter( - m => m.identifier !== message.identifier + cached => + !( + cached.device === message.device && + cached.timestamp === message.timestamp + ) ); this.cache = updatedCache; await this.saveToDB(); @@ -93,12 +92,20 @@ export class PendingMessageCache { await this.saveToDB(); } - private async loadFromDB() { + protected async loadFromDBIfNeeded() { + if (!this.loadPromise) { + this.loadPromise = this.loadFromDB(); + } + + await this.loadPromise; + } + + protected async loadFromDB() { const messages = await this.getFromStorage(); this.cache = messages; } - private async getFromStorage(): Promise> { + protected async getFromStorage(): Promise> { const data = await getItemById('pendingMessages'); if (!data || !data.value) { return []; @@ -117,7 +124,7 @@ export class PendingMessageCache { }); } - private async saveToDB() { + protected async saveToDB() { // For each plainTextBuffer in cache, save in as a simple Array to avoid // Node issues with JSON stringifying Buffer without strict typing const encodedCache = [...this.cache].map(item => { diff --git a/ts/session/utils/Groups.ts b/ts/session/utils/Groups.ts index d73104b63..c3c4cf1b9 100644 --- a/ts/session/utils/Groups.ts +++ b/ts/session/utils/Groups.ts @@ -1,7 +1,11 @@ -import { PubKey } from '../types'; +import _ from 'lodash'; +import { PrimaryPubKey } from '../types'; +import { MultiDeviceProtocol } from '../protocols'; -export async function getGroupMembers(groupId: PubKey): Promise> { - const groupConversation = window.ConversationController.get(groupId.key); +export async function getGroupMembers( + groupId: string +): Promise> { + const groupConversation = window.ConversationController.get(groupId); const groupMembers = groupConversation ? groupConversation.attributes.members : undefined; @@ -10,11 +14,16 @@ export async function getGroupMembers(groupId: PubKey): Promise> { return []; } - return groupMembers.map((member: string) => new PubKey(member)); + const promises = (groupMembers as Array).map(async (member: string) => + MultiDeviceProtocol.getPrimaryDevice(member) + ); + const primaryDevices = await Promise.all(promises); + + return _.uniqWith(primaryDevices, (a, b) => a.isEqual(b)); } -export function isMediumGroup(groupId: PubKey): boolean { - const conversation = window.ConversationController.get(groupId.key); +export function isMediumGroup(groupId: string): boolean { + const conversation = window.ConversationController.get(groupId); if (!conversation) { return false; diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts new file mode 100644 index 000000000..b106f6032 --- /dev/null +++ b/ts/session/utils/Promise.ts @@ -0,0 +1,118 @@ +type SimpleFunction = (arg: T) => void; +type Return = Promise | T; + +async function toPromise(value: Return): Promise { + return value instanceof Promise ? value : Promise.resolve(value); +} + +/** + * Create a promise which waits until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param task The task to wait for. + * @param timeout The timeout period. + */ +export async function waitForTask( + task: (done: SimpleFunction) => Return, + timeout: number = 2000 +): Promise { + const timeoutPromise = new Promise((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + rej(new Error('Task timed out.')); + }, timeout); + }); + + const taskPromise = new Promise(async (res, rej) => { + try { + await toPromise(task(res)); + } catch (e) { + rej(e); + } + }); + + return Promise.race([timeoutPromise, taskPromise]) as Promise; +} + +interface PollOptions { + timeout: number; + interval: number; +} + +/** + * Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The check which runs every `interval` ms. + * @param options The polling options. + */ +export async function poll( + task: (done: SimpleFunction) => Return, + options: Partial = {} +): Promise { + const defaults: PollOptions = { + timeout: 2000, + interval: 1000, + }; + + const { timeout, interval } = { + ...defaults, + ...options, + }; + + const endTime = Date.now() + timeout; + let stop = false; + const finish = () => { + stop = true; + }; + + const _poll = async (resolve: any, reject: any) => { + if (stop) { + resolve(); + } else if (Date.now() >= endTime) { + finish(); + reject(new Error('Periodic check timeout')); + } else { + try { + await toPromise(task(finish)); + } catch (e) { + finish(); + reject(e); + return; + } + + setTimeout(() => { + void _poll(resolve, reject); + }, interval); + } + }; + + return new Promise((resolve, reject) => { + void _poll(resolve, reject); + }); +} + +/** + * Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The boolean check. + * @param timeout The time before an error is thrown. + */ +export async function waitUntil( + check: () => Return, + timeout: number = 2000 +) { + // This is causing unhandled promise rejection somewhere in MessageQueue tests + return poll( + async done => { + const result = await toPromise(check()); + if (result) { + done(); + } + }, + { + timeout, + } + ); +} diff --git a/ts/session/utils/SyncMessageUtils.ts b/ts/session/utils/SyncMessageUtils.ts index 655189787..51201d2cc 100644 --- a/ts/session/utils/SyncMessageUtils.ts +++ b/ts/session/utils/SyncMessageUtils.ts @@ -5,7 +5,9 @@ import { ContentMessage, SyncMessage } from '../messages/outgoing'; import { MultiDeviceProtocol } from '../protocols'; export function from(message: ContentMessage): SyncMessage | undefined { - // const { timestamp, identifier } = message; + if (message instanceof SyncMessage) { + return message; + } // Stubbed for now return undefined; diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index c619b8d2f..dde73e928 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -2,8 +2,15 @@ import * as MessageUtils from './Messages'; import * as GroupUtils from './Groups'; import * as SyncMessageUtils from './SyncMessageUtils'; import * as StringUtils from './String'; +import * as PromiseUtils from './Promise'; export * from './TypedEmitter'; export * from './JobQueue'; -export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils }; +export { + MessageUtils, + SyncMessageUtils, + GroupUtils, + StringUtils, + PromiseUtils, +}; diff --git a/ts/test/session/protocols/MultiDeviceProtocol_test.ts b/ts/test/session/protocols/MultiDeviceProtocol_test.ts index 09230ca2b..ad287a3dc 100644 --- a/ts/test/session/protocols/MultiDeviceProtocol_test.ts +++ b/ts/test/session/protocols/MultiDeviceProtocol_test.ts @@ -1,6 +1,6 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { TestUtils, timeout } from '../../test-utils'; +import { TestUtils } from '../../test-utils'; import { PairingAuthorisation } from '../../../../js/modules/data'; import { MultiDeviceProtocol } from '../../../session/protocols'; import { PubKey } from '../../../session/types'; @@ -183,7 +183,7 @@ describe('MultiDeviceProtocol', () => { it('should not fetch if the refresh delay has not been met', async () => { await MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); - await timeout(100); + await TestUtils.timeout(100); await MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); expect(fetchPairingAuthorisationStub.callCount).to.equal( 1, @@ -202,21 +202,21 @@ describe('MultiDeviceProtocol', () => { it('should fetch again if something went wrong while fetching', async () => { fetchPairingAuthorisationStub.throws(new Error('42')); await MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); - await timeout(100); + await TestUtils.timeout(100); await MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); expect(fetchPairingAuthorisationStub.callCount).to.equal(2); }); it('should fetch only once if called rapidly', async () => { fetchPairingAuthorisationStub.callsFake(async () => { - await timeout(200); + await TestUtils.timeout(200); return []; }); void MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); - await timeout(10); + await TestUtils.timeout(10); void MultiDeviceProtocol.fetchPairingAuthorisationsIfNeeded(device); - await timeout(200); + await TestUtils.timeout(200); expect(fetchPairingAuthorisationStub.callCount).to.equal(1); }); diff --git a/ts/test/session/protocols/SessionProtocol_test.ts b/ts/test/session/protocols/SessionProtocol_test.ts index b78d48e24..712088aa2 100644 --- a/ts/test/session/protocols/SessionProtocol_test.ts +++ b/ts/test/session/protocols/SessionProtocol_test.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import { SessionProtocol } from '../../../session/protocols'; import * as sinon from 'sinon'; -import { Stubs, TestUtils, timeout } from '../../test-utils'; +import { Stubs, TestUtils } from '../../test-utils'; import { UserUtil } from '../../../util'; import { SessionRequestMessage } from '../../../session/messages/outgoing'; import { TextEncoder } from 'util'; @@ -219,7 +219,7 @@ describe('SessionProtocol', () => { expect(SessionProtocol.getProcessedSessionsTimestamp()) .to.have.property('deviceid') .to.be.approximately(Date.now(), 5); - await timeout(5); + await TestUtils.timeout(5); const oldTimestamp = SessionProtocol.getProcessedSessionsTimestamp() .deviceid; await SessionProtocol.onSessionRequestProcessed(pubkey); @@ -291,7 +291,7 @@ describe('SessionProtocol', () => { it('protocol: shouldProcessSessionRequest returns false if there is a more recent sent but a less recent processed', async () => { await SessionProtocol.sendSessionRequest(resetMessage, pubkey); // adds a Date.now() entry - await timeout(100); + await TestUtils.timeout(100); await SessionProtocol.onSessionRequestProcessed(pubkey); // adds a Date.now() entry 100ms after expect( @@ -304,7 +304,7 @@ describe('SessionProtocol', () => { it('protocol: shouldProcessSessionRequest returns false if there is a more recent processed but a less recent sent', async () => { await SessionProtocol.onSessionRequestProcessed(pubkey); // adds a Date.now() entry - await timeout(100); + await TestUtils.timeout(100); await SessionProtocol.sendSessionRequest(resetMessage, pubkey); // adds a Date.now() entry 100ms after expect( diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 746eb224f..c0d223ffc 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -1,96 +1,56 @@ -import { expect } from 'chai'; +import chai from 'chai'; import * as sinon from 'sinon'; -import * as _ from 'lodash'; -import { GroupUtils, SyncMessageUtils } from '../../../session/utils'; +import _ from 'lodash'; +import { + GroupUtils, + PromiseUtils, + SyncMessageUtils, +} from '../../../session/utils'; import { Stubs, TestUtils } from '../../../test/test-utils'; import { MessageQueue } from '../../../session/sending/MessageQueue'; import { - ChatMessage, ClosedGroupMessage, ContentMessage, OpenGroupMessage, } from '../../../session/messages/outgoing'; -import { PubKey, RawMessage } from '../../../session/types'; +import { PrimaryPubKey, PubKey, RawMessage } from '../../../session/types'; import { UserUtil } from '../../../util'; -import { MessageSender, PendingMessageCache } from '../../../session/sending'; -import { toRawMessage } from '../../../session/utils/Messages'; +import { MessageSender } from '../../../session/sending'; import { MultiDeviceProtocol, SessionProtocol, } from '../../../session/protocols'; +import { PendingMessageCacheStub } from '../../test-utils/stubs'; +import { describe } from 'mocha'; +import { TestSyncMessage } from '../../test-utils/stubs/messages/TestSyncMessage'; -// Equivalent to Data.StorageItem -interface StorageItem { - id: string; - value: any; -} - -// Helper function to force sequential on events checks -async function tick() { - return new Promise(resolve => { - // tslint:disable-next-line: no-string-based-set-timeout - setTimeout(resolve, 0); - }); -} +// tslint:disable-next-line: no-require-imports no-var-requires +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); + +const { expect } = chai; describe('MessageQueue', () => { // Initialize new stubbed cache - let data: StorageItem; const sandbox = sinon.createSandbox(); const ourDevice = TestUtils.generateFakePubKey(); const ourNumber = ourDevice.key; - const pairedDevices = TestUtils.generateFakePubKeys(2); // Initialize new stubbed queue + let pendingMessageCache: PendingMessageCacheStub; let messageQueueStub: MessageQueue; - // Spies - let sendMessageToDevicesSpy: sinon.SinonSpy< - [Array, ContentMessage], - Promise> - >; - let sendSyncMessageSpy: sinon.SinonSpy< - [ContentMessage, Array], - Promise> - >; - let sendToGroupSpy: sinon.SinonSpy< - [OpenGroupMessage | ClosedGroupMessage], - Promise - >; - // Message Sender Stubs let sendStub: sinon.SinonStub<[RawMessage, (number | undefined)?]>; - let sendToOpenGroupStub: sinon.SinonStub<[OpenGroupMessage]>; // Utils Stubs - let groupMembersStub: sinon.SinonStub; - let canSyncStub: sinon.SinonStub<[ContentMessage], boolean>; + let isMediumGroupStub: sinon.SinonStub<[string], boolean>; // Session Protocol Stubs let hasSessionStub: sinon.SinonStub<[PubKey]>; let sendSessionRequestIfNeededStub: sinon.SinonStub<[PubKey], Promise>; beforeEach(async () => { - // Stub out methods which touch the database - const storageID = 'pendingMessages'; - data = { - id: storageID, - value: '[]', - }; - - // Pending Message Cache Data Stubs - TestUtils.stubData('getItemById') - .withArgs('pendingMessages') - .resolves(data); - TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => { - if (item.id === storageID) { - data = item; - } - }); - // Utils Stubs - canSyncStub = sandbox.stub(SyncMessageUtils, 'canSync'); - canSyncStub.returns(false); sandbox.stub(UserUtil, 'getCurrentDevicePubKey').resolves(ourNumber); - sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(pairedDevices); TestUtils.stubWindow('libsignal', { SignalProtocolAddress: sandbox.stub(), @@ -99,15 +59,11 @@ describe('MessageQueue', () => { // Message Sender Stubs sendStub = sandbox.stub(MessageSender, 'send').resolves(); - sendToOpenGroupStub = sandbox - .stub(MessageSender, 'sendToOpenGroup') - .resolves(true); // Group Utils Stubs - sandbox.stub(GroupUtils, 'isMediumGroup').returns(false); - groupMembersStub = sandbox - .stub(GroupUtils, 'getGroupMembers' as any) - .resolves(TestUtils.generateFakePubKeys(10)); + isMediumGroupStub = sandbox + .stub(GroupUtils, 'isMediumGroup') + .returns(false); // Session Protocol Stubs sandbox.stub(SessionProtocol, 'sendSessionRequest').resolves(); @@ -116,37 +72,9 @@ describe('MessageQueue', () => { .stub(SessionProtocol, 'sendSessionRequestIfNeeded') .resolves(); - // Pending Mesage Cache Stubs - const chatMessages = Array.from( - { length: 10 }, - TestUtils.generateChatMessage - ); - const rawMessage = toRawMessage( - TestUtils.generateFakePubKey(), - TestUtils.generateChatMessage() - ); - - sandbox.stub(PendingMessageCache.prototype, 'add').resolves(rawMessage); - sandbox.stub(PendingMessageCache.prototype, 'remove').resolves(); - sandbox - .stub(PendingMessageCache.prototype, 'getDevices') - .returns(TestUtils.generateFakePubKeys(10)); - sandbox - .stub(PendingMessageCache.prototype, 'getForDevice') - .returns( - chatMessages.map(m => toRawMessage(TestUtils.generateFakePubKey(), m)) - ); - - // Spies - sendSyncMessageSpy = sandbox.spy(MessageQueue.prototype, 'sendSyncMessage'); - sendMessageToDevicesSpy = sandbox.spy( - MessageQueue.prototype, - 'sendMessageToDevices' - ); - sendToGroupSpy = sandbox.spy(MessageQueue.prototype, 'sendToGroup'); - // Init Queue - messageQueueStub = new MessageQueue(); + pendingMessageCache = new PendingMessageCacheStub(); + messageQueueStub = new MessageQueue(pendingMessageCache); }); afterEach(() => { @@ -154,233 +82,314 @@ describe('MessageQueue', () => { sandbox.restore(); }); - describe('send', () => { - it('can send to a single device', async () => { - const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.send(device, message); - await expect(promise).to.be.fulfilled; - }); - - it('can send sync message', async () => { - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - - const promise = messageQueueStub.sendSyncMessage(message, devices); - expect(promise).to.be.fulfilled; - }); - }); - describe('processPending', () => { it('will send session request message if no session', async () => { hasSessionStub.resolves(false); + isMediumGroupStub.returns(false); const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; - expect(sendSessionRequestIfNeededStub.callCount).to.equal(1); + await messageQueueStub.processPending(device); + + const stubCallPromise = PromiseUtils.waitUntil( + () => sendSessionRequestIfNeededStub.callCount === 1 + ); + await expect(stubCallPromise).to.be.fulfilled; }); it('will send message if session exists', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const hasSession = await hasSessionStub(device); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + const successPromise = PromiseUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - expect(hasSession).to.equal(true, 'session does not exist'); - expect(sendSessionRequestIfNeededStub.callCount).to.equal(0); + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered when we have a session.' + ); }); - }); - describe('sendUsingMultiDevice', () => { - it('can send using multidevice', async () => { + it('will send message if sending to medium group', async () => { + isMediumGroupStub.returns(true); + sendStub.resolves(); + const device = TestUtils.generateFakePubKey(); - const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const promise = messageQueueStub.sendUsingMultiDevice(device, message); - await expect(promise).to.be.fulfilled; + const successPromise = PromiseUtils.waitForTask(done => { + messageQueueStub.events.once('success', done); + }); - // Ensure the arguments passed into sendMessageToDevices are correct - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array, - ChatMessage - ]; + await messageQueueStub.processPending(device); + await expect(successPromise).to.be.fulfilled; + expect(sendSessionRequestIfNeededStub.called).to.equal( + false, + 'Session request triggered on medium group' + ); + }); - // Check that instances are equal - expect(previousArgs).to.have.length(2); + it('should remove message from cache', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + + const events = ['success', 'fail']; + for (const event of events) { + if (event === 'success') { + sendStub.resolves(); + } else { + sendStub.throws(new Error('fail')); + } + + const device = TestUtils.generateFakePubKey(); + await pendingMessageCache.add(device, TestUtils.generateChatMessage()); + + const initialMessages = await pendingMessageCache.getForDevice(device); + expect(initialMessages).to.have.length(1); + await messageQueueStub.processPending(device); + + const promise = PromiseUtils.waitUntil(async () => { + const messages = await pendingMessageCache.getForDevice(device); + return messages.length === 0; + }); + await expect(promise).to.be.fulfilled; + } + }).timeout(15000); - const argsPairedDevices = previousArgs[0]; - const argsChatMessage = previousArgs[1]; + describe('events', () => { + it('should send a success event if message was sent', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.resolves(); - expect(argsChatMessage instanceof ChatMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' - ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' - ); + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); - argsPairedDevices.forEach((argsPaired: PubKey, index: number) => { - expect(argsPaired instanceof PubKey).to.equal( - true, - 'a device passed into sendMessageToDevices was not a PubKey' - ); - expect(argsPaired.isEqual(pairedDevices[index])).to.equal( - true, - 'a device passed into sendMessageToDevices did not match MessageDeviceProtocol.getAllDevices' - ); + const eventPromise = PromiseUtils.waitForTask< + RawMessage | OpenGroupMessage + >(complete => { + messageQueueStub.events.once('success', complete); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const rawMessage = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + }); + + it('should send a fail event if something went wrong while sending', async () => { + hasSessionStub.resolves(true); + isMediumGroupStub.returns(false); + sendStub.throws(new Error('failure')); + + const spy = sandbox.spy(); + messageQueueStub.events.on('fail', spy); + + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + await pendingMessageCache.add(device, message); + + const eventPromise = PromiseUtils.waitForTask< + [RawMessage | OpenGroupMessage, Error] + >(complete => { + messageQueueStub.events.once('fail', (...args) => { + complete(args); + }); + }); + + await messageQueueStub.processPending(device); + await expect(eventPromise).to.be.fulfilled; + + const [rawMessage, error] = await eventPromise; + expect(rawMessage.identifier).to.equal(message.identifier); + expect(error.message).to.equal('failure'); }); }); }); + describe('sendUsingMultiDevice', () => { + it('should send the message to all the devices', async () => { + const devices = TestUtils.generateFakePubKeys(3); + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(devices); + const stub = sandbox + .stub(messageQueueStub, 'sendMessageToDevices') + .resolves(); + + const message = TestUtils.generateChatMessage(); + await messageQueueStub.sendUsingMultiDevice(devices[0], message); + + const args = stub.lastCall.args as [Array, ContentMessage]; + expect(args[0]).to.have.same.members(devices); + expect(args[1]).to.equal(message); + }); + }); + describe('sendMessageToDevices', () => { it('can send to many devices', async () => { - const devices = TestUtils.generateFakePubKeys(10); + hasSessionStub.resolves(false); + + const devices = TestUtils.generateFakePubKeys(5); const message = TestUtils.generateChatMessage(); - const promise = messageQueueStub.sendMessageToDevices(devices, message); - await expect(promise).to.be.fulfilled; + await messageQueueStub.sendMessageToDevices(devices, message); + expect(pendingMessageCache.getCache()).to.have.length(devices.length); }); - it('can send sync message and confirm canSync is valid', async () => { - canSyncStub.returns(true); + it('should send sync message if possible', async () => { + hasSessionStub.returns(false); - const devices = TestUtils.generateFakePubKeys(3); - const message = TestUtils.generateChatMessage(); - const pairedDeviceKeys = pairedDevices.map(device => device.key); + sandbox.stub(SyncMessageUtils, 'canSync').returns(true); + + sandbox + .stub(SyncMessageUtils, 'from') + .returns(new TestSyncMessage({ timestamp: Date.now() })); - const promise = messageQueueStub.sendMessageToDevices(devices, message); - await expect(promise).to.be.fulfilled; + // This stub ensures that the message won't process + const sendSyncMessageStub = sandbox + .stub(messageQueueStub, 'sendSyncMessage') + .resolves(); - // Check sendSyncMessage parameters - const previousArgs = sendSyncMessageSpy.lastCall.args as [ - ChatMessage, - Array - ]; - expect(sendSyncMessageSpy.callCount).to.equal(1); + const ourDevices = [ourDevice, ...TestUtils.generateFakePubKeys(2)]; + sandbox + .stub(MultiDeviceProtocol, 'getAllDevices') + .callsFake(async user => { + if (ourDevice.isEqual(user)) { + return ourDevices; + } - // Check that instances are equal - expect(previousArgs).to.have.length(2); + return []; + }); - const argsChatMessage = previousArgs[0]; - const argsPairedKeys = [...previousArgs[1]].map(d => d.key); + const devices = [...ourDevices, ...TestUtils.generateFakePubKeys(3)]; + const message = TestUtils.generateChatMessage(); - expect(argsChatMessage instanceof ChatMessage).to.equal( + await messageQueueStub.sendMessageToDevices(devices, message); + expect(sendSyncMessageStub.called).to.equal( true, - 'message passed into sendMessageToDevices was not a valid ChatMessage' + 'sendSyncMessage was not called.' ); - expect(argsChatMessage.isEqual(message)).to.equal( - true, - 'message passed into sendMessageToDevices has been mutated' + expect( + pendingMessageCache.getCache().map(c => c.device) + ).to.not.have.members( + ourDevices.map(d => d.key), + 'Sending regular messages to our own device is not allowed.' ); - - // argsPairedKeys and pairedDeviceKeys should contain the same values - const keyArgsValid = _.isEmpty(_.xor(argsPairedKeys, pairedDeviceKeys)); - expect(keyArgsValid).to.equal( - true, - 'devices passed into sendSyncMessage were invalid' + expect(pendingMessageCache.getCache()).to.have.length( + devices.length - ourDevices.length, + 'Messages should not be sent to our devices.' ); }); }); - describe('sendToGroup', () => { - it('can send to closed group', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - expect(success).to.equal(true, 'sending to group failed'); - }); - - it('uses correct parameters for sendToGroup with ClosedGroupMessage', async () => { - const message = TestUtils.generateClosedGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); - - expect(success).to.equal(true, 'sending to group failed'); - - // Check parameters - const previousArgs = sendMessageToDevicesSpy.lastCall.args as [ - Array, - ClosedGroupMessage - ]; - expect(previousArgs).to.have.length(2); - - const argsClosedGroupMessage = previousArgs[1]; - expect(argsClosedGroupMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendMessageToDevices was not a ClosedGroupMessage' - ); - }); + describe('sendSyncMessage', () => { + it('should send a message to all our devices', async () => { + hasSessionStub.resolves(false); - it("won't send to invalid groupId", async () => { - const message = TestUtils.generateClosedGroupMessage('invalid-group-id'); - const success = await messageQueueStub.sendToGroup(message); + const ourOtherDevices = TestUtils.generateFakePubKeys(2); + const ourDevices = [ourDevice, ...ourOtherDevices]; + sandbox.stub(MultiDeviceProtocol, 'getAllDevices').resolves(ourDevices); - // Ensure message parameter passed into sendToGroup is as expected - expect(success).to.equal( - false, - 'an invalid groupId was treated as valid' + await messageQueueStub.sendSyncMessage( + new TestSyncMessage({ timestamp: Date.now() }) ); - expect(sendToGroupSpy.callCount).to.equal(1); - const argsMessage = sendToGroupSpy.lastCall.args[0]; - expect(argsMessage instanceof ClosedGroupMessage).to.equal( - true, - 'message passed into sendToGroup was not a ClosedGroupMessage' + expect(pendingMessageCache.getCache()).to.have.length( + ourOtherDevices.length ); - expect(success).to.equal( - false, - 'invalid ClosedGroupMessage was propogated through sendToGroup' + expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members( + ourOtherDevices.map(d => d.key) ); }); + }); - it('wont send message to empty closed group', async () => { - groupMembersStub.resolves(TestUtils.generateFakePubKeys(0)); - - const message = TestUtils.generateClosedGroupMessage(); - const response = await messageQueueStub.sendToGroup(message); + describe('sendToGroup', () => { + describe('closed groups', async () => { + it('can send to closed group', async () => { + const members = TestUtils.generateFakePubKeys(4).map( + p => new PrimaryPubKey(p.key) + ); + sandbox.stub(GroupUtils, 'getGroupMembers').resolves(members); - expect(response).to.equal( - false, - 'sendToGroup send a message to an empty group' - ); - }); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - it('can send to open group', async () => { - const message = TestUtils.generateOpenGroupMessage(); - const success = await messageQueueStub.sendToGroup(message); + const message = TestUtils.generateClosedGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(success).to.equal(true, 'sending to group failed'); + expect(sendUsingMultiDeviceStub.callCount).to.equal(members.length); - expect(success).to.equal(true, 'sending to group failed'); - }); - }); + const arg = sendUsingMultiDeviceStub.getCall(0).args; + expect(arg[1] instanceof ClosedGroupMessage).to.equal( + true, + 'message sent to group member was not a ClosedGroupMessage' + ); + }); - describe('events', () => { - it('can send events on message sending success', async () => { - const successSpy = sandbox.spy(); - messageQueueStub.events.on('success', successSpy); + it('wont send message to empty closed group', async () => { + sandbox.stub(GroupUtils, 'getGroupMembers').resolves([]); + const sendUsingMultiDeviceStub = sandbox + .stub(messageQueueStub, 'sendUsingMultiDevice') + .resolves(); - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + const message = TestUtils.generateClosedGroupMessage(); + const response = await messageQueueStub.sendToGroup(message); - await tick(); - expect(successSpy.callCount).to.equal(1); + expect(response).to.equal( + false, + 'sendToGroup sent a message to an empty group' + ); + expect(sendUsingMultiDeviceStub.callCount).to.equal(0); + }); }); - it('can send events on message sending failure', async () => { - sendStub.throws(new Error('Failed to send message.')); + describe('open groups', async () => { + let sendToOpenGroupStub: sinon.SinonStub< + [OpenGroupMessage], + Promise + >; + beforeEach(() => { + sendToOpenGroupStub = sandbox + .stub(MessageSender, 'sendToOpenGroup') + .resolves(true); + }); - const failureSpy = sandbox.spy(); - messageQueueStub.events.on('fail', failureSpy); + it('can send to open group', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const success = await messageQueueStub.sendToGroup(message); + expect(sendToOpenGroupStub.callCount).to.equal(1); + expect(success).to.equal(true, 'Sending to open group failed'); + }); - const device = TestUtils.generateFakePubKey(); - const promise = messageQueueStub.processPending(device); - await expect(promise).to.be.fulfilled; + it('should emit a success event when send was successful', async () => { + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = PromiseUtils.waitForTask(complete => { + messageQueueStub.events.once('success', complete); + }, 2000); + + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); + + it('should emit a fail event if something went wrong', async () => { + sendToOpenGroupStub.resolves(false); + const message = TestUtils.generateOpenGroupMessage(); + const eventPromise = PromiseUtils.waitForTask(complete => { + messageQueueStub.events.once('fail', complete); + }, 2000); - await tick(); - expect(failureSpy.callCount).to.equal(1); + await messageQueueStub.sendToGroup(message); + await expect(eventPromise).to.be.fulfilled; + }); }); }); }); diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index 5ed1e211e..5f806b5dc 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -36,7 +36,6 @@ describe('PendingMessageCache', () => { }); pendingMessageCacheStub = new PendingMessageCache(); - await pendingMessageCacheStub.isReady; }); afterEach(() => { @@ -44,7 +43,7 @@ describe('PendingMessageCache', () => { }); it('can initialize cache', async () => { - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); // We expect the cache to initialise as an empty array expect(cache).to.be.instanceOf(Array); @@ -59,7 +58,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); // Verify that the message is in the cache - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); @@ -68,6 +67,22 @@ describe('PendingMessageCache', () => { expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp); }); + it('can add multiple messages belonging to the same user', async () => { + const device = TestUtils.generateFakePubKey(); + + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + // We have to timeout here otherwise it's processed too fast and messages start having the same timestamp + await TestUtils.timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + await TestUtils.timeout(5); + await pendingMessageCacheStub.add(device, TestUtils.generateChatMessage()); + + // Verify that the message is in the cache + const finalCache = await pendingMessageCacheStub.getAllPending(); + + expect(finalCache).to.have.length(3); + }); + it('can remove from cache', async () => { const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); @@ -75,18 +90,47 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(1); // Remove the message await pendingMessageCacheStub.remove(rawMessage); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); // Verify that the message was removed expect(finalCache).to.have.length(0); }); + it('should only remove messages with different timestamp and device', async () => { + const device = TestUtils.generateFakePubKey(); + const message = TestUtils.generateChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + await TestUtils.timeout(5); + const one = await pendingMessageCacheStub.add( + device, + TestUtils.generateChatMessage(message.identifier) + ); + const two = await pendingMessageCacheStub.add( + TestUtils.generateFakePubKey(), + message + ); + + const initialCache = await pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(3); + + // Remove the message + await pendingMessageCacheStub.remove(rawMessage); + + const finalCache = await pendingMessageCacheStub.getAllPending(); + + // Verify that the message was removed + expect(finalCache).to.have.length(2); + expect(finalCache).to.have.deep.members([one, two]); + }); + it('can get devices', async () => { const cacheItems = [ { @@ -103,16 +147,16 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const cache = pendingMessageCacheStub.getAllPending(); + const cache = await pendingMessageCacheStub.getAllPending(); expect(cache).to.have.length(cacheItems.length); // Get list of devices const devicesKeys = cacheItems.map(item => item.device.key); - const pulledDevices = pendingMessageCacheStub.getDevices(); + const pulledDevices = await pendingMessageCacheStub.getDevices(); const pulledDevicesKeys = pulledDevices.map(d => d.key); // Verify that device list from cache is equivalent to devices added @@ -131,21 +175,21 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Get pending for each specific device - cacheItems.forEach(item => { - const pendingForDevice = pendingMessageCacheStub.getForDevice( + for (const item of cacheItems) { + const pendingForDevice = await pendingMessageCacheStub.getForDevice( item.device ); expect(pendingForDevice).to.have.length(1); expect(pendingForDevice[0].device).to.equal(item.device.key); - }); + } }); it('can find nothing when empty', async () => { @@ -164,7 +208,7 @@ describe('PendingMessageCache', () => { await pendingMessageCacheStub.add(device, message); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(1); const foundMessage = pendingMessageCacheStub.find(rawMessage); @@ -188,17 +232,17 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const initialCache = pendingMessageCacheStub.getAllPending(); + const initialCache = await pendingMessageCacheStub.getAllPending(); expect(initialCache).to.have.length(cacheItems.length); // Clear cache await pendingMessageCacheStub.clear(); - const finalCache = pendingMessageCacheStub.getAllPending(); + const finalCache = await pendingMessageCacheStub.getAllPending(); expect(finalCache).to.have.length(0); }); @@ -218,21 +262,20 @@ describe('PendingMessageCache', () => { }, ]; - cacheItems.forEach(async item => { + for (const item of cacheItems) { await pendingMessageCacheStub.add(item.device, item.message); - }); + } - const addedMessages = pendingMessageCacheStub.getAllPending(); + const addedMessages = await pendingMessageCacheStub.getAllPending(); expect(addedMessages).to.have.length(cacheItems.length); // Rebuild from DB const freshCache = new PendingMessageCache(); - await freshCache.isReady; // Verify messages - const rebuiltMessages = freshCache.getAllPending(); + const rebuiltMessages = await freshCache.getAllPending(); - rebuiltMessages.forEach((message, index) => { + for (const [index, message] of rebuiltMessages.entries()) { const addedMessage = addedMessages[index]; // Pull out plainTextBuffer for a separate check @@ -254,6 +297,6 @@ describe('PendingMessageCache', () => { true, 'cached messages were not rebuilt properly' ); - }); + } }); }); diff --git a/ts/test/session/utils/JobQueue_test.ts b/ts/test/session/utils/JobQueue_test.ts index 60ddd9c09..379641ae4 100644 --- a/ts/test/session/utils/JobQueue_test.ts +++ b/ts/test/session/utils/JobQueue_test.ts @@ -1,7 +1,7 @@ import chai from 'chai'; import { v4 as uuid } from 'uuid'; import { JobQueue } from '../../../session/utils/JobQueue'; -import { timeout } from '../../test-utils'; +import { TestUtils } from '../../test-utils'; // tslint:disable-next-line: no-require-imports no-var-requires const chaiAsPromised = require('chai-as-promised'); @@ -16,7 +16,7 @@ describe('JobQueue', () => { const id = 'jobId'; assert.isFalse(queue.has(id)); - const promise = queue.addWithId(id, async () => timeout(100)); + const promise = queue.addWithId(id, async () => TestUtils.timeout(100)); assert.isTrue(queue.has(id)); await promise; assert.isFalse(queue.has(id)); @@ -33,7 +33,7 @@ describe('JobQueue', () => { const queue = new JobQueue(); const mapper = async ([value, ms]: Array): Promise => queue.addWithId(uuid(), async () => { - await timeout(ms); + await TestUtils.timeout(ms); return value; }); @@ -55,12 +55,12 @@ describe('JobQueue', () => { it('should return the result of the job', async () => { const queue = new JobQueue(); const success = queue.addWithId(uuid(), async () => { - await timeout(100); + await TestUtils.timeout(100); return 'success'; }); const failure = queue.addWithId(uuid(), async () => { - await timeout(100); + await TestUtils.timeout(100); throw new Error('failed'); }); @@ -72,7 +72,7 @@ describe('JobQueue', () => { const queue = new JobQueue(); const first = queue.addWithId(uuid(), () => 'first'); const second = queue.addWithId(uuid(), async () => { - await timeout(100); + await TestUtils.timeout(100); return 'second'; }); @@ -89,7 +89,7 @@ describe('JobQueue', () => { const queue = new JobQueue(); const id = uuid(); const job = async () => { - await timeout(100); + await TestUtils.timeout(100); return 'job1'; }; @@ -104,13 +104,15 @@ describe('JobQueue', () => { const queue = new JobQueue(); const id = uuid(); - const successfullJob = queue.addWithId(id, async () => timeout(100)); + const successfullJob = queue.addWithId(id, async () => + TestUtils.timeout(100) + ); assert.isTrue(queue.has(id)); await successfullJob; assert.isFalse(queue.has(id)); const failJob = queue.addWithId(id, async () => { - await timeout(100); + await TestUtils.timeout(100); throw new Error('failed'); }); assert.isTrue(queue.has(id)); diff --git a/ts/test/test-utils/index.ts b/ts/test/test-utils/index.ts index fae05690f..d8ec69320 100644 --- a/ts/test/test-utils/index.ts +++ b/ts/test/test-utils/index.ts @@ -1,5 +1,4 @@ -import * as TestUtils from './testUtils'; +import * as TestUtils from './utils'; import * as Stubs from './stubs'; -export * from './timeout'; export { TestUtils, Stubs }; diff --git a/ts/test/test-utils/stubs/index.ts b/ts/test/test-utils/stubs/index.ts index 10ad19f0e..d287adc26 100644 --- a/ts/test/test-utils/stubs/index.ts +++ b/ts/test/test-utils/stubs/index.ts @@ -1 +1,2 @@ export * from './ciphers'; +export * from './sending'; diff --git a/ts/test/test-utils/stubs/messages/TestSyncMessage.ts b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts new file mode 100644 index 000000000..c82eecfa4 --- /dev/null +++ b/ts/test/test-utils/stubs/messages/TestSyncMessage.ts @@ -0,0 +1,7 @@ +import { SyncMessage } from '../../../../session/messages/outgoing'; +import { SignalService } from '../../../../protobuf'; +export class TestSyncMessage extends SyncMessage { + protected syncProto(): SignalService.SyncMessage { + return SignalService.SyncMessage.create({}); + } +} diff --git a/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts new file mode 100644 index 000000000..fa1fc5129 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/PendingMessageCacheStub.ts @@ -0,0 +1,22 @@ +import { PendingMessageCache } from '../../../../session/sending'; +import { RawMessage } from '../../../../session/types'; + +export class PendingMessageCacheStub extends PendingMessageCache { + public dbData: Array; + constructor(dbData: Array = []) { + super(); + this.dbData = dbData; + } + + public getCache(): Readonly> { + return this.cache; + } + + protected async getFromStorage() { + return this.dbData; + } + + protected async saveToDB() { + return; + } +} diff --git a/ts/test/test-utils/stubs/sending/index.ts b/ts/test/test-utils/stubs/sending/index.ts new file mode 100644 index 000000000..e9def4705 --- /dev/null +++ b/ts/test/test-utils/stubs/sending/index.ts @@ -0,0 +1 @@ +export * from './PendingMessageCacheStub'; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts deleted file mode 100644 index ef2efc0fb..000000000 --- a/ts/test/test-utils/testUtils.ts +++ /dev/null @@ -1,126 +0,0 @@ -import * as sinon from 'sinon'; -import * as crypto from 'crypto'; -import * as window from '../../window'; -import * as DataShape from '../../../js/modules/data'; -import { v4 as uuid } from 'uuid'; - -import { PubKey } from '../../../ts/session/types'; -import { - ChatMessage, - ClosedGroupChatMessage, - OpenGroupMessage, -} from '../../session/messages/outgoing'; -import { OpenGroup } from '../../session/types/OpenGroup'; - -const globalAny: any = global; -const sandbox = sinon.createSandbox(); - -// We have to do this in a weird way because Data uses module.exports -// which doesn't play well with sinon or ImportMock -// tslint:disable-next-line: no-require-imports no-var-requires -const Data = require('../../../js/modules/data'); -type DataFunction = typeof DataShape; - -/** - * Stub a function inside Data. - * - * Note: This uses a custom sandbox. - * Please call `restoreStubs()` or `stub.restore()` to restore original functionality. - */ -export function stubData(fn: K): sinon.SinonStub { - return sandbox.stub(Data, fn); -} - -type WindowValue = Partial | undefined; - -/** - * Stub a window object. - * - * Note: This uses a custom sandbox. - * Please call `restoreStubs()` or `stub.restore()` to restore original functionality. - */ -export function stubWindow( - fn: K, - value: WindowValue -) { - // tslint:disable-next-line: no-typeof-undefined - if (typeof globalAny.window === 'undefined') { - globalAny.window = {}; - } - - const set = (newValue: WindowValue) => { - globalAny.window[fn] = newValue; - }; - - const get = () => { - return globalAny.window[fn] as WindowValue; - }; - - globalAny.window[fn] = value; - - return { - get, - set, - }; -} - -export function restoreStubs() { - globalAny.window = undefined; - sandbox.restore(); -} - -export function generateFakePubKey(): PubKey { - // Generates a mock pubkey for testing - const numBytes = PubKey.PUBKEY_LEN / 2 - 1; - const hexBuffer = crypto.randomBytes(numBytes).toString('hex'); - const pubkeyString = `05${hexBuffer}`; - - return new PubKey(pubkeyString); -} - -export function generateFakePubKeys(amount: number): Array { - const numPubKeys = amount > 0 ? Math.floor(amount) : 0; - - // tslint:disable-next-line: no-unnecessary-callback-wrapper - return new Array(numPubKeys).fill(0).map(() => generateFakePubKey()); -} - -export function generateChatMessage(): ChatMessage { - return new ChatMessage({ - body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', - identifier: uuid(), - timestamp: Date.now(), - attachments: undefined, - quote: undefined, - expireTimer: undefined, - lokiProfile: undefined, - preview: undefined, - }); -} - -export function generateOpenGroupMessage(): OpenGroupMessage { - const group = new OpenGroup({ - server: 'chat.example.server', - channel: 0, - conversationId: '0', - }); - - return new OpenGroupMessage({ - timestamp: Date.now(), - group, - attachments: undefined, - preview: undefined, - body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', - quote: undefined, - }); -} - -export function generateClosedGroupMessage( - groupId?: string -): ClosedGroupChatMessage { - return new ClosedGroupChatMessage({ - identifier: uuid(), - groupId: groupId ?? generateFakePubKey().key, - chatMessage: generateChatMessage(), - }); -} diff --git a/ts/test/test-utils/utils/index.ts b/ts/test/test-utils/utils/index.ts new file mode 100644 index 000000000..7cfc3adc3 --- /dev/null +++ b/ts/test/test-utils/utils/index.ts @@ -0,0 +1,4 @@ +export * from './timeout'; +export * from './stubbing'; +export * from './pubkey'; +export * from './message'; diff --git a/ts/test/test-utils/utils/message.ts b/ts/test/test-utils/utils/message.ts new file mode 100644 index 000000000..e2e71504a --- /dev/null +++ b/ts/test/test-utils/utils/message.ts @@ -0,0 +1,48 @@ +import { + ChatMessage, + ClosedGroupChatMessage, + OpenGroupMessage, +} from '../../../session/messages/outgoing'; +import { v4 as uuid } from 'uuid'; +import { OpenGroup } from '../../../session/types'; +import { generateFakePubKey } from './pubkey'; + +export function generateChatMessage(identifier?: string): ChatMessage { + return new ChatMessage({ + body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', + identifier: identifier ?? uuid(), + timestamp: Date.now(), + attachments: undefined, + quote: undefined, + expireTimer: undefined, + lokiProfile: undefined, + preview: undefined, + }); +} + +export function generateOpenGroupMessage(): OpenGroupMessage { + const group = new OpenGroup({ + server: 'chat.example.server', + channel: 0, + conversationId: '0', + }); + + return new OpenGroupMessage({ + timestamp: Date.now(), + group, + attachments: undefined, + preview: undefined, + body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', + quote: undefined, + }); +} + +export function generateClosedGroupMessage( + groupId?: string +): ClosedGroupChatMessage { + return new ClosedGroupChatMessage({ + identifier: uuid(), + groupId: groupId ?? generateFakePubKey().key, + chatMessage: generateChatMessage(), + }); +} diff --git a/ts/test/test-utils/utils/pubkey.ts b/ts/test/test-utils/utils/pubkey.ts new file mode 100644 index 000000000..e5805b79c --- /dev/null +++ b/ts/test/test-utils/utils/pubkey.ts @@ -0,0 +1,18 @@ +import * as crypto from 'crypto'; +import { PubKey } from '../../../session/types'; + +export function generateFakePubKey(): PubKey { + // Generates a mock pubkey for testing + const numBytes = PubKey.PUBKEY_LEN / 2 - 1; + const hexBuffer = crypto.randomBytes(numBytes).toString('hex'); + const pubkeyString = `05${hexBuffer}`; + + return new PubKey(pubkeyString); +} + +export function generateFakePubKeys(amount: number): Array { + const numPubKeys = amount > 0 ? Math.floor(amount) : 0; + + // tslint:disable-next-line: no-unnecessary-callback-wrapper + return new Array(numPubKeys).fill(0).map(() => generateFakePubKey()); +} diff --git a/ts/test/test-utils/utils/stubbing.ts b/ts/test/test-utils/utils/stubbing.ts new file mode 100644 index 000000000..9391204d9 --- /dev/null +++ b/ts/test/test-utils/utils/stubbing.ts @@ -0,0 +1,60 @@ +import * as sinon from 'sinon'; +import * as crypto from 'crypto'; +import * as DataShape from '../../../../js/modules/data'; + +const globalAny: any = global; +const sandbox = sinon.createSandbox(); + +// We have to do this in a weird way because Data uses module.exports +// which doesn't play well with sinon or ImportMock +// tslint:disable-next-line: no-require-imports no-var-requires +const Data = require('../../../../js/modules/data'); +type DataFunction = typeof DataShape; + +/** + * Stub a function inside Data. + * + * Note: This uses a custom sandbox. + * Please call `restoreStubs()` or `stub.restore()` to restore original functionality. + */ +export function stubData(fn: K): sinon.SinonStub { + return sandbox.stub(Data, fn); +} + +type WindowValue = Partial | undefined; + +/** + * Stub a window object. + * + * Note: This uses a custom sandbox. + * Please call `restoreStubs()` or `stub.restore()` to restore original functionality. + */ +export function stubWindow( + fn: K, + value: WindowValue +) { + // tslint:disable-next-line: no-typeof-undefined + if (typeof globalAny.window === 'undefined') { + globalAny.window = {}; + } + + const set = (newValue: WindowValue) => { + globalAny.window[fn] = newValue; + }; + + const get = () => { + return globalAny.window[fn] as WindowValue; + }; + + globalAny.window[fn] = value; + + return { + get, + set, + }; +} + +export function restoreStubs() { + globalAny.window = undefined; + sandbox.restore(); +} diff --git a/ts/test/test-utils/timeout.ts b/ts/test/test-utils/utils/timeout.ts similarity index 100% rename from ts/test/test-utils/timeout.ts rename to ts/test/test-utils/utils/timeout.ts