Cleaned up some TODOs, QA fixes and refetch on admin promotion

• Added logic to trigger a full refetch when getting promoted to admin within a group
• Added a timeout to display picture uploads
• Updated the code to inject the DisplayPictureManager via dependencies
• Updated the resend invite functionality to have a blocking loading indicator as well
• Refactored some closure-based logic for display picture uploads to use Combine instead
• Refactored the SnodeReceivedMessageInfo (with a migration) so it's no longer using a weird constructed key (now have individual queryable columns for the values)
pull/894/head
Morgan Pretty 4 months ago
parent 8d4365d89c
commit 800e32c633

@ -705,6 +705,7 @@
FD5E93D12C100FD70038C25A /* FileUploadResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDC4387127B5BB3B00C60D73 /* FileUploadResponse.swift */; };
FD5E93D22C12B0580038C25A /* AppVersionResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = FDC4383727B3863200C60D73 /* AppVersionResponse.swift */; };
FD61FCF92D308CC9005752DE /* GroupMemberSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD61FCF82D308CC5005752DE /* GroupMemberSpec.swift */; };
FD61FCFB2D34A5EA005752DE /* _007_SplitSnodeReceivedMessageInfo.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD61FCFA2D34A5DE005752DE /* _007_SplitSnodeReceivedMessageInfo.swift */; };
FD65318A2AA025C500DFEEAA /* TestDependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6531892AA025C500DFEEAA /* TestDependencies.swift */; };
FD65318B2AA025C500DFEEAA /* TestDependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6531892AA025C500DFEEAA /* TestDependencies.swift */; };
FD65318C2AA025C500DFEEAA /* TestDependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD6531892AA025C500DFEEAA /* TestDependencies.swift */; };
@ -1915,6 +1916,7 @@
FD5CE3442A3C5D96001A6DE3 /* DecryptExportedKey.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DecryptExportedKey.swift; sourceTree = "<group>"; };
FD5D201D27B0D87C00FEA984 /* SessionId.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionId.swift; sourceTree = "<group>"; };
FD61FCF82D308CC5005752DE /* GroupMemberSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GroupMemberSpec.swift; sourceTree = "<group>"; };
FD61FCFA2D34A5DE005752DE /* _007_SplitSnodeReceivedMessageInfo.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _007_SplitSnodeReceivedMessageInfo.swift; sourceTree = "<group>"; };
FD6531892AA025C500DFEEAA /* TestDependencies.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TestDependencies.swift; sourceTree = "<group>"; };
FD66CB272BF3449B00268FAB /* SessionSnodeKit.xctestplan */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = SessionSnodeKit.xctestplan; sourceTree = "<group>"; };
FD66CB2B2BF344C600268FAB /* SessionMessageKit.xctestplan */ = {isa = PBXFileReference; lastKnownFileType = text; path = SessionMessageKit.xctestplan; sourceTree = "<group>"; };
@ -3816,6 +3818,7 @@
FD39353528F7C3390084DADA /* _004_FlagMessageHashAsDeletedOrInvalid.swift */,
FD6DF00A2ACFE40D0084BA4C /* _005_AddSnodeReveivedMessageInfoPrimaryKey.swift */,
FD7F74562BAA9D31006DDFD8 /* _006_DropSnodeCache.swift */,
FD61FCFA2D34A5DE005752DE /* _007_SplitSnodeReceivedMessageInfo.swift */,
);
path = Migrations;
sourceTree = "<group>";
@ -5915,6 +5918,7 @@
FD2272D62C34ED6A004D8A6C /* RetryWithDependencies.swift in Sources */,
FDF848D229405C5B007DCAE5 /* LegacyGetMessagesRequest.swift in Sources */,
FDF848E529405D6E007DCAE5 /* SnodeAPIError.swift in Sources */,
FD61FCFB2D34A5EA005752DE /* _007_SplitSnodeReceivedMessageInfo.swift in Sources */,
FDF848D529405C5B007DCAE5 /* DeleteAllMessagesResponse.swift in Sources */,
FD2272B22C33E337004D8A6C /* PreparedRequest.swift in Sources */,
FDF848BF29405C5A007DCAE5 /* SnodeResponse.swift in Sources */,
@ -7912,7 +7916,7 @@
CLANG_WARN__ARC_BRIDGE_CAST_NONARC = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
CODE_SIGN_IDENTITY = "iPhone Developer";
CURRENT_PROJECT_VERSION = 525;
CURRENT_PROJECT_VERSION = 527;
ENABLE_BITCODE = NO;
ENABLE_STRICT_OBJC_MSGSEND = YES;
ENABLE_TESTABILITY = YES;
@ -7988,7 +7992,7 @@
CLANG_WARN__ARC_BRIDGE_CAST_NONARC = YES;
CLANG_WARN__DUPLICATE_METHOD_MATCH = YES;
CODE_SIGN_IDENTITY = "iPhone Distribution";
CURRENT_PROJECT_VERSION = 525;
CURRENT_PROJECT_VERSION = 527;
ENABLE_BITCODE = NO;
ENABLE_MODULE_VERIFIER = YES;
ENABLE_STRICT_OBJC_MSGSEND = YES;

@ -159,7 +159,7 @@ public final class SessionCall: CurrentCallProtocol, WebRTCSessionDelegate {
self.webRTCSession = WebRTCSession.current ?? WebRTCSession(for: sessionId, with: uuid, using: dependencies)
self.isOutgoing = outgoing
let avatarData: Data? = DisplayPictureManager.displayPicture(db, id: .user(sessionId), using: dependencies)
let avatarData: Data? = dependencies[singleton: .displayPictureManager].displayPicture(db, id: .user(sessionId))
self.contactName = Profile.displayName(db, id: sessionId, threadVariant: .contact, using: dependencies)
self.profilePicture = avatarData
.map { UIImage(data: $0) }

@ -359,7 +359,7 @@ class EditGroupViewModel: SessionTableViewModel, NavigatableStateHolder, Editabl
)
case (.standard, _, true):
self?.resendInvitation(memberId: memberInfo.profileId)
self?.resendInvitations(memberIds: [memberInfo.profileId])
case (.standard, _, false), (.zombie, _, _):
if !selectedIdsSubject.value.ids.contains(memberInfo.profileId) {
@ -676,13 +676,66 @@ class EditGroupViewModel: SessionTableViewModel, NavigatableStateHolder, Editabl
self.transitionToScreen(viewController, transitionType: .present)
}
private func resendInvitation(memberId: String) {
MessageSender.resendInvitation(
groupSessionId: threadId,
memberId: memberId,
using: dependencies
)
self.showToast(text: "groupInviteSending".putNumber(1).localized())
private func resendInvitations(memberIds: [String]) {
let viewController = ModalActivityIndicatorViewController(canCancel: false) { [weak self, dependencies, threadId] modalActivityIndicator in
MessageSender
.resendInvitations(
groupSessionId: threadId,
memberIds: memberIds,
using: dependencies
)
.sinkUntilComplete(
receiveCompletion: { [weak self] result in
modalActivityIndicator.dismiss {
switch result {
case .failure:
self?.transitionToScreen(
ConfirmationModal(
info: ConfirmationModal.Info(
title: "Invitation Failed",//.localized(),
body: .text("An error occurred and the invitations were not successfully sent, would you like to try again?"),//.localized()),
confirmTitle: "retry".localized(),
cancelTitle: "dismiss".localized(),
cancelStyle: .alert_text,
dismissOnConfirm: false,
onConfirm: { modal in
modal.dismiss(animated: true) {
self?.resendInvitations(memberIds: memberIds)
}
},
onCancel: { modal in
/// Flag the members as failed
dependencies[singleton: .storage].writeAsync { db in
try? GroupMember
.filter(GroupMember.Columns.groupId == threadId)
.filter(memberIds.contains(GroupMember.Columns.profileId))
.updateAllAndConfig(
db,
GroupMember.Columns.roleStatus.set(to: GroupMember.RoleStatus.failed),
using: dependencies
)
}
modal.dismiss(animated: true)
}
)
),
transitionType: .present
)
case .finished:
/// Show a toast that we have sent the invitations
self?.showToast(
text: "groupInviteSending"
.putNumber(memberIds.count)
.localized(),
backgroundColor: .backgroundSecondary
)
}
}
}
)
}
self.transitionToScreen(viewController, transitionType: .present)
}
private func removeMembers(currentGroupName: String, memberIds: Set<String>) {

@ -796,7 +796,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigatableStateHolder, Ob
case .contact:
guard
let profile: Profile = threadViewModel.profile,
let imageData: Data = DisplayPictureManager.displayPicture(owner: .user(profile), using: dependencies)
let imageData: Data = dependencies[singleton: .displayPictureManager].displayPicture(owner: .user(profile))
else { return }
displayPictureData = imageData
@ -805,7 +805,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigatableStateHolder, Ob
guard
threadViewModel.displayPictureFilename != nil,
let imageData: Data = dependencies[singleton: .storage].read({ [dependencies] db in
DisplayPictureManager.displayPicture(db, id: ownerId, using: dependencies)
dependencies[singleton: .displayPictureManager].displayPicture(db, id: ownerId)
})
else { return }
@ -1329,7 +1329,7 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigatableStateHolder, Ob
guard dependencies[feature: .updatedGroupsAllowDisplayPicture] else { return }
let existingImageData: Data? = dependencies[singleton: .storage].read { [threadId, dependencies] db in
DisplayPictureManager.displayPicture(db, id: .group(threadId), using: dependencies)
dependencies[singleton: .displayPictureManager].displayPicture(db, id: .group(threadId))
}
self.transitionToScreen(
ConfirmationModal(
@ -1444,42 +1444,48 @@ class ThreadSettingsViewModel: SessionTableViewModel, NavigatableStateHolder, Ob
case .groupRemove, .groupUpdateTo: performChanges(viewController, displayPictureUpdate)
case .groupUploadImageData(let data):
DisplayPictureManager.prepareAndUploadDisplayPicture(
queue: DispatchQueue.global(qos: .background),
imageData: data,
success: { url, fileName, key in
performChanges(viewController, .groupUpdateTo(url: url, key: key, fileName: fileName))
},
failure: { error in
DispatchQueue.main.async {
viewController.dismiss {
let message: String = {
switch (displayPictureUpdate, error) {
case (.groupRemove, _): return "profileDisplayPictureRemoveError".localized()
case (_, .uploadMaxFileSizeExceeded):
return "profileDisplayPictureSizeError".localized()
dependencies[singleton: .displayPictureManager]
.prepareAndUploadDisplayPicture(imageData: data)
.subscribe(on: DispatchQueue.global(qos: .background), using: dependencies)
.receive(on: DispatchQueue.main, using: dependencies)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
viewController.dismiss {
let message: String = {
switch (displayPictureUpdate, error) {
case (.groupRemove, _): return "profileDisplayPictureRemoveError".localized()
case (_, .uploadMaxFileSizeExceeded):
return "profileDisplayPictureSizeError".localized()
default: return "errorConnection".localized()
}
}()
default: return "errorConnection".localized()
}
}()
self?.transitionToScreen(
ConfirmationModal(
info: ConfirmationModal.Info(
title: "deleteAfterLegacyGroupsGroupUpdateErrorTitle".localized(),
body: .text(message),
cancelTitle: "okay".localized(),
cancelStyle: .alert_text,
dismissType: .single
self?.transitionToScreen(
ConfirmationModal(
info: ConfirmationModal.Info(
title: "deleteAfterLegacyGroupsGroupUpdateErrorTitle".localized(),
body: .text(message),
cancelTitle: "okay".localized(),
cancelStyle: .alert_text,
dismissType: .single
)
),
transitionType: .present
)
),
transitionType: .present
)
}
}
},
receiveValue: { url, fileName, key in
performChanges(
viewController,
.groupUpdateTo(url: url, key: key, fileName: fileName)
)
}
},
using: dependencies
)
)
}
}
self.transitionToScreen(viewController, transitionType: .present)

@ -143,7 +143,7 @@ public class SessionApp: SessionAppType {
$0.suspendNetworkAccess()
}
dependencies[singleton: .storage].resetAllStorage()
DisplayPictureManager.resetStorage(using: dependencies)
dependencies[singleton: .displayPictureManager].resetStorage()
Attachment.resetAttachmentStorage(using: dependencies)
dependencies[singleton: .notificationsManager].clearAllNotifications()
try? dependencies[singleton: .keychain].removeAll()

@ -356,8 +356,8 @@ extension OpenGroupSuggestionGrid {
fileprivate func update(with room: OpenGroupAPI.Room, openGroup: OpenGroup, using dependencies: Dependencies) {
label.text = room.name
imageView.image = DisplayPictureManager
.displayPicture(owner: .community(openGroup), using: dependencies)
imageView.image = dependencies[singleton: .displayPictureManager]
.displayPicture(owner: .community(openGroup))
.map { UIImage(data: $0) }
imageView.isHidden = (imageView.image == nil)
}

@ -510,7 +510,7 @@ class SettingsViewModel: SessionTableViewModel, NavigationItemSource, Navigatabl
private func updateProfilePicture(currentFileName: String?) {
let existingImageData: Data? = dependencies[singleton: .storage].read { [userSessionId, dependencies] db in
DisplayPictureManager.displayPicture(db, id: .user(userSessionId.hexString), using: dependencies)
dependencies[singleton: .displayPictureManager].displayPicture(db, id: .user(userSessionId.hexString))
}
self.transitionToScreen(
ConfirmationModal(
@ -583,50 +583,46 @@ class SettingsViewModel: SessionTableViewModel, NavigationItemSource, Navigatabl
onComplete: @escaping () -> ()
) {
let viewController = ModalActivityIndicatorViewController(canCancel: false) { [weak self, dependencies] modalActivityIndicator in
Profile.updateLocal(
queue: .global(qos: .default),
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: displayPictureUpdate,
success: { db in
// Wait for the database transaction to complete before updating the UI
db.afterNextTransactionNested(using: dependencies) { _ in
DispatchQueue.main.async {
modalActivityIndicator.dismiss(completion: {
onComplete()
})
}
}
},
failure: { [weak self] error in
DispatchQueue.main.async {
Profile
.updateLocal(
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: displayPictureUpdate,
using: dependencies
)
.subscribe(on: DispatchQueue.global(qos: .default), using: dependencies)
.receive(on: DispatchQueue.main, using: dependencies)
.sinkUntilComplete(
receiveCompletion: { result in
modalActivityIndicator.dismiss {
let message: String = {
switch (displayPictureUpdate, error) {
case (.currentUserRemove, _): return "profileDisplayPictureRemoveError".localized()
case (_, .uploadMaxFileSizeExceeded):
return "profileDisplayPictureSizeError".localized()
switch result {
case .finished: onComplete()
case .failure(let error):
let message: String = {
switch (displayPictureUpdate, error) {
case (.currentUserRemove, _): return "profileDisplayPictureRemoveError".localized()
case (_, .uploadMaxFileSizeExceeded):
return "profileDisplayPictureSizeError".localized()
default: return "errorConnection".localized()
}
}()
default: return "errorConnection".localized()
}
}()
self?.transitionToScreen(
ConfirmationModal(
info: ConfirmationModal.Info(
title: "profileErrorUpdate".localized(),
body: .text(message),
cancelTitle: "okay".localized(),
cancelStyle: .alert_text,
dismissType: .single
self?.transitionToScreen(
ConfirmationModal(
info: ConfirmationModal.Info(
title: "profileErrorUpdate".localized(),
body: .text(message),
cancelTitle: "okay".localized(),
cancelStyle: .alert_text,
dismissType: .single
)
),
transitionType: .present
)
),
transitionType: .present
)
}
}
}
},
using: dependencies
)
)
}
self.transitionToScreen(viewController, transitionType: .present)

@ -101,9 +101,9 @@ enum _022_GroupsRebuildChanges: Migration {
.fetchAll(db)
existingImageInfo.forEach { imageInfo in
let fileName: String = DisplayPictureManager.generateFilename(using: dependencies)
let fileName: String = dependencies[singleton: .displayPictureManager].generateFilename()
guard let filePath: String = try? DisplayPictureManager.filepath(for: fileName, using: dependencies) else {
guard let filePath: String = try? dependencies[singleton: .displayPictureManager].filepath(for: fileName) else {
Log.error("[GroupsRebuildChanges] Failed to generate community file path for current file name")
return
}

@ -389,7 +389,7 @@ public extension ClosedGroup {
/// historic access they can re-download and process all of the old messages
try threadIds.forEach { threadId in
try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.key.like("%\(threadId)%")) // stringlint:ignore
.filter(SnodeReceivedMessageInfo.Columns.swarmPublicKey == threadId)
.deleteAll(db)
}
}

@ -1407,8 +1407,8 @@ public extension Interaction {
.filter(interactionIds.contains(Reaction.Columns.interactionId))
.deleteAll(db)
/// Remove the `SnodeReceivedMessageInfo` records (otherwise we might try to poll for a hash which no longer exists, resulting
/// in fetching the last 14 days of messages)
/// Flag the `SnodeReceivedMessageInfo` records as invalid (otherwise we might try to poll for a hash which no longer
/// exists, resulting in fetching the last 14 days of messages)
let serverHashes: Set<String> = interactionInfo.compactMap(\.serverHash).asSet()
if !serverHashes.isEmpty {

@ -85,7 +85,6 @@ public enum ConfigurationSyncJob: JobExecutor {
let additionalSequenceRequests: AdditionalSequenceRequests? = (job.transientData as? AdditionalSequenceRequests)
Log.info(.cat, "For \(swarmPublicKey) started with changes: \(pendingChanges.pushData.count), old hashes: \(pendingChanges.obsoleteHashes.count)")
// TODO: Seems like the conversation list will randomly not get the last message (Lokinet updates???).
dependencies[singleton: .storage]
.readPublisher { db -> Network.PreparedRequest<Network.BatchResponse> in
try SnodeAPI.preparedSequence(

@ -59,13 +59,12 @@ public enum DisplayPictureDownloadJob: JobExecutor {
}()
else { return failure(job, JobRunnerError.missingRequiredDetails, true) }
let fileName: String = DisplayPictureManager.generateFilename(
let fileName: String = dependencies[singleton: .displayPictureManager].generateFilename(
for: (preparedDownload.destination.url?.absoluteString)
.defaulting(to: preparedDownload.destination.urlPathAndParamsString),
using: dependencies
.defaulting(to: preparedDownload.destination.urlPathAndParamsString)
)
guard let filePath: String = try? DisplayPictureManager.filepath(for: fileName, using: dependencies) else {
guard let filePath: String = try? dependencies[singleton: .displayPictureManager].filepath(for: fileName) else {
Log.error(.cat, "Failed to generate display picture file path for \(details.target)")
failure(job, DisplayPictureError.invalidFilename, true)
return

@ -349,7 +349,7 @@ public enum GarbageCollectionJob: JobExecutor {
// Delete any expired SnodeReceivedMessageInfo values associated to a specific node
try SnodeReceivedMessageInfo
.select(Column.rowID)
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= timestampNow)
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= (timestampNow * 1000))
.deleteAll(db)
}
},
@ -467,7 +467,7 @@ public enum GarbageCollectionJob: JobExecutor {
// Orphaned display picture files (actual deletion)
if finalTypesToCollect.contains(.orphanedDisplayPictures) {
let allDisplayPictureFilenames: Set<String> = (try? dependencies[singleton: .fileManager]
.contentsOfDirectory(atPath: DisplayPictureManager.sharedDataDisplayPictureDirPath(using: dependencies)))
.contentsOfDirectory(atPath: dependencies[singleton: .displayPictureManager].sharedDataDisplayPictureDirPath()))
.defaulting(to: [])
.asSet()
let orphanedFiles: Set<String> = allDisplayPictureFilenames
@ -478,7 +478,7 @@ public enum GarbageCollectionJob: JobExecutor {
// each one and store the error to be used to determine success/failure of the job
do {
try dependencies[singleton: .fileManager].removeItem(
atPath: DisplayPictureManager.filepath(for: filename, using: dependencies)
atPath: dependencies[singleton: .displayPictureManager].filepath(for: filename)
)
}
catch { deletionErrors.append(error) }

@ -52,24 +52,26 @@ public enum UpdateProfilePictureJob: JobExecutor {
// Note: The user defaults flag is updated in DisplayPictureManager
let profile: Profile = Profile.fetchOrCreateCurrentUser(using: dependencies)
let displayPictureUpdate: DisplayPictureManager.Update = profile.profilePictureFileName
.map { DisplayPictureManager.loadDisplayPictureFromDisk(for: $0, using: dependencies) }
.map { dependencies[singleton: .displayPictureManager].loadDisplayPictureFromDisk(for: $0) }
.map { .currentUserUploadImageData($0) }
.defaulting(to: .none)
Profile.updateLocal(
queue: queue,
displayPictureUpdate: displayPictureUpdate,
success: { db in
// Need to call the 'success' closure asynchronously on the queue to prevent a reentrancy
// issue as it will write to the database and this closure is already called within
// another database write
queue.async {
Log.info(.cat, "Profile successfully updated")
success(job, false)
Profile
.updateLocal(
displayPictureUpdate: displayPictureUpdate,
using: dependencies
)
.subscribe(on: queue, using: dependencies)
.receive(on: queue, using: dependencies)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .failure(let error): failure(job, error, false)
case .finished:
Log.info(.cat, "Profile successfully updated")
success(job, false)
}
}
},
failure: { error in failure(job, error, false) },
using: dependencies
)
)
}
}

@ -344,7 +344,9 @@ internal extension LibSession {
oldAvatarKey != (updatedProfile.profileEncryptionKey ?? Data(repeating: 0, count: DisplayPictureManager.aes256KeyByteLength))
)
{
DisplayPictureManager.displayPicture(owner: .user(updatedProfile), using: dependencies)
dependencies[singleton: .displayPictureManager].displayPicture(
owner: .user(updatedProfile)
)
}
// Store the updated contact (needs to happen before variables go out of scope)

@ -303,6 +303,19 @@ extension MessageReceiver {
GroupMember.Columns.roleStatus.set(to: GroupMember.RoleStatus.accepted),
using: dependencies
)
// Finally we want to invalidate the `lastHash` data for all messages in the group because
// admins get historic message access by default (if we don't do this then restoring a device
// would get all of the old messages and result in a conversation history that differs from
// devices that had the group before they were promoted)
try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.swarmPublicKey == groupSessionId.hexString)
.filter(SnodeReceivedMessageInfo.Columns.namespace == SnodeAPI.Namespace.groupMessages.rawValue)
.updateAllAndConfig(
db,
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid.set(to: true),
using: dependencies
)
}
private static func handleGroupInfoChanged(
@ -745,9 +758,9 @@ extension MessageReceiver {
switch result {
case .failure: break
case .finished:
/// Since the server deletion was successful we should also remove the `SnodeReceivedMessageInfo`
/// entries for the hashes (otherwise we might try to poll for a hash which no longer exists, resulting in fetching
/// the last 14 days of messages)
/// Since the server deletion was successful we should also flag the `SnodeReceivedMessageInfo`
/// entries for the hashes as invalid (otherwise we might try to poll for a hash which no longer exists,
/// resulting in fetching the last 14 days of messages)
dependencies[singleton: .storage].writeAsync { db in
try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,

@ -599,8 +599,6 @@ extension MessageReceiver {
case .memberLeft = messageKind
else { return }
// TODO: [GROUPS REBUILD] If the current user is an admin then we need to actually remove the member from the group.
try processIfValid(
db,
threadId: threadId,

@ -88,9 +88,9 @@ extension MessageReceiver {
switch result {
case .failure: break
case .finished:
/// Since the server deletion was successful we should also remove the `SnodeReceivedMessageInfo`
/// entries for the hashes (otherwise we might try to poll for a hash which no longer exists, resulting in fetching
/// the last 14 days of messages)
/// Since the server deletion was successful we should also flag the `SnodeReceivedMessageInfo`
/// entries for the hashes as invalid (otherwise we might try to poll for a hash which no longer exists,
/// resulting in fetching the last 14 days of messages)
dependencies[singleton: .storage].writeAsync { db in
try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,

@ -23,30 +23,22 @@ extension MessageSender {
members: [(String, Profile?)],
using dependencies: Dependencies
) -> AnyPublisher<SessionThread, Error> {
typealias ImageUploadResponse = (downloadUrl: String, fileName: String, encryptionKey: Data)
return Just(())
.setFailureType(to: Error.self)
.flatMap { _ -> AnyPublisher<ImageUploadResponse?, Error> in
.flatMap { _ -> AnyPublisher<DisplayPictureManager.UploadResult?, Error> in
guard let displayPictureData: Data = displayPictureData else {
return Just(nil)
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
return Deferred {
Future<ImageUploadResponse?, Error> { resolver in
DisplayPictureManager.prepareAndUploadDisplayPicture(
queue: DispatchQueue.global(qos: .userInitiated),
imageData: displayPictureData,
success: { resolver(Result.success($0)) },
failure: { resolver(Result.failure($0)) },
using: dependencies
)
}
}.eraseToAnyPublisher()
return dependencies[singleton: .displayPictureManager]
.prepareAndUploadDisplayPicture(imageData: displayPictureData)
.mapError { error -> Error in error }
.map { Optional($0) }
.eraseToAnyPublisher()
}
.flatMap { (displayPictureInfo: ImageUploadResponse?) -> AnyPublisher<PreparedGroupData, Error> in
.flatMap { (displayPictureInfo: DisplayPictureManager.UploadResult?) -> AnyPublisher<PreparedGroupData, Error> in
dependencies[singleton: .storage].writePublisher { db -> PreparedGroupData in
// Create and cache the libSession entries
let createdInfo: LibSession.CreatedGroupInfo = try LibSession.createGroup(
@ -504,22 +496,22 @@ extension MessageSender {
using: dependencies
)
}
/// Since we have added them to `GROUP_MEMBERS` we may as well insert them into the database (even if the request
/// fails the local state will have already been updated anyway)
members.forEach { id, _ in
/// Add the member to the database
try? GroupMember(
groupId: sessionId.hexString,
profileId: id,
role: .standard,
roleStatus: .notSentYet,
isHidden: false
).upsert(db)
}
}
}
/// Since we have added them to `GROUP_MEMBERS` we may as well insert them into the database (even if the request
/// fails the local state will have already been updated anyway)
members.forEach { id, _ in
/// Add the member to the database
try? GroupMember(
groupId: sessionId.hexString,
profileId: id,
role: .standard,
roleStatus: .notSentYet,
isHidden: false
).upsert(db)
}
/// Generate the data needed to send the new members invitations to the group
let memberJobData: [MemberJobData] = (try? members
.map { id, profile in
@ -650,97 +642,131 @@ extension MessageSender {
.eraseToAnyPublisher()
}
public static func resendInvitation(
public static func resendInvitations(
groupSessionId: String,
memberId: String,
memberIds: [String],
using dependencies: Dependencies
) {
guard let sessionId: SessionId = try? SessionId(from: groupSessionId), sessionId.prefix == .group else { return }
) -> AnyPublisher<Void, Error> {
guard let sessionId: SessionId = try? SessionId(from: groupSessionId), sessionId.prefix == .group else {
return Fail(error: MessageSenderError.invalidClosedGroupUpdate).eraseToAnyPublisher()
}
dependencies[singleton: .storage].writeAsync { [dependencies] db in
guard
let groupIdentityPrivateKey: Data = try? ClosedGroup
.filter(id: groupSessionId)
.select(.groupIdentityPrivateKey)
.asRequest(of: Data.self)
.fetchOne(db)
else { throw MessageSenderError.invalidClosedGroupUpdate }
let memberInfo: (token: [UInt8], details: GroupInviteMemberJob.Details) = try dependencies.mutate(cache: .libSession) { cache in
return (
try dependencies[singleton: .crypto].tryGenerate(
.tokenSubaccount(
config: cache.config(for: .groupKeys, sessionId: sessionId),
groupSessionId: sessionId,
memberId: memberId
)
),
try GroupInviteMemberJob.Details(
memberSessionIdHexString: memberId,
authInfo: try dependencies[singleton: .crypto].tryGenerate(
.memberAuthData(
config: cache.config(for: .groupKeys, sessionId: sessionId),
groupSessionId: sessionId,
memberId: memberId
return dependencies[singleton: .storage]
.writePublisher { db -> ([GroupInviteMemberJob.Details], Network.PreparedRequest<Void>) in
guard
let groupIdentityPrivateKey: Data = try? ClosedGroup
.filter(id: groupSessionId)
.select(.groupIdentityPrivateKey)
.asRequest(of: Data.self)
.fetchOne(db)
else { throw MessageSenderError.invalidClosedGroupUpdate }
/// Perform the config changes without triggering a config sync (we will do so manually after the process completes)
try dependencies.mutate(cache: .libSession) { cache in
try cache.withCustomBehaviour(.skipAutomaticConfigSync, for: sessionId) {
try memberIds.forEach { memberId in
try LibSession.updateMemberStatus(
db,
groupSessionId: SessionId(.group, hex: groupSessionId),
memberId: memberId,
role: .standard,
status: .notSentYet,
profile: nil,
using: dependencies
)
)
/// If the current `GroupMember` isn't already in the `notSentYet` state then update them to be in it
let memberStatus: GroupMember.RoleStatus? = try GroupMember
.select(.roleStatus)
.filter(GroupMember.Columns.groupId == groupSessionId)
.filter(GroupMember.Columns.profileId == memberId)
.asRequest(of: GroupMember.RoleStatus.self)
.fetchOne(db)
if memberStatus != .notSentYet {
try GroupMember
.filter(GroupMember.Columns.groupId == groupSessionId)
.filter(GroupMember.Columns.profileId == memberId)
.updateAllAndConfig(
db,
GroupMember.Columns.roleStatus.set(to: GroupMember.RoleStatus.notSentYet),
using: dependencies
)
}
}
}
}
let memberInfo: [(token: [UInt8], details: GroupInviteMemberJob.Details)] = try memberIds
.map { memberId in
try dependencies.mutate(cache: .libSession) { cache in
return (
try dependencies[singleton: .crypto].tryGenerate(
.tokenSubaccount(
config: cache.config(for: .groupKeys, sessionId: sessionId),
groupSessionId: sessionId,
memberId: memberId
)
),
try GroupInviteMemberJob.Details(
memberSessionIdHexString: memberId,
authInfo: try dependencies[singleton: .crypto].tryGenerate(
.memberAuthData(
config: cache.config(for: .groupKeys, sessionId: sessionId),
groupSessionId: sessionId,
memberId: memberId
)
)
)
)
}
}
/// Unrevoke the member just in case they had previously gotten their access to the group revoked and the
/// unrevoke request when initially added them failed (fire-and-forget this request, we don't want it to be blocking)
let unrevokeRequest: Network.PreparedRequest<Void> = try SnodeAPI
.preparedUnrevokeSubaccounts(
subaccountsToUnrevoke: memberInfo.map { token, _ in token },
authMethod: Authentication.groupAdmin(
groupSessionId: sessionId,
ed25519SecretKey: Array(groupIdentityPrivateKey)
),
using: dependencies
)
)
return (memberInfo.map { _, jobDetails in jobDetails }, unrevokeRequest)
}
/// Unrevoke the member just in case they had previously gotten their access to the group revoked and the
/// unrevoke request when initially added them failed (fire-and-forget this request, we don't want it to be blocking)
try SnodeAPI
.preparedUnrevokeSubaccounts(
subaccountsToUnrevoke: [memberInfo.token],
authMethod: Authentication.groupAdmin(
groupSessionId: sessionId,
ed25519SecretKey: Array(groupIdentityPrivateKey)
),
using: dependencies
)
.send(using: dependencies)
.subscribe(on: DispatchQueue.global(qos: .background), using: dependencies)
.sinkUntilComplete()
try LibSession.updateMemberStatus(
db,
groupSessionId: SessionId(.group, hex: groupSessionId),
memberId: memberId,
role: .standard,
status: .notSentYet,
profile: nil,
using: dependencies
)
/// If the current `GroupMember` isn't already in the `notSentYet` state then update them to be in it
let existingMember: GroupMember? = try GroupMember
.filter(GroupMember.Columns.groupId == groupSessionId)
.filter(GroupMember.Columns.profileId == memberId)
.fetchOne(db)
if existingMember?.roleStatus != .notSentYet {
try GroupMember
.filter(GroupMember.Columns.groupId == groupSessionId)
.filter(GroupMember.Columns.profileId == memberId)
.updateAllAndConfig(
db,
GroupMember.Columns.roleStatus.set(to: GroupMember.RoleStatus.notSentYet),
.flatMap { memberJobData, unrevokeRequest -> AnyPublisher<[GroupInviteMemberJob.Details], Error> in
ConfigurationSyncJob
.run(
swarmPublicKey: sessionId.hexString,
beforeSequenceRequests: [unrevokeRequest],
requireAllBatchResponses: true,
using: dependencies
)
.map { _ in memberJobData }
.eraseToAnyPublisher()
}
/// Schedule a job to send an invitation to the newly added member
dependencies[singleton: .jobRunner].add(
db,
job: Job(
variant: .groupInviteMember,
threadId: groupSessionId,
details: memberInfo.details
),
canStartJob: true
.handleEvents(
receiveOutput: { memberJobData in
/// Schedule a job to send an invitation to the member
dependencies[singleton: .storage].writeAsync { db in
memberJobData.forEach { details in
dependencies[singleton: .jobRunner].add(
db,
job: Job(
variant: .groupInviteMember,
threadId: sessionId.hexString,
details: details
),
canStartJob: true
)
}
}
}
)
}
.map { _ in () }
.eraseToAnyPublisher()
}
public static func removeGroupMembers(

@ -236,8 +236,13 @@ public enum PushNotificationAPI {
SubscribeRequest.Subscription(
namespaces: {
switch sessionId.prefix {
// TODO: Confirm no config subscriptions for groups
case .group: return [.groupMessages, .revokedRetrievableGroupMessages]
case .group: return [
.groupMessages,
.configGroupKeys,
.configGroupInfo,
.configGroupMembers,
.revokedRetrievableGroupMessages
]
default: return [.default, .configConvoInfoVolatile]
}
}(),

@ -101,26 +101,32 @@ public class SwarmPoller: SwarmPollerType & PollerType {
/// Fetch the messages
return dependencies[singleton: .network]
.getSwarm(for: pollerDestination.target)
.tryFlatMapWithRandomSnode(drainBehaviour: _pollerDrainBehaviour, using: dependencies) { [pollerDestination, customAuthMethod, namespaces, dependencies] snode -> AnyPublisher<Network.PreparedRequest<SnodeAPI.PollResponse>, Error> in
dependencies[singleton: .storage].readPublisher { db -> Network.PreparedRequest<SnodeAPI.PollResponse> in
.tryFlatMapWithRandomSnode(drainBehaviour: _pollerDrainBehaviour, using: dependencies) { [pollerDestination, customAuthMethod, namespaces, dependencies] snode -> AnyPublisher<(LibSession.Snode, Network.PreparedRequest<SnodeAPI.PollResponse>), Error> in
dependencies[singleton: .storage].readPublisher { db -> (LibSession.Snode, Network.PreparedRequest<SnodeAPI.PollResponse>) in
let authMethod: AuthenticationMethod = try (customAuthMethod ?? Authentication.with(
db,
swarmPublicKey: pollerDestination.target,
using: dependencies
))
return try SnodeAPI.preparedPoll(
db,
namespaces: namespaces,
refreshingConfigHashes: configHashes,
from: snode,
authMethod: authMethod,
using: dependencies
return (
snode,
try SnodeAPI.preparedPoll(
db,
namespaces: namespaces,
refreshingConfigHashes: configHashes,
from: snode,
authMethod: authMethod,
using: dependencies
)
)
}
}
.flatMap { [dependencies] request in request.send(using: dependencies) }
.flatMap { [pollerDestination, shouldStoreMessages, dependencies] (_: ResponseInfoType, namespacedResults: SnodeAPI.PollResponse) -> AnyPublisher<(configMessageJobs: [Job], standardMessageJobs: [Job], pollResult: PollResult), Error> in
.flatMap { [dependencies] snode, request in
request.send(using: dependencies)
.map { _, response in (snode, response) }
}
.flatMap { [pollerDestination, shouldStoreMessages, dependencies] (snode: LibSession.Snode, namespacedResults: SnodeAPI.PollResponse) -> AnyPublisher<(configMessageJobs: [Job], standardMessageJobs: [Job], pollResult: PollResult), Error> in
// Get all of the messages and sort them by their required 'processingOrder'
let sortedMessages: [(namespace: SnodeAPI.Namespace, messages: [SnodeReceivedMessage])] = namespacedResults
.compactMap { namespace, result in (result.data?.messages).map { (namespace, $0) } }
@ -146,6 +152,29 @@ public class SwarmPoller: SwarmPollerType & PollerType {
var hadValidHashUpdate: Bool = false
return dependencies[singleton: .storage].writePublisher { db -> (configMessageJobs: [Job], standardMessageJobs: [Job], pollResult: PollResult) in
// If the poll was successful we need to retrieve the `lastHash` values
// direct from the database again to ensure they still line up (if they
// have been reset in the database then we want to ignore the poll as it
// would invalidate whatever change modified the `lastHash` values potentially
// resulting in us not polling again from scratch even if we want to)
let lastHashesAfterFetch: Set<String> = try Set(namespacedResults
.compactMap { namespace, _ in
try SnodeReceivedMessageInfo
.fetchLastNotExpired(
db,
for: snode,
namespace: namespace,
swarmPublicKey: pollerDestination.target,
using: dependencies
)?
.hash
})
guard lastHashes.isEmpty || Set(lastHashes) == lastHashesAfterFetch else {
return ([], [], ([], 0, 0, false))
}
// Since the hashes are still accurate we can now process the messages
let allProcessedMessages: [ProcessedMessage] = sortedMessages
.compactMap { namespace, messages -> [ProcessedMessage]? in
let processedMessages: [ProcessedMessage] = messages
@ -319,15 +348,15 @@ public class SwarmPoller: SwarmPollerType & PollerType {
return updatedJob
}
// Clean up message hashes and add some logs about the poll results
if sortedMessages.isEmpty && !hadValidHashUpdate {
// Update the cached validity of the messages
try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,
potentiallyInvalidHashes: lastHashes,
otherKnownValidHashes: otherKnownHashes
)
}
// Update the cached validity of the messages
try SnodeReceivedMessageInfo.handlePotentialDeletedOrInvalidHash(
db,
potentiallyInvalidHashes: (sortedMessages.isEmpty && !hadValidHashUpdate ?
lastHashes :
[]
),
otherKnownValidHashes: otherKnownHashes
)
return (configMessageJobs, standardMessageJobs, (processedMessages, rawMessageCount, messageCount, hadValidHashUpdate))
}

@ -7,6 +7,7 @@ import Foundation
public enum DisplayPictureError: LocalizedError {
case imageTooLarge
case writeFailed
case databaseChangesFailed
case encryptionFailed
case uploadFailed
case uploadMaxFileSizeExceeded
@ -17,6 +18,7 @@ public enum DisplayPictureError: LocalizedError {
switch self {
case .imageTooLarge: return "Display picture too large."
case .writeFailed: return "Display picture write failed."
case .databaseChangesFailed: return "Failed to save display picture to database."
case .encryptionFailed: return "Display picture encryption failed."
case .uploadFailed: return "Display picture upload failed."
case .uploadMaxFileSizeExceeded: return "Maximum file size exceeded."

@ -6,6 +6,15 @@ import GRDB
import SessionSnodeKit
import SessionUtilitiesKit
// MARK: - Singleton
public extension Singleton {
/// Dependency-injection registration for `DisplayPictureManager` so callers can resolve it via
/// `dependencies[singleton: .displayPictureManager]` instead of using static methods.
/// The instance is created lazily by the `Dependencies` container and receives the container
/// itself so it can access storage, crypto, the file manager and the job runner.
static let displayPictureManager: SingletonConfig<DisplayPictureManager> = Dependencies.create(
identifier: "displayPictureManager",
createInstance: { dependencies in DisplayPictureManager(using: dependencies) }
)
}
// MARK: - Log.Category
public extension Log.Category {
@ -13,7 +22,10 @@ public extension Log.Category {
}
// MARK: - DisplayPictureManager
public struct DisplayPictureManager {
public class DisplayPictureManager {
public typealias UploadResult = (downloadUrl: String, fileName: String, encryptionKey: Data)
public enum Update {
case none
@ -35,8 +47,19 @@ public struct DisplayPictureManager {
internal static let nonceLength: Int = 12
internal static let tagLength: Int = 16
private static var scheduleDownloadsPublisher: AnyPublisher<Void, Never>?
private static let scheduleDownloadsTrigger: PassthroughSubject<(), Never> = PassthroughSubject()
private let dependencies: Dependencies
private let scheduleDownloads: PassthroughSubject<(), Never> = PassthroughSubject()
private var scheduleDownloadsCancellable: AnyCancellable?
// MARK: - Initalization
init(using dependencies: Dependencies) {
self.dependencies = dependencies
setupThrottledDownloading()
}
// MARK: - General
public static func isTooLong(profileUrl: String) -> Bool {
/// String.utf8CString will include the null terminator (Int8)0 as the end of string buffer.
@ -46,7 +69,7 @@ public struct DisplayPictureManager {
return (profileUrl.utf8CString.count > LibSession.sizeMaxProfileUrlBytes)
}
public static func sharedDataDisplayPictureDirPath(using dependencies: Dependencies) -> String {
public func sharedDataDisplayPictureDirPath() -> String {
let path: String = URL(fileURLWithPath: dependencies[singleton: .fileManager].appSharedDataDirectoryPath)
.appendingPathComponent("ProfileAvatars") // stringlint:ignore
.path
@ -57,11 +80,7 @@ public struct DisplayPictureManager {
// MARK: - Loading
public static func displayPicture(
_ db: Database,
id: OwnerId,
using dependencies: Dependencies
) -> Data? {
public func displayPicture(_ db: Database, id: OwnerId) -> Data? {
let maybeOwner: Owner? = {
switch id {
case .user(let id): return try? Profile.fetchOne(db, id: id).map { Owner.user($0) }
@ -72,41 +91,34 @@ public struct DisplayPictureManager {
guard let owner: Owner = maybeOwner else { return nil }
return displayPicture(owner: owner, using: dependencies)
return displayPicture(owner: owner)
}
@discardableResult public static func displayPicture(
owner: Owner,
using dependencies: Dependencies
) -> Data? {
@discardableResult public func displayPicture(owner: Owner) -> Data? {
switch (owner.fileName, owner.canDownloadImage) {
case (.some(let fileName), _):
return loadDisplayPicture(for: fileName, owner: owner, using: dependencies)
return loadDisplayPicture(for: fileName, owner: owner)
case (_, true):
scheduleDownload(for: owner, currentFileInvalid: false, using: dependencies)
scheduleDownload(for: owner, currentFileInvalid: false)
return nil
default: return nil
}
}
private static func loadDisplayPicture(
for fileName: String,
owner: Owner,
using dependencies: Dependencies
) -> Data? {
private func loadDisplayPicture(for fileName: String, owner: Owner) -> Data? {
if let cachedImageData: Data = dependencies[cache: .displayPicture].imageData[fileName] {
return cachedImageData
}
guard
!fileName.isEmpty,
let data: Data = loadDisplayPictureFromDisk(for: fileName, using: dependencies),
let data: Data = loadDisplayPictureFromDisk(for: fileName),
data.isValidImage
else {
// If we can't load the avatar or it's an invalid/corrupted image then clear it out and re-download
scheduleDownload(for: owner, currentFileInvalid: true, using: dependencies)
scheduleDownload(for: owner, currentFileInvalid: true)
return nil
}
@ -114,23 +126,22 @@ public struct DisplayPictureManager {
return data
}
public static func loadDisplayPictureFromDisk(for fileName: String, using dependencies: Dependencies) -> Data? {
guard let filePath: String = try? DisplayPictureManager.filepath(for: fileName, using: dependencies) else {
return nil
}
public func loadDisplayPictureFromDisk(for fileName: String) -> Data? {
guard let filePath: String = try? filepath(for: fileName) else { return nil }
return try? Data(contentsOf: URL(fileURLWithPath: filePath))
}
// MARK: - File Paths
public static func profileAvatarFilepath(
public func profileAvatarFilepath(
_ db: Database? = nil,
id: String,
using dependencies: Dependencies
id: String
) -> String? {
guard let db: Database = db else {
return dependencies[singleton: .storage].read { db in profileAvatarFilepath(db, id: id, using: dependencies) }
return dependencies[singleton: .storage].read { [weak self] db in
self?.profileAvatarFilepath(db, id: id)
}
}
let maybeFileName: String? = try? Profile
@ -139,10 +150,10 @@ public struct DisplayPictureManager {
.asRequest(of: String.self)
.fetchOne(db)
return maybeFileName.map { try? DisplayPictureManager.filepath(for: $0, using: dependencies) }
return maybeFileName.map { try? filepath(for: $0) }
}
public static func generateFilename(for url: String, using dependencies: Dependencies) -> String {
public func generateFilename(for url: String) -> String {
return (dependencies[singleton: .crypto]
.generate(.hash(message: url.bytes))?
.toHexString())
@ -150,7 +161,7 @@ public struct DisplayPictureManager {
.appendingFileExtension("jpg") // stringlint:ignore
}
public static func generateFilename(using dependencies: Dependencies) -> String {
public func generateFilename() -> String {
return dependencies[singleton: .crypto]
.generate(.uuid())
.defaulting(to: UUID())
@ -158,91 +169,76 @@ public struct DisplayPictureManager {
.appendingFileExtension("jpg") // stringlint:ignore
}
public static func filepath(for filename: String, using dependencies: Dependencies) throws -> String {
public func filepath(for filename: String) throws -> String {
guard !filename.isEmpty else { throw DisplayPictureError.invalidCall }
return URL(fileURLWithPath: sharedDataDisplayPictureDirPath(using: dependencies))
return URL(fileURLWithPath: sharedDataDisplayPictureDirPath())
.appendingPathComponent(filename)
.path
}
public static func resetStorage(using dependencies: Dependencies) {
public func resetStorage() {
try? dependencies[singleton: .fileManager].removeItem(
atPath: DisplayPictureManager.sharedDataDisplayPictureDirPath(using: dependencies)
atPath: sharedDataDisplayPictureDirPath()
)
}
// MARK: - Downloading
private static func scheduleDownload(
for owner: Owner,
currentFileInvalid invalid: Bool,
using dependencies: Dependencies
) {
dependencies.mutate(cache: .displayPicture) { cache in
cache.downloadsToSchedule.insert(DownloadInfo(owner: owner, currentFileInvalid: invalid))
}
/// This method can be triggered very frequently when processing messages so we want to throttle the updates to 250ms (it's for starting
/// avatar downloads so that should definitely be fast enough)
if scheduleDownloadsPublisher == nil {
scheduleDownloadsPublisher = scheduleDownloadsTrigger
.throttle(for: .milliseconds(250), scheduler: DispatchQueue.global(qos: .userInitiated), latest: true)
.handleEvents(
receiveOutput: { [dependencies] _ in
let pendingInfo: Set<DownloadInfo> = dependencies.mutate(cache: .displayPicture) { cache in
let result: Set<DownloadInfo> = cache.downloadsToSchedule
cache.downloadsToSchedule.removeAll()
return result
}
dependencies[singleton: .storage].writeAsync { db in
pendingInfo.forEach { info in
// If the current file is invalid then clear out the 'profilePictureFileName'
// and try to re-download the file
if info.currentFileInvalid {
info.owner.clearCurrentFile(db)
}
dependencies[singleton: .jobRunner].add(
db,
job: Job(
variant: .displayPictureDownload,
shouldBeUnique: true,
details: DisplayPictureDownloadJob.Details(owner: info.owner)
),
canStartJob: true
)
/// Profile picture downloads can be triggered very frequently when processing messages so we want to throttle the updates to
/// 250ms (it's for starting avatar downloads so that should definitely be fast enough)
private func setupThrottledDownloading() {
scheduleDownloadsCancellable = scheduleDownloads
.throttle(for: .milliseconds(250), scheduler: DispatchQueue.global(qos: .userInitiated), latest: true)
.sink(
receiveValue: { [dependencies] _ in
let pendingInfo: Set<DownloadInfo> = dependencies.mutate(cache: .displayPicture) { cache in
let result: Set<DownloadInfo> = cache.downloadsToSchedule
cache.downloadsToSchedule.removeAll()
return result
}
dependencies[singleton: .storage].writeAsync { db in
pendingInfo.forEach { info in
// If the current file is invalid then clear out the 'profilePictureFileName'
// and try to re-download the file
if info.currentFileInvalid {
info.owner.clearCurrentFile(db)
}
dependencies[singleton: .jobRunner].add(
db,
job: Job(
variant: .displayPictureDownload,
shouldBeUnique: true,
details: DisplayPictureDownloadJob.Details(owner: info.owner)
),
canStartJob: true
)
}
}
)
.map { _ in () }
.eraseToAnyPublisher()
scheduleDownloadsPublisher?.sinkUntilComplete()
}
)
}
private func scheduleDownload(for owner: Owner, currentFileInvalid invalid: Bool) {
dependencies.mutate(cache: .displayPicture) { cache in
cache.downloadsToSchedule.insert(DownloadInfo(owner: owner, currentFileInvalid: invalid))
}
scheduleDownloadsTrigger.send(())
scheduleDownloads.send(())
}
// MARK: - Uploading
public static func prepareAndUploadDisplayPicture(
queue: DispatchQueue,
imageData: Data,
success: @escaping ((downloadUrl: String, fileName: String, encryptionKey: Data)) -> (),
failure: ((DisplayPictureError) -> ())? = nil,
using dependencies: Dependencies
) {
queue.async(using: dependencies) {
// If the profile avatar was updated or removed then encrypt with a new profile key
// to ensure that other users know that our profile picture was updated
let newEncryptionKey: Data
let finalImageData: Data
let fileExtension: String
do {
public func prepareAndUploadDisplayPicture(imageData: Data) -> AnyPublisher<UploadResult, DisplayPictureError> {
return Just(())
.setFailureType(to: DisplayPictureError.self)
.tryMap { [weak self, dependencies] _ -> (Network.PreparedRequest<FileUploadResponse>, String, Data, Data) in
// If the profile avatar was updated or removed then encrypt with a new profile key
// to ensure that other users know that our profile picture was updated
let newEncryptionKey: Data
let finalImageData: Data
let fileExtension: String
let guessedFormat: ImageFormat = imageData.guessedImageFormat
finalImageData = try {
@ -301,80 +297,83 @@ public struct DisplayPictureManager {
default: return "jpg" // stringlint:ignore
}
}()
// If we have a new avatar image, we must first:
//
// * Write it to disk.
// * Encrypt it
// * Upload it to asset service
// * Send asset service info to Signal Service
Log.verbose(.displayPictureManager, "Updating local profile on service with new avatar.")
let fileName: String = dependencies[singleton: .crypto].generate(.uuid())
.defaulting(to: UUID())
.uuidString
.appendingFileExtension(fileExtension)
guard let filePath: String = try? self?.filepath(for: fileName) else {
throw DisplayPictureError.invalidFilename
}
// Write the avatar to disk
do { try finalImageData.write(to: URL(fileURLWithPath: filePath), options: [.atomic]) }
catch {
Log.error(.displayPictureManager, "Updating service with profile failed.")
throw DisplayPictureError.writeFailed
}
// Encrypt the avatar for upload
guard
let encryptedData: Data = dependencies[singleton: .crypto].generate(
.encryptedDataDisplayPicture(data: finalImageData, key: newEncryptionKey, using: dependencies)
)
else {
Log.error(.displayPictureManager, "Updating service with profile failed.")
throw DisplayPictureError.encryptionFailed
}
// Upload the avatar to the FileServer
guard
let preparedUpload: Network.PreparedRequest<FileUploadResponse> = try? Network.preparedUpload(
data: encryptedData,
requestAndPathBuildTimeout: Network.fileUploadTimeout,
using: dependencies
)
else {
Log.error(.displayPictureManager, "Updating service with profile failed.")
throw DisplayPictureError.uploadFailed
}
return (preparedUpload, fileName, newEncryptionKey, finalImageData)
}
catch let error as DisplayPictureError { return (failure?(error) ?? {}()) }
catch { return (failure?(DisplayPictureError.invalidCall) ?? {}()) }
// If we have a new avatar image, we must first:
//
// * Write it to disk.
// * Encrypt it
// * Upload it to asset service
// * Send asset service info to Signal Service
Log.verbose(.displayPictureManager, "Updating local profile on service with new avatar.")
let fileName: String = dependencies[singleton: .crypto].generate(.uuid())
.defaulting(to: UUID())
.uuidString
.appendingFileExtension(fileExtension)
guard let filePath: String = try? DisplayPictureManager.filepath(for: fileName, using: dependencies) else {
failure?(.invalidFilename)
return
}
// Write the avatar to disk
do { try finalImageData.write(to: URL(fileURLWithPath: filePath), options: [.atomic]) }
catch {
Log.error(.displayPictureManager, "Updating service with profile failed.")
failure?(.writeFailed)
return
.flatMap { [dependencies] preparedUpload, fileName, newEncryptionKey, finalImageData -> AnyPublisher<(FileUploadResponse, String, Data, Data), Error> in
preparedUpload.send(using: dependencies)
.map { _, response -> (FileUploadResponse, String, Data, Data) in
(response, fileName, newEncryptionKey, finalImageData)
}
.eraseToAnyPublisher()
}
// Encrypt the avatar for upload
guard
let encryptedData: Data = dependencies[singleton: .crypto].generate(
.encryptedDataDisplayPicture(data: finalImageData, key: newEncryptionKey, using: dependencies)
)
else {
Log.error(.displayPictureManager, "Updating service with profile failed.")
failure?(.encryptionFailed)
return
.mapError { error in
Log.error(.displayPictureManager, "Updating service with profile failed with error: \(error).")
switch error {
case NetworkError.maxFileSizeExceeded: return DisplayPictureError.uploadMaxFileSizeExceeded
case let displayPictureError as DisplayPictureError: return displayPictureError
default: return DisplayPictureError.uploadFailed
}
}
// Upload the avatar to the FileServer
guard let preparedUpload: Network.PreparedRequest<FileUploadResponse> = try? Network.preparedUpload(data: encryptedData, using: dependencies) else {
Log.error(.displayPictureManager, "Updating service with profile failed.")
failure?(.uploadFailed)
return
.map { [dependencies] fileUploadResponse, fileName, newEncryptionKey, finalImageData -> UploadResult in
let downloadUrl: String = Network.FileServer.downloadUrlString(for: fileUploadResponse.id)
// Update the cached avatar image value
dependencies.mutate(cache: .displayPicture) {
$0.imageData[fileName] = finalImageData
}
Log.verbose(.displayPictureManager, "Successfully uploaded avatar image.")
return (downloadUrl, fileName, newEncryptionKey)
}
preparedUpload
.send(using: dependencies)
.subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies)
.receive(on: queue, using: dependencies)
.sinkUntilComplete(
receiveCompletion: { result in
switch result {
case .finished: break
case .failure(let error):
Log.error(.displayPictureManager, "Updating service with profile failed with error: \(error).")
let isMaxFileSizeExceeded: Bool = ((error as? NetworkError) == .maxFileSizeExceeded)
failure?(isMaxFileSizeExceeded ? .uploadMaxFileSizeExceeded : .uploadFailed)
}
},
receiveValue: { _, fileUploadResponse in
let downloadUrl: String = Network.FileServer.downloadUrlString(for: fileUploadResponse.id)
// Update the cached avatar image value
dependencies.mutate(cache: .displayPicture) { $0.imageData[fileName] = finalImageData }
Log.verbose(.displayPictureManager, "Successfully uploaded avatar image.")
success((downloadUrl, fileName, newEncryptionKey))
}
)
}
.eraseToAnyPublisher()
}
}

@ -1,6 +1,7 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import UIKit.UIImage
import Combine
import GRDB
import SessionUtilitiesKit
@ -28,13 +29,10 @@ public extension Profile {
}
static func updateLocal(
queue: DispatchQueue,
displayNameUpdate: DisplayNameUpdate = .none,
displayPictureUpdate: DisplayPictureManager.Update = .none,
success: ((Database) throws -> ())? = nil,
failure: ((DisplayPictureError) -> ())? = nil,
using dependencies: Dependencies
) {
) -> AnyPublisher<Void, DisplayPictureError> {
let userSessionId: SessionId = dependencies[cache: .general].sessionId
let isRemovingAvatar: Bool = {
switch displayPictureUpdate {
@ -45,73 +43,76 @@ public extension Profile {
switch displayPictureUpdate {
case .contactRemove, .contactUpdateTo, .groupRemove, .groupUpdateTo, .groupUploadImageData:
failure?(DisplayPictureError.invalidCall)
return Fail(error: DisplayPictureError.invalidCall)
.eraseToAnyPublisher()
case .none, .currentUserRemove, .currentUserUpdateTo:
dependencies[singleton: .storage].writeAsync { db in
if isRemovingAvatar {
let existingProfileUrl: String? = try Profile
.filter(id: userSessionId.hexString)
.select(.profilePictureUrl)
.asRequest(of: String.self)
.fetchOne(db)
let existingProfileFileName: String? = try Profile
.filter(id: userSessionId.hexString)
.select(.profilePictureFileName)
.asRequest(of: String.self)
.fetchOne(db)
// Remove any cached avatar image value
if let fileName: String = existingProfileFileName {
dependencies.mutate(cache: .displayPicture) { $0.imageData[fileName] = nil }
return dependencies[singleton: .storage]
.writePublisher { db in
if isRemovingAvatar {
let existingProfileUrl: String? = try Profile
.filter(id: userSessionId.hexString)
.select(.profilePictureUrl)
.asRequest(of: String.self)
.fetchOne(db)
let existingProfileFileName: String? = try Profile
.filter(id: userSessionId.hexString)
.select(.profilePictureFileName)
.asRequest(of: String.self)
.fetchOne(db)
// Remove any cached avatar image value
if let fileName: String = existingProfileFileName {
dependencies.mutate(cache: .displayPicture) { $0.imageData[fileName] = nil }
}
switch existingProfileUrl {
case .some: Log.verbose(.profile, "Updating local profile on service with cleared avatar.")
case .none: Log.verbose(.profile, "Updating local profile on service with no avatar.")
}
}
switch existingProfileUrl {
case .some: Log.verbose(.profile, "Updating local profile on service with cleared avatar.")
case .none: Log.verbose(.profile, "Updating local profile on service with no avatar.")
}
try Profile.updateIfNeeded(
db,
publicKey: userSessionId.hexString,
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: displayPictureUpdate,
sentTimestamp: dependencies.dateNow.timeIntervalSince1970,
using: dependencies
)
Log.info(.profile, "Successfully updated user profile.")
}
try Profile.updateIfNeeded(
db,
publicKey: userSessionId.hexString,
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: displayPictureUpdate,
sentTimestamp: dependencies.dateNow.timeIntervalSince1970,
using: dependencies
)
Log.info(.profile, "Successfully updated user profile.")
try success?(db)
}
.mapError { _ in DisplayPictureError.databaseChangesFailed }
.eraseToAnyPublisher()
case .currentUserUploadImageData(let data):
DisplayPictureManager.prepareAndUploadDisplayPicture(
queue: queue,
imageData: data,
success: { downloadUrl, fileName, newProfileKey in
dependencies[singleton: .storage].writeAsync { db in
try Profile.updateIfNeeded(
db,
publicKey: userSessionId.hexString,
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: .currentUserUpdateTo(
url: downloadUrl,
key: newProfileKey,
fileName: fileName
),
sentTimestamp: dependencies.dateNow.timeIntervalSince1970,
using: dependencies
)
dependencies[defaults: .standard, key: .lastProfilePictureUpload] = dependencies.dateNow
Log.info(.profile, "Successfully updated user profile.")
try success?(db)
return dependencies[singleton: .displayPictureManager]
.prepareAndUploadDisplayPicture(imageData: data)
.mapError { $0 as Error }
.flatMapStorageWritePublisher(using: dependencies, updates: { db, result in
try Profile.updateIfNeeded(
db,
publicKey: userSessionId.hexString,
displayNameUpdate: displayNameUpdate,
displayPictureUpdate: .currentUserUpdateTo(
url: result.downloadUrl,
key: result.encryptionKey,
fileName: result.fileName
),
sentTimestamp: dependencies.dateNow.timeIntervalSince1970,
using: dependencies
)
dependencies[defaults: .standard, key: .lastProfilePictureUpload] = dependencies.dateNow
Log.info(.profile, "Successfully updated user profile.")
})
.mapError { error in
switch error {
case let displayPictureError as DisplayPictureError: return displayPictureError
default: return DisplayPictureError.databaseChangesFailed
}
},
failure: failure,
using: dependencies
)
}
.eraseToAnyPublisher()
}
}
@ -193,9 +194,10 @@ public extension Profile {
}
// If we have already downloaded the image then no need to download it again
let maybeFilePath: String? = try? DisplayPictureManager.filepath(
for: fileName.defaulting(to: DisplayPictureManager.generateFilename(for: url, using: dependencies)),
using: dependencies
let maybeFilePath: String? = try? dependencies[singleton: .displayPictureManager].filepath(
for: fileName.defaulting(
to: dependencies[singleton: .displayPictureManager].generateFilename(for: url)
)
)
if avatarNeedsDownload, let filePath: String = maybeFilePath, !dependencies[singleton: .fileManager].fileExists(atPath: filePath) {

@ -47,9 +47,8 @@ public extension ProfilePictureView {
// either Community conversations or updated groups)
if let displayPictureFilename: String = displayPictureFilename {
return (Info(
imageData: DisplayPictureManager.displayPicture(
owner: .file(displayPictureFilename),
using: dependencies
imageData: dependencies[singleton: .displayPictureManager].displayPicture(
owner: .file(displayPictureFilename)
),
icon: profileIcon
), nil)
@ -88,7 +87,7 @@ public extension ProfilePictureView {
return (
Info(
imageData: (
profile.map { DisplayPictureManager.displayPicture(owner: .user($0), using: dependencies) } ??
profile.map { dependencies[singleton: .displayPictureManager].displayPicture(owner: .user($0)) } ??
PlaceholderIcon.generate(
seed: (profile?.id ?? publicKey),
text: (profile?.displayName(for: threadVariant))
@ -105,7 +104,7 @@ public extension ProfilePictureView {
.map { otherProfile in
Info(
imageData: (
DisplayPictureManager.displayPicture(owner: .user(otherProfile), using: dependencies) ??
dependencies[singleton: .displayPictureManager].displayPicture(owner: .user(otherProfile)) ??
PlaceholderIcon.generate(
seed: otherProfile.id,
text: otherProfile.displayName(for: threadVariant),
@ -137,7 +136,7 @@ public extension ProfilePictureView {
return (
Info(
imageData: (
profile.map { DisplayPictureManager.displayPicture(owner: .user($0), using: dependencies) } ??
profile.map { dependencies[singleton: .displayPictureManager].displayPicture(owner: .user($0)) } ??
PlaceholderIcon.generate(
seed: publicKey,
text: (profile?.displayName(for: threadVariant))

@ -24,6 +24,7 @@ class DisplayPictureDownloadJobSpec: QuickSpec {
}
@TestState(cache: .libSession, in: dependencies) var mockLibSessionCache: MockLibSessionCache! = MockLibSessionCache(
initialSetup: { cache in
cache.when { $0.config(for: .any, sessionId: .any) }.thenReturn(nil)
cache
.when { try $0.performAndPushChange(.any, for: .any, sessionId: .any, change: { _ in }) }
.thenReturn(())

@ -32,6 +32,7 @@ class MessageSendJobSpec: QuickSpec {
}
@TestState(cache: .libSession, in: dependencies) var mockLibSessionCache: MockLibSessionCache! = MockLibSessionCache(
initialSetup: { cache in
cache.when { $0.config(for: .any, sessionId: .any) }.thenReturn(nil)
cache
.when { $0.pinnedPriority(.any, threadId: .any, threadVariant: .any) }
.thenReturn(LibSession.defaultNewThreadPriority)

@ -648,7 +648,11 @@ class MessageSenderGroupsSpec: QuickSpec {
.sinkAndStore(in: &disposables)
let expectedRequest: Network.PreparedRequest<FileUploadResponse> = try Network
.preparedUpload(data: TestConstants.validImageData, using: dependencies)
.preparedUpload(
data: TestConstants.validImageData,
requestAndPathBuildTimeout: Network.fileUploadTimeout,
using: dependencies
)
expect(mockNetwork)
.to(call(.exactly(times: 1), matchingParameters: .all) { network in

@ -369,7 +369,7 @@ final class ShareNavController: UINavigationController, ShareViewDelegate {
}
private func loadItemProvider(itemProvider: NSItemProvider) -> AnyPublisher<LoadedItem, Error> {
Log.info("attachment: \(itemProvider)")
Log.info("utiTypes for attachment: \(itemProvider.registeredTypeIdentifiers)")
// We need to be very careful about which UTI type we use.
//
@ -403,7 +403,7 @@ final class ShareNavController: UINavigationController, ShareViewDelegate {
return
}
Log.info("value type: \(type(of: value))")
Log.debug("value type: \(type(of: value))")
switch value {
case let data as Data:
@ -595,10 +595,10 @@ final class ShareNavController: UINavigationController, ShareViewDelegate {
let attachment = SignalAttachment.attachment(dataSource: dataSource, type: specificType, imageQuality: .medium, using: dependencies)
if loadedItem.isConvertibleToContactShare {
Log.info("isConvertibleToContactShare")
Log.debug("isConvertibleToContactShare")
attachment.isConvertibleToContactShare = true
} else if loadedItem.isConvertibleToTextMessage {
Log.info("isConvertibleToTextMessage")
Log.debug("isConvertibleToTextMessage")
attachment.isConvertibleToTextMessage = true
}
return Just(attachment)
@ -704,8 +704,6 @@ private extension NSItemProvider {
}
var type: UTType? {
Log.info("utiTypeForItem: \(registeredTypeIdentifiers)")
switch (matches(type: .url), matches(type: .contact)) {
case (true, _): return .url
case (_, true): return .contact

@ -23,7 +23,8 @@ public enum SNSnodeKit: MigratableTarget { // Just to make the external API nice
[], // Fix thread FTS
[
_005_AddSnodeReveivedMessageInfoPrimaryKey.self,
_006_DropSnodeCache.self
_006_DropSnodeCache.self,
_007_SplitSnodeReceivedMessageInfo.self
]
]
)

@ -13,7 +13,7 @@ enum _001_InitialSetupMigration: Migration {
static let minExpectedRunDuration: TimeInterval = 0.1
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = []
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [
LegacySnode.self, LegacySnodeSet.self, SnodeReceivedMessageInfo.self
LegacySnode.self, LegacySnodeSet.self, _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
]
static let droppedTables: [(TableRecord & FetchableRecord).Type] = []
@ -32,7 +32,7 @@ enum _001_InitialSetupMigration: Migration {
t.deprecatedColumn(name: "port", .integer)
}
try db.create(table: SnodeReceivedMessageInfo.self) { t in
try db.create(table: LegacySnodeReceivedMessageInfo.self) { t in
t.deprecatedColumn(name: "id", .integer)
.notNull()
.primaryKey(autoincrement: true)
@ -89,3 +89,22 @@ internal extension _001_InitialSetupMigration {
public let lmqPort: UInt16
}
}
internal extension _001_InitialSetupMigration {
    /// Frozen snapshot of the original `snodeReceivedMessageInfo` schema, kept so older migrations can keep
    /// referencing the legacy table layout after the live `SnodeReceivedMessageInfo` model changed shape
    /// (`_007_SplitSnodeReceivedMessageInfo` reads rows of this type before dropping the table)
    struct LegacySnodeReceivedMessageInfo: Codable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
        public static var databaseTableName: String { "snodeReceivedMessageInfo" }
        
        public typealias Columns = CodingKeys
        public enum CodingKeys: String, CodingKey, ColumnExpression {
            case key
            case hash
            case expirationDateMs
            case wasDeletedOrInvalid
        }
        
        // Combined lookup key - for newer rows this took the form `{snode address}.{publicKey}` (default
        // namespace) or `{snode address}.{publicKey}.{namespace}`, while very old rows only stored the
        // `{publicKey}` on its own
        public let key: String
        
        // The message hash returned by the storage server
        public let hash: String
        
        // Timestamp (ms since epoch) when the hash should expire
        public let expirationDateMs: Int64
        
        // Optional because the column was only added later by `_004_FlagMessageHashAsDeletedOrInvalid`
        // (rows created before that migration have a `nil` value)
        public var wasDeletedOrInvalid: Bool?
    }
}

@ -13,11 +13,13 @@ enum _004_FlagMessageHashAsDeletedOrInvalid: Migration {
static let needsConfigSync: Bool = false
static let minExpectedRunDuration: TimeInterval = 0.2
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = []
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [
_001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
]
static let droppedTables: [(TableRecord & FetchableRecord).Type] = []
static func migrate(_ db: Database, using dependencies: Dependencies) throws {
try db.alter(table: SnodeReceivedMessageInfo.self) { t in
try db.alter(table: _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self) { t in
t.add(.wasDeletedOrInvalid, .boolean)
.indexed() // Faster querying
}

@ -10,11 +10,17 @@ enum _005_AddSnodeReveivedMessageInfoPrimaryKey: Migration {
static let identifier: String = "AddSnodeReveivedMessageInfoPrimaryKey"
static let needsConfigSync: Bool = false
static let minExpectedRunDuration: TimeInterval = 0.2
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
static let fetchedTables: [(TableRecord & FetchableRecord).Type] = [
_001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
]
static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [
_001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
]
static let droppedTables: [(TableRecord & FetchableRecord).Type] = []
static func migrate(_ db: Database, using dependencies: Dependencies) throws {
typealias LegacyInfo = _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo
// SQLite doesn't support adding a new primary key after creation so we need to create a new table with
// the setup we want, copy data from the old table over, drop the old table and rename the new table
struct TmpSnodeReceivedMessageInfo: Codable, TableRecord, FetchableRecord, PersistableRecord, ColumnExpressible {
@ -45,25 +51,25 @@ enum _005_AddSnodeReveivedMessageInfoPrimaryKey: Migration {
// Insert into the new table, drop the old table and rename the new table to be the old one
let tmpInfo: TypedTableAlias<TmpSnodeReceivedMessageInfo> = TypedTableAlias()
let info: TypedTableAlias<SnodeReceivedMessageInfo> = TypedTableAlias()
let info: TypedTableAlias<LegacyInfo> = TypedTableAlias()
try db.execute(literal: """
INSERT INTO \(tmpInfo)
SELECT \(info[.key]), \(info[.hash]), \(info[.expirationDateMs]), \(info[.wasDeletedOrInvalid])
FROM \(info)
""")
try db.drop(table: SnodeReceivedMessageInfo.self)
try db.drop(table: LegacyInfo.self)
try db.rename(
table: TmpSnodeReceivedMessageInfo.databaseTableName,
to: SnodeReceivedMessageInfo.databaseTableName
to: LegacyInfo.databaseTableName
)
// Need to create the indexes separately from creating 'TmpGroupMember' to ensure they
// have the correct names
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.key])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.hash])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.expirationDateMs])
try db.createIndex(on: SnodeReceivedMessageInfo.self, columns: [.wasDeletedOrInvalid])
// Need to create the indexes separately from creating 'TmpSnodeReceivedMessageInfo' to
// ensure they have the correct names
try db.createIndex(on: LegacyInfo.self, columns: [.key])
try db.createIndex(on: LegacyInfo.self, columns: [.hash])
try db.createIndex(on: LegacyInfo.self, columns: [.expirationDateMs])
try db.createIndex(on: LegacyInfo.self, columns: [.wasDeletedOrInvalid])
Storage.update(progress: 1, for: self, in: target, using: dependencies)
}

@ -0,0 +1,111 @@
// Copyright © 2025 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionUtilitiesKit
/// This migration splits the old combined `key` structure used for `SnodeReceivedMessageInfo` into separate
/// columns (`swarmPublicKey`, `snodeAddress`, `namespace`) for more efficient querying
enum _007_SplitSnodeReceivedMessageInfo: Migration {
    static let target: TargetMigrations.Identifier = .snodeKit
    static let identifier: String = "SplitSnodeReceivedMessageInfo"
    static let needsConfigSync: Bool = false
    static let minExpectedRunDuration: TimeInterval = 0.1
    static let fetchedTables: [(TableRecord & FetchableRecord).Type] = [
        _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
    ]
    static let createdOrAlteredTables: [(TableRecord & FetchableRecord).Type] = [SnodeReceivedMessageInfo.self]
    static let droppedTables: [(TableRecord & FetchableRecord).Type] = [
        _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo.self
    ]
    
    static func migrate(_ db: Database, using dependencies: Dependencies) throws {
        typealias LegacyInfo = _001_InitialSetupMigration.LegacySnodeReceivedMessageInfo
        
        /// Fetch the existing values and then drop the old table (the rows get transformed and re-inserted below)
        let existingValues: [LegacyInfo] = try LegacyInfo.fetchAll(db)
        try db.drop(table: LegacyInfo.self)
        
        /// Create the new table with individual, indexed columns in place of the old constructed `key`
        try db.create(table: SnodeReceivedMessageInfo.self) { t in
            t.column(.swarmPublicKey, .text)
                .notNull()
                .indexed()
            t.column(.snodeAddress, .text).notNull()
            t.column(.namespace, .integer).notNull()
                .indexed()
            t.column(.hash, .text)
                .notNull()
                .indexed()
            t.column(.expirationDateMs, .integer)
                .notNull()
                .indexed()
            t.column(.wasDeletedOrInvalid, .boolean)
                .notNull()
                .indexed()
            
            t.primaryKey([.swarmPublicKey, .snodeAddress, .namespace, .hash])
        }
        
        /// Convert the old data to the new structure and insert it
        let timestampNowMs: Int64 = Int64(dependencies.dateNow.timeIntervalSince1970 * 1000)
        let updatedValues: [SnodeReceivedMessageInfo] = existingValues.compactMap { info in
            /// The old key was `{snode address}.{publicKey}` for the default namespace and
            /// `{snode address}.{publicKey}.{namespace}` for any other namespace
            let keyComponents: [String] = info.key.components(separatedBy: ".") // stringlint:ignore
            
            /// Because node addresses are likely ip addresses the `keyComponents` array above may have an inconsistent length
            /// as such we need to find the swarm public key and then split again based on that value
            let maybeSwarmPublicKey: String? = {
                /// Legacy versions only included the `publicKey` which isn't supported anymore
                /// so just ignore those
                guard keyComponents.count > 2 else { return nil }
                
                /// If this wasn't associated to a `namespace` then the last value will be the `swarmPublicKey`
                guard (try? SessionId(from: keyComponents[keyComponents.count - 1])) != nil else {
                    /// Otherwise it'll be the 2nd last value
                    guard (try? SessionId(from: keyComponents[keyComponents.count - 2])) != nil else {
                        return nil
                    }
                    
                    return keyComponents[keyComponents.count - 2]
                }
                
                return keyComponents[keyComponents.count - 1]
            }()
            
            /// There was a bug in an old version of the code where it wouldn't correctly prune expired hashes so we may as well
            /// exclude them here as they just take up space otherwise
            guard
                let swarmPublicKey: String = maybeSwarmPublicKey,
                info.expirationDateMs > timestampNowMs
            else { return nil }
            
            /// Split on the `swarmPublicKey` **without** a trailing dot - keys for the default namespace have no
            /// trailing `.{namespace}` component so including a trailing dot in the separator would fail to split
            /// those keys, leaving the public key incorrectly embedded in the resulting `snodeAddress` value
            let swarmPublicKeySplitComponents: [String] = info.key
                .components(separatedBy: ".\(swarmPublicKey)") // stringlint:ignore
                .filter { !$0.isEmpty }
            
            guard !swarmPublicKeySplitComponents.isEmpty else { return nil }
            
            let targetNamespace: Int = {
                /// A single component means there was no namespace suffix (ie. the default namespace)
                guard swarmPublicKeySplitComponents.count == 2 else {
                    return SnodeAPI.Namespace.default.rawValue
                }
                
                /// The remaining component will be `.{namespace}` so drop the leading separator dot before parsing
                return (Int(swarmPublicKeySplitComponents[1].dropFirst()) ?? SnodeAPI.Namespace.default.rawValue)
            }()
            
            return SnodeReceivedMessageInfo(
                swarmPublicKey: swarmPublicKey,
                snodeAddress: swarmPublicKeySplitComponents[0],
                namespace: targetNamespace,
                hash: info.hash,
                expirationDateMs: info.expirationDateMs,
                wasDeletedOrInvalid: (info.wasDeletedOrInvalid == true)
            )
        }
        
        try updatedValues.forEach { _ = try $0.inserted(db) }
        
        Storage.update(progress: 1, for: self, in: target, using: dependencies)
    }
}

@ -11,16 +11,22 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
public typealias Columns = CodingKeys
public enum CodingKeys: String, CodingKey, ColumnExpression {
case key
case swarmPublicKey
case snodeAddress
case namespace
case hash
case expirationDateMs
case wasDeletedOrInvalid
}
/// The key this message hash is associated to
///
/// This will be a combination of {address}.{port}.{publicKey} for new rows and just the {publicKey} for legacy rows
public let key: String
/// The public key for the swarm this message info was retrieved from
public let swarmPublicKey: String
/// The address for the snode this message info was retrieved from (in the form of `{server}:{port}`)
public let snodeAddress: String
/// The namespace this message info was retrieved from
public let namespace: Int
/// The is the hash for the received message
public let hash: String
@ -28,29 +34,24 @@ public struct SnodeReceivedMessageInfo: Codable, FetchableRecord, MutablePersist
/// This is the timestamp (in milliseconds since epoch) when the message hash should expire
///
/// **Note:** If no value exists this will default to 15 days from now (since the service node caches messages for
/// 14 days)
/// 14 days for standard messages)
public let expirationDateMs: Int64
/// This flag indicates whether the interaction associated with this message hash was deleted or whether this message
/// This flag indicates whether the message associated with this message hash was deleted or whether this message
/// hash is potentially invalid (if a poll results in 100% of the `SnodeReceivedMessageInfo` entries being seen as
/// duplicates then we assume that the `lastHash` value provided when retrieving messages was invalid and mark
/// it as such)
///
/// **Note:** When retrieving the `lastNotExpired` we will ignore any entries where this flag is true
public var wasDeletedOrInvalid: Bool?
/// This flag can also be used to refetch messages from a swarm without impacting the hash-based deduping mechanism
/// as, if a hash with this value set to `true` is received when polling, then the value gets reset to `false`
///
/// **Note:** When retrieving the `lastNotExpired` we will ignore any entries where this flag is `true`
public var wasDeletedOrInvalid: Bool
}
// MARK: - Convenience
public extension SnodeReceivedMessageInfo {
private static func key(for snode: LibSession.Snode, swarmPublicKey: String, namespace: SnodeAPI.Namespace) -> String {
guard namespace != .default else {
return "\(snode.address).\(swarmPublicKey)"
}
return "\(snode.address).\(swarmPublicKey).\(namespace.rawValue)"
}
init(
snode: LibSession.Snode,
swarmPublicKey: String,
@ -58,9 +59,12 @@ public extension SnodeReceivedMessageInfo {
hash: String,
expirationDateMs: Int64?
) {
self.key = SnodeReceivedMessageInfo.key(for: snode, swarmPublicKey: swarmPublicKey, namespace: namespace)
self.swarmPublicKey = swarmPublicKey
self.snodeAddress = snode.address
self.namespace = namespace.rawValue
self.hash = hash
self.expirationDateMs = (expirationDateMs ?? 0)
self.wasDeletedOrInvalid = false
}
}
@ -78,11 +82,12 @@ public extension SnodeReceivedMessageInfo {
let currentOffsetTimestampMs: Int64 = dependencies[cache: .snodeAPI].currentOffsetTimestampMs()
return try SnodeReceivedMessageInfo
.filter(SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid == false)
.filter(
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid == nil ||
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid == false
SnodeReceivedMessageInfo.Columns.swarmPublicKey == swarmPublicKey &&
SnodeReceivedMessageInfo.Columns.snodeAddress == snode.address &&
SnodeReceivedMessageInfo.Columns.namespace == namespace.rawValue
)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, swarmPublicKey: swarmPublicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs > currentOffsetTimestampMs)
.order(Column.rowID.desc)
.fetchOne(db)
@ -99,23 +104,25 @@ public extension SnodeReceivedMessageInfo {
potentiallyInvalidHashes: [String],
otherKnownValidHashes: [String] = []
) throws {
_ = try SnodeReceivedMessageInfo
.filter(potentiallyInvalidHashes.contains(SnodeReceivedMessageInfo.Columns.hash))
.updateAll(
db,
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid.set(to: true)
)
if !potentiallyInvalidHashes.isEmpty {
_ = try SnodeReceivedMessageInfo
.filter(potentiallyInvalidHashes.contains(SnodeReceivedMessageInfo.Columns.hash))
.updateAll(
db,
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid.set(to: true)
)
}
// If we have any server hashes which we know are valid (eg. we fetched the oldest messages) then
// mark them all as valid to prevent the case where we just slowly work backwards from the latest
// message, polling for one earlier each time
guard !otherKnownValidHashes.isEmpty else { return }
_ = try SnodeReceivedMessageInfo
.filter(otherKnownValidHashes.contains(SnodeReceivedMessageInfo.Columns.hash))
.updateAll(
db,
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid.set(to: false)
)
if !otherKnownValidHashes.isEmpty {
_ = try SnodeReceivedMessageInfo
.filter(otherKnownValidHashes.contains(SnodeReceivedMessageInfo.Columns.hash))
.updateAll(
db,
SnodeReceivedMessageInfo.Columns.wasDeletedOrInvalid.set(to: false)
)
}
}
}

@ -117,7 +117,6 @@ public final class SnodeAPI {
result[next.0] = (next.1, messageResponse)
}
}
}

@ -107,6 +107,7 @@ public extension Network {
static func preparedUpload(
data: Data,
requestAndPathBuildTimeout: TimeInterval? = nil,
using dependencies: Dependencies
) throws -> PreparedRequest<FileUploadResponse> {
return try PreparedRequest(
@ -121,6 +122,7 @@ public extension Network {
),
responseType: FileUploadResponse.self,
requestTimeout: Network.fileUploadTimeout,
requestAndPathBuildTimeout: requestAndPathBuildTimeout,
using: dependencies
)
}

@ -13,7 +13,8 @@ extension Timer {
// If we are forcing synchronous execution (ie. running unit tests) then ceil the
// timeInterval for execution and append it to the execution set so the test can
// trigger the logic in a synchronous way)
// trigger the logic in a synchronous way - the `dependencies.async` function stores
// the closure and executes it when the `dependencies.stepForwardInTime` is triggered)
guard !dependencies.forceSynchronous else {
dependencies.async(at: dependencies.dateNow.timeIntervalSince1970 + timeInterval) {
block(timer)

Loading…
Cancel
Save