Added queue manager to properly fix the race condition bug and reset the changes I made in other commit

pull/164/head
Beaudan 6 years ago
parent b2f456031f
commit fac8e72861

@ -1534,22 +1534,18 @@ async function saveMessage(data, { forceSave } = {}) {
return toCreate.id;
}
async function saveSeenMessageHashes(incomingHashes) {
async function saveSeenMessageHashes(arrayOfHashes) {
let promise;
const hashList = incomingHashes.map(h => h.hash);
const dupHashes = await getSeenMessagesByHashList(hashList);
const newHashes = incomingHashes.filter(h => !dupHashes.includes(h.hash));
db.serialize(() => {
promise = Promise.all([
db.run('BEGIN TRANSACTION;'),
...map(newHashes, hashData => saveSeenMessageHash(hashData)),
...map(arrayOfHashes, hashData => saveSeenMessageHash(hashData)),
db.run('COMMIT TRANSACTION;'),
]);
});
await promise;
return newHashes;
}
async function saveSeenMessageHash(data) {

@ -723,6 +723,7 @@
<script type='text/javascript' src='js/models/blockedNumbers.js'></script>
<script type='text/javascript' src='js/models/profile.js'></script>
<script type='text/javascript' src='js/expiring_messages.js'></script>
<script type='text/javascript' src='js/queue_manager.js'></script>
<script type='text/javascript' src='js/chromium.js'></script>
<script type='text/javascript' src='js/registration.js'></script>

@ -788,7 +788,7 @@ async function cleanSeenMessages() {
}
async function saveSeenMessageHashes(data) {
return channels.saveSeenMessageHashes(_cleanData(data));
await channels.saveSeenMessageHashes(_cleanData(data));
}
async function saveSeenMessageHash(data) {

@ -0,0 +1,28 @@
/* eslint-disable more/no-then */
// eslint-disable-next-line func-names
(function() {
'use strict'
class JobQueue {
constructor() {
this.pending = Promise.resolve();
}
add(job) {
const previous = this.pending || Promise.resolve();
this.pending = previous.then(job, job);
const current = this.pending;
current.then(() => {
if (this.pending === current) {
delete this.pending;
}
});
return current;
}
}
window.JobQueue = JobQueue;
})();

@ -45,15 +45,16 @@
const filterIncomingMessages = async function filterIncomingMessages(
messages
) {
const incomingHashes = messages.map(m => ({
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
let newHashes = await window.Signal.Data.saveSeenMessageHashes(
incomingHashes
);
newHashes = newHashes.map(h => h.hash);
const newMessages = messages.filter(m => newHashes.includes(m.hash));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
return newMessages;
};
@ -64,9 +65,12 @@
handleRequest = request => request.respond(404, 'Not found');
}
let connected = false;
const jobQueue = new window.JobQueue();
const processMessages = async messages => {
const newMessages = await filterIncomingMessages(messages);
const newMessages = await jobQueue.add(
() => filterIncomingMessages(messages)
);
newMessages.forEach(async message => {
const { data } = message;
this.handleMessage(data);

Loading…
Cancel
Save