Fixed a number of bugs found in the internal release

• Moved the 'getSwarm' behaviour into a distinct job to prevent duplicate API calls
• Updated to the latest libSession (fix libQuic crash)
• Updated the JobRunner to support the `runOnceTransient` behaviour and to allow transient jobs to run in the app extensions (see the scheduling sketch after these notes)
• Reworked the extension file logging so that logs are written directly to the file in a single operation rather than line-by-line via the logger
• Fixed a bug where community invites had the wrong author
• Fixed a bug where the title on the disappearing messages settings screen was clipping vertically
• Fixed a bug where tapping the disappearing messages setting subtitle could incorrectly open the settings screen in a read-only state for admins
• Fixed a log which contained notification content
• Tweaked the extension logging logic
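For context, the scheduling pattern behind the JobRunner changes (as used inside the new `GetSwarmJob` further down in this diff) looks roughly like the following. This is a condensed view of that file's own code, not something callable from elsewhere: `Job`, `JobRunner` and `Dependencies` are the app's existing types and the `Details` initialiser is only visible inside `GetSwarmJob.swift`.

// Schedule (or reuse) a single transient, unique job; 'upsert' now returns the
// existing job when one with the same uniqueHashValue is already persisted
let targetJob: Job? = dependencies.storage.write(using: dependencies) { db in
    dependencies.jobRunner.upsert(
        db,
        job: Job(
            variant: .getSwarm,
            behaviour: .runOnceTransient,   // never re-run on a later launch; cleared from the database
            shouldBeUnique: true,           // de-duplicates concurrent requests for the same swarm
            details: GetSwarmJob.Details(swarmPublicKey: swarmPublicKey)
        ),
        canStartJob: true,
        using: dependencies
    )
}

// Wait for whichever job instance actually runs (newly added or pre-existing)
dependencies.jobRunner.afterJob(targetJob) { result in
    switch result {
        case .succeeded: break                      // swarm is now cached in SnodeAPI.swarmCache
        case .failed, .deferred, .notFound: break   // treat as an error at the call site
    }
}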
pull/960/head
Morgan Pretty 2 months ago
parent 2cffda17bc
commit a3188ebea4

@ -1 +1 @@
Subproject commit 1c4667ba0c56c924d4e957743d1324be2c899040
Subproject commit 7651967104845db16e6a58f70635c01f7f4c2033

@ -460,6 +460,7 @@
FCB11D8C1A129A76002F93FB /* CoreMedia.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = FCB11D8B1A129A76002F93FB /* CoreMedia.framework */; };
FD0606BD2BC8BF6F00C3816E /* BuildPathsJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */; };
FD0606BF2BC8C10200C3816E /* _005_AddJobUniqueHash.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606BE2BC8C10200C3816E /* _005_AddJobUniqueHash.swift */; };
FD0606C12BCC9A1500C3816E /* GetSwarmJob.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */; };
FD078E4827E02561000769AF /* CommonMockedExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4727E02561000769AF /* CommonMockedExtensions.swift */; };
FD078E4927E02576000769AF /* CommonMockedExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4727E02561000769AF /* CommonMockedExtensions.swift */; };
FD078E4D27E17156000769AF /* MockOGMCache.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD078E4C27E17156000769AF /* MockOGMCache.swift */; };
@ -1688,6 +1689,7 @@
FCB11D8B1A129A76002F93FB /* CoreMedia.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = CoreMedia.framework; path = System/Library/Frameworks/CoreMedia.framework; sourceTree = SDKROOT; };
FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BuildPathsJob.swift; sourceTree = "<group>"; };
FD0606BE2BC8C10200C3816E /* _005_AddJobUniqueHash.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _005_AddJobUniqueHash.swift; sourceTree = "<group>"; };
FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GetSwarmJob.swift; sourceTree = "<group>"; };
FD078E4727E02561000769AF /* CommonMockedExtensions.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CommonMockedExtensions.swift; sourceTree = "<group>"; };
FD078E4C27E17156000769AF /* MockOGMCache.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MockOGMCache.swift; sourceTree = "<group>"; };
FD0969F82A69FFE700C5C365 /* Mocked.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Mocked.swift; sourceTree = "<group>"; };
@ -4564,6 +4566,7 @@
isa = PBXGroup;
children = (
FDF8488D29405C04007DCAE5 /* GetSnodePoolJob.swift */,
FD0606C02BCC9A1500C3816E /* GetSwarmJob.swift */,
FD0606BC2BC8BF6F00C3816E /* BuildPathsJob.swift */,
);
path = Jobs;
@ -6020,6 +6023,7 @@
FD17D7B327F51E5B00122BE0 /* SSKSetting.swift in Sources */,
FDF848E429405D6E007DCAE5 /* SnodeAPIEndpoint.swift in Sources */,
FDF848BE29405C5A007DCAE5 /* GetServiceNodesRequest.swift in Sources */,
FD0606C12BCC9A1500C3816E /* GetSwarmJob.swift in Sources */,
FDF848EB29405E4F007DCAE5 /* OnionRequestAPI.swift in Sources */,
FD17D7AE27F41C4300122BE0 /* SnodeReceivedMessageInfo.swift in Sources */,
);
@ -8100,7 +8104,7 @@
GCC_WARN_UNUSED_VARIABLE = YES;
HEADER_SEARCH_PATHS = "";
IPHONEOS_DEPLOYMENT_TARGET = 13.0;
MARKETING_VERSION = 2.5.1;
MARKETING_VERSION = 2.6.0;
ONLY_ACTIVE_ARCH = YES;
OTHER_CFLAGS = (
"-fobjc-arc-exceptions",
@ -8173,7 +8177,7 @@
GCC_WARN_UNUSED_VARIABLE = YES;
HEADER_SEARCH_PATHS = "";
IPHONEOS_DEPLOYMENT_TARGET = 13.0;
MARKETING_VERSION = 2.5.1;
MARKETING_VERSION = 2.6.0;
ONLY_ACTIVE_ARCH = NO;
OTHER_CFLAGS = (
"-DNS_BLOCK_ASSERTIONS=1",

@ -80,6 +80,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
initialUnreadInteractionInfo: Interaction.TimestampInfo?,
threadIsBlocked: Bool,
currentUserIsClosedGroupMember: Bool?,
currentUserIsClosedGroupAdmin: Bool?,
openGroupPermissions: OpenGroup.Permissions?,
blinded15Key: String?,
blinded25Key: String?
@ -107,13 +108,23 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
.fetchOne(db)
.defaulting(to: false)
)
let currentUserIsClosedGroupMember: Bool? = (![.legacyGroup, .group].contains(threadVariant) ? nil :
let currentUserIsClosedGroupAdmin: Bool? = (![.legacyGroup, .group].contains(threadVariant) ? nil :
GroupMember
.filter(groupMember[.groupId] == threadId)
.filter(groupMember[.profileId] == currentUserPublicKey)
.filter(groupMember[.role] == GroupMember.Role.standard)
.filter(groupMember[.role] == GroupMember.Role.admin)
.isNotEmpty(db)
)
let currentUserIsClosedGroupMember: Bool? = {
guard [.legacyGroup, .group].contains(threadVariant) else { return nil }
guard currentUserIsClosedGroupAdmin != true else { return true }
return GroupMember
.filter(groupMember[.groupId] == threadId)
.filter(groupMember[.profileId] == currentUserPublicKey)
.filter(groupMember[.role] == GroupMember.Role.standard)
.isNotEmpty(db)
}()
let openGroupPermissions: OpenGroup.Permissions? = (threadVariant != .community ? nil :
try OpenGroup
.filter(id: threadId)
@ -139,6 +150,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
initialUnreadInteractionInfo,
threadIsBlocked,
currentUserIsClosedGroupMember,
currentUserIsClosedGroupAdmin,
openGroupPermissions,
blinded15Key,
blinded25Key
@ -157,6 +169,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
threadIsNoteToSelf: (initialData?.currentUserPublicKey == threadId),
threadIsBlocked: initialData?.threadIsBlocked,
currentUserIsClosedGroupMember: initialData?.currentUserIsClosedGroupMember,
currentUserIsClosedGroupAdmin: initialData?.currentUserIsClosedGroupAdmin,
openGroupPermissions: initialData?.openGroupPermissions
).populatingCurrentUserBlindedKeys(
currentUserBlinded15PublicKeyForThisThread: initialData?.blinded15Key,

@ -775,6 +775,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigationItemSource, Navi
)
dependencies.storage.writeAsync { [dependencies] db in
let currentUserSessionId: String = getUserHexEncodedPublicKey(db, using: dependencies)
try selectedUsers.forEach { userId in
let thread: SessionThread = try SessionThread
.fetchOrCreate(db, id: userId, variant: .contact, shouldBeVisible: nil)
@ -788,7 +789,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigationItemSource, Navi
let interaction: Interaction = try Interaction(
threadId: thread.id,
authorId: userId,
authorId: currentUserSessionId,
variant: .standardOutgoing,
timestampMs: SnodeAPI.currentOffsetTimestampMs(),
expiresInSeconds: try? DisappearingMessagesConfiguration

@ -9,6 +9,18 @@ import SignalCoreKit
import SessionMessagingKit
public class AppEnvironment {
enum ExtensionType {
case share
case notification
var name: String {
switch self {
case .share: return "ShareExtension"
case .notification: return "NotificationExtension"
}
}
}
private static var _shared: AppEnvironment = AppEnvironment()
@ -67,45 +79,61 @@ public class AppEnvironment {
to a local directory (so they can be exported via Xcode) - the below code reads any
logs from the shared directory and attempts to add them to the main app logs to make
debugging user issues in extensions easier
DispatchQueue.global(qos: .background).async {
let extensionDirs: [String] = [
"\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/NotificationExtension",
"\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/ShareExtension"
DispatchQueue.global(qos: .background).async { [fileLogger] in
let extensionInfo: [(dir: String, type: ExtensionType)] = [
("\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/NotificationExtension", .notification),
("\(OWSFileSystem.appSharedDataDirectoryPath())/Logs/ShareExtension", .share)
]
let extensionLogs: [String] = extensionDirs.flatMap { dir -> [String] in
let extensionLogs: [(path: String, type: ExtensionType)] = extensionInfo.flatMap { dir, type -> [(path: String, type: ExtensionType)] in
guard let files: [String] = try? FileManager.default.contentsOfDirectory(atPath: dir) else { return [] }
return files.map { "\(dir)/\($0)" }
return files.map { ("\(dir)/\($0)", type) }
}
// Log to ensure the log file exists
OWSLogger.info("")
DDLog.flushLog()
extensionLogs.forEach { logFilePath in
guard let logs: String = try? String(contentsOfFile: logFilePath) else {
try? FileManager.default.removeItem(atPath: logFilePath)
return
}
do {
guard
let currentLogFileInfo: DDLogFileInfo = fileLogger.currentLogFileInfo,
let fileHandle: FileHandle = FileHandle(forWritingAtPath: currentLogFileInfo.filePath)
else { throw StorageError.objectNotFound }
logs.split(separator: "\n").forEach { line in
let lineEmoji: Character? = line
.split(separator: "[")
.first
.map { String($0) }?
.trimmingCharacters(in: .whitespaces)
.last
switch lineEmoji {
case "💙": OWSLogger.verbose("Extension: \(String(line))")
case "💚": OWSLogger.debug("Extension: \(String(line))")
case "💛": OWSLogger.info("Extension: \(String(line))")
case "🧡": OWSLogger.warn("Extension: \(String(line))")
case "❤️": OWSLogger.error("Extension: \(String(line))")
default: OWSLogger.info("Extension: \(String(line))")
}
}
// Ensure we close the file handle
defer { fileHandle.closeFile() }
// Logs have been added - remove them now
DDLog.flushLog()
try? FileManager.default.removeItem(atPath: logFilePath)
// Move to the end of the file to insert the logs
if #available(iOS 13.4, *) { try fileHandle.seekToEnd() }
else { fileHandle.seekToEndOfFile() }
try extensionLogs
.grouped(by: \.type)
.forEach { type, value in
guard
let typeNameStartData: Data = "🧩 \(type.name) -- Start\n".data(using: .utf8),
let typeNameEndData: Data = "🧩 \(type.name) -- End\n".data(using: .utf8)
else { throw StorageError.invalidData }
// Write the type start separator
if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: typeNameStartData) }
else { fileHandle.write(typeNameStartData) }
// Write the logs
try value.forEach { path, _ in
let logData: Data = try Data(contentsOf: URL(fileURLWithPath: path))
if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: logData) }
else { fileHandle.write(logData) }
// Extension logs have been written to the app logs, remove them now
try? FileManager.default.removeItem(atPath: path)
}
// Write the type end separator
if #available(iOS 13.4, *) { try fileHandle.write(contentsOf: typeNameEndData) }
else { fileHandle.write(typeNameEndData) }
}
}
catch { SNLog("Unable to write extension logs to current log file") }
}
}
}
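The extension-side half of the logging rework (buffering log lines and writing them in a single operation rather than line-by-line via the logger) is not part of this excerpt. Purely as an illustration of that idea, under the assumption of a simple in-memory buffer; `ExtensionLogBuffer` and its members are hypothetical names, not types in this codebase:

import Foundation

final class ExtensionLogBuffer {
    private var lines: [String] = []
    private let fileURL: URL

    init(directory: String, fileName: String) {
        self.fileURL = URL(fileURLWithPath: directory).appendingPathComponent(fileName)
    }

    func log(_ line: String) {
        // Keep lines in memory for the (short-lived) extension session
        lines.append(line)
    }

    /// Flush the whole buffer to disk in one write instead of one write per line
    func flush() {
        let data = Data((lines.joined(separator: "\n") + "\n").utf8)
        try? FileManager.default.createDirectory(
            at: fileURL.deletingLastPathComponent(),
            withIntermediateDirectories: true
        )
        try? data.write(to: fileURL, options: .atomic)
    }
}

In this sketch the extension would call `flush()` once when its work completes, which is what lets the main app above read each extension log as a single file and append it between the "Start"/"End" separators.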

@ -176,7 +176,11 @@ enum Onboarding {
// Only continue if this isn't a new account
guard self != .register else { return }
// Fetch the
// Enable single-execution jobs (this allows fetching the swarm for retrieving the
// profile name below without triggering other jobs)
JobRunner.enableNewSingleExecutionJobsOnly()
// Fetch any existing profile name
Onboarding.profileNamePublisher
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.sinkUntilComplete()

@ -27,6 +27,7 @@ final class SessionTableViewTitleView: UIView {
private lazy var titleLabel: UILabel = {
let result: UILabel = UILabel()
result.setContentCompressionResistancePriority(.required, for: .vertical)
result.font = .boldSystemFont(ofSize: Values.mediumFontSize)
result.themeTextColor = .textPrimary
result.lineBreakMode = .byTruncatingTail
@ -63,8 +64,12 @@ final class SessionTableViewTitleView: UIView {
addSubview(stackView)
stackView.pin([ UIView.HorizontalEdge.trailing, UIView.VerticalEdge.top, UIView.VerticalEdge.bottom ], to: self)
stackView.pin(.leading, to: .leading, of: self, withInset: 0)
// Note: We are intentionally letting the stackView go out of bounds because the title will clip
// in some cases when the subtitle wraps over 2 lines (this provides the extra space we need)
stackView.pin(.top, to: .top, of: self, withInset: -2)
stackView.pin(.leading, to: .leading, of: self)
stackView.pin(.trailing, to: .trailing, of: self)
stackView.pin(.bottom, to: .bottom, of: self, withInset: 2)
}
deinit {

@ -43,8 +43,8 @@ public enum GetExpirationJob: JobExecutor {
}
let userPublicKey: String = getUserHexEncodedPublicKey(using: dependencies)
SnodeAPI
.getSwarm(for: userPublicKey, using: dependencies)
GetSwarmJob
.run(for: userPublicKey, using: dependencies)
.tryFlatMapWithRandomSnode(using: dependencies) { snode -> AnyPublisher<(ResponseInfoType, GetExpiriesResponse), Error> in
SnodeAPI.getExpiries(
from: snode,

@ -177,7 +177,7 @@ public class Poller {
let configHashes: [String] = LibSession.configHashes(for: swarmPublicKey)
// Fetch the messages
return SnodeAPI.getSwarm(for: swarmPublicKey, using: dependencies)
return GetSwarmJob.run(for: swarmPublicKey, using: dependencies)
.tryFlatMapWithRandomSnode(drainBehaviour: drainBehaviour, using: dependencies) { snode -> AnyPublisher<[SnodeAPI.Namespace: (info: ResponseInfoType, data: (messages: [SnodeReceivedMessage], lastHash: String?)?)], Error> in
SnodeAPI.poll(
namespaces: namespaces,

@ -363,6 +363,7 @@ public extension SessionThreadViewModel {
threadIsBlocked: Bool? = nil,
contactProfile: Profile? = nil,
currentUserIsClosedGroupMember: Bool? = nil,
currentUserIsClosedGroupAdmin: Bool? = nil,
openGroupPermissions: OpenGroup.Permissions? = nil,
unreadCount: UInt = 0,
hasUnreadMessagesOfAnyKind: Bool = false,
@ -402,7 +403,7 @@ public extension SessionThreadViewModel {
self.closedGroupName = nil
self.closedGroupUserCount = nil
self.currentUserIsClosedGroupMember = currentUserIsClosedGroupMember
self.currentUserIsClosedGroupAdmin = nil
self.currentUserIsClosedGroupAdmin = currentUserIsClosedGroupAdmin
self.openGroupName = nil
self.openGroupServer = nil
self.openGroupRoomToken = nil
@ -997,6 +998,16 @@ public extension SessionThreadViewModel {
\(SQL("\(groupMember[.profileId]) = \(userPublicKey)"))
)
) AS \(ViewModel.Columns.currentUserIsClosedGroupMember),
EXISTS (
SELECT 1
FROM \(GroupMember.self)
WHERE (
\(groupMember[.groupId]) = \(closedGroup[.threadId]) AND
\(SQL("\(groupMember[.role]) = \(GroupMember.Role.admin)")) AND
\(SQL("\(groupMember[.profileId]) = \(userPublicKey)"))
)
) AS \(ViewModel.Columns.currentUserIsClosedGroupAdmin),
\(openGroup[.name]) AS \(ViewModel.Columns.openGroupName),
\(openGroup[.server]) AS \(ViewModel.Columns.openGroupServer),

@ -237,16 +237,16 @@ public class NSENotificationPresenter: NSObject, NotificationsProtocol {
private func addNotifcationRequest(identifier: String, notificationContent: UNNotificationContent, trigger: UNNotificationTrigger?) {
let request = UNNotificationRequest(identifier: identifier, content: notificationContent, trigger: trigger)
SNLog("Add remote notification request: \(notificationContent.body)")
SNLog("Add remote notification request: \(identifier)")
let semaphore = DispatchSemaphore(value: 0)
UNUserNotificationCenter.current().add(request) { error in
if let error = error {
SNLog("Failed to add notification request due to error:\(error)")
SNLog("Failed to add notification request '\(identifier)' due to error: \(error)")
}
semaphore.signal()
}
semaphore.wait()
SNLog("Finish adding remote notification request")
SNLog("Finish adding remote notification request '\(identifier)")
}
}

@ -1,18 +1,20 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
//
// stringlint:disable
import Foundation
import SessionMessagingKit
enum NotificationError: LocalizedError {
enum NotificationError: Error, CustomStringConvertible {
case processing(PushNotificationAPI.ProcessResult)
case messageProcessing
case messageHandling(MessageReceiverError)
public var errorDescription: String? {
public var description: String {
switch self {
case .processing(let result): return "Failed to process notification (\(result))"
case .messageProcessing: return "Failed to process message"
case .messageHandling(let error): return "Failed to handle message (\(error))"
case .processing(let result): return "Failed to process notification (\(result)) (NotificationError.processing)."
case .messageProcessing: return "Failed to process message (NotificationError.messageProcessing)."
case .messageHandling(let error): return "Failed to handle message (\(error)) (NotificationError.messageHandling)."
}
}
}

@ -302,6 +302,8 @@ public final class NotificationServiceExtension: UNNotificationServiceExtension
// Note that this does much more than set a flag; it will also run all deferred blocks.
Singleton.appReadiness.setAppReady()
JobRunner.enableNewSingleExecutionJobsOnly()
}
// MARK: Handle completion

@ -1,6 +1,6 @@
//
// Copyright (c) 2020 Open Whisper Systems. All rights reserved.
//
// stringlint:disable
import Foundation
import SignalUtilitiesKit

@ -153,6 +153,7 @@ final class ShareNavController: UINavigationController, ShareViewDelegate {
// We don't need to use SyncPushTokensJob in the SAE.
// We don't need to use DeviceSleepManager in the SAE.
JobRunner.enableNewSingleExecutionJobsOnly()
AppVersion.sharedInstance().saeLaunchDidComplete()
showLockScreenOrMainContent()

@ -244,8 +244,8 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView
}
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.flatMap { _ in
SnodeAPI
.getSwarm(
GetSwarmJob
.run(
for: {
switch threadVariant {
case .contact, .legacyGroup, .group: return threadId

@ -33,5 +33,6 @@ public enum SNSnodeKit: MigratableTarget { // Just to make the external API nice
// Configure the job executors
JobRunner.setExecutor(GetSnodePoolJob.self, for: .getSnodePool)
JobRunner.setExecutor(BuildPathsJob.self, for: .buildPaths)
JobRunner.setExecutor(GetSwarmJob.self, for: .getSwarm)
}
}

@ -200,15 +200,11 @@ public enum BuildPathsJob: JobExecutor {
.defaulting(to: true)
let targetJob: Job? = dependencies.storage.write(using: dependencies) { db in
// Fetch an existing job if there is one (if there are multiple it doesn't matter which we select)
if let existingJob: Job = try? Job.filter(Job.Columns.variant == Job.Variant.buildPaths).fetchOne(db) {
return existingJob
}
return dependencies.jobRunner.add(
return dependencies.jobRunner.upsert(
db,
job: Job(
variant: .buildPaths,
behaviour: .runOnceTransient,
shouldBeUnique: true,
details: Details(reusablePaths: paths, ed25519SecretKey: ed25519SecretKey)
),

@ -0,0 +1,133 @@
// Copyright © 2024 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import Combine
import GRDB
import SessionUtilitiesKit
public enum GetSwarmJob: JobExecutor {
public static let maxFailureCount: Int = 0
public static let requiresThreadId: Bool = false
public static let requiresInteractionId: Bool = false
/// The minimum number of snodes in a swarm.
private static let minSwarmSnodeCount: Int = 3
public static func run(
_ job: Job,
queue: DispatchQueue,
success: @escaping (Job, Bool, Dependencies) -> (),
failure: @escaping (Job, Error?, Bool, Dependencies) -> (),
deferred: @escaping (Job, Dependencies) -> (),
using dependencies: Dependencies
) {
guard
let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else {
SNLog("[GetSwarmJob] Failing due to missing details.")
return failure(job, JobRunnerError.missingRequiredDetails, true, dependencies)
}
SNLog("[GetSwarmJob] Retrieving swarm for \(details.swarmPublicKey).")
return SnodeAPI
.getSwarm(for: details.swarmPublicKey, using: dependencies)
.subscribe(on: queue, using: dependencies)
.receive(on: queue, using: dependencies)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
SNLog("[GetSwarmJob] Failed due to error: \(error)")
failure(job, error, false, dependencies)
}
},
receiveValue: { (snodes: Set<Snode>) in
// Store the swarm and update the 'loadedSwarms' state so we don't fetch it again from the
// database the next time it's used
SnodeAPI.setSwarm(to: snodes, for: details.swarmPublicKey)
SnodeAPI.loadedSwarms.mutate { $0.insert(details.swarmPublicKey) }
SNLog("[GetSwarmJob] Complete.")
success(job, false, dependencies)
}
)
}
public static func run(
for swarmPublicKey: String,
using dependencies: Dependencies
) -> AnyPublisher<Set<Snode>, Error> {
// Try to load the swarm from the database if we haven't already
if !SnodeAPI.loadedSwarms.wrappedValue.contains(swarmPublicKey) {
let updatedCacheForKey: Set<Snode> = dependencies.storage
.read { db in try Snode.fetchSet(db, publicKey: swarmPublicKey) }
.defaulting(to: [])
SnodeAPI.swarmCache.mutate { $0[swarmPublicKey] = updatedCacheForKey }
SnodeAPI.loadedSwarms.mutate { $0.insert(swarmPublicKey) }
}
// If we already have a cached version of the swarm which is large enough then use that
if let cachedSwarm = SnodeAPI.swarmCache.wrappedValue[swarmPublicKey], cachedSwarm.count >= minSwarmSnodeCount {
return Just(cachedSwarm)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
// Otherwise trigger the job
return Deferred {
Future<Set<Snode>, Error> { resolver in
let targetJob: Job? = dependencies.storage.write(using: dependencies) { db in
return dependencies.jobRunner.upsert(
db,
job: Job(
variant: .getSwarm,
behaviour: .runOnceTransient,
shouldBeUnique: true,
details: Details(swarmPublicKey: swarmPublicKey)
),
canStartJob: true,
using: dependencies
)
}
guard let job: Job = targetJob else {
SNLog("[GetSwarmJob] Failed to retrieve existing job or schedule a new one.")
return resolver(Result.failure(JobRunnerError.generic))
}
dependencies.jobRunner.afterJob(job) { result in
switch result {
case .succeeded:
guard
let cachedSwarm = SnodeAPI.swarmCache.wrappedValue[swarmPublicKey],
cachedSwarm.count >= minSwarmSnodeCount
else {
SNLog("[GetSwarmJob] Failed to find swarm in cache after job.")
return resolver(Result.failure(JobRunnerError.generic))
}
resolver(Result.success(cachedSwarm))
case .failed(let error, _): resolver(Result.failure(error ?? JobRunnerError.generic))
case .deferred, .notFound: resolver(Result.failure(JobRunnerError.generic))
}
}
}
}.eraseToAnyPublisher()
}
}
// MARK: - GetSwarmJob.Details
extension GetSwarmJob {
public struct Details: Codable {
private enum CodingKeys: String, CodingKey {
case swarmPublicKey
}
fileprivate let swarmPublicKey: String
}
}
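At the call sites changed elsewhere in this commit (`GetExpirationJob`, `Poller`, `ThreadPickerVC`, `Network.PreparedRequest`) only the entry point changes; the downstream chaining is untouched. A representative sketch:

// Before this commit:
//     SnodeAPI.getSwarm(for: swarmPublicKey, using: dependencies)
// After this commit:
let swarmPublisher: AnyPublisher<Set<Snode>, Error> = GetSwarmJob
    .run(for: swarmPublicKey, using: dependencies)

// The returned publisher either emits the cached swarm immediately (when it holds at least
// 'minSwarmSnodeCount' entries) or schedules/reuses a single transient '.getSwarm' job, so
// existing '.tryFlatMapWithRandomSnode(using:)' chains keep working unchanged.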

@ -55,8 +55,8 @@ public extension Network.PreparedRequest {
case let randomSnode as Network.RandomSnodeTarget:
guard let payload: Data = request.httpBody else { throw NetworkError.invalidPreparedRequest }
return SnodeAPI.getSwarm(for: randomSnode.swarmPublicKey, using: dependencies)
.tryFlatMapWithRandomSnode(retry: SnodeAPI.maxRetryCount, using: dependencies) { snode in
return GetSwarmJob.run(for: randomSnode.swarmPublicKey, using: dependencies)
.tryFlatMapWithRandomSnode(retry: randomSnode.retryCount, using: dependencies) { snode in
dependencies.network
.send(
.onionRequest(
@ -72,7 +72,7 @@ public extension Network.PreparedRequest {
case let randomSnode as Network.RandomSnodeLatestNetworkTimeTarget:
guard request.httpBody != nil else { throw NetworkError.invalidPreparedRequest }
return SnodeAPI.getSwarm(for: randomSnode.swarmPublicKey, using: dependencies)
return GetSwarmJob.run(for: randomSnode.swarmPublicKey, using: dependencies)
.tryFlatMapWithRandomSnode(retry: SnodeAPI.maxRetryCount, using: dependencies) { snode in
SnodeAPI
.getNetworkTime(from: snode, using: dependencies)

@ -22,6 +22,7 @@ internal extension Network {
internal extension Network {
struct RandomSnodeTarget: RequestTarget, Equatable {
let swarmPublicKey: String
let retryCount: Int
var url: URL? { URL(string: "snode:\(swarmPublicKey)") }
var urlPathAndParamsString: String { return "" }
@ -53,7 +54,8 @@ public extension Request {
snode: Snode,
headers: [HTTPHeader: String] = [:],
body: T? = nil,
swarmPublicKey: String?
swarmPublicKey: String?,
retryCount: Int
) {
self = Request(
method: method,
@ -76,13 +78,15 @@ public extension Request {
endpoint: Endpoint,
swarmPublicKey: String,
headers: [HTTPHeader: String] = [:],
body: T? = nil
body: T? = nil,
retryCount: Int
) {
self = Request(
method: method,
endpoint: endpoint,
target: Network.RandomSnodeTarget(
swarmPublicKey: swarmPublicKey
swarmPublicKey: swarmPublicKey,
retryCount: retryCount
),
headers: headers,
body: body
@ -99,7 +103,8 @@ public extension Request {
swarmPublicKey: String,
headers: [HTTPHeader: String] = [:],
requiresLatestNetworkTime: Bool,
body: T? = nil
body: T? = nil,
retryCount: Int
) where T: UpdatableTimestamp {
self = Request(
method: method,
@ -112,7 +117,8 @@ public extension Request {
endpoint: endpoint,
swarmPublicKey: swarmPublicKey,
headers: headers,
body: body?.with(timestampMs: timestampMs)
body: body?.with(timestampMs: timestampMs),
retryCount: retryCount
).generateUrlRequest(using: dependencies)
}
),

@ -23,7 +23,7 @@ public final class SnodeAPI {
internal static let sodium: Atomic<Sodium> = Atomic(Sodium())
private static var hasLoadedSnodePool: Atomic<Bool> = Atomic(false)
private static var loadedSwarms: Atomic<Set<String>> = Atomic([])
internal static var loadedSwarms: Atomic<Set<String>> = Atomic([])
private static var getSnodePoolPublisher: Atomic<AnyPublisher<Set<Snode>, Error>?> = Atomic(nil)
/// - Note: Should only be accessed from `Threading.workQueue` to avoid race conditions.
@ -47,7 +47,6 @@ public final class SnodeAPI {
// MARK: - Settings
internal static let maxRetryCount: Int = 8
private static let minSwarmSnodeCount: Int = 3
private static let seedNodePool: Set<Snode> = {
guard !Features.useTestnet else {
return [
@ -146,22 +145,9 @@ public final class SnodeAPI {
// MARK: - Swarm Interaction
private static func loadSwarmIfNeeded(for publicKey: String) {
guard !loadedSwarms.wrappedValue.contains(publicKey) else { return }
let updatedCacheForKey: Set<Snode> = Storage.shared
.read { db in try Snode.fetchSet(db, publicKey: publicKey) }
.defaulting(to: [])
swarmCache.mutate { $0[publicKey] = updatedCacheForKey }
loadedSwarms.mutate { $0.insert(publicKey) }
}
internal static func setSwarm(to newValue: Set<Snode>, for publicKey: String, persist: Bool = true) {
internal static func setSwarm(to newValue: Set<Snode>, for publicKey: String) {
swarmCache.mutate { $0[publicKey] = newValue }
guard persist else { return }
Storage.shared.write { db in
try? newValue.save(db, key: publicKey)
}
@ -257,41 +243,31 @@ public final class SnodeAPI {
}
}
public static func getSwarm(
internal static func getSwarm(
for swarmPublicKey: String,
using dependencies: Dependencies = Dependencies()
using dependencies: Dependencies
) -> AnyPublisher<Set<Snode>, Error> {
loadSwarmIfNeeded(for: swarmPublicKey)
if let cachedSwarm = swarmCache.wrappedValue[swarmPublicKey], cachedSwarm.count >= minSwarmSnodeCount {
return Just(cachedSwarm)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
SNLog("Getting swarm for: \((swarmPublicKey == getUserHexEncodedPublicKey()) ? "self" : swarmPublicKey).")
// Note: We do an explicit `getRandomSnode` call here because we want to send the request
// to _any_ random snode rather than a random snode for the given `swarmPublicKey`
return getRandomSnode()
.tryFlatMap { snode in
.tryFlatMap { snode -> AnyPublisher<Set<Snode>, Error> in
try SnodeAPI
.prepareRequest(
request: Request(
endpoint: .getSwarm,
snode: snode,
swarmPublicKey: swarmPublicKey,
body: GetSwarmRequest(pubkey: swarmPublicKey)
body: GetSwarmRequest(pubkey: swarmPublicKey),
retryCount: 4
),
responseType: GetSwarmResponse.self,
using: dependencies
)
.send(using: dependencies)
.retry(4)
.map { _, response in response.snodes }
.handleEvents(
receiveOutput: { snodes in setSwarm(to: snodes, for: swarmPublicKey) }
)
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
// MARK: - Batching & Polling
@ -1415,7 +1391,8 @@ private extension Request {
init<B: Encodable>(
endpoint: SnodeAPI.Endpoint,
swarmPublicKey: String,
body: B
body: B,
retryCount: Int = SnodeAPI.maxRetryCount
) where T == SnodeRequest<B>, Endpoint == SnodeAPI.Endpoint {
self = Request(
method: .post,
@ -1424,7 +1401,8 @@ private extension Request {
body: SnodeRequest<B>(
endpoint: endpoint,
body: body
)
),
retryCount: retryCount
)
}
@ -1432,7 +1410,8 @@ private extension Request {
endpoint: SnodeAPI.Endpoint,
snode: Snode,
swarmPublicKey: String? = nil,
body: B
body: B,
retryCount: Int = SnodeAPI.maxRetryCount
) where T == SnodeRequest<B>, Endpoint == SnodeAPI.Endpoint {
self = Request(
method: .post,
@ -1442,7 +1421,8 @@ private extension Request {
endpoint: endpoint,
body: body
),
swarmPublicKey: swarmPublicKey
swarmPublicKey: swarmPublicKey,
retryCount: retryCount
)
}
@ -1450,7 +1430,8 @@ private extension Request {
endpoint: SnodeAPI.Endpoint,
swarmPublicKey: String,
requiresLatestNetworkTime: Bool,
body: B
body: B,
retryCount: Int = SnodeAPI.maxRetryCount
) where T == SnodeRequest<B>, Endpoint == SnodeAPI.Endpoint, B: Encodable & UpdatableTimestamp {
self = Request(
method: .post,
@ -1460,7 +1441,8 @@ private extension Request {
body: SnodeRequest<B>(
endpoint: endpoint,
body: body
)
),
retryCount: retryCount
)
}
}

@ -133,6 +133,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
/// This job runs whenever we don't have enough onion request paths, it also runs distinctly so there should only
/// ever be one at a time
case buildPaths
/// This job runs whenever we don't have the swarm for a public key, it also runs distinctly so there should only
/// ever be one at a time
case getSwarm
}
public enum Behaviour: Int, Codable, DatabaseValueConvertible, CaseIterable {
@ -153,6 +157,10 @@ public struct Job: Codable, Equatable, Hashable, Identifiable, FetchableRecord,
/// This job will run once each whenever the app becomes active (launch and return from background) and
/// may run again during the same session if `nextRunTimestamp` gets set
case recurringOnActive
/// This job will run once and, while it does get persisted to the database, upon subsequent launch jobs with
/// this behaviour will not be run and will be cleared from the database
case runOnceTransient
}
/// The `id` value is auto incremented by the database, if the `Job` hasn't been inserted into

@ -20,12 +20,13 @@ public protocol JobRunnerType {
func appDidFinishLaunching(using dependencies: Dependencies)
func appDidBecomeActive(using dependencies: Dependencies)
func startNonBlockingQueues(using dependencies: Dependencies)
func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies)
func stopAndClearPendingJobs(exceptForVariant: Job.Variant?, using dependencies: Dependencies, onComplete: (() -> ())?)
// MARK: - Job Scheduling
@discardableResult func add(_ db: Database, job: Job?, dependantJob: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job?
func upsert(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies)
@discardableResult func upsert(_ db: Database, job: Job?, canStartJob: Bool, using dependencies: Dependencies) -> Job?
@discardableResult func insert(_ db: Database, job: Job?, before otherJob: Job) -> (Int64, Job)?
func enqueueDependenciesIfNeeded(_ jobs: [Job], using dependencies: Dependencies)
func afterJob(_ job: Job?, state: JobRunner.JobState, callback: @escaping (JobRunner.JobResult) -> ())
@ -204,6 +205,7 @@ public final class JobRunner: JobRunnerType {
internal var appReadyToStartQueues: Atomic<Bool> = Atomic(false)
internal var appHasBecomeActive: Atomic<Bool> = Atomic(false)
internal var forceAllowSingleExecutionJobs: Atomic<Bool> = Atomic(false)
internal var perSessionJobsCompleted: Atomic<Set<Int64>> = Atomic([])
internal var hasCompletedInitialBecomeActive: Atomic<Bool> = Atomic(false)
internal var shutdownBackgroundTask: Atomic<OWSBackgroundTask?> = Atomic(nil)
@ -228,7 +230,6 @@ public final class JobRunner: JobRunnerType {
self.allowToExecuteJobs = (
isTestingJobRunner || (
Singleton.hasAppContext &&
Singleton.appContext.isMainApp &&
!SNUtilitiesKit.isRunningTests
)
)
@ -256,7 +257,8 @@ public final class JobRunner: JobRunnerType {
jobVariants.remove(.sendReadReceipts),
jobVariants.remove(.groupLeaving),
jobVariants.remove(.configurationSync),
jobVariants.remove(.buildPaths)
jobVariants.remove(.buildPaths),
jobVariants.remove(.getSwarm)
].compactMap { $0 }
),
@ -322,6 +324,7 @@ public final class JobRunner: JobRunnerType {
// Now that we've finished setting up the JobRunner, update the queue closures
self.blockingQueue.mutate {
$0?.canStart = { [weak self] queue -> Bool in (self?.canStart(queue: queue) == true) }
$0?.canStartPendingJobs = { [weak self] queue -> Bool in (self?.canStartPendingJobs(queue: queue) == true) }
$0?.onQueueDrained = { [weak self] in
// Once all blocking jobs have been completed we want to start running
// the remaining job queues
@ -337,6 +340,9 @@ public final class JobRunner: JobRunnerType {
self.queues.mutate {
$0.values.forEach { queue in
queue.canStart = { [weak self] targetQueue -> Bool in (self?.canStart(queue: targetQueue) == true) }
queue.canStartPendingJobs = { [weak self] targetQueue -> Bool in
(self?.canStartPendingJobs(queue: targetQueue) == true)
}
}
}
}
@ -349,6 +355,19 @@ public final class JobRunner: JobRunnerType {
}
public func canStart(queue: JobQueue?) -> Bool {
return (
allowToExecuteJobs && (
forceAllowSingleExecutionJobs.wrappedValue || (
appReadyToStartQueues.wrappedValue && (
queue?.type == .blocking ||
canStartNonBlockingQueue
)
)
)
)
}
public func canStartPendingJobs(queue: JobQueue?) -> Bool {
return (
allowToExecuteJobs &&
appReadyToStartQueues.wrappedValue && (
@ -448,8 +467,20 @@ public final class JobRunner: JobRunnerType {
}
public func appDidFinishLaunching(using dependencies: Dependencies) {
// Clear any 'runOnceTransient' entries in the database (they should only ever be run during
// the app session that they were scheduled in)
//
// Note: If we are already in "single-execution mode" then don't do this as there could be running
// jobs (this case occurs during Onboarding when trying to retrieve the existing profile name)
if !forceAllowSingleExecutionJobs.wrappedValue {
dependencies.storage.writeAsync { db in
try Job.filter(Job.Columns.behaviour == Job.Behaviour.runOnceTransient).deleteAll(db)
}
}
// Flag that the JobRunner can start its queues
appReadyToStartQueues.mutate { $0 = true }
forceAllowSingleExecutionJobs.mutate { $0 = false }
// Note: 'appDidBecomeActive' will run on first launch anyway so we can
// leave those jobs out and can wait until then to start the JobRunner
@ -574,6 +605,27 @@ public final class JobRunner: JobRunnerType {
}
}
public func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies) {
// If we have already fully started the JobRunner then don't bother doing this (this shouldn't
// currently be possible but might be in the future and swapping this flag while the JobRunner
// is in its "normal" mode could result in unexpected behaviour)
guard !appReadyToStartQueues.wrappedValue else { return }
// Clear any 'runOnceTransient' entries in the database (they should only ever be run during
// the app session that they were scheduled in)
dependencies.storage.writeAsync { db in
try Job.filter(Job.Columns.behaviour == Job.Behaviour.runOnceTransient).deleteAll(db)
}
// This function is called by the app extensions to allow them to run jobs directly without
// triggering any recurring or pending jobs
//
// Note: This will only allow jobs to run if they are directly added to a job queue because, if
// `canStartPendingJobs` returns `false` then any persisted jobs **WILL NOT** be fetched and
// added to the queue
forceAllowSingleExecutionJobs.mutate { $0 = true }
}
public func stopAndClearPendingJobs(
exceptForVariant: Job.Variant?,
using dependencies: Dependencies,
@ -678,28 +730,39 @@ public final class JobRunner: JobRunnerType {
job: Job?,
canStartJob: Bool,
using dependencies: Dependencies
) {
guard let job: Job = job else { return } // Ignore null jobs
) -> Job? {
guard let job: Job = job else { return nil } // Ignore null jobs
guard job.id != nil else {
add(db, job: job, canStartJob: canStartJob, using: dependencies)
return
// When we upsert a job that should be unique we want to return the existing job (if it exists)
switch job.uniqueHashValue {
case .none: return add(db, job: job, canStartJob: canStartJob, using: dependencies)
case .some:
let existingJob: Job? = try? Job
.filter(Job.Columns.variant == job.variant)
.filter(Job.Columns.uniqueHashValue == job.uniqueHashValue)
.fetchOne(db)
return (existingJob ?? add(db, job: job, canStartJob: canStartJob, using: dependencies))
}
}
guard let updatedJob: Job = validatedJob(db, job: job, validation: .enqueueOnly) else { return }
guard let updatedJob: Job = validatedJob(db, job: job, validation: .enqueueOnly) else { return nil }
// Don't add to the queue if the JobRunner isn't ready (it's been saved to the db so it'll be loaded
// once the queue actually gets started later)
guard canAddToQueue(updatedJob) else { return }
guard canAddToQueue(updatedJob) else { return updatedJob }
let jobQueue: JobQueue? = queues.wrappedValue[updatedJob.variant]
jobQueue?.upsert(db, job: updatedJob, canStartJob: canStartJob, using: dependencies)
// Don't start the queue if the job can't be started
guard canStartJob else { return }
guard canStartJob else { return updatedJob }
// Start the job runner if needed
db.afterNextTransactionNestedOnce(dedupeId: "JobRunner-Start: \(jobQueue?.queueContext ?? "N/A")") { _ in
jobQueue?.start(using: dependencies)
}
return updatedJob
}
@discardableResult public func insert(
@ -798,6 +861,10 @@ public final class JobRunner: JobRunnerType {
return (
job.behaviour == .runOnceNextLaunch ||
job.behaviour == .recurringOnLaunch ||
(
job.behaviour == .runOnceTransient &&
forceAllowSingleExecutionJobs.wrappedValue
) ||
appHasBecomeActive.wrappedValue
)
}
@ -949,6 +1016,7 @@ public final class JobQueue: Hashable {
private var executorMap: Atomic<[Job.Variant: JobExecutor.Type]> = Atomic([:])
fileprivate var canStart: ((JobQueue?) -> Bool)?
fileprivate var canStartPendingJobs: ((JobQueue?) -> Bool)?
fileprivate var onQueueDrained: (() -> ())?
fileprivate var hasStartedAtLeastOnce: Atomic<Bool> = Atomic(false)
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
@ -1253,21 +1321,25 @@ public final class JobQueue: Hashable {
hasStartedAtLeastOnce.mutate { $0 = true }
// Get any pending jobs
let jobVariants: [Job.Variant] = self.jobVariants
var jobsToRun: [Job] = []
let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
let jobsAlreadyInQueue: Set<Int64> = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet()
let jobsToRun: [Job] = dependencies.storage.read(using: dependencies) { db in
try Job
.filterPendingJobs(
variants: jobVariants,
excludeFutureJobs: true,
includeJobsWithDependencies: false
)
.filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
.filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue
.fetchAll(db)
if canStartPendingJobs?(self) == true {
let jobVariants: [Job.Variant] = self.jobVariants
let jobsAlreadyInQueue: Set<Int64> = pendingJobsQueue.wrappedValue.compactMap { $0.id }.asSet()
jobsToRun = dependencies.storage.read(using: dependencies) { db in
try Job
.filterPendingJobs(
variants: jobVariants,
excludeFutureJobs: true,
includeJobsWithDependencies: false
)
.filter(!jobIdsAlreadyRunning.contains(Job.Columns.id)) // Exclude jobs already running
.filter(!jobsAlreadyInQueue.contains(Job.Columns.id)) // Exclude jobs already in the queue
.fetchAll(db)
}
.defaulting(to: [])
}
.defaulting(to: [])
// Determine the number of jobs to run
var jobCount: Int = 0
@ -1454,6 +1526,15 @@ public final class JobQueue: Hashable {
}
private func scheduleNextSoonestJob(using dependencies: Dependencies) {
// If we can't schedule pending jobs then complete the queue
guard canStartPendingJobs?(self) == true else {
if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
self.onQueueDrained?()
}
return
}
// Retrieve any pending jobs from the database
let jobVariants: [Job.Variant] = self.jobVariants
let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
let nextJobTimestamp: TimeInterval? = dependencies.storage.read(using: dependencies) { db in
@ -1835,6 +1916,10 @@ public extension JobRunner {
instance.appDidBecomeActive(using: dependencies)
}
static func enableNewSingleExecutionJobsOnly(using dependencies: Dependencies = Dependencies()) {
instance.enableNewSingleExecutionJobsOnly(using: dependencies)
}
static func afterBlockingQueue(callback: @escaping () -> ()) {
instance.afterBlockingQueue(callback: callback)
}
@ -1856,12 +1941,12 @@ public extension JobRunner {
///
/// **Note:** If the job has a `behaviour` of `runOnceNextLaunch` or the `nextRunTimestamp`
/// is in the future then the job won't be started
static func upsert(
@discardableResult static func upsert(
_ db: Database,
job: Job?,
canStartJob: Bool = true,
using dependencies: Dependencies = Dependencies()
) { instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies) }
) -> Job? { return instance.upsert(db, job: job, canStartJob: canStartJob, using: dependencies) }
@discardableResult static func insert(
_ db: Database,

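Taken together, the extension-related changes above give the app extensions roughly this startup flow (a sketch assembled from the diff, not a single function in the codebase):

// 1. Put the JobRunner into single-execution mode (ShareNavController and
//    NotificationServiceExtension now call this): transient jobs may run, but the persisted
//    backlog is never loaded because canStartPendingJobs stays false, and any leftover
//    'runOnceTransient' rows are deleted first.
JobRunner.enableNewSingleExecutionJobsOnly()

// 2. Run the specific work the extension needs via a transient job, e.g. fetching a swarm:
GetSwarmJob
    .run(for: swarmPublicKey, using: dependencies)
    .sinkUntilComplete()

// 3. On the next main-app launch, appDidFinishLaunching() clears any remaining
//    'runOnceTransient' jobs so they never run outside the session that scheduled them.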