diff --git a/LibSession-Util b/LibSession-Util index 97084c69f..9777b37e8 160000 --- a/LibSession-Util +++ b/LibSession-Util @@ -1 +1 @@ -Subproject commit 97084c69f86e67c675095b48efacc86113ccebb0 +Subproject commit 9777b37e8545febcc082578341352dba7433db21 diff --git a/Session/Onboarding/Onboarding.swift b/Session/Onboarding/Onboarding.swift index 8c664c045..914976728 100644 --- a/Session/Onboarding/Onboarding.swift +++ b/Session/Onboarding/Onboarding.swift @@ -219,11 +219,18 @@ enum Onboarding { } func completeRegistration() { - // Set the `lastDisplayNameUpdate` to the current date, so that we don't - // overwrite what the user set in the display name step with whatever we - // find in their swarm (otherwise the user could enter a display name and - // have it immediately overwritten due to the config request running slow) - UserDefaults.standard[.lastDisplayNameUpdate] = Date() + // Set the `lastNameUpdate` to the current date, so that we don't overwrite + // what the user set in the display name step with whatever we find in their + // swarm (otherwise the user could enter a display name and have it immediately + // overwritten due to the config request running slow) + Storage.shared.write { db in + try Profile + .filter(id: getUserHexEncodedPublicKey(db)) + .updateAllAndConfig( + db, + Profile.Columns.lastNameUpdate.set(to: Date().timeIntervalSince1970) + ) + } // Notify the app that registration is complete Identity.didRegister() diff --git a/Session/Utilities/MockDataGenerator.swift b/Session/Utilities/MockDataGenerator.swift index e20b9b1e9..d4ab94abc 100644 --- a/Session/Utilities/MockDataGenerator.swift +++ b/Session/Utilities/MockDataGenerator.swift @@ -158,7 +158,9 @@ enum MockDataGenerator { id: randomSessionId, name: (0.. = JobRunner - .defailsForCurrentlyRunningJobs(of: .attachmentDownload) + .infoForCurrentlyRunningJobs(of: .attachmentDownload) .filter { key, _ in key != job.id } .values - .compactMap { data -> String? in - guard let data: Data = data else { return nil } + .compactMap { info -> String? in + guard let data: Data = info.detailsData else { return nil } return (try? JSONDecoder().decode(Details.self, from: data))? .attachmentId diff --git a/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift index c5f0ae21f..29fe85ab0 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigMessageReceiveJob.swift @@ -16,17 +16,38 @@ public enum ConfigMessageReceiveJob: JobExecutor { failure: @escaping (Job, Error?, Bool) -> (), deferred: @escaping (Job) -> () ) { + /// When the `configMessageReceive` job fails we want to unblock any `messageReceive` jobs it was blocking + /// to ensure the user isn't losing any messages - this generally _shouldn't_ happen but if it does then having a temporary + /// "outdated" state due to standard messages which would have been invalidated by a config change incorrectly being + /// processed is less severe then dropping a bunch on messages just because they were processed in the same poll as + /// invalid config messages + let removeDependencyOnMessageReceiveJobs: () -> () = { + guard let jobId: Int64 = job.id else { return } + + Storage.shared.write { db in + try JobDependencies + .filter(JobDependencies.Columns.dependantId == jobId) + .joining( + required: JobDependencies.job + .filter(Job.Columns.variant == Job.Variant.messageReceive) + ) + .deleteAll(db) + } + } + guard let detailsData: Data = job.details, let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) else { + removeDependencyOnMessageReceiveJobs() failure(job, JobRunnerError.missingRequiredDetails, true) return } - + // Ensure no standard messages are sent through this job guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else { SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job") + removeDependencyOnMessageReceiveJobs() failure(job, MessageReceiverError.invalidMessage, true) return } @@ -49,8 +70,10 @@ public enum ConfigMessageReceiveJob: JobExecutor { // Handle the result switch lastError { - case let error as MessageReceiverError where !error.isRetryable: failure(job, error, true) - case .some(let error): failure(job, error, false) + case .some(let error): + removeDependencyOnMessageReceiveJobs() + failure(job, error, true) + case .none: success(job, false) } } diff --git a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift index 7dbde8b4a..e7eee9f34 100644 --- a/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift +++ b/SessionMessagingKit/Jobs/Types/ConfigurationSyncJob.swift @@ -9,7 +9,7 @@ import SessionUtilitiesKit public enum ConfigurationSyncJob: JobExecutor { public static let maxFailureCount: Int = -1 - public static let requiresThreadId: Bool = false + public static let requiresThreadId: Bool = true public static let requiresInteractionId: Bool = false private static let maxRunFrequency: TimeInterval = 3 @@ -25,13 +25,29 @@ public enum ConfigurationSyncJob: JobExecutor { Identity.userCompletedRequiredOnboarding() else { return success(job, true) } - // On startup it's possible for multiple ConfigSyncJob's to run at the same time (which is - // redundant) so check if there is another job already running and, if so, defer this job - let jobDetails: [Int64: Data?] = JobRunner.defailsForCurrentlyRunningJobs(of: .configurationSync) - - guard jobDetails.setting(job.id, nil).count == 0 else { - deferred(job) // We will re-enqueue when needed - return + // It's possible for multiple ConfigSyncJob's with the same target (user/group) to try to run at the + // same time since as soon as one is started we will enqueue a second one, rather than adding dependencies + // between the jobs we just continue to defer the subsequent job while the first one is running in + // order to prevent multiple configurationSync jobs with the same target from running at the same time + guard + JobRunner + .infoForCurrentlyRunningJobs(of: .configurationSync) + .filter({ key, info in + key != job.id && // Exclude this job + info.threadId == job.threadId // Exclude jobs for different ids + }) + .isEmpty + else { + // Defer the job to run 'maxRunFrequency' from when this one ran (if we don't it'll try start + // it again immediately which is pointless) + let updatedJob: Job? = Storage.shared.write { db in + try job + .with(nextRunTimestamp: Date().timeIntervalSince1970 + maxRunFrequency) + .saved(db) + } + + SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") deferred due to in progress job") + return deferred(updatedJob ?? job) } // If we don't have a userKeyPair yet then there is no need to sync the configuration @@ -42,16 +58,15 @@ public enum ConfigurationSyncJob: JobExecutor { let pendingConfigChanges: [SessionUtil.OutgoingConfResult] = Storage.shared .read({ db in try SessionUtil.pendingChanges(db, publicKey: publicKey) }) else { - failure(job, StorageError.generic, false) - return + SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") failed due to invalid data") + return failure(job, StorageError.generic, false) } // If there are no pending changes then the job can just complete (next time something // is updated we want to try and run immediately so don't scuedule another run in this case) guard !pendingConfigChanges.isEmpty else { - SNLog("[ConfigurationSyncJob] Completed with no pending changes") - success(job, true) - return + SNLog("[ConfigurationSyncJob] For \(publicKey) completed with no pending changes") + return success(job, true) } // Identify the destination and merge all obsolete hashes into a single set @@ -63,6 +78,8 @@ public enum ConfigurationSyncJob: JobExecutor { .map { $0.obsoleteHashes } .reduce([], +) .asSet() + let jobStartTimestamp: TimeInterval = Date().timeIntervalSince1970 + SNLog("[ConfigurationSyncJob] For \(publicKey) started with \(pendingConfigChanges.count) change\(pendingConfigChanges.count == 1 ? "" : "s")") Storage.shared .readPublisher { db in @@ -119,9 +136,9 @@ public enum ConfigurationSyncJob: JobExecutor { .sinkUntilComplete( receiveCompletion: { result in switch result { - case .finished: SNLog("[ConfigurationSyncJob] Completed") + case .finished: SNLog("[ConfigurationSyncJob] For \(publicKey) completed") case .failure(let error): - SNLog("[ConfigurationSyncJob] Failed due to error: \(error)") + SNLog("[ConfigurationSyncJob] For \(publicKey) failed due to error: \(error)") failure(job, error, false) } }, @@ -137,7 +154,7 @@ public enum ConfigurationSyncJob: JobExecutor { // When we complete the 'ConfigurationSync' job we want to immediately schedule // another one with a 'nextRunTimestamp' set to the 'maxRunFrequency' value to // throttle the config sync requests - let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + maxRunFrequency) + let nextRunTimestamp: TimeInterval = (jobStartTimestamp + maxRunFrequency) // If another 'ConfigurationSync' job was scheduled then update that one // to run at 'nextRunTimestamp' and make the current job stop @@ -146,6 +163,7 @@ public enum ConfigurationSyncJob: JobExecutor { .filter(Job.Columns.id != job.id) .filter(Job.Columns.variant == Job.Variant.configurationSync) .filter(Job.Columns.threadId == publicKey) + .order(Job.Columns.nextRunTimestamp.asc) .fetchOne(db) { // If the next job isn't currently running then delay it's start time @@ -175,7 +193,6 @@ public enum ConfigurationSyncJob: JobExecutor { // MARK: - Convenience public extension ConfigurationSyncJob { - static func enqueue(_ db: Database, publicKey: String) { // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent guard SessionUtil.userConfigsEnabled(db) else { diff --git a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift index 5a89819cb..bcc097efe 100644 --- a/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift +++ b/SessionMessagingKit/Sending & Receiving/Errors/MessageReceiverError.swift @@ -21,13 +21,14 @@ public enum MessageReceiverError: LocalizedError { case noGroupKeyPair case invalidSharedConfigMessageHandling case requiredThreadNotInConfig + case outdatedMessage public var isRetryable: Bool { switch self { case .duplicateMessage, .duplicateMessageNewSnode, .duplicateControlMessage, .invalidMessage, .unknownMessage, .unknownEnvelopeType, .invalidSignature, .noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed, - .invalidSharedConfigMessageHandling, .requiredThreadNotInConfig: + .invalidSharedConfigMessageHandling, .requiredThreadNotInConfig, .outdatedMessage: return false default: return true @@ -57,6 +58,7 @@ public enum MessageReceiverError: LocalizedError { case .invalidSharedConfigMessageHandling: return "Invalid handling of a shared config message." case .requiredThreadNotInConfig: return "Required thread not in config." + case .outdatedMessage: return "Message was sent before a config change which would have removed the message." } } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift index 8737f2643..dafc634d2 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+Calls.swift @@ -13,8 +13,31 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: CallMessage ) throws { + let timestampMs: Int64 = (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) + // Only support calls from contact threads - guard threadVariant == .contact else { return } + guard + threadVariant == .contact, + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + ( + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: .contacts, + changeTimestampMs: timestampMs + ) + ) + else { return } switch message.kind { case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message) diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift index 0fb862067..e413db82c 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ClosedGroups.swift @@ -77,7 +77,28 @@ extension MessageReceiver { targetConfig: .userGroups, changeTimestampMs: Int64(sentTimestamp) ) - else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } + else { + // If the closed group already exists then store the encryption keys (just in case - there can be + // some weird edge-cases where we don't have keys we need if we don't store them) + let groupPublicKey: String = publicKeyAsData.toHexString() + let receivedTimestamp: TimeInterval = (TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000) + let newKeyPair: ClosedGroupKeyPair = ClosedGroupKeyPair( + threadId: groupPublicKey, + publicKey: Data(encryptionKeyPair.publicKey), + secretKey: Data(encryptionKeyPair.secretKey), + receivedTimestamp: receivedTimestamp + ) + + guard + ClosedGroup.filter(id: groupPublicKey).isNotEmpty(db), + !ClosedGroupKeyPair + .filter(ClosedGroupKeyPair.Columns.threadKeyPairHash == newKeyPair.threadKeyPairHash) + .isNotEmpty(db) + else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } + + try newKeyPair.insert(db) + return + } try handleNewClosedGroup( db, @@ -591,9 +612,10 @@ extension MessageReceiver { message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs() ) + // Only actually make the change if SessionUtil says we can (we always want to insert the info // message though) - if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs ) { + if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs) { // Legacy groups used these control messages for making changes, new groups only use them // for information purposes switch threadVariant { diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift index c1edbbe6a..e9397be26 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+DataExtractionNotification.swift @@ -3,6 +3,7 @@ import Foundation import GRDB import SessionSnodeKit +import SessionUtilitiesKit extension MessageReceiver { internal static func handleDataExtractionNotification( @@ -11,11 +12,45 @@ extension MessageReceiver { threadVariant: SessionThread.Variant, message: DataExtractionNotification ) throws { + let timestampMs: Int64 = ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) + guard threadVariant == .contact, let sender: String = message.sender, let messageKind: DataExtractionNotification.Kind = message.kind - else { return } + else { throw MessageReceiverError.invalidMessage } + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db) + + return (threadId == currentUserPublicKey ? .userProfile : .contacts) + + default: return .userGroups + } + }(), + changeTimestampMs: timestampMs + ) + else { throw MessageReceiverError.outdatedMessage } _ = try Interaction( serverHash: message.serverHash, @@ -27,10 +62,7 @@ extension MessageReceiver { case .mediaSaved: return .infoMediaSavedNotification } }(), - timestampMs: ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) + timestampMs: timestampMs ).inserted(db) } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift index 1d8ef1033..fbec8536b 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+ExpirationTimers.swift @@ -15,9 +15,9 @@ extension MessageReceiver { // Only process these for contact and legacy groups (new groups handle it separately) (threadVariant == .contact || threadVariant == .legacyGroup), let sender: String = message.sender - else { return } + else { throw MessageReceiverError.invalidMessage } - // Update the configuration + // Generate an updated configuration // // Note: Messages which had been sent during the previous configuration will still // use it's settings (so if you enable, send a message and then disable disappearing @@ -34,18 +34,40 @@ extension MessageReceiver { DisappearingMessagesConfiguration.defaultDuration ) ) + let timestampMs: Int64 = Int64(message.sentTimestamp ?? 0) // Default to `0` if not set - // Legacy closed groups need to update the SessionUtil - switch threadVariant { - case .legacyGroup: - try SessionUtil - .update( - db, - groupPublicKey: threadId, - disappearingConfig: config - ) - - default: break + // Only actually make the change if SessionUtil says we can (we always want to insert the info + // message though) + let canPerformChange: Bool = SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db) + + return (threadId == currentUserPublicKey ? .userProfile : .contacts) + + default: return .userGroups + } + }(), + changeTimestampMs: timestampMs + ) + + // Only update libSession if we can perform the change + if canPerformChange { + // Legacy closed groups need to update the SessionUtil + switch threadVariant { + case .legacyGroup: + try SessionUtil + .update( + db, + groupPublicKey: threadId, + disappearingConfig: config + ) + + default: break + } } // Add an info message for the user @@ -60,11 +82,14 @@ extension MessageReceiver { nil ) ), - timestampMs: Int64(message.sentTimestamp ?? 0) // Default to `0` if not set + timestampMs: timestampMs ).inserted(db) - // Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate - // then the interaction unique constraint will prevent the code from getting here) - try config.save(db) + // Only save the updated config if we can perform the change + if canPerformChange { + // Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate + // then the interaction unique constraint will prevent the code from getting here) + try config.save(db) + } } } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift index 16968da4a..fbaad1124 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+MessageRequests.swift @@ -13,11 +13,37 @@ extension MessageReceiver { dependencies: SMKDependencies ) throws { let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies) + let timestampMs: Int64 = ( + message.sentTimestamp.map { Int64($0) } ?? + SnodeAPI.currentOffsetTimestampMs() + ) var blindedContactIds: [String] = [] // Ignore messages which were sent from the current user - guard message.sender != userPublicKey else { return } - guard let senderId: String = message.sender else { return } + guard + message.sender != userPublicKey, + let senderId: String = message.sender + else { throw MessageReceiverError.invalidMessage } + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: senderId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: senderId, + threadVariant: .contact, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: senderId, + targetConfig: .contacts, + changeTimestampMs: timestampMs + ) + else { throw MessageReceiverError.outdatedMessage } // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) @@ -134,10 +160,7 @@ extension MessageReceiver { threadId: unblindedThread.id, authorId: senderId, variant: .infoMessageRequestAccepted, - timestampMs: ( - message.sentTimestamp.map { Int64($0) } ?? - SnodeAPI.currentOffsetTimestampMs() - ) + timestampMs: timestampMs ).inserted(db) } diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift index a891bb3ba..d41cf4f40 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageReceiver+VisibleMessages.swift @@ -3,6 +3,7 @@ import Foundation import GRDB import Sodium +import SessionSnodeKit import SessionUtilitiesKit extension MessageReceiver { @@ -22,6 +23,32 @@ extension MessageReceiver { // seconds to maintain the accuracy) let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000) let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) + let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) + + /// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period + guard + SessionThread + .filter(id: threadId) + .filter(SessionThread.Columns.shouldBeVisible == true) + .isNotEmpty(db) || + SessionUtil.conversationInConfig( + db, + threadId: threadId, + threadVariant: threadVariant, + visibleOnly: true + ) || + SessionUtil.canPerformChange( + db, + threadId: threadId, + targetConfig: { + switch threadVariant { + case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts) + default: return .userGroups + } + }(), + changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs()) + ) + else { throw MessageReceiverError.outdatedMessage } // Update profile if needed (want to do this regardless of whether the message exists or // not to ensure the profile info gets sync between a users devices at every chance) @@ -63,7 +90,6 @@ extension MessageReceiver { } // Store the message variant so we can run variant-specific behaviours - let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies) let thread: SessionThread = try SessionThread .fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil) let maybeOpenGroup: OpenGroup? = { diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift index 63301fa5b..66fe34ec3 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Contacts.swift @@ -31,7 +31,8 @@ internal extension SessionUtil { static func handleContactsUpdate( _ db: Database, in conf: UnsafeMutablePointer?, - mergeNeedsDump: Bool + mergeNeedsDump: Bool, + latestConfigSentTimestampMs: Int64 ) throws { typealias ContactData = [ String: ( @@ -64,6 +65,7 @@ internal extension SessionUtil { let profileResult: Profile = Profile( id: contactId, name: String(libSessionVal: contact.name), + lastNameUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000), nickname: String(libSessionVal: contact.nickname, nullIfEmpty: true), profilePictureUrl: profilePictureUrl, profileEncryptionKey: (profilePictureUrl == nil ? nil : @@ -71,7 +73,8 @@ internal extension SessionUtil { libSessionVal: contact.profile_pic.key, count: ProfileManager.avatarAES256KeyByteLength ) - ) + ), + lastProfilePictureUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000) ) contactData[contactId] = ( @@ -99,12 +102,23 @@ internal extension SessionUtil { // observation system can't differ between update calls which do and don't change anything) let contact: Contact = Contact.fetchOrCreate(db, id: sessionId) let profile: Profile = Profile.fetchOrCreate(db, id: sessionId) + let profileNameShouldBeUpdated: Bool = ( + !data.profile.name.isEmpty && + profile.name != data.profile.name && + profile.lastNameUpdate < data.profile.lastNameUpdate + ) + let profilePictureShouldBeUpdated: Bool = ( + ( + profile.profilePictureUrl != data.profile.profilePictureUrl || + profile.profileEncryptionKey != data.profile.profileEncryptionKey + ) && + profile.lastProfilePictureUpdate < data.profile.lastProfilePictureUpdate + ) if - (!data.profile.name.isEmpty && profile.name != data.profile.name) || + profileNameShouldBeUpdated || profile.nickname != data.profile.nickname || - profile.profilePictureUrl != data.profile.profilePictureUrl || - profile.profileEncryptionKey != data.profile.profileEncryptionKey + profilePictureShouldBeUpdated { try profile.save(db) try Profile @@ -112,9 +126,12 @@ internal extension SessionUtil { .updateAll( // Handling a config update so don't use `updateAllAndConfig` db, [ - (data.profile.name.isEmpty || profile.name == data.profile.name ? nil : + (!profileNameShouldBeUpdated ? nil : Profile.Columns.name.set(to: data.profile.name) ), + (!profileNameShouldBeUpdated ? nil : + Profile.Columns.lastNameUpdate.set(to: data.profile.lastNameUpdate) + ), (profile.nickname == data.profile.nickname ? nil : Profile.Columns.nickname.set(to: data.profile.nickname) ), @@ -123,6 +140,9 @@ internal extension SessionUtil { ), (profile.profileEncryptionKey != data.profile.profileEncryptionKey ? nil : Profile.Columns.profileEncryptionKey.set(to: data.profile.profileEncryptionKey) + ), + (!profilePictureShouldBeUpdated ? nil : + Profile.Columns.lastProfilePictureUpdate.set(to: data.profile.lastProfilePictureUpdate) ) ].compactMap { $0 } ) diff --git a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift index 3ea0d9194..f6d6e74a5 100644 --- a/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift +++ b/SessionMessagingKit/SessionUtil/Config Handling/SessionUtil+Shared.swift @@ -326,7 +326,7 @@ internal extension SessionUtil { .defaulting(to: 0) // Ensure the change occurred after the last config message was handled (minus the buffer period) - return (changeTimestampMs > (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) + return (changeTimestampMs >= (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) } } diff --git a/SessionMessagingKit/SessionUtil/SessionUtil.swift b/SessionMessagingKit/SessionUtil/SessionUtil.swift index 391a4a45a..de5e22d45 100644 --- a/SessionMessagingKit/SessionUtil/SessionUtil.swift +++ b/SessionMessagingKit/SessionUtil/SessionUtil.swift @@ -461,7 +461,8 @@ public enum SessionUtil { try SessionUtil.handleContactsUpdate( db, in: conf, - mergeNeedsDump: config_needs_dump(conf) + mergeNeedsDump: config_needs_dump(conf), + latestConfigSentTimestampMs: latestConfigSentTimestampMs ) case .convoInfoVolatile: diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index 4b5708675..ffbff0aeb 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -495,47 +495,28 @@ public struct ProfileManager { // Name if let name: String = name, !name.isEmpty, name != profile.name { - let shouldUpdate: Bool = { - guard isCurrentUser else { return true } - - return UserDefaults.standard[.lastDisplayNameUpdate] - .map { sentTimestamp > $0.timeIntervalSince1970 } - .defaulting(to: true) - }() - - if shouldUpdate { - if isCurrentUser { - UserDefaults.standard[.lastDisplayNameUpdate] = Date(timeIntervalSince1970: sentTimestamp) - } - + // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent + if sentTimestamp > profile.lastNameUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) { profileChanges.append(Profile.Columns.name.set(to: name)) + profileChanges.append(Profile.Columns.lastNameUpdate.set(to: sentTimestamp)) } } // Profile picture & profile key var avatarNeedsDownload: Bool = false var targetAvatarUrl: String? = nil - let shouldUpdateAvatar: Bool = { - guard isCurrentUser else { return true } - - return UserDefaults.standard[.lastProfilePictureUpdate] - .map { sentTimestamp > $0.timeIntervalSince1970 } - .defaulting(to: true) - }() - if shouldUpdateAvatar { + // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent + if sentTimestamp > profile.lastProfilePictureUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) { switch avatarUpdate { case .none: break case .uploadImageData: preconditionFailure("Invalid options for this function") case .remove: - if isCurrentUser { - UserDefaults.standard[.lastProfilePictureUpdate] = Date(timeIntervalSince1970: sentTimestamp) - } - profileChanges.append(Profile.Columns.profilePictureUrl.set(to: nil)) profileChanges.append(Profile.Columns.profileEncryptionKey.set(to: nil)) profileChanges.append(Profile.Columns.profilePictureFileName.set(to: nil)) + profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp)) case .updateTo(let url, let key, let fileName): if url != profile.profilePictureUrl { @@ -558,6 +539,9 @@ public struct ProfileManager { !ProfileManager.hasProfileImageData(with: fileName) ) } + + // Update the 'lastProfilePictureUpdate' timestamp for either external or local changes + profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp)) } } diff --git a/SessionUIKit/Components/ProfilePictureView.swift b/SessionUIKit/Components/ProfilePictureView.swift index 1ffe575a7..b05e69606 100644 --- a/SessionUIKit/Components/ProfilePictureView.swift +++ b/SessionUIKit/Components/ProfilePictureView.swift @@ -65,7 +65,7 @@ public final class ProfilePictureView: UIView { var iconSize: CGFloat { switch self { - case .navigation, .message: return 8 + case .navigation, .message: return 10 // Intentionally not a multiple of 4 case .list: return 16 case .hero: return 24 } diff --git a/SessionUtilitiesKit/Database/Models/JobDependencies.swift b/SessionUtilitiesKit/Database/Models/JobDependencies.swift index 16201367b..bca762c5a 100644 --- a/SessionUtilitiesKit/Database/Models/JobDependencies.swift +++ b/SessionUtilitiesKit/Database/Models/JobDependencies.swift @@ -7,8 +7,8 @@ public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, Pe public static var databaseTableName: String { "jobDependencies" } internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) - internal static let job = belongsTo(Job.self, using: jobForeignKey) - internal static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey) + public static let job = belongsTo(Job.self, using: jobForeignKey) + public static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey) public typealias Columns = CodingKeys public enum CodingKeys: String, CodingKey, ColumnExpression { diff --git a/SessionUtilitiesKit/General/SNUserDefaults.swift b/SessionUtilitiesKit/General/SNUserDefaults.swift index 82f18bd64..bc55e8231 100644 --- a/SessionUtilitiesKit/General/SNUserDefaults.swift +++ b/SessionUtilitiesKit/General/SNUserDefaults.swift @@ -38,8 +38,6 @@ public enum SNUserDefaults { public enum Date: Swift.String { case lastConfigurationSync - case lastDisplayNameUpdate - case lastProfilePictureUpdate case lastProfilePictureUpload case lastOpenGroupImageUpdate case lastOpen diff --git a/SessionUtilitiesKit/JobRunner/JobRunner.swift b/SessionUtilitiesKit/JobRunner/JobRunner.swift index 19a750f26..48752c5bf 100644 --- a/SessionUtilitiesKit/JobRunner/JobRunner.swift +++ b/SessionUtilitiesKit/JobRunner/JobRunner.swift @@ -42,6 +42,12 @@ public final class JobRunner { case notFound } + public struct JobInfo { + public let threadId: String? + public let interactionId: Int64? + public let detailsData: Data? + } + private static let blockingQueue: Atomic = Atomic( JobQueue( type: .blocking, @@ -381,8 +387,8 @@ public final class JobRunner { return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) } - public static func defailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { - return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) + public static func infoForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: JobInfo] { + return (queues.wrappedValue[variant]?.infoForAllCurrentlyRunningJobs()) .defaulting(to: [:]) } @@ -395,11 +401,24 @@ public final class JobRunner { queue.afterCurrentlyRunningJob(jobId, callback: callback) } - public static func hasPendingOrRunningJob(with variant: Job.Variant, details: T) -> Bool { + public static func hasPendingOrRunningJob( + with variant: Job.Variant, + threadId: String? = nil, + interactionId: Int64? = nil, + details: T? = nil + ) -> Bool { guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } - guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false } - return targetQueue.hasPendingOrRunningJob(with: detailsData) + // Ensure we can encode the details (if provided) + let detailsData: Data? = details.map { try? JSONEncoder().encode($0) } + + guard details == nil || detailsData != nil else { return false } + + return targetQueue.hasPendingOrRunningJobWith( + threadId: threadId, + interactionId: interactionId, + detailsData: detailsData + ) } public static func removePendingJob(_ job: Job?) { @@ -513,9 +532,9 @@ private final class JobQueue { private var nextTrigger: Atomic = Atomic(nil) fileprivate var isRunning: Atomic = Atomic(false) private var queue: Atomic<[Job]> = Atomic([]) - private var jobsCurrentlyRunning: Atomic> = Atomic([]) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) - private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) + private var currentlyRunningJobIds: Atomic> = Atomic([]) + private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } @@ -620,7 +639,7 @@ private final class JobQueue { } fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { - let currentlyRunningJobIds: Set = jobsCurrentlyRunning.wrappedValue + let currentlyRunningJobIds: Set = currentlyRunningJobIds.wrappedValue queue.mutate { queue in // Avoid re-adding jobs to the queue that are already in it (this can @@ -642,11 +661,11 @@ private final class JobQueue { } fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool { - return jobsCurrentlyRunning.wrappedValue.contains(jobId) + return currentlyRunningJobIds.wrappedValue.contains(jobId) } - fileprivate func detailsForAllCurrentlyRunningJobs() -> [Int64: Data?] { - return detailsForCurrentlyRunningJobs.wrappedValue + fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] { + return currentlyRunningJobInfo.wrappedValue } fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { @@ -660,14 +679,65 @@ private final class JobQueue { } } - fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { - guard let detailsData: Data = detailsData else { return false } - + fileprivate func hasPendingOrRunningJobWith( + threadId: String? = nil, + interactionId: Int64? = nil, + detailsData: Data? = nil + ) -> Bool { let pendingJobs: [Job] = queue.wrappedValue + let currentlyRunningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue + var possibleJobIds: Set = Set(currentlyRunningJobInfo.keys) + .inserting(contentsOf: pendingJobs.compactMap { $0.id }.asSet()) + + // Remove any which don't have the matching threadId (if provided) + if let targetThreadId: String = threadId { + let pendingJobIdsWithWrongThreadId: Set = pendingJobs + .filter { $0.threadId != targetThreadId } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongThreadId: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.threadId != targetThreadId } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongThreadId) + .subtracting(runningJobIdsWithWrongThreadId) + } + + // Remove any which don't have the matching interactionId (if provided) + if let targetInteractionId: Int64 = interactionId { + let pendingJobIdsWithWrongInteractionId: Set = pendingJobs + .filter { $0.interactionId != targetInteractionId } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongInteractionId: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.interactionId != targetInteractionId } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongInteractionId) + .subtracting(runningJobIdsWithWrongInteractionId) + } + + // Remove any which don't have the matching details (if provided) + if let targetDetailsData: Data = detailsData { + let pendingJobIdsWithWrongDetailsData: Set = pendingJobs + .filter { $0.details != targetDetailsData } + .compactMap { $0.id } + .asSet() + let runningJobIdsWithWrongDetailsData: Set = currentlyRunningJobInfo + .filter { _, info -> Bool in info.detailsData != detailsData } + .map { key, _ in key } + .asSet() + + possibleJobIds = possibleJobIds + .subtracting(pendingJobIdsWithWrongDetailsData) + .subtracting(runningJobIdsWithWrongDetailsData) + } - guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } - - return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData) + return !possibleJobIds.isEmpty } fileprivate func removePendingJob(_ jobId: Int64) { @@ -706,7 +776,7 @@ private final class JobQueue { } // Get any pending jobs - let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue + let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue let jobsAlreadyInQueue: Set = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsToRun: [Job] = Storage.shared.read { db in try Job @@ -765,7 +835,7 @@ private final class JobQueue { } guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { isRunning.mutate { $0 = false } } @@ -827,7 +897,7 @@ private final class JobQueue { /// /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// are successfully completed - let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue) let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } @@ -851,11 +921,20 @@ private final class JobQueue { trigger?.invalidate() // Need to invalidate to prevent a memory leak trigger = nil } - jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in - jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) - numJobsRunning = jobsCurrentlyRunning.count + currentlyRunningJobIds.mutate { currentlyRunningJobIds in + currentlyRunningJobIds = currentlyRunningJobIds.inserting(nextJob.id) + numJobsRunning = currentlyRunningJobIds.count + } + currentlyRunningJobInfo.mutate { currentlyRunningJobInfo in + currentlyRunningJobInfo = currentlyRunningJobInfo.setting( + nextJob.id, + JobRunner.JobInfo( + threadId: nextJob.threadId, + interactionId: nextJob.interactionId, + detailsData: nextJob.details + ) + ) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) } SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to @@ -894,7 +973,7 @@ private final class JobQueue { } private func scheduleNextSoonestJob() { - let jobIdsAlreadyRunning: Set = jobsCurrentlyRunning.wrappedValue + let jobIdsAlreadyRunning: Set = currentlyRunningJobIds.wrappedValue let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in try Job .filterPendingJobs( @@ -911,7 +990,7 @@ private final class JobQueue { // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // the 'onQueueDrained' callback and stop guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else { - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { self.onQueueDrained?() } return @@ -922,7 +1001,7 @@ private final class JobQueue { guard secondsUntilNextJob > 0 else { // Only log that the queue is getting restarted if this queue had actually been about to stop - if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { + if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty { let timingString: String = (nextJobTimestamp == 0 ? "that should be in the queue" : "scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s") ago" @@ -940,7 +1019,7 @@ private final class JobQueue { } // Only schedule a trigger if this queue has actually completed - guard executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty else { return } + guard executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty else { return } // Setup a trigger SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")") @@ -1029,7 +1108,7 @@ private final class JobQueue { /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// removed from the queue, replaced by their dependencies if !dependantJobs.isEmpty { - let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) + let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue) let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } @@ -1211,8 +1290,8 @@ private final class JobQueue { private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) { // The job is removed from the queue before it runs so all we need to to is remove it // from the 'currentlyRunning' set - jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } - detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } + currentlyRunningJobIds.mutate { $0 = $0.removing(job.id) } + currentlyRunningJobInfo.mutate { $0 = $0.removingValue(forKey: job.id) } guard shouldTriggerCallbacks else { return }