diff --git a/js/expire.js b/js/expire.js index c068921e5..228508caf 100644 --- a/js/expire.js +++ b/js/expire.js @@ -16,7 +16,7 @@ LokiFileServerAPI.secureRpcPubKey ); - let nextWaitSeconds = 1; + let nextWaitSeconds = 5; const checkForUpgrades = async () => { const result = await window.tokenlessFileServerAdnAPI.serverRequest( 'loki/v1/version/client/desktop' @@ -67,9 +67,7 @@ return res(expiredVersion); } log.info( - 'Delaying sending checks for', - nextWaitSeconds, - 's, no version yet' + `Delaying sending checks for ${nextWaitSeconds}s, no version yet` ); setTimeout(waitForVersion, nextWaitSeconds * 1000); return true; @@ -85,11 +83,7 @@ window.extension.expired = cb => { if (expiredVersion === null) { // just give it another second - log.info( - 'Delaying expire banner determination for', - nextWaitSeconds, - 's' - ); + log.info(`Delaying expire banner determination for ${nextWaitSeconds}s`); setTimeout(() => { window.extension.expired(cb); }, nextWaitSeconds * 1000); diff --git a/js/modules/loki_app_dot_net_api.js b/js/modules/loki_app_dot_net_api.js index 945ec66a5..ef56f8c1b 100644 --- a/js/modules/loki_app_dot_net_api.js +++ b/js/modules/loki_app_dot_net_api.js @@ -32,6 +32,8 @@ const snodeHttpsAgent = new https.Agent({ rejectUnauthorized: false, }); +const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms)); + const sendToProxy = async ( srvPubKey, endpoint, @@ -44,8 +46,6 @@ const sendToProxy = async ( ); return {}; } - const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); - const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`; const fetchOptions = pFetchOptions; // make lint happy // safety issue with file server, just safer to have this @@ -61,6 +61,7 @@ const sendToProxy = async ( }; // from https://github.com/sindresorhus/is-stream/blob/master/index.js + let fileUpload = false; if ( payloadObj.body && typeof payloadObj.body === 'object' && @@ -74,8 +75,22 @@ const sendToProxy = async ( payloadObj.body = { fileUpload: fData.toString('base64'), }; + fileUpload = true; } + // use nodes that support more than 1mb + const randomFunc = fileUpload + ? 'getRandomProxySnodeAddress' + : 'getRandomSnodeAddress'; + const randSnode = await lokiSnodeAPI[randomFunc](); + if (randSnode === false) { + log.warn('proxy random snode pool is not ready, retrying 10s', endpoint); + // no nodes in the pool yet, give it some time and retry + await timeoutDelay(1000); + return sendToProxy(srvPubKey, endpoint, pFetchOptions, options); + } + const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`; + // convert our payload to binary buffer const payloadData = Buffer.from( dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer() @@ -138,7 +153,7 @@ const sendToProxy = async ( ); // retry (hopefully with new snode) // FIXME: max number of retries... - return sendToProxy(srvPubKey, endpoint, fetchOptions); + return sendToProxy(srvPubKey, endpoint, fetchOptions, options); } let response = {}; diff --git a/js/modules/loki_rpc.js b/js/modules/loki_rpc.js index 78f2414d9..89d2ba615 100644 --- a/js/modules/loki_rpc.js +++ b/js/modules/loki_rpc.js @@ -136,33 +136,32 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => { // detect SNode is not ready (not in swarm; not done syncing) if (response.status === 503) { - log.warn('Got 503: snode not ready'); + log.warn(`(${reqIdx}) [path] Got 503: snode not ready`); return BAD_PATH; } if (response.status === 504) { - log.warn('Got 504: Gateway timeout'); + log.warn(`(${reqIdx}) [path] Got 504: Gateway timeout`); return BAD_PATH; } if (response.status === 404) { // Why would we get this error on testnet? - log.warn('Got 404: Gateway timeout'); + log.warn(`(${reqIdx}) [path] Got 404: Gateway timeout`); return BAD_PATH; } if (response.status !== 200) { log.warn( - 'lokiRpc sendToProxy fetch unhandled error code:', - response.status + `(${reqIdx}) [path] fetch unhandled error code: ${response.status}` ); return false; } const ciphertext = await response.text(); if (!ciphertext) { - log.warn('[path]: Target node return empty ciphertext'); + log.warn(`(${reqIdx}) [path]: Target node return empty ciphertext`); return false; } @@ -183,9 +182,9 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => { const textDecoder = new TextDecoder(); plaintext = textDecoder.decode(plaintextBuffer); } catch (e) { - log.error(`(${reqIdx}) lokiRpc sendToProxy decode error`); + log.error(`(${reqIdx}) [path] decode error`); if (ciphertextBuffer) { - log.error('ciphertextBuffer', ciphertextBuffer); + log.error(`(${reqIdx}) [path] ciphertextBuffer`, ciphertextBuffer); } return false; } @@ -198,22 +197,13 @@ const processOnionResponse = async (reqIdx, response, sharedKey, useAesGcm) => { const res = JSON.parse(jsonRes.body); return res; } catch (e) { - log.error( - `(${reqIdx}) lokiRpc sendToProxy parse error json: `, - jsonRes.body - ); + log.error(`(${reqIdx}) [path] parse error json: `, jsonRes.body); } return false; }; return jsonRes; } catch (e) { - log.error( - 'lokiRpc sendToProxy parse error', - e.code, - e.message, - `json:`, - plaintext - ); + log.error('[path] parse error', e.code, e.message, `json:`, plaintext); return false; } }; @@ -280,8 +270,9 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { // we got a ton of randomPool nodes, let's just not worry about this one lokiSnodeAPI.markRandomNodeUnreachable(randSnode); const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength(); + const ciphertext = await response.text(); log.warn( - `lokiRpc sendToProxy`, + `lokiRpc:::sendToProxy -`, `snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ targetNode.port }`, @@ -297,16 +288,13 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { // detect SNode is not ready (not in swarm; not done syncing) if (response.status === 503 || response.status === 500) { - const ciphertext = await response.text(); - // we shouldn't do these, - // it's seems to be not the random node that's always bad - // but the target node - - // we got a ton of randomPool nodes, let's just not worry about this one + // this doesn't mean the random node is bad, it could be the target node + // but we got a ton of randomPool nodes, let's just not worry about this one lokiSnodeAPI.markRandomNodeUnreachable(randSnode); const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength(); + const ciphertext = await response.text(); log.warn( - `lokiRpc sendToProxy`, + `lokiRpc:::sendToProxy -`, `snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ targetNode.port }`, @@ -346,7 +334,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { if (response.status !== 200) { // let us know we need to create handlers for new unhandled codes log.warn( - 'lokiRpc sendToProxy fetch non-200 statusCode', + 'lokiRpc:::sendToProxy - fetch non-200 statusCode', response.status, `from snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ targetNode.port @@ -381,7 +369,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { plaintext = textDecoder.decode(plaintextBuffer); } catch (e) { log.error( - 'lokiRpc sendToProxy decode error', + 'lokiRpc:::sendToProxy - decode error', e.code, e.message, `from ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ @@ -403,7 +391,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { return JSON.parse(jsonRes.body); } catch (e) { log.error( - 'lokiRpc sendToProxy parse error', + 'lokiRpc:::sendToProxy - parse error', e.code, e.message, `from ${randSnode.ip}:${randSnode.port} json:`, @@ -414,7 +402,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { }; if (retryNumber) { log.info( - `lokiRpc sendToProxy request succeeded,`, + `lokiRpc:::sendToProxy - request succeeded,`, `snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${ targetNode.port }`, @@ -424,7 +412,7 @@ const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => { return jsonRes; } catch (e) { log.error( - 'lokiRpc sendToProxy parse error', + 'lokiRpc:::sendToProxy - parse error', e.code, e.message, `from ${randSnode.ip}:${randSnode.port} json:`, @@ -515,7 +503,7 @@ const lokiFetch = async (url, options = {}, targetNode = null) => { fetchOptions.agent = snodeHttpsAgent; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; } else { - log.info('lokiRpc http communication', url); + log.info('lokirpc:::lokiFetch - http communication', url); } const response = await nodeFetch(url, fetchOptions); // restore TLS checking diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 8709f2a4b..9642d64e0 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -3,11 +3,19 @@ const is = require('@sindresorhus/is'); const { lokiRpc } = require('./loki_rpc'); +const https = require('https'); const nodeFetch = require('node-fetch'); +const semver = require('semver'); + +const snodeHttpsAgent = new https.Agent({ + rejectUnauthorized: false, +}); const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3; const SEED_NODE_RETRIES = 3; +const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms)); + class LokiSnodeAPI { constructor({ serverUrl, localUrl }) { if (!is.string(serverUrl)) { @@ -18,6 +26,9 @@ class LokiSnodeAPI { this.randomSnodePool = []; this.swarmsPendingReplenish = {}; this.refreshRandomPoolPromise = false; + this.versionPools = {}; + this.versionMap = {}; // reverse version look up + this.versionsRetrieved = false; // to mark when it's done getting versions this.onionPaths = []; this.guardNodes = []; @@ -30,6 +41,10 @@ class LokiSnodeAPI { return this.randomSnodePool; } + getRandomPoolLength() { + return this.randomSnodePool.length; + } + async testGuardNode(snode) { log.info('Testing a candidate guard node ', snode); @@ -202,7 +217,7 @@ class LokiSnodeAPI { log.warn( `could not find some guard nodes: ${this.guardNodes.length}/${ edKeys.length - }` + } left` ); } } @@ -249,11 +264,85 @@ class LokiSnodeAPI { if (this.randomSnodePool.length === 0) { throw new window.textsecure.SeedNodeError('Invalid seed node response'); } + // FIXME: _.sample? return this.randomSnodePool[ Math.floor(Math.random() * this.randomSnodePool.length) ]; } + // use nodes that support more than 1mb + async getRandomProxySnodeAddress() { + /* resolve random snode */ + if (this.randomSnodePool.length === 0) { + // allow exceptions to pass through upwards + await this.refreshRandomPool(); + } + if (this.randomSnodePool.length === 0) { + throw new window.textsecure.SeedNodeError('Invalid seed node response'); + } + const goodVersions = Object.keys(this.versionPools).filter(version => + semver.gt(version, '2.0.1') + ); + if (!goodVersions.length) { + return false; + } + // FIXME: _.sample? + const goodVersion = + goodVersions[Math.floor(Math.random() * goodVersions.length)]; + const pool = this.versionPools[goodVersion]; + // FIXME: _.sample? + return pool[Math.floor(Math.random() * pool.length)]; + } + + // WARNING: this leaks our IP to all snodes but with no other identifying information + // except that a client started up or ran out of random pool snodes + async getVersion(node) { + try { + process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; + const result = await nodeFetch( + `https://${node.ip}:${node.port}/get_stats/v1`, + { agent: snodeHttpsAgent } + ); + process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; + const data = await result.json(); + if (data.version) { + if (this.versionPools[data.version] === undefined) { + this.versionPools[data.version] = [node]; + } else { + this.versionPools[data.version].push(node); + } + // set up reverse mapping for removal lookup + this.versionMap[`${node.ip}:${node.port}`] = data.version; + } + } catch (e) { + // ECONNREFUSED likely means it's just offline... + // ECONNRESET seems to retry and fail as ECONNREFUSED (so likely a node going offline) + // ETIMEDOUT not sure what to do about these + // retry for now but maybe we should be marking bad... + if (e.code === 'ECONNREFUSED') { + this.markRandomNodeUnreachable(node, { versionPoolFailure: true }); + const randomNodesLeft = this.getRandomPoolLength(); + // clean up these error messages to be a little neater + log.warn( + `loki_snode:::getVersion - ${node.ip}:${ + node.port + } is offline, removing, leaving ${randomNodesLeft} in the randomPool` + ); + } else { + // mostly ECONNRESETs + // ENOTFOUND could mean no internet or hiccup + log.warn( + 'loki_snode:::getVersion - Error', + e.code, + e.message, + `on ${node.ip}:${node.port} retrying in 1s` + ); + await timeoutDelay(1000); + await this.getVersion(node); + } + } + } + async refreshRandomPool(seedNodes = [...window.seedNodeList]) { // if currently not in progress if (this.refreshRandomPoolPromise === false) { @@ -295,6 +384,12 @@ class LokiSnodeAPI { snodes = response.result.service_node_states.filter( snode => snode.public_ip !== '0.0.0.0' ); + // make sure order of the list is random, so we get version in a non-deterministic way + snodes = _.shuffle(snodes); + // commit changes to be live + // we'll update the version (in case they upgrade) every cycle + this.versionPools = {}; + this.versionsRetrieved = false; this.randomSnodePool = snodes.map(snode => ({ ip: snode.public_ip, port: snode.storage_port, @@ -312,7 +407,35 @@ class LokiSnodeAPI { clearTimeout(timeoutTimer); timeoutTimer = null; } + // start polling versions resolve(); + // now get version for all snodes + // also acts an early online test/purge of bad nodes + let c = 0; + const verionStart = Date.now(); + const t = this.randomSnodePool.length; + const noticeEvery = parseInt(t / 10, 10); + // eslint-disable-next-line no-restricted-syntax + for (const node of this.randomSnodePool) { + c += 1; + // eslint-disable-next-line no-await-in-loop + await this.getVersion(node); + if (c % noticeEvery === 0) { + // give stats + const diff = Date.now() - verionStart; + log.info( + `${c}/${t} pool version status update, has taken ${diff.toLocaleString()}ms` + ); + Object.keys(this.versionPools).forEach(version => { + const nodes = this.versionPools[version].length; + log.info( + `version ${version} has ${nodes.toLocaleString()} snodes` + ); + }); + } + } + log.info('Versions retrieved from network!'); + this.versionsRetrieved = true; } catch (e) { log.warn( 'loki_snodes:::refreshRandomPoolPromise - error', @@ -402,15 +525,45 @@ class LokiSnodeAPI { return filteredNodes; } - markRandomNodeUnreachable(snode) { - this.randomSnodePool = _.without( - this.randomSnodePool, - _.find(this.randomSnodePool, { ip: snode.ip, port: snode.port }) - ); - } - - getRandomPoolLength() { - return this.randomSnodePool.length; + markRandomNodeUnreachable(snode, options = {}) { + // avoid retries when we can't get the version because they're offline + if (!options.versionPoolFailure) { + const snodeVersion = this.versionMap[`${snode.ip}:${snode.port}`]; + if (this.versionPools[snodeVersion]) { + this.versionPools[snodeVersion] = _.without( + this.versionPools[snodeVersion], + snode + ); + } else { + if (snodeVersion) { + // reverse map (versionMap) is out of sync with versionPools + log.error( + 'loki_snode:::markRandomNodeUnreachable - No snodes for version', + snodeVersion, + 'retrying in 10s' + ); + } else { + // we don't know our version yet + // and if we're offline, we'll likely not get it until it restarts if it does... + log.warn( + 'loki_snode:::markRandomNodeUnreachable - No version for snode', + `${snode.ip}:${snode.port}`, + 'retrying in 10s' + ); + } + // make sure we don't retry past 15 mins (10s * 100 ~ 1000s) + const retries = options.retries || 0; + if (retries < 100) { + setTimeout(() => { + this.markRandomNodeUnreachable(snode, { + ...options, + retries: retries + 1, + }); + }, 10000); + } + } + } + this.randomSnodePool = _.without(this.randomSnodePool, snode); } async updateLastHash(snode, hash, expiresAt) {