// C o p y r i g h t © 2 0 2 2 R a n g e p r o o f P t y L t d . A l l r i g h t s r e s e r v e d .
import Foundation
import Combine
import GRDB
import Sodium
import SessionSnodeKit
import SessionUtilitiesKit
public class Poller {
private var timers : Atomic < [ String : Timer ] > = Atomic ( [ : ] )
internal var isPolling : Atomic < [ String : Bool ] > = Atomic ( [ : ] )
internal var pollCount : Atomic < [ String : Int ] > = Atomic ( [ : ] )
internal var failureCount : Atomic < [ String : Int ] > = Atomic ( [ : ] )
// MARK: - S e t t i n g s
// / T h e n a m e s p a c e s w h i c h t h i s p o l l e r q u e r i e s
internal var namespaces : [ SnodeAPI . Namespace ] {
preconditionFailure ( " abstract class - override in subclass " )
}
// / T h e n u m b e r o f t i m e s t h e p o l l e r c a n p o l l b e f o r e s w a p p i n g t o a n e w s n o d e
internal var maxNodePollCount : UInt {
preconditionFailure ( " abstract class - override in subclass " )
}
// MARK: - P u b l i c A P I
public init ( ) { }
public func stopAllPollers ( ) {
let pollers : [ String ] = Array ( isPolling . wrappedValue . keys )
pollers . forEach { groupPublicKey in
self . stopPolling ( for : groupPublicKey )
}
}
public func stopPolling ( for publicKey : String ) {
isPolling . mutate { $0 [ publicKey ] = false }
timers . mutate { $0 [ publicKey ] ? . invalidate ( ) }
}
// MARK: - A b s t r a c t M e t h o d s
// / T h e n a m e f o r t h i s p o l l e r t o a p p e a r i n t h e l o g s
internal func pollerName ( for publicKey : String ) -> String {
preconditionFailure ( " abstract class - override in subclass " )
}
internal func nextPollDelay ( for publicKey : String ) -> TimeInterval {
preconditionFailure ( " abstract class - override in subclass " )
}
internal func getSnodeForPolling (
for publicKey : String
) -> AnyPublisher < Snode , Error > {
preconditionFailure ( " abstract class - override in subclass " )
}
internal func handlePollError ( _ error : Error , for publicKey : String , using dependencies : SMKDependencies ) {
preconditionFailure ( " abstract class - override in subclass " )
}
// MARK: - P r i v a t e A P I
internal func startIfNeeded ( for publicKey : String ) {
// R u n o n t h e ' p o l l e r Q u e u e ' t o e n s u r e a n y ' A t o m i c ' a c c e s s d o e s n ' t b l o c k t h e m a i n t h r e a d
// o n s t a r t u p
Threading . pollerQueue . async { [ weak self ] in
guard self ? . isPolling . wrappedValue [ publicKey ] != true else { return }
// M i g h t b e a r a c e c o n d i t i o n t h a t t h e s e t U p P o l l i n g f i n i s h e s t o o s o o n ,
// a n d t h e t i m e r i s n o t c r e a t e d , i f w e m a r k t h e g r o u p a s i s p o l l i n g
// a f t e r s e t U p P o l l i n g . S o t h e p o l l e r m a y n o t w o r k , t h u s m i s s e s m e s s a g e s
self ? . isPolling . mutate { $0 [ publicKey ] = true }
self ? . setUpPolling ( for : publicKey )
}
}
// / W e w a n t t o i n i t i a l l y t r i g g e r a p o l l a g a i n s t t h e t a r g e t s e r v i c e n o d e a n d t h e n r u n t h e r e c u r s i v e p o l l i n g ,
// / i f a n e r r o r i s t h r o w n d u r i n g t h e p o l l t h e n t h i s s h o u l d a u t o m a t i c a l l y r e s t a r t t h e p o l l i n g
internal func setUpPolling (
for publicKey : String ,
using dependencies : SMKDependencies = SMKDependencies (
subscribeQueue : Threading . pollerQueue ,
receiveQueue : Threading . pollerQueue
)
) {
guard isPolling . wrappedValue [ publicKey ] = = true else { return }
let namespaces : [ SnodeAPI . Namespace ] = self . namespaces
getSnodeForPolling ( for : publicKey )
. flatMap { snode -> AnyPublisher < [ Message ] , Error > in
Poller . poll (
namespaces : namespaces ,
from : snode ,
for : publicKey ,
poller : self ,
using : dependencies
)
}
. subscribe ( on : dependencies . subscribeQueue )
. receive ( on : dependencies . receiveQueue )
. sinkUntilComplete (
receiveCompletion : { [ weak self ] result in
switch result {
case . finished : self ? . pollRecursively ( for : publicKey , using : dependencies )
case . failure ( let error ) :
guard self ? . isPolling . wrappedValue [ publicKey ] = = true else { return }
self ? . handlePollError ( error , for : publicKey , using : dependencies )
}
}
)
}
private func pollRecursively (
for publicKey : String ,
using dependencies : SMKDependencies = SMKDependencies ( )
) {
guard isPolling . wrappedValue [ publicKey ] = = true else { return }
let namespaces : [ SnodeAPI . Namespace ] = self . namespaces
let nextPollInterval : TimeInterval = nextPollDelay ( for : publicKey )
timers . mutate {
$0 [ publicKey ] = Timer . scheduledTimerOnMainThread (
withTimeInterval : nextPollInterval ,
repeats : false
) { [ weak self ] timer in
timer . invalidate ( )
self ? . getSnodeForPolling ( for : publicKey )
. flatMap { snode -> AnyPublisher < [ Message ] , Error > in
Poller . poll (
namespaces : namespaces ,
from : snode ,
for : publicKey ,
poller : self ,
using : dependencies
)
}
. subscribe ( on : dependencies . subscribeQueue )
. receive ( on : dependencies . receiveQueue )
. sinkUntilComplete (
receiveCompletion : { result in
switch result {
case . failure ( let error ) : self ? . handlePollError ( error , for : publicKey , using : dependencies )
case . finished :
let maxNodePollCount : UInt = ( self ? . maxNodePollCount ? ? 0 )
// I f w e h a v e p o l l e d t h i s s e r v i c e n o d e m o r e t h a n t h e
// m a x i m u m a l l o w e d t h e n t h r o w a n e r r o r s o t h e p a r e n t
// l o o p c a n r e s t a r t t h e p o l l i n g
if maxNodePollCount > 0 {
let pollCount : Int = ( self ? . pollCount . wrappedValue [ publicKey ] ? ? 0 )
self ? . pollCount . mutate { $0 [ publicKey ] = ( pollCount + 1 ) }
guard pollCount < maxNodePollCount else {
let newSnodeNextPollInterval : TimeInterval = ( self ? . nextPollDelay ( for : publicKey ) ? ? nextPollInterval )
self ? . timers . mutate {
$0 [ publicKey ] = Timer . scheduledTimerOnMainThread (
withTimeInterval : newSnodeNextPollInterval ,
repeats : false
) { [ weak self ] timer in
timer . invalidate ( )
self ? . pollCount . mutate { $0 [ publicKey ] = 0 }
self ? . setUpPolling ( for : publicKey , using : dependencies )
}
}
return
}
}
// O t h e r w i s e j u s t l o o p
self ? . pollRecursively ( for : publicKey , using : dependencies )
}
}
)
}
}
}
// / P o l l s t h e s p e c i f i e d n a m e s p a c e s a n d p r o c e s s e s a n y m e s s a g e s , r e t u r n i n g a n a r r a y o f m e s s a g e s t h a t w e r e
// / s u c c e s s f u l l y p r o c e s s e d
// /
// / * * N o t e : * * T h e r e t u r n e d m e s s a g e s w i l l h a v e a l r e a d y b e e n p r o c e s s e d b y t h e ` P o l l e r ` , t h e y a r e o n l y r e t u r n e d
// / f o r c a s e s w h e r e w e n e e d e x p l i c i t / c u s t o m b e h a v i o u r s t o o c c u r ( e g . O n b o a r d i n g )
public static func poll (
namespaces : [ SnodeAPI . Namespace ] ,
from snode : Snode ,
for publicKey : String ,
calledFromBackgroundPoller : Bool = false ,
isBackgroundPollValid : @ escaping ( ( ) -> Bool ) = { true } ,
poller : Poller ? = nil ,
using dependencies : SMKDependencies = SMKDependencies (
receiveQueue : Threading . pollerQueue
)
) -> AnyPublisher < [ Message ] , Error > {
// I f t h e p o l l i n g h a s b e e n c a n c e l l e d t h e n d o n ' t c o n t i n u e
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
let pollerName : String = (
poller ? . pollerName ( for : publicKey ) ? ?
" poller with public key \( publicKey ) "
)
let configHashes : [ String ] = SessionUtil . configHashes ( for : publicKey )
// F e t c h t h e m e s s a g e s
return SnodeAPI
. poll (
namespaces : namespaces ,
refreshingConfigHashes : configHashes ,
from : snode ,
associatedWith : publicKey ,
using : dependencies
)
. flatMap { namespacedResults -> AnyPublisher < [ Message ] , Error > in
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
let allMessages : [ SnodeReceivedMessage ] = namespacedResults
. compactMap { _ , result -> [ SnodeReceivedMessage ] ? in result . data ? . messages }
. flatMap { $0 }
// N o n e e d t o d o a n y t h i n g i f t h e r e a r e n o m e s s a g e s
guard ! allMessages . isEmpty else {
if ! calledFromBackgroundPoller { SNLog ( " Received no new messages in \( pollerName ) " ) }
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
// O t h e r w i s e p r o c e s s t h e m e s s a g e s a n d a d d t h e m t o t h e q u e u e f o r h a n d l i n g
let lastHashes : [ String ] = namespacedResults
. compactMap { $0 . value . data ? . lastHash }
let otherKnownHashes : [ String ] = namespacedResults
. filter { $0 . key . shouldDedupeMessages }
. compactMap { $0 . value . data ? . messages . map { $0 . info . hash } }
. reduce ( [ ] , + )
var messageCount : Int = 0
var processedMessages : [ Message ] = [ ]
var hadValidHashUpdate : Bool = false
var configMessageJobsToRun : [ Job ] = [ ]
var standardMessageJobsToRun : [ Job ] = [ ]
var pollerLogOutput : String = " \( pollerName ) failed to process any messages "
Storage . shared . write { db in
let allProcessedMessages : [ ProcessedMessage ] = allMessages
. compactMap { message -> ProcessedMessage ? in
do {
return try Message . processRawReceivedMessage ( db , rawMessage : message )
}
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
case MessageReceiverError . duplicateMessageNewSnode :
hadValidHashUpdate = true
break
case DatabaseError . SQLITE_ABORT :
// I n t h e b a c k g r o u n d i g n o r e ' S Q L I T E _ A B O R T ' ( i t g e n e r a l l y m e a n s
// t h e B a c k g r o u n d P o l l e r h a s t i m e d o u t
if ! calledFromBackgroundPoller {
SNLog ( " Failed to the database being suspended (running in background with no background task). " )
}
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
return nil
}
}
// A d d a j o b t o p r o c e s s t h e c o n f i g m e s s a g e s f i r s t
let configJobIds : [ Int64 ] = allProcessedMessages
. filter { $0 . messageInfo . variant = = . sharedConfigMessage }
. grouped { threadId , _ , _ , _ in threadId }
. compactMap { threadId , threadMessages in
messageCount += threadMessages . count
processedMessages += threadMessages . map { $0 . messageInfo . message }
let jobToRun : Job ? = Job (
variant : . configMessageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : ConfigMessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
configMessageJobsToRun = configMessageJobsToRun . appending ( jobToRun )
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
let updatedJob : Job ? = JobRunner
. add ( db , job : jobToRun , canStartJob : ! calledFromBackgroundPoller )
return updatedJob ? . id
}
// A d d j o b s f o r p r o c e s s i n g n o n - c o n f i g m e s s a g e s w h i c h a r e d e p e n d a n t o n t h e c o n f i g m e s s a g e
// p r o c e s s i n g j o b s
allProcessedMessages
. filter { $0 . messageInfo . variant != . sharedConfigMessage }
. grouped { threadId , _ , _ , _ in threadId }
. forEach { threadId , threadMessages in
messageCount += threadMessages . count
processedMessages += threadMessages . map { $0 . messageInfo . message }
let jobToRun : Job ? = Job (
variant : . messageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : MessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
standardMessageJobsToRun = standardMessageJobsToRun . appending ( jobToRun )
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
let updatedJob : Job ? = JobRunner
. add ( db , job : jobToRun , canStartJob : ! calledFromBackgroundPoller )
// C r e a t e t h e d e p e n d e n c y b e t w e e n t h e j o b s
if let updatedJobId : Int64 = updatedJob ? . id {
do {
try configJobIds . forEach { configJobId in
try JobDependencies (
jobId : updatedJobId ,
dependantId : configJobId
)
. insert ( db )
}
}
catch {
SNLog ( " Failed to add dependency between config processing and non-config processing messageReceive jobs. " )
}
}
}
// S e t t h e o u t p u t f o r l o g g i n g
pollerLogOutput = " Received \( messageCount ) new message \( messageCount = = 1 ? " " : " s " ) in \( pollerName ) (duplicates: \( allMessages . count - messageCount ) ) "
// C l e a n u p m e s s a g e h a s h e s a n d a d d s o m e l o g s a b o u t t h e p o l l r e s u l t s
if allMessages . isEmpty && ! hadValidHashUpdate {
pollerLogOutput = " Received \( allMessages . count ) new message \( allMessages . count = = 1 ? " " : " s " ) in \( pollerName ) , all duplicates - marking the hash we polled with as invalid "
// U p d a t e t h e c a c h e d v a l i d i t y o f t h e m e s s a g e s
try SnodeReceivedMessageInfo . handlePotentialDeletedOrInvalidHash (
db ,
potentiallyInvalidHashes : lastHashes ,
otherKnownValidHashes : otherKnownHashes
)
}
}
// O n l y o u t p u t l o g s i f i t i s n ' t t h e b a c k g r o u n d p o l l e r
if ! calledFromBackgroundPoller {
SNLog ( pollerLogOutput )
}
// I f w e a r e n ' t r u n i n g i n a b a c k g r o u n d p o l l e r t h e n j u s t f i n i s h i m m e d i a t e l y
guard calledFromBackgroundPoller else {
return Just ( processedMessages )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
// W e w a n t t o t r y t o h a n d l e t h e r e c e i v e j o b s i m m e d i a t e l y i n t h e b a c k g r o u n d
return Publishers
. MergeMany (
configMessageJobsToRun . map { job -> AnyPublisher < Void , Error > in
Deferred {
Future < Void , Error > { resolver in
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
ConfigMessageReceiveJob . run (
job ,
queue : dependencies . receiveQueue ,
success : { _ , _ in resolver ( Result . success ( ( ) ) ) } ,
failure : { _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
deferred : { _ in resolver ( Result . success ( ( ) ) ) }
)
}
}
. eraseToAnyPublisher ( )
}
)
. collect ( )
. flatMap { _ in
Publishers
. MergeMany (
standardMessageJobsToRun . map { job -> AnyPublisher < Void , Error > in
Deferred {
Future < Void , Error > { resolver in
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
MessageReceiveJob . run (
job ,
queue : dependencies . receiveQueue ,
success : { _ , _ in resolver ( Result . success ( ( ) ) ) } ,
failure : { _ , _ , _ in resolver ( Result . success ( ( ) ) ) } ,
deferred : { _ in resolver ( Result . success ( ( ) ) ) }
)
}
}
. eraseToAnyPublisher ( )
}
)
. collect ( )
}
. map { _ in processedMessages }
. eraseToAnyPublisher ( )
}
. eraseToAnyPublisher ( )
}
}