loki_primitives refactor, pass swarmPool into _openRetrieveConnection, _openSendConnection now returns the snode it used, refreshSendingSwarm() now uses lokiSnodeAPI.refreshSwarmNodesForPubKey, bump MAX_ACCEPTABLE_FAILURES from 1 to 10 to make sure we retry

pull/1061/head
Ryan Tharp 5 years ago
parent bee436ebbb
commit 53a624ff1d

@ -4,15 +4,10 @@
const _ = require('lodash');
const { lokiRpc } = require('./loki_rpc');
const primitives = require('./loki_primitives');
const DEFAULT_CONNECTIONS = 3;
const MAX_ACCEPTABLE_FAILURES = 1;
function sleepFor(time) {
return new Promise(resolve => {
setTimeout(() => resolve(), time);
});
}
const MAX_ACCEPTABLE_FAILURES = 10;
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
@ -113,26 +108,10 @@ class LokiMessageAPI {
promises.push(connectionPromise);
}
// Taken from https://stackoverflow.com/questions/51160260/clean-way-to-wait-for-first-true-returned-by-promise
// The promise returned by this function will resolve true when the first promise
// in ps resolves true *or* it will resolve false when all of ps resolve false
const firstTrue = ps => {
const newPs = ps.map(
p =>
new Promise(
// eslint-disable-next-line more/no-then
(resolve, reject) => p.then(v => v && resolve(true), reject)
)
);
// eslint-disable-next-line more/no-then
newPs.push(Promise.all(ps).then(() => false));
return Promise.race(newPs);
};
let success;
try {
// eslint-disable-next-line more/no-then
success = await firstTrue(promises);
success = await primitives.firstTrue(promises);
} catch (e) {
if (e instanceof textsecure.WrongDifficultyError) {
// Force nonce recalculation
@ -157,21 +136,23 @@ class LokiMessageAPI {
}
async refreshSendingSwarm(pubKey, timestamp) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
this.sendingData[timestamp].swarm = freshNodes;
this.sendingData[timestamp].hasFreshList = true;
return true;
}
async _openSendConnection(params) {
// timestamp is likely the current second...
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this._sendToNode(snode, params);
if (successfulSend) {
return true;
return snode;
}
// should we mark snode as bad if it can't store our message?
}
if (!this.sendingData[params.timestamp].hasFreshList) {
@ -194,7 +175,13 @@ class LokiMessageAPI {
async _sendToNode(targetNode, params) {
let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
await sleepFor(successiveFailures * 500);
// the higher this is, the longer the user delay is
// we don't want to burn through all our retries quickly
// we need to give the node a chance to heal
// also failed the user quickly, just means they pound the retry faster
// this favors a lot more retries and lower delays
// but that may chew up the bandwidth...
await primitives.sleepFor(successiveFailures * 500);
try {
const result = await lokiRpc(
`https://${targetNode.ip}`,
@ -208,10 +195,9 @@ class LokiMessageAPI {
// do not return true if we get false here...
if (result === false) {
// this means the node we asked for is likely down
log.warn(
`loki_message:::_sendToNode - Got false from ${targetNode.ip}:${
targetNode.port
}`
`loki_message:::_sendToNode - Try #${successiveFailures}/${MAX_ACCEPTABLE_FAILURES} ${targetNode.ip}:${targetNode.port} failed`
);
successiveFailures += 1;
// eslint-disable-next-line no-continue
@ -273,7 +259,7 @@ class LokiMessageAPI {
return false;
}
async _openRetrieveConnection(stopPollingPromise, callback) {
async _openRetrieveConnection(swarmPool, stopPollingPromise, callback) {
let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes
@ -285,10 +271,10 @@ class LokiMessageAPI {
stopPollingResult = result;
});
while (!stopPollingResult && !_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address];
delete this.ourSwarmNodes[address];
while (!stopPollingResult && !_.isEmpty(swarmPool)) {
const address = Object.keys(swarmPool)[0]; // X.snode hostname
const nodeData = swarmPool[address];
delete swarmPool[address];
let successiveFailures = 0;
while (
!stopPollingResult &&
@ -300,6 +286,7 @@ class LokiMessageAPI {
// so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await this._retrieveNextMessages(nodeData);
// this only tracks retrieval failures
// won't include parsing failures...
@ -328,11 +315,12 @@ class LokiMessageAPI {
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
// FIXME: lokiSnode should handle this
for (let i = 0; i < newSwarm.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
newSwarm[i]
);
this.ourSwarmNodes[newSwarm[i]] = {
swarmPool[newSwarm[i]] = {
lastHash,
};
}
@ -348,7 +336,7 @@ class LokiMessageAPI {
}
// Always wait a bit as we are no longer long-polling
await sleepFor(Math.max(successiveFailures, 2) * 1000);
await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000);
}
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
@ -359,15 +347,15 @@ class LokiMessageAPI {
`loki_message:::_openRetrieveConnection - too many successive failures, removing ${
nodeData.ip
}:${nodeData.port} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left (${
Object.keys(swarmPool).length
} usable swarm nodes left for our connection (${
remainingSwarmSnodes.length
} in local db)`
);
}
}
// if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) {
if (_.isEmpty(swarmPool)) {
log.error(
'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
);
@ -402,7 +390,7 @@ class LokiMessageAPI {
if (result === false) {
// make a note of it because of caller doesn't care...
log.warn(
`loki_message:::_retrieveNextMessages - lokiRpc returned false to ${
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${
nodeData.ip
}:${nodeData.port}`
);
@ -413,7 +401,6 @@ class LokiMessageAPI {
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, callback) {
this.ourSwarmNodes = {};
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
@ -436,19 +423,17 @@ class LokiMessageAPI {
'for',
this.ourKey
);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`loki_message: ${j} ${node.ip}:${node.port}`);
});
for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
nodes[i].address
);
this.ourSwarmNodes[nodes[i].address] = {
...nodes[i],
lastHash,
};
// floor or ceil probably doesn't matter, since it's likely always uneven
const poolSize = Math.floor(nodes.length / numConnections, 10);
const pools = [];
while(nodes.length) {
const poolList = nodes.splice(0, poolSize);
const byAddressObj = poolList.reduce(function(result, node) {
result[node.address] = node;
return result;
}, {});
pools.push(byAddressObj);
}
const promises = [];
@ -457,7 +442,7 @@ class LokiMessageAPI {
for (let i = 0; i < numConnections; i += 1) {
promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(stopPolling, callback).then(() => {
this._openRetrieveConnection(pools[i], stopPolling, callback).then(() => {
unresolved -= 1;
log.info(
'loki_message:::startLongPolling - There are',

Loading…
Cancel
Save