fix: use DaR ttl for synced message store request

removing one request at a time
pull/2940/head
Audric Ackermann 1 year ago
parent 5cfbb8405c
commit b61745fd94

@ -1842,7 +1842,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
timestamp: sentAt,
attachments,
expirationType: message.getExpirationType() ?? 'unknown',
expireTimer: message.getExpireTimer(),
expireTimer: message.getExpireTimerSeconds(),
preview: preview ? [preview] : [],
quote,
lokiProfile: UserUtils.getOurProfile(),

@ -291,8 +291,8 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
public getPropsForExpiringMessage(): PropsForExpiringMessage {
const expirationType = this.getExpirationType();
const expirationDurationMs = this.getExpireTimer()
? this.getExpireTimer() * DURATION.SECONDS
const expirationDurationMs = this.getExpireTimerSeconds()
? this.getExpireTimerSeconds() * DURATION.SECONDS
: null;
const expireTimerStart = this.getExpirationStartTimestamp() || null;
@ -515,7 +515,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
public getPropsForMessage(): PropsForMessageWithoutConvoProps {
const sender = this.getSource();
const expirationType = this.getExpirationType();
const expirationDurationMs = this.getExpireTimer() * DURATION.SECONDS;
const expirationDurationMs = this.getExpireTimerSeconds() * DURATION.SECONDS;
const expireTimerStart = this.getExpirationStartTimestamp();
const expirationTimestamp =
expirationType && expireTimerStart && expirationDurationMs
@ -988,6 +988,11 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
return lodashSize(this.get('errors')) > 0;
}
/**
* Update the messageHash field of that message instance. Does not call commit()
*
* @param messageHash
*/
public async updateMessageHash(messageHash: string) {
if (!messageHash) {
window?.log?.error('Message hash not provided to update message hash');
@ -995,8 +1000,6 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
this.set({
messageHash,
});
await this.commit();
}
public async sendSyncMessageOnly(contentMessage: ContentMessage) {
@ -1117,7 +1120,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
const convo = this.getConversation();
const canBeDeleteAfterRead = convo && !convo.isMe() && convo.isPrivate();
const expirationType = this.getExpirationType();
const expireTimer = this.getExpireTimer();
const expireTimer = this.getExpireTimerSeconds();
if (canBeDeleteAfterRead && expirationType && expireTimer > 0) {
const expirationMode = DisappearingMessages.changeToDisappearingConversationMode(
@ -1153,7 +1156,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
}
public isExpiring() {
return this.getExpireTimer() && this.getExpirationStartTimestamp();
return this.getExpireTimerSeconds() && this.getExpirationStartTimestamp();
}
public isExpired() {
@ -1165,14 +1168,14 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
if (!start) {
return false;
}
const delta = this.getExpireTimer() * 1000;
const delta = this.getExpireTimerSeconds() * 1000;
const msFromNow = start + delta - now;
return msFromNow < 0;
}
public async setToExpire() {
if (this.isExpiring() && !this.getExpiresAt()) {
const start = this.getExpirationStartTimestamp();
const delta = this.getExpireTimer() * 1000;
const delta = this.getExpireTimerSeconds() * 1000;
if (!start) {
return;
}
@ -1364,7 +1367,11 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
return this.get('expirationType');
}
public getExpireTimer() {
/**
*
* @returns the expireTimer (in seconds) for this message
*/
public getExpireTimerSeconds() {
return this.get('expireTimer');
}

@ -708,7 +708,7 @@ async function applyConvoVolatileUpdateFromWrapper(
);
const messagesExpiringAfterRead = messagesExpiring.filter(
m => m.getExpirationType() === 'deleteAfterRead' && m.getExpireTimer() > 0
m => m.getExpirationType() === 'deleteAfterRead' && m.getExpireTimerSeconds() > 0
);
const messageIdsToFetchExpiriesFor = compact(messagesExpiringAfterRead.map(m => m.id));

@ -316,7 +316,7 @@ async function markConvoAsReadIfOutgoingMessage(
const sentAt = message.get('sent_at') || message.get('serverTimestamp');
if (sentAt) {
const expirationType = message.getExpirationType();
const expireTimer = message.getExpireTimer();
const expireTimer = message.getExpireTimerSeconds();
// NOTE starting disappearing messages timer for all outbound messages
if (
expirationType &&
@ -373,13 +373,13 @@ export async function handleMessageJob(
// NOTE we handle incoming disappear after send messages and sync messages here
if (
conversation &&
messageModel.getExpireTimer() > 0 &&
messageModel.getExpireTimerSeconds() > 0 &&
!messageModel.getExpirationStartTimestamp()
) {
const expirationMode = DisappearingMessages.changeToDisappearingConversationMode(
conversation,
messageModel.getExpirationType(),
messageModel.getExpireTimer()
messageModel.getExpireTimerSeconds()
);
// TODO legacy messages support will be removed in a future release

@ -394,7 +394,7 @@ async function checkForExpireUpdateInContentMessage(
*/
function checkForExpiringOutgoingMessage(message: MessageModel, location?: string) {
const convo = message.getConversation();
const expireTimer = message.getExpireTimer();
const expireTimer = message.getExpireTimerSeconds();
const expirationType = message.getExpirationType();
if (
@ -540,7 +540,7 @@ async function updateMessageExpiriesOnSwarm(messages: Array<MessageModel>) {
messages.forEach(msg => {
const hash = msg.getMessageHash();
const timestampStarted = msg.getExpirationStartTimestamp();
const timerSeconds = msg.getExpireTimer();
const timerSeconds = msg.getExpireTimerSeconds();
const disappearingType = msg.getExpirationType();
if (
!hash ||
@ -575,12 +575,12 @@ async function updateMessageExpiriesOnSwarm(messages: Array<MessageModel>) {
}
const newTTLms = m.updatedExpiryMs;
const realReadAt = newTTLms - message.getExpireTimer() * 1000;
const realReadAt = newTTLms - message.getExpireTimerSeconds() * 1000;
if (
newTTLms &&
(newTTLms !== message.getExpiresAt() ||
message.get('expirationStartTimestamp') !== realReadAt) &&
message.getExpireTimer()
message.getExpireTimerSeconds()
) {
window.log.debug(`updateMessageExpiriesOnSwarm: setting for msg hash ${m.messageHash}:`, {
expires_at: newTTLms,

@ -227,8 +227,8 @@ export class MessageQueue {
* @param message Message to be sent
*/
public async sendToPubKeyNonDurably({
message,
namespace,
message,
pubkey,
}: {
pubkey: PubKey;
@ -242,7 +242,10 @@ export class MessageQueue {
let rawMessage;
try {
rawMessage = await MessageUtils.toRawMessage(pubkey, message, namespace);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(rawMessage);
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send({
message: rawMessage,
isSyncMessage: false,
});
await MessageSentHandler.handleMessageSentSuccess(
rawMessage,
effectiveTimestamp,
@ -273,12 +276,10 @@ export class MessageQueue {
// We put the event handling inside this job to avoid sending duplicate events
const job = async () => {
try {
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send(
const { wrappedEnvelope, effectiveTimestamp } = await MessageSender.send({
message,
undefined,
undefined,
isSyncMessage
);
isSyncMessage,
});
await MessageSentHandler.handleMessageSentSuccess(
message,
@ -332,7 +333,7 @@ export class MessageQueue {
// We allow a message for ourselves only if it's a ConfigurationMessage, a ClosedGroupNewMessage,
// or a message with a syncTarget set.
if (MessageSender.isSyncMessage(message)) {
if (MessageSender.isContentSyncMessage(message)) {
window?.log?.info('OutgoingMessageQueue: Processing sync message');
isSyncMessage = true;
} else {

@ -26,7 +26,6 @@ import { SnodeAPIStore } from '../apis/snode_api/storeMessage';
import { getConversationController } from '../conversations';
import { MessageEncrypter } from '../crypto';
import { addMessagePadding } from '../crypto/BufferPadding';
import { DisappearingMessages } from '../disappearing_messages';
import { ContentMessage } from '../messages/outgoing';
import { ConfigurationMessage } from '../messages/outgoing/controlMessage/ConfigurationMessage';
import { SharedConfigMessage } from '../messages/outgoing/controlMessage/SharedConfigMessage';
@ -36,6 +35,7 @@ import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/Ope
import { ed25519Str } from '../onions/onionPath';
import { PubKey } from '../types';
import { RawMessage } from '../types/RawMessage';
import { UserUtils } from '../utils';
import { fromUInt8ArrayToBase64 } from '../utils/String';
import { EmptySwarmError } from '../utils/errors';
@ -76,7 +76,7 @@ function getMinRetryTimeout() {
return 1000;
}
function isSyncMessage(message: ContentMessage) {
function isContentSyncMessage(message: ContentMessage) {
if (
message instanceof ConfigurationMessage ||
message instanceof ClosedGroupNewMessage ||
@ -95,16 +95,20 @@ function isSyncMessage(message: ContentMessage) {
* @param message The message to send.
* @param attempts The amount of times to attempt sending. Minimum value is 1.
*/
async function send(
message: RawMessage,
attempts: number = 3,
retryMinTimeout?: number, // in ms
isASyncMessage?: boolean
): Promise<{ wrappedEnvelope: Uint8Array; effectiveTimestamp: number }> {
async function send({
message,
retryMinTimeout = 100,
attempts = 3,
isSyncMessage,
}: {
message: RawMessage;
attempts?: number;
retryMinTimeout?: number; // in ms
isSyncMessage: boolean;
}): Promise<{ wrappedEnvelope: Uint8Array; effectiveTimestamp: number }> {
return pRetry(
async () => {
const recipient = PubKey.cast(message.device);
const { ttl } = message;
// we can only have a single message in this send function for now
const [encryptedAndWrapped] = await encryptMessagesAndWrap([
@ -112,9 +116,9 @@ async function send(
destination: message.device,
plainTextBuffer: message.plainTextBuffer,
namespace: message.namespace,
ttl,
ttl: message.ttl,
identifier: message.identifier,
isSyncMessage: Boolean(isASyncMessage),
isSyncMessage: Boolean(isSyncMessage),
},
]);
@ -127,13 +131,30 @@ async function send(
found.set({ sent_at: encryptedAndWrapped.networkTimestamp });
await found.commit();
}
let foundMessage = encryptedAndWrapped.identifier
? await Data.getMessageById(encryptedAndWrapped.identifier)
: null;
const isSyncedDeleteAfterReadMessage =
found &&
UserUtils.isUsFromCache(recipient.key) &&
found.getExpirationType() === 'deleteAfterRead' &&
found.getExpireTimerSeconds() > 0 &&
encryptedAndWrapped.isSyncMessage;
let overridenTtl = encryptedAndWrapped.ttl;
if (isSyncedDeleteAfterReadMessage && found.getExpireTimerSeconds() > 0) {
const asMs = found.getExpireTimerSeconds() * 1000;
window.log.debug(`overriding ttl for synced DaR message to ${asMs}`);
overridenTtl = asMs;
}
const batchResult = await MessageSender.sendMessagesDataToSnode(
[
{
pubkey: recipient.key,
data64: encryptedAndWrapped.data64,
ttl,
ttl: overridenTtl,
timestamp: encryptedAndWrapped.networkTimestamp,
namespace: encryptedAndWrapped.namespace,
},
@ -158,44 +179,19 @@ async function send(
) {
// TODO: the expiration is due to be returned by the storage server on "store" soon, we will then be able to use it instead of doing the storedAt + ttl logic below
// if we have a hash and a storedAt, mark it as seen so we don't reprocess it on the next retrieve
await Data.saveSeenMessageHashes([{ expiresAt: storedAt + ttl, hash: storedHash }]);
await Data.saveSeenMessageHashes([
{ expiresAt: storedAt + encryptedAndWrapped.ttl, hash: storedHash },
]);
// If message also has a sync message, save that hash. Otherwise save the hash from the regular message send i.e. only closed groups in this case.
if (
encryptedAndWrapped.identifier &&
(encryptedAndWrapped.isSyncMessage || isDestinationClosedGroup)
) {
const foundMessage = await Data.getMessageById(encryptedAndWrapped.identifier);
// get a fresh copy of the message from the DB
foundMessage = await Data.getMessageById(encryptedAndWrapped.identifier);
if (foundMessage) {
await foundMessage.updateMessageHash(storedHash);
const convo = foundMessage.getConversation();
const expireTimer = foundMessage.getExpireTimer();
const expirationType = foundMessage.getExpirationType();
if (
convo &&
expirationType &&
expireTimer > 0 &&
// a message has started to disappear
foundMessage.getExpirationStartTimestamp()
) {
const expirationMode = DisappearingMessages.changeToDisappearingConversationMode(
convo,
expirationType,
expireTimer
);
const canBeDeleteAfterRead = !convo.isMe() && convo.isPrivate();
// TODO legacy messages support will be removed in a future release
if (
canBeDeleteAfterRead &&
(expirationMode === 'legacy' || expirationMode === 'deleteAfterRead')
) {
await DisappearingMessages.updateMessageExpiriesOnSwarm([foundMessage]);
}
}
await foundMessage.commit();
}
}
@ -399,7 +395,7 @@ async function sendMessagesToSnode(
namespace: m.namespace,
ttl: m.message.ttl(),
identifier: m.message.identifier,
isSyncMessage: MessageSender.isSyncMessage(m.message),
isSyncMessage: MessageSender.isContentSyncMessage(m.message),
}))
);
@ -581,5 +577,5 @@ export const MessageSender = {
getMinRetryTimeout,
sendToOpenGroupV2,
send,
isSyncMessage,
isContentSyncMessage,
};

@ -163,7 +163,7 @@ async function handleMessageSentFailure(
});
// Disappeared messages that fail to send should not disappear
if (fetchedMessage.getExpirationType() && fetchedMessage.getExpireTimer() > 0) {
if (fetchedMessage.getExpirationType() && fetchedMessage.getExpireTimerSeconds() > 0) {
fetchedMessage.set({
expirationStartTimestamp: undefined,
});

@ -5,7 +5,7 @@ export type RawMessage = {
identifier: string;
plainTextBuffer: Uint8Array;
device: string;
ttl: number;
ttl: number; // ttl is in millis
encryption: SignalService.Envelope.Type;
namespace: SnodeNamespaces | null; // allowing null as when we upgrade, we might have messages awaiting sending which won't have a namespace
};

@ -548,14 +548,17 @@ export async function USER_callRecipient(recipient: string) {
// initiating a call is analogous to sending a message request
await approveConvoAndSendResponse(recipient, true);
// we do it manually as the sendToPubkeyNonDurably rely on having a message saved to the db for MessageSentSuccess
// Note: we do the sending of the preoffer manually as the sendToPubkeyNonDurably rely on having a message saved to the db for MessageSentSuccess
// which is not the case for a pre offer message (the message only exists in memory)
const rawMessage = await MessageUtils.toRawMessage(
PubKey.cast(recipient),
preOfferMsg,
SnodeNamespaces.UserMessages
);
const { wrappedEnvelope } = await MessageSender.send(rawMessage);
const { wrappedEnvelope } = await MessageSender.send({
message: rawMessage,
isSyncMessage: false,
});
void PnServer.notifyPnServer(wrappedEnvelope, recipient);
await openMediaDevicesAndAddTracks();

@ -71,12 +71,12 @@ class FetchMsgExpirySwarmJob extends PersistedJob<FetchMsgExpirySwarmPersistedDa
if (!message) {
continue;
}
const realReadAt = expiry.fetchedExpiry - message.getExpireTimer() * 1000;
const realReadAt = expiry.fetchedExpiry - message.getExpireTimerSeconds() * 1000;
if (
(message.get('expirationStartTimestamp') !== realReadAt ||
message.get('expires_at') !== expiry.fetchedExpiry) &&
message.getExpireTimer()
message.getExpireTimerSeconds()
) {
window.log.debug(
`FetchMsgExpirySwarmJob: setting for msg hash ${message.getMessageHash()}:`,

@ -17,7 +17,7 @@ import { ContentMessage } from '../../../../session/messages/outgoing';
import { ClosedGroupMessage } from '../../../../session/messages/outgoing/controlMessage/group/ClosedGroupMessage';
import { MessageSender } from '../../../../session/sending';
import { MessageQueue } from '../../../../session/sending/MessageQueue';
import { PubKey, RawMessage } from '../../../../session/types';
import { PubKey } from '../../../../session/types';
import { GroupUtils, PromiseUtils, UserUtils } from '../../../../session/utils';
import { TestUtils } from '../../../test-utils';
import { PendingMessageCacheStub } from '../../../test-utils/stubs';
@ -44,12 +44,7 @@ describe('MessageQueue', () => {
let messageQueueStub: MessageQueue;
// Message Sender Stubs
let sendStub: sinon.SinonStub<[
RawMessage,
(number | undefined)?,
(number | undefined)?,
(boolean | undefined)?
]>;
let sendStub: sinon.SinonStub;
beforeEach(() => {
// Utils Stubs

@ -1,5 +1,5 @@
import * as crypto from 'crypto';
import { expect } from 'chai';
import * as crypto from 'crypto';
import _ from 'lodash';
import Sinon, * as sinon from 'sinon';
import { SignalService } from '../../../../protobuf';
@ -16,10 +16,10 @@ import { OnionV4 } from '../../../../session/onions/onionv4';
import { MessageSender } from '../../../../session/sending';
import { PubKey, RawMessage } from '../../../../session/types';
import { MessageUtils, UserUtils } from '../../../../session/utils';
import { fromBase64ToArrayBuffer } from '../../../../session/utils/String';
import { TestUtils } from '../../../test-utils';
import { stubCreateObjectUrl, stubData, stubUtilWorker } from '../../../test-utils/utils';
import { TEST_identityKeyPair } from '../crypto/MessageEncrypter_test';
import { fromBase64ToArrayBuffer } from '../../../../session/utils/String';
describe('MessageSender', () => {
afterEach(() => {
@ -68,27 +68,47 @@ describe('MessageSender', () => {
it('should not retry if an error occurred during encryption', async () => {
encryptStub.throws(new Error('Failed to encrypt.'));
const promise = MessageSender.send(rawMessage, 3, 10);
const promise = MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
await expect(promise).is.rejectedWith('Failed to encrypt.');
expect(sessionMessageAPISendStub.callCount).to.equal(0);
});
it('should only call lokiMessageAPI once if no errors occured', async () => {
await MessageSender.send(rawMessage, 3, 10);
await MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
expect(sessionMessageAPISendStub.callCount).to.equal(1);
});
it('should only retry the specified amount of times before throwing', async () => {
sessionMessageAPISendStub.throws(new Error('API error'));
const attempts = 2;
const promise = MessageSender.send(rawMessage, attempts, 10);
const promise = MessageSender.send({
message: rawMessage,
attempts,
retryMinTimeout: 10,
isSyncMessage: false,
});
await expect(promise).is.rejectedWith('API error');
expect(sessionMessageAPISendStub.callCount).to.equal(attempts);
});
it('should not throw error if successful send occurs within the retry limit', async () => {
sessionMessageAPISendStub.onFirstCall().throws(new Error('API error'));
await MessageSender.send(rawMessage, 3, 10);
await MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
expect(sessionMessageAPISendStub.callCount).to.equal(2);
});
});
@ -114,7 +134,12 @@ describe('MessageSender', () => {
SnodeNamespaces.UserMessages
);
await MessageSender.send(rawMessage, 3, 10);
await MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
const args = sessionMessageAPISendStub.getCall(0).args;
expect(args[1]).to.equal(device.key);
@ -140,7 +165,12 @@ describe('MessageSender', () => {
);
const offset = 200000;
Sinon.stub(GetNetworkTime, 'getLatestTimestampOffset').returns(offset);
await MessageSender.send(rawMessage, 3, 10);
await MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
const firstArg = sessionMessageAPISendStub.getCall(0).args[0];
const { data64 } = firstArg[0];
@ -194,7 +224,12 @@ describe('MessageSender', () => {
visibleMessage,
SnodeNamespaces.UserMessages
);
await MessageSender.send(rawMessage, 3, 10);
await MessageSender.send({
message: rawMessage,
attempts: 3,
retryMinTimeout: 10,
isSyncMessage: false,
});
const firstArg = sessionMessageAPISendStub.getCall(0).args[0];
const { data64 } = firstArg[0];

Loading…
Cancel
Save