fix: speed up expiration/deletion of messages by batching updates in UI

pull/2532/head
Audric Ackermann 3 years ago
parent ad22482274
commit 0cc7994c12

@ -8,7 +8,7 @@ import _ from 'lodash';
import { Data } from '../../../../data/data';
import { MessageRenderingProps } from '../../../../models/messageType';
import { getConversationController } from '../../../../session/conversations';
import { messageExpired } from '../../../../state/ducks/conversations';
import { messagesExpired } from '../../../../state/ducks/conversations';
import {
getGenericReadableMessageSelectorProps,
getIsMessageSelected,
@ -68,10 +68,12 @@ function useIsExpired(props: ExpiringProps) {
await Data.removeMessage(messageId);
if (convoId) {
dispatch(
messageExpired({
conversationKey: convoId,
messageId,
})
messagesExpired([
{
conversationKey: convoId,
messageId,
},
])
);
const convo = getConversationController().get(convoId);
convo?.updateLastMessage();

@ -139,7 +139,7 @@ export const Data = {
saveMessage,
saveMessages,
removeMessage,
_removeMessages,
removeMessagesByIds,
getMessageIdsFromServerIds,
getMessageById,
getMessageBySenderAndSentAt,
@ -390,9 +390,13 @@ async function removeMessage(id: string): Promise<void> {
}
}
// Note: this method will not clean up external files, just delete from SQL
async function _removeMessages(ids: Array<string>): Promise<void> {
await channels.removeMessage(ids);
/**
* Note: this method will not clean up external files, just delete from SQL.
* File are cleaned up on app start if they are not linked to any messages
*
*/
async function removeMessagesByIds(ids: Array<string>): Promise<void> {
await channels.removeMessagesByIds(ids);
}
async function getMessageIdsFromServerIds(
@ -630,7 +634,7 @@ async function removeAllMessagesInConversation(conversationId: string): Promise<
await Promise.all(messages.map(message => message.cleanup()));
// eslint-disable-next-line no-await-in-loop
await channels.removeMessage(ids);
await channels.removeMessagesByIds(ids);
} while (messages.length > 0);
}

@ -38,7 +38,7 @@ const channelsToMake = new Set([
'saveSeenMessageHashes',
'saveMessages',
'removeMessage',
'_removeMessages',
'removeMessagesByIds',
'getUnreadByConversation',
'markAllAsReadByConversationNoExpiration',
'getUnreadCountByConversation',

@ -212,8 +212,10 @@ async function start() {
);
window.log.info(`Cleanup: Found ${messagesForCleanup.length} messages for cleanup`);
const idsToCleanUp: Array<string> = [];
await Promise.all(
messagesForCleanup.map(async (message: MessageModel) => {
messagesForCleanup.map((message: MessageModel) => {
const sentAt = message.get('sent_at');
if (message.hasErrors()) {
@ -221,9 +223,12 @@ async function start() {
}
window.log.info(`Cleanup: Deleting unsent message ${sentAt}`);
await Data.removeMessage(message.id);
idsToCleanUp.push(message.id);
})
);
if (idsToCleanUp.length) {
await Data.removeMessagesByIds(idsToCleanUp);
}
window.log.info('Cleanup: complete');
window.log.info('listening for registration events');

@ -248,12 +248,6 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
await deleteExternalFilesOfConversation(this.attributes);
}
public async onExpired(_message: MessageModel) {
await this.updateLastMessage();
// removeMessage();
}
public getGroupAdmins(): Array<string> {
const groupAdmins = this.get('groupAdmins');
@ -1681,15 +1675,17 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
return this.get('type') === ConversationTypeEnum.GROUP;
}
public async removeMessage(messageId: any) {
public async removeMessage(messageId: string) {
await Data.removeMessage(messageId);
this.updateLastMessage();
window.inboxStore?.dispatch(
conversationActions.messageDeleted({
conversationKey: this.id,
messageId,
})
conversationActions.messagesDeleted([
{
conversationKey: this.id,
messageId,
},
])
);
}

@ -125,7 +125,6 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
throw new Error('A message always needs to have an conversationId.');
}
// this.on('expired', this.onExpired);
if (!attributes.skipTimerInit) {
void this.setToExpire();
}

@ -945,21 +945,44 @@ function saveMessages(arrayOfMessages: Array<any>) {
}
function removeMessage(id: string, instance?: BetterSqlite3.Database) {
if (!Array.isArray(id)) {
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id = $id;`)
.run({ id });
if (!isString(id)) {
throw new Error('removeMessage: only takes single message to delete!');
return;
}
if (!id.length) {
throw new Error('removeMessages: No ids to delete!');
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id = $id;`)
.run({ id });
}
function removeMessagesByIds(ids: Array<string>, instance?: BetterSqlite3.Database) {
if (!Array.isArray(ids)) {
throw new Error('removeMessagesByIds only allowed an array of strings');
}
if (!ids.length) {
throw new Error('removeMessagesByIds: No ids to delete!');
}
// Our node interface doesn't seem to allow you to replace one single ? with an array
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id IN ( ${id.map(() => '?').join(', ')} );`)
.run(id);
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE id IN ( ${ids.map(() => '?').join(', ')} );`)
.run(ids);
}
function removeAllMessagesInConversation(
conversationId: string,
instance?: BetterSqlite3.Database
) {
if (!conversationId) {
return;
}
// Our node interface doesn't seem to allow you to replace one single ? with an array
assertGlobalInstanceOrInstance(instance)
.prepare(`DELETE FROM ${MESSAGES_TABLE} WHERE conversationId = $conversationId`)
.run({ conversationId });
}
function getMessageIdsFromServerIds(serverIds: Array<string | number>, conversationId: string) {
@ -2435,6 +2458,8 @@ export const sqlNode = {
updateLastHash,
saveMessages,
removeMessage,
removeMessagesByIds,
removeAllMessagesInConversation,
getUnreadByConversation,
markAllAsReadByConversationNoExpiration,
getUnreadCountByConversation,

@ -1,4 +1,4 @@
import _, { compact, isArray, isNumber, isObject, pick } from 'lodash';
import _, { compact, isArray, isEmpty, isNumber, isObject, pick } from 'lodash';
import { OpenGroupData } from '../../../../data/opengroups';
import { handleOpenGroupV4Message } from '../../../../receiver/opengroup';
import { OpenGroupRequestCommonType } from '../opengroupV2/ApiUtil';
@ -35,6 +35,7 @@ import { ConversationTypeEnum } from '../../../../models/conversationAttributes'
import { createSwarmMessageSentFromUs } from '../../../../models/messageFactory';
import { Data } from '../../../../data/data';
import { processMessagesUsingCache } from './sogsV3MutationCache';
import { destroyMessagesAndUpdateRedux } from '../../../../util/expiringMessages';
/**
* Get the convo matching those criteria and make sure it is an opengroup convo, or return null.
@ -154,6 +155,7 @@ async function filterOutMessagesInvalidSignature(
return signaturesValidMessages;
}
let totalDeletedMessages = 0;
const handleSogsV3DeletedMessages = async (
messages: Array<OpenGroupMessageV4>,
serverUrl: string,
@ -164,29 +166,38 @@ const handleSogsV3DeletedMessages = async (
if (!deletions.length) {
return messages;
}
totalDeletedMessages += deletions.length;
console.warn(
JSON.stringify({
totalDeletedMessages,
})
);
const allIdsRemoved = deletions.map(m => m.id);
try {
const convoId = getOpenGroupV2ConversationId(serverUrl, roomId);
const convo = getConversationController().get(convoId);
const messageIds = await Data.getMessageIdsFromServerIds(allIdsRemoved, convo.id);
// we shouldn't get too many messages to delete at a time, so no need to add a function to remove multiple messages for now
await Promise.all(
(messageIds || []).map(async id => {
if (convo) {
await convo.removeMessage(id);
}
await Data.removeMessage(id);
})
);
if (messageIds && messageIds.length) {
await destroyMessagesAndUpdateRedux(
messageIds.map(messageId => ({
conversationKey: convoId,
messageId,
}))
);
}
} catch (e) {
window?.log?.warn('handleDeletions failed:', e);
}
return exceptDeletion;
};
// tslint:disable-next-line: cyclomatic-complexity
// tslint:disable-next-line: one-variable-per-declaration
let totalEmptyReactions = 0,
totalMessagesWithResolvedBlindedIdsIfFound = 0,
totalMessageReactions = 0;
// tslint:disable-next-line: max-func-body-length cyclomatic-complexity
const handleMessagesResponseV4 = async (
messages: Array<OpenGroupMessageV4>,
serverUrl: string,
@ -284,6 +295,9 @@ const handleMessagesResponseV4 = async (
const incomingMessageSeqNo = compact(messages.map(n => n.seqno));
const maxNewMessageSeqNo = Math.max(...incomingMessageSeqNo);
totalMessagesWithResolvedBlindedIdsIfFound += messagesWithResolvedBlindedIdsIfFound.length;
for (let index = 0; index < messagesWithResolvedBlindedIdsIfFound.length; index++) {
const msgToHandle = messagesWithResolvedBlindedIdsIfFound[index];
try {
@ -309,6 +323,18 @@ const handleMessagesResponseV4 = async (
await OpenGroupData.saveV2OpenGroupRoom(roomInfosRefreshed);
const messagesWithReactions = messages.filter(m => m.reactions !== undefined);
const messagesWithEmptyReactions = messagesWithReactions.filter(m => isEmpty(m.reactions));
totalMessageReactions += messagesWithReactions.length;
totalEmptyReactions += messagesWithEmptyReactions.length;
console.warn(
JSON.stringify({
totalMessagesWithResolvedBlindedIdsIfFound,
totalMessageReactions,
totalEmptyReactions,
})
);
if (messagesWithReactions.length > 0) {
const conversationId = getOpenGroupV2ConversationId(serverUrl, roomId);
const groupConvo = getConversationController().get(conversationId);
@ -526,6 +552,7 @@ export const handleBatchPollResults = async (
break;
case 'pollInfo':
await handlePollInfoResponse(subResponse.code, subResponse.body, serverUrl);
break;
case 'inbox':
await handleInboxOutboxMessages(subResponse.body, serverUrl, false);

@ -241,7 +241,8 @@ const makeBatchRequestPayload = (
method: 'GET',
path: isNumber(options.messages.sinceSeqNo)
? `/room/${options.messages.roomId}/messages/since/${options.messages.sinceSeqNo}?t=r&reactors=${Reactions.SOGSReactorsFetchCount}`
: `/room/${options.messages.roomId}/messages/recent?reactors=${Reactions.SOGSReactorsFetchCount}`,
: // : `/room/${options.messages.roomId}/messages/since/180000?t=r&reactors=${Reactions.SOGSReactorsFetchCount}`,
`/room/${options.messages.roomId}/messages/recent?reactors=${Reactions.SOGSReactorsFetchCount}`,
};
}
break;

@ -504,12 +504,12 @@ function handleMessagesChangedOrAdded(
function handleMessageExpiredOrDeleted(
state: ConversationsStateType,
action: PayloadAction<{
payload: {
messageId: string;
conversationKey: string;
}>
): ConversationsStateType {
const { conversationKey, messageId } = action.payload;
}
) {
const { conversationKey, messageId } = payload;
if (conversationKey === state.selectedConversation) {
// search if we find this message id.
// we might have not loaded yet, so this case might not happen
@ -539,6 +539,23 @@ function handleMessageExpiredOrDeleted(
return state;
}
function handleMessagesExpiredOrDeleted(
state: ConversationsStateType,
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
): ConversationsStateType {
action.payload.forEach(element => {
// tslint:disable-next-line: no-parameter-reassignment
state = handleMessageExpiredOrDeleted(state, element);
});
return state;
}
function handleConversationReset(state: ConversationsStateType, action: PayloadAction<string>) {
const conversationKey = action.payload;
if (conversationKey === state.selectedConversation) {
@ -670,24 +687,28 @@ const conversationsSlice = createSlice({
return handleMessagesChangedOrAdded(state, action.payload);
},
messageExpired(
messagesExpired(
state: ConversationsStateType,
action: PayloadAction<{
messageId: string;
conversationKey: string;
}>
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
) {
return handleMessageExpiredOrDeleted(state, action);
return handleMessagesExpiredOrDeleted(state, action);
},
messageDeleted(
messagesDeleted(
state: ConversationsStateType,
action: PayloadAction<{
messageId: string;
conversationKey: string;
}>
action: PayloadAction<
Array<{
messageId: string;
conversationKey: string;
}>
>
) {
return handleMessageExpiredOrDeleted(state, action);
return handleMessagesExpiredOrDeleted(state, action);
},
conversationReset(state: ConversationsStateType, action: PayloadAction<string>) {
@ -973,8 +994,8 @@ export const {
conversationsChanged,
conversationRemoved,
removeAllConversations,
messageExpired,
messageDeleted,
messagesExpired,
messagesDeleted,
conversationReset,
messagesChanged,
resetOldTopMessageId,

@ -1,42 +1,61 @@
import _ from 'lodash';
import { throttle, uniq } from 'lodash';
import moment from 'moment';
import { MessageModel } from '../models/message';
import { messageExpired } from '../state/ducks/conversations';
import { messagesExpired } from '../state/ducks/conversations';
import { TimerOptionsArray } from '../state/ducks/timerOptions';
import { LocalizerKeys } from '../types/LocalizerKeys';
import { initWallClockListener } from './wallClockListener';
import { Data } from '../data/data';
import { getConversationController } from '../session/conversations';
export async function destroyMessagesAndUpdateRedux(
messages: Array<{
conversationKey: string;
messageId: string;
}>
) {
if (!messages.length) {
return;
}
const conversationWithChanges = uniq(messages.map(m => m.conversationKey));
try {
// Delete all thoses messages in a single sql call
await Data.removeMessagesByIds(messages.map(m => m.messageId));
} catch (e) {
window.log.error('destroyMessages: removeMessagesByIds failed', e && e.message ? e.message : e);
}
// trigger a redux update if needed for all those messages
window.inboxStore?.dispatch(messagesExpired(messages));
// trigger a refresh the last message for all those uniq conversation
conversationWithChanges.map(convoIdToUpdate => {
getConversationController()
.get(convoIdToUpdate)
?.updateLastMessage();
});
}
async function destroyExpiredMessages() {
try {
window.log.info('destroyExpiredMessages: Loading messages...');
const messages = await Data.getExpiredMessages();
await Promise.all(
messages.map(async (message: MessageModel) => {
window.log.info('Message expired', {
sentAt: message.get('sent_at'),
});
// We delete after the trigger to allow the conversation time to process
// the expiration before the message is removed from the database.
await Data.removeMessage(message.id);
// trigger the expiration of the message on the redux itself.
window.inboxStore?.dispatch(
messageExpired({
conversationKey: message.attributes.conversationId,
messageId: message.id,
})
);
const conversation = message.getConversation();
if (conversation) {
await conversation.onExpired(message);
}
})
);
const messagesExpiredDetails: Array<{
conversationKey: string;
messageId: string;
}> = messages.map(m => ({
conversationKey: m.attributes.conversationId,
messageId: m.id,
}));
messages.map(expired => {
window.log.info('Message expired', {
sentAt: expired.get('sent_at'),
});
});
await destroyMessagesAndUpdateRedux(messagesExpiredDetails);
} catch (error) {
window.log.error(
'destroyExpiredMessages: Error deleting expired messages',
@ -81,7 +100,7 @@ async function checkExpiringMessages() {
}
timeout = global.setTimeout(destroyExpiredMessages, wait);
}
const throttledCheckExpiringMessages = _.throttle(checkExpiringMessages, 1000);
const throttledCheckExpiringMessages = throttle(checkExpiringMessages, 1000);
let isInit = false;

Loading…
Cancel
Save