diff --git a/js/background.js b/js/background.js index 6bdb08222..e4c29700c 100644 --- a/js/background.js +++ b/js/background.js @@ -233,6 +233,8 @@ window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing); }); + window.lokiMessageAPI = new window.LokiMessageAPI(); + const currentPoWDifficulty = storage.get('PoWDifficulty', null); if (!currentPoWDifficulty) { storage.put('PoWDifficulty', window.getDefaultPoWDifficulty()); diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index fabbdceb2..7dfc99d35 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -66,32 +66,11 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => { } }; -const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => { - const params = { - pubKey: ourKey, - lastHash: nodeData.lastHash || '', - }; - const options = { - timeout: 40000, - headers: { - [LOKI_LONGPOLL_HEADER]: true, - }, - }; - - const result = await rpc( - `https://${nodeUrl}`, - nodeData.port, - 'retrieve', - params, - options - ); - return result.messages || []; -}; - class LokiMessageAPI { constructor() { this.jobQueue = new window.JobQueue(); this.sendingSwarmNodes = {}; + this.ourKey = window.textsecure.storage.user.getNumber(); } async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) { @@ -227,8 +206,7 @@ class LokiMessageAPI { return false; } - async openConnection(callback) { - const ourKey = window.textsecure.storage.user.getNumber(); + async openRetrieveConnection(callback) { while (!_.isEmpty(this.ourSwarmNodes)) { const address = Object.keys(this.ourSwarmNodes)[0]; const nodeData = this.ourSwarmNodes[address]; @@ -239,16 +217,12 @@ class LokiMessageAPI { try { // TODO: Revert back to using snode address instead of IP - let messages = await retrieveNextMessages( - nodeData.ip, - nodeData, - ourKey - ); + let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); successiveFailures = 0; if (messages.length) { const lastMessage = _.last(messages); - nodeData.lashHash = lastMessage.hash; - lokiSnodeAPI.updateLastHash( + nodeData.lastHash = lastMessage.hash; + await lokiSnodeAPI.updateLastHash( address, lastMessage.hash, lastMessage.expiration @@ -263,7 +237,15 @@ class LokiMessageAPI { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); + for (let i = 0; i < newSwarm.length; i += 1) { + const lastHash = await window.Signal.Data.getLastHashBySnode( + newSwarm[i] + ); + this.ourSwarmNodes[newSwarm[i]] = { + lastHash, + }; + } // Try another snode break; } else if (e instanceof textsecure.NotFoundError) { @@ -275,16 +257,54 @@ class LokiMessageAPI { successiveFailures += 1; } } + if (successiveFailures >= 3) { + await lokiSnodeAPI.unreachableNode(this.ourKey, address); + } } } + async retrieveNextMessages(nodeUrl, nodeData) { + const params = { + pubKey: this.ourKey, + lastHash: nodeData.lastHash || '', + }; + const options = { + timeout: 40000, + headers: { + [LOKI_LONGPOLL_HEADER]: true, + }, + }; + + const result = await rpc( + `https://${nodeUrl}`, + nodeData.port, + 'retrieve', + params, + options + ); + return result.messages || []; + }; + async startLongPolling(numConnections, callback) { - this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + this.ourSwarmNodes = {}; + let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + if (nodes.length < numConnections) { + await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); + nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); + } + for (let i = 0; i < nodes.length; i += 1) { + const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i].address); + this.ourSwarmNodes[nodes[i].address] = { + lastHash, + ip: nodes[i].ip, + port: nodes[i].port, + }; + } const promises = []; for (let i = 0; i < numConnections; i += 1) - promises.push(this.openConnection(callback)); + promises.push(this.openRetrieveConnection(callback)); // blocks until all snodes in our swarms have been removed from the list // or if there is network issues (ENOUTFOUND due to lokinet) diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index fc79cd77f..4a23208c3 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -6,9 +6,6 @@ const dns = require('dns'); const process = require('process'); const { rpc } = require('./loki_rpc'); -// Will be raised (to 3?) when we get more nodes -const MINIMUM_SWARM_NODES = 1; - const resolve4 = url => new Promise((resolve, reject) => { dns.resolve4(url, (err, ip) => { @@ -40,8 +37,6 @@ class LokiSnodeAPI { this.localUrl = localUrl; this.randomSnodePool = []; this.swarmsPendingReplenish = {}; - this.ourSwarmNodes = {}; - this.contactSwarmNodes = {}; // When we package lokinet with messenger we can ensure this ip is correct if (process.platform === 'win32') { dns.setServers(['127.0.0.1']); @@ -92,26 +87,14 @@ class LokiSnodeAPI { } async unreachableNode(pubKey, nodeUrl) { - if (pubKey === window.textsecure.storage.user.getNumber()) { - delete this.ourSwarmNodes[nodeUrl]; - return; - } - const conversation = ConversationController.get(pubKey); const swarmNodes = [...conversation.get('swarmNodes')]; - if (swarmNodes.includes(nodeUrl)) { - const filteredNodes = swarmNodes.filter(node => node !== nodeUrl); - await conversation.updateSwarmNodes(filteredNodes); - delete this.contactSwarmNodes[nodeUrl]; - } + const filteredNodes = swarmNodes.filter(node => node.address !== nodeUrl); + await conversation.updateSwarmNodes(filteredNodes); } async updateLastHash(nodeUrl, lastHash, expiresAt) { await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); - if (!this.ourSwarmNodes[nodeUrl]) { - return; - } - this.ourSwarmNodes[nodeUrl].lastHash = lastHash; } getSwarmNodesForPubKey(pubKey) { @@ -137,33 +120,6 @@ class LokiSnodeAPI { } } - async updateOurSwarmNodes(newNodes) { - this.ourSwarmNodes = {}; - const ps = newNodes.map(async snode => { - const lastHash = await window.Signal.Data.getLastHashBySnode( - snode.address - ); - this.ourSwarmNodes[snode.address] = { - lastHash, - port: snode.port, - ip: snode.ip, - }; - }); - await Promise.all(ps); - } - - async getOurSwarmNodes() { - if ( - !this.ourSwarmNodes || - Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES - ) { - const ourKey = window.textsecure.storage.user.getNumber(); - const nodeAddresses = await this.getSwarmNodes(ourKey); - await this.updateOurSwarmNodes(nodeAddresses); - } - return { ...this.ourSwarmNodes }; - } - async refreshSwarmNodesForPubKey(pubKey) { const newNodes = await this.getFreshSwarmNodes(pubKey); this.updateSwarmNodes(pubKey, newNodes); diff --git a/preload.js b/preload.js index 6c80e3f3a..e9e70580d 100644 --- a/preload.js +++ b/preload.js @@ -302,9 +302,7 @@ window.lokiSnodeAPI = new LokiSnodeAPI({ window.LokiP2pAPI = require('./js/modules/loki_p2p_api'); -const LokiMessageAPI = require('./js/modules/loki_message_api'); - -window.lokiMessageAPI = new LokiMessageAPI(); +window.LokiMessageAPI = require('./js/modules/loki_message_api'); const LocalLokiServer = require('./libloki/modules/local_loki_server');