diff --git a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts index a1e21dd2e..ccdb8f121 100644 --- a/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts +++ b/ts/session/messages/outgoing/content/data/group/ClosedGroupMessage.ts @@ -8,7 +8,7 @@ interface ClosedGroupMessageParams extends MessageParams { } export abstract class ClosedGroupMessage extends DataMessage { - protected readonly groupId: string; + public readonly groupId: string; constructor(params: ClosedGroupMessageParams) { super({ diff --git a/ts/session/messages/outgoing/content/sync/SyncMessage.ts b/ts/session/messages/outgoing/content/sync/SyncMessage.ts index 3eb336dc6..cf774f6c6 100644 --- a/ts/session/messages/outgoing/content/sync/SyncMessage.ts +++ b/ts/session/messages/outgoing/content/sync/SyncMessage.ts @@ -2,6 +2,11 @@ import { ContentMessage } from '../ContentMessage'; import { SignalService } from '../../../../../protobuf'; export abstract class SyncMessage extends ContentMessage { + public static canSync(message: ContentMessage): boolean { + // TODO: implement + return true; + } + public ttl(): number { return this.getDefaultTTL(); } diff --git a/ts/session/protocols/SessionProtocol.ts b/ts/session/protocols/SessionProtocol.ts index f150e70d9..9e9c356c5 100644 --- a/ts/session/protocols/SessionProtocol.ts +++ b/ts/session/protocols/SessionProtocol.ts @@ -3,19 +3,20 @@ // The reason i haven't done it is to avoid having instances of the protocol, rather you should be able to call the functions directly import { SessionResetMessage } from '../messages/outgoing'; +import { PubKey } from '../types'; -export function hasSession(device: string): boolean { +export function hasSession(device: PubKey): boolean { return false; // TODO: Implement } -export function hasSentSessionRequest(device: string): boolean { +export function hasSentSessionRequest(device: PubKey): boolean { // TODO: need a way to keep track of if we've sent a session request // My idea was to use the timestamp of when it was sent but there might be another better approach return false; } export async function sendSessionRequestIfNeeded( - device: string + device: PubKey ): Promise { if (hasSession(device) || hasSentSessionRequest(device)) { return Promise.resolve(); @@ -34,14 +35,14 @@ export async function sendSessionRequest( return Promise.resolve(); } -export function sessionEstablished(device: string) { +export function sessionEstablished(device: PubKey) { // TODO: this is called when we receive an encrypted message from the other user // Maybe it should be renamed to something else // TODO: This should make `hasSentSessionRequest` return `false` } export function shouldProcessSessionRequest( - device: string, + device: PubKey, messageTimestamp: number ): boolean { // TODO: Need to do the following here @@ -49,7 +50,7 @@ export function shouldProcessSessionRequest( return false; } -export function sessionRequestProcessed(device: string) { +export function sessionRequestProcessed(device: PubKey) { // TODO: this is called when we process the session request // This should store the processed timestamp // Again naming is crap so maybe some other name is better diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index cecbe589d..1efd26c95 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -1,14 +1,21 @@ +import * as _ from 'lodash'; +import * as Data from '../../../js/modules/data'; +import { textsecure } from '../../window'; + import { EventEmitter } from 'events'; import { MessageQueueInterface, MessageQueueInterfaceEvents, + GroupMessageType, } from './MessageQueueInterface'; -import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage } from '../messages/outgoing'; +import { ContentMessage, OpenGroupMessage, SyncMessage, SessionResetMessage, ClosedGroupMessage } from '../messages/outgoing'; import { PendingMessageCache } from './PendingMessageCache'; -import { JobQueue, TypedEventEmitter } from '../utils'; +import { JobQueue, TypedEventEmitter, toRawMessage, toSyncMessage } from '../utils'; import { PubKey } from '../types'; import { ConversationController } from '../../window'; import { MessageSender } from '.'; +import { SessionProtocol } from '../protocols'; + export class MessageQueue implements MessageQueueInterface { public readonly events: TypedEventEmitter; @@ -18,37 +25,86 @@ export class MessageQueue implements MessageQueueInterface { constructor() { this.events = new EventEmitter(); this.cache = new PendingMessageCache(); - this.processAllPending(); + void this.processAllPending(); } - public sendUsingMultiDevice(user: string, message: ContentMessage) { - // this.cache - - // throw new Error('Method not implemented.'); + public async sendUsingMultiDevice(user: PubKey, message: ContentMessage) { + const userLinked = await Data.getPairedDevicesFor(user.key); + const userDevices = userLinked.map(d => new PubKey(d)); + + await this.sendMessageToDevices(userDevices, message); } - public send(device: PubKey, message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async send(device: PubKey, message: ContentMessage) { + await this.sendMessageToDevices([device], message); } - public sendToGroup(message: ContentMessage | OpenGroupMessage) { - throw new Error('Method not implemented.'); + + public async sendMessageToDevices(devices: Array, message: ContentMessage) { + let currentDevices = [...devices]; + + if (SyncMessage.canSync(message)) { + // Sync to our devices + const syncMessage = toSyncMessage.from(message); + await this.sendSyncMessage(syncMessage); + + // Remove our devices from currentDevices + const ourDevices = await this.getOurDevices(); + currentDevices = currentDevices.filter(device => !_.includes(ourDevices, device)); + } + + currentDevices.forEach(async device => { + await this.queue(device, message); + }); } - public sendSyncMessage(message: ContentMessage) { - throw new Error('Method not implemented.'); + + public async sendToGroup(message: OpenGroupMessage | ContentMessage) { + + if (!(message instanceof OpenGroupMessage) && !(message instanceof ClosedGroupMessage)) { + return; + } + + // Closed groups + if (message instanceof ClosedGroupMessage) { + // Get devices in closed group + const conversation = ConversationController.get(message.groupId); + + const recipients = 5; + + await this.sendMessageToDevices(recipients, message); + } + + // Open groups + if (message instanceof OpenGroupMessage) { + // No queue needed for Open Groups; send directly + + } + } - public processPending(device: PubKey) { - // TODO: implement - const SessionManager: any = {}; // TEMP FIX + public async sendSyncMessage(message: ContentMessage) { + // Sync with our devices + + const syncMessage = toSyncMessage(); + if (!syncMessage.canSync()) { + return; + } + + const ourDevices = await this.getOurDevices(); + ourDevices.forEach(async device => { + await this.queue(device, message); + }); + } + + public async processPending(device: PubKey) { const messages = this.cache.getForDevice(device); + const hasSession = SessionProtocol.hasSession(device); const conversation = ConversationController.get(device.key); const isMediumGroup = conversation.isMediumGroup(); - const hasSession = false; // TODO ; = SessionManager.hasSession(device); - if (!isMediumGroup && !hasSession) { - SessionManager.sendSessionRequestIfNeeded(); + await SessionProtocol.sendSessionRequestIfNeeded(device); return; } @@ -56,7 +112,7 @@ export class MessageQueue implements MessageQueueInterface { const jobQueue = this.getJobQueue(device); messages.forEach(message => { if (!jobQueue.has(message.identifier)) { - const promise = jobQueue.add(message.identifier, MessageSender.send(message)); + const promise = jobQueue.add(async () => MessageSender.send(message)); promise.then(() => { // Message sent; remove from cache @@ -66,32 +122,22 @@ export class MessageQueue implements MessageQueueInterface { }); } }); - - - } - private processAllPending() { - // TODO: Get all devices which are pending here - + private async processAllPending() { + const devices = this.cache.getDevices(); + const promises = devices.map(async device => this.processPending(device)); + + return Promise.all(promises); } - private queue(device: PubKey, message: ContentMessage) { - // TODO: implement + private async queue(device: PubKey, message: ContentMessage) { if (message instanceof SessionResetMessage) { return; } - const added = this.cache.add(device, message); - - // if not added? - - this.processPending(device); - } - - private queueOpenGroupMessage(message: OpenGroupMessage) { - // TODO: Do we need to queue open group messages? - // If so we can get open group job queue and add the send job here + await this.cache.add(device, message); + await this.processPending(device); } private getJobQueue(device: PubKey): JobQueue { @@ -103,4 +149,11 @@ export class MessageQueue implements MessageQueueInterface { return queue; } + + private async getOurDevices(): Promise> { + const ourKey = await textsecure.storage.user.getNumber(); + const ourLinked = await Data.getPairedDevicesFor(ourKey); + + return ourLinked.map(d => new PubKey(d)); + } } diff --git a/ts/session/sending/MessageQueueInterface.ts b/ts/session/sending/MessageQueueInterface.ts index 21a018cbf..c6ffc328e 100644 --- a/ts/session/sending/MessageQueueInterface.ts +++ b/ts/session/sending/MessageQueueInterface.ts @@ -16,7 +16,7 @@ export interface MessageQueueInterfaceEvents { export interface MessageQueueInterface { events: TypedEventEmitter; - sendUsingMultiDevice(user: string, message: ContentMessage): void; + sendUsingMultiDevice(user: PubKey, message: ContentMessage): void; send(device: PubKey, message: ContentMessage): void; sendToGroup(message: GroupMessageType): void; sendSyncMessage(message: ContentMessage): void; diff --git a/ts/session/utils/Messages.ts b/ts/session/utils/Messages.ts index 04ae0f815..f3c8ed144 100644 --- a/ts/session/utils/Messages.ts +++ b/ts/session/utils/Messages.ts @@ -1,5 +1,5 @@ import { RawMessage } from '../types/RawMessage'; -import { ContentMessage } from '../messages/outgoing'; +import { ContentMessage, SyncMessage } from '../messages/outgoing'; import { EncryptionType, PubKey } from '../types'; export function toRawMessage( @@ -22,3 +22,8 @@ export function toRawMessage( return rawMessage; } + +export function toSyncMessage(message: ContentMessage): { + + return SyncMessage; +} \ No newline at end of file