You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			273 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			TypeScript
		
	
			
		
		
	
	
			273 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			TypeScript
		
	
/* eslint-disable more/no-then */
 | 
						|
import _ from 'lodash';
 | 
						|
import { v4 as uuidv4 } from 'uuid';
 | 
						|
 | 
						|
import { EnvelopePlus } from './types';
 | 
						|
 | 
						|
import { addToCache, getAllFromCache, getAllFromCacheForSource, removeFromCache } from './cache';
 | 
						|
 | 
						|
// innerHandleSwarmContentMessage is only needed because of code duplication in handleDecryptedEnvelope...
 | 
						|
import { handleSwarmContentMessage, innerHandleSwarmContentMessage } from './contentMessage';
 | 
						|
 | 
						|
import { Data } from '../data/data';
 | 
						|
import { SignalService } from '../protobuf';
 | 
						|
import { StringUtils, UserUtils } from '../session/utils';
 | 
						|
import { perfEnd, perfStart } from '../session/utils/Performance';
 | 
						|
import { createTaskWithTimeout } from '../session/utils/TaskWithTimeout';
 | 
						|
import { UnprocessedParameter } from '../types/sqlSharedTypes';
 | 
						|
import { getEnvelopeId } from './common';
 | 
						|
 | 
						|
export { downloadAttachment } from './attachments';
 | 
						|
 | 
						|
const incomingMessagePromises: Array<Promise<any>> = [];
 | 
						|
 | 
						|
async function handleSwarmEnvelope(
 | 
						|
  envelope: EnvelopePlus,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpiration: number | null
 | 
						|
) {
 | 
						|
  if (envelope.content && envelope.content.length > 0) {
 | 
						|
    return handleSwarmContentMessage(envelope, messageHash, messageExpiration);
 | 
						|
  }
 | 
						|
 | 
						|
  await removeFromCache(envelope);
 | 
						|
  throw new Error('Received message with no content');
 | 
						|
}
 | 
						|
 | 
						|
class EnvelopeQueue {
 | 
						|
  // Last pending promise
 | 
						|
  private pending: Promise<any> = Promise.resolve();
 | 
						|
 | 
						|
  public add(task: any): void {
 | 
						|
    const promise = this.pending.then(task, task);
 | 
						|
    this.pending = promise;
 | 
						|
 | 
						|
    this.pending.then(this.cleanup.bind(this, promise), this.cleanup.bind(this, promise));
 | 
						|
  }
 | 
						|
 | 
						|
  private cleanup(promise: Promise<any>) {
 | 
						|
    // We want to clear out the promise chain whenever possible because it could
 | 
						|
    //   lead to large memory usage over time:
 | 
						|
    //   https://github.com/nodejs/node/issues/6673#issuecomment-244331609
 | 
						|
    if (this.pending === promise) {
 | 
						|
      this.pending = Promise.resolve();
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
const envelopeQueue = new EnvelopeQueue();
 | 
						|
 | 
						|
function queueSwarmEnvelope(
 | 
						|
  envelope: EnvelopePlus,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpiration: number | null
 | 
						|
) {
 | 
						|
  const id = getEnvelopeId(envelope);
 | 
						|
  const task = handleSwarmEnvelope.bind(null, envelope, messageHash, messageExpiration);
 | 
						|
  const taskWithTimeout = createTaskWithTimeout(task, `queueSwarmEnvelope ${id}`);
 | 
						|
 | 
						|
  try {
 | 
						|
    envelopeQueue.add(taskWithTimeout);
 | 
						|
  } catch (error) {
 | 
						|
    window?.log?.error(
 | 
						|
      'queueSwarmEnvelope error handling envelope',
 | 
						|
      id,
 | 
						|
      ':',
 | 
						|
      error && error.stack ? error.stack : error
 | 
						|
    );
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
async function handleRequestDetail(
 | 
						|
  plaintext: Uint8Array,
 | 
						|
  inConversation: string | null,
 | 
						|
  lastPromise: Promise<any>,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpiration: number
 | 
						|
): Promise<void> {
 | 
						|
  const envelope: any = SignalService.Envelope.decode(plaintext);
 | 
						|
 | 
						|
  // After this point, decoding errors are not the server's
 | 
						|
  //   fault, and we should handle them gracefully and tell the
 | 
						|
  //   user they received an invalid message
 | 
						|
  // The message is for a medium size group
 | 
						|
  if (inConversation) {
 | 
						|
    const ourNumber = UserUtils.getOurPubKeyStrFromCache();
 | 
						|
    const senderIdentity = envelope.source;
 | 
						|
 | 
						|
    if (senderIdentity === ourNumber) {
 | 
						|
      return;
 | 
						|
    }
 | 
						|
 | 
						|
    // Sender identity will be lost if we load from cache, because
 | 
						|
    // plaintext (and protobuf.Envelope) does not have that field...
 | 
						|
    envelope.source = inConversation;
 | 
						|
 | 
						|
    // eslint-disable-next-line no-param-reassign
 | 
						|
    plaintext = SignalService.Envelope.encode(envelope).finish();
 | 
						|
    envelope.senderIdentity = senderIdentity;
 | 
						|
  }
 | 
						|
 | 
						|
  envelope.id = uuidv4();
 | 
						|
  envelope.serverTimestamp = envelope.serverTimestamp ? envelope.serverTimestamp.toNumber() : null;
 | 
						|
  envelope.messageHash = messageHash;
 | 
						|
 | 
						|
  try {
 | 
						|
    // NOTE: Annoyngly we add plaintext to the cache
 | 
						|
    // after we've already processed some of it (thus the
 | 
						|
    // need to handle senderIdentity separately)...
 | 
						|
    perfStart(`addToCache-${envelope.id}`);
 | 
						|
 | 
						|
    await addToCache(envelope, plaintext, messageHash);
 | 
						|
    perfEnd(`addToCache-${envelope.id}`, 'addToCache');
 | 
						|
 | 
						|
    // To ensure that we queue in the same order we receive messages
 | 
						|
    await lastPromise;
 | 
						|
 | 
						|
    queueSwarmEnvelope(envelope, messageHash, messageExpiration);
 | 
						|
  } catch (error) {
 | 
						|
    window?.log?.error(
 | 
						|
      'handleRequest error trying to add message to cache:',
 | 
						|
      error && error.stack ? error.stack : error
 | 
						|
    );
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 *
 | 
						|
 * @param inConversation if the request is related to a group, this will be set to the group pubkey. Otherwise, it is set to null
 | 
						|
 */
 | 
						|
export function handleRequest(
 | 
						|
  plaintext: Uint8Array,
 | 
						|
  inConversation: string | null,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpiration: number
 | 
						|
): void {
 | 
						|
  const lastPromise = _.last(incomingMessagePromises) || Promise.resolve();
 | 
						|
 | 
						|
  const promise = handleRequestDetail(
 | 
						|
    plaintext,
 | 
						|
    inConversation,
 | 
						|
    lastPromise,
 | 
						|
    messageHash,
 | 
						|
    messageExpiration
 | 
						|
  ).catch(e => {
 | 
						|
    window?.log?.error('Error handling incoming message:', e && e.stack ? e.stack : e);
 | 
						|
  });
 | 
						|
 | 
						|
  incomingMessagePromises.push(promise);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Used in main_renderer.js
 | 
						|
 */
 | 
						|
export async function queueAllCached() {
 | 
						|
  const items = await getAllFromCache();
 | 
						|
 | 
						|
  await items.reduce(async (promise, item) => {
 | 
						|
    await promise;
 | 
						|
    await queueCached(item);
 | 
						|
  }, Promise.resolve());
 | 
						|
}
 | 
						|
 | 
						|
export async function queueAllCachedFromSource(source: string) {
 | 
						|
  const items = await getAllFromCacheForSource(source);
 | 
						|
 | 
						|
  // queue all cached for this source, but keep the order
 | 
						|
  await items.reduce(async (promise, item) => {
 | 
						|
    await promise;
 | 
						|
    await queueCached(item);
 | 
						|
  }, Promise.resolve());
 | 
						|
}
 | 
						|
 | 
						|
async function queueCached(item: UnprocessedParameter) {
 | 
						|
  try {
 | 
						|
    const envelopePlaintext = StringUtils.encode(item.envelope, 'base64');
 | 
						|
    const envelopeArray = new Uint8Array(envelopePlaintext);
 | 
						|
 | 
						|
    const envelope: any = SignalService.Envelope.decode(envelopeArray);
 | 
						|
    envelope.id = envelope.serverGuid || item.id;
 | 
						|
    envelope.source = envelope.source || item.source;
 | 
						|
 | 
						|
    // Why do we need to do this???
 | 
						|
    envelope.senderIdentity = envelope.senderIdentity || item.senderIdentity;
 | 
						|
 | 
						|
    const { decrypted } = item;
 | 
						|
 | 
						|
    if (decrypted) {
 | 
						|
      const payloadPlaintext = StringUtils.encode(decrypted, 'base64');
 | 
						|
      // TODO we don't store the expiration in the cache, but we want to get rid of the cache at some point
 | 
						|
      queueDecryptedEnvelope(envelope, payloadPlaintext, envelope.messageHash, null);
 | 
						|
    } else {
 | 
						|
      // TODO we don't store the expiration in the cache, but we want to get rid of the cache at some point
 | 
						|
      queueSwarmEnvelope(envelope, envelope.messageHash, null);
 | 
						|
    }
 | 
						|
  } catch (error) {
 | 
						|
    window?.log?.error(
 | 
						|
      'queueCached error handling item',
 | 
						|
      item.id,
 | 
						|
      'removing it. Error:',
 | 
						|
      error && error.stack ? error.stack : error
 | 
						|
    );
 | 
						|
 | 
						|
    try {
 | 
						|
      await Data.removeUnprocessed(item.id);
 | 
						|
    } catch (deleteError) {
 | 
						|
      window?.log?.error(
 | 
						|
        'queueCached error deleting item',
 | 
						|
        item.id,
 | 
						|
        'Error:',
 | 
						|
        deleteError && deleteError.stack ? deleteError.stack : deleteError
 | 
						|
      );
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
function queueDecryptedEnvelope(
 | 
						|
  envelope: any,
 | 
						|
  plaintext: ArrayBuffer,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpiration: number | null
 | 
						|
) {
 | 
						|
  const id = getEnvelopeId(envelope);
 | 
						|
  window?.log?.info('queueing decrypted envelope', id);
 | 
						|
 | 
						|
  const task = handleDecryptedEnvelope.bind(
 | 
						|
    null,
 | 
						|
    envelope,
 | 
						|
    plaintext,
 | 
						|
    messageHash,
 | 
						|
    messageExpiration
 | 
						|
  );
 | 
						|
  const taskWithTimeout = createTaskWithTimeout(task, `queueEncryptedEnvelope ${id}`);
 | 
						|
  try {
 | 
						|
    envelopeQueue.add(taskWithTimeout);
 | 
						|
  } catch (error) {
 | 
						|
    window?.log?.error(
 | 
						|
      `queueDecryptedEnvelope error handling envelope ${id}:`,
 | 
						|
      error && error.stack ? error.stack : error
 | 
						|
    );
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
async function handleDecryptedEnvelope(
 | 
						|
  envelope: EnvelopePlus,
 | 
						|
  plaintext: ArrayBuffer,
 | 
						|
  messageHash: string,
 | 
						|
  messageExpirationFromRetrieve: number | null
 | 
						|
) {
 | 
						|
  if (envelope.content) {
 | 
						|
    const sentAtTimestamp = _.toNumber(envelope.timestamp);
 | 
						|
 | 
						|
    await innerHandleSwarmContentMessage({
 | 
						|
      envelope,
 | 
						|
      sentAtTimestamp,
 | 
						|
      plaintext,
 | 
						|
      messageHash,
 | 
						|
      messageExpirationFromRetrieve,
 | 
						|
    });
 | 
						|
  } else {
 | 
						|
    await removeFromCache(envelope);
 | 
						|
  }
 | 
						|
}
 |