Finished adding logic to ignore messages invalidated by config

Added timestamps to the Profile table to avoid overriding current profile info with older info
Updated the MessageReceiver to ignore the rest of the messages invalidated by the config
Updated to the latest libSession
Updated the JobRunner to expose some more info about the currently running jobs
Made some tweaks to the ConfigurationSyncJob to better support concurrent jobs running for different targets
pull/751/head
Morgan Pretty 2 years ago
parent 3b772b7f90
commit c455a13a7b

@ -1 +1 @@
Subproject commit 97084c69f86e67c675095b48efacc86113ccebb0 Subproject commit 9777b37e8545febcc082578341352dba7433db21

@ -219,11 +219,18 @@ enum Onboarding {
} }
func completeRegistration() { func completeRegistration() {
// Set the `lastDisplayNameUpdate` to the current date, so that we don't // Set the `lastNameUpdate` to the current date, so that we don't overwrite
// overwrite what the user set in the display name step with whatever we // what the user set in the display name step with whatever we find in their
// find in their swarm (otherwise the user could enter a display name and // swarm (otherwise the user could enter a display name and have it immediately
// have it immediately overwritten due to the config request running slow) // overwritten due to the config request running slow)
UserDefaults.standard[.lastDisplayNameUpdate] = Date() Storage.shared.write { db in
try Profile
.filter(id: getUserHexEncodedPublicKey(db))
.updateAllAndConfig(
db,
Profile.Columns.lastNameUpdate.set(to: Date().timeIntervalSince1970)
)
}
// Notify the app that registration is complete // Notify the app that registration is complete
Identity.didRegister() Identity.didRegister()

@ -158,7 +158,9 @@ enum MockDataGenerator {
id: randomSessionId, id: randomSessionId,
name: (0..<contactNameLength) name: (0..<contactNameLength)
.compactMap { _ in stringContent.randomElement(using: &dmThreadRandomGenerator) } .compactMap { _ in stringContent.randomElement(using: &dmThreadRandomGenerator) }
.joined() .joined(),
lastNameUpdate: Date().timeIntervalSince1970,
lastProfilePictureUpdate: Date().timeIntervalSince1970
) )
.saved(db) .saved(db)
@ -237,7 +239,9 @@ enum MockDataGenerator {
id: randomSessionId, id: randomSessionId,
name: (0..<contactNameLength) name: (0..<contactNameLength)
.compactMap { _ in stringContent.randomElement(using: &cgThreadRandomGenerator) } .compactMap { _ in stringContent.randomElement(using: &cgThreadRandomGenerator) }
.joined() .joined(),
lastNameUpdate: Date().timeIntervalSince1970,
lastProfilePictureUpdate: Date().timeIntervalSince1970
) )
.saved(db) .saved(db)
@ -365,7 +369,9 @@ enum MockDataGenerator {
id: randomSessionId, id: randomSessionId,
name: (0..<contactNameLength) name: (0..<contactNameLength)
.compactMap { _ in stringContent.randomElement(using: &ogThreadRandomGenerator) } .compactMap { _ in stringContent.randomElement(using: &ogThreadRandomGenerator) }
.joined() .joined(),
lastNameUpdate: Date().timeIntervalSince1970,
lastProfilePictureUpdate: Date().timeIntervalSince1970
) )
.saved(db) .saved(db)

@ -417,10 +417,12 @@ enum _003_YDBToGRDBMigration: Migration {
try Profile( try Profile(
id: legacyContact.sessionID, id: legacyContact.sessionID,
name: (legacyContact.name ?? legacyContact.sessionID), name: (legacyContact.name ?? legacyContact.sessionID),
lastNameUpdate: 0,
nickname: legacyContact.nickname, nickname: legacyContact.nickname,
profilePictureUrl: legacyContact.profilePictureURL, profilePictureUrl: legacyContact.profilePictureURL,
profilePictureFileName: legacyContact.profilePictureFileName, profilePictureFileName: legacyContact.profilePictureFileName,
profileEncryptionKey: legacyContact.profileEncryptionKey?.keyData profileEncryptionKey: legacyContact.profileEncryptionKey?.keyData,
lastProfilePictureUpdate: 0
).migrationSafeInsert(db) ).migrationSafeInsert(db)
/// **Note:** The blow "shouldForce" flags are here to allow us to avoid having to run legacy migrations they /// **Note:** The blow "shouldForce" flags are here to allow us to avoid having to run legacy migrations they
@ -641,7 +643,9 @@ enum _003_YDBToGRDBMigration: Migration {
// constraint violation // constraint violation
try? Profile( try? Profile(
id: profileId, id: profileId,
name: profileId name: profileId,
lastNameUpdate: 0,
lastProfilePictureUpdate: 0
).migrationSafeSave(db) ).migrationSafeSave(db)
} }
@ -1053,7 +1057,9 @@ enum _003_YDBToGRDBMigration: Migration {
// constraint violation // constraint violation
try Profile( try Profile(
id: quotedMessage.authorId, id: quotedMessage.authorId,
name: quotedMessage.authorId name: quotedMessage.authorId,
lastNameUpdate: 0,
lastProfilePictureUpdate: 0
).migrationSafeSave(db) ).migrationSafeSave(db)
} }

@ -20,6 +20,16 @@ enum _013_SessionUtilChanges: Migration {
t.add(.pinnedPriority, .integer) t.add(.pinnedPriority, .integer)
} }
// Add `lastNameUpdate` and `lastProfilePictureUpdate` columns to the profile table
try db.alter(table: Profile.self) { t in
t.add(.lastNameUpdate, .integer)
.notNull()
.defaults(to: 0)
t.add(.lastProfilePictureUpdate, .integer)
.notNull()
.defaults(to: 0)
}
// SQLite doesn't support adding a new primary key after creation so we need to create a new table with // SQLite doesn't support adding a new primary key after creation so we need to create a new table with
// the setup we want, copy data from the old table over, drop the old table and rename the new table // the setup we want, copy data from the old table over, drop the old table and rename the new table
struct TmpGroupMember: Codable, TableRecord, FetchableRecord, PersistableRecord, ColumnExpressible { struct TmpGroupMember: Codable, TableRecord, FetchableRecord, PersistableRecord, ColumnExpressible {

@ -20,11 +20,13 @@ public struct Profile: Codable, Identifiable, Equatable, Hashable, FetchableReco
case id case id
case name case name
case lastNameUpdate
case nickname case nickname
case profilePictureUrl case profilePictureUrl
case profilePictureFileName case profilePictureFileName
case profileEncryptionKey case profileEncryptionKey
case lastProfilePictureUpdate
} }
/// The id for the user that owns the profile (Note: This could be a sessionId, a blindedId or some future variant) /// The id for the user that owns the profile (Note: This could be a sessionId, a blindedId or some future variant)
@ -33,6 +35,9 @@ public struct Profile: Codable, Identifiable, Equatable, Hashable, FetchableReco
/// The name of the contact. Use this whenever you need the "real", underlying name of a user (e.g. when sending a message). /// The name of the contact. Use this whenever you need the "real", underlying name of a user (e.g. when sending a message).
public let name: String public let name: String
/// The timestamp (in seconds since epoch) that the name was last updated
public let lastNameUpdate: TimeInterval
/// A custom name for the profile set by the current user /// A custom name for the profile set by the current user
public let nickname: String? public let nickname: String?
@ -45,22 +50,29 @@ public struct Profile: Codable, Identifiable, Equatable, Hashable, FetchableReco
/// The key with which the profile is encrypted. /// The key with which the profile is encrypted.
public let profileEncryptionKey: Data? public let profileEncryptionKey: Data?
/// The timestamp (in seconds since epoch) that the profile picture was last updated
public let lastProfilePictureUpdate: TimeInterval
// MARK: - Initialization // MARK: - Initialization
public init( public init(
id: String, id: String,
name: String, name: String,
lastNameUpdate: TimeInterval,
nickname: String? = nil, nickname: String? = nil,
profilePictureUrl: String? = nil, profilePictureUrl: String? = nil,
profilePictureFileName: String? = nil, profilePictureFileName: String? = nil,
profileEncryptionKey: Data? = nil profileEncryptionKey: Data? = nil,
lastProfilePictureUpdate: TimeInterval
) { ) {
self.id = id self.id = id
self.name = name self.name = name
self.lastNameUpdate = lastNameUpdate
self.nickname = nickname self.nickname = nickname
self.profilePictureUrl = profilePictureUrl self.profilePictureUrl = profilePictureUrl
self.profilePictureFileName = profilePictureFileName self.profilePictureFileName = profilePictureFileName
self.profileEncryptionKey = profileEncryptionKey self.profileEncryptionKey = profileEncryptionKey
self.lastProfilePictureUpdate = lastProfilePictureUpdate
} }
// MARK: - Description // MARK: - Description
@ -97,10 +109,12 @@ public extension Profile {
self = Profile( self = Profile(
id: try container.decode(String.self, forKey: .id), id: try container.decode(String.self, forKey: .id),
name: try container.decode(String.self, forKey: .name), name: try container.decode(String.self, forKey: .name),
lastNameUpdate: try container.decode(TimeInterval.self, forKey: .lastNameUpdate),
nickname: try? container.decode(String.self, forKey: .nickname), nickname: try? container.decode(String.self, forKey: .nickname),
profilePictureUrl: profilePictureUrl, profilePictureUrl: profilePictureUrl,
profilePictureFileName: try? container.decode(String.self, forKey: .profilePictureFileName), profilePictureFileName: try? container.decode(String.self, forKey: .profilePictureFileName),
profileEncryptionKey: profileKey profileEncryptionKey: profileKey,
lastProfilePictureUpdate: try container.decode(TimeInterval.self, forKey: .lastProfilePictureUpdate)
) )
} }
@ -109,10 +123,12 @@ public extension Profile {
try container.encode(id, forKey: .id) try container.encode(id, forKey: .id)
try container.encode(name, forKey: .name) try container.encode(name, forKey: .name)
try container.encode(lastNameUpdate, forKey: .lastNameUpdate)
try container.encodeIfPresent(nickname, forKey: .nickname) try container.encodeIfPresent(nickname, forKey: .nickname)
try container.encodeIfPresent(profilePictureUrl, forKey: .profilePictureUrl) try container.encodeIfPresent(profilePictureUrl, forKey: .profilePictureUrl)
try container.encodeIfPresent(profilePictureFileName, forKey: .profilePictureFileName) try container.encodeIfPresent(profilePictureFileName, forKey: .profilePictureFileName)
try container.encodeIfPresent(profileEncryptionKey, forKey: .profileEncryptionKey) try container.encodeIfPresent(profileEncryptionKey, forKey: .profileEncryptionKey)
try container.encode(lastProfilePictureUpdate, forKey: .lastProfilePictureUpdate)
} }
} }
@ -124,6 +140,7 @@ public extension Profile {
var profileKey: Data? var profileKey: Data?
var profilePictureUrl: String? var profilePictureUrl: String?
let sentTimestamp: TimeInterval = (proto.hasTimestamp ? (TimeInterval(proto.timestamp) / 1000) : 0)
// If we have both a `profileKey` and a `profilePicture` then the key MUST be valid // If we have both a `profileKey` and a `profilePicture` then the key MUST be valid
if let profileKeyData: Data = proto.profileKey, profileProto.profilePicture != nil { if let profileKeyData: Data = proto.profileKey, profileProto.profilePicture != nil {
@ -134,10 +151,12 @@ public extension Profile {
return Profile( return Profile(
id: id, id: id,
name: displayName, name: displayName,
lastNameUpdate: sentTimestamp,
nickname: nil, nickname: nil,
profilePictureUrl: profilePictureUrl, profilePictureUrl: profilePictureUrl,
profilePictureFileName: nil, profilePictureFileName: nil,
profileEncryptionKey: profileKey profileEncryptionKey: profileKey,
lastProfilePictureUpdate: sentTimestamp
) )
} }
@ -218,10 +237,12 @@ public extension Profile {
return Profile( return Profile(
id: id, id: id,
name: "", name: "",
lastNameUpdate: 0,
nickname: nil, nickname: nil,
profilePictureUrl: nil, profilePictureUrl: nil,
profilePictureFileName: nil, profilePictureFileName: nil,
profileEncryptionKey: nil profileEncryptionKey: nil,
lastProfilePictureUpdate: 0
) )
} }

@ -42,11 +42,11 @@ public enum AttachmentDownloadJob: JobExecutor {
// if an attachment ends up stuck in a "downloading" state incorrectly // if an attachment ends up stuck in a "downloading" state incorrectly
guard attachment.state != .downloading else { guard attachment.state != .downloading else {
let otherCurrentJobAttachmentIds: Set<String> = JobRunner let otherCurrentJobAttachmentIds: Set<String> = JobRunner
.defailsForCurrentlyRunningJobs(of: .attachmentDownload) .infoForCurrentlyRunningJobs(of: .attachmentDownload)
.filter { key, _ in key != job.id } .filter { key, _ in key != job.id }
.values .values
.compactMap { data -> String? in .compactMap { info -> String? in
guard let data: Data = data else { return nil } guard let data: Data = info.detailsData else { return nil }
return (try? JSONDecoder().decode(Details.self, from: data))? return (try? JSONDecoder().decode(Details.self, from: data))?
.attachmentId .attachmentId

@ -16,10 +16,30 @@ public enum ConfigMessageReceiveJob: JobExecutor {
failure: @escaping (Job, Error?, Bool) -> (), failure: @escaping (Job, Error?, Bool) -> (),
deferred: @escaping (Job) -> () deferred: @escaping (Job) -> ()
) { ) {
/// When the `configMessageReceive` job fails we want to unblock any `messageReceive` jobs it was blocking
/// to ensure the user isn't losing any messages - this generally _shouldn't_ happen but if it does then having a temporary
/// "outdated" state due to standard messages which would have been invalidated by a config change incorrectly being
/// processed is less severe then dropping a bunch on messages just because they were processed in the same poll as
/// invalid config messages
let removeDependencyOnMessageReceiveJobs: () -> () = {
guard let jobId: Int64 = job.id else { return }
Storage.shared.write { db in
try JobDependencies
.filter(JobDependencies.Columns.dependantId == jobId)
.joining(
required: JobDependencies.job
.filter(Job.Columns.variant == Job.Variant.messageReceive)
)
.deleteAll(db)
}
}
guard guard
let detailsData: Data = job.details, let detailsData: Data = job.details,
let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData) let details: Details = try? JSONDecoder().decode(Details.self, from: detailsData)
else { else {
removeDependencyOnMessageReceiveJobs()
failure(job, JobRunnerError.missingRequiredDetails, true) failure(job, JobRunnerError.missingRequiredDetails, true)
return return
} }
@ -27,6 +47,7 @@ public enum ConfigMessageReceiveJob: JobExecutor {
// Ensure no standard messages are sent through this job // Ensure no standard messages are sent through this job
guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else { guard !details.messages.contains(where: { $0.variant != .sharedConfigMessage }) else {
SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job") SNLog("[ConfigMessageReceiveJob] Standard messages incorrectly sent to the 'configMessageReceive' job")
removeDependencyOnMessageReceiveJobs()
failure(job, MessageReceiverError.invalidMessage, true) failure(job, MessageReceiverError.invalidMessage, true)
return return
} }
@ -49,8 +70,10 @@ public enum ConfigMessageReceiveJob: JobExecutor {
// Handle the result // Handle the result
switch lastError { switch lastError {
case let error as MessageReceiverError where !error.isRetryable: failure(job, error, true) case .some(let error):
case .some(let error): failure(job, error, false) removeDependencyOnMessageReceiveJobs()
failure(job, error, true)
case .none: success(job, false) case .none: success(job, false)
} }
} }

@ -9,7 +9,7 @@ import SessionUtilitiesKit
public enum ConfigurationSyncJob: JobExecutor { public enum ConfigurationSyncJob: JobExecutor {
public static let maxFailureCount: Int = -1 public static let maxFailureCount: Int = -1
public static let requiresThreadId: Bool = false public static let requiresThreadId: Bool = true
public static let requiresInteractionId: Bool = false public static let requiresInteractionId: Bool = false
private static let maxRunFrequency: TimeInterval = 3 private static let maxRunFrequency: TimeInterval = 3
@ -25,13 +25,29 @@ public enum ConfigurationSyncJob: JobExecutor {
Identity.userCompletedRequiredOnboarding() Identity.userCompletedRequiredOnboarding()
else { return success(job, true) } else { return success(job, true) }
// On startup it's possible for multiple ConfigSyncJob's to run at the same time (which is // It's possible for multiple ConfigSyncJob's with the same target (user/group) to try to run at the
// redundant) so check if there is another job already running and, if so, defer this job // same time since as soon as one is started we will enqueue a second one, rather than adding dependencies
let jobDetails: [Int64: Data?] = JobRunner.defailsForCurrentlyRunningJobs(of: .configurationSync) // between the jobs we just continue to defer the subsequent job while the first one is running in
// order to prevent multiple configurationSync jobs with the same target from running at the same time
guard
JobRunner
.infoForCurrentlyRunningJobs(of: .configurationSync)
.filter({ key, info in
key != job.id && // Exclude this job
info.threadId == job.threadId // Exclude jobs for different ids
})
.isEmpty
else {
// Defer the job to run 'maxRunFrequency' from when this one ran (if we don't it'll try start
// it again immediately which is pointless)
let updatedJob: Job? = Storage.shared.write { db in
try job
.with(nextRunTimestamp: Date().timeIntervalSince1970 + maxRunFrequency)
.saved(db)
}
guard jobDetails.setting(job.id, nil).count == 0 else { SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") deferred due to in progress job")
deferred(job) // We will re-enqueue when needed return deferred(updatedJob ?? job)
return
} }
// If we don't have a userKeyPair yet then there is no need to sync the configuration // If we don't have a userKeyPair yet then there is no need to sync the configuration
@ -42,16 +58,15 @@ public enum ConfigurationSyncJob: JobExecutor {
let pendingConfigChanges: [SessionUtil.OutgoingConfResult] = Storage.shared let pendingConfigChanges: [SessionUtil.OutgoingConfResult] = Storage.shared
.read({ db in try SessionUtil.pendingChanges(db, publicKey: publicKey) }) .read({ db in try SessionUtil.pendingChanges(db, publicKey: publicKey) })
else { else {
failure(job, StorageError.generic, false) SNLog("[ConfigurationSyncJob] For \(job.threadId ?? "UnknownId") failed due to invalid data")
return return failure(job, StorageError.generic, false)
} }
// If there are no pending changes then the job can just complete (next time something // If there are no pending changes then the job can just complete (next time something
// is updated we want to try and run immediately so don't scuedule another run in this case) // is updated we want to try and run immediately so don't scuedule another run in this case)
guard !pendingConfigChanges.isEmpty else { guard !pendingConfigChanges.isEmpty else {
SNLog("[ConfigurationSyncJob] Completed with no pending changes") SNLog("[ConfigurationSyncJob] For \(publicKey) completed with no pending changes")
success(job, true) return success(job, true)
return
} }
// Identify the destination and merge all obsolete hashes into a single set // Identify the destination and merge all obsolete hashes into a single set
@ -63,6 +78,8 @@ public enum ConfigurationSyncJob: JobExecutor {
.map { $0.obsoleteHashes } .map { $0.obsoleteHashes }
.reduce([], +) .reduce([], +)
.asSet() .asSet()
let jobStartTimestamp: TimeInterval = Date().timeIntervalSince1970
SNLog("[ConfigurationSyncJob] For \(publicKey) started with \(pendingConfigChanges.count) change\(pendingConfigChanges.count == 1 ? "" : "s")")
Storage.shared Storage.shared
.readPublisher { db in .readPublisher { db in
@ -119,9 +136,9 @@ public enum ConfigurationSyncJob: JobExecutor {
.sinkUntilComplete( .sinkUntilComplete(
receiveCompletion: { result in receiveCompletion: { result in
switch result { switch result {
case .finished: SNLog("[ConfigurationSyncJob] Completed") case .finished: SNLog("[ConfigurationSyncJob] For \(publicKey) completed")
case .failure(let error): case .failure(let error):
SNLog("[ConfigurationSyncJob] Failed due to error: \(error)") SNLog("[ConfigurationSyncJob] For \(publicKey) failed due to error: \(error)")
failure(job, error, false) failure(job, error, false)
} }
}, },
@ -137,7 +154,7 @@ public enum ConfigurationSyncJob: JobExecutor {
// When we complete the 'ConfigurationSync' job we want to immediately schedule // When we complete the 'ConfigurationSync' job we want to immediately schedule
// another one with a 'nextRunTimestamp' set to the 'maxRunFrequency' value to // another one with a 'nextRunTimestamp' set to the 'maxRunFrequency' value to
// throttle the config sync requests // throttle the config sync requests
let nextRunTimestamp: TimeInterval = (Date().timeIntervalSince1970 + maxRunFrequency) let nextRunTimestamp: TimeInterval = (jobStartTimestamp + maxRunFrequency)
// If another 'ConfigurationSync' job was scheduled then update that one // If another 'ConfigurationSync' job was scheduled then update that one
// to run at 'nextRunTimestamp' and make the current job stop // to run at 'nextRunTimestamp' and make the current job stop
@ -146,6 +163,7 @@ public enum ConfigurationSyncJob: JobExecutor {
.filter(Job.Columns.id != job.id) .filter(Job.Columns.id != job.id)
.filter(Job.Columns.variant == Job.Variant.configurationSync) .filter(Job.Columns.variant == Job.Variant.configurationSync)
.filter(Job.Columns.threadId == publicKey) .filter(Job.Columns.threadId == publicKey)
.order(Job.Columns.nextRunTimestamp.asc)
.fetchOne(db) .fetchOne(db)
{ {
// If the next job isn't currently running then delay it's start time // If the next job isn't currently running then delay it's start time
@ -175,7 +193,6 @@ public enum ConfigurationSyncJob: JobExecutor {
// MARK: - Convenience // MARK: - Convenience
public extension ConfigurationSyncJob { public extension ConfigurationSyncJob {
static func enqueue(_ db: Database, publicKey: String) { static func enqueue(_ db: Database, publicKey: String) {
// FIXME: Remove this once `useSharedUtilForUserConfig` is permanent // FIXME: Remove this once `useSharedUtilForUserConfig` is permanent
guard SessionUtil.userConfigsEnabled(db) else { guard SessionUtil.userConfigsEnabled(db) else {

@ -21,13 +21,14 @@ public enum MessageReceiverError: LocalizedError {
case noGroupKeyPair case noGroupKeyPair
case invalidSharedConfigMessageHandling case invalidSharedConfigMessageHandling
case requiredThreadNotInConfig case requiredThreadNotInConfig
case outdatedMessage
public var isRetryable: Bool { public var isRetryable: Bool {
switch self { switch self {
case .duplicateMessage, .duplicateMessageNewSnode, .duplicateControlMessage, case .duplicateMessage, .duplicateMessageNewSnode, .duplicateControlMessage,
.invalidMessage, .unknownMessage, .unknownEnvelopeType, .invalidSignature, .invalidMessage, .unknownMessage, .unknownEnvelopeType, .invalidSignature,
.noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed, .noData, .senderBlocked, .noThread, .selfSend, .decryptionFailed,
.invalidSharedConfigMessageHandling, .requiredThreadNotInConfig: .invalidSharedConfigMessageHandling, .requiredThreadNotInConfig, .outdatedMessage:
return false return false
default: return true default: return true
@ -57,6 +58,7 @@ public enum MessageReceiverError: LocalizedError {
case .invalidSharedConfigMessageHandling: return "Invalid handling of a shared config message." case .invalidSharedConfigMessageHandling: return "Invalid handling of a shared config message."
case .requiredThreadNotInConfig: return "Required thread not in config." case .requiredThreadNotInConfig: return "Required thread not in config."
case .outdatedMessage: return "Message was sent before a config change which would have removed the message."
} }
} }
} }

@ -13,8 +13,31 @@ extension MessageReceiver {
threadVariant: SessionThread.Variant, threadVariant: SessionThread.Variant,
message: CallMessage message: CallMessage
) throws { ) throws {
let timestampMs: Int64 = (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs())
// Only support calls from contact threads // Only support calls from contact threads
guard threadVariant == .contact else { return } guard
threadVariant == .contact,
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
(
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: .contacts,
changeTimestampMs: timestampMs
)
)
else { return }
switch message.kind { switch message.kind {
case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message) case .preOffer: try MessageReceiver.handleNewCallMessage(db, message: message)

@ -77,7 +77,28 @@ extension MessageReceiver {
targetConfig: .userGroups, targetConfig: .userGroups,
changeTimestampMs: Int64(sentTimestamp) changeTimestampMs: Int64(sentTimestamp)
) )
else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") } else {
// If the closed group already exists then store the encryption keys (just in case - there can be
// some weird edge-cases where we don't have keys we need if we don't store them)
let groupPublicKey: String = publicKeyAsData.toHexString()
let receivedTimestamp: TimeInterval = (TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000)
let newKeyPair: ClosedGroupKeyPair = ClosedGroupKeyPair(
threadId: groupPublicKey,
publicKey: Data(encryptionKeyPair.publicKey),
secretKey: Data(encryptionKeyPair.secretKey),
receivedTimestamp: receivedTimestamp
)
guard
ClosedGroup.filter(id: groupPublicKey).isNotEmpty(db),
!ClosedGroupKeyPair
.filter(ClosedGroupKeyPair.Columns.threadKeyPairHash == newKeyPair.threadKeyPairHash)
.isNotEmpty(db)
else { return SNLog("Ignoring outdated NEW legacy group message due to more recent config state") }
try newKeyPair.insert(db)
return
}
try handleNewClosedGroup( try handleNewClosedGroup(
db, db,
@ -591,9 +612,10 @@ extension MessageReceiver {
message.sentTimestamp.map { Int64($0) } ?? message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs() SnodeAPI.currentOffsetTimestampMs()
) )
// Only actually make the change if SessionUtil says we can (we always want to insert the info // Only actually make the change if SessionUtil says we can (we always want to insert the info
// message though) // message though)
if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs ) { if SessionUtil.canPerformChange(db, threadId: threadId, targetConfig: .userGroups, changeTimestampMs: timestampMs) {
// Legacy groups used these control messages for making changes, new groups only use them // Legacy groups used these control messages for making changes, new groups only use them
// for information purposes // for information purposes
switch threadVariant { switch threadVariant {

@ -3,6 +3,7 @@
import Foundation import Foundation
import GRDB import GRDB
import SessionSnodeKit import SessionSnodeKit
import SessionUtilitiesKit
extension MessageReceiver { extension MessageReceiver {
internal static func handleDataExtractionNotification( internal static func handleDataExtractionNotification(
@ -11,11 +12,45 @@ extension MessageReceiver {
threadVariant: SessionThread.Variant, threadVariant: SessionThread.Variant,
message: DataExtractionNotification message: DataExtractionNotification
) throws { ) throws {
let timestampMs: Int64 = (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
guard guard
threadVariant == .contact, threadVariant == .contact,
let sender: String = message.sender, let sender: String = message.sender,
let messageKind: DataExtractionNotification.Kind = message.kind let messageKind: DataExtractionNotification.Kind = message.kind
else { return } else { throw MessageReceiverError.invalidMessage }
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: {
switch threadVariant {
case .contact:
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: timestampMs
)
else { throw MessageReceiverError.outdatedMessage }
_ = try Interaction( _ = try Interaction(
serverHash: message.serverHash, serverHash: message.serverHash,
@ -27,10 +62,7 @@ extension MessageReceiver {
case .mediaSaved: return .infoMediaSavedNotification case .mediaSaved: return .infoMediaSavedNotification
} }
}(), }(),
timestampMs: ( timestampMs: timestampMs
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
).inserted(db) ).inserted(db)
} }
} }

@ -15,9 +15,9 @@ extension MessageReceiver {
// Only process these for contact and legacy groups (new groups handle it separately) // Only process these for contact and legacy groups (new groups handle it separately)
(threadVariant == .contact || threadVariant == .legacyGroup), (threadVariant == .contact || threadVariant == .legacyGroup),
let sender: String = message.sender let sender: String = message.sender
else { return } else { throw MessageReceiverError.invalidMessage }
// Update the configuration // Generate an updated configuration
// //
// Note: Messages which had been sent during the previous configuration will still // Note: Messages which had been sent during the previous configuration will still
// use it's settings (so if you enable, send a message and then disable disappearing // use it's settings (so if you enable, send a message and then disable disappearing
@ -34,18 +34,40 @@ extension MessageReceiver {
DisappearingMessagesConfiguration.defaultDuration DisappearingMessagesConfiguration.defaultDuration
) )
) )
let timestampMs: Int64 = Int64(message.sentTimestamp ?? 0) // Default to `0` if not set
// Legacy closed groups need to update the SessionUtil // Only actually make the change if SessionUtil says we can (we always want to insert the info
switch threadVariant { // message though)
case .legacyGroup: let canPerformChange: Bool = SessionUtil.canPerformChange(
try SessionUtil db,
.update( threadId: threadId,
db, targetConfig: {
groupPublicKey: threadId, switch threadVariant {
disappearingConfig: config case .contact:
) let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: timestampMs
)
default: break // Only update libSession if we can perform the change
if canPerformChange {
// Legacy closed groups need to update the SessionUtil
switch threadVariant {
case .legacyGroup:
try SessionUtil
.update(
db,
groupPublicKey: threadId,
disappearingConfig: config
)
default: break
}
} }
// Add an info message for the user // Add an info message for the user
@ -60,11 +82,14 @@ extension MessageReceiver {
nil nil
) )
), ),
timestampMs: Int64(message.sentTimestamp ?? 0) // Default to `0` if not set timestampMs: timestampMs
).inserted(db) ).inserted(db)
// Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate // Only save the updated config if we can perform the change
// then the interaction unique constraint will prevent the code from getting here) if canPerformChange {
try config.save(db) // Finally save the changes to the DisappearingMessagesConfiguration (If it's a duplicate
// then the interaction unique constraint will prevent the code from getting here)
try config.save(db)
}
} }
} }

@ -13,11 +13,37 @@ extension MessageReceiver {
dependencies: SMKDependencies dependencies: SMKDependencies
) throws { ) throws {
let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies) let userPublicKey = getUserHexEncodedPublicKey(db, dependencies: dependencies)
let timestampMs: Int64 = (
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
var blindedContactIds: [String] = [] var blindedContactIds: [String] = []
// Ignore messages which were sent from the current user // Ignore messages which were sent from the current user
guard message.sender != userPublicKey else { return } guard
guard let senderId: String = message.sender else { return } message.sender != userPublicKey,
let senderId: String = message.sender
else { throw MessageReceiverError.invalidMessage }
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: senderId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: senderId,
threadVariant: .contact,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: senderId,
targetConfig: .contacts,
changeTimestampMs: timestampMs
)
else { throw MessageReceiverError.outdatedMessage }
// Update profile if needed (want to do this regardless of whether the message exists or // Update profile if needed (want to do this regardless of whether the message exists or
// not to ensure the profile info gets sync between a users devices at every chance) // not to ensure the profile info gets sync between a users devices at every chance)
@ -134,10 +160,7 @@ extension MessageReceiver {
threadId: unblindedThread.id, threadId: unblindedThread.id,
authorId: senderId, authorId: senderId,
variant: .infoMessageRequestAccepted, variant: .infoMessageRequestAccepted,
timestampMs: ( timestampMs: timestampMs
message.sentTimestamp.map { Int64($0) } ??
SnodeAPI.currentOffsetTimestampMs()
)
).inserted(db) ).inserted(db)
} }

@ -3,6 +3,7 @@
import Foundation import Foundation
import GRDB import GRDB
import Sodium import Sodium
import SessionSnodeKit
import SessionUtilitiesKit import SessionUtilitiesKit
extension MessageReceiver { extension MessageReceiver {
@ -22,6 +23,32 @@ extension MessageReceiver {
// seconds to maintain the accuracy) // seconds to maintain the accuracy)
let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000) let messageSentTimestamp: TimeInterval = (TimeInterval(message.sentTimestamp ?? 0) / 1000)
let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false) let isMainAppActive: Bool = (UserDefaults.sharedLokiProject?[.isMainAppActive]).defaulting(to: false)
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
/// Only process the message if the thread `shouldBeVisible` or it was sent after the libSession buffer period
guard
SessionThread
.filter(id: threadId)
.filter(SessionThread.Columns.shouldBeVisible == true)
.isNotEmpty(db) ||
SessionUtil.conversationInConfig(
db,
threadId: threadId,
threadVariant: threadVariant,
visibleOnly: true
) ||
SessionUtil.canPerformChange(
db,
threadId: threadId,
targetConfig: {
switch threadVariant {
case .contact: return (threadId == currentUserPublicKey ? .userProfile : .contacts)
default: return .userGroups
}
}(),
changeTimestampMs: (message.sentTimestamp.map { Int64($0) } ?? SnodeAPI.currentOffsetTimestampMs())
)
else { throw MessageReceiverError.outdatedMessage }
// Update profile if needed (want to do this regardless of whether the message exists or // Update profile if needed (want to do this regardless of whether the message exists or
// not to ensure the profile info gets sync between a users devices at every chance) // not to ensure the profile info gets sync between a users devices at every chance)
@ -63,7 +90,6 @@ extension MessageReceiver {
} }
// Store the message variant so we can run variant-specific behaviours // Store the message variant so we can run variant-specific behaviours
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
let thread: SessionThread = try SessionThread let thread: SessionThread = try SessionThread
.fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil) .fetchOrCreate(db, id: threadId, variant: threadVariant, shouldBeVisible: nil)
let maybeOpenGroup: OpenGroup? = { let maybeOpenGroup: OpenGroup? = {

@ -31,7 +31,8 @@ internal extension SessionUtil {
static func handleContactsUpdate( static func handleContactsUpdate(
_ db: Database, _ db: Database,
in conf: UnsafeMutablePointer<config_object>?, in conf: UnsafeMutablePointer<config_object>?,
mergeNeedsDump: Bool mergeNeedsDump: Bool,
latestConfigSentTimestampMs: Int64
) throws { ) throws {
typealias ContactData = [ typealias ContactData = [
String: ( String: (
@ -64,6 +65,7 @@ internal extension SessionUtil {
let profileResult: Profile = Profile( let profileResult: Profile = Profile(
id: contactId, id: contactId,
name: String(libSessionVal: contact.name), name: String(libSessionVal: contact.name),
lastNameUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000),
nickname: String(libSessionVal: contact.nickname, nullIfEmpty: true), nickname: String(libSessionVal: contact.nickname, nullIfEmpty: true),
profilePictureUrl: profilePictureUrl, profilePictureUrl: profilePictureUrl,
profileEncryptionKey: (profilePictureUrl == nil ? nil : profileEncryptionKey: (profilePictureUrl == nil ? nil :
@ -71,7 +73,8 @@ internal extension SessionUtil {
libSessionVal: contact.profile_pic.key, libSessionVal: contact.profile_pic.key,
count: ProfileManager.avatarAES256KeyByteLength count: ProfileManager.avatarAES256KeyByteLength
) )
) ),
lastProfilePictureUpdate: (TimeInterval(latestConfigSentTimestampMs) / 1000)
) )
contactData[contactId] = ( contactData[contactId] = (
@ -99,12 +102,23 @@ internal extension SessionUtil {
// observation system can't differ between update calls which do and don't change anything) // observation system can't differ between update calls which do and don't change anything)
let contact: Contact = Contact.fetchOrCreate(db, id: sessionId) let contact: Contact = Contact.fetchOrCreate(db, id: sessionId)
let profile: Profile = Profile.fetchOrCreate(db, id: sessionId) let profile: Profile = Profile.fetchOrCreate(db, id: sessionId)
let profileNameShouldBeUpdated: Bool = (
!data.profile.name.isEmpty &&
profile.name != data.profile.name &&
profile.lastNameUpdate < data.profile.lastNameUpdate
)
let profilePictureShouldBeUpdated: Bool = (
(
profile.profilePictureUrl != data.profile.profilePictureUrl ||
profile.profileEncryptionKey != data.profile.profileEncryptionKey
) &&
profile.lastProfilePictureUpdate < data.profile.lastProfilePictureUpdate
)
if if
(!data.profile.name.isEmpty && profile.name != data.profile.name) || profileNameShouldBeUpdated ||
profile.nickname != data.profile.nickname || profile.nickname != data.profile.nickname ||
profile.profilePictureUrl != data.profile.profilePictureUrl || profilePictureShouldBeUpdated
profile.profileEncryptionKey != data.profile.profileEncryptionKey
{ {
try profile.save(db) try profile.save(db)
try Profile try Profile
@ -112,9 +126,12 @@ internal extension SessionUtil {
.updateAll( // Handling a config update so don't use `updateAllAndConfig` .updateAll( // Handling a config update so don't use `updateAllAndConfig`
db, db,
[ [
(data.profile.name.isEmpty || profile.name == data.profile.name ? nil : (!profileNameShouldBeUpdated ? nil :
Profile.Columns.name.set(to: data.profile.name) Profile.Columns.name.set(to: data.profile.name)
), ),
(!profileNameShouldBeUpdated ? nil :
Profile.Columns.lastNameUpdate.set(to: data.profile.lastNameUpdate)
),
(profile.nickname == data.profile.nickname ? nil : (profile.nickname == data.profile.nickname ? nil :
Profile.Columns.nickname.set(to: data.profile.nickname) Profile.Columns.nickname.set(to: data.profile.nickname)
), ),
@ -123,6 +140,9 @@ internal extension SessionUtil {
), ),
(profile.profileEncryptionKey != data.profile.profileEncryptionKey ? nil : (profile.profileEncryptionKey != data.profile.profileEncryptionKey ? nil :
Profile.Columns.profileEncryptionKey.set(to: data.profile.profileEncryptionKey) Profile.Columns.profileEncryptionKey.set(to: data.profile.profileEncryptionKey)
),
(!profilePictureShouldBeUpdated ? nil :
Profile.Columns.lastProfilePictureUpdate.set(to: data.profile.lastProfilePictureUpdate)
) )
].compactMap { $0 } ].compactMap { $0 }
) )

@ -326,7 +326,7 @@ internal extension SessionUtil {
.defaulting(to: 0) .defaulting(to: 0)
// Ensure the change occurred after the last config message was handled (minus the buffer period) // Ensure the change occurred after the last config message was handled (minus the buffer period)
return (changeTimestampMs > (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000))) return (changeTimestampMs >= (configDumpTimestampMs - Int64(SessionUtil.configChangeBufferPeriod * 1000)))
} }
} }

@ -461,7 +461,8 @@ public enum SessionUtil {
try SessionUtil.handleContactsUpdate( try SessionUtil.handleContactsUpdate(
db, db,
in: conf, in: conf,
mergeNeedsDump: config_needs_dump(conf) mergeNeedsDump: config_needs_dump(conf),
latestConfigSentTimestampMs: latestConfigSentTimestampMs
) )
case .convoInfoVolatile: case .convoInfoVolatile:

@ -495,47 +495,28 @@ public struct ProfileManager {
// Name // Name
if let name: String = name, !name.isEmpty, name != profile.name { if let name: String = name, !name.isEmpty, name != profile.name {
let shouldUpdate: Bool = { // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent
guard isCurrentUser else { return true } if sentTimestamp > profile.lastNameUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) {
return UserDefaults.standard[.lastDisplayNameUpdate]
.map { sentTimestamp > $0.timeIntervalSince1970 }
.defaulting(to: true)
}()
if shouldUpdate {
if isCurrentUser {
UserDefaults.standard[.lastDisplayNameUpdate] = Date(timeIntervalSince1970: sentTimestamp)
}
profileChanges.append(Profile.Columns.name.set(to: name)) profileChanges.append(Profile.Columns.name.set(to: name))
profileChanges.append(Profile.Columns.lastNameUpdate.set(to: sentTimestamp))
} }
} }
// Profile picture & profile key // Profile picture & profile key
var avatarNeedsDownload: Bool = false var avatarNeedsDownload: Bool = false
var targetAvatarUrl: String? = nil var targetAvatarUrl: String? = nil
let shouldUpdateAvatar: Bool = {
guard isCurrentUser else { return true }
return UserDefaults.standard[.lastProfilePictureUpdate] // FIXME: Remove the `userConfigsEnabled` check once `useSharedUtilForUserConfig` is permanent
.map { sentTimestamp > $0.timeIntervalSince1970 } if sentTimestamp > profile.lastProfilePictureUpdate || (isCurrentUser && (calledFromConfigHandling || !SessionUtil.userConfigsEnabled(db))) {
.defaulting(to: true)
}()
if shouldUpdateAvatar {
switch avatarUpdate { switch avatarUpdate {
case .none: break case .none: break
case .uploadImageData: preconditionFailure("Invalid options for this function") case .uploadImageData: preconditionFailure("Invalid options for this function")
case .remove: case .remove:
if isCurrentUser {
UserDefaults.standard[.lastProfilePictureUpdate] = Date(timeIntervalSince1970: sentTimestamp)
}
profileChanges.append(Profile.Columns.profilePictureUrl.set(to: nil)) profileChanges.append(Profile.Columns.profilePictureUrl.set(to: nil))
profileChanges.append(Profile.Columns.profileEncryptionKey.set(to: nil)) profileChanges.append(Profile.Columns.profileEncryptionKey.set(to: nil))
profileChanges.append(Profile.Columns.profilePictureFileName.set(to: nil)) profileChanges.append(Profile.Columns.profilePictureFileName.set(to: nil))
profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp))
case .updateTo(let url, let key, let fileName): case .updateTo(let url, let key, let fileName):
if url != profile.profilePictureUrl { if url != profile.profilePictureUrl {
@ -558,6 +539,9 @@ public struct ProfileManager {
!ProfileManager.hasProfileImageData(with: fileName) !ProfileManager.hasProfileImageData(with: fileName)
) )
} }
// Update the 'lastProfilePictureUpdate' timestamp for either external or local changes
profileChanges.append(Profile.Columns.lastProfilePictureUpdate.set(to: sentTimestamp))
} }
} }

@ -65,7 +65,7 @@ public final class ProfilePictureView: UIView {
var iconSize: CGFloat { var iconSize: CGFloat {
switch self { switch self {
case .navigation, .message: return 8 case .navigation, .message: return 10 // Intentionally not a multiple of 4
case .list: return 16 case .list: return 16
case .hero: return 24 case .hero: return 24
} }

@ -7,8 +7,8 @@ public struct JobDependencies: Codable, Equatable, Hashable, FetchableRecord, Pe
public static var databaseTableName: String { "jobDependencies" } public static var databaseTableName: String { "jobDependencies" }
internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id]) internal static let jobForeignKey = ForeignKey([Columns.jobId], to: [Job.Columns.id])
internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id]) internal static let dependantForeignKey = ForeignKey([Columns.dependantId], to: [Job.Columns.id])
internal static let job = belongsTo(Job.self, using: jobForeignKey) public static let job = belongsTo(Job.self, using: jobForeignKey)
internal static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey) public static let dependant = hasOne(Job.self, using: Job.dependencyForeignKey)
public typealias Columns = CodingKeys public typealias Columns = CodingKeys
public enum CodingKeys: String, CodingKey, ColumnExpression { public enum CodingKeys: String, CodingKey, ColumnExpression {

@ -38,8 +38,6 @@ public enum SNUserDefaults {
public enum Date: Swift.String { public enum Date: Swift.String {
case lastConfigurationSync case lastConfigurationSync
case lastDisplayNameUpdate
case lastProfilePictureUpdate
case lastProfilePictureUpload case lastProfilePictureUpload
case lastOpenGroupImageUpdate case lastOpenGroupImageUpdate
case lastOpen case lastOpen

@ -42,6 +42,12 @@ public final class JobRunner {
case notFound case notFound
} }
public struct JobInfo {
public let threadId: String?
public let interactionId: Int64?
public let detailsData: Data?
}
private static let blockingQueue: Atomic<JobQueue?> = Atomic( private static let blockingQueue: Atomic<JobQueue?> = Atomic(
JobQueue( JobQueue(
type: .blocking, type: .blocking,
@ -381,8 +387,8 @@ public final class JobRunner {
return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true) return (queues.wrappedValue[job.variant]?.isCurrentlyRunning(jobId) == true)
} }
public static func defailsForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: Data?] { public static func infoForCurrentlyRunningJobs(of variant: Job.Variant) -> [Int64: JobInfo] {
return (queues.wrappedValue[variant]?.detailsForAllCurrentlyRunningJobs()) return (queues.wrappedValue[variant]?.infoForAllCurrentlyRunningJobs())
.defaulting(to: [:]) .defaulting(to: [:])
} }
@ -395,11 +401,24 @@ public final class JobRunner {
queue.afterCurrentlyRunningJob(jobId, callback: callback) queue.afterCurrentlyRunningJob(jobId, callback: callback)
} }
public static func hasPendingOrRunningJob<T: Encodable>(with variant: Job.Variant, details: T) -> Bool { public static func hasPendingOrRunningJob<T: Encodable>(
with variant: Job.Variant,
threadId: String? = nil,
interactionId: Int64? = nil,
details: T? = nil
) -> Bool {
guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false } guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false }
guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false }
return targetQueue.hasPendingOrRunningJob(with: detailsData) // Ensure we can encode the details (if provided)
let detailsData: Data? = details.map { try? JSONEncoder().encode($0) }
guard details == nil || detailsData != nil else { return false }
return targetQueue.hasPendingOrRunningJobWith(
threadId: threadId,
interactionId: interactionId,
detailsData: detailsData
)
} }
public static func removePendingJob(_ job: Job?) { public static func removePendingJob(_ job: Job?) {
@ -513,9 +532,9 @@ private final class JobQueue {
private var nextTrigger: Atomic<Trigger?> = Atomic(nil) private var nextTrigger: Atomic<Trigger?> = Atomic(nil)
fileprivate var isRunning: Atomic<Bool> = Atomic(false) fileprivate var isRunning: Atomic<Bool> = Atomic(false)
private var queue: Atomic<[Job]> = Atomic([]) private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:]) private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:]) private var currentlyRunningJobIds: Atomic<Set<Int64>> = Atomic([])
private var currentlyRunningJobInfo: Atomic<[Int64: JobRunner.JobInfo]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:]) private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty } fileprivate var hasPendingJobs: Bool { !queue.wrappedValue.isEmpty }
@ -620,7 +639,7 @@ private final class JobQueue {
} }
fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) { fileprivate func appDidBecomeActive(with jobs: [Job], canStart: Bool) {
let currentlyRunningJobIds: Set<Int64> = jobsCurrentlyRunning.wrappedValue let currentlyRunningJobIds: Set<Int64> = currentlyRunningJobIds.wrappedValue
queue.mutate { queue in queue.mutate { queue in
// Avoid re-adding jobs to the queue that are already in it (this can // Avoid re-adding jobs to the queue that are already in it (this can
@ -642,11 +661,11 @@ private final class JobQueue {
} }
fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool { fileprivate func isCurrentlyRunning(_ jobId: Int64) -> Bool {
return jobsCurrentlyRunning.wrappedValue.contains(jobId) return currentlyRunningJobIds.wrappedValue.contains(jobId)
} }
fileprivate func detailsForAllCurrentlyRunningJobs() -> [Int64: Data?] { fileprivate func infoForAllCurrentlyRunningJobs() -> [Int64: JobRunner.JobInfo] {
return detailsForCurrentlyRunningJobs.wrappedValue return currentlyRunningJobInfo.wrappedValue
} }
fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) { fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) {
@ -660,14 +679,65 @@ private final class JobQueue {
} }
} }
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool { fileprivate func hasPendingOrRunningJobWith(
guard let detailsData: Data = detailsData else { return false } threadId: String? = nil,
interactionId: Int64? = nil,
detailsData: Data? = nil
) -> Bool {
let pendingJobs: [Job] = queue.wrappedValue let pendingJobs: [Job] = queue.wrappedValue
let currentlyRunningJobInfo: [Int64: JobRunner.JobInfo] = currentlyRunningJobInfo.wrappedValue
guard !pendingJobs.contains(where: { job in job.details == detailsData }) else { return true } var possibleJobIds: Set<Int64> = Set(currentlyRunningJobInfo.keys)
.inserting(contentsOf: pendingJobs.compactMap { $0.id }.asSet())
return detailsForCurrentlyRunningJobs.wrappedValue.values.contains(detailsData)
// Remove any which don't have the matching threadId (if provided)
if let targetThreadId: String = threadId {
let pendingJobIdsWithWrongThreadId: Set<Int64> = pendingJobs
.filter { $0.threadId != targetThreadId }
.compactMap { $0.id }
.asSet()
let runningJobIdsWithWrongThreadId: Set<Int64> = currentlyRunningJobInfo
.filter { _, info -> Bool in info.threadId != targetThreadId }
.map { key, _ in key }
.asSet()
possibleJobIds = possibleJobIds
.subtracting(pendingJobIdsWithWrongThreadId)
.subtracting(runningJobIdsWithWrongThreadId)
}
// Remove any which don't have the matching interactionId (if provided)
if let targetInteractionId: Int64 = interactionId {
let pendingJobIdsWithWrongInteractionId: Set<Int64> = pendingJobs
.filter { $0.interactionId != targetInteractionId }
.compactMap { $0.id }
.asSet()
let runningJobIdsWithWrongInteractionId: Set<Int64> = currentlyRunningJobInfo
.filter { _, info -> Bool in info.interactionId != targetInteractionId }
.map { key, _ in key }
.asSet()
possibleJobIds = possibleJobIds
.subtracting(pendingJobIdsWithWrongInteractionId)
.subtracting(runningJobIdsWithWrongInteractionId)
}
// Remove any which don't have the matching details (if provided)
if let targetDetailsData: Data = detailsData {
let pendingJobIdsWithWrongDetailsData: Set<Int64> = pendingJobs
.filter { $0.details != targetDetailsData }
.compactMap { $0.id }
.asSet()
let runningJobIdsWithWrongDetailsData: Set<Int64> = currentlyRunningJobInfo
.filter { _, info -> Bool in info.detailsData != detailsData }
.map { key, _ in key }
.asSet()
possibleJobIds = possibleJobIds
.subtracting(pendingJobIdsWithWrongDetailsData)
.subtracting(runningJobIdsWithWrongDetailsData)
}
return !possibleJobIds.isEmpty
} }
fileprivate func removePendingJob(_ jobId: Int64) { fileprivate func removePendingJob(_ jobId: Int64) {
@ -706,7 +776,7 @@ private final class JobQueue {
} }
// Get any pending jobs // Get any pending jobs
let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
let jobsAlreadyInQueue: Set<Int64> = queue.wrappedValue.compactMap { $0.id }.asSet() let jobsAlreadyInQueue: Set<Int64> = queue.wrappedValue.compactMap { $0.id }.asSet()
let jobsToRun: [Job] = Storage.shared.read { db in let jobsToRun: [Job] = Storage.shared.read { db in
try Job try Job
@ -765,7 +835,7 @@ private final class JobQueue {
} }
guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else { guard let (nextJob, numJobsRemaining): (Job, Int) = queue.mutate({ queue in queue.popFirst().map { ($0, queue.count) } }) else {
// If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag // If it's a serial queue, or there are no more jobs running then update the 'isRunning' flag
if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
isRunning.mutate { $0 = false } isRunning.mutate { $0 = false }
} }
@ -827,7 +897,7 @@ private final class JobQueue {
/// ///
/// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies /// **Note:** We don't add the current job back the the queue because it should only be re-added if it's dependencies
/// are successfully completed /// are successfully completed
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue)
let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs let dependencyJobsNotCurrentlyRunning: [Job] = dependencyInfo.jobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
@ -851,11 +921,20 @@ private final class JobQueue {
trigger?.invalidate() // Need to invalidate to prevent a memory leak trigger?.invalidate() // Need to invalidate to prevent a memory leak
trigger = nil trigger = nil
} }
jobsCurrentlyRunning.mutate { jobsCurrentlyRunning in currentlyRunningJobIds.mutate { currentlyRunningJobIds in
jobsCurrentlyRunning = jobsCurrentlyRunning.inserting(nextJob.id) currentlyRunningJobIds = currentlyRunningJobIds.inserting(nextJob.id)
numJobsRunning = jobsCurrentlyRunning.count numJobsRunning = currentlyRunningJobIds.count
}
currentlyRunningJobInfo.mutate { currentlyRunningJobInfo in
currentlyRunningJobInfo = currentlyRunningJobInfo.setting(
nextJob.id,
JobRunner.JobInfo(
threadId: nextJob.threadId,
interactionId: nextJob.interactionId,
detailsData: nextJob.details
)
)
} }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.setting(nextJob.id, nextJob.details) }
SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)") SNLog("[JobRunner] \(queueContext) started \(nextJob.variant) job (\(executionType == .concurrent ? "\(numJobsRunning) currently running, " : "")\(numJobsRemaining) remaining)")
/// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to /// As it turns out Combine doesn't plat too nicely with concurrent Dispatch Queues, in Combine events are dispatched asynchronously to
@ -894,7 +973,7 @@ private final class JobQueue {
} }
private func scheduleNextSoonestJob() { private func scheduleNextSoonestJob() {
let jobIdsAlreadyRunning: Set<Int64> = jobsCurrentlyRunning.wrappedValue let jobIdsAlreadyRunning: Set<Int64> = currentlyRunningJobIds.wrappedValue
let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in let nextJobTimestamp: TimeInterval? = Storage.shared.read { db in
try Job try Job
.filterPendingJobs( .filterPendingJobs(
@ -911,7 +990,7 @@ private final class JobQueue {
// If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger // If there are no remaining jobs or the JobRunner isn't allowed to start any queues then trigger
// the 'onQueueDrained' callback and stop // the 'onQueueDrained' callback and stop
guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else { guard let nextJobTimestamp: TimeInterval = nextJobTimestamp, JobRunner.canStartQueues.wrappedValue else {
if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
self.onQueueDrained?() self.onQueueDrained?()
} }
return return
@ -922,7 +1001,7 @@ private final class JobQueue {
guard secondsUntilNextJob > 0 else { guard secondsUntilNextJob > 0 else {
// Only log that the queue is getting restarted if this queue had actually been about to stop // Only log that the queue is getting restarted if this queue had actually been about to stop
if executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty { if executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty {
let timingString: String = (nextJobTimestamp == 0 ? let timingString: String = (nextJobTimestamp == 0 ?
"that should be in the queue" : "that should be in the queue" :
"scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s") ago" "scheduled \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s") ago"
@ -940,7 +1019,7 @@ private final class JobQueue {
} }
// Only schedule a trigger if this queue has actually completed // Only schedule a trigger if this queue has actually completed
guard executionType != .concurrent || jobsCurrentlyRunning.wrappedValue.isEmpty else { return } guard executionType != .concurrent || currentlyRunningJobIds.wrappedValue.isEmpty else { return }
// Setup a trigger // Setup a trigger
SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")") SNLog("[JobRunner] Stopping \(queueContext) until next job in \(Int(ceil(abs(secondsUntilNextJob)))) second\(Int(ceil(abs(secondsUntilNextJob))) == 1 ? "" : "s")")
@ -1029,7 +1108,7 @@ private final class JobQueue {
/// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be /// **Note:** If any of these `dependantJobs` have other dependencies then when they attempt to start they will be
/// removed from the queue, replaced by their dependencies /// removed from the queue, replaced by their dependencies
if !dependantJobs.isEmpty { if !dependantJobs.isEmpty {
let currentlyRunningJobIds: [Int64] = Array(detailsForCurrentlyRunningJobs.wrappedValue.keys) let currentlyRunningJobIds: [Int64] = Array(currentlyRunningJobIds.wrappedValue)
let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs let dependantJobsNotCurrentlyRunning: [Job] = dependantJobs
.filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) } .filter { job in !currentlyRunningJobIds.contains(job.id ?? -1) }
.sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) } .sorted { lhs, rhs in (lhs.id ?? -1) < (rhs.id ?? -1) }
@ -1211,8 +1290,8 @@ private final class JobQueue {
private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) { private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) {
// The job is removed from the queue before it runs so all we need to to is remove it // The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set // from the 'currentlyRunning' set
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) } currentlyRunningJobIds.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) } currentlyRunningJobInfo.mutate { $0 = $0.removingValue(forKey: job.id) }
guard shouldTriggerCallbacks else { return } guard shouldTriggerCallbacks else { return }

Loading…
Cancel
Save