diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 8b442dfb7..d6c2e65e6 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -31,15 +31,64 @@ const filterIncomingMessages = async messages => { return newMessages; }; +const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => { + // Nonce is returned as a base64 string to include in header + try { + window.Whisper.events.trigger('calculatingPoW', messageEventData); + const development = window.getEnvironment() !== 'production'; + return callWorker( + 'calcPoW', + timestamp, + ttl, + pubKey, + data64, + development + ); + } catch (err) { + // Something went horribly wrong + throw err; + } +} + +const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { + const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); + if (!p2pDetails || (!isPing && !p2pDetails.isOnline)) { + return false; + } + try { + const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; + + await rpc(p2pDetails.address, port, 'store', { + data: data64, + }); + lokiP2pAPI.setContactOnline(pubKey); + window.Whisper.events.trigger('p2pMessageSent', messageEventData); + if (isPing) { + log.info(`Successfully pinged ${pubKey}`); + } else { + log.info(`Successful p2p message to ${pubKey}`); + } + return true; + } catch (e) { + lokiP2pAPI.setContactOffline(pubKey); + if (isPing) { + // If this was just a ping, we don't bother sending to storage server + log.warn('Ping failed, contact marked offline', e); + return true; + } + log.warn('Failed to send P2P message, falling back to storage', e); + return false; + } +} + class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; this.jobQueue = new window.JobQueue(); + this.sendingSwarmNodes = {}; } - async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { - const timestamp = Date.now(); - + async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) { // Data required to identify a message in a conversation const messageEventData = { pubKey, @@ -47,134 +96,90 @@ class LokiMessageAPI { }; const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); - const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey); - if (p2pDetails && (isPing || p2pDetails.isOnline)) { - try { - const port = p2pDetails.port ? `:${p2pDetails.port}` : ''; - - await rpc(p2pDetails.address, port, 'store', { - data: data64, - }); - lokiP2pAPI.setContactOnline(pubKey); - window.Whisper.events.trigger('p2pMessageSent', messageEventData); - if (isPing) { - log.info(`Successfully pinged ${pubKey}`); - } else { - log.info(`Successful p2p message to ${pubKey}`); - } - return; - } catch (e) { - lokiP2pAPI.setContactOffline(pubKey); - if (isPing) { - // If this was just a ping, we don't bother sending to storage server - log.warn('Ping failed, contact marked offline', e); - return; - } - log.warn('Failed to send P2P message, falling back to storage', e); - } + const p2pSuccess = await trySendP2p(pubKey, data64, isPing, messageEventData); + if (p2pSuccess) { + return; } - // Nonce is returned as a base64 string to include in header - let nonce; - try { - window.Whisper.events.trigger('calculatingPoW', messageEventData); - const development = window.getEnvironment() !== 'production'; - nonce = await callWorker( - 'calcPoW', - timestamp, - ttl, - pubKey, - data64, - development - ); - } catch (err) { - // Something went horribly wrong - throw err; + const timestamp = Date.now(); + const nonce = await calcNonce(messageEventData, pubKey, data64, timestamp, ttl); + // Using timestamp as a unique identifier + this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); + if (this.sendingSwarmNodes[timestamp].length < numConnections) { + const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); + await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes); + this.sendingSwarmNodes[timestamp] = freshNodes; } - const completedNodes = []; - const failedNodes = []; - let successfulRequests = 0; - let canResolve = true; - - let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey); - - const nodeComplete = nodeUrl => { - completedNodes.push(nodeUrl); - swarmNodes = swarmNodes.filter(node => node !== nodeUrl); + const params = { + pubKey, + ttl: ttl.toString(), + nonce, + timestamp: timestamp.toString(), + data: data64, }; + const promises = []; + for (let i = 0; i < numConnections; i += 1) { + promises.push(this.openSendConnection(params)); + } - const doRequest = async nodeUrl => { - const params = { + const results = await Promise.all(promises); + delete this.sendingSwarmNodes[timestamp]; + if (results.every(value => value === false)) { + throw new window.textsecure.EmptySwarmError( pubKey, - ttl: ttl.toString(), - nonce, - timestamp: timestamp.toString(), - data: data64, - }; + 'Ran out of swarm nodes to query' + ); + } + if (results.every(value => value === true)) { + log.info(`Successful storage message to ${pubKey}`); + } else { + log.warn(`Partially successful storage message to ${pubKey}`); + } + } - try { - await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params); + async openSendConnection(params) { + while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) { + const url = this.sendingSwarmNodes[params.timestamp].shift(); + const successfulSend = await this.sendToNode(url, params); + if (successfulSend) { + return true; + } + } + return false; + } - nodeComplete(nodeUrl); - successfulRequests += 1; + async sendToNode(url, params) { + let successiveFailures = 0; + while (successiveFailures < 3) { + await sleepFor(successiveFailures * 500); + try { + await rpc(`http://${url}`, this.snodeServerPort, 'store', params); + return true; } catch (e) { log.warn('Loki send message:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm); - completedNodes.push(nodeUrl); + await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm); + this.sendingSwarmNodes[params.timestamp] = newSwarm; + return false; } else if (e instanceof textsecure.NotFoundError) { - canResolve = false; + // TODO: Handle resolution error + successiveFailures += 1; } else if (e instanceof textsecure.HTTPError) { - // We mark the node as complete as we could still reach it - nodeComplete(nodeUrl); + // TODO: Handle working connection but error response + successiveFailures += 1; } else { - const removeNode = await lokiSnodeAPI.unreachableNode( - pubKey, - nodeUrl - ); - if (removeNode) { - log.error('Loki send message:', e); - nodeComplete(nodeUrl); - failedNodes.push(nodeUrl); - } - } - } - }; - - while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) { - if (!canResolve) { - throw new window.textsecure.DNSResolutionError('Sending messages'); - } - if (swarmNodes.length === 0) { - const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey); - const goodNodes = _.difference(freshNodes, failedNodes); - await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes); - swarmNodes = _.difference(freshNodes, completedNodes); - if (swarmNodes.length === 0) { - if (successfulRequests !== 0) { - // TODO: Decide how to handle some completed requests but not enough - log.warn(`Partially successful storage message to ${pubKey}`); - return; - } - throw new window.textsecure.EmptySwarmError( - pubKey, - 'Ran out of swarm nodes to query' - ); + successiveFailures += 1; } } - - const remainingRequests = - MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests; - - await Promise.all( - swarmNodes - .splice(0, remainingRequests) - .map(nodeUrl => doRequest(nodeUrl)) - ); } - log.info(`Successful storage message to ${pubKey}`); + log.error(`Failed to send to node: ${url}`); + await lokiSnodeAPI.unreachableNode( + params.pubKey, + url + ); + return false; } async retrieveNextMessages(nodeUrl, nodeData, ourKey) { diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 79667f13f..88a92b35d 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -73,29 +73,10 @@ class LokiSnodeAPI { async unreachableNode(pubKey, nodeUrl) { if (pubKey === window.textsecure.storage.user.getNumber()) { - if (!this.ourSwarmNodes[nodeUrl]) { - this.ourSwarmNodes[nodeUrl] = { - failureCount: 1, - }; - } else { - this.ourSwarmNodes[nodeUrl].failureCount += 1; - } - if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) { - return false; - } delete this.ourSwarmNodes[nodeUrl]; - return true; - } - if (!this.contactSwarmNodes[nodeUrl]) { - this.contactSwarmNodes[nodeUrl] = { - failureCount: 1, - }; - } else { - this.contactSwarmNodes[nodeUrl].failureCount += 1; - } - if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) { - return false; + return; } + const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; if (swarmNodes.includes(nodeUrl)) { @@ -103,14 +84,12 @@ class LokiSnodeAPI { await conversation.updateSwarmNodes(filteredNodes); delete this.contactSwarmNodes[nodeUrl]; } - return true; } async updateLastHash(nodeUrl, lastHash, expiresAt) { await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); if (!this.ourSwarmNodes[nodeUrl]) { this.ourSwarmNodes[nodeUrl] = { - failureCount: 0, lastHash, }; } else { @@ -118,7 +97,7 @@ class LokiSnodeAPI { } } - async getSwarmNodesForPubKey(pubKey) { + getSwarmNodesForPubKey(pubKey) { try { const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; @@ -146,7 +125,6 @@ class LokiSnodeAPI { const ps = newNodes.map(async url => { const lastHash = await window.Signal.Data.getLastHashBySnode(url); this.ourSwarmNodes[url] = { - failureCount: 0, lastHash, }; }); diff --git a/libtextsecure/outgoing_message.js b/libtextsecure/outgoing_message.js index bf30cdf69..5d2610f17 100644 --- a/libtextsecure/outgoing_message.js +++ b/libtextsecure/outgoing_message.js @@ -187,7 +187,9 @@ OutgoingMessage.prototype = { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60 * 1000) { const pubKey = number; try { + // TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant await lokiMessageAPI.sendMessage( + 2, pubKey, data, timestamp,