From c455a13a7b53f2f25314cbd6ca6ef0729ff3d02d Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Wed, 31 May 2023 14:30:33 +1000 Subject: [PATCH] Finished adding logic to ignore messages invalidated by config Added timestamps to the Profile table to avoid overriding current profile info with older info Updated the MessageReceiver to ignore the rest of the messages invalidated by the config Updated to the latest libSession Updated the JobRunner to expose some more info about the currently running jobs Made some tweaks to the ConfigurationSyncJob to better support concurrent jobs running for different targets --- LibSession-Util | 2 +- Session/Onboarding/Onboarding.swift | 17 ++- Session/Utilities/MockDataGenerator.swift | 12 +- .../Migrations/_003_YDBToGRDBMigration.swift | 12 +- .../Migrations/_013_SessionUtilChanges.swift | 10 ++ .../Database/Models/Profile.swift | 29 +++- .../Jobs/Types/AttachmentDownloadJob.swift | 6 +- .../Jobs/Types/ConfigMessageReceiveJob.swift | 29 +++- .../Jobs/Types/ConfigurationSyncJob.swift | 51 ++++--- .../Errors/MessageReceiverError.swift | 4 +- .../MessageReceiver+Calls.swift | 25 +++- .../MessageReceiver+ClosedGroups.swift | 26 +++- ...eReceiver+DataExtractionNotification.swift | 42 +++++- .../MessageReceiver+ExpirationTimers.swift | 59 +++++--- .../MessageReceiver+MessageRequests.swift | 35 ++++- .../MessageReceiver+VisibleMessages.swift | 28 +++- .../SessionUtil+Contacts.swift | 32 +++- .../Config Handling/SessionUtil+Shared.swift | 2 +- .../SessionUtil/SessionUtil.swift | 3 +- .../Utilities/ProfileManager.swift | 34 ++--- .../Components/ProfilePictureView.swift | 2 +- .../Database/Models/JobDependencies.swift | 4 +- .../General/SNUserDefaults.swift | 2 - SessionUtilitiesKit/JobRunner/JobRunner.swift | 141 ++++++++++++++---- 24 files changed, 466 insertions(+), 141 deletions(-) diff --git a/LibSession-Util b/LibSession-Util index 97084c69f..9777b37e8 160000 --- a/LibSession-Util +++ b/LibSession-Util @@ -1 +1 @@ -Subproject commit 97084c69f86e67c675095b48efacc86113ccebb0 +Subproject commit 9777b37e8545febcc082578341352dba7433db21 diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index 8c664c045..914976728 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -219,11 +219,18 @@ enum Onboarding { } func completeRegistration() { - // Set the `lastDisplayNameUpdate` to the current date, so that we don't - // overwrite what the user set in the display name step with whatever we - // find in their swarm (otherwise the user could enter a display name and - // have it immediately overwritten due to the config request running slow) - UserDefaults.standard[.lastDisplayNameUpdate] = Date() + // Set the `lastNameUpdate` to the current date, so that we don't overwrite + // what the user set in the display name step with whatever we find in their + // swarm (otherwise the user could enter a display name and have it immediately + // overwritten due to the config request running slow) + Storage.shared.write { db in + try Profile + .filter(id: getUserHexEncodedPublicKey(db)) + .updateAllAndConfig( + db, + Profile.Columns.lastNameUpdate.set(to: Date().timeIntervalSince1970) + ) + } // Notify the app that registration is complete Identity.didRegister() diff --git a/Session/Utilities/MockDataGenerator.swift b/Session/Utilities/MockDataGenerator.swift index e20b9b1e9..d4ab94abc 100644 --- a/Session/Utilities/MockDataGenerator.swift +++ b/Session/Utilities/MockDataGenerator.swift @@ -158,7 +158,9 @@ enum MockDataGenerator { id: randomSessionId, name: (0.. = JobRunner - .defailsForCurrentlyRunningJobs(of: .attachmentDownload) + .infoForCurrentlyRunningJobs(of: .attachmentDownload) .filter { key, _ in key != job.id } .values - .compactMap { data -> String? in - guard let data: Data = data else { return nil } + .compactMap { info -> String? in + guard let data: Data = info.detailsData else { return nil } return (try? JSONDecoder().decode(Details.self, from: data))? .attachmentId diff --git a/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift index c5f0ae21f..29fe85ab0 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift @@ -16,17 +16,38 @@ public enum ConfigMessageReceiveJob: JobExecutor { failure: @escaping (Job, Error?, Bool) -> (), deferred: @escaping (Job) -> () ) { + /// When the `configMessageReceive` job fails we want to unblock any `messageReceive` jobs it was blocking + /// to ensure the user isn't losing any messages - this generally _shouldn't_ happen but if it does then having a temporary + /// "outdated" state due to standard messages which would have been invalidated by a config change incorrectly being + /// processed is less severe then dropping a bunch on messages just because they were processed in the same poll as + /// invalid config messages + let removeDependencyOnMessageReceiveJobs: () -> () = { + guard let jobId: Int64 = job.id else { return } + + Storage.shared.write { db in + try JobDependencies + .filter(JobDependencies.Columns.dependantId == jobId) + .joining( + required: JobDependencies.job + .filter(Job.Columns.variant == Job.Variant.messageReceive) + ) + .deleteAll(db) + } + } + guard let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { + removeDependencyOnMessageReceiveJobs() failure(job, JobRunnerError.missingRequiredDetails, true) return } - + // Ensure no standard messages are sent through this job guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else { SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job") + removeDependencyOnMessageReceiveJobs() failure(job, MessageReceiverError.invalidMessage, true) return } @@ -49,8 +70,10 @@ public enum ConfigMessageReceiveJob: JobExecutor { // Handle the result switch lastError { - case let error as MessageReceiverError where !error.isRetryable: failure(job, error, true) - case .some(let error): failure(job, error, false) + case .some(let error): + removeDependencyOnMessageReceiveJobs() + failure(job, error, true) + case .none: success(job, false) } } diff --git a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift index 7dbde8b4a..e7eee9f34 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift @@ -9,7 +9,7 @@ import SessionUtilitiesKit public enum ConfigurationSyncJob: JobExecutor { public static let maxFailureCount: Int = -1 - public static let requiresThreadId: Bool = false + public static let requiresThreadId: Bool = true public static let requiresInteractionId: Bool = false private static let maxRunFrequency: TimeInterval = 3 @@ -25,13 +25,29 @@ public enum ConfigurationSyncJob: JobExecutor { Identity.userCompletedRequiredOnboarding() else { return success(job, true) } - // On startup it's possible for multiple ConfigSyncJob's to run at the same time (which is - // redundant) so check if there is another job already running and, if so, defer this job - let jobDetails: [Int64: Data?] = JobRunner.defailsForCurrentlyRunningJobs(of: .configurationSync) - - guard jobDetails.setting(job.id, nil).count == 0 else { - deferred(job) // We will re-enqueue when needed - return + // It's possible for multiple ConfigSyncJob's with the same target (user/group) to try to run at the + // same time since as soon as one is started we will enqueue a second one, rather than adding dependencies + // between the jobs we just continue to defer the subsequent job while the first one is running in + // order to prevent multiple configurationSync jobs with the same target from running at the same time + guard + JobRunner + .infoForCurrentlyRunningJobs(of: .configurationSync) + .filter({ key, info in + key != job.id && // Exclude this job + info.threadId == job.threadId // Exclude jobs for different ids + }) + .isEmpty + else { + // Defer the job to run 'maxRunFrequency' from when this one ran (if we don't it'll try start + // it again immediately which is pointless) + let updatedJob: Job? = Storage.shared.write { db in + try job + .with(nextRunTimestamp: Date().timeIntervalSince1970 + maxRunFrequency) + .saved(db) + } + + SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") deferred due to in progress job") + return deferred(updatedJob ?? job) } // If we don't have a userKeyPair yet then there is no need to sync the configuration @@ -42,16 +58,15 @@ public enum ConfigurationSyncJob: JobExecutor { let pendingConfigChanges: [SessionUtil.OutgoingConfResult] = Storage.shared .read({ db in try SessionUtil.pendingChanges(db, publicKey: publicKey) }) else { - failure(job, StorageError.generic, false) - return + SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") failed due to invalid data") + return failure(job, StorageError.generic, false) } // If there are no pending changes then the job can just complete (next time something // is updated we want to try and run immediately so don't scuedule another run in this case) guard !pendingConfigChanges.isEmpty else { - SNLog("[ConfigurationSyncJob] Completed with no pending changes") - success(job, true) - return + SNLog("[ConfigurationSyncJob] For \(publicKey) completed with no pending changes") + return success(job, true) } // Identify the destination and merge all obsolete hashes into a single set @@ -63,6 +78,8 @@ public enum ConfigurationSyncJob: JobExecutor { .map { $0.obsoleteHashes } .reduce([], +) .asSet() + let jobStartTimestamp: TimeInterval = Date().timeIntervalSince1970 + SNLog("[ConfigurationSyncJob] For \(publicKey) started with \(pendingConfigChanges.count) change\(pendingConfigChanges.count == 1 ? "" : "s")") Storage.shared .readPublisher { db in @@ -119,9 +136,9 @@ public enum ConfigurationSyncJob: JobExecutor { .sinkUntilComplete( receiveCompletion: { result in switch result { - case .finished: SNLog("[ConfigurationSyncJob] Completed") + case .finished: SNLog("[ConfigurationSyncJob] For \(publicKey) completed") case .failure(let error): - SNLog("[ConfigurationSyncJob] Failed due to error: \(error)") + SNLog("[ConfigurationSyncJob] For \(publicKey) failed due to error: \(error)") failure(job, error, false) } }, @@ -137,7 +154,7 @@ public enum ConfigurationSyncJob: JobExecutor { // When we complete the 'ConfigurationSync' job we want to immediately schedule // another one with a 'nextRunTimestamp' set to the 'maxRunFrequency' value to // throttle the config sync requests - let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + maxRunFrequency) + let nextRunTimestamp: TimeInterval = (jobStartTimestamp + maxRunFrequency) // If another 'ConfigurationSync' job was scheduled then update that one // to run at 'nextRunTimestamp' and make the current job stop @@ -146,6 +163,7 @@ public enum ConfigurationSyncJob: JobExecutor { .filter(Job.Columns.id != job.id) .filter(Job.Columns.variant == Job.Variant.configurationSync) .filter(Job.Columns.threadId == publicKey) + .order(Job.Columns.nextRunTimestamp.asc) .fetchOne(db) { // If the next job isn't currently running then delay it's start time @@ -175,7 +193,6 @@ public enum ConfigurationSyncJob: JobExecutor { // MARK: - Convenience public extension ConfigurationSyncJob { - static func enqueue(_ db: Database, publicKey: String) { // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent guard SessionUtil.userConfigsEnabled(db) else { diff --git a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift index 5a89819cb..bcc097efe 100644 --- a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift +++ b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift @@ -21,13 +21,14 @@ public enum MessageReceiverError: LocalizedError { case noGroupKeyPair case invalidSharedConfigMessageHandling case requiredThreadNotInConfig + case outdatedMessage public var isRetryable: Bool { switch self { case .duplicateMessage, .duplicateMessageNewSnode, .duplicateControlMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType, .invalidSignature, .noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed, - .invalidSharedConfigMessageHandling, .requiredThreadNotInConfig: + .invalidSharedConfigMessageHandling, .requiredThreadNotInConfig, .outdatedMessage: return false default: return true @@ -57,6 +58,7 @@ public enum MessageReceiverError: LocalizedError { case .invalidSharedConfigMessageHandling: return "Invalid handling of a shared config message." case .requiredThreadNotInConfig: return "Required thread not in config." + case .outdatedMessage: return "Message was sent before a config change which would have removed the message." } } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift index 8737f2643..dafc634d2 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift @@ -13,8 +13,31 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: CallMessage ) throws { + let timestampMs: Int64 = (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) + // Only support calls from contact threads - guard threadVariant == .contact else { return } + guard + threadVariant == .contact, + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + ( + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: .contacts, + changeTimestampMs: timestampMs + ) + ) + else { return } switch message.kind { case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message) diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift index 0fb862067..e413db82c 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift @@ -77,7 +77,28 @@ extension MessageReceiver { targetConfig: .userGroups, changeTimestampMs: Int64(sentTimestamp) ) - else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } + else { + // If the closed group already exists then store the encryption keys (just in case - there can be + // some weird edge-cases where we don't have keys we need if we don't store them) + let groupPublicKey: String = publicKeyAsData.toHexString() + let receivedTimestamp: TimeInterval = (TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000) + let newKeyPair: ClosedGroupKeyPair = ClosedGroupKeyPair( + threadId: groupPublicKey, + publicKey: Data(encryptionKeyPair.publicKey), + secretKey: Data(encryptionKeyPair.secretKey), + receivedTimestamp: receivedTimestamp + ) + + guard + ClosedGroup.filter(id: groupPublicKey).isNotEmpty(db), + !ClosedGroupKeyPair + .filter(ClosedGroupKeyPair.Columns.threadKeyPairHash == newKeyPair.threadKeyPairHash) + .isNotEmpty(db) + else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } + + try newKeyPair.insert(db) + return + } try handleNewClosedGroup( db, @@ -591,9 +612,10 @@ extension MessageReceiver { message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs() ) + // Only actually make the change if SessionUtil says we can (we always want to insert the info // message though) - if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs ) { + if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs) { // Legacy groups used these control messages for making changes, new groups only use them // for information purposes switch threadVariant { diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift index c1edbbe6a..e9397be26 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift @@ -3,6 +3,7 @@ import Foundation import GRDB import SessionSnodeKit +import SessionUtilitiesKit extension MessageReceiver { internal static func handleDataExtractionNotification( @@ -11,11 +12,45 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: DataExtractionNotification ) throws { + let timestampMs: Int64 = ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) + guard threadVariant == .contact, let sender: String = message.sender, let messageKind: DataExtractionNotification.Kind = message.kind - else { return } + else { throw MessageReceiverError.invalidMessage } + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db) + + return (threadId == currentUserPublicKey ? .userProfile : .contacts) + + default: return .userGroups + } + }(), + changeTimestampMs: timestampMs + ) + else { throw MessageReceiverError.outdatedMessage } _ = try Interaction( serverHash: message.serverHash, @@ -27,10 +62,7 @@ extension MessageReceiver { case .mediaSaved: return .infoMediaSavedNotification } }(), - timestampMs: ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) + timestampMs: timestampMs ).inserted(db) } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift index 1d8ef1033..fbec8536b 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift @@ -15,9 +15,9 @@ extension MessageReceiver { // Only process these for contact and legacy groups (new groups handle it separately) (threadVariant == .contact || threadVariant == .legacyGroup), let sender: String = message.sender - else { return } + else { throw MessageReceiverError.invalidMessage } - // Update the configuration + // Generate an updated configuration // // Note: Messages which had been sent during the previous configuration will still // use it's settings (so if you enable, send a message and then disable disappearing @@ -34,18 +34,40 @@ extension MessageReceiver { DisappearingMessagesConfiguration.defaultDuration ) ) + let timestampMs: Int64 = Int64(message.sentTimestamp ?? 0) // Default to `0` if not set - // Legacy closed groups need to update the SessionUtil - switch threadVariant { - case .legacyGroup: - try SessionUtil - .update( - db, - groupPublicKey: threadId, - disappearingConfig: config - ) - - default: break + // Only actually make the change if SessionUtil says we can (we always want to insert the info + // message though) + let canPerformChange: Bool = SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db) + + return (threadId == currentUserPublicKey ? .userProfile : .contacts) + + default: return .userGroups + } + }(), + changeTimestampMs: timestampMs + ) + + // Only update libSession if we can perform the change + if canPerformChange { + // Legacy closed groups need to update the SessionUtil + switch threadVariant { + case .legacyGroup: + try SessionUtil + .update( + db, + groupPublicKey: threadId, + disappearingConfig: config + ) + + default: break + } } // Add an info message for the user @@ -60,11 +82,14 @@ extension MessageReceiver { nil ) ), - timestampMs: Int64(message.sentTimestamp ?? 0) // Default to `0` if not set + timestampMs: timestampMs ).inserted(db) - // Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate - // then the interaction unique constraint will prevent the code from getting here) - try config.save(db) + // Only save the updated config if we can perform the change + if canPerformChange { + // Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate + // then the interaction unique constraint will prevent the code from getting here) + try config.save(db) + } } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift index 16968da4a..fbaad1124 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift @@ -13,11 +13,37 @@ extension MessageReceiver { dependencies: SMKDependencies ) throws { let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies) + let timestampMs: Int64 = ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) var blindedContactIds: [String] = [] // Ignore messages which were sent from the current user - guard message.sender != userPublicKey else { return } - guard let senderId: String = message.sender else { return } + guard + message.sender != userPublicKey, + let senderId: String = message.sender + else { throw MessageReceiverError.invalidMessage } + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: senderId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: senderId, + threadVariant: .contact, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: senderId, + targetConfig: .contacts, + changeTimestampMs: timestampMs + ) + else { throw MessageReceiverError.outdatedMessage } // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) @@ -134,10 +160,7 @@ extension MessageReceiver { threadId: unblindedThread.id, authorId: senderId, variant: .infoMessageRequestAccepted, - timestampMs: ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) + timestampMs: timestampMs ).inserted(db) } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift index a891bb3ba..d41cf4f40 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift @@ -3,6 +3,7 @@ import Foundation import GRDB import Sodium +import SessionSnodeKit import SessionUtilitiesKit extension MessageReceiver { @@ -22,6 +23,32 @@ extension MessageReceiver { // seconds to maintain the accuracy) let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000) let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts) + default: return .userGroups + } + }(), + changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) + ) + else { throw MessageReceiverError.outdatedMessage } // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) @@ -63,7 +90,6 @@ extension MessageReceiver { } // Store the message variant so we can run variant-specific behaviours - let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) let thread: SessionThread = try SessionThread .fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil) let maybeOpenGroup: OpenGroup? = { diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift index 63301fa5b..66fe34ec3 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift @@ -31,7 +31,8 @@ internal extension SessionUtil { static func handleContactsUpdate( _ db: Database, in conf: UnsafeMutablePointer?, - mergeNeedsDump: Bool + mergeNeedsDump: Bool, + latestConfigSentTimestampMs: Int64 ) throws { typealias ContactData = [ String: ( @@ -64,6 +65,7 @@ internal extension SessionUtil { let profileResult: Profile = Profile( id: contactId, name: String(libSessionVal: contact.name), + lastNameUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000), nickname: String(libSessionVal: contact.nickname, nullIfEmpty: true), profilePictureUrl: profilePictureUrl, profileEncryptionKey: (profilePictureUrl == nil ? nil : @@ -71,7 +73,8 @@ internal extension SessionUtil { libSessionVal: contact.profile_pic.key, count: ProfileManager.avatarAES256KeyByteLength ) - ) + ), + lastProfilePictureUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000) ) contactData[contactId] = ( @@ -99,12 +102,23 @@ internal extension SessionUtil { // observation system can't differ between update calls which do and don't change anything) let contact: Contact = Contact.fetchOrCreate(db, id: sessionId) let profile: Profile = Profile.fetchOrCreate(db, id: sessionId) + let profileNameShouldBeUpdated: Bool = ( + !data.profile.name.isEmpty && + profile.name != data.profile.name && + profile.lastNameUpdate < data.profile.lastNameUpdate + ) + let profilePictureShouldBeUpdated: Bool = ( + ( + profile.profilePictureUrl != data.profile.profilePictureUrl || + profile.profileEncryptionKey != data.profile.profileEncryptionKey + ) && + profile.lastProfilePictureUpdate < data.profile.lastProfilePictureUpdate + ) if - (!data.profile.name.isEmpty && profile.name != data.profile.name) || + profileNameShouldBeUpdated || profile.nickname != data.profile.nickname || - profile.profilePictureUrl != data.profile.profilePictureUrl || - profile.profileEncryptionKey != data.profile.profileEncryptionKey + profilePictureShouldBeUpdated { try profile.save(db) try Profile @@ -112,9 +126,12 @@ internal extension SessionUtil { .updateAll( // Handling a config update so don't use `updateAllAndConfig` db, [ - (data.profile.name.isEmpty || profile.name == data.profile.name ? nil : + (!profileNameShouldBeUpdated ? nil : Profile.Columns.name.set(to: data.profile.name) ), + (!profileNameShouldBeUpdated ? nil : + Profile.Columns.lastNameUpdate.set(to: data.profile.lastNameUpdate) + ), (profile.nickname == data.profile.nickname ? nil : Profile.Columns.nickname.set(to: data.profile.nickname) ), @@ -123,6 +140,9 @@ internal extension SessionUtil { ), (profile.profileEncryptionKey != data.profile.profileEncryptionKey ? nil : Profile.Columns.profileEncryptionKey.set(to: data.profile.profileEncryptionKey) + ), + (!profilePictureShouldBeUpdated ? nil : + Profile.Columns.lastProfilePictureUpdate.set(to: data.profile.lastProfilePictureUpdate) ) ].compactMap { $0 } ) diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift index 3ea0d9194..f6d6e74a5 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift @@ -326,7 +326,7 @@ internal extension SessionUtil { .defaulting(to: 0) // Ensure the change occurred after the last config message was handled (minus the buffer period) - return (changeTimestampMs > (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) + return (changeTimestampMs >= (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) } } diff --git a/SessionMessagingKit/SessionUtil/SessionUtil.swift b/SessionMessagingKit/SessionUtil/SessionUtil.swift index 391a4a45a..de5e22d45 100644 --- a/SessionMessagingKit/SessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/SessionUtil/SessionUtil.swift @@ -461,7 +461,8 @@ public enum SessionUtil { try SessionUtil.handleContactsUpdate( db, in: conf, - mergeNeedsDump: config_needs_dump(conf) + mergeNeedsDump: config_needs_dump(conf), + latestConfigSentTimestampMs: latestConfigSentTimestampMs ) case .convoInfoVolatile: diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index 4b5708675..ffbff0aeb 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -495,47 +495,28 @@ public struct ProfileManager { // Name if let name: String = name, !name.isEmpty, name != profile.name { - let shouldUpdate: Bool = { - guard isCurrentUser else { return true } - - return UserDefaults.standard[.lastDisplayNameUpdate] - .map { sentTimestamp > $0.timeIntervalSince1970 } - .defaulting(to: true) - }() - - if shouldUpdate { - if isCurrentUser { - UserDefaults.standard[.lastDisplayNameUpdate] = Date(timeIntervalSince1970: sentTimestamp) - } - + // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent + if sentTimestamp > profile.lastNameUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) { profileChanges.append(Profile.Columns.name.set(to: name)) + profileChanges.append(Profile.Columns.lastNameUpdate.set(to: sentTimestamp)) } } // Profile picture & profile key var avatarNeedsDownload: Bool = false var targetAvatarUrl: String? = nil - let shouldUpdateAvatar: Bool = { - guard isCurrentUser else { return true } - - return UserDefaults.standard[.lastProfilePictureUpdate] - .map { sentTimestamp > $0.timeIntervalSince1970 } - .defaulting(to: true) - }() - if shouldUpdateAvatar { + // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent + if sentTimestamp > profile.lastProfilePictureUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) { switch avatarUpdate { case .none: break case .uploadImageData: preconditionFailure("Invalid options for this function") case .remove: - if isCurrentUser { - UserDefaults.standard[.lastProfilePictureUpdate] = Date(timeIntervalSince1970: sentTimestamp) - } - profileChanges.append(Profile.Columns.profilePictureUrl.set(to: nil)) profileChanges.append(Profile.Columns.profileEncryptionKey.set(to: nil)) profileChanges.append(Profile.Columns.profilePictureFileName.set(to: nil)) + profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp)) case .updateTo(let url, let key, let fileName): if url != profile.profilePictureUrl { @@ -558,6 +539,9 @@ public struct ProfileManager { !ProfileManager.hasProfileImageData(with: fileName) ) } + + // Update the 'lastProfilePictureUpdate' timestamp for either external or local changes + profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp)) } } diff --git a/SessionUIKit/Components/ProfilePictureView.swift b/SessionUIKit/Components/ProfilePictureView.swift index 1ffe575a7..b05e69606 100644 --- a/SessionUIKit/Components/ProfilePictureView.swift +++ b/SessionUIKit/Components/ProfilePictureView.swift @@ -65,7 +65,7 @@ public final class ProfilePictureView: UIView { var iconSize: CGFloat { switch self { - case .navigation, .message: return 8 + case .navigation, .message: return 10 // Intentionally not a multiple of 4 case .list: return 16 case .hero: return 24 } diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 16201367b..bca762c5a 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -7,8 +7,8 @@ public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, Pe public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) - internal static let job = belongsTo(Job.self, using: jobForeignKey) - internal static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey) + public static let job = belongsTo(Job.self, using: jobForeignKey) + public static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey) public typealias Columns = CodingKeys public enum CodingKeys: String, CodingKey, ColumnExpression { diff --git a/SessionUtilitiesKit/General/SNUserDefaults.swift b/SessionUtilitiesKit/General/SNUserDefaults.swift index 82f18bd64..bc55e8231 100644 --- a/SessionUtilitiesKit/General/SNUserDefaults.swift +++ b/SessionUtilitiesKit/General/SNUserDefaults.swift @@ -38,8 +38,6 @@ public enum SNUserDefaults { public enum Date: Swift.String { case lastConfigurationSync - case lastDisplayNameUpdate - case lastProfilePictureUpdate case lastProfilePictureUpload case lastOpenGroupImageUpdate case lastOpen diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 19a750f26..48752c5bf 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -42,6 +42,12 @@ public final class JobRunner { case notFound } + public struct JobInfo { + public let threadId: String? + public let interactionId: Int64? + public let detailsData: Data? + } + private static let blockingQueue: Atomic = Atomic( JobQueue( type: .blocking, @@ -381,8 +387,8 @@ public final class JobRunner { return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) } - public static func defailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) + public static func infoForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: JobInfo] { + return (queues.wrappedValue[variant]?.infoForAllCurrentlyRunningJobs()) .defaulting(to: [:]) } @@ -395,11 +401,24 @@ public final class JobRunner { queue.afterCurrentlyRunningJob(jobId, callback: callback) } - public static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { + public static func hasPendingOrRunningJob( + with variant: Job.Variant, + threadId: String? = nil, + interactionId: Int64? = nil, + details: T? = nil + ) -> Bool { guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } - guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } - return targetQueue.hasPendingOrRunningJob(with: detailsData) + // Ensure we can encode the details (if provided) + let detailsData: Data? = details.map { try? JSONEncoder().encode($0) } + + guard details == nil || detailsData != nil else { return false } + + return targetQueue.hasPendingOrRunningJobWith( + threadId: threadId, + interactionId: interactionId, + detailsData: detailsData + ) } public static func removePendingJob(_ job: Job?) { @@ -513,9 +532,9 @@ private final class JobQueue { private var nextTrigger: Atomic = Atomic(nil) fileprivate var isRunning: Atomic = Atomic(false) private var queue: Atomic<[Job]> = Atomic([]) - private var jobsCurrentlyRunning: Atomic> = Atomic([]) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) - private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) + private var currentlyRunningJobIds: Atomic> = Atomic([]) + private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } @@ -620,7 +639,7 @@ private final class JobQueue { } fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { - let currentlyRunningJobIds: Set = jobsCurrentlyRunning.wrappedValue + let currentlyRunningJobIds: Set = currentlyRunningJobIds.wrappedValue queue.mutate { queue in // Avoid re-adding jobs to the queue that are already in it (this can @@ -642,11 +661,11 @@ private final class JobQueue { } fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool { - return jobsCurrentlyRunning.wrappedValue.contains(jobId) + return currentlyRunningJobIds.wrappedValue.contains(jobId) } - fileprivate func detailsForAllCurrentlyRunningJobs() -> [Int64: Data?] { - return detailsForCurrentlyRunningJobs.wrappedValue + fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] { + return currentlyRunningJobInfo.wrappedValue } fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { @@ -660,14 +679,65 @@ private final class JobQueue { } } - fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { - guard let detailsData: Data = detailsData else { return false } - + fileprivate func hasPendingOrRunningJobWith( + threadId: String? = nil, + interactionId: Int64? = nil, + detailsData: Data? = nil + ) -> Bool { let pendingJobs: [Job] = queue.wrappedValue + let currentlyRunningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue + var possibleJobIds: Set = Set(currentlyRunningJobInfo.keys) + .inserting(contentsOf: pendingJobs.compactMap { $0.id }.asSet()) + + // Remove any which don't have the matching threadId (if provided) + if let targetThreadId: String = threadId { + let pendingJobIdsWithWrongThreadId: Set = pendingJobs + .filter { $0.threadId != targetThreadId } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongThreadId: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.threadId != targetThreadId } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongThreadId) + .subtracting(runningJobIdsWithWrongThreadId) + } + + // Remove any which don't have the matching interactionId (if provided) + if let targetInteractionId: Int64 = interactionId { + let pendingJobIdsWithWrongInteractionId: Set = pendingJobs + .filter { $0.interactionId != targetInteractionId } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongInteractionId: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.interactionId != targetInteractionId } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongInteractionId) + .subtracting(runningJobIdsWithWrongInteractionId) + } + + // Remove any which don't have the matching details (if provided) + if let targetDetailsData: Data = detailsData { + let pendingJobIdsWithWrongDetailsData: Set = pendingJobs + .filter { $0.details != targetDetailsData } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongDetailsData: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.detailsData != detailsData } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongDetailsData) + .subtracting(runningJobIdsWithWrongDetailsData) + } - guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } - - return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData) + return !possibleJobIds.isEmpty } fileprivate func removePendingJob(_ jobId: Int64) { @@ -706,7 +776,7 @@ private final class JobQueue { } // Get any pending jobs - let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue + let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = Storage.shared.read { db in try Job @@ -765,7 +835,7 @@ private final class JobQueue { } guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { isRunning.mutate { $0 = false } } @@ -827,7 +897,7 @@ private final class JobQueue { /// /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// are successfully completed - let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue) let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } @@ -851,11 +921,20 @@ private final class JobQueue { trigger?.invalidate() // Need to invalidate to prevent a memory leak trigger = nil } - jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in - jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) - numJobsRunning = jobsCurrentlyRunning.count + currentlyRunningJobIds.mutate { currentlyRunningJobIds in + currentlyRunningJobIds = currentlyRunningJobIds.inserting(nextJob.id) + numJobsRunning = currentlyRunningJobIds.count + } + currentlyRunningJobInfo.mutate { currentlyRunningJobInfo in + currentlyRunningJobInfo = currentlyRunningJobInfo.setting( + nextJob.id, + JobRunner.JobInfo( + threadId: nextJob.threadId, + interactionId: nextJob.interactionId, + detailsData: nextJob.details + ) + ) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to @@ -894,7 +973,7 @@ private final class JobQueue { } private func scheduleNextSoonestJob() { - let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue + let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in try Job .filterPendingJobs( @@ -911,7 +990,7 @@ private final class JobQueue { // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // the 'onQueueDrained' callback and stop guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else { - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { self.onQueueDrained?() } return @@ -922,7 +1001,7 @@ private final class JobQueue { guard secondsUntilNextJob > 0 else { // Only log that the queue is getting restarted if this queue had actually been about to stop - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { let timingString: String = (nextJobTimestamp == 0 ? "that should be in the queue" : "scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s") ago" @@ -940,7 +1019,7 @@ private final class JobQueue { } // Only schedule a trigger if this queue has actually completed - guard executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty else { return } + guard executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty else { return } // Setup a trigger SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")") @@ -1029,7 +1108,7 @@ private final class JobQueue { /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// removed from the queue, replaced by their dependencies if !dependantJobs.isEmpty { - let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue) let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } @@ -1211,8 +1290,8 @@ private final class JobQueue { private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) { // The job is removed from the queue before it runs so all we need to to is remove it // from the 'currentlyRunning' set - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + currentlyRunningJobIds.mutate { $0 = $0.removing(job.id) } + currentlyRunningJobInfo.mutate { $0 = $0.removingValue(forKey: job.id) } guard shouldTriggerCallbacks else { return }