fix: randomly pick a snode topollfrom until we build a better way

pull/3080/head
Audric Ackermann 1 month ago
parent e58f88f1f0
commit 52ebcfdbab

@ -1,4 +1,4 @@
import { isArray, omit } from 'lodash';
import { isArray, omit, sortBy } from 'lodash';
import { Snode } from '../../../data/data';
import { updateIsOnline } from '../../../state/ducks/onion';
import { doSnodeBatchRequest } from './batchRequest';
@ -167,11 +167,18 @@ async function retrieveNextMessages(
GetNetworkTime.handleTimestampOffsetFromNetwork('retrieve', bodyFirstResult.t);
// merge results with their corresponding namespaces
return results.map((result, index) => ({
code: result.code,
messages: result.body as RetrieveMessagesResultsContent,
namespace: namespaces[index],
}));
return results.map((result, index) => {
const messages = result.body as RetrieveMessagesResultsContent;
// Not sure if that makes sense, but we probably want those messages sorted.
const sortedMessages = sortBy(messages.messages, m => m.timestamp);
messages.messages = sortedMessages;
return {
code: result.code,
messages,
namespace: namespaces[index],
};
});
} catch (e) {
window?.log?.warn('exception while parsing json of nextMessage:', e);
if (!window.inboxStore?.getState().onionPaths.isOnline) {

@ -1,7 +1,7 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable more/no-then */
/* eslint-disable @typescript-eslint/no-misused-promises */
import { compact, concat, difference, flatten, last, sample, toNumber, uniqBy } from 'lodash';
import { compact, concat, flatten, last, sample, toNumber, uniqBy } from 'lodash';
import { Data, Snode } from '../../../data/data';
import { SignalService } from '../../../protobuf';
import * as Receiver from '../../../receiver/receiver';
@ -24,7 +24,6 @@ import { getConversationController } from '../../conversations';
import { IncomingMessage } from '../../messages/incoming/IncomingMessage';
import { ed25519Str } from '../../onions/onionPath';
import { StringUtils, UserUtils } from '../../utils';
import { perfEnd, perfStart } from '../../utils/Performance';
import { LibSessionUtil } from '../../utils/libsession/libsession_utils';
import { SnodeNamespace, SnodeNamespaces } from './namespaces';
import { SnodeAPIRetrieve } from './retrieveRequest';
@ -228,21 +227,16 @@ export class SwarmPolling {
namespaces: Array<SnodeNamespaces>
) {
const polledPubkey = pubkey.key;
let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null;
const swarmSnodes = await snodePool.getSwarmFor(polledPubkey);
// Select nodes for which we already have lastHashes
const alreadyPolled = swarmSnodes.filter((n: Snode) => this.lastHashes[n.pubkey_ed25519]);
let toPollFrom = alreadyPolled.length ? alreadyPolled[0] : null;
// If we need more nodes, select randomly from the remaining nodes:
if (!toPollFrom) {
const notPolled = difference(swarmSnodes, alreadyPolled);
toPollFrom = sample(notPolled) as Snode;
}
let resultsFromAllNamespaces: RetrieveMessagesResultsBatched | null;
let toPollFrom: Snode | undefined;
try {
toPollFrom = sample(swarmSnodes);
if (!toPollFrom) {
throw new Error(`pollOnceForKey: no snode in swarm for ${ed25519Str(polledPubkey)}`);
}
// Note: always print something so we know if the polling is hanging
window.log.info(
`about to pollNodeForKey of ${ed25519Str(pubkey.key)} from snode: ${ed25519Str(toPollFrom.pubkey_ed25519)} namespaces: ${namespaces} `
@ -337,9 +331,10 @@ export class SwarmPolling {
});
}
perfStart(`handleSeenMessages-${polledPubkey}`);
const newMessages = await this.handleSeenMessages(messages);
perfEnd(`handleSeenMessages-${polledPubkey}`, 'handleSeenMessages');
window.log.info(
`handleSeenMessages: ${newMessages.length} out of ${messages.length} are not seen yet. snode: ${toPollFrom ? ed25519Str(toPollFrom.pubkey_ed25519) : 'undefined'}`
);
// don't handle incoming messages from group swarms when using the userconfig and the group is not one of the tracked group
const isUserConfigReleaseLive = await ReleasedFeatures.checkIsUserConfigFeatureReleased();

@ -1,14 +1,18 @@
/* eslint-disable no-await-in-loop */
import { to_hex } from 'libsodium-wrappers-sumo';
import { compact, isArray, isEmpty, isNumber, isString } from 'lodash';
import { v4 } from 'uuid';
import { UserUtils } from '../..';
import { ConfigDumpData } from '../../../../data/configDump/configDump';
import { ConfigurationSyncJobDone } from '../../../../shims/events';
import { ReleasedFeatures } from '../../../../util/releaseFeature';
import { isSignInByLinking } from '../../../../util/storage';
import { GenericWrapperActions } from '../../../../webworker/workers/browser/libsession_worker_interface';
import { NotEmptyArrayOfBatchResults } from '../../../apis/snode_api/SnodeRequestTypes';
import { getConversationController } from '../../../conversations';
import { SharedConfigMessage } from '../../../messages/outgoing/controlMessage/SharedConfigMessage';
import { MessageSender } from '../../../sending/MessageSender';
import { allowOnlyOneAtATime } from '../../Promise';
import { LibSessionUtil, OutgoingConfResult } from '../../libsession/libsession_utils';
import { runners } from '../JobRunner';
import {
@ -17,9 +21,6 @@ import {
PersistedJob,
RunJobResult,
} from '../PersistedJob';
import { ReleasedFeatures } from '../../../../util/releaseFeature';
import { allowOnlyOneAtATime } from '../../Promise';
import { isSignInByLinking } from '../../../../util/storage';
const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
const defaultMaxAttempts = 2;
@ -208,6 +209,29 @@ class ConfigurationSyncJob extends PersistedJob<ConfigurationSyncPersistedData>
};
});
if (window.sessionFeatureFlags.debug.debugLibsessionDumps) {
for (let index = 0; index < LibSessionUtil.requiredUserVariants.length; index++) {
const variant = LibSessionUtil.requiredUserVariants[index];
window.log.info(
`ConfigurationSyncJob: current dumps: ${variant}:`,
to_hex(await GenericWrapperActions.dump(variant))
);
}
window.log.info(
'ConfigurationSyncJob: About to push changes: ',
msgs.map(m => {
return {
...m,
message: {
...m.message,
data: to_hex(m.message.data),
},
};
})
);
}
const result = await MessageSender.sendMessagesToSnode(
msgs,
thisJobDestination,

Loading…
Cancel
Save