diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index 1efd26c95..c1892efd2 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -8,23 +8,33 @@ import { MessageQueueInterfaceEvents, GroupMessageType, } from './MessageQueueInterface'; -import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage, ClosedGroupMessage } from '../messages/outgoing'; +import { + ContentMessage, + OpenGroupMessage, + SyncMessage, + SessionResetMessage, + ClosedGroupMessage, +} from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { JobQueue, TypedEventEmitter, toRawMessage, toSyncMessage } from '../utils'; +import { + JobQueue, + TypedEventEmitter, + toRawMessage, + toSyncMessage, +} from '../utils'; import { PubKey } from '../types'; import { ConversationController } from '../../window'; import { MessageSender } from '.'; import { SessionProtocol } from '../protocols'; - export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; private readonly jobQueues: Map = new Map(); - private readonly cache: PendingMessageCache; + private readonly pendingMessageCache: PendingMessageCache; constructor() { this.events = new EventEmitter(); - this.cache = new PendingMessageCache(); + this.pendingMessageCache = new PendingMessageCache(); void this.processAllPending(); } @@ -39,17 +49,19 @@ export class MessageQueue implements MessageQueueInterface { await this.sendMessageToDevices([device], message); } - public async sendMessageToDevices(devices: Array, message: ContentMessage) { + public async sendMessageToDevices( + devices: Array, + message: ContentMessage + ) { let currentDevices = [...devices]; - if (SyncMessage.canSync(message)) { + if (message.canSync(message)) { // Sync to our devices - const syncMessage = toSyncMessage.from(message); - await this.sendSyncMessage(syncMessage); + const syncMessage = toSyncMessage(message); + const ourDevices = await this.sendSyncMessage(syncMessage); // Remove our devices from currentDevices - const ourDevices = await this.getOurDevices(); - currentDevices = currentDevices.filter(device => !_.includes(ourDevices, device)); + currentDevices = _.xor(currentDevices, ourDevices); } currentDevices.forEach(async device => { @@ -58,8 +70,10 @@ export class MessageQueue implements MessageQueueInterface { } public async sendToGroup(message: OpenGroupMessage | ContentMessage) { - - if (!(message instanceof OpenGroupMessage) && !(message instanceof ClosedGroupMessage)) { + if ( + !(message instanceof OpenGroupMessage) && + !(message instanceof ClosedGroupMessage) + ) { return; } @@ -73,31 +87,31 @@ export class MessageQueue implements MessageQueueInterface { await this.sendMessageToDevices(recipients, message); } - // Open groups + // Open groups if (message instanceof OpenGroupMessage) { // No queue needed for Open Groups; send directly - } - } - public async sendSyncMessage(message: ContentMessage) { + public async sendSyncMessage( + message: ContentMessage + ): Promise> { // Sync with our devices - const syncMessage = toSyncMessage(); if (!syncMessage.canSync()) { return; } const ourDevices = await this.getOurDevices(); - ourDevices.forEach(async device => { await this.queue(device, message); }); + + return ourDevices; } public async processPending(device: PubKey) { - const messages = this.cache.getForDevice(device); + const messages = this.pendingMessageCache.getForDevice(device); const hasSession = SessionProtocol.hasSession(device); const conversation = ConversationController.get(device.key); @@ -114,18 +128,20 @@ export class MessageQueue implements MessageQueueInterface { if (!jobQueue.has(message.identifier)) { const promise = jobQueue.add(async () => MessageSender.send(message)); - promise.then(() => { - // Message sent; remove from cache - void this.cache.remove(message); - }).catch(() => { - // Message failed to send - }); + promise + .then(() => { + // Message sent; remove from cache + void this.pendingMessageCache.remove(message); + }) + .catch(() => { + // Message failed to send + }); } }); } private async processAllPending() { - const devices = this.cache.getDevices(); + const devices = this.pendingMessageCache.getDevices(); const promises = devices.map(async device => this.processPending(device)); return Promise.all(promises); @@ -136,7 +152,7 @@ export class MessageQueue implements MessageQueueInterface { return; } - await this.cache.add(device, message); + await this.pendingMessageCache.add(device, message); await this.processPending(device); }