Fixed a couple of BackgroundPoller behaviours

• Fixed an issue where the BackgroundPoller would fail if a single destination failed
  - This would suspend the network, then the other pending requests would error and automatically retry, attempting to create a new connection which could loop a number of times rapidly since the network was suspended
• Fixed an issue where the BackgroundPoller could incorrectly try to write to the database after it had been suspended
pull/986/head
Morgan Pretty 10 months ago
parent 65807ec6d3
commit 234694b292

@ -1 +1 @@
Subproject commit 449c86d645df7a82059693ee70178725ad00ffab
Subproject commit 8d944ab0fbab1c5f0fe8861285daffcda7faf9c5

@ -44,6 +44,7 @@ public final class BackgroundPoller {
}
.defaulting(to: ([], []))
let pollStart: TimeInterval = dependencies.dateNow.timeIntervalSince1970
Log.info("[BackgroundPoller] Fetching Users: 1, Groups: \(groupIds.count), Communities: \(servers.count).")
Publishers
.MergeMany(
@ -56,7 +57,9 @@ public final class BackgroundPoller {
.collect()
.handleEvents(
receiveOutput: { _ in
Log.info("[BackgroundPoller] Finished polling.")
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.info("[BackgroundPoller] Finished polling after \(duration, unit: .s).")
}
)
.sinkUntilComplete(
@ -67,7 +70,9 @@ public final class BackgroundPoller {
switch result {
case .finished: completionHandler(.newData)
case .failure(let error):
Log.error("[BackgroundPoller] Failed due to error: \(error).")
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.error("[BackgroundPoller] Failed due to error: \(error) after \(duration, unit: .s).")
completionHandler(.failed)
}
}
@ -76,10 +81,14 @@ public final class BackgroundPoller {
private static func pollForMessages(
using dependencies: Dependencies
) -> AnyPublisher<Void, Error> {
) -> AnyPublisher<Void, Never> {
let userPublicKey: String = getUserHexEncodedPublicKey(using: dependencies)
return CurrentUserPoller().poll(
let poller: Poller = CurrentUserPoller()
let pollerName: String = poller.pollerName(for: userPublicKey)
let pollStart: TimeInterval = dependencies.dateNow.timeIntervalSince1970
return poller.poll(
namespaces: CurrentUserPoller.namespaces,
for: userPublicKey,
calledFromBackgroundPoller: true,
@ -89,21 +98,37 @@ public final class BackgroundPoller {
)
.handleEvents(
receiveOutput: { _, _, validMessageCount, _ in
Log.info("[BackgroundPoller] Received \(validMessageCount) valid message(s).")
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.info("[BackgroundPoller] \(pollerName) received \(validMessageCount) valid message(s) after \(duration, unit: .s).")
},
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.error("[BackgroundPoller] \(pollerName) failed after \(duration, unit: .s) due to error: \(error).")
}
}
)
.map { _ in () }
.catch { _ in Just(()).eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
private static func pollForClosedGroupMessages(
groupIds: Set<String>,
using dependencies: Dependencies
) -> [AnyPublisher<Void, Error>] {
) -> [AnyPublisher<Void, Never>] {
// Fetch all closed groups (excluding any don't contain the current user as a
// GroupMemeber as the user is no longer a member of those)
return groupIds.map { groupPublicKey in
return ClosedGroupPoller()
let poller: Poller = ClosedGroupPoller()
let pollerName: String = poller.pollerName(for: groupPublicKey)
let pollStart: TimeInterval = dependencies.dateNow.timeIntervalSince1970
return poller
.poll(
namespaces: ClosedGroupPoller.namespaces,
for: groupPublicKey,
@ -114,10 +139,22 @@ public final class BackgroundPoller {
)
.handleEvents(
receiveOutput: { _, _, validMessageCount, _ in
Log.info("[BackgroundPoller] Received \(validMessageCount) valid message(s) for group: \(groupPublicKey).")
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.info("[BackgroundPoller] \(pollerName) received \(validMessageCount) valid message(s) after \(duration, unit: .s).")
},
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.error("[BackgroundPoller] \(pollerName) failed after \(duration, unit: .s) due to error: \(error).")
}
}
)
.map { _ in () }
.catch { _ in Just(()).eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}
@ -125,9 +162,11 @@ public final class BackgroundPoller {
private static func pollForCommunityMessages(
servers: Set<String>,
using dependencies: Dependencies
) -> [AnyPublisher<Void, Error>] {
return servers.map { server -> AnyPublisher<Void, Error> in
) -> [AnyPublisher<Void, Never>] {
return servers.map { server -> AnyPublisher<Void, Never> in
let poller: OpenGroupAPI.Poller = OpenGroupAPI.Poller(for: server)
let pollerName: String = "Community poller for server: \(server)"
let pollStart: TimeInterval = dependencies.dateNow.timeIntervalSince1970
poller.stop()
return poller.poll(
@ -136,6 +175,25 @@ public final class BackgroundPoller {
isPostCapabilitiesRetry: false,
using: dependencies
)
.handleEvents(
receiveOutput: { _ in
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.info("[BackgroundPoller] \(pollerName) succeeded after \(duration, unit: .s).")
},
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
let endTime: TimeInterval = dependencies.dateNow.timeIntervalSince1970
let duration: TimeUnit = .seconds(endTime - pollStart)
Log.error("[BackgroundPoller] \(pollerName) failed after \(duration, unit: .s) due to error: \(error).")
}
}
)
.map { _ in () }
.catch { _ in Just(()).eraseToAnyPublisher() }
.eraseToAnyPublisher()
}
}
}

@ -48,7 +48,7 @@ public final class ClosedGroupPoller: Poller {
// MARK: - Abstract Methods
override func pollerName(for publicKey: String) -> String {
override public func pollerName(for publicKey: String) -> String {
return "Closed group poller with public key: \(publicKey)"
}

@ -47,7 +47,7 @@ public final class CurrentUserPoller: Poller {
// MARK: - Abstract Methods
override func pollerName(for publicKey: String) -> String {
override public func pollerName(for publicKey: String) -> String {
return "Main Poller"
}

@ -68,7 +68,7 @@ public class Poller {
// MARK: - Abstract Methods
/// The name for this poller to appear in the logs
internal func pollerName(for publicKey: String) -> String {
public func pollerName(for publicKey: String) -> String {
preconditionFailure("abstract class - override in subclass")
}
@ -227,6 +227,8 @@ public class Poller {
refreshingConfigHashes: configHashes,
from: snode,
swarmPublicKey: swarmPublicKey,
calledFromBackgroundPoller: calledFromBackgroundPoller,
isBackgroundPollValid: isBackgroundPollValid,
using: dependencies
)
}

@ -51,6 +51,8 @@ public final class SnodeAPI {
refreshingConfigHashes: [String] = [],
from snode: LibSession.Snode,
swarmPublicKey: String,
calledFromBackgroundPoller: Bool,
isBackgroundPollValid: @escaping (() -> Bool),
using dependencies: Dependencies
) -> AnyPublisher<[SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)], Error> {
guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else {
@ -184,7 +186,11 @@ public final class SnodeAPI {
/// Since we have extended the TTL for a number of messages we need to make sure we update the local
/// `SnodeReceivedMessageInfo.expirationDateMs` values so we don't end up deleting them
/// incorrectly before they actually expire on the swarm
///
/// **Note:** If this was triggered from a background poll which is no longer valid then don't bother trying
/// to write the changes (the database will be suspended)
if
(!calledFromBackgroundPoller || isBackgroundPollValid()),
!refreshingConfigHashes.isEmpty,
let refreshTTLSubReponse: Network.BatchSubResponse<UpdateExpiryResponse> = batchResponse
.first(where: { $0 is Network.BatchSubResponse<UpdateExpiryResponse> })

Loading…
Cancel
Save