// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved. import Foundation import Combine import GRDB import Sodium import SessionSnodeKit import SessionUtilitiesKit public class Poller { private var timers: Atomic<[String: Timer]> = Atomic([:]) internal var isPolling: Atomic<[String: Bool]> = Atomic([:]) internal var pollCount: Atomic<[String: Int]> = Atomic([:]) internal var failureCount: Atomic<[String: Int]> = Atomic([:]) // MARK: - Settings /// The namespaces which this poller queries internal var namespaces: [SnodeAPI.Namespace] { preconditionFailure("abstract class - override in subclass") } /// The number of times the poller can poll before swapping to a new snode internal var maxNodePollCount: UInt { preconditionFailure("abstract class - override in subclass") } // MARK: - Public API public init() {} public func stopAllPollers() { let pollers: [String] = Array(isPolling.wrappedValue.keys) pollers.forEach { groupPublicKey in self.stopPolling(for: groupPublicKey) } } public func stopPolling(for publicKey: String) { isPolling.mutate { $0[publicKey] = false } timers.mutate { $0[publicKey]?.invalidate() } } // MARK: - Abstract Methods /// The name for this poller to appear in the logs internal func pollerName(for publicKey: String) -> String { preconditionFailure("abstract class - override in subclass") } internal func nextPollDelay(for publicKey: String) -> TimeInterval { preconditionFailure("abstract class - override in subclass") } internal func getSnodeForPolling( for publicKey: String ) -> AnyPublisher { preconditionFailure("abstract class - override in subclass") } internal func handlePollError(_ error: Error, for publicKey: String) { preconditionFailure("abstract class - override in subclass") } // MARK: - Private API internal func startIfNeeded(for publicKey: String) { guard isPolling.wrappedValue[publicKey] != true else { return } // Might be a race condition that the setUpPolling finishes too soon, // and the timer is not created, if we mark the group as is polling // after setUpPolling. So the poller may not work, thus misses messages isPolling.mutate { $0[publicKey] = true } setUpPolling(for: publicKey) } /// We want to initially trigger a poll against the target service node and then run the recursive polling, /// if an error is thrown during the poll then this should automatically restart the polling internal func setUpPolling(for publicKey: String) { guard isPolling.wrappedValue[publicKey] == true else { return } let namespaces: [SnodeAPI.Namespace] = self.namespaces getSnodeForPolling(for: publicKey) .subscribe(on: Threading.pollerQueue) .flatMap { snode -> AnyPublisher in Poller.poll( namespaces: namespaces, from: snode, for: publicKey, on: Threading.pollerQueue, poller: self ) } .receive(on: Threading.pollerQueue) .sinkUntilComplete( receiveCompletion: { [weak self] result in switch result { case .finished: self?.pollRecursively(for: publicKey) case .failure(let error): guard self?.isPolling.wrappedValue[publicKey] == true else { return } self?.handlePollError(error, for: publicKey) } } ) } private func pollRecursively(for publicKey: String) { guard isPolling.wrappedValue[publicKey] == true else { return } let namespaces: [SnodeAPI.Namespace] = self.namespaces let nextPollInterval: TimeInterval = nextPollDelay(for: publicKey) timers.mutate { $0[publicKey] = Timer.scheduledTimerOnMainThread( withTimeInterval: nextPollInterval, repeats: false ) { [weak self] timer in timer.invalidate() self?.getSnodeForPolling(for: publicKey) .subscribe(on: Threading.pollerQueue) .flatMap { snode -> AnyPublisher in Poller.poll( namespaces: namespaces, from: snode, for: publicKey, on: Threading.pollerQueue, poller: self ) } .receive(on: Threading.pollerQueue) .sinkUntilComplete( receiveCompletion: { result in switch result { case .failure(let error): self?.handlePollError(error, for: publicKey) case .finished: let maxNodePollCount: UInt = (self?.maxNodePollCount ?? 0) // If we have polled this service node more than the // maximum allowed then throw an error so the parent // loop can restart the polling if maxNodePollCount > 0 { let pollCount: Int = (self?.pollCount.wrappedValue[publicKey] ?? 0) self?.pollCount.mutate { $0[publicKey] = (pollCount + 1) } guard pollCount < maxNodePollCount else { let newSnodeNextPollInterval: TimeInterval = (self?.nextPollDelay(for: publicKey) ?? nextPollInterval) self?.timers.mutate { $0[publicKey] = Timer.scheduledTimerOnMainThread( withTimeInterval: newSnodeNextPollInterval, repeats: false ) { [weak self] timer in timer.invalidate() self?.pollCount.mutate { $0[publicKey] = 0 } self?.setUpPolling(for: publicKey) } } return } } // Otherwise just loop self?.pollRecursively(for: publicKey) } } ) } } } public static func poll( namespaces: [SnodeAPI.Namespace], from snode: Snode, for publicKey: String, on queue: DispatchQueue, calledFromBackgroundPoller: Bool = false, isBackgroundPollValid: @escaping (() -> Bool) = { true }, poller: Poller? = nil ) -> AnyPublisher { // If the polling has been cancelled then don't continue guard (calledFromBackgroundPoller && isBackgroundPollValid()) || poller?.isPolling.wrappedValue[publicKey] == true else { return Just(()) .setFailureType(to: Error.self) .eraseToAnyPublisher() } let pollerName: String = ( poller?.pollerName(for: publicKey) ?? "poller with public key \(publicKey)" ) // Fetch the messages return SnodeAPI.getMessages(in: namespaces, from: snode, associatedWith: publicKey) .flatMap { namespacedResults -> AnyPublisher in guard (calledFromBackgroundPoller && isBackgroundPollValid()) || poller?.isPolling.wrappedValue[publicKey] == true else { return Just(()) .setFailureType(to: Error.self) .eraseToAnyPublisher() } let allMessagesCount: Int = namespacedResults .map { $0.value.data?.messages.count ?? 0 } .reduce(0, +) // No need to do anything if there are no messages guard allMessagesCount > 0 else { if !calledFromBackgroundPoller { SNLog("Received no new messages in \(pollerName)") } return Just(()) .setFailureType(to: Error.self) .eraseToAnyPublisher() } // Otherwise process the messages and add them to the queue for handling let lastHashes: [String] = namespacedResults .compactMap { $0.value.data?.lastHash } var messageCount: Int = 0 var hadValidHashUpdate: Bool = false var jobsToRun: [Job] = [] Storage.shared.write { db in namespacedResults.forEach { namespace, result in result.data?.messages .compactMap { message -> ProcessedMessage? in do { return try Message.processRawReceivedMessage(db, rawMessage: message) } catch { switch error { // Ignore duplicate & selfSend message errors (and don't bother logging // them as there will be a lot since we each service node duplicates messages) case DatabaseError.SQLITE_CONSTRAINT_UNIQUE, MessageReceiverError.duplicateMessage, MessageReceiverError.duplicateControlMessage, MessageReceiverError.selfSend: break case MessageReceiverError.duplicateMessageNewSnode: hadValidHashUpdate = true break case DatabaseError.SQLITE_ABORT: SNLog("Failed to the database being suspended (running in background with no background task).") break default: SNLog("Failed to deserialize envelope due to error: \(error).") } return nil } } .grouped { threadId, _, _ in (threadId ?? Message.nonThreadMessageId) } .forEach { threadId, threadMessages in messageCount += threadMessages.count JobRunner.add( db, job: Job( variant: .messageReceive, behaviour: .runOnce, threadId: threadId, details: MessageReceiveJob.Details( messages: threadMessages.map { $0.messageInfo }, calledFromBackgroundPoller: false ) ) ) } if messageCount == 0 && !hadValidHashUpdate, let lastHash: String = lastHash { SNLog("Received \(messages.count) new message\(messages.count == 1 ? "" : "s"), all duplicates - marking the hash we polled with as invalid") // Update the cached validity of the messages try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash( db, potentiallyInvalidHashes: [lastHash], otherKnownValidHashes: messages.map { $0.info.hash } ) } else { SNLog("Received \(messageCount) new message\(messageCount == 1 ? "" : "s") (duplicates: \(messages.count - messageCount))") } } } // If we aren't runing in a background poller then just finish immediately guard calledFromBackgroundPoller else { return Just(()) .setFailureType(to: Error.self) .eraseToAnyPublisher() } // We want to try to handle the receive jobs immediately in the background return Publishers .MergeMany( jobsToRun.map { job -> AnyPublisher in Future { resolver in // Note: In the background we just want jobs to fail silently MessageReceiveJob.run( job, queue: queue, success: { _, _ in resolver(Result.success(())) }, failure: { _, _, _ in resolver(Result.success(())) }, deferred: { _ in resolver(Result.success(())) } ) } .eraseToAnyPublisher() } ) .collect() .map { _ in () } .eraseToAnyPublisher() } .eraseToAnyPublisher() } }