From 9794877692f2856e9d480ea57816e46823c26279 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Tue, 30 May 2023 14:20:35 +1000 Subject: [PATCH] Fixed an issue where jobs could run before their references are persisted --- SessionUtilitiesKit/JobRunner/JobRunner.swift | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 7906fd532..c6b0bad56 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -138,13 +138,15 @@ public final class JobRunner { return } - queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob) - - // Don't start the queue if the job can't be started - guard canStartJob else { return } - - // Start the job runner if needed - db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(updatedJob.variant)") { _ in + // Wait until the transaction has been completed before updating the queue (to ensure anything + // created during the transaction has been saved to the database before any corresponding jobs + // are run) + db.afterNextTransactionNested { _ in + queues.wrappedValue[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob) + + // Don't start the queue if the job can't be started + guard canStartJob else { return } + queues.wrappedValue[updatedJob.variant]?.start() } } @@ -161,17 +163,22 @@ public final class JobRunner { return } - queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob) - - // Don't start the queue if the job can't be started - guard canStartJob else { return } - - // Start the job runner if needed - db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(job.variant)") { _ in + // Wait until the transaction has been completed before updating the queue (to ensure anything + // created during the transaction has been saved to the database before any corresponding jobs + // are run) + db.afterNextTransactionNested { _ in + queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob) + + // Don't start the queue if the job can't be started + guard canStartJob else { return } + queues.wrappedValue[job.variant]?.start() } } + /// Insert a job before another job in the queue + /// + /// **Note:** This function assumes the relevant job queue is already running and as such **will not** start the queue if it isn't running @discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? { switch job?.behaviour { case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch: @@ -191,7 +198,12 @@ public final class JobRunner { return nil } - queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) + // Wait until the transaction has been completed before updating the queue (to ensure anything + // created during the transaction has been saved to the database before any corresponding jobs + // are run) + db.afterNextTransactionNested { _ in + queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob) + } return (jobId, updatedJob) } @@ -524,7 +536,7 @@ private final class JobQueue { // MARK: - Execution - fileprivate func add(_ db: Database, job: Job, canStartJob: Bool = true) { + fileprivate func add(_ job: Job, canStartJob: Bool = true) { // Check if the job should be added to the queue guard canStartJob, @@ -541,11 +553,7 @@ private final class JobQueue { // If this is a concurrent queue then we should immediately start the next job guard executionType == .concurrent else { return } - // Ensure that the database commit has completed and then trigger the next job to run (need - // to ensure any interactions have been correctly inserted first) - db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in - self?.runNextJob() - } + runNextJob() } /// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start @@ -553,7 +561,7 @@ private final class JobQueue { /// /// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp` /// is in the future then the job won't be started - fileprivate func upsert(_ db: Database, job: Job, canStartJob: Bool = true) { + fileprivate func upsert(_ job: Job, canStartJob: Bool = true) { guard let jobId: Int64 = job.id else { SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue") return @@ -576,7 +584,7 @@ private final class JobQueue { // If we didn't update an existing job then we need to add it to the queue guard !didUpdateExistingJob else { return } - add(db, job: job, canStartJob: canStartJob) + add(job, canStartJob: canStartJob) } fileprivate func insert(_ job: Job, before otherJob: Job) {