You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			333 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			TypeScript
		
	
			
		
		
	
	
			333 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			TypeScript
		
	
import { PendingMessageCache } from './PendingMessageCache';
 | 
						|
import { JobQueue, MessageUtils, UserUtils } from '../utils';
 | 
						|
import { PubKey, RawMessage } from '../types';
 | 
						|
import { MessageSender } from '.';
 | 
						|
import { ClosedGroupMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupMessage';
 | 
						|
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
 | 
						|
import { ClosedGroupNameChangeMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNameChangeMessage';
 | 
						|
 | 
						|
import { ClosedGroupMemberLeftMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupMemberLeftMessage';
 | 
						|
import { MessageSentHandler } from './MessageSentHandler';
 | 
						|
import { ContentMessage } from '../messages/outgoing';
 | 
						|
import { ExpirationTimerUpdateMessage } from '../messages/outgoing/controlMessage/ExpirationTimerUpdateMessage';
 | 
						|
import { ClosedGroupAddedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupAddedMembersMessage';
 | 
						|
import { ClosedGroupEncryptionPairMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairMessage';
 | 
						|
import { ClosedGroupEncryptionPairRequestMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupEncryptionPairRequestMessage';
 | 
						|
import { ClosedGroupNewMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupNewMessage';
 | 
						|
import { ClosedGroupRemovedMembersMessage } from '../messages/outgoing/controlMessage/group/ClosedGroupRemovedMembersMessage';
 | 
						|
import { ClosedGroupVisibleMessage } from '../messages/outgoing/visibleMessage/ClosedGroupVisibleMessage';
 | 
						|
import { SyncMessageType } from '../utils/syncUtils';
 | 
						|
 | 
						|
import { OpenGroupRequestCommonType } from '../apis/open_group_api/opengroupV2/ApiUtil';
 | 
						|
import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage';
 | 
						|
import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage';
 | 
						|
import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage';
 | 
						|
import { OpenGroupMessageV2 } from '../apis/open_group_api/opengroupV2/OpenGroupMessageV2';
 | 
						|
import { AbortController } from 'abort-controller';
 | 
						|
import { sendSogsReactionOnionV4 } from '../apis/open_group_api/sogsv3/sogsV3SendReaction';
 | 
						|
 | 
						|
type ClosedGroupMessageType =
 | 
						|
  | ClosedGroupVisibleMessage
 | 
						|
  | ClosedGroupAddedMembersMessage
 | 
						|
  | ClosedGroupRemovedMembersMessage
 | 
						|
  | ClosedGroupNameChangeMessage
 | 
						|
  | ClosedGroupMemberLeftMessage
 | 
						|
  | ExpirationTimerUpdateMessage
 | 
						|
  | ClosedGroupEncryptionPairMessage
 | 
						|
  | UnsendMessage
 | 
						|
  | ClosedGroupEncryptionPairRequestMessage;
 | 
						|
 | 
						|
// ClosedGroupEncryptionPairReplyMessage must be sent to a user pubkey. Not a group.
 | 
						|
 | 
						|
export class MessageQueue {
 | 
						|
  private readonly jobQueues: Map<string, JobQueue> = new Map();
 | 
						|
  private readonly pendingMessageCache: PendingMessageCache;
 | 
						|
 | 
						|
  constructor(cache?: PendingMessageCache) {
 | 
						|
    this.pendingMessageCache = cache ?? new PendingMessageCache();
 | 
						|
    void this.processAllPending();
 | 
						|
  }
 | 
						|
 | 
						|
  public async sendToPubKey(
 | 
						|
    destinationPubKey: PubKey,
 | 
						|
    message: ContentMessage,
 | 
						|
    sentCb?: (message: RawMessage) => Promise<void>,
 | 
						|
    isGroup = false
 | 
						|
  ): Promise<void> {
 | 
						|
    if (message instanceof ConfigurationMessage || !!(message as any).syncTarget) {
 | 
						|
      throw new Error('SyncMessage needs to be sent with sendSyncMessage');
 | 
						|
    }
 | 
						|
    await this.process(destinationPubKey, message, sentCb, isGroup);
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * This function is synced. It will wait for the message to be delivered to the open
 | 
						|
   * group to return.
 | 
						|
   * So there is no need for a sendCb callback
 | 
						|
   *
 | 
						|
   *
 | 
						|
   * fileIds is the array of ids this message is linked to. If we upload files as part of a message but do not link them with this, the files will be deleted much sooner
 | 
						|
   */
 | 
						|
  public async sendToOpenGroupV2(
 | 
						|
    message: OpenGroupVisibleMessage,
 | 
						|
    roomInfos: OpenGroupRequestCommonType,
 | 
						|
    blinded: boolean,
 | 
						|
    filesToLink: Array<number>
 | 
						|
  ) {
 | 
						|
    // Skipping the queue for Open Groups v2; the message is sent directly
 | 
						|
 | 
						|
    try {
 | 
						|
      // NOTE Reactions are handled separately
 | 
						|
      if (message.reaction) {
 | 
						|
        await sendSogsReactionOnionV4(
 | 
						|
          roomInfos.serverUrl,
 | 
						|
          roomInfos.roomId,
 | 
						|
          new AbortController().signal,
 | 
						|
          message.reaction,
 | 
						|
          blinded
 | 
						|
        );
 | 
						|
        return;
 | 
						|
      }
 | 
						|
 | 
						|
      const result = await MessageSender.sendToOpenGroupV2(
 | 
						|
        message,
 | 
						|
        roomInfos,
 | 
						|
        blinded,
 | 
						|
        filesToLink
 | 
						|
      );
 | 
						|
 | 
						|
      const { sentTimestamp, serverId } = result as OpenGroupMessageV2;
 | 
						|
      if (!serverId || serverId === -1) {
 | 
						|
        throw new Error(`Invalid serverId returned by server: ${serverId}`);
 | 
						|
      }
 | 
						|
 | 
						|
      await MessageSentHandler.handlePublicMessageSentSuccess(message.identifier, {
 | 
						|
        serverId: serverId,
 | 
						|
        serverTimestamp: sentTimestamp,
 | 
						|
      });
 | 
						|
    } catch (e) {
 | 
						|
      window?.log?.warn(
 | 
						|
        `Failed to send message to open group: ${roomInfos.serverUrl}:${roomInfos.roomId}:`,
 | 
						|
        e
 | 
						|
      );
 | 
						|
      await MessageSentHandler.handleMessageSentFailure(
 | 
						|
        message,
 | 
						|
        e || new Error('Failed to send message to open group.')
 | 
						|
      );
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  public async sendToOpenGroupV2BlindedRequest(
 | 
						|
    encryptedContent: Uint8Array,
 | 
						|
    roomInfos: OpenGroupRequestCommonType,
 | 
						|
    message: OpenGroupVisibleMessage,
 | 
						|
    recipientBlindedId: string
 | 
						|
  ) {
 | 
						|
    try {
 | 
						|
      if (!PubKey.hasBlindedPrefix(recipientBlindedId)) {
 | 
						|
        throw new Error('sendToOpenGroupV2BlindedRequest needs a blindedId');
 | 
						|
      }
 | 
						|
      const { serverTimestamp, serverId } = await MessageSender.sendToOpenGroupV2BlindedRequest(
 | 
						|
        encryptedContent,
 | 
						|
        roomInfos,
 | 
						|
        recipientBlindedId
 | 
						|
      );
 | 
						|
      if (!serverId || serverId === -1) {
 | 
						|
        throw new Error(`Invalid serverId returned by server: ${serverId}`);
 | 
						|
      }
 | 
						|
      await MessageSentHandler.handlePublicMessageSentSuccess(message.identifier, {
 | 
						|
        serverId,
 | 
						|
        serverTimestamp,
 | 
						|
      });
 | 
						|
    } catch (e) {
 | 
						|
      window?.log?.warn(
 | 
						|
        `Failed to send message to open group: ${roomInfos.serverUrl}:${roomInfos.roomId}:`,
 | 
						|
        e.message
 | 
						|
      );
 | 
						|
      await MessageSentHandler.handleMessageSentFailure(
 | 
						|
        message,
 | 
						|
        e || new Error('Failed to send message to open group.')
 | 
						|
      );
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   *
 | 
						|
   * @param sentCb currently only called for medium groups sent message
 | 
						|
   */
 | 
						|
  public async sendToGroup(
 | 
						|
    message: ClosedGroupMessageType,
 | 
						|
    sentCb?: (message: RawMessage) => Promise<void>,
 | 
						|
    groupPubKey?: PubKey
 | 
						|
  ): Promise<void> {
 | 
						|
    let destinationPubKey: PubKey | undefined = groupPubKey;
 | 
						|
    if (message instanceof ExpirationTimerUpdateMessage || message instanceof ClosedGroupMessage) {
 | 
						|
      destinationPubKey = groupPubKey ? groupPubKey : message.groupId;
 | 
						|
    }
 | 
						|
 | 
						|
    if (!destinationPubKey) {
 | 
						|
      throw new Error('Invalid group message passed in sendToGroup.');
 | 
						|
    }
 | 
						|
 | 
						|
    // if groupId is set here, it means it's for a medium group. So send it as it
 | 
						|
    return this.sendToPubKey(PubKey.cast(destinationPubKey), message, sentCb, true);
 | 
						|
  }
 | 
						|
 | 
						|
  public async sendSyncMessage(
 | 
						|
    message?: SyncMessageType,
 | 
						|
    sentCb?: (message: RawMessage) => Promise<void>
 | 
						|
  ): Promise<void> {
 | 
						|
    if (!message) {
 | 
						|
      return;
 | 
						|
    }
 | 
						|
    if (
 | 
						|
      !(message instanceof ConfigurationMessage) &&
 | 
						|
      !(message instanceof UnsendMessage) &&
 | 
						|
      !(message as any)?.syncTarget
 | 
						|
    ) {
 | 
						|
      throw new Error('Invalid message given to sendSyncMessage');
 | 
						|
    }
 | 
						|
 | 
						|
    const ourPubKey = UserUtils.getOurPubKeyStrFromCache();
 | 
						|
 | 
						|
    await this.process(PubKey.cast(ourPubKey), message, sentCb);
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * Sends a message that awaits until the message is completed sending
 | 
						|
   * @param user user pub key to send to
 | 
						|
   * @param message Message to be sent
 | 
						|
   */
 | 
						|
  public async sendToPubKeyNonDurably(
 | 
						|
    user: PubKey,
 | 
						|
    message: ClosedGroupNewMessage | CallMessage
 | 
						|
  ): Promise<boolean | number> {
 | 
						|
    let rawMessage;
 | 
						|
    try {
 | 
						|
      rawMessage = await MessageUtils.toRawMessage(user, message);
 | 
						|
      const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(rawMessage);
 | 
						|
      await MessageSentHandler.handleMessageSentSuccess(
 | 
						|
        rawMessage,
 | 
						|
        effectiveTimestamp,
 | 
						|
        wrappedEnvelope
 | 
						|
      );
 | 
						|
      return effectiveTimestamp;
 | 
						|
    } catch (error) {
 | 
						|
      if (rawMessage) {
 | 
						|
        await MessageSentHandler.handleMessageSentFailure(rawMessage, error);
 | 
						|
      }
 | 
						|
      return false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * processes pending jobs in the message sending queue.
 | 
						|
   * @param device - target device to send to
 | 
						|
   */
 | 
						|
  public async processPending(device: PubKey, isSyncMessage: boolean = false) {
 | 
						|
    const messages = await this.pendingMessageCache.getForDevice(device);
 | 
						|
 | 
						|
    const jobQueue = this.getJobQueue(device);
 | 
						|
    messages.forEach(async message => {
 | 
						|
      const messageId = message.identifier;
 | 
						|
 | 
						|
      if (!jobQueue.has(messageId)) {
 | 
						|
        // We put the event handling inside this job to avoid sending duplicate events
 | 
						|
        const job = async () => {
 | 
						|
          try {
 | 
						|
            const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(
 | 
						|
              message,
 | 
						|
              undefined,
 | 
						|
              undefined,
 | 
						|
              isSyncMessage
 | 
						|
            );
 | 
						|
 | 
						|
            await MessageSentHandler.handleMessageSentSuccess(
 | 
						|
              message,
 | 
						|
              effectiveTimestamp,
 | 
						|
              wrappedEnvelope
 | 
						|
            );
 | 
						|
 | 
						|
            const cb = this.pendingMessageCache.callbacks.get(message.identifier);
 | 
						|
 | 
						|
            if (cb) {
 | 
						|
              await cb(message);
 | 
						|
            }
 | 
						|
            this.pendingMessageCache.callbacks.delete(message.identifier);
 | 
						|
          } catch (error) {
 | 
						|
            void MessageSentHandler.handleMessageSentFailure(message, error);
 | 
						|
          } finally {
 | 
						|
            // Remove from the cache because retrying is done in the sender
 | 
						|
            void this.pendingMessageCache.remove(message);
 | 
						|
          }
 | 
						|
        };
 | 
						|
        await jobQueue.addWithId(messageId, job);
 | 
						|
      }
 | 
						|
    });
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * This method should be called when the app is started and the user loggedin to fetch
 | 
						|
   * existing message waiting to be sent in the cache of message
 | 
						|
   */
 | 
						|
  public async processAllPending() {
 | 
						|
    const devices = await this.pendingMessageCache.getDevices();
 | 
						|
    const promises = devices.map(async device => this.processPending(device));
 | 
						|
 | 
						|
    return Promise.all(promises);
 | 
						|
  }
 | 
						|
 | 
						|
  /**
 | 
						|
   * This method should not be called directly. Only through sendToPubKey.
 | 
						|
   */
 | 
						|
  private async process(
 | 
						|
    destinationPk: PubKey,
 | 
						|
    message: ContentMessage,
 | 
						|
    sentCb?: (message: RawMessage) => Promise<void>,
 | 
						|
    isGroup = false
 | 
						|
  ): Promise<void> {
 | 
						|
    // Don't send to ourselves
 | 
						|
    const currentDevice = UserUtils.getOurPubKeyFromCache();
 | 
						|
    let isSyncMessage = false;
 | 
						|
    if (currentDevice && destinationPk.isEqual(currentDevice)) {
 | 
						|
      // We allow a message for ourselve only if it's a ConfigurationMessage, a ClosedGroupNewMessage,
 | 
						|
      // or a message with a syncTarget set.
 | 
						|
 | 
						|
      if (
 | 
						|
        message instanceof ConfigurationMessage ||
 | 
						|
        message instanceof ClosedGroupNewMessage ||
 | 
						|
        message instanceof UnsendMessage ||
 | 
						|
        (message as any).syncTarget?.length > 0
 | 
						|
      ) {
 | 
						|
        window?.log?.warn('Processing sync message');
 | 
						|
        isSyncMessage = true;
 | 
						|
      } else {
 | 
						|
        window?.log?.warn('Dropping message in process() to be sent to ourself');
 | 
						|
        return;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    await this.pendingMessageCache.add(destinationPk, message, sentCb, isGroup);
 | 
						|
    void this.processPending(destinationPk, isSyncMessage);
 | 
						|
  }
 | 
						|
 | 
						|
  private getJobQueue(device: PubKey): JobQueue {
 | 
						|
    let queue = this.jobQueues.get(device.key);
 | 
						|
    if (!queue) {
 | 
						|
      queue = new JobQueue();
 | 
						|
      this.jobQueues.set(device.key, queue);
 | 
						|
    }
 | 
						|
 | 
						|
    return queue;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
let messageQueue: MessageQueue;
 | 
						|
 | 
						|
export function getMessageQueue(): MessageQueue {
 | 
						|
  if (!messageQueue) {
 | 
						|
    messageQueue = new MessageQueue();
 | 
						|
  }
 | 
						|
  return messageQueue;
 | 
						|
}
 |