From 060175004f8948a55719966d42f44a91f35b6dcb Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Tue, 21 Jan 2025 17:46:48 +1100 Subject: [PATCH] fix: retrieve from 2 snodes on every call to take care of a snode being out of sync --- ts/session/apis/snode_api/swarmPolling.ts | 80 ++++++++++++++++++----- ts/session/apis/snode_api/types.ts | 15 ++++- 2 files changed, 76 insertions(+), 19 deletions(-) diff --git a/ts/session/apis/snode_api/swarmPolling.ts b/ts/session/apis/snode_api/swarmPolling.ts index 189b84cdd..1b502d0c8 100644 --- a/ts/session/apis/snode_api/swarmPolling.ts +++ b/ts/session/apis/snode_api/swarmPolling.ts @@ -13,6 +13,7 @@ import { last, omit, sample, + sampleSize, toNumber, uniqBy, } from 'lodash'; @@ -53,12 +54,19 @@ import { RetrieveMessageItem, RetrieveMessageItemWithNamespace, RetrieveMessagesResultsBatched, - RetrieveRequestResult, + type RetrieveMessagesResultsMergedBatched, } from './types'; import { ConversationTypeEnum } from '../../../models/types'; import { Snode } from '../../../data/types'; const minMsgCountShouldRetry = 95; +/** + * We retrieve from multiple snodes at the same time, and merge their reported messages because it's easy + * for a snode to be out of sync. + * Sometimes, being out of sync means that we won't be able to retrieve a message at all (revoked_subaccount). + * We need a proper fix server side, but in the meantime, that's all we can do. + */ +const RETRIEVE_SNODES_COUNT = 2; function extractWebSocketContent( message: string, @@ -105,6 +113,33 @@ function entryToKey(entry: GroupPollingEntry) { return entry.pubkey.key; } +function mergeMultipleRetrieveResults( + results: RetrieveMessagesResultsBatched +): RetrieveMessagesResultsMergedBatched { + const mapped: Map> = new Map(); + for (let resultIndex = 0; resultIndex < results.length; resultIndex++) { + const result = results[resultIndex]; + if (!mapped.has(result.namespace)) { + mapped.set(result.namespace, new Map()); + } + if (result.messages.messages) { + for (let msgIndex = 0; msgIndex < result.messages.messages.length; msgIndex++) { + const msg = result.messages.messages[msgIndex]; + if (!mapped.get(result.namespace)!.has(msg.hash)) { + mapped.get(result.namespace)!.set(msg.hash, msg); + } + } + } + } + + // Convert the merged map back to an array + return Array.from(mapped.entries()).map(([namespace, messagesMap]) => ({ + code: 200, // Assuming success code, adjust as needed + namespace, + messages: { messages: Array.from(messagesMap.values()) }, + })); +} + export class SwarmPolling { private groupPolling: Array; @@ -414,27 +449,40 @@ export class SwarmPolling { public async pollOnceForKey([pubkey, type]: PollForUs | PollForLegacy | PollForGroup) { const namespaces = this.getNamespacesToPollFrom(type); const swarmSnodes = await SnodePool.getSwarmFor(pubkey); - let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null; + let resultsFromAllNamespaces: RetrieveMessagesResultsMergedBatched | null; - let toPollFrom: Snode | undefined; + let toPollFrom: Array = []; try { - toPollFrom = sample(swarmSnodes); + toPollFrom = sampleSize(swarmSnodes, RETRIEVE_SNODES_COUNT); - if (!toPollFrom) { + if (toPollFrom.length !== RETRIEVE_SNODES_COUNT) { throw new Error( - `SwarmPolling: pollOnceForKey: no snode in swarm for ${ed25519Str(pubkey)}` + `SwarmPolling: pollOnceForKey: not snodes in swarm for ${ed25519Str(pubkey)}. Expected to have at least ${RETRIEVE_SNODES_COUNT}.` ); } - // Note: always print something so we know if the polling is hanging - window.log.info( - `SwarmPolling: about to pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} ` - ); - resultsFromAllNamespaces = await this.pollNodeForKey(toPollFrom, pubkey, namespaces, type); - // Note: always print something so we know if the polling is hanging + const resultsFromAllSnodesSettled = await Promise.allSettled( + toPollFrom.map(async snode => { + // Note: always print something so we know if the polling is hanging + window.log.info( + `SwarmPolling: about to pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(snode.pubkey_ed25519)} namespaces: ${namespaces} ` + ); + const thisSnodeResults = await this.pollNodeForKey(snode, pubkey, namespaces, type); + // Note: always print something so we know if the polling is hanging + window.log.info( + `SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(snode.pubkey_ed25519)} namespaces: ${namespaces} returned: ${thisSnodeResults?.length}` + ); + return thisSnodeResults; + }) + ); window.log.info( - `SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} returned: ${resultsFromAllNamespaces?.length}` + `SwarmPolling: pollNodeForKey of ${ed25519Str(pubkey)} namespaces: ${namespaces} returned ${resultsFromAllSnodesSettled.filter(m => m.status === 'fulfilled')}/${RETRIEVE_SNODES_COUNT} fulfilled promises` + ); + resultsFromAllNamespaces = mergeMultipleRetrieveResults( + compact( + resultsFromAllSnodesSettled.filter(m => m.status === 'fulfilled').flatMap(m => m.value) + ) ); } catch (e) { window.log.warn( @@ -490,7 +538,7 @@ export class SwarmPolling { const newMessages = await this.handleSeenMessages(uniqOtherMsgs); window.log.info( - `SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet about pk:${ed25519Str(pubkey)} snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}` + `SwarmPolling: handleSeenMessages: ${newMessages.length} out of ${uniqOtherMsgs.length} are not seen yet about pk:${ed25519Str(pubkey)} snode: ${JSON.stringify(toPollFrom.map(m => ed25519Str(m.pubkey_ed25519)))}` ); if (type === ConversationTypeEnum.GROUPV2) { if (!PubKey.is03Pubkey(pubkey)) { @@ -975,7 +1023,7 @@ const retrieveItemSchema = z.object({ }); function retrieveItemWithNamespace( - results: Array + results: RetrieveMessagesResultsMergedBatched ): Array { return flatten( compact( @@ -996,7 +1044,7 @@ function retrieveItemWithNamespace( function filterMessagesPerTypeOfConvo( type: T, - retrieveResults: RetrieveMessagesResultsBatched + retrieveResults: RetrieveMessagesResultsMergedBatched ): { confMessages: Array | null; revokedMessages: Array | null; diff --git a/ts/session/apis/snode_api/types.ts b/ts/session/apis/snode_api/types.ts index 160648388..8e21b3e42 100644 --- a/ts/session/apis/snode_api/types.ts +++ b/ts/session/apis/snode_api/types.ts @@ -34,12 +34,21 @@ export type RetrieveMessagesResultsContent = { t: number; }; -export type RetrieveRequestResult = { +type RetrieveMessagesResultsContentMerged = Pick; + +type RetrieveRequestResult< + T extends RetrieveMessagesResultsContent | RetrieveMessagesResultsContentMerged, +> = { code: number; - messages: RetrieveMessagesResultsContent; + messages: T; namespace: SnodeNamespaces; }; -export type RetrieveMessagesResultsBatched = Array; +export type RetrieveMessagesResultsBatched = Array< + RetrieveRequestResult +>; +export type RetrieveMessagesResultsMergedBatched = Array< + RetrieveRequestResult +>; export type WithRevokeSubRequest = { revokeSubRequest?: SubaccountRevokeSubRequest;