From 71d51a873a436a6af9d6c325cf8419bf3c754e7d Mon Sep 17 00:00:00 2001 From: nielsandriesse Date: Sun, 8 Nov 2020 13:54:40 +1100 Subject: [PATCH] Further implement JobQueue --- SessionMessagingKit/Jobs/JobQueue.swift | 44 +++++++++++++++++-- .../Jobs/MessageReceiveJob.swift | 3 +- SessionMessagingKit/Jobs/MessageSendJob.swift | 1 - .../Jobs/NotifyPNServerJob.swift | 1 - SessionMessagingKit/Storage.swift | 3 ++ 5 files changed, 44 insertions(+), 8 deletions(-) diff --git a/SessionMessagingKit/Jobs/JobQueue.swift b/SessionMessagingKit/Jobs/JobQueue.swift index 223b94741..3fb56e705 100644 --- a/SessionMessagingKit/Jobs/JobQueue.swift +++ b/SessionMessagingKit/Jobs/JobQueue.swift @@ -1,3 +1,4 @@ +import SessionUtilities public final class JobQueue : JobDelegate { @@ -10,12 +11,47 @@ public final class JobQueue : JobDelegate { } public func handleJobSucceeded(_ job: Job) { - // Mark the job as succeeded + Configuration.shared.storage.withAsync({ transaction in + Configuration.shared.storage.markJobAsSucceeded(job, using: transaction) + }, completion: { + // Do nothing + }) } public func handleJobFailed(_ job: Job, with error: Error) { - // Persist the job - // Retry it if the max failure count hasn't been reached - // Propagate the error otherwise + job.failureCount += 1 + let storage = Configuration.shared.storage + storage.withAsync({ transaction in + storage.persist(job, using: transaction) + }, completion: { // Intentionally capture self + if job.failureCount == type(of: job).maxFailureCount { + storage.withAsync({ transaction in + storage.markJobAsFailed(job, using: transaction) + }, completion: { + // Do nothing + }) + } else { + let retryInterval = self.getRetryInterval(for: job) + Timer.weakScheduledTimer(withTimeInterval: retryInterval, target: self, selector: #selector(retry(_:)), userInfo: job, repeats: false) + } + }) + } + + private func getRetryInterval(for job: Job) -> TimeInterval { + // Arbitrary backoff factor... + // try 1 delay: 0.00s + // try 2 delay: 0.19s + // ... + // try 5 delay: 1.30s + // ... + // try 11 delay: 61.31s + let backoffFactor = 1.9 + let maxBackoff: Double = 60 * 60 * 1000 + return 0.1 * min(maxBackoff, pow(backoffFactor, Double(job.failureCount))) + } + + @objc private func retry(_ job: Any) { + guard let job = job as? Job else { return } + job.execute() } } diff --git a/SessionMessagingKit/Jobs/MessageReceiveJob.swift b/SessionMessagingKit/Jobs/MessageReceiveJob.swift index af0e37ed1..180b0a0eb 100644 --- a/SessionMessagingKit/Jobs/MessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/MessageReceiveJob.swift @@ -6,7 +6,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NS public var failureCount: UInt = 0 // MARK: Settings - public static let maxFailureCount: UInt = 20 + public static let maxFailureCount: UInt = 10 // MARK: Initialization init(data: Data) { @@ -45,7 +45,6 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NS } private func handleFailure(error: Error) { - self.failureCount += 1 delegate?.handleJobFailed(self, with: error) } } diff --git a/SessionMessagingKit/Jobs/MessageSendJob.swift b/SessionMessagingKit/Jobs/MessageSendJob.swift index 43dade570..59a592b08 100644 --- a/SessionMessagingKit/Jobs/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/MessageSendJob.swift @@ -51,7 +51,6 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi } private func handleFailure(error: Error) { - self.failureCount += 1 delegate?.handleJobFailed(self, with: error) } } diff --git a/SessionMessagingKit/Jobs/NotifyPNServerJob.swift b/SessionMessagingKit/Jobs/NotifyPNServerJob.swift index 587bfe7cf..30a0abf92 100644 --- a/SessionMessagingKit/Jobs/NotifyPNServerJob.swift +++ b/SessionMessagingKit/Jobs/NotifyPNServerJob.swift @@ -39,7 +39,6 @@ public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSC } private func handleFailure(error: Error) { - self.failureCount += 1 delegate?.handleJobFailed(self, with: error) } } diff --git a/SessionMessagingKit/Storage.swift b/SessionMessagingKit/Storage.swift index d10c879c2..151c2226a 100644 --- a/SessionMessagingKit/Storage.swift +++ b/SessionMessagingKit/Storage.swift @@ -3,10 +3,13 @@ import SessionProtocolKit public protocol SessionMessagingKitStorageProtocol { func with(_ work: (Any) -> Void) + func withAsync(_ work: (Any) -> Void, completion: () -> Void) func getUserPublicKey() -> String? func getOrGenerateRegistrationID(using transaction: Any) -> UInt32 func isClosedGroup(_ publicKey: String) -> Bool func getClosedGroupPrivateKey(for publicKey: String) -> String? func persist(_ job: Job, using transaction: Any) + func markJobAsSucceeded(_ job: Job, using transaction: Any) + func markJobAsFailed(_ job: Job, using transaction: Any) }