Merge pull request #1196 from Mikunj/message-queue-fix

Fix message queue
pull/1200/head
Mikunj Varsani 5 years ago committed by GitHub
commit 88d3f54f86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,7 +26,7 @@ import { UserUtil } from '../../util';
export class MessageQueue implements MessageQueueInterface { export class MessageQueue implements MessageQueueInterface {
public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>; public readonly events: TypedEventEmitter<MessageQueueInterfaceEvents>;
private readonly jobQueues: Map<PubKey, JobQueue> = new Map(); private readonly jobQueues: Map<string, JobQueue> = new Map();
private readonly pendingMessageCache: PendingMessageCache; private readonly pendingMessageCache: PendingMessageCache;
constructor(cache?: PendingMessageCache) { constructor(cache?: PendingMessageCache) {
@ -162,17 +162,19 @@ export class MessageQueue implements MessageQueueInterface {
const messageId = String(message.timestamp); const messageId = String(message.timestamp);
if (!jobQueue.has(messageId)) { if (!jobQueue.has(messageId)) {
try { // We put the event handling inside this job to avoid sending duplicate events
await jobQueue.addWithId(messageId, async () => const job = async () => {
MessageSender.send(message) try {
); await MessageSender.send(message);
this.events.emit('success', message); this.events.emit('success', message);
} catch (e) { } catch (e) {
this.events.emit('fail', message, e); this.events.emit('fail', message, e);
} finally { } finally {
// Remove from the cache because retrying is done in the sender // Remove from the cache because retrying is done in the sender
void this.pendingMessageCache.remove(message); void this.pendingMessageCache.remove(message);
} }
};
await jobQueue.addWithId(messageId, job);
} }
}); });
} }
@ -203,10 +205,10 @@ export class MessageQueue implements MessageQueueInterface {
} }
private getJobQueue(device: PubKey): JobQueue { private getJobQueue(device: PubKey): JobQueue {
let queue = this.jobQueues.get(device); let queue = this.jobQueues.get(device.key);
if (!queue) { if (!queue) {
queue = new JobQueue(); queue = new JobQueue();
this.jobQueues.set(device, queue); this.jobQueues.set(device.key, queue);
} }
return queue; return queue;

@ -15,7 +15,7 @@ export class PubKey {
if (!PubKey.validate(pubkeyString)) { if (!PubKey.validate(pubkeyString)) {
throw new Error(`Invalid pubkey string passed: ${pubkeyString}`); throw new Error(`Invalid pubkey string passed: ${pubkeyString}`);
} }
this.key = pubkeyString; this.key = pubkeyString.toLowerCase();
} }
/** /**
@ -54,7 +54,7 @@ export class PubKey {
public isEqual(comparator: PubKey | string) { public isEqual(comparator: PubKey | string) {
return comparator instanceof PubKey return comparator instanceof PubKey
? this.key === comparator.key ? this.key === comparator.key
: this.key === comparator; : this.key === comparator.toLowerCase();
} }
} }

Loading…
Cancel
Save