pull/1174/head
Vincent 5 years ago
parent 80bc3520bd
commit 86cbc07855

@ -8,23 +8,33 @@ import {
MessageQueueInterfaceEvents, MessageQueueInterfaceEvents,
GroupMessageType, GroupMessageType,
} from './MessageQueueInterface'; } from './MessageQueueInterface';
import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage, ClosedGroupMessage } from '../messages/outgoing'; import {
ContentMessage,
OpenGroupMessage,
SyncMessage,
SessionResetMessage,
ClosedGroupMessage,
} from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache'; import { PendingMessageCache } from './PendingMessageCache';
import { JobQueue, TypedEventEmitter, toRawMessage, toSyncMessage } from '../utils'; import {
JobQueue,
TypedEventEmitter,
toRawMessage,
toSyncMessage,
} from '../utils';
import { PubKey } from '../types'; import { PubKey } from '../types';
import { ConversationController } from '../../window'; import { ConversationController } from '../../window';
import { MessageSender } from '.'; import { MessageSender } from '.';
import { SessionProtocol } from '../protocols'; import { SessionProtocol } from '../protocols';
export class MessageQueue implements MessageQueueInterface { export class MessageQueue implements MessageQueueInterface {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>; public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
private readonly jobQueues: Map<PubKey, JobQueue> = new Map(); private readonly jobQueues: Map<PubKey, JobQueue> = new Map();
private readonly cache: PendingMessageCache; private readonly pendingMessageCache: PendingMessageCache;
constructor() { constructor() {
this.events = new EventEmitter(); this.events = new EventEmitter();
this.cache = new PendingMessageCache(); this.pendingMessageCache = new PendingMessageCache();
void this.processAllPending(); void this.processAllPending();
} }
@ -39,17 +49,19 @@ export class MessageQueue implements MessageQueueInterface {
await this.sendMessageToDevices([device], message); await this.sendMessageToDevices([device], message);
} }
public async sendMessageToDevices(devices: Array<PubKey>, message: ContentMessage) { public async sendMessageToDevices(
devices: Array<PubKey>,
message: ContentMessage
) {
let currentDevices = [...devices]; let currentDevices = [...devices];
if (SyncMessage.canSync(message)) { if (message.canSync(message)) {
// Sync to our devices // Sync to our devices
const syncMessage = toSyncMessage.from(message); const syncMessage = toSyncMessage(message);
await this.sendSyncMessage(syncMessage); const ourDevices = await this.sendSyncMessage(syncMessage);
// Remove our devices from currentDevices // Remove our devices from currentDevices
const ourDevices = await this.getOurDevices(); currentDevices = _.xor(currentDevices, ourDevices);
currentDevices = currentDevices.filter(device => !_.includes(ourDevices, device));
} }
currentDevices.forEach(async device => { currentDevices.forEach(async device => {
@ -58,8 +70,10 @@ export class MessageQueue implements MessageQueueInterface {
} }
public async sendToGroup(message: OpenGroupMessage | ContentMessage) { public async sendToGroup(message: OpenGroupMessage | ContentMessage) {
if (
if (!(message instanceof OpenGroupMessage) && !(message instanceof ClosedGroupMessage)) { !(message instanceof OpenGroupMessage) &&
!(message instanceof ClosedGroupMessage)
) {
return; return;
} }
@ -73,31 +87,31 @@ export class MessageQueue implements MessageQueueInterface {
await this.sendMessageToDevices(recipients, message); await this.sendMessageToDevices(recipients, message);
} }
// Open groups // Open groups
if (message instanceof OpenGroupMessage) { if (message instanceof OpenGroupMessage) {
// No queue needed for Open Groups; send directly // No queue needed for Open Groups; send directly
} }
} }
public async sendSyncMessage(message: ContentMessage) { public async sendSyncMessage(
message: ContentMessage
): Promise<Array<PubKey>> {
// Sync with our devices // Sync with our devices
const syncMessage = toSyncMessage(); const syncMessage = toSyncMessage();
if (!syncMessage.canSync()) { if (!syncMessage.canSync()) {
return; return;
} }
const ourDevices = await this.getOurDevices(); const ourDevices = await this.getOurDevices();
ourDevices.forEach(async device => { ourDevices.forEach(async device => {
await this.queue(device, message); await this.queue(device, message);
}); });
return ourDevices;
} }
public async processPending(device: PubKey) { public async processPending(device: PubKey) {
const messages = this.cache.getForDevice(device); const messages = this.pendingMessageCache.getForDevice(device);
const hasSession = SessionProtocol.hasSession(device); const hasSession = SessionProtocol.hasSession(device);
const conversation = ConversationController.get(device.key); const conversation = ConversationController.get(device.key);
@ -114,18 +128,20 @@ export class MessageQueue implements MessageQueueInterface {
if (!jobQueue.has(message.identifier)) { if (!jobQueue.has(message.identifier)) {
const promise = jobQueue.add(async () => MessageSender.send(message)); const promise = jobQueue.add(async () => MessageSender.send(message));
promise.then(() => { promise
// Message sent; remove from cache .then(() => {
void this.cache.remove(message); // Message sent; remove from cache
}).catch(() => { void this.pendingMessageCache.remove(message);
// Message failed to send })
}); .catch(() => {
// Message failed to send
});
} }
}); });
} }
private async processAllPending() { private async processAllPending() {
const devices = this.cache.getDevices(); const devices = this.pendingMessageCache.getDevices();
const promises = devices.map(async device => this.processPending(device)); const promises = devices.map(async device => this.processPending(device));
return Promise.all(promises); return Promise.all(promises);
@ -136,7 +152,7 @@ export class MessageQueue implements MessageQueueInterface {
return; return;
} }
await this.cache.add(device, message); await this.pendingMessageCache.add(device, message);
await this.processPending(device); await this.processPending(device);
} }

Loading…
Cancel
Save