Fixed an issue where jobs could run before their references are persisted

pull/751/head
Morgan Pretty 2 years ago
parent 0cfd87ee27
commit 9794877692

@ -138,13 +138,15 @@ public final class JobRunner {
return
}
queues.wrappedValue[updatedJob.variant]?.add(db, job: updatedJob, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(updatedJob.variant)") { _ in
// Wait until the transaction has been completed before updating the queue (to ensure anything
// created during the transaction has been saved to the database before any corresponding jobs
// are run)
db.afterNextTransactionNested { _ in
queues.wrappedValue[updatedJob.variant]?.add(updatedJob, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
queues.wrappedValue[updatedJob.variant]?.start()
}
}
@ -161,17 +163,22 @@ public final class JobRunner {
return
}
queues.wrappedValue[job.variant]?.upsert(db, job: job, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
// Start the job runner if needed
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(job.variant)") { _ in
// Wait until the transaction has been completed before updating the queue (to ensure anything
// created during the transaction has been saved to the database before any corresponding jobs
// are run)
db.afterNextTransactionNested { _ in
queues.wrappedValue[job.variant]?.upsert(job, canStartJob: canStartJob)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
queues.wrappedValue[job.variant]?.start()
}
}
/// Insert a job before another job in the queue
///
/// **Note:** This function assumes the relevant job queue is already running and as such **will not** start the queue if it isn't running
@discardableResult public static func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)? {
switch job?.behaviour {
case .recurringOnActive, .recurringOnLaunch, .runOnceNextLaunch:
@ -191,7 +198,12 @@ public final class JobRunner {
return nil
}
queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
// Wait until the transaction has been completed before updating the queue (to ensure anything
// created during the transaction has been saved to the database before any corresponding jobs
// are run)
db.afterNextTransactionNested { _ in
queues.wrappedValue[updatedJob.variant]?.insert(updatedJob, before: otherJob)
}
return (jobId, updatedJob)
}
@ -524,7 +536,7 @@ private final class JobQueue {
// MARK: - Execution
fileprivate func add(_ db: Database, job: Job, canStartJob: Bool = true) {
fileprivate func add(_ job: Job, canStartJob: Bool = true) {
// Check if the job should be added to the queue
guard
canStartJob,
@ -541,11 +553,7 @@ private final class JobQueue {
// If this is a concurrent queue then we should immediately start the next job
guard executionType == .concurrent else { return }
// Ensure that the database commit has completed and then trigger the next job to run (need
// to ensure any interactions have been correctly inserted first)
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Add: \(job.variant)") { [weak self] _ in
self?.runNextJob()
}
runNextJob()
}
/// Upsert a job onto the queue, if the queue isn't currently running and 'canStartJob' is true then this will start
@ -553,7 +561,7 @@ private final class JobQueue {
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
fileprivate func upsert(_ db: Database, job: Job, canStartJob: Bool = true) {
fileprivate func upsert(_ job: Job, canStartJob: Bool = true) {
guard let jobId: Int64 = job.id else {
SNLog("[JobRunner] Prevented attempt to upsert \(job.variant) job without id to queue")
return
@ -576,7 +584,7 @@ private final class JobQueue {
// If we didn't update an existing job then we need to add it to the queue
guard !didUpdateExistingJob else { return }
add(db, job: job, canStartJob: canStartJob)
add(job, canStartJob: canStartJob)
}
fileprivate func insert(_ job: Job, before otherJob: Job) {

Loading…
Cancel
Save