diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index d5e62286b..f1c3a5525 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -63,7 +63,9 @@ export class MessageQueue implements MessageQueueInterface { const ourDevices = await MultiDeviceProtocol.getOurDevices(); // Remove our devices from currentDevices - currentDevices = currentDevices.filter(device => !ourDevices.some(d => device.isEqual(d))); + currentDevices = currentDevices.filter( + device => !ourDevices.some(d => device.isEqual(d)) + ); } const promises = currentDevices.map(async device => { @@ -129,7 +131,9 @@ export class MessageQueue implements MessageQueueInterface { } const ourDevices = await MultiDeviceProtocol.getOurDevices(); - const promises = ourDevices.map(async device => this.process(device, message)); + const promises = ourDevices.map(async device => + this.process(device, message) + ); return Promise.all(promises); } diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index fcc571cc6..a26cea8c6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -67,7 +67,11 @@ export class PendingMessageCache { // Remove item from cache and sync with database const updatedCache = this.cache.filter( - cached => !(cached.device === message.device && cached.timestamp === message.timestamp) + cached => + !( + cached.device === message.device && + cached.timestamp === message.timestamp + ) ); this.cache = updatedCache; await this.saveToDB(); diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts new file mode 100644 index 000000000..b106f6032 --- /dev/null +++ b/ts/session/utils/Promise.ts @@ -0,0 +1,118 @@ +type SimpleFunction = (arg: T) => void; +type Return = Promise | T; + +async function toPromise(value: Return): Promise { + return value instanceof Promise ? value : Promise.resolve(value); +} + +/** + * Create a promise which waits until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param task The task to wait for. + * @param timeout The timeout period. + */ +export async function waitForTask( + task: (done: SimpleFunction) => Return, + timeout: number = 2000 +): Promise { + const timeoutPromise = new Promise((_, rej) => { + const wait = setTimeout(() => { + clearTimeout(wait); + rej(new Error('Task timed out.')); + }, timeout); + }); + + const taskPromise = new Promise(async (res, rej) => { + try { + await toPromise(task(res)); + } catch (e) { + rej(e); + } + }); + + return Promise.race([timeoutPromise, taskPromise]) as Promise; +} + +interface PollOptions { + timeout: number; + interval: number; +} + +/** + * Creates a promise which calls the `task` every `interval` until `done` is called or until `timeout` period is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The check which runs every `interval` ms. + * @param options The polling options. + */ +export async function poll( + task: (done: SimpleFunction) => Return, + options: Partial = {} +): Promise { + const defaults: PollOptions = { + timeout: 2000, + interval: 1000, + }; + + const { timeout, interval } = { + ...defaults, + ...options, + }; + + const endTime = Date.now() + timeout; + let stop = false; + const finish = () => { + stop = true; + }; + + const _poll = async (resolve: any, reject: any) => { + if (stop) { + resolve(); + } else if (Date.now() >= endTime) { + finish(); + reject(new Error('Periodic check timeout')); + } else { + try { + await toPromise(task(finish)); + } catch (e) { + finish(); + reject(e); + return; + } + + setTimeout(() => { + void _poll(resolve, reject); + }, interval); + } + }; + + return new Promise((resolve, reject) => { + void _poll(resolve, reject); + }); +} + +/** + * Creates a promise which waits until `check` returns `true` or rejects if `timeout` preiod is reached. + * If `timeout` is reached then this will throw an Error. + * + * @param check The boolean check. + * @param timeout The time before an error is thrown. + */ +export async function waitUntil( + check: () => Return, + timeout: number = 2000 +) { + // This is causing unhandled promise rejection somewhere in MessageQueue tests + return poll( + async done => { + const result = await toPromise(check()); + if (result) { + done(); + } + }, + { + timeout, + } + ); +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index c619b8d2f..dde73e928 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -2,8 +2,15 @@ import * as MessageUtils from './Messages'; import * as GroupUtils from './Groups'; import * as SyncMessageUtils from './SyncMessageUtils'; import * as StringUtils from './String'; +import * as PromiseUtils from './Promise'; export * from './TypedEmitter'; export * from './JobQueue'; -export { MessageUtils, SyncMessageUtils, GroupUtils, StringUtils }; +export { + MessageUtils, + SyncMessageUtils, + GroupUtils, + StringUtils, + PromiseUtils, +}; diff --git a/ts/test/session/sending/MessageQueue_test.ts b/ts/test/session/sending/MessageQueue_test.ts index 739bae2cf..813ae20c6 100644 --- a/ts/test/session/sending/MessageQueue_test.ts +++ b/ts/test/session/sending/MessageQueue_test.ts @@ -1,7 +1,11 @@ import chai from 'chai'; import * as sinon from 'sinon'; -import * as _ from 'lodash'; -import { GroupUtils, SyncMessageUtils } from '../../../session/utils'; +import _ from 'lodash'; +import { + GroupUtils, + PromiseUtils, + SyncMessageUtils, +} from '../../../session/utils'; import { Stubs, TestUtils } from '../../../test/test-utils'; import { MessageQueue } from '../../../session/sending/MessageQueue'; import { @@ -81,25 +85,27 @@ describe('MessageQueue', () => { describe('processPending', () => { it('will send session request message if no session', async () => { hasSessionStub.resolves(false); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); const device = TestUtils.generateFakePubKey(); - const stubCallPromise = TestUtils.waitUntil(() => sendSessionRequestIfNeededStub.callCount === 1); - await messageQueueStub.processPending(device); - expect(stubCallPromise).to.be.fulfilled; + + const stubCallPromise = PromiseUtils.waitUntil( + () => sendSessionRequestIfNeededStub.callCount === 1 + ); + await expect(stubCallPromise).to.be.fulfilled; }); it('will send message if session exists', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const successPromise = TestUtils.waitForTask(done => { + const successPromise = PromiseUtils.waitForTask(done => { messageQueueStub.events.once('success', done); }); @@ -112,13 +118,13 @@ describe('MessageQueue', () => { }); it('will send message if sending to medium group', async () => { - isMediumGroupStub.resolves(true); + isMediumGroupStub.returns(true); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); await pendingMessageCache.add(device, TestUtils.generateChatMessage()); - const successPromise = TestUtils.waitForTask(done => { + const successPromise = PromiseUtils.waitForTask(done => { messageQueueStub.events.once('success', done); }); @@ -132,7 +138,7 @@ describe('MessageQueue', () => { it('should remove message from cache', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); const events = ['success', 'fail']; for (const event of events) { @@ -149,25 +155,27 @@ describe('MessageQueue', () => { expect(initialMessages).to.have.length(1); await messageQueueStub.processPending(device); - const promise = TestUtils.waitUntil(async () => { + const promise = PromiseUtils.waitUntil(async () => { const messages = await pendingMessageCache.getForDevice(device); return messages.length === 0; }); - expect(promise).to.be.fulfilled; + await expect(promise).to.be.fulfilled; } }); describe('events', () => { it('should send a success event if message was sent', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.resolves(); const device = TestUtils.generateFakePubKey(); const message = TestUtils.generateChatMessage(); await pendingMessageCache.add(device, message); - const eventPromise = TestUtils.waitForTask(complete => { + const eventPromise = PromiseUtils.waitForTask< + RawMessage | OpenGroupMessage + >(complete => { messageQueueStub.events.once('success', complete); }); @@ -180,7 +188,7 @@ describe('MessageQueue', () => { it('should send a fail event if something went wrong while sending', async () => { hasSessionStub.resolves(true); - isMediumGroupStub.resolves(false); + isMediumGroupStub.returns(false); sendStub.throws(new Error('failure')); const spy = sandbox.spy(); @@ -190,7 +198,9 @@ describe('MessageQueue', () => { const message = TestUtils.generateChatMessage(); await pendingMessageCache.add(device, message); - const eventPromise = TestUtils.waitForTask<[RawMessage | OpenGroupMessage, Error]>(complete => { + const eventPromise = PromiseUtils.waitForTask< + [RawMessage | OpenGroupMessage, Error] + >(complete => { messageQueueStub.events.once('fail', (...args) => { complete(args); }); @@ -231,8 +241,7 @@ describe('MessageQueue', () => { const message = TestUtils.generateChatMessage(); await messageQueueStub.sendMessageToDevices(devices, message); - const promise = TestUtils.waitUntil(() => pendingMessageCache.getCache().length === devices.length); - await expect(promise).to.be.fulfilled; + expect(pendingMessageCache.getCache()).to.have.length(devices.length); }); it('should send sync message if possible', async () => { @@ -268,7 +277,12 @@ describe('MessageQueue', () => { true, 'sendSyncMessage was not called.' ); - expect(pendingMessageCache.getCache().map(c => c.device)).to.not.have.members(ourDevices.map(d => d.key), 'Sending regular messages to our own device is not allowed.'); + expect( + pendingMessageCache.getCache().map(c => c.device) + ).to.not.have.members( + ourDevices.map(d => d.key), + 'Sending regular messages to our own device is not allowed.' + ); expect(pendingMessageCache.getCache()).to.have.length( devices.length - ourDevices.length, 'Messages should not be sent to our devices.' @@ -288,8 +302,12 @@ describe('MessageQueue', () => { new TestSyncMessage({ timestamp: Date.now() }) ); - expect(pendingMessageCache.getCache()).to.have.length(ourOtherDevices.length); - expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members(ourOtherDevices.map(d => d.key)); + expect(pendingMessageCache.getCache()).to.have.length( + ourOtherDevices.length + ); + expect(pendingMessageCache.getCache().map(c => c.device)).to.have.members( + ourOtherDevices.map(d => d.key) + ); }); }); @@ -354,7 +372,7 @@ describe('MessageQueue', () => { it('should emit a success event when send was successful', async () => { const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = TestUtils.waitForTask(complete => { + const eventPromise = PromiseUtils.waitForTask(complete => { messageQueueStub.events.once('success', complete); }, 2000); @@ -365,7 +383,7 @@ describe('MessageQueue', () => { it('should emit a fail event if something went wrong', async () => { sendToOpenGroupStub.resolves(false); const message = TestUtils.generateOpenGroupMessage(); - const eventPromise = TestUtils.waitForTask(complete => { + const eventPromise = PromiseUtils.waitForTask(complete => { messageQueueStub.events.once('fail', complete); }, 2000); diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts index 2dacef6f7..dd6544364 100644 --- a/ts/test/session/sending/PendingMessageCache_test.ts +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -3,7 +3,6 @@ import * as _ from 'lodash'; import { MessageUtils } from '../../../session/utils'; import { TestUtils, timeout } from '../../../test/test-utils'; import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; -import { initial } from 'lodash'; // Equivalent to Data.StorageItem interface StorageItem { @@ -271,7 +270,7 @@ describe('PendingMessageCache', () => { // Verify messages const rebuiltMessages = await freshCache.getAllPending(); - // tslint:disable-next-line: no-for-in no-for-in-array + for (const [index, message] of rebuiltMessages.entries()) { const addedMessage = addedMessages[index]; diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index 079148170..eb9644838 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -124,83 +124,3 @@ export function generateClosedGroupMessage( chatMessage: generateChatMessage(), }); } - -type ArgFunction = (arg: T) => void; -type MaybePromise = Promise | T; - -/** - * Create a promise which waits until `done` is called or until timeout period is reached. - * @param task The task to wait for. - * @param timeout The timeout period. - */ -// tslint:disable-next-line: no-shadowed-variable -export async function waitForTask(task: (done: ArgFunction) => MaybePromise, timeout: number = 2000): Promise { - const timeoutPromise = new Promise((_, rej) => { - const wait = setTimeout(() => { - clearTimeout(wait); - rej(new Error('Task timed out.')); - }, timeout); - }); - - // tslint:disable-next-line: no-shadowed-variable - const taskPromise = new Promise(async (res, rej) => { - try { - const taskReturn = task(res); - return taskReturn instanceof Promise ? taskReturn : Promise.resolve(taskReturn); - } catch (e) { - rej(e); - } - }); - - return Promise.race([timeoutPromise, taskPromise]) as Promise; -} - -/** - * Creates a promise which periodically calls the `check` until `done` is called or until timeout period is reached. - * @param check The check which runs every 100ms. - * @param timeout The time before an error is thrown. - */ -// tslint:disable-next-line: no-shadowed-variable -export async function periodicallyCheck(check: (done: ArgFunction) => MaybePromise, timeout: number = 1000): Promise { - return waitForTask(complete => { - let interval: NodeJS.Timeout | undefined; - const cleanup = () => { - if (interval) { - clearInterval(interval); - interval = undefined; - } - }; - setTimeout(cleanup, timeout); - - const onDone = () => { - complete(); - cleanup(); - }; - interval = setInterval(async () => { - try { - await toPromise(check(onDone)); - } catch (e) { - cleanup(); - throw e; - } - }, 100); - }, timeout); -} - -/** - * Creates a promise which waits until `check` returns `true` or rejects if timeout preiod is reached. - * @param check The boolean check. - * @param timeout The time before an error is thrown. - */ -export async function waitUntil(check: () => MaybePromise, timeout: number = 2000) { - return periodicallyCheck(async done => { - const result = await toPromise(check()); - if (result) { - done(); - } - }, timeout); -} - -async function toPromise(maybePromise: MaybePromise): Promise { - return maybePromise instanceof Promise ? maybePromise : Promise.resolve(maybePromise); -}