// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.

import Foundation
import GRDB
import PromiseKit
import Sodium
import SessionSnodeKit

@objc(LKPoller)
public final class Poller: NSObject {
    private let storage = OWSPrimaryStorage.shared()
    private var isPolling = false
    private var usedSnodes = Set<Snode>()
    private var pollCount = 0

    // MARK: Settings
    private static let pollInterval: TimeInterval = 1.5
    private static let retryInterval: TimeInterval = 0.25
    
    /// After polling a given snode this many times we always switch to a new one.
    ///
    /// The reason for doing this is that sometimes a snode will be giving us successful responses while
    /// it isn't actually getting messages from other snodes.
    private static let maxPollCount: UInt = 6

    // MARK: Error
    private enum Error: LocalizedError {
        case pollLimitReached

        var errorDescription: String? {
            switch self {
                case .pollLimitReached: return "Poll limit reached for current snode."
            }
        }
    }

    // MARK: Public API
    @objc public func startIfNeeded() {
        guard !isPolling else { return }
        
        SNLog("Started polling.")
        isPolling = true
        setUpPolling()
    }

    @objc public func stop() {
        SNLog("Stopped polling.")
        isPolling = false
        usedSnodes.removeAll()
    }

    // MARK: Private API
    private func setUpPolling() {
        guard isPolling else { return }
        
        Threading.pollerQueue.async {
            let _ = SnodeAPI.getSwarm(for: getUserHexEncodedPublicKey())
                .then(on: Threading.pollerQueue) { [weak self] _ -> Promise<Void> in
                    guard let strongSelf = self else { return Promise { $0.fulfill(()) } }
                    
                    strongSelf.usedSnodes.removeAll()
                    
                    let (promise, seal) = Promise<Void>.pending()
                    strongSelf.pollNextSnode(seal: seal)
                    return promise
                }
                .ensure(on: Threading.pollerQueue) { [weak self] in
                    // Timers don't do well on background queues
                    guard let strongSelf = self, strongSelf.isPolling else { return }
                    
                    Timer.scheduledTimerOnMainThread(withTimeInterval: Poller.retryInterval, repeats: false) { _ in
                        guard let strongSelf = self else { return }
                        
                        strongSelf.setUpPolling()
                    }
                }
        }
    }

    private func pollNextSnode(seal: Resolver<Void>) {
        let userPublicKey = getUserHexEncodedPublicKey()
        let swarm = SnodeAPI.swarmCache[userPublicKey] ?? []
        let unusedSnodes = swarm.subtracting(usedSnodes)
        
        if !unusedSnodes.isEmpty {
            // randomElement() uses the system's default random generator, which is cryptographically secure
            let nextSnode = unusedSnodes.randomElement()!
            usedSnodes.insert(nextSnode)
            
            poll(nextSnode, seal: seal)
                .done2 {
                    seal.fulfill(())
                }
                .catch2 { [weak self] error in
                    if let error = error as? Error, error == .pollLimitReached {
                        self?.pollCount = 0
                    }
                    else {
                        SNLog("Polling \(nextSnode) failed; dropping it and switching to next snode.")
                        SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, publicKey: userPublicKey)
                    }
                    
                    Threading.pollerQueue.async {
                        self?.pollNextSnode(seal: seal)
                    }
                }
        }
        else {
            seal.fulfill(())
        }
    }

    private func poll(_ snode: Snode, seal longTermSeal: Resolver<Void>) -> Promise<Void> {
        guard isPolling else { return Promise { $0.fulfill(()) } }
        
        let userPublicKey = getUserHexEncodedPublicKey()
        
        return SnodeAPI.getMessages(from: snode, associatedWith: userPublicKey)
            .then(on: Threading.pollerQueue) { [weak self] messages -> Promise<Void> in
                guard self?.isPolling == true else { return Promise { $0.fulfill(()) } }
                
                if !messages.isEmpty {
                    SNLog("Received \(messages.count) message(s).")
                    
                    GRDBStorage.shared.write { db in
                        var threadMessages: [String: [MessageReceiveJob.Details.MessageInfo]] = [:]
                        
                        messages.forEach { message in
                            guard let envelope = SNProtoEnvelope.from(message) else { return }
                            
                            // Extract the threadId and add that to the messageReceive job for
                            // multi-threading and garbage collection purposes
                            let threadId: String? = MessageReceiver.extractSenderPublicKey(db, from: envelope)
                            
                            do {
                                threadMessages[threadId ?? ""] = (threadMessages[threadId ?? ""] ?? [])
                                    .appending(
                                        MessageReceiveJob.Details.MessageInfo(
                                            data: try envelope.serializedData(),
                                            serverHash: message.info.hash,
                                            serverExpirationTimestamp: (TimeInterval(message.info.expirationDateMs) / 1000)
                                        )
                                    )
                                
                                // Persist the received message after the MessageReceiveJob is created
                                _ = try message.info.saved(db)
                            }
                            catch {
                                switch error {
                                    // Ignore unique constraint violations here (they will be handled
                                    // in the MessageReceiveJob)
                                    case DatabaseError.SQLITE_CONSTRAINT_UNIQUE: break
                                    
                                    default: SNLog("Failed to deserialize envelope due to error: \(error).")
                                }
                            }
                        }
                        
                        threadMessages.forEach { threadId, threadMessages in
                            JobRunner.add(
                                db,
                                job: Job(
                                    variant: .messageReceive,
                                    behaviour: .runOnce,
                                    threadId: threadId,
                                    details: MessageReceiveJob.Details(
                                        messages: threadMessages,
                                        isBackgroundPoll: false
                                    )
                                )
                            )
                        }
                    }
                }
                
                self?.pollCount += 1
                
                guard (self?.pollCount ?? 0) < Poller.maxPollCount else {
                    throw Error.pollLimitReached
                }
                
                return withDelay(Poller.pollInterval, completionQueue: Threading.pollerQueue) {
                    guard let strongSelf = self, strongSelf.isPolling else { return Promise { $0.fulfill(()) } }
                    
                    return strongSelf.poll(snode, seal: longTermSeal)
                }
            }
    }
}