You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/ts/session/sending/MessageQueue.ts

160 lines
4.6 KiB
TypeScript

5 years ago
import * as _ from 'lodash';
import * as Data from '../../../js/modules/data';
import { textsecure } from '../../window';
import { EventEmitter } from 'events';
import {
MessageQueueInterface,
MessageQueueInterfaceEvents,
5 years ago
GroupMessageType,
} from './MessageQueueInterface';
5 years ago
import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage, ClosedGroupMessage } from '../messages/outgoing';
import { PendingMessageCache } from './PendingMessageCache';
5 years ago
import { JobQueue, TypedEventEmitter, toRawMessage, toSyncMessage } from '../utils';
5 years ago
import { PubKey } from '../types';
import { ConversationController } from '../../window';
import { MessageSender } from '.';
5 years ago
import { SessionProtocol } from '../protocols';
/**
 * Outgoing message queue organised per recipient device.
 *
 * A message handed to the queue is first stored in a `PendingMessageCache` and
 * then submitted to a `JobQueue` dedicated to the recipient device. It is
 * removed from the cache only after `MessageSender.send` resolves, so a
 * message whose send failed stays cached and is offered again the next time
 * `processPending` runs for that device.
 */
export class MessageQueue implements MessageQueueInterface {
  public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;

  /**
   * One job queue per device, keyed by `PubKey.key`. The key is the string and
   * not the `PubKey` object because a `Map` compares object keys by identity
   * and this class builds a fresh `PubKey` instance for every lookup.
   */
  private readonly jobQueues: Map<string, JobQueue> = new Map();
  private readonly cache: PendingMessageCache;

  constructor() {
    this.events = new EventEmitter();
    this.cache = new PendingMessageCache();

    // Fire-and-forget: start sending whatever the cache already holds.
    void this.processAllPending();
  }

  /**
   * Sends `message` to every device that `Data.getPairedDevicesFor` reports
   * for `user`.
   *
   * @param user Public key whose paired devices are looked up.
   * @param message Message to deliver.
   */
  public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) {
    const pairedKeys = await Data.getPairedDevicesFor(user.key);
    const pairedDevices = pairedKeys.map(key => new PubKey(key));

    await this.sendMessageToDevices(pairedDevices, message);
  }

  /**
   * Sends `message` to a single device.
   *
   * @param device Recipient device.
   * @param message Message to deliver.
   */
  public async send(device: PubKey, message: ContentMessage) {
    await this.sendMessageToDevices([device], message);
  }

  /**
   * Queues `message` for each of `devices`. When the message is syncable it is
   * also synced to our own devices, and our own devices are then dropped from
   * the recipient list so they do not receive it twice.
   *
   * Resolves once every recipient has been queued; rejects if queueing for any
   * recipient fails.
   *
   * @param devices Recipient devices.
   * @param message Message to deliver.
   */
  public async sendMessageToDevices(
    devices: Array<PubKey>,
    message: ContentMessage
  ) {
    let recipients = [...devices];

    if (SyncMessage.canSync(message)) {
      // Sync to our devices; the conversion happens inside sendSyncMessage.
      await this.sendSyncMessage(message);

      // Remove our devices from the recipients. Compare key strings: PubKey
      // instances are distinct objects even when they wrap the same key.
      const ourDevices = await this.getOurDevices();
      const ourKeys = ourDevices.map(ours => ours.key);
      recipients = recipients.filter(
        device => !_.includes(ourKeys, device.key)
      );
    }

    // Array.forEach ignores the promise returned by an async callback, so map
    // to promises and await them all.
    await Promise.all(
      recipients.map(async device => this.queue(device, message))
    );
  }

  /**
   * Sends a group message. Anything that is neither a `ClosedGroupMessage`
   * nor an `OpenGroupMessage` is ignored.
   *
   * @param message Group message to deliver.
   */
  public async sendToGroup(message: OpenGroupMessage | ContentMessage) {
    // Closed groups
    if (message instanceof ClosedGroupMessage) {
      const conversation = ConversationController.get(message.groupId);

      // NOTE(review): assumes the conversation model exposes the group's
      // member public keys as an array of strings under the 'members'
      // attribute - confirm against the conversation model.
      const memberKeys: Array<string> =
        (conversation && conversation.get('members')) || [];
      if (memberKeys.length === 0) {
        return;
      }

      const recipients = memberKeys.map(key => new PubKey(key));
      await this.sendMessageToDevices(recipients, message);

      return;
    }

    // Open groups
    if (message instanceof OpenGroupMessage) {
      // No queue needed for Open Groups; send directly.
      // TODO: direct sending is not implemented yet; the message is dropped.
    }
  }

  /**
   * Queues the sync form of `message` for each of our own devices.
   *
   * A message that already is a `SyncMessage` is sent as is; any other message
   * is converted first, and nothing is sent when it is not syncable.
   *
   * @param message Message to sync with our devices.
   */
  public async sendSyncMessage(message: ContentMessage) {
    const syncMessage = this.asSyncMessage(message);
    if (!syncMessage) {
      return;
    }

    const ourDevices = await this.getOurDevices();
    await Promise.all(
      ourDevices.map(async device => this.queue(device, syncMessage))
    );
  }

  /**
   * Submits every cached message for `device` to that device's job queue.
   *
   * Unless the conversation is a medium group, a session with the device is
   * required: when there is none, a session request is sent (if needed) and
   * the messages stay in the cache for a later call.
   *
   * @param device Device whose cached messages should be sent.
   */
  public async processPending(device: PubKey) {
    const messages = this.cache.getForDevice(device);

    // Awaited so that an asynchronous implementation yields its boolean rather
    // than an always-truthy Promise; awaiting a plain boolean is harmless.
    const hasSession = await SessionProtocol.hasSession(device);

    // A device without a known conversation is treated as "not a medium group".
    const conversation = ConversationController.get(device.key);
    const isMediumGroup = Boolean(conversation && conversation.isMediumGroup());

    if (!isMediumGroup && !hasSession) {
      await SessionProtocol.sendSessionRequestIfNeeded(device);

      return;
    }

    const jobQueue = this.getJobQueue(device);
    messages.forEach(message => {
      // NOTE(review): the job below is added without the message identifier;
      // confirm that JobQueue.has can actually recognise it, otherwise a
      // message still in the cache is submitted again on every call.
      if (jobQueue.has(message.identifier)) {
        return;
      }

      void jobQueue
        .add(async () => MessageSender.send(message))
        .then(() => {
          // Message sent; remove from cache
          void this.cache.remove(message);
        })
        .catch(() => {
          // Message failed to send. It is deliberately left in the cache so a
          // later processPending call can retry it.
          // NOTE(review): the failure is not reported anywhere - consider
          // surfacing it through `events`.
        });
    });
  }

  /** Runs `processPending` for every device known to the cache. */
  private async processAllPending() {
    const devices = this.cache.getDevices();

    return Promise.all(
      devices.map(async device => this.processPending(device))
    );
  }

  /**
   * Stores `message` in the cache for `device`, then processes the device's
   * pending messages. `SessionResetMessage`s are never queued.
   */
  private async queue(device: PubKey, message: ContentMessage) {
    if (message instanceof SessionResetMessage) {
      return;
    }

    await this.cache.add(device, message);
    await this.processPending(device);
  }

  /** Returns the job queue for `device`, creating it on first use. */
  private getJobQueue(device: PubKey): JobQueue {
    const existing = this.jobQueues.get(device.key);
    if (existing) {
      return existing;
    }

    const created = new JobQueue();
    this.jobQueues.set(device.key, created);

    return created;
  }

  /** Returns the devices paired with our own number, as reported by `Data`. */
  private async getOurDevices(): Promise<Array<PubKey>> {
    const ourKey = await textsecure.storage.user.getNumber();
    const ourLinked = await Data.getPairedDevicesFor(ourKey);

    return ourLinked.map(key => new PubKey(key));
  }

  /**
   * Returns the sync form of `message`, or `undefined` when it cannot be
   * synced.
   */
  private asSyncMessage(message: ContentMessage): ContentMessage | undefined {
    if (message instanceof SyncMessage) {
      return message;
    }

    if (!SyncMessage.canSync(message)) {
      return undefined;
    }

    // NOTE(review): uses the `toSyncMessage.from(message)` call form that
    // sendMessageToDevices originally used - confirm it against ../utils.
    return toSyncMessage.from(message);
  }
}