use newer delete system every 5s, deleteMessage(), store token in serverAPI, serverRequest refactor, registerChannel now returns the channel found/created, change baseChannelUrl base

pull/455/head
Ryan Tharp 6 years ago
parent 807b32fac0
commit 33572cac1f

@ -1,10 +1,11 @@
/* global log, textsecure, libloki, Signal, Whisper */
/* global log, textsecure, libloki, Signal, Whisper, Headers */
const EventEmitter = require('events');
const nodeFetch = require('node-fetch');
const { URL, URLSearchParams } = require('url');
const GROUPCHAT_POLL_EVERY = 1000; // 1 second
const DELETION_POLL_EVERY = 60000; // 1 minute
// Can't be less than 1200 if we have unauth'd requests
const GROUPCHAT_POLL_EVERY = 1500; // 1.5s
const DELETION_POLL_EVERY = 5000; // 1 second
// singleton to relay events to libtextsecure/message_receiver
class LokiPublicChatAPI extends EventEmitter {
@ -15,17 +16,18 @@ class LokiPublicChatAPI extends EventEmitter {
this.servers = [];
}
findOrCreateServer(hostport) {
log.info(`LokiPublicChatAPI looking for ${hostport}`);
let thisServer = this.servers.find(server => server.server === hostport);
if (!thisServer) {
log.info(`LokiPublicChatAPI creating ${hostport}`);
thisServer = new LokiPublicServerAPI(this, hostport);
this.servers.push(thisServer);
}
return thisServer;
}
// rename to findOrCreateChannel?
registerChannel(hostport, channelId, conversationId) {
const server = this.findOrCreateServer(hostport);
server.findOrCreateChannel(channelId, conversationId);
return server.findOrCreateChannel(channelId, conversationId);
}
unregisterChannel(hostport, channelId) {
let thisServer;
@ -52,12 +54,18 @@ class LokiPublicServerAPI {
this.channels = [];
this.tokenPromise = null;
this.baseServerUrl = url;
const ref = this;
(async function justToEnableAsyncToGetToken() {
ref.token = await ref.getOrRefreshServerToken();
log.info(`set token ${ref.token}`);
})();
}
findOrCreateChannel(channelId, conversationId) {
let thisChannel = this.channels.find(
channel => channel.channelId === channelId
);
if (!thisChannel) {
log.info(`LokiPublicChatAPI creating channel ${conversationId}`);
thisChannel = new LokiPublicChannelAPI(this, channelId, conversationId);
this.channels.push(thisChannel);
}
@ -165,9 +173,7 @@ class LokiPublicChannelAPI {
constructor(serverAPI, channelId, conversationId) {
this.serverAPI = serverAPI;
this.channelId = channelId;
this.baseChannelUrl = `${serverAPI.baseServerUrl}/channels/${
this.channelId
}`;
this.baseChannelUrl = `channels/${this.channelId}`;
this.groupName = 'unknown';
this.conversationId = conversationId;
this.lastGot = 0;
@ -175,16 +181,62 @@ class LokiPublicChannelAPI {
log.info(`registered LokiPublicChannel ${channelId}`);
// start polling
this.pollForMessages();
this.deleteLastId = 1;
this.pollForDeletions();
}
getEndpoint() {
const endpoint = `${this.serverAPI.baseServerUrl}/channels/${
this.channelId
const endpoint = `${this.serverAPI.baseServerUrl}/${
this.baseChannelUrl
}/messages`;
return endpoint;
}
// we'll pass token for now
async serverRequest(endpoint, params, method) {
const url = new URL(`${this.serverAPI.baseServerUrl}/${endpoint}`);
url.search = new URLSearchParams(params);
let res;
let { token } = this.serverAPI;
if (!token) {
token = await this.serverAPI.getOrRefreshServerToken();
if (!token) {
log.error('NO TOKEN');
return {
err: 'noToken',
};
}
}
try {
// eslint-disable-next-line no-await-in-loop
const options = {
headers: new Headers({
Authorization: `Bearer ${this.serverAPI.token}`,
}),
};
if (method) {
options.method = method;
}
res = await nodeFetch(url, options || undefined);
} catch (e) {
log.info(`e ${e}`);
return {
err: e,
};
}
// eslint-disable-next-line no-await-in-loop
const response = await res.json();
if (response.meta.code !== 200) {
return {
err: 'statusCode',
response,
};
}
return {
response,
};
}
async pollForChannel(source, endpoint) {
// groupName will be loaded from server
const url = new URL(this.baseChannelUrl);
@ -214,54 +266,51 @@ class LokiPublicChannelAPI {
}, DELETION_POLL_EVERY);
};
let numChecked = 0;
const url = new URL(`${this.baseChannelUrl}/messages`);
const params = {
include_annotations: 1,
count: -200,
count: 200,
};
let beforeId = 0;
while (numChecked < 2000) {
params.before_id = beforeId;
url.search = new URLSearchParams(params);
let res;
try {
// eslint-disable-next-line no-await-in-loop
res = await nodeFetch(url);
} catch (e) {
pollAgain();
return;
}
// full scan
let more = true;
while (more) {
params.since_id = this.deleteLastId;
const res = await this.serverRequest(
`loki/v1/channel/${this.channelId}/deletes`,
params
);
// eslint-disable-next-line no-await-in-loop
const response = await res.json();
if (response.meta.code !== 200) {
pollAgain();
return;
}
numChecked += response.data.length;
// eslint-disable-next-line no-loop-func
response.data.reverse().forEach(adnMessage => {
if (beforeId === 0 || adnMessage.id < beforeId) {
beforeId = adnMessage.id;
}
if (adnMessage.is_deleted) {
Whisper.events.trigger('deletePublicMessage', {
messageServerId: adnMessage.id,
conversationId: this.conversationId,
});
}
res.response.data.reverse().forEach(deleteEntry => {
Whisper.events.trigger('deleteLocalPublicMessage', {
messageServerId: deleteEntry.message_id,
conversationId: this.conversationId,
});
});
if (response.data.length < 200) {
if (res.response.data.length < 200) {
break;
}
this.deleteLastId = res.response.meta.max_id;
({ more } = res.response);
}
pollAgain();
}
async deleteMessage(serverId) {
const params = {};
const res = await this.serverRequest(
`${this.baseChannelUrl}/messages/${serverId}`,
params,
'DELETE'
);
if (!res.err && res.response) {
log.info(`deleted ${serverId} on ${this.baseChannelUrl}`);
return true;
}
log.warn(`failed to delete ${serverId} on ${this.baseChannelUrl}`);
return false;
}
async pollForMessages() {
const url = new URL(`${this.baseChannelUrl}/messages`);
const params = {
include_annotations: 1,
count: -20,
@ -270,28 +319,14 @@ class LokiPublicChannelAPI {
if (this.lastGot) {
params.since_id = this.lastGot;
}
url.search = new URLSearchParams(params);
let res;
let success = true;
try {
res = await nodeFetch(url);
} catch (e) {
success = false;
}
const response = await res.json();
if (this.stopPolling) {
// Stop after latest await possible
return;
}
if (response.meta.code !== 200) {
success = false;
}
const res = await this.serverRequest(
`${this.baseChannelUrl}/messages`,
params
);
if (success) {
if (!res.err && res.response) {
let receivedAt = new Date().getTime();
response.data.reverse().forEach(adnMessage => {
res.response.data.reverse().forEach(adnMessage => {
let timestamp = new Date(adnMessage.created_at).getTime();
let from = adnMessage.user.username;
let source;

Loading…
Cancel
Save