Lots of logic for establishing a p2p connection, managing when the other user is online vs offline etc. Will always try to use P2P messaging when it can and fall back to storage server otherwise

pull/161/head
Beaudan 6 years ago
parent 93015b151f
commit a40a3d164f

@ -642,6 +642,7 @@
await this.respondToAllPendingFriendRequests({ await this.respondToAllPendingFriendRequests({
response: 'accepted', response: 'accepted',
}); });
window.libloki.api.sendOnlineBroadcastMessage(this.id);
return true; return true;
} }
return false; return false;

@ -1,6 +1,6 @@
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */ /* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, Whisper */ /* global log, dcodeIO, window, callWorker, Whisper, lokiP2pAPI */
const nodeFetch = require('node-fetch'); const nodeFetch = require('node-fetch');
const _ = require('lodash'); const _ = require('lodash');
@ -27,9 +27,9 @@ const fetch = async (url, options = {}) => {
try { try {
const response = await nodeFetch(url, { const response = await nodeFetch(url, {
...options,
timeout, timeout,
method, method,
...options,
}); });
if (!response.ok) { if (!response.ok) {
@ -63,9 +63,28 @@ class LokiMessageAPI {
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : ''; this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
} }
async sendMessage(pubKey, data, messageTimeStamp, ttl) { async sendMessage(pubKey, data, messageTimeStamp, ttl, forceP2p = false) {
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000); const timestamp = Math.floor(Date.now() / 1000);
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (p2pDetails && (forceP2p || p2pDetails.isOnline)) {
try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
const url = `${p2pDetails.address}${port}/store`;
const fetchOptions = {
method: 'POST',
body: data64,
};
await fetch(url, fetchOptions);
lokiP2pAPI.setContactOnline(pubKey);
return;
} catch (e) {
log.warn('Failed to send P2P message, falling back to storage', e);
lokiP2pAPI.setContactOffline(pubKey);
}
}
// Nonce is returned as a base64 string to include in header // Nonce is returned as a base64 string to include in header
let nonce; let nonce;
try { try {

@ -1,21 +1,94 @@
class LokiP2pAPI { /* global setTimeout, clearTimeout, window */
const EventEmitter = require('events');
class LokiP2pAPI extends EventEmitter {
constructor() { constructor() {
super();
this.contactP2pDetails = {}; this.contactP2pDetails = {};
} }
addContactP2pDetails(pubKey, address, port) { addContactP2pDetails(pubKey, address, port, resetTimer = false) {
// Stagger the timers so the friends don't ping each other at the same time
this.ourKey = this.ourKey || window.textsecure.storage.user.getNumber();
const timerDuration =
pubKey < this.ourKey
? 60 * 1000 // 1 minute
: 2 * 60 * 1000; // 2 minutes
if (!this.contactP2pDetails[pubKey]) {
// If this is the first time we are getting this contacts details
// then we try to ping them straight away
this.contactP2pDetails[pubKey] = { this.contactP2pDetails[pubKey] = {
address, address,
port, port,
timerDuration,
isOnline: false,
pingTimer: null,
}; };
this.pingContact(pubKey);
return;
}
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
if (
this.contactP2pDetails[pubKey].address !== address ||
this.contactP2pDetails[pubKey].port !== port
) {
// If this contact has changed their details
// then we try to ping them straight away
this.contactP2pDetails[pubKey].address = address;
this.contactP2pDetails[pubKey].port = port;
this.contactP2pDetails[pubKey].isOnline = false;
this.pingContact(pubKey);
return;
}
if (resetTimer) {
// If this contact is simply sharing the same details with us
// then we just reset our timer
this.contactP2pDetails[pubKey].pingTimer = setTimeout(
this.pingContact.bind(this),
this.contactP2pDetails[pubKey].timerDuration,
pubKey
);
return;
}
this.pingContact(pubKey);
} }
getContactP2pDetails(pubKey) { getContactP2pDetails(pubKey) {
return this.contactP2pDetails[pubKey] || null; return this.contactP2pDetails[pubKey] || null;
} }
removeContactP2pDetails(pubKey) { setContactOffline(pubKey) {
delete this.contactP2pDetails[pubKey]; this.emit('offline', pubKey);
if (!this.contactP2pDetails[pubKey]) {
return;
}
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
this.contactP2pDetails[pubKey].isOnline = false;
}
setContactOnline(pubKey) {
if (!this.contactP2pDetails[pubKey]) {
return;
}
this.emit('online', pubKey);
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
this.contactP2pDetails[pubKey].isOnline = true;
this.contactP2pDetails[pubKey].pingTimer = setTimeout(
this.pingContact.bind(this),
this.contactP2pDetails[pubKey].timerDuration,
pubKey
);
}
pingContact(pubKey) {
if (!this.contactP2pDetails[pubKey]) {
return;
}
window.libloki.api.sendOnlineBroadcastMessage(pubKey, true);
} }
} }

@ -23,10 +23,9 @@
); );
} }
async function sendOnlineBroadcastMessage(pubKey) { async function sendOnlineBroadcastMessage(pubKey, forceP2p = false) {
// TODO: Make this actually get a loki address rather than junk string
const lokiAddressMessage = new textsecure.protobuf.LokiAddressMessage({ const lokiAddressMessage = new textsecure.protobuf.LokiAddressMessage({
p2pAddress: 'testAddress', p2pAddress: 'http://localhost',
p2pPort: parseInt(window.localServerPort, 10), p2pPort: parseInt(window.localServerPort, 10),
}); });
const content = new textsecure.protobuf.Content({ const content = new textsecure.protobuf.Content({
@ -41,7 +40,7 @@
log.info('Online broadcast message sent successfully'); log.info('Online broadcast message sent successfully');
} }
}; };
const options = { messageType: 'onlineBroadcast' }; const options = { messageType: 'onlineBroadcast', forceP2p };
// Send a empty message with information about how to contact us directly // Send a empty message with information about how to contact us directly
const outgoingMessage = new textsecure.OutgoingMessage( const outgoingMessage = new textsecure.OutgoingMessage(
null, // server null, // server

@ -74,7 +74,7 @@
}); });
}; };
this.handleMessage = message => { this.handleMessage = (message, isP2p = false) => {
try { try {
const dataPlaintext = stringToArrayBufferBase64(message); const dataPlaintext = stringToArrayBufferBase64(message);
const messageBuf = textsecure.protobuf.WebSocketMessage.decode( const messageBuf = textsecure.protobuf.WebSocketMessage.decode(
@ -89,7 +89,8 @@
path: messageBuf.request.path, path: messageBuf.request.path,
body: messageBuf.request.body, body: messageBuf.request.body,
id: messageBuf.request.id, id: messageBuf.request.id,
}) }),
isP2p
); );
} }
} catch (error) { } catch (error) {

@ -87,7 +87,7 @@ MessageReceiver.prototype.extend({
localLokiServer.start(localServerPort).then(port => { localLokiServer.start(localServerPort).then(port => {
window.log.info(`Local Server started at localhost:${port}`); window.log.info(`Local Server started at localhost:${port}`);
libloki.api.broadcastOnlineStatus(); libloki.api.broadcastOnlineStatus();
localLokiServer.on('message', this.httpPollingResource.handleMessage); localLokiServer.on('message', this.handleP2pMessage.bind(this));
}); });
// TODO: Rework this socket stuff to work with online messaging // TODO: Rework this socket stuff to work with online messaging
@ -119,6 +119,9 @@ MessageReceiver.prototype.extend({
// all cached envelopes are processed. // all cached envelopes are processed.
this.incoming = [this.pending]; this.incoming = [this.pending];
}, },
handleP2pMessage(message) {
this.httpPollingResource.handleMessage(message, true);
},
shutdown() { shutdown() {
if (this.socket) { if (this.socket) {
this.socket.onclose = null; this.socket.onclose = null;
@ -135,7 +138,7 @@ MessageReceiver.prototype.extend({
if (localLokiServer) { if (localLokiServer) {
localLokiServer.removeListener( localLokiServer.removeListener(
'message', 'message',
this.httpPollingResource.handleMessage this.handleP2pMessage.bind(this)
); );
} }
}, },
@ -194,7 +197,7 @@ MessageReceiver.prototype.extend({
// return this.dispatchAndWait(event); // return this.dispatchAndWait(event);
// }); // });
}, },
handleRequest(request) { handleRequest(request, isP2p = false) {
this.incoming = this.incoming || []; this.incoming = this.incoming || [];
const lastPromise = _.last(this.incoming); const lastPromise = _.last(this.incoming);
@ -214,6 +217,9 @@ MessageReceiver.prototype.extend({
const promise = Promise.resolve(request.body.toArrayBuffer()) // textsecure.crypto const promise = Promise.resolve(request.body.toArrayBuffer()) // textsecure.crypto
.then(plaintext => { .then(plaintext => {
const envelope = textsecure.protobuf.Envelope.decode(plaintext); const envelope = textsecure.protobuf.Envelope.decode(plaintext);
if (isP2p) {
lokiP2pAPI.setContactOnline(envelope.source);
}
// After this point, decoding errors are not the server's // After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the // fault, and we should handle them gracefully and tell the
// user they received an invalid message // user they received an invalid message
@ -223,6 +229,7 @@ MessageReceiver.prototype.extend({
} }
envelope.id = envelope.serverGuid || window.getGuid(); envelope.id = envelope.serverGuid || window.getGuid();
envelope.isP2p = isP2p;
envelope.serverTimestamp = envelope.serverTimestamp envelope.serverTimestamp = envelope.serverTimestamp
? envelope.serverTimestamp.toNumber() ? envelope.serverTimestamp.toNumber()
: null; : null;
@ -901,7 +908,12 @@ MessageReceiver.prototype.extend({
}, },
async handleLokiAddressMessage(envelope, lokiAddressMessage) { async handleLokiAddressMessage(envelope, lokiAddressMessage) {
const { p2pAddress, p2pPort } = lokiAddressMessage; const { p2pAddress, p2pPort } = lokiAddressMessage;
lokiP2pAPI.addContactP2pDetails(envelope.source, p2pAddress, p2pPort); lokiP2pAPI.addContactP2pDetails(
envelope.source,
p2pAddress,
p2pPort,
envelope.isP2p
);
return this.removeFromCache(envelope); return this.removeFromCache(envelope);
}, },
handleDataMessage(envelope, msg) { handleDataMessage(envelope, msg) {

@ -42,11 +42,13 @@ function OutgoingMessage(
this.failoverNumbers = []; this.failoverNumbers = [];
this.unidentifiedDeliveries = []; this.unidentifiedDeliveries = [];
const { numberInfo, senderCertificate, online, messageType } = options || {}; const { numberInfo, senderCertificate, online, messageType, forceP2p } =
options || {};
this.numberInfo = numberInfo; this.numberInfo = numberInfo;
this.senderCertificate = senderCertificate; this.senderCertificate = senderCertificate;
this.online = online; this.online = online;
this.messageType = messageType || 'outgoing'; this.messageType = messageType || 'outgoing';
this.forceP2p = forceP2p || false;
} }
OutgoingMessage.prototype = { OutgoingMessage.prototype = {
@ -185,7 +187,13 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number; const pubKey = number;
try { try {
await lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl); await lokiMessageAPI.sendMessage(
pubKey,
data,
timestamp,
ttl,
this.forceP2p
);
} catch (e) { } catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
// 409 and 410 should bubble and be handled by doSendMessage // 409 and 410 should bubble and be handled by doSendMessage

Loading…
Cancel
Save