Initial refactoring of sendmessage

pull/295/head
Beaudan 6 years ago
parent 1e11a6527c
commit b43978ece1

@ -31,15 +31,64 @@ const filterIncomingMessages = async messages => {
return newMessages;
};
const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => {
// Nonce is returned as a base64 string to include in header
try {
window.Whisper.events.trigger('calculatingPoW', messageEventData);
const development = window.getEnvironment() !== 'production';
return callWorker(
'calcPoW',
timestamp,
ttl,
pubKey,
data64,
development
);
} catch (err) {
// Something went horribly wrong
throw err;
}
}
const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (!p2pDetails || (!isPing && !p2pDetails.isOnline)) {
return false;
}
try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
await rpc(p2pDetails.address, port, 'store', {
data: data64,
});
lokiP2pAPI.setContactOnline(pubKey);
window.Whisper.events.trigger('p2pMessageSent', messageEventData);
if (isPing) {
log.info(`Successfully pinged ${pubKey}`);
} else {
log.info(`Successful p2p message to ${pubKey}`);
}
return true;
} catch (e) {
lokiP2pAPI.setContactOffline(pubKey);
if (isPing) {
// If this was just a ping, we don't bother sending to storage server
log.warn('Ping failed, contact marked offline', e);
return true;
}
log.warn('Failed to send P2P message, falling back to storage', e);
return false;
}
}
class LokiMessageAPI {
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
this.jobQueue = new window.JobQueue();
this.sendingSwarmNodes = {};
}
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
const timestamp = Date.now();
async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) {
// Data required to identify a message in a conversation
const messageEventData = {
pubKey,
@ -47,134 +96,90 @@ class LokiMessageAPI {
};
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (p2pDetails && (isPing || p2pDetails.isOnline)) {
try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
await rpc(p2pDetails.address, port, 'store', {
data: data64,
});
lokiP2pAPI.setContactOnline(pubKey);
window.Whisper.events.trigger('p2pMessageSent', messageEventData);
if (isPing) {
log.info(`Successfully pinged ${pubKey}`);
} else {
log.info(`Successful p2p message to ${pubKey}`);
}
return;
} catch (e) {
lokiP2pAPI.setContactOffline(pubKey);
if (isPing) {
// If this was just a ping, we don't bother sending to storage server
log.warn('Ping failed, contact marked offline', e);
return;
}
log.warn('Failed to send P2P message, falling back to storage', e);
}
const p2pSuccess = await trySendP2p(pubKey, data64, isPing, messageEventData);
if (p2pSuccess) {
return;
}
// Nonce is returned as a base64 string to include in header
let nonce;
try {
window.Whisper.events.trigger('calculatingPoW', messageEventData);
const development = window.getEnvironment() !== 'production';
nonce = await callWorker(
'calcPoW',
timestamp,
ttl,
pubKey,
data64,
development
);
} catch (err) {
// Something went horribly wrong
throw err;
const timestamp = Date.now();
const nonce = await calcNonce(messageEventData, pubKey, data64, timestamp, ttl);
// Using timestamp as a unique identifier
this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
if (this.sendingSwarmNodes[timestamp].length < numConnections) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
this.sendingSwarmNodes[timestamp] = freshNodes;
}
const completedNodes = [];
const failedNodes = [];
let successfulRequests = 0;
let canResolve = true;
let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl);
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
const params = {
pubKey,
ttl: ttl.toString(),
nonce,
timestamp: timestamp.toString(),
data: data64,
};
const promises = [];
for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openSendConnection(params));
}
const doRequest = async nodeUrl => {
const params = {
const results = await Promise.all(promises);
delete this.sendingSwarmNodes[timestamp];
if (results.every(value => value === false)) {
throw new window.textsecure.EmptySwarmError(
pubKey,
ttl: ttl.toString(),
nonce,
timestamp: timestamp.toString(),
data: data64,
};
'Ran out of swarm nodes to query'
);
}
if (results.every(value => value === true)) {
log.info(`Successful storage message to ${pubKey}`);
} else {
log.warn(`Partially successful storage message to ${pubKey}`);
}
}
try {
await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params);
async openSendConnection(params) {
while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) {
const url = this.sendingSwarmNodes[params.timestamp].shift();
const successfulSend = await this.sendToNode(url, params);
if (successfulSend) {
return true;
}
}
return false;
}
nodeComplete(nodeUrl);
successfulRequests += 1;
async sendToNode(url, params) {
let successiveFailures = 0;
while (successiveFailures < 3) {
await sleepFor(successiveFailures * 500);
try {
await rpc(`http://${url}`, this.snodeServerPort, 'store', params);
return true;
} catch (e) {
log.warn('Loki send message:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm);
completedNodes.push(nodeUrl);
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
this.sendingSwarmNodes[params.timestamp] = newSwarm;
return false;
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
// TODO: Handle resolution error
successiveFailures += 1;
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
// TODO: Handle working connection but error response
successiveFailures += 1;
} else {
const removeNode = await lokiSnodeAPI.unreachableNode(
pubKey,
nodeUrl
);
if (removeNode) {
log.error('Loki send message:', e);
nodeComplete(nodeUrl);
failedNodes.push(nodeUrl);
}
}
}
};
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Sending messages');
}
if (swarmNodes.length === 0) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
const goodNodes = _.difference(freshNodes, failedNodes);
await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes);
swarmNodes = _.difference(freshNodes, completedNodes);
if (swarmNodes.length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
log.warn(`Partially successful storage message to ${pubKey}`);
return;
}
throw new window.textsecure.EmptySwarmError(
pubKey,
'Ran out of swarm nodes to query'
);
successiveFailures += 1;
}
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
await Promise.all(
swarmNodes
.splice(0, remainingRequests)
.map(nodeUrl => doRequest(nodeUrl))
);
}
log.info(`Successful storage message to ${pubKey}`);
log.error(`Failed to send to node: ${url}`);
await lokiSnodeAPI.unreachableNode(
params.pubKey,
url
);
return false;
}
async retrieveNextMessages(nodeUrl, nodeData, ourKey) {

@ -73,29 +73,10 @@ class LokiSnodeAPI {
async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
}
delete this.ourSwarmNodes[nodeUrl];
return true;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.contactSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
return;
}
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
if (swarmNodes.includes(nodeUrl)) {
@ -103,14 +84,12 @@ class LokiSnodeAPI {
await conversation.updateSwarmNodes(filteredNodes);
delete this.contactSwarmNodes[nodeUrl];
}
return true;
}
async updateLastHash(nodeUrl, lastHash, expiresAt) {
await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt });
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
failureCount: 0,
lastHash,
};
} else {
@ -118,7 +97,7 @@ class LokiSnodeAPI {
}
}
async getSwarmNodesForPubKey(pubKey) {
getSwarmNodesForPubKey(pubKey) {
try {
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
@ -146,7 +125,6 @@ class LokiSnodeAPI {
const ps = newNodes.map(async url => {
const lastHash = await window.Signal.Data.getLastHashBySnode(url);
this.ourSwarmNodes[url] = {
failureCount: 0,
lastHash,
};
});

@ -187,7 +187,9 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60 * 1000) {
const pubKey = number;
try {
// TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant
await lokiMessageAPI.sendMessage(
2,
pubKey,
data,
timestamp,

Loading…
Cancel
Save