@ -2,7 +2,6 @@
import Foundation
import Foundation
import GRDB
import GRDB
import SignalCoreKit
public protocol JobExecutor {
public protocol JobExecutor {
// / T h e m a x i m u m n u m b e r o f t i m e s t h e j o b c a n f a i l b e f o r e i t f a i l s p e r m a n e n t l y
// / T h e m a x i m u m n u m b e r o f t i m e s t h e j o b c a n f a i l b e f o r e i t f a i l s p e r m a n e n t l y
@ -29,9 +28,10 @@ public protocol JobExecutor {
static func run (
static func run (
_ job : Job ,
_ job : Job ,
queue : DispatchQueue ,
queue : DispatchQueue ,
success : @ escaping ( Job , Bool ) -> ( ) ,
success : @ escaping ( Job , Bool , Dependencies ) -> ( ) ,
failure : @ escaping ( Job , Error ? , Bool ) -> ( ) ,
failure : @ escaping ( Job , Error ? , Bool , Dependencies ) -> ( ) ,
deferred : @ escaping ( Job ) -> ( )
deferred : @ escaping ( Job , Dependencies ) -> ( ) ,
dependencies : Dependencies
)
)
}
}
@ -43,7 +43,22 @@ public final class JobRunner {
case notFound
case notFound
}
}
private static let blockingQueue : Atomic < JobQueue ? > = Atomic (
// MARK: - V a r i a b l e s
private let blockingQueue : Atomic < JobQueue ? >
private let queues : Atomic < [ Job . Variant : JobQueue ] >
internal var perSessionJobsCompleted : Atomic < Set < Int64 > > = Atomic ( [ ] )
internal var hasCompletedInitialBecomeActive : Atomic < Bool > = Atomic ( false )
internal var shutdownBackgroundTask : Atomic < OWSBackgroundTask ? > = Atomic ( nil )
internal var canStartQueues : Atomic < Bool > = Atomic ( false )
// MARK: - I n i t i a l i z a t i o n
init ( dependencies : Dependencies = Dependencies ( ) ) {
var jobVariants : Set < Job . Variant > = Job . Variant . allCases . asSet ( )
self . blockingQueue = Atomic (
JobQueue (
JobQueue (
type : . blocking ,
type : . blocking ,
qos : . default ,
qos : . default ,
@ -51,14 +66,14 @@ public final class JobRunner {
onQueueDrained : {
onQueueDrained : {
// O n c e a l l b l o c k i n g j o b s h a v e b e e n c o m p l e t e d w e w a n t t o s t a r t r u n n i n g
// O n c e a l l b l o c k i n g j o b s h a v e b e e n c o m p l e t e d w e w a n t t o s t a r t r u n n i n g
// t h e r e m a i n i n g j o b q u e u e s
// t h e r e m a i n i n g j o b q u e u e s
queues . wrappedValue . forEach { _ , queue in queue . start ( ) }
JobRunner . startNonBlockingQueues ( dependencies : dependencies )
}
}
)
)
)
)
private static let queues : Atomic < [ Job . Variant : JobQueue ] > = {
self . queues = Atomic ( [
var jobVariants : Set < Job . Variant > = Job . Variant . allCases . asSet ( )
// MARK: - - M e s s a g e S e n d Q u e u e
let messageSendQueue : JobQueue = JobQueue (
JobQueue (
type : . messageSend ,
type : . messageSend ,
executionType : . concurrent , // A l l o w a s m a n y j o b s t o r u n a t o n c e a s s u p p o r t e d b y t h e d e v i c e
executionType : . concurrent , // A l l o w a s m a n y j o b s t o r u n a t o n c e a s s u p p o r t e d b y t h e d e v i c e
qos : . default ,
qos : . default ,
@ -68,8 +83,11 @@ public final class JobRunner {
jobVariants . remove ( . notifyPushServer ) ,
jobVariants . remove ( . notifyPushServer ) ,
jobVariants . remove ( . sendReadReceipts )
jobVariants . remove ( . sendReadReceipts )
] . compactMap { $0 }
] . compactMap { $0 }
)
) ,
let messageReceiveQueue : JobQueue = JobQueue (
// MARK: - - M e s s a g e R e c e i v e Q u e u e
JobQueue (
type : . messageReceive ,
type : . messageReceive ,
// E x p l i c i t l y s e r i a l a s e x e c u t i n g c o n c u r r e n t l y m e a n s m e s s a g e r e c e i v e s g e t t i n g p r o c e s s e d a t
// E x p l i c i t l y s e r i a l a s e x e c u t i n g c o n c u r r e n t l y m e a n s m e s s a g e r e c e i v e s g e t t i n g p r o c e s s e d a t
// d i f f e r e n t s p e e d s w h i c h c a n r e s u l t i n :
// d i f f e r e n t s p e e d s w h i c h c a n r e s u l t i n :
@ -81,127 +99,47 @@ public final class JobRunner {
jobVariants : [
jobVariants : [
jobVariants . remove ( . messageReceive )
jobVariants . remove ( . messageReceive )
] . compactMap { $0 }
] . compactMap { $0 }
)
) ,
let attachmentDownloadQueue : JobQueue = JobQueue (
// MARK: - - A t t a c h m e n t D o w n l o a d Q u e u e
JobQueue (
type : . attachmentDownload ,
type : . attachmentDownload ,
qos : . utility ,
qos : . utility ,
jobVariants : [
jobVariants : [
jobVariants . remove ( . attachmentDownload )
jobVariants . remove ( . attachmentDownload )
] . compactMap { $0 }
] . compactMap { $0 }
)
) ,
let generalQueue : JobQueue = JobQueue (
// MARK: - - G e n e r a l Q u e u e
JobQueue (
type : . general ( number : 0 ) ,
type : . general ( number : 0 ) ,
qos : . utility ,
qos : . utility ,
jobVariants : Array ( jobVariants )
jobVariants : Array ( jobVariants )
)
)
return Atomic ( [
messageSendQueue ,
messageReceiveQueue ,
attachmentDownloadQueue ,
generalQueue
] . reduce ( into : [ : ] ) { prev , next in
] . reduce ( into : [ : ] ) { prev , next in
next . jobVariants . forEach { variant in
next . jobVariants . forEach { variant in
prev [ variant ] = next
prev [ variant ] = next
}
}
} )
} )
} ( )
}
internal static var executorMap : Atomic < [ Job . Variant : JobExecutor . Type ] > = Atomic ( [ : ] )
fileprivate static var perSessionJobsCompleted : Atomic < Set < Int64 > > = Atomic ( [ ] )
private static var hasCompletedInitialBecomeActive : Atomic < Bool > = Atomic ( false )
private static var shutdownBackgroundTask : Atomic < OWSBackgroundTask ? > = Atomic ( nil )
fileprivate static var canStartQueues : Atomic < Bool > = Atomic ( false )
// MARK: - C o n f i g u r a t i o n
// MARK: - C o n f i g u r a t i o n
public static func add ( executor : JobExecutor . Type , for variant : Job . Variant ) {
internal func add ( executor : JobExecutor . Type , for variant : Job . Variant ) {
executorMap. mutate { $0 [ variant ] = executor }
queues . wrappedValue [ variant ] ? . addExecutor ( executor , for : variant )
}
}
// MARK: - E x e c u t i o n
// MARK: - E x e c u t i o n
// / A d d a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
internal func appDidFinishLaunching ( dependencies : Dependencies ) {
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func add ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return
}
guard ! canStartJob || updatedJob . id != nil else {
SNLog ( " [JobRunner] Not starting \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return
}
queues . mutate { $0 [ updatedJob . variant ] ? . add ( updatedJob , canStartJob : canStartJob ) }
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransaction { _ in
queues . wrappedValue [ updatedJob . variant ] ? . start ( )
}
}
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
public static func upsert ( _ db : Database , job : Job ? , canStartJob : Bool = true ) {
guard let job : Job = job else { return } // I g n o r e n u l l j o b s
guard job . id != nil else {
add ( db , job : job , canStartJob : canStartJob )
return
}
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransaction { _ in
queues . wrappedValue [ job . variant ] ? . start ( )
}
}
@ discardableResult public static func insert ( _ db : Database , job : Job ? , before otherJob : Job ) -> ( Int64 , Job ) ? {
switch job ? . behaviour {
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
SNLog ( " [JobRunner] Attempted to insert \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job before the current one even though it's behaviour is \( job . map { " \( $0 . behaviour ) " } ? ? " unknown " ) " )
return nil
default : break
}
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return nil
}
guard let jobId : Int64 = updatedJob . id else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return nil
}
queues . wrappedValue [ updatedJob . variant ] ? . insert ( updatedJob , before : otherJob )
return ( jobId , updatedJob )
}
public static func appDidFinishLaunching ( ) {
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s
JobRunner. canStartQueues. mutate { $0 = true }
canStartQueues . mutate { $0 = true }
// N o t e : ' a p p D i d B e c o m e A c t i v e ' w i l l r u n o n f i r s t l a u n c h a n y w a y s o w e c a n
// N o t e : ' a p p D i d B e c o m e A c t i v e ' w i l l r u n o n f i r s t l a u n c h a n y w a y s o w e c a n
// l e a v e t h o s e j o b s o u t a n d c a n w a i t u n t i l t h e n t o s t a r t t h e J o b R u n n e r
// l e a v e t h o s e j o b s o u t a n d c a n w a i t u n t i l t h e n t o s t a r t t h e J o b R u n n e r
let jobsToRun : ( blocking : [ Job ] , nonBlocking : [ Job ] ) = Storage. shared
let jobsToRun : ( blocking : [ Job ] , nonBlocking : [ Job ] ) = dependencies . storage
. read { db in
. read { db in
let blockingJobs : [ Job ] = try Job
let blockingJobs : [ Job ] = try Job
. filter (
. filter (
@ -231,7 +169,11 @@ public final class JobRunner {
guard ! jobsToRun . blocking . isEmpty || ! jobsToRun . nonBlocking . isEmpty else { return }
guard ! jobsToRun . blocking . isEmpty || ! jobsToRun . nonBlocking . isEmpty else { return }
// A d d a n d s t a r t a n y b l o c k i n g j o b s
// A d d a n d s t a r t a n y b l o c k i n g j o b s
blockingQueue . wrappedValue ? . appDidFinishLaunching ( with : jobsToRun . blocking , canStart : true )
blockingQueue . wrappedValue ? . appDidFinishLaunching (
with : jobsToRun . blocking ,
canStart : true ,
dependencies : dependencies
)
// A d d a n y n o n - b l o c k i n g j o b s ( w e d o n ' t s t a r t t h e s e i n c a s e t h e r e a r e b l o c k i n g " o n a c t i v e "
// A d d a n y n o n - b l o c k i n g j o b s ( w e d o n ' t s t a r t t h e s e i n c a s e t h e r e a r e b l o c k i n g " o n a c t i v e "
// j o b s a s w e l l )
// j o b s a s w e l l )
@ -239,13 +181,13 @@ public final class JobRunner {
let jobQueues : [ Job . Variant : JobQueue ] = queues . wrappedValue
let jobQueues : [ Job . Variant : JobQueue ] = queues . wrappedValue
jobsByVariant . forEach { variant , jobs in
jobsByVariant . forEach { variant , jobs in
jobQueues [ variant ] ? . appDidFinishLaunching ( with : jobs , canStart : false )
jobQueues [ variant ] ? . appDidFinishLaunching ( with : jobs , canStart : false , dependencies : dependencies )
}
}
}
}
public static func appDidBecomeActive ( ) {
internal func appDidBecomeActive ( dependencies : Dependencies ) {
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s
// F l a g t h a t t h e J o b R u n n e r c a n s t a r t i t ' s q u e u e s
JobRunner. canStartQueues. mutate { $0 = true }
canStartQueues. mutate { $0 = true }
// I f w e h a v e a r u n n i n g " s u t d o w n B a c k g r o u n d T a s k " t h e n w e w a n t t o c a n c e l i t a s o t h e r w i s e i t
// I f w e h a v e a r u n n i n g " s u t d o w n B a c k g r o u n d T a s k " t h e n w e w a n t t o c a n c e l i t a s o t h e r w i s e i t
// c a n r e s u l t i n t h e d a t a b a s e b e i n g s u s p e n d e d a n d u s b e i n g u n a b l e t o i n t e r a c t w i t h i t a t a l l
// c a n r e s u l t i n t h e d a t a b a s e b e i n g s u s p e n d e d a n d u s b e i n g u n a b l e t o i n t e r a c t w i t h i t a t a l l
@ -255,8 +197,8 @@ public final class JobRunner {
}
}
// R e t r i e v e a n y j o b s w h i c h s h o u l d r u n w h e n b e c o m i n g a c t i v e
// R e t r i e v e a n y j o b s w h i c h s h o u l d r u n w h e n b e c o m i n g a c t i v e
let hasCompletedInitialBecomeActive : Bool = JobRunner . hasCompletedInitialBecomeActive . wrappedValue
let hasCompletedInitialBecomeActive : Bool = self . hasCompletedInitialBecomeActive . wrappedValue
let jobsToRun : [ Job ] = Storage. shared
let jobsToRun : [ Job ] = dependencies. storage
. read { db in
. read { db in
return try Job
return try Job
. filter ( Job . Columns . behaviour = = Job . Behaviour . recurringOnActive )
. filter ( Job . Columns . behaviour = = Job . Behaviour . recurringOnActive )
@ -272,7 +214,7 @@ public final class JobRunner {
guard ! jobsToRun . isEmpty else {
guard ! jobsToRun . isEmpty else {
if ! blockingQueueIsRunning {
if ! blockingQueueIsRunning {
jobQueues . forEach { _ , queue in queue . start ( ) }
jobQueues . forEach { _ , queue in queue . start ( dependencies : dependencies ) }
}
}
return
return
}
}
@ -283,23 +225,104 @@ public final class JobRunner {
jobQueues . forEach { variant , queue in
jobQueues . forEach { variant , queue in
queue . appDidBecomeActive (
queue . appDidBecomeActive (
with : ( jobsByVariant [ variant ] ? ? [ ] ) ,
with : ( jobsByVariant [ variant ] ? ? [ ] ) ,
canStart : ! blockingQueueIsRunning
canStart : ! blockingQueueIsRunning ,
dependencies : dependencies
)
)
}
}
JobRunner . hasCompletedInitialBecomeActive . mutate { $0 = true }
self . hasCompletedInitialBecomeActive . mutate { $0 = true }
}
}
// / C a l l i n g t h i s w i l l c l e a r t h e J o b R u n n e r q u e u e s a n d s t o p i t f r o m r u n n i n g n e w j o b s , a n y c u r r e n t l y e x e c u t i n g j o b s w i l l c o n t i n u e t o r u n
internal func add (
// / t h o u g h ( t h i s m e a n s i f w e s u s p e n d t h e d a t a b a s e i t ' s l i k e l y t h a t a n y c u r r e n t l y r u n n i n g j o b s w i l l f a i l t o c o m p l e t e a n d f a i l t o r e c o r d t h e i r
_ db : Database ,
// / f a i l u r e - t h e y _ s h o u l d _ b e p i c k e d u p a g a i n t h e n e x t t i m e t h e a p p i s l a u n c h e d )
job : Job ? ,
public static func stopAndClearPendingJobs (
canStartJob : Bool ,
dependencies : Dependencies
) {
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return
}
guard ! canStartJob || updatedJob . id != nil else {
SNLog ( " [JobRunner] Not starting \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return
}
queues . mutate {
$0 [ updatedJob . variant ] ?
. add ( updatedJob , canStartJob : canStartJob , dependencies : dependencies )
}
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( updatedJob . variant ) " ) { [ weak self ] _ in
self ? . queues . wrappedValue [ updatedJob . variant ] ? . start ( dependencies : dependencies )
}
}
internal func upsert (
_ db : Database ,
job : Job ? ,
canStartJob : Bool ,
dependencies : Dependencies
) {
guard let job : Job = job else { return } // I g n o r e n u l l j o b s
guard job . id != nil else {
add ( db , job : job , canStartJob : canStartJob , dependencies : dependencies )
return
}
queues . wrappedValue [ job . variant ] ? . upsert ( job , canStartJob : canStartJob , dependencies : dependencies )
// D o n ' t s t a r t t h e q u e u e i f t h e j o b c a n ' t b e s t a r t e d
guard canStartJob else { return }
// S t a r t t h e j o b r u n n e r i f n e e d e d
db . afterNextTransactionNestedOnce ( dedupeId : " JobRunner-Start: \( job . variant ) " ) { [ weak self ] _ in
self ? . queues . wrappedValue [ job . variant ] ? . start ( dependencies : dependencies )
}
}
@ discardableResult internal func insert (
_ db : Database ,
job : Job ? ,
before otherJob : Job ,
dependencies : Dependencies
) -> ( Int64 , Job ) ? {
switch job ? . behaviour {
case . recurringOnActive , . recurringOnLaunch , . runOnceNextLaunch :
SNLog ( " [JobRunner] Attempted to insert \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job before the current one even though it's behaviour is \( job . map { " \( $0 . behaviour ) " } ? ? " unknown " ) " )
return nil
default : break
}
// S t o r e t h e j o b i n t o t h e d a t a b a s e ( g e t t i n g a n i d f o r i t )
guard let updatedJob : Job = try ? job ? . inserted ( db ) else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job " )
return nil
}
guard let jobId : Int64 = updatedJob . id else {
SNLog ( " [JobRunner] Unable to add \( job . map { " \( $0 . variant ) " } ? ? " unknown " ) job due to missing id " )
return nil
}
queues . wrappedValue [ updatedJob . variant ] ?
. insert ( updatedJob , before : otherJob , dependencies : dependencies )
return ( jobId , updatedJob )
}
internal func stopAndClearPendingJobs (
exceptForVariant : Job . Variant ? = nil ,
exceptForVariant : Job . Variant ? = nil ,
onComplete : ( ( ) -> ( ) ) ? = nil
onComplete : ( ( ) -> ( ) ) ? = nil
) {
) {
// I n f o r m t h e J o b R u n n e r t h a t i t c a n ' t s t a r t a n y q u e u e s ( t h i s i s t o p r e v e n t q u e u e s f r o m
// I n f o r m t h e J o b R u n n e r t h a t i t c a n ' t s t a r t a n y q u e u e s ( t h i s i s t o p r e v e n t q u e u e s f r o m
// r e s c h e d u l i n g t h e m s e l v e s w h i l e i n t h e b a c k g r o u n d , w h e n t h e a p p r e s t a r t s o r b e c o m e s a c t i v e
// r e s c h e d u l i n g t h e m s e l v e s w h i l e i n t h e b a c k g r o u n d , w h e n t h e a p p r e s t a r t s o r b e c o m e s a c t i v e
// t h e J o b R u n e n r w i l l u p d a t e t h i s f l a g )
// t h e J o b R u n e n r w i l l u p d a t e t h i s f l a g )
JobRunner . canStartQueues . mutate { $0 = false }
canStartQueues. mutate { $0 = false }
// S t o p a l l q u e u e s e x c e p t f o r t h e o n e c o n t a i n i n g t h e ` e x c e p t F o r V a r i a n t `
// S t o p a l l q u e u e s e x c e p t f o r t h e o n e c o n t a i n i n g t h e ` e x c e p t F o r V a r i a n t `
queues . wrappedValue
queues . wrappedValue
@ -341,27 +364,27 @@ public final class JobRunner {
}
}
// A d d a c a l l b a c k t o b e t r i g g e r e d o n c e t h e q u e u e i s d r a i n e d
// A d d a c a l l b a c k t o b e t r i g g e r e d o n c e t h e q u e u e i s d r a i n e d
queue . onQueueDrained = { [ weak queue ] in
queue . onQueueDrained = { [ weak self , weak queue ] in
oldQueueDrained ? ( )
oldQueueDrained ? ( )
queue ? . onQueueDrained = oldQueueDrained
queue ? . onQueueDrained = oldQueueDrained
onComplete ? ( )
onComplete ? ( )
shutdownBackgroundTask . mutate { $0 = nil }
self ? . shutdownBackgroundTask . mutate { $0 = nil }
}
}
}
}
public static func isCurrentlyRunning ( _ job : Job ? ) -> Bool {
internal func isCurrentlyRunning ( _ job : Job ? ) -> Bool {
guard let job : Job = job , let jobId : Int64 = job . id else { return false }
guard let job : Job = job , let jobId : Int64 = job . id else { return false }
return ( queues . wrappedValue [ job . variant ] ? . isCurrentlyRunning ( jobId ) = = true )
return ( queues . wrappedValue [ job . variant ] ? . isCurrentlyRunning ( jobId ) = = true )
}
}
public static func def ailsForCurrentlyRunningJobs( of variant : Job . Variant ) -> [ Int64 : Data ? ] {
internal func det ailsForCurrentlyRunningJobs( of variant : Job . Variant ) -> [ Int64 : Data ? ] {
return ( queues . wrappedValue [ variant ] ? . detailsForAllCurrentlyRunningJobs ( ) )
return ( queues . wrappedValue [ variant ] ? . detailsForAllCurrentlyRunningJobs ( ) )
. defaulting ( to : [ : ] )
. defaulting ( to : [ : ] )
}
}
public static func afterCurrentlyRunningJob ( _ job : Job ? , callback : @ escaping ( JobResult ) -> ( ) ) {
internal func afterCurrentlyRunningJob ( _ job : Job ? , callback : @ escaping ( JobResult ) -> ( ) ) {
guard let job : Job = job , let jobId : Int64 = job . id , let queue : JobQueue = queues . wrappedValue [ job . variant ] else {
guard let job : Job = job , let jobId : Int64 = job . id , let queue : JobQueue = queues . wrappedValue [ job . variant ] else {
callback ( . notFound )
callback ( . notFound )
return
return
@ -370,14 +393,14 @@ public final class JobRunner {
queue . afterCurrentlyRunningJob ( jobId , callback : callback )
queue . afterCurrentlyRunningJob ( jobId , callback : callback )
}
}
public static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
internal func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let targetQueue : JobQueue = queues . wrappedValue [ variant ] else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
guard let detailsData : Data = try ? JSONEncoder ( ) . encode ( details ) else { return false }
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
return targetQueue . hasPendingOrRunningJob ( with : detailsData )
}
}
public static func removePendingJob ( _ job : Job ? ) {
internal func removePendingJob ( _ job : Job ? ) {
guard let job : Job = job , let jobId : Int64 = job . id else { return }
guard let job : Job = job , let jobId : Int64 = job . id else { return }
queues . wrappedValue [ job . variant ] ? . removePendingJob ( jobId )
queues . wrappedValue [ job . variant ] ? . removePendingJob ( jobId )
@ -398,6 +421,97 @@ public final class JobRunner {
}
}
}
}
// MARK: - J o b R u n n e r S i n g l e t o n
public extension JobRunner {
private static let instance : JobRunner = JobRunner ( )
// MARK: - S t a t i c A c c e s s
static func add ( executor : JobExecutor . Type , for variant : Job . Variant ) {
instance . add ( executor : executor , for : variant )
}
static func appDidFinishLaunching ( dependencies : Dependencies = Dependencies ( ) ) {
instance . appDidFinishLaunching ( dependencies : dependencies )
}
static func appDidBecomeActive ( dependencies : Dependencies = Dependencies ( ) ) {
instance . appDidBecomeActive ( dependencies : dependencies )
}
// / A d d a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
static func add (
_ db : Database ,
job : Job ? ,
canStartJob : Bool = true ,
dependencies : Dependencies = Dependencies ( )
) { instance . add ( db , job : job , canStartJob : canStartJob , dependencies : dependencies ) }
// / U p s e r t a j o b o n t o t h e q u e u e , i f t h e q u e u e i s n ' t c u r r e n t l y r u n n i n g a n d ' c a n S t a r t J o b ' i s t r u e t h e n t h i s w i l l s t a r t
// / t h e J o b R u n n e r
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
static func upsert (
_ db : Database ,
job : Job ? ,
canStartJob : Bool = true ,
dependencies : Dependencies = Dependencies ( )
) { instance . upsert ( db , job : job , canStartJob : canStartJob , dependencies : dependencies ) }
@ discardableResult static func insert (
_ db : Database ,
job : Job ? ,
before otherJob : Job ,
dependencies : Dependencies = Dependencies ( )
) -> ( Int64 , Job ) ? { instance . insert ( db , job : job , before : otherJob , dependencies : dependencies ) }
// / C a l l i n g t h i s w i l l c l e a r t h e J o b R u n n e r q u e u e s a n d s t o p i t f r o m r u n n i n g n e w j o b s , a n y c u r r e n t l y e x e c u t i n g j o b s w i l l c o n t i n u e t o r u n
// / t h o u g h ( t h i s m e a n s i f w e s u s p e n d t h e d a t a b a s e i t ' s l i k e l y t h a t a n y c u r r e n t l y r u n n i n g j o b s w i l l f a i l t o c o m p l e t e a n d f a i l t o r e c o r d t h e i r
// / f a i l u r e - t h e y _ s h o u l d _ b e p i c k e d u p a g a i n t h e n e x t t i m e t h e a p p i s l a u n c h e d )
static func stopAndClearPendingJobs (
exceptForVariant : Job . Variant ? = nil ,
onComplete : ( ( ) -> ( ) ) ? = nil
) { instance . stopAndClearPendingJobs ( exceptForVariant : exceptForVariant , onComplete : onComplete ) }
static func isCurrentlyRunning ( _ job : Job ? ) -> Bool {
return instance . isCurrentlyRunning ( job )
}
static func detailsForCurrentlyRunningJobs ( of variant : Job . Variant ) -> [ Int64 : Data ? ] {
return instance . detailsForCurrentlyRunningJobs ( of : variant )
}
static func afterCurrentlyRunningJob ( _ job : Job ? , callback : @ escaping ( JobResult ) -> ( ) ) {
instance . afterCurrentlyRunningJob ( job , callback : callback )
}
static func hasPendingOrRunningJob < T : Encodable > ( with variant : Job . Variant , details : T ) -> Bool {
return instance . hasPendingOrRunningJob ( with : variant , details : details )
}
static func removePendingJob ( _ job : Job ? ) {
instance . removePendingJob ( job )
}
// MARK: - I n t e r n a l S t a t i c A c c e s s
fileprivate static func canStart ( queue : JobQueue ) -> Bool {
return instance . canStartQueues . wrappedValue
}
fileprivate static func startNonBlockingQueues ( dependencies : Dependencies ) {
instance . queues . wrappedValue . forEach { _ , queue in
queue . start ( dependencies : dependencies )
}
}
}
// MARK: - J o b Q u e u e
// MARK: - J o b Q u e u e
private final class JobQueue {
private final class JobQueue {
@ -433,7 +547,11 @@ private final class JobQueue {
private var timer : Timer ?
private var timer : Timer ?
fileprivate var fireTimestamp : TimeInterval = 0
fileprivate var fireTimestamp : TimeInterval = 0
static func create ( queue : JobQueue , timestamp : TimeInterval ) -> Trigger ? {
static func create (
queue : JobQueue ,
timestamp : TimeInterval ,
dependencies : Dependencies
) -> Trigger ? {
// / S e t u p t h e t r i g g e r ( w a i t a t l e a s t 1 s e c o n d b e f o r e t r i g g e r i n g )
// / S e t u p t h e t r i g g e r ( w a i t a t l e a s t 1 s e c o n d b e f o r e t r i g g e r i n g )
// /
// /
// / * * N o t e : * * W e u s e t h e ` T i m e r . s c h e d u l e d T i m e r O n M a i n T h r e a d ` m e t h o d b e c a u s e r u n n i n g a t i m e r
// / * * N o t e : * * W e u s e t h e ` T i m e r . s c h e d u l e d T i m e r O n M a i n T h r e a d ` m e t h o d b e c a u s e r u n n i n g a t i m e r
@ -445,7 +563,7 @@ private final class JobQueue {
withTimeInterval : trigger . fireTimestamp ,
withTimeInterval : trigger . fireTimestamp ,
repeats : false ,
repeats : false ,
block : { [ weak queue ] _ in
block : { [ weak queue ] _ in
queue ? . start ( )
queue ? . start ( dependencies : dependencies )
}
}
)
)
@ -485,6 +603,7 @@ private final class JobQueue {
return result
return result
} ( )
} ( )
private var executorMap : Atomic < [ Job . Variant : JobExecutor . Type ] > = Atomic ( [ : ] )
private var nextTrigger : Atomic < Trigger ? > = Atomic ( nil )
private var nextTrigger : Atomic < Trigger ? > = Atomic ( nil )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
fileprivate var isRunning : Atomic < Bool > = Atomic ( false )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
private var queue : Atomic < [ Job ] > = Atomic ( [ ] )
@ -512,9 +631,15 @@ private final class JobQueue {
self . onQueueDrained = onQueueDrained
self . onQueueDrained = onQueueDrained
}
}
// MARK: - C o n f i g u r a t i o n
fileprivate func addExecutor ( _ executor : JobExecutor . Type , for variant : Job . Variant ) {
executorMap . mutate { $0 [ variant ] = executor }
}
// MARK: - E x e c u t i o n
// MARK: - E x e c u t i o n
fileprivate func add ( _ job : Job , canStartJob : Bool = true ) {
fileprivate func add ( _ job : Job , canStartJob : Bool = true , dependencies : Dependencies ) {
// C h e c k i f t h e j o b s h o u l d b e a d d e d t o t h e q u e u e
// C h e c k i f t h e j o b s h o u l d b e a d d e d t o t h e q u e u e
guard
guard
canStartJob ,
canStartJob ,
@ -534,7 +659,7 @@ private final class JobQueue {
// /
// /
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / * * N o t e : * * I f t h e j o b h a s a ` b e h a v i o u r ` o f ` r u n O n c e N e x t L a u n c h ` o r t h e ` n e x t R u n T i m e s t a m p `
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
// / i s i n t h e f u t u r e t h e n t h e j o b w o n ' t b e s t a r t e d
fileprivate func upsert ( _ job : Job , canStartJob : Bool = true ) {
fileprivate func upsert ( _ job : Job , canStartJob : Bool = true , dependencies : Dependencies ) {
guard let jobId : Int64 = job . id else {
guard let jobId : Int64 = job . id else {
SNLog ( " [JobRunner] Prevented attempt to upsert \( job . variant ) job without id to queue " )
SNLog ( " [JobRunner] Prevented attempt to upsert \( job . variant ) job without id to queue " )
return
return
@ -557,10 +682,10 @@ private final class JobQueue {
// I f w e d i d n ' t u p d a t e a n e x i s t i n g j o b t h e n w e n e e d t o a d d i t t o t h e q u e u e
// I f w e d i d n ' t u p d a t e a n e x i s t i n g j o b t h e n w e n e e d t o a d d i t t o t h e q u e u e
guard ! didUpdateExistingJob else { return }
guard ! didUpdateExistingJob else { return }
add ( job , canStartJob : canStartJob )
add ( job , canStartJob : canStartJob , dependencies : dependencies )
}
}
fileprivate func insert ( _ job : Job , before otherJob : Job ) {
fileprivate func insert ( _ job : Job , before otherJob : Job , dependencies : Dependencies ) {
guard job . id != nil else {
guard job . id != nil else {
SNLog ( " [JobRunner] Prevented attempt to insert \( job . variant ) job without id to queue " )
SNLog ( " [JobRunner] Prevented attempt to insert \( job . variant ) job without id to queue " )
return
return
@ -580,16 +705,24 @@ private final class JobQueue {
}
}
}
}
fileprivate func appDidFinishLaunching ( with jobs : [ Job ] , canStart : Bool ) {
fileprivate func appDidFinishLaunching (
with jobs : [ Job ] ,
canStart : Bool ,
dependencies : Dependencies
) {
queue . mutate { $0 . append ( contentsOf : jobs ) }
queue . mutate { $0 . append ( contentsOf : jobs ) }
// S t a r t t h e j o b r u n n e r i f n e e d e d
// S t a r t t h e j o b r u n n e r i f n e e d e d
if canStart && ! isRunning . wrappedValue {
if canStart && ! isRunning . wrappedValue {
start ( )
start ( dependencies : dependencies )
}
}
}
}
fileprivate func appDidBecomeActive ( with jobs : [ Job ] , canStart : Bool ) {
fileprivate func appDidBecomeActive (
with jobs : [ Job ] ,
canStart : Bool ,
dependencies : Dependencies
) {
queue . mutate { queue in
queue . mutate { queue in
// A v o i d r e - a d d i n g j o b s t o t h e q u e u e t h a t a r e a l r e a d y i n i t ( t h i s c a n
// A v o i d r e - a d d i n g j o b s t o t h e q u e u e t h a t a r e a l r e a d y i n i t ( t h i s c a n
// h a p p e n i f t h e u s e r s e n d s t h e a p p t o t h e b a c k g r o u n d b e f o r e t h e ' o n A c t i v e '
// h a p p e n i f t h e u s e r s e n d s t h e a p p t o t h e b a c k g r o u n d b e f o r e t h e ' o n A c t i v e '
@ -602,7 +735,7 @@ private final class JobQueue {
// S t a r t t h e j o b r u n n e r i f n e e d e d
// S t a r t t h e j o b r u n n e r i f n e e d e d
if canStart && ! isRunning . wrappedValue {
if canStart && ! isRunning . wrappedValue {
start ( )
start ( dependencies : dependencies )
}
}
}
}
@ -639,17 +772,24 @@ private final class JobQueue {
// MARK: - J o b R u n n i n g
// MARK: - J o b R u n n i n g
fileprivate func start ( force : Bool = false ) {
fileprivate func start (
force : Bool = false ,
dependencies : Dependencies
) {
// W e o n l y w a n t t h e J o b R u n n e r t o r u n i n t h e m a i n a p p
// W e o n l y w a n t t h e J o b R u n n e r t o r u n i n t h e m a i n a p p
guard CurrentAppContext ( ) . isMainApp else { return }
guard
guard JobRunner . canStartQueues . wrappedValue else { return }
HasAppContext ( ) &&
CurrentAppContext ( ) . isMainApp &&
! CurrentAppContext ( ) . isRunningTests &&
JobRunner . canStart ( queue : self )
else { return }
guard force || ! isRunning . wrappedValue else { return }
guard force || ! isRunning . wrappedValue else { return }
// T h e J o b R u n n e r r u n s s y n c h r o n o u s l y w e n e e d t o e n s u r e t h i s d o e s n ' t s t a r t
// T h e J o b R u n n e r r u n s s y n c h r o n o u s l y w e n e e d t o e n s u r e t h i s d o e s n ' t s t a r t
// o n t h e m a i n t h r e a d ( i f i t i s o n t h e m a i n t h r e a d t h e n s w a p t o a d i f f e r e n t t h r e a d )
// o n t h e m a i n t h r e a d ( i f i t i s o n t h e m a i n t h r e a d t h e n s w a p t o a d i f f e r e n t t h r e a d )
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . start ( )
self ? . start ( dependencies : dependencies )
}
}
return
return
}
}
@ -665,7 +805,7 @@ private final class JobQueue {
// G e t a n y p e n d i n g j o b s
// G e t a n y p e n d i n g j o b s
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsAlreadyInQueue : Set < Int64 > = queue . wrappedValue . compactMap { $0 . id } . asSet ( )
let jobsToRun : [ Job ] = Storage. shared . read { db in
let jobsToRun : [ Job ] = dependencies. storage . read { db in
try Job
try Job
. filterPendingJobs (
. filterPendingJobs (
variants : jobVariants ,
variants : jobVariants ,
@ -691,7 +831,7 @@ private final class JobQueue {
guard jobCount > 0 else {
guard jobCount > 0 else {
if jobIdsAlreadyRunning . isEmpty {
if jobIdsAlreadyRunning . isEmpty {
isRunning . mutate { $0 = false }
isRunning . mutate { $0 = false }
scheduleNextSoonestJob ( )
scheduleNextSoonestJob ( dependencies : dependencies )
}
}
return
return
}
}
@ -700,7 +840,7 @@ private final class JobQueue {
if ! wasAlreadyRunning {
if ! wasAlreadyRunning {
SNLog ( " [JobRunner] Starting \( queueContext ) with ( \( jobCount ) job \( jobCount != 1 ? " s " : " " ) ) " )
SNLog ( " [JobRunner] Starting \( queueContext ) with ( \( jobCount ) job \( jobCount != 1 ? " s " : " " ) ) " )
}
}
runNextJob ( )
runNextJob ( dependencies : dependencies )
}
}
fileprivate func stopAndClearPendingJobs ( ) {
fileprivate func stopAndClearPendingJobs ( ) {
@ -709,14 +849,14 @@ private final class JobQueue {
deferLoopTracker . mutate { $0 = [ : ] }
deferLoopTracker . mutate { $0 = [ : ] }
}
}
private func runNextJob ( ) {
private func runNextJob ( dependencies : Dependencies ) {
// E n s u r e t h e q u e u e i s r u n n i n g ( i f w e ' v e s t o p p e d t h e q u e u e t h e n w e s h o u l d n ' t s t a r t t h e n e x t j o b )
// E n s u r e t h e q u e u e i s r u n n i n g ( i f w e ' v e s t o p p e d t h e q u e u e t h e n w e s h o u l d n ' t s t a r t t h e n e x t j o b )
guard isRunning . wrappedValue else { return }
guard isRunning . wrappedValue else { return }
// E n s u r e t h i s i s r u n n i n g o n t h e c o r r e c t q u e u e
// E n s u r e t h i s i s r u n n i n g o n t h e c o r r e c t q u e u e
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
guard DispatchQueue . getSpecific ( key : queueKey ) = = queueContext else {
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
return
return
}
}
@ -728,38 +868,58 @@ private final class JobQueue {
// A l w a y s a t t e m p t t o s c h e d u l e t h e n e x t s o o n e s t j o b ( o t h e r w i s e i f e n o u g h j o b s g e t s t a r t e d i n r a p i d
// A l w a y s a t t e m p t t o s c h e d u l e t h e n e x t s o o n e s t j o b ( o t h e r w i s e i f e n o u g h j o b s g e t s t a r t e d i n r a p i d
// s u c c e s s i o n t h e n p e n d i n g / f a i l e d j o b s i n t h e d a t a b a s e m a y n e v e r g e t r e - s t a r t e d i n a c o n c u r r e n t q u e u e )
// s u c c e s s i o n t h e n p e n d i n g / f a i l e d j o b s i n t h e d a t a b a s e m a y n e v e r g e t r e - s t a r t e d i n a c o n c u r r e n t q u e u e )
scheduleNextSoonestJob ( )
scheduleNextSoonestJob ( dependencies : dependencies )
return
return
}
}
guard let jobExecutor : JobExecutor . Type = JobRunner. executorMap. wrappedValue [ nextJob . variant ] else {
guard let jobExecutor : JobExecutor . Type = executorMap. wrappedValue [ nextJob . variant ] else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing executor " )
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing executor " )
handleJobFailed ( nextJob , error : JobRunnerError . executorMissing , permanentFailure : true )
handleJobFailed (
nextJob ,
error : JobRunnerError . executorMissing ,
permanentFailure : true ,
dependencies : dependencies
)
return
return
}
}
guard ! jobExecutor . requiresThreadId || nextJob . threadId != nil else {
guard ! jobExecutor . requiresThreadId || nextJob . threadId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required threadId " )
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required threadId " )
handleJobFailed ( nextJob , error : JobRunnerError . requiredThreadIdMissing , permanentFailure : true )
handleJobFailed (
nextJob ,
error : JobRunnerError . requiredThreadIdMissing ,
permanentFailure : true ,
dependencies : dependencies
)
return
return
}
}
guard ! jobExecutor . requiresInteractionId || nextJob . interactionId != nil else {
guard ! jobExecutor . requiresInteractionId || nextJob . interactionId != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required interactionId " )
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing required interactionId " )
handleJobFailed ( nextJob , error : JobRunnerError . requiredInteractionIdMissing , permanentFailure : true )
handleJobFailed (
nextJob ,
error : JobRunnerError . requiredInteractionIdMissing ,
permanentFailure : true ,
dependencies : dependencies
)
return
return
}
}
guard nextJob . id != nil else {
guard nextJob . id != nil else {
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing id " )
SNLog ( " [JobRunner] \( queueContext ) Unable to run \( nextJob . variant ) job due to missing id " )
handleJobFailed ( nextJob , error : JobRunnerError . jobIdMissing , permanentFailure : false )
handleJobFailed (
nextJob ,
error : JobRunnerError . jobIdMissing ,
permanentFailure : false ,
dependencies : dependencies
)
return
return
}
}
// I f t h e ' n e x t R u n T i m e s t a m p ' f o r t h e j o b i s i n t h e f u t u r e t h e n d o n ' t r u n i t y e t
// I f t h e ' n e x t R u n T i m e s t a m p ' f o r t h e j o b i s i n t h e f u t u r e t h e n d o n ' t r u n i t y e t
guard nextJob . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 else {
guard nextJob . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 else {
handleJobDeferred ( nextJob )
handleJobDeferred ( nextJob , dependencies : dependencies )
return
return
}
}
// C h e c k i f t h e n e x t j o b h a s a n y d e p e n d e n c i e s
// C h e c k i f t h e n e x t j o b h a s a n y d e p e n d e n c i e s
let dependencyInfo : ( expectedCount : Int , jobs : [ Job ] ) = Storage. shared . read { db in
let dependencyInfo : ( expectedCount : Int , jobs : [ Job ] ) = dependencies. storage . read { db in
let numExpectedDependencies : Int = try JobDependencies
let numExpectedDependencies : Int = try JobDependencies
. filter ( JobDependencies . Columns . jobId = = nextJob . id )
. filter ( JobDependencies . Columns . jobId = = nextJob . id )
. fetchCount ( db )
. fetchCount ( db )
@ -771,7 +931,12 @@ private final class JobQueue {
guard dependencyInfo . jobs . count = = dependencyInfo . expectedCount else {
guard dependencyInfo . jobs . count = = dependencyInfo . expectedCount else {
SNLog ( " [JobRunner] \( queueContext ) found job with missing dependencies, removing the job " )
SNLog ( " [JobRunner] \( queueContext ) found job with missing dependencies, removing the job " )
handleJobFailed ( nextJob , error : JobRunnerError . missingDependencies , permanentFailure : true )
handleJobFailed (
nextJob ,
error : JobRunnerError . missingDependencies ,
permanentFailure : true ,
dependencies : dependencies
)
return
return
}
}
guard dependencyInfo . jobs . isEmpty else {
guard dependencyInfo . jobs . isEmpty else {
@ -792,7 +957,7 @@ private final class JobQueue {
)
)
queue . append ( nextJob )
queue . append ( nextJob )
}
}
handleJobDeferred ( nextJob )
handleJobDeferred ( nextJob , dependencies : dependencies )
return
return
}
}
@ -810,7 +975,7 @@ private final class JobQueue {
}
}
}
}
handleJobDeferred ( nextJob )
handleJobDeferred ( nextJob , dependencies : dependencies )
return
return
}
}
@ -831,26 +996,45 @@ private final class JobQueue {
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
detailsForCurrentlyRunningJobs . mutate { $0 = $0 . setting ( nextJob . id , nextJob . details ) }
SNLog ( " [JobRunner] \( queueContext ) started \( nextJob . variant ) job ( \( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
SNLog ( " [JobRunner] \( queueContext ) started \( nextJob . variant ) job ( \( executionType = = . concurrent ? " \( numJobsRunning ) currently running, " : " " ) \( numJobsRemaining ) remaining) " )
// / A s i t t u r n s o u t C o m b i n e d o e s n ' t p l a t t o o n i c e l y w i t h c o n c u r r e n t D i s p a t c h Q u e u e s , i n C o m b i n e e v e n t s a r e d i s p a t c h e d a s y n c h r o n o u s l y t o
// / t h e q u e u e w h i c h m e a n s a n o d d s i t u a t i o n c a n o c c a s i o n a l l y o c c u r w h e r e t h e ` f i n i s h e d ` e v e n t c a n a c t u a l l y r u n b e f o r e t h e ` o u t p u t `
// / e v e n t - t h i s c a n r e s u l t i n u n e x p e c t e d b e h a v i o u r s ( f o r m o r e i n f o r m a t i o n s e e h t t p s : / / g i t h u b . c o m / g r o u e / G R D B . s w i f t / i s s u e s / 1 3 3 4 )
// /
// / D u e t o t h i s i f a j o b i s m e a n t t o r u n o n a c o n c u r r e n t q u e u e t h e n w e a c t u a l l y w a n t t o c r e a t e a t e m p o r a r y s e r i a l q u e u e j u s t f o r t h e e x e c u t i o n
// / o f t h a t j o b
let targetQueue : DispatchQueue = {
guard executionType = = . concurrent else { return internalQueue }
return DispatchQueue (
label : " \( self . queueContext ) -serial " ,
qos : self . qosClass ,
attributes : [ ] ,
autoreleaseFrequency : . inherit ,
target : nil
)
} ( )
jobExecutor . run (
jobExecutor . run (
nextJob ,
nextJob ,
queue : internalQueue ,
queue : target Queue,
success : handleJobSucceeded ,
success : handleJobSucceeded ,
failure : handleJobFailed ,
failure : handleJobFailed ,
deferred : handleJobDeferred
deferred : handleJobDeferred ,
dependencies : dependencies
)
)
// I f t h i s q u e u e e x e c u t e s c o n c u r r e n t l y a n d t h e r e a r e s t i l l j o b s r e m a i n i n g t h e n i m m e d i a t e l y a t t e m p t
// I f t h i s q u e u e e x e c u t e s c o n c u r r e n t l y a n d t h e r e a r e s t i l l j o b s r e m a i n i n g t h e n i m m e d i a t e l y a t t e m p t
// t o s t a r t t h e n e x t j o b
// t o s t a r t t h e n e x t j o b
if executionType = = . concurrent && numJobsRemaining > 0 {
if executionType = = . concurrent && numJobsRemaining > 0 {
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
}
}
}
}
private func scheduleNextSoonestJob ( ) {
private func scheduleNextSoonestJob ( dependencies : Dependencies ) {
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let jobIdsAlreadyRunning : Set < Int64 > = jobsCurrentlyRunning . wrappedValue
let nextJobTimestamp : TimeInterval ? = Storage. shared . read { db in
let nextJobTimestamp : TimeInterval ? = dependencies. storage . read { db in
try Job
try Job
. filterPendingJobs (
. filterPendingJobs (
variants : jobVariants ,
variants : jobVariants ,
@ -865,7 +1049,7 @@ private final class JobQueue {
// I f t h e r e a r e n o r e m a i n i n g j o b s o r t h e J o b R u n n e r i s n ' t a l l o w e d t o s t a r t a n y q u e u e s t h e n t r i g g e r
// I f t h e r e a r e n o r e m a i n i n g j o b s o r t h e J o b R u n n e r i s n ' t a l l o w e d t o s t a r t a n y q u e u e s t h e n t r i g g e r
// t h e ' o n Q u e u e D r a i n e d ' c a l l b a c k a n d s t o p
// t h e ' o n Q u e u e D r a i n e d ' c a l l b a c k a n d s t o p
guard let nextJobTimestamp : TimeInterval = nextJobTimestamp , JobRunner . canStart Queues. wrappedValue else {
guard let nextJobTimestamp : TimeInterval = nextJobTimestamp , JobRunner . canStart ( queue : self ) else {
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
if executionType != . concurrent || jobsCurrentlyRunning . wrappedValue . isEmpty {
self . onQueueDrained ? ( )
self . onQueueDrained ? ( )
}
}
@ -889,7 +1073,7 @@ private final class JobQueue {
// q u e u e ( f o r c o n c u r r e n t q u e u e s w e w a n t t o f o r c e t h e m t o l o a d i n p e n d i n g j o b s a n d a d d
// q u e u e ( f o r c o n c u r r e n t q u e u e s w e w a n t t o f o r c e t h e m t o l o a d i n p e n d i n g j o b s a n d a d d
// t h e m t o t h e q u e u e r e g a r d l e s s o f w h e t h e r t h e q u e u e i s a l r e a d y r u n n i n g )
// t h e m t o t h e q u e u e r e g a r d l e s s o f w h e t h e r t h e q u e u e i s a l r e a d y r u n n i n g )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . start ( force : ( self ? . executionType = = . concurrent ) )
self ? . start ( force : ( self ? . executionType = = . concurrent ) , dependencies : dependencies )
}
}
return
return
}
}
@ -901,17 +1085,21 @@ private final class JobQueue {
SNLog ( " [JobRunner] Stopping \( queueContext ) until next job in \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) " )
SNLog ( " [JobRunner] Stopping \( queueContext ) until next job in \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) ) second \( Int ( ceil ( abs ( secondsUntilNextJob ) ) ) = = 1 ? " " : " s " ) " )
nextTrigger . mutate { trigger in
nextTrigger . mutate { trigger in
trigger ? . invalidate ( ) // N e e d t o i n v a l i d a t e t h e o l d t r i g g e r t o p r e v e n t a m e m o r y l e a k
trigger ? . invalidate ( ) // N e e d t o i n v a l i d a t e t h e o l d t r i g g e r t o p r e v e n t a m e m o r y l e a k
trigger = Trigger . create ( queue : self , timestamp : nextJobTimestamp )
trigger = Trigger . create ( queue : self , timestamp : nextJobTimestamp , dependencies : dependencies )
}
}
}
}
// MARK: - H a n d l i n g R e s u l t s
// MARK: - H a n d l i n g R e s u l t s
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b s u c c e e d s
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b s u c c e e d s
private func handleJobSucceeded ( _ job : Job , shouldStop : Bool ) {
private func handleJobSucceeded (
_ job : Job ,
shouldStop : Bool ,
dependencies : Dependencies
) {
switch job . behaviour {
switch job . behaviour {
case . runOnce , . runOnceNextLaunch :
case . runOnce , . runOnceNextLaunch :
Storage . shared . write { db in
dependencies. storage . write { db in
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
_ = try JobDependencies
_ = try JobDependencies
@ -922,7 +1110,7 @@ private final class JobQueue {
}
}
case . recurring where shouldStop = = true :
case . recurring where shouldStop = = true :
Storage. shared . write { db in
dependencies. storage . write { db in
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// F i r s t r e m o v e a n y J o b D e p e n d e n c i e s r e q u i r i n g t h i s j o b t o b e c o m p l e t e d ( i f
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
// w e d o n ' t t h e n t h e d e p e n d a n t j o b s w i l l a u t o m a t i c a l l y b e d e l e t e d )
_ = try JobDependencies
_ = try JobDependencies
@ -938,7 +1126,7 @@ private final class JobQueue {
case . recurring where job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 :
case . recurring where job . nextRunTimestamp <= Date ( ) . timeIntervalSince1970 :
guard let jobId : Int64 = job . id else { break }
guard let jobId : Int64 = job . id else { break }
Storage. shared . write { db in
dependencies. storage . write { db in
_ = try Job
_ = try Job
. filter ( id : jobId )
. filter ( id : jobId )
. updateAll (
. updateAll (
@ -958,7 +1146,7 @@ private final class JobQueue {
job . nextRunTimestamp > TimeInterval . leastNonzeroMagnitude
job . nextRunTimestamp > TimeInterval . leastNonzeroMagnitude
else { break }
else { break }
Storage. shared . write { db in
dependencies. storage . write { db in
_ = try Job
_ = try Job
. filter ( id : jobId )
. filter ( id : jobId )
. updateAll (
. updateAll (
@ -974,7 +1162,7 @@ private final class JobQueue {
// F o r c o n c u r r e n t q u e u e s r e t r i e v e a n y ' d e p e n d a n t ' j o b s a n d r e - a d d t h e m h e r e ( i f t h e y h a v e o t h e r
// F o r c o n c u r r e n t q u e u e s r e t r i e v e a n y ' d e p e n d a n t ' j o b s a n d r e - a d d t h e m h e r e ( i f t h e y h a v e o t h e r
// d e p e n d e n c i e s t h e y w i l l b e r e m o v e d a g a i n w h e n t h e y t r y t o e x e c u t e )
// d e p e n d e n c i e s t h e y w i l l b e r e m o v e d a g a i n w h e n t h e y t r y t o e x e c u t e )
if executionType = = . concurrent {
if executionType = = . concurrent {
let dependantJobs : [ Job ] = Storage. shared
let dependantJobs : [ Job ] = dependencies. storage
. read { db in try job . dependantJobs . fetchAll ( db ) }
. read { db in try job . dependantJobs . fetchAll ( db ) }
. defaulting ( to : [ ] )
. defaulting ( to : [ ] )
let dependantJobIds : [ Int64 ] = dependantJobs
let dependantJobIds : [ Int64 ] = dependantJobs
@ -997,19 +1185,24 @@ private final class JobQueue {
// P e r f o r m j o b c l e a n u p a n d s t a r t t h e n e x t j o b
// P e r f o r m j o b c l e a n u p a n d s t a r t t h e n e x t j o b
performCleanUp ( for : job , result : . succeeded )
performCleanUp ( for : job , result : . succeeded )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
}
}
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b f a i l s , i f i t ' s w a s n ' t a p e r m a n e n t f a i l u r e t h e n t h e ' f a i l u r e C o u n t ' f o r t h e j o b w i l l b e i n c r e m e n t e d a n d i t ' l l
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b f a i l s , i f i t ' s w a s n ' t a p e r m a n e n t f a i l u r e t h e n t h e ' f a i l u r e C o u n t ' f o r t h e j o b w i l l b e i n c r e m e n t e d a n d i t ' l l
// / b e r e - r u n a f t e r a r e t r y i n t e r v a l h a s p a s s e d
// / b e r e - r u n a f t e r a r e t r y i n t e r v a l h a s p a s s e d
private func handleJobFailed ( _ job : Job , error : Error ? , permanentFailure : Bool ) {
private func handleJobFailed (
_ job : Job ,
error : Error ? ,
permanentFailure : Bool ,
dependencies : Dependencies
) {
guard Storage . shared . read ( { db in try Job . exists ( db , id : job . id ? ? - 1 ) } ) = = true else {
guard Storage . shared . read ( { db in try Job . exists ( db , id : job . id ? ? - 1 ) } ) = = true else {
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job canceled " )
SNLog ( " [JobRunner] \( queueContext ) \( job . variant ) job canceled " )
performCleanUp ( for : job , result : . failed )
performCleanUp ( for : job , result : . failed )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
return
return
}
}
@ -1040,13 +1233,13 @@ private final class JobQueue {
}
}
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
return
return
}
}
// G e t t h e m a x f a i l u r e c o u n t f o r t h e j o b ( a v a l u e o f ' - 1 ' m e a n s i t w i l l r e t r y i n d e f i n i t e l y )
// G e t t h e m a x f a i l u r e c o u n t f o r t h e j o b ( a v a l u e o f ' - 1 ' m e a n s i t w i l l r e t r y i n d e f i n i t e l y )
let maxFailureCount : Int = ( JobRunner. executorMap. wrappedValue [ job . variant ] ? . maxFailureCount ? ? 0 )
let maxFailureCount : Int = ( executorMap. wrappedValue [ job . variant ] ? . maxFailureCount ? ? 0 )
let nextRunTimestamp : TimeInterval = ( Date ( ) . timeIntervalSince1970 + JobRunner . getRetryInterval ( for : job ) )
let nextRunTimestamp : TimeInterval = ( Date ( ) . timeIntervalSince1970 + JobRunner . getRetryInterval ( for : job ) )
Storage . shared . write { db in
Storage . shared . write { db in
@ -1116,13 +1309,16 @@ private final class JobQueue {
performCleanUp ( for : job , result : . failed )
performCleanUp ( for : job , result : . failed )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
}
}
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b n e i t h e r s u c c e e d s o r f a i l s ( t h i s s h o u l d o n l y o c c u r i f t h e j o b h a s s p e c i f i c l o g i c t h a t m a k e s i t d e p e n d a n t
// / T h i s f u n c t i o n i s c a l l e d w h e n a j o b n e i t h e r s u c c e e d s o r f a i l s ( t h i s s h o u l d o n l y o c c u r i f t h e j o b h a s s p e c i f i c l o g i c t h a t m a k e s i t d e p e n d a n t
// / o n o t h e r j o b s , a n d i t s h o u l d a u t o m a t i c a l l y m a n a g e t h o s e d e p e n d e n c i e s )
// / o n o t h e r j o b s , a n d i t s h o u l d a u t o m a t i c a l l y m a n a g e t h o s e d e p e n d e n c i e s )
private func handleJobDeferred ( _ job : Job ) {
private func handleJobDeferred (
_ job : Job ,
dependencies : Dependencies
) {
var stuckInDeferLoop : Bool = false
var stuckInDeferLoop : Bool = false
deferLoopTracker . mutate {
deferLoopTracker . mutate {
@ -1160,13 +1356,18 @@ private final class JobQueue {
// m o r e t h a n ' d e f e r r a l L o o p T h r e s h o l d ' t i m e s w i t h i n ' d e f e r r a l L o o p T h r e s h o l d ' s e c o n d s )
// m o r e t h a n ' d e f e r r a l L o o p T h r e s h o l d ' t i m e s w i t h i n ' d e f e r r a l L o o p T h r e s h o l d ' s e c o n d s )
guard ! stuckInDeferLoop else {
guard ! stuckInDeferLoop else {
deferLoopTracker . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
deferLoopTracker . mutate { $0 = $0 . removingValue ( forKey : job . id ) }
handleJobFailed ( job , error : JobRunnerError . possibleDeferralLoop , permanentFailure : false )
handleJobFailed (
job ,
error : JobRunnerError . possibleDeferralLoop ,
permanentFailure : false ,
dependencies : dependencies
)
return
return
}
}
performCleanUp ( for : job , result : . deferred )
performCleanUp ( for : job , result : . deferred )
internalQueue . async { [ weak self ] in
internalQueue . async { [ weak self ] in
self ? . runNextJob ( )
self ? . runNextJob ( dependencies : dependencies )
}
}
}
}