fix: make the ConfSyncJob fetch be per destination

pull/2620/head
Audric Ackermann 2 years ago
parent 554b445a3e
commit c623e2e49e
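
This commit makes each ConfigurationSyncJob run against a single destination. Previously, one job fetched every pending config change, grouped the results by pubkey (lodash `groupBy`), and batch-sent one request per destination via `Promise.allSettled`; now a job fetches and sends the changes for exactly one pubkey (our own, for the time being, per the TODO below). As a result, `SingleDestinationChanges` and `OutgoingConfResult` no longer need to carry a `destination`/`publicKey` field, the delete-old-hashes set is built once per job, and the batch reply is validated against an explicit expected length.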

@@ -183,7 +183,7 @@ async function send(
 async function sendMessagesDataToSnode(
   params: Array<StoreOnNodeParamsNoSig>,
   destination: string,
-  oldMessageHashes: Set<string> | null
+  messagesHashesToDelete: Set<string> | null
 ): Promise<NotEmptyArrayOfBatchResults> {
   const rightDestination = params.filter(m => m.pubkey === destination);
   const swarm = await getSwarmFor(destination);
@@ -215,10 +215,10 @@ async function sendMessagesDataToSnode(
   );

   const signedDeleteOldHashesRequest =
-    oldMessageHashes && oldMessageHashes.size
+    messagesHashesToDelete && messagesHashesToDelete.size
       ? await SnodeSignature.getSnodeSignatureByHashesParams({
           method: 'delete' as const,
-          messages: [...oldMessageHashes],
+          messages: [...messagesHashesToDelete],
           pubkey: destination,
         })
       : null;
@@ -354,7 +354,7 @@ async function encryptMessagesAndWrap(
 async function sendMessagesToSnode(
   params: Array<StoreOnNodeMessage>,
   destination: string,
-  oldMessageHashes: Set<string> | null
+  messagesHashesToDelete: Set<string> | null
 ): Promise<NotEmptyArrayOfBatchResults | null> {
   try {
     const recipient = PubKey.cast(destination);
@@ -397,7 +397,7 @@ async function sendMessagesToSnode(
         namespace: wrapped.namespace,
       })),
       recipient.key,
-      oldMessageHashes
+      messagesHashesToDelete
     );
   },
   {
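
The rename from `oldMessageHashes` to `messagesHashesToDelete` makes the intent explicit: when the set is non-empty, a signed `delete` sub-request is appended to the store batch. A minimal sketch of that guard, with hypothetical hash values (real hashes come from previously stored config messages):

```ts
// hypothetical hashes for illustration only
const messagesHashesToDelete: Set<string> | null = new Set(['hashA', 'hashB']);

// mirrors the ternary in sendMessagesDataToSnode above:
// null or an empty set means no `delete` sub-request is built
const shouldAppendDeleteRequest = Boolean(messagesHashesToDelete && messagesHashesToDelete.size);
console.log(shouldAppendDeleteRequest); // true; `new Set()` or `null` would give false

// the set is spread into the array form the signed request expects
console.log([...(messagesHashesToDelete ?? [])]); // ['hashA', 'hashB']
```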

@@ -1,4 +1,4 @@
-import { compact, groupBy, isArray, isEmpty, isNumber, isString } from 'lodash';
+import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
 import { v4 } from 'uuid';
 import { UserUtils } from '../..';
 import { ConfigDumpData } from '../../../../data/configDump/configDump';
@@ -27,14 +27,12 @@ const defaultMaxAttempts = 3;
 let lastRunConfigSyncJobTimestamp: number | null = null;

 export type SingleDestinationChanges = {
-  destination: string;
   messages: Array<OutgoingConfResult>;
   allOldHashes: Array<string>;
 };

 type SuccessfulChange = {
   message: SharedConfigMessage;
-  publicKey: string;
   updatedHash: string;
 };
@@ -42,33 +40,22 @@ type SuccessfulChange = {
  * Later in the syncing logic, we want to batch-send all the updates for a pubkey in a single batch call.
  * To make this easier, this function prebuilds and merges together all the changes for each pubkey.
  */
-async function retrieveSingleDestinationChanges(): Promise<Array<SingleDestinationChanges>> {
-  const outgoingConfResults = await LibSessionUtil.pendingChangesForPubkey(
-    UserUtils.getOurPubKeyStrFromCache()
-  );
-
-  const groupedByDestination = groupBy(outgoingConfResults, m => m.destination);
-
-  const singleDestChanges: Array<SingleDestinationChanges> = Object.keys(groupedByDestination).map(
-    destination => {
-      const messages = groupedByDestination[destination];
-      const hashes = compact(messages.map(m => m.oldMessageHashes)).flat();
-
-      return { allOldHashes: hashes, destination, messages };
-    }
-  );
-
-  return singleDestChanges;
+async function retrieveSingleDestinationChanges(
+  destination: string
+): Promise<SingleDestinationChanges> {
+  const outgoingConfResults = await LibSessionUtil.pendingChangesForPubkey(destination);
+
+  // the delete hashes sub request can be done across namespaces, so we can do a single one of it with all the hashes to remove (per pubkey)
+  const compactedHashes = compact(outgoingConfResults.map(m => m.oldMessageHashes)).flat();
+
+  return { messages: outgoingConfResults, allOldHashes: compactedHashes };
 }

 /**
  * This function is run once we get the results from the multiple batch-send.
- * For each results, it checks wha
  */
 function resultsToSuccessfulChange(
-  allResults: Array<PromiseSettledResult<NotEmptyArrayOfBatchResults | null>>,
-  requests: Array<SingleDestinationChanges>
+  result: NotEmptyArrayOfBatchResults | null,
+  request: SingleDestinationChanges
 ): Array<SuccessfulChange> {
   const successfulChanges: Array<SuccessfulChange> = [];
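
`retrieveSingleDestinationChanges` now merges the old hashes from every namespace into one flat list, since a single delete-by-hashes sub-request can cover all namespaces for a pubkey. A standalone sketch of that merging, with hypothetical hash values:

```ts
import { compact } from 'lodash';

// hypothetical per-namespace results; `undefined` stands in for a namespace with nothing to delete
const outgoingConfResults = [
  { oldMessageHashes: ['hashA', 'hashB'] },
  { oldMessageHashes: undefined },
  { oldMessageHashes: ['hashC'] },
];

// compact() drops the undefined entry, flat() merges the remaining arrays
const compactedHashes = compact(outgoingConfResults.map(m => m.oldMessageHashes)).flat();
console.log(compactedHashes); // ['hashA', 'hashB', 'hashC'] — one delete request covers all namespaces
```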
@@ -82,58 +69,44 @@ function resultsToSuccessfulChange(
    */
   try {
-    for (let i = 0; i < allResults.length; i++) {
-      const result = allResults[i];
-      // the batch send was rejected. Let's skip handling those results altogether. Another job will handle the retry logic.
-      if (result.status !== 'fulfilled') {
-        continue;
-      }
-
-      const resultValue = result.value;
-      if (!resultValue) {
-        continue;
-      }
-      const request = requests?.[i];
-      if (!result) {
-        continue;
-      }
+    if (!result?.length) {
+      return successfulChanges;
+    }

-      for (let j = 0; j < resultValue.length; j++) {
-        const batchResult = resultValue[j];
+    for (let j = 0; j < result.length; j++) {
+      const batchResult = result[j];
       const messagePostedHashes = batchResult?.body?.hash;

       if (
         batchResult.code === 200 &&
         isString(messagePostedHashes) &&
-        request.messages?.[j].message &&
-        request.destination
+        request.messages?.[j].message
       ) {
         // the library keeps track of the hashes to push and pushed using the hashes now
         successfulChanges.push({
-          publicKey: request.destination,
           updatedHash: messagePostedHashes,
           message: request.messages?.[j].message,
         });
       }
     }
-    }
-
-    return successfulChanges;
   } catch (e) {
     throw e;
   }
+
+  return successfulChanges;
 }

-async function buildAndSaveDumpsToDB(changes: Array<SuccessfulChange>): Promise<void> {
+async function buildAndSaveDumpsToDB(
+  changes: Array<SuccessfulChange>,
+  destination: string
+): Promise<void> {
   for (let i = 0; i < changes.length; i++) {
     const change = changes[i];
     const variant = LibSessionUtil.kindToVariant(change.message.kind);

     const needsDump = await LibSessionUtil.markAsPushed(
       variant,
-      change.publicKey,
+      destination,
       change.message.seqno.toNumber(),
       change.updatedHash
     );
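
With a single batch result per job, `resultsToSuccessfulChange` no longer iterates over settled promises; it walks one reply array and keeps only the entries that stored successfully. A sketch of the filtering rule, with a hypothetical reply (the exact shape of failed replies is an assumption here):

```ts
import { isString } from 'lodash';

// hypothetical reply shape: one entry per store sub-request, in send order
type BatchReply = { code: number; body: { hash?: string } };
const result: Array<BatchReply> = [
  { code: 200, body: { hash: 'newHash1' } }, // stored -> becomes a SuccessfulChange
  { code: 500, body: {} },                   // failed -> skipped; a later job retries
];

// same rule as above: keep only 200s that carry a string hash,
// paired with the message at the same index in request.messages
const kept = result.filter(r => r.code === 200 && isString(r.body.hash));
console.log(kept.length); // 1
```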
@@ -144,7 +117,7 @@ async function buildAndSaveDumpsToDB(changes: Array<SuccessfulChange>): Promise<
       const dump = await GenericWrapperActions.dump(variant);
       await ConfigDumpData.saveConfigDump({
         data: dump,
-        publicKey: change.publicKey,
+        publicKey: destination,
         variant,
       });
     }
@@ -207,44 +180,51 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
       }
     }

-    const singleDestChanges = await retrieveSingleDestinationChanges();
+    // TODO add a way to have a few configuration sync jobs running at the same time, but only a single one per pubkey
+    const thisJobDestination = us;
+
+    const singleDestChanges = await retrieveSingleDestinationChanges(thisJobDestination);

     // If there are no pending changes then the job can just complete (next time something
     // is updated we want to try and run immediately so don't schedule another run in this case)
-    if (isEmpty(singleDestChanges)) {
+    if (isEmpty(singleDestChanges?.messages)) {
       return RunJobResult.Success;
     }

-    const allResults = await Promise.allSettled(
-      singleDestChanges.map(async dest => {
-        const msgs = dest.messages.map(item => {
-          return {
-            namespace: item.namespace,
-            pubkey: item.destination,
-            timestamp: item.message.timestamp,
-            ttl: item.message.ttl(),
-            message: item.message,
-          };
-        });
-        const asSet = new Set(dest.allOldHashes);
-        return MessageSender.sendMessagesToSnode(msgs, dest.destination, asSet);
-      })
-    );
+    const oldHashesToDelete = new Set(singleDestChanges.allOldHashes);
+    const msgs = singleDestChanges.messages.map(item => {
+      return {
+        namespace: item.namespace,
+        pubkey: thisJobDestination,
+        timestamp: item.message.timestamp,
+        ttl: item.message.ttl(),
+        message: item.message,
+      };
+    });
+
+    const result = await MessageSender.sendMessagesToSnode(
+      msgs,
+      thisJobDestination,
+      oldHashesToDelete
+    );
+
+    const expectedReplyLength =
+      singleDestChanges.messages.length + (oldHashesToDelete.size ? 1 : 0);

     // we do a sequence call here. If we do not have the right expected number of results, consider it a failure
-    if (!isArray(allResults) || allResults.length !== singleDestChanges.length) {
+    if (!isArray(result) || result.length !== expectedReplyLength) {
+      window.log.info(
+        `ConfigurationSyncJob: unexpected result length: expected ${expectedReplyLength} but got ${result?.length}`
+      );
       return RunJobResult.RetryJobIfPossible;
     }

-    const changes = resultsToSuccessfulChange(allResults, singleDestChanges);
+    const changes = resultsToSuccessfulChange(result, singleDestChanges);
     if (isEmpty(changes)) {
       return RunJobResult.RetryJobIfPossible;
     }
     // Now that we have the successful changes, we need to mark them as pushed and
     // generate any config dumps which need to be stored
-    await buildAndSaveDumpsToDB(changes);
+    await buildAndSaveDumpsToDB(changes, thisJobDestination);
     return RunJobResult.Success;
   } catch (e) {
     throw e;
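
The new `expectedReplyLength` check encodes the batch contract: the swarm replies with one result per store sub-request, plus one extra result when a delete-old-hashes sub-request was included. A worked example under those assumptions:

```ts
// assume 3 config messages to store and 2 old hashes to delete
const messageCount = 3;
const oldHashesToDelete = new Set(['hashA', 'hashB']); // hypothetical hashes

// 3 store results + 1 delete result expected
const expectedReplyLength = messageCount + (oldHashesToDelete.size ? 1 : 0);
console.log(expectedReplyLength); // 4 — any other reply length makes the job retry
```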
@@ -300,7 +280,7 @@ async function queueNewJobIfNeeded() {
     );
     window.log.debug('Scheduling ConfSyncJob: ASAP');
   } else {
-    // if we did run at t=100, and it is currently t=110, diff is 10
+    // if we did run at t=100, and it is currently t=110, the difference is 10
     const diff = Math.max(Date.now() - lastRunConfigSyncJobTimestamp, 0);
     // but we want to run every 30, so what we need is actually `30-10` from now = 20
     const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 0);
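
The comments in `queueNewJobIfNeeded` describe the throttling arithmetic; here it is as a runnable example, with illustrative values standing in for the real constants:

```ts
// illustrative values only: the real defaultMsBetweenRetries lives in the job file
const defaultMsBetweenRetries = 30;
const lastRunConfigSyncJobTimestamp = 100; // "we did run at t=100"
const now = 110; // "it is currently t=110"

const diff = Math.max(now - lastRunConfigSyncJobTimestamp, 0); // 10
// we want to run every 30, so schedule `30 - 10 = 20` from now
const leftBeforeNextTick = Math.max(defaultMsBetweenRetries - diff, 0);
console.log(leftBeforeNextTick); // 20
```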

@@ -35,7 +35,6 @@ export type IncomingConfResult = {
 export type OutgoingConfResult = {
   message: SharedConfigMessage;
   namespace: SnodeNamespaces;
-  destination: string;
   oldMessageHashes: Array<string>;
 };
@@ -121,7 +120,6 @@ async function pendingChangesForPubkey(pubkey: string): Promise<Array<OutgoingCo
     const kind = variantToKind(variant);
     const namespace = await GenericWrapperActions.storageNamespace(variant);
     results.push({
-      destination: pubkey,
       message: new SharedConfigMessage({
         data,
         kind,
