Merge pull request #311 from BeaudanBrown/refactor-snode-api

Refactor snode api
pull/314/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit 1a71addcf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -233,6 +233,8 @@
window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing); window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing);
}); });
window.lokiMessageAPI = new window.LokiMessageAPI();
const currentPoWDifficulty = storage.get('PoWDifficulty', null); const currentPoWDifficulty = storage.get('PoWDifficulty', null);
if (!currentPoWDifficulty) { if (!currentPoWDifficulty) {
storage.put('PoWDifficulty', window.getDefaultPoWDifficulty()); storage.put('PoWDifficulty', window.getDefaultPoWDifficulty());

@ -66,32 +66,11 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
} }
}; };
const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`https://${nodeUrl}`,
nodeData.port,
'retrieve',
params,
options
);
return result.messages || [];
};
class LokiMessageAPI { class LokiMessageAPI {
constructor() { constructor() {
this.jobQueue = new window.JobQueue(); this.jobQueue = new window.JobQueue();
this.sendingSwarmNodes = {}; this.sendingSwarmNodes = {};
this.ourKey = window.textsecure.storage.user.getNumber();
} }
async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) { async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
@ -227,8 +206,7 @@ class LokiMessageAPI {
return false; return false;
} }
async openConnection(callback) { async openRetrieveConnection(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
while (!_.isEmpty(this.ourSwarmNodes)) { while (!_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0]; const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address]; const nodeData = this.ourSwarmNodes[address];
@ -239,16 +217,12 @@ class LokiMessageAPI {
try { try {
// TODO: Revert back to using snode address instead of IP // TODO: Revert back to using snode address instead of IP
let messages = await retrieveNextMessages( let messages = await this.retrieveNextMessages(nodeData.ip, nodeData);
nodeData.ip,
nodeData,
ourKey
);
successiveFailures = 0; successiveFailures = 0;
if (messages.length) { if (messages.length) {
const lastMessage = _.last(messages); const lastMessage = _.last(messages);
nodeData.lashHash = lastMessage.hash; nodeData.lastHash = lastMessage.hash;
lokiSnodeAPI.updateLastHash( await lokiSnodeAPI.updateLastHash(
address, address,
lastMessage.hash, lastMessage.hash,
lastMessage.expiration lastMessage.expiration
@ -263,7 +237,15 @@ class LokiMessageAPI {
log.warn('Loki retrieve messages:', e); log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
for (let i = 0; i < newSwarm.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
newSwarm[i]
);
this.ourSwarmNodes[newSwarm[i]] = {
lastHash,
};
}
// Try another snode // Try another snode
break; break;
} else if (e instanceof textsecure.NotFoundError) { } else if (e instanceof textsecure.NotFoundError) {
@ -275,16 +257,54 @@ class LokiMessageAPI {
successiveFailures += 1; successiveFailures += 1;
} }
} }
if (successiveFailures >= 3) {
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
} }
} }
}
async retrieveNextMessages(nodeUrl, nodeData) {
const params = {
pubKey: this.ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`https://${nodeUrl}`,
nodeData.port,
'retrieve',
params,
options
);
return result.messages || [];
};
async startLongPolling(numConnections, callback) { async startLongPolling(numConnections, callback) {
this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); this.ourSwarmNodes = {};
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
}
for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(nodes[i].address);
this.ourSwarmNodes[nodes[i].address] = {
lastHash,
ip: nodes[i].ip,
port: nodes[i].port,
};
}
const promises = []; const promises = [];
for (let i = 0; i < numConnections; i += 1) for (let i = 0; i < numConnections; i += 1)
promises.push(this.openConnection(callback)); promises.push(this.openRetrieveConnection(callback));
// blocks until all snodes in our swarms have been removed from the list // blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)

@ -6,9 +6,6 @@ const dns = require('dns');
const process = require('process'); const process = require('process');
const { rpc } = require('./loki_rpc'); const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
const resolve4 = url => const resolve4 = url =>
new Promise((resolve, reject) => { new Promise((resolve, reject) => {
dns.resolve4(url, (err, ip) => { dns.resolve4(url, (err, ip) => {
@ -40,8 +37,6 @@ class LokiSnodeAPI {
this.localUrl = localUrl; this.localUrl = localUrl;
this.randomSnodePool = []; this.randomSnodePool = [];
this.swarmsPendingReplenish = {}; this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
this.contactSwarmNodes = {};
// When we package lokinet with messenger we can ensure this ip is correct // When we package lokinet with messenger we can ensure this ip is correct
if (process.platform === 'win32') { if (process.platform === 'win32') {
dns.setServers(['127.0.0.1']); dns.setServers(['127.0.0.1']);
@ -92,26 +87,14 @@ class LokiSnodeAPI {
} }
async unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl];
return;
}
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
if (swarmNodes.includes(nodeUrl)) { const filteredNodes = swarmNodes.filter(node => node.address !== nodeUrl);
const filteredNodes = swarmNodes.filter(node => node !== nodeUrl);
await conversation.updateSwarmNodes(filteredNodes); await conversation.updateSwarmNodes(filteredNodes);
delete this.contactSwarmNodes[nodeUrl];
}
} }
async updateLastHash(nodeUrl, lastHash, expiresAt) { async updateLastHash(nodeUrl, lastHash, expiresAt) {
await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt });
if (!this.ourSwarmNodes[nodeUrl]) {
return;
}
this.ourSwarmNodes[nodeUrl].lastHash = lastHash;
} }
getSwarmNodesForPubKey(pubKey) { getSwarmNodesForPubKey(pubKey) {
@ -137,33 +120,6 @@ class LokiSnodeAPI {
} }
} }
async updateOurSwarmNodes(newNodes) {
this.ourSwarmNodes = {};
const ps = newNodes.map(async snode => {
const lastHash = await window.Signal.Data.getLastHashBySnode(
snode.address
);
this.ourSwarmNodes[snode.address] = {
lastHash,
port: snode.port,
ip: snode.ip,
};
});
await Promise.all(ps);
}
async getOurSwarmNodes() {
if (
!this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) {
const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await this.getSwarmNodes(ourKey);
await this.updateOurSwarmNodes(nodeAddresses);
}
return { ...this.ourSwarmNodes };
}
async refreshSwarmNodesForPubKey(pubKey) { async refreshSwarmNodesForPubKey(pubKey) {
const newNodes = await this.getFreshSwarmNodes(pubKey); const newNodes = await this.getFreshSwarmNodes(pubKey);
this.updateSwarmNodes(pubKey, newNodes); this.updateSwarmNodes(pubKey, newNodes);

@ -302,9 +302,7 @@ window.lokiSnodeAPI = new LokiSnodeAPI({
window.LokiP2pAPI = require('./js/modules/loki_p2p_api'); window.LokiP2pAPI = require('./js/modules/loki_p2p_api');
const LokiMessageAPI = require('./js/modules/loki_message_api'); window.LokiMessageAPI = require('./js/modules/loki_message_api');
window.lokiMessageAPI = new LokiMessageAPI();
const LocalLokiServer = require('./libloki/modules/local_loki_server'); const LocalLokiServer = require('./libloki/modules/local_loki_server');

Loading…
Cancel
Save