Started caching pending ReadReceipt messages to resolve an edge-case

Fixed an issue where read receipts could be sent for already read messages
Fixed an issue where the read state change might not update the UI
pull/784/head
Morgan Pretty 2 years ago
parent 3344e58716
commit 08b1e9a131

@ -651,6 +651,8 @@
FD3C907127E445E500CD579F /* MessageReceiverDecryptionSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3C907027E445E500CD579F /* MessageReceiverDecryptionSpec.swift */; };
FD3E0C84283B5835002A425C /* SessionThreadViewModel.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD3E0C83283B5835002A425C /* SessionThreadViewModel.swift */; };
FD42F9A8285064B800A0C77D /* PushNotificationAPI.swift in Sources */ = {isa = PBXBuildFile; fileRef = C33FDBDE255A581900E217F9 /* PushNotificationAPI.swift */; };
FD432432299C6933008A0213 /* _011_AddPendingReadReceipts.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */; };
FD432434299C6985008A0213 /* PendingReadReceipt.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD432433299C6985008A0213 /* PendingReadReceipt.swift */; };
FD4B200E283492210034334B /* InsetLockableTableView.swift in Sources */ = {isa = PBXBuildFile; fileRef = FD4B200D283492210034334B /* InsetLockableTableView.swift */; };
FD52090028AF6153006098F6 /* OWSBackgroundTask.m in Sources */ = {isa = PBXBuildFile; fileRef = C33FDC1B255A581F00E217F9 /* OWSBackgroundTask.m */; };
FD52090128AF61BA006098F6 /* OWSBackgroundTask.h in Headers */ = {isa = PBXBuildFile; fileRef = C33FDB38255A580B00E217F9 /* OWSBackgroundTask.h */; settings = {ATTRIBUTES = (Public, ); }; };
@ -1735,6 +1737,8 @@
FD3C907027E445E500CD579F /* MessageReceiverDecryptionSpec.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MessageReceiverDecryptionSpec.swift; sourceTree = "<group>"; };
FD3C907427E83AC200CD579F /* OpenGroupServerIdLookup.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = OpenGroupServerIdLookup.swift; sourceTree = "<group>"; };
FD3E0C83283B5835002A425C /* SessionThreadViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SessionThreadViewModel.swift; sourceTree = "<group>"; };
FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = _011_AddPendingReadReceipts.swift; sourceTree = "<group>"; };
FD432433299C6985008A0213 /* PendingReadReceipt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PendingReadReceipt.swift; sourceTree = "<group>"; };
FD4B200D283492210034334B /* InsetLockableTableView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = InsetLockableTableView.swift; sourceTree = "<group>"; };
FD52090228B4680F006098F6 /* RadioButton.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RadioButton.swift; sourceTree = "<group>"; };
FD52090428B4915F006098F6 /* PrivacySettingsViewModel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PrivacySettingsViewModel.swift; sourceTree = "<group>"; };
@ -3524,6 +3528,7 @@
FDE77F6A280FEB28002CFC5D /* ControlMessageProcessRecord.swift */,
FD5C7308285007920029977D /* BlindedIdLookup.swift */,
FD09B7E6288670FD00ED0B66 /* Reaction.swift */,
FD432433299C6985008A0213 /* PendingReadReceipt.swift */,
);
path = Models;
sourceTree = "<group>";
@ -3541,6 +3546,7 @@
FD09B7E4288670BB00ED0B66 /* _008_EmojiReacts.swift */,
7BAA7B6528D2DE4700AE1489 /* _009_OpenGroupPermission.swift */,
FD7115F128C6CB3900B47552 /* _010_AddThreadIdToFTS.swift */,
FD432431299C6933008A0213 /* _011_AddPendingReadReceipts.swift */,
);
path = Migrations;
sourceTree = "<group>";
@ -5490,6 +5496,7 @@
FD716E6428502DDD00C96BF4 /* CallManagerProtocol.swift in Sources */,
FDC438C727BB6DF000C60D73 /* DirectMessage.swift in Sources */,
FDC4384F27B4804F00C60D73 /* Header.swift in Sources */,
FD432434299C6985008A0213 /* PendingReadReceipt.swift in Sources */,
FDC4381727B32EC700C60D73 /* Personalization.swift in Sources */,
FD245C51285065CC00B966DD /* MessageReceiver.swift in Sources */,
FD245C652850665400B966DD /* ClosedGroupControlMessage.swift in Sources */,
@ -5551,6 +5558,7 @@
FDC438C127BB4E6800C60D73 /* SMKDependencies.swift in Sources */,
FDC4383827B3863200C60D73 /* VersionResponse.swift in Sources */,
B806ECA126C4A7E4008BDA44 /* WebRTCSession+UI.swift in Sources */,
FD432432299C6933008A0213 /* _011_AddPendingReadReceipts.swift in Sources */,
7BCD116C27016062006330F1 /* WebRTCSession+DataChannel.swift in Sources */,
FD5C72F9284F0E880029977D /* MessageReceiver+TypingIndicators.swift in Sources */,
FD5C7303284F0FA50029977D /* MessageReceiver+Calls.swift in Sources */,

@ -200,7 +200,7 @@ public class ConversationViewModel: OWSAudioPlayerDelegate {
),
PagedData.ObservedChanges(
table: RecipientState.self,
columns: [.state, .mostRecentFailureText],
columns: [.state, .readTimestampMs, .mostRecentFailureText],
joinToPagedType: {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let recipientState: TypedTableAlias<RecipientState> = TypedTableAlias()

@ -4,6 +4,7 @@ import UIKit
import AVFoundation
import Curve25519Kit
import SessionUIKit
import SessionMessagingKit
import SessionUtilitiesKit
final class QRCodeVC : BaseVC, UIPageViewControllerDataSource, UIPageViewControllerDelegate, QRScannerDelegate {

@ -24,7 +24,8 @@ public enum SNMessagingKit { // Just to make the external API nice
[
_008_EmojiReacts.self,
_009_OpenGroupPermission.self,
_010_AddThreadIdToFTS.self
_010_AddThreadIdToFTS.self,
_011_AddPendingReadReceipts.self
]
]
)

@ -0,0 +1,41 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionUtilitiesKit
/// This migration adds a table to track pending read receipts (it's possible to receive a read receipt message before getting the original
/// message due to how one-to-one conversations work, by storing pending read receipts we should be able to prevent this case)
enum _011_AddPendingReadReceipts: Migration {
static let target: TargetMigrations.Identifier = .messagingKit
static let identifier: String = "AddPendingReadReceipts"
static let needsConfigSync: Bool = false
static let minExpectedRunDuration: TimeInterval = 0.1
static func migrate(_ db: Database) throws {
// Can't actually alter a virtual table in SQLite so we need to drop and recreate it,
// luckily this is actually pretty quick
if try db.tableExists(Interaction.fullTextSearchTableName) {
try db.drop(table: Interaction.fullTextSearchTableName)
try db.dropFTS5SynchronizationTriggers(forTable: Interaction.fullTextSearchTableName)
}
try db.create(table: PendingReadReceipt.self) { t in
t.column(.threadId, .text)
.notNull()
.indexed() // Quicker querying
.references(SessionThread.self, onDelete: .cascade) // Delete if Thread deleted
t.column(.interactionTimestampMs, .integer)
.notNull()
.indexed() // Quicker querying
t.column(.readTimestampMs, .integer)
.notNull()
t.column(.serverExpirationTimestamp, .double)
.notNull()
t.primaryKey([.threadId, .interactionTimestampMs])
}
Storage.update(progress: 1, for: self, in: target) // In case this is the last migration
}
}

@ -450,26 +450,33 @@ public extension Interaction {
trySendReadReceipt: Bool
) throws {
guard let interactionId: Int64 = interactionId else { return }
struct InteractionReadInfo: Decodable, FetchableRecord {
let id: Int64
let variant: Interaction.Variant
let timestampMs: Int64
let wasRead: Bool
}
// Once all of the below is done schedule the jobs
func scheduleJobs(interactionIds: [Int64]) {
func scheduleJobs(interactionInfo: [InteractionReadInfo]) {
// Add the 'DisappearingMessagesJob' if needed - this will update any expiring
// messages `expiresStartedAtMs` values
JobRunner.upsert(
db,
job: DisappearingMessagesJob.updateNextRunIfNeeded(
db,
interactionIds: interactionIds,
interactionIds: interactionInfo.map { $0.id },
startedAtMs: TimeInterval(SnodeAPI.currentOffsetTimestampMs())
)
)
// Clear out any notifications for the interactions we mark as read
Environment.shared?.notificationsManager.wrappedValue?.cancelNotifications(
identifiers: interactionIds
.map { interactionId in
identifiers: interactionInfo
.map { interactionInfo in
Interaction.notificationIdentifier(
for: interactionId,
for: interactionInfo.id,
threadId: threadId,
shouldGroupMessagesForThread: false
)
@ -482,43 +489,54 @@ public extension Interaction {
)
// If we want to send read receipts and it's a contact thread then try to add the
// 'SendReadReceiptsJob'
// 'SendReadReceiptsJob' for and unread messages that weren't outgoing
if trySendReadReceipt && threadVariant == .contact {
JobRunner.upsert(
db,
job: SendReadReceiptsJob.createOrUpdateIfNeeded(
db,
threadId: threadId,
interactionIds: interactionIds
interactionIds: interactionInfo
.filter { !$0.wasRead && $0.variant != .standardOutgoing }
.map { $0.id }
)
)
}
}
// If we aren't including older interactions then update and save the current one
struct InteractionReadInfo: Decodable, FetchableRecord {
let timestampMs: Int64
let wasRead: Bool
}
// Since there is no guarantee on the order messages are inserted into the database
// fetch the timestamp for the interaction and set everything before that as read
let maybeInteractionInfo: InteractionReadInfo? = try Interaction
.select(.timestampMs, .wasRead)
.select(.id, .variant, .timestampMs, .wasRead)
.filter(id: interactionId)
.asRequest(of: InteractionReadInfo.self)
.fetchOne(db)
// If we aren't including older interactions then update and save the current one
guard includingOlder, let interactionInfo: InteractionReadInfo = maybeInteractionInfo else {
// Only mark as read and trigger the subsequent jobs if the interaction is
// actually not read (no point updating and triggering db changes otherwise)
guard maybeInteractionInfo?.wasRead == false else { return }
guard
maybeInteractionInfo?.wasRead == false,
let variant: Variant = try Interaction
.filter(id: interactionId)
.select(.variant)
.asRequest(of: Variant.self)
.fetchOne(db)
else { return }
_ = try Interaction
.filter(id: interactionId)
.updateAll(db, Columns.wasRead.set(to: true))
scheduleJobs(interactionIds: [interactionId])
scheduleJobs(interactionInfo: [
InteractionReadInfo(
id: interactionId,
variant: variant,
timestampMs: 0,
wasRead: false
)
])
return
}
@ -526,16 +544,16 @@ public extension Interaction {
.filter(Interaction.Columns.threadId == threadId)
.filter(Interaction.Columns.timestampMs <= interactionInfo.timestampMs)
.filter(Interaction.Columns.wasRead == false)
let interactionIdsToMarkAsRead: [Int64] = try interactionQuery
.select(.id)
.asRequest(of: Int64.self)
let interactionInfoToMarkAsRead: [InteractionReadInfo] = try interactionQuery
.select(.id, .variant, .timestampMs, .wasRead)
.asRequest(of: InteractionReadInfo.self)
.fetchAll(db)
// If there are no other interactions to mark as read then just schedule the jobs
// for this interaction (need to ensure the disapeparing messages run for sync'ed
// outgoing messages which will always have 'wasRead' as false)
guard !interactionIdsToMarkAsRead.isEmpty else {
scheduleJobs(interactionIds: [interactionId])
guard !interactionInfoToMarkAsRead.isEmpty else {
scheduleJobs(interactionInfo: [interactionInfo])
return
}
@ -543,27 +561,71 @@ public extension Interaction {
try interactionQuery.updateAll(db, Columns.wasRead.set(to: true))
// Retrieve the interaction ids we want to update
scheduleJobs(interactionIds: interactionIdsToMarkAsRead)
scheduleJobs(interactionInfo: interactionInfoToMarkAsRead)
}
/// This method flags sent messages as read for the specified recipients
///
/// **Note:** This method won't update the 'wasRead' flag (it will be updated via the above method)
static func markAsRead(_ db: Database, recipientId: String, timestampMsValues: [Double], readTimestampMs: Double) throws {
guard db[.areReadReceiptsEnabled] == true else { return }
try RecipientState
@discardableResult static func markAsRead(
_ db: Database,
recipientId: String,
timestampMsValues: [Int64],
readTimestampMs: Int64
) throws -> Set<Int64> {
guard db[.areReadReceiptsEnabled] == true else { return [] }
// Update the read state
let rowIds: [Int64] = try RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.recipientId == recipientId)
.joining(
required: RecipientState.interaction
.filter(Columns.variant == Variant.standardOutgoing)
.filter(timestampMsValues.contains(Columns.timestampMs))
.filter(Columns.variant == Variant.standardOutgoing)
)
.asRequest(of: Int64.self)
.fetchAll(db)
// If there were no 'rowIds' then no need to run the below queries, all of the timestamps
// and for pending read receipts
guard !rowIds.isEmpty else { return timestampMsValues.asSet() }
// Update the 'readTimestampMs' if it doesn't match (need to do this to prevent
// the UI update from being triggered for a redundant update)
try RecipientState
.filter(rowIds.contains(Column.rowID))
.filter(RecipientState.Columns.readTimestampMs == nil)
.updateAll(
db,
RecipientState.Columns.readTimestampMs.set(to: readTimestampMs)
)
// If the message still appeared to be sending then mark it as sent
try RecipientState
.filter(rowIds.contains(Column.rowID))
.filter(RecipientState.Columns.state == RecipientState.State.sending)
.updateAll(
db,
RecipientState.Columns.readTimestampMs.set(to: readTimestampMs),
RecipientState.Columns.state.set(to: RecipientState.State.sent)
)
// Retrieve the set of timestamps which were updated
let timestampsUpdated: Set<Int64> = try Interaction
.select(Columns.timestampMs)
.filter(timestampMsValues.contains(Columns.timestampMs))
.filter(Columns.variant == Variant.standardOutgoing)
.joining(
required: Interaction.recipientStates
.filter(rowIds.contains(Column.rowID))
)
.asRequest(of: Int64.self)
.fetchSet(db)
// Return the timestamps which weren't updated
return timestampMsValues
.asSet()
.subtracting(timestampsUpdated)
}
}

@ -0,0 +1,44 @@
// Copyright © 2023 Rangeproof Pty Ltd. All rights reserved.
import Foundation
import GRDB
import SessionUtilitiesKit
public struct PendingReadReceipt: Codable, Equatable, Hashable, FetchableRecord, PersistableRecord, TableRecord, ColumnExpressible {
public static var databaseTableName: String { "pendingReadReceipt" }
public static let threadForeignKey = ForeignKey([Columns.threadId], to: [SessionThread.Columns.id])
public typealias Columns = CodingKeys
public enum CodingKeys: String, CodingKey, ColumnExpression {
case threadId
case interactionTimestampMs
case readTimestampMs
case serverExpirationTimestamp
}
/// The id for the thread this ReadReceipt belongs to
public let threadId: String
/// The timestamp in milliseconds since epoch for the interaction this read receipt relates to
public let interactionTimestampMs: Int64
/// The timestamp in milliseconds since epoch that the interaction this read receipt relates to was read
public let readTimestampMs: Int64
/// The timestamp for when this message will expire on the server (will be used for garbage collection)
public let serverExpirationTimestamp: TimeInterval
// MARK: - Initialization
public init(
threadId: String,
interactionTimestampMs: Int64,
readTimestampMs: Int64,
serverExpirationTimestamp: TimeInterval
) {
self.threadId = threadId
self.interactionTimestampMs = interactionTimestampMs
self.readTimestampMs = readTimestampMs
self.serverExpirationTimestamp = serverExpirationTimestamp
}
}

@ -41,7 +41,7 @@ public enum GarbageCollectionJob: JobExecutor {
/// are shown)
let lastGarbageCollection: Date = UserDefaults.standard[.lastGarbageCollection]
.defaulting(to: Date.distantPast)
let finalTypesToCollection: Set<Types> = {
let finalTypesToCollect: Set<Types> = {
guard
job.behaviour != .recurringOnActive ||
Date().timeIntervalSince(lastGarbageCollection) > (23 * 60 * 60)
@ -60,20 +60,20 @@ public enum GarbageCollectionJob: JobExecutor {
Storage.shared.writeAsync(
updates: { db in
/// Remove any typing indicators
if finalTypesToCollection.contains(.threadTypingIndicators) {
if finalTypesToCollect.contains(.threadTypingIndicators) {
_ = try ThreadTypingIndicator
.deleteAll(db)
}
/// Remove any expired controlMessageProcessRecords
if finalTypesToCollection.contains(.expiredControlMessageProcessRecords) {
if finalTypesToCollect.contains(.expiredControlMessageProcessRecords) {
_ = try ControlMessageProcessRecord
.filter(ControlMessageProcessRecord.Columns.serverExpirationTimestamp <= timestampNow)
.deleteAll(db)
}
/// Remove any old open group messages - open group messages which are older than six months
if finalTypesToCollection.contains(.oldOpenGroupMessages) && db[.trimOpenGroupMessagesOlderThanSixMonths] {
if finalTypesToCollect.contains(.oldOpenGroupMessages) && db[.trimOpenGroupMessagesOlderThanSixMonths] {
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
let threadIdLiteral: SQL = SQL(stringLiteral: Interaction.Columns.threadId.name)
@ -104,7 +104,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned jobs - jobs which have had their threads or interactions removed
if finalTypesToCollection.contains(.orphanedJobs) {
if finalTypesToCollect.contains(.orphanedJobs) {
let job: TypedTableAlias<Job> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
@ -130,7 +130,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned link previews - link previews which have no interactions with matching url & rounded timestamps
if finalTypesToCollection.contains(.orphanedLinkPreviews) {
if finalTypesToCollect.contains(.orphanedLinkPreviews) {
let linkPreview: TypedTableAlias<LinkPreview> = TypedTableAlias()
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
@ -150,7 +150,7 @@ public enum GarbageCollectionJob: JobExecutor {
/// Orphaned open groups - open groups which are no longer associated to a thread (except for the session-run ones for which
/// we want cached image data even if the user isn't in the group)
if finalTypesToCollection.contains(.orphanedOpenGroups) {
if finalTypesToCollect.contains(.orphanedOpenGroups) {
let openGroup: TypedTableAlias<OpenGroup> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
@ -169,7 +169,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned open group capabilities - capabilities which have no existing open groups with the same server
if finalTypesToCollection.contains(.orphanedOpenGroupCapabilities) {
if finalTypesToCollect.contains(.orphanedOpenGroupCapabilities) {
let capability: TypedTableAlias<Capability> = TypedTableAlias()
let openGroup: TypedTableAlias<OpenGroup> = TypedTableAlias()
@ -185,7 +185,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned blinded id lookups - lookups which have no existing threads or approval/block settings for either blinded/un-blinded id
if finalTypesToCollection.contains(.orphanedBlindedIdLookups) {
if finalTypesToCollect.contains(.orphanedBlindedIdLookups) {
let blindedIdLookup: TypedTableAlias<BlindedIdLookup> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
let contact: TypedTableAlias<Contact> = TypedTableAlias()
@ -213,7 +213,7 @@ public enum GarbageCollectionJob: JobExecutor {
/// Approved blinded contact records - once a blinded contact has been approved there is no need to keep the blinded
/// contact record around anymore
if finalTypesToCollection.contains(.approvedBlindedContactRecords) {
if finalTypesToCollect.contains(.approvedBlindedContactRecords) {
let contact: TypedTableAlias<Contact> = TypedTableAlias()
let blindedIdLookup: TypedTableAlias<BlindedIdLookup> = TypedTableAlias()
@ -232,7 +232,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned attachments - attachments which have no related interactions, quotes or link previews
if finalTypesToCollection.contains(.orphanedAttachments) {
if finalTypesToCollect.contains(.orphanedAttachments) {
let attachment: TypedTableAlias<Attachment> = TypedTableAlias()
let quote: TypedTableAlias<Quote> = TypedTableAlias()
let linkPreview: TypedTableAlias<LinkPreview> = TypedTableAlias()
@ -255,7 +255,7 @@ public enum GarbageCollectionJob: JobExecutor {
""")
}
if finalTypesToCollection.contains(.orphanedProfiles) {
if finalTypesToCollect.contains(.orphanedProfiles) {
let profile: TypedTableAlias<Profile> = TypedTableAlias()
let thread: TypedTableAlias<SessionThread> = TypedTableAlias()
let interaction: TypedTableAlias<Interaction> = TypedTableAlias()
@ -289,6 +289,12 @@ public enum GarbageCollectionJob: JobExecutor {
)
""")
}
if finalTypesToCollect.contains(.expiredPendingReadReceipts) {
_ = try PendingReadReceipt
.filter(PendingReadReceipt.Columns.serverExpirationTimestamp <= timestampNow)
.deleteAll(db)
}
},
completion: { _, _ in
// Dispatch async so we can swap from the write queue to a read one (we are done writing)
@ -304,7 +310,7 @@ public enum GarbageCollectionJob: JobExecutor {
var profileAvatarFilenames: Set<String> = []
/// Orphaned attachment files - attachment files which don't have an associated record in the database
if finalTypesToCollection.contains(.orphanedAttachmentFiles) {
if finalTypesToCollect.contains(.orphanedAttachmentFiles) {
/// **Note:** Thumbnails are stored in the `NSCachesDirectory` directory which should be automatically manage
/// it's own garbage collection so we can just ignore it according to the various comments in the following stack overflow
/// post, the directory will be cleared during app updates as well as if the system is running low on memory (if the app isn't running)
@ -317,7 +323,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
/// Orphaned profile avatar files - profile avatar files which don't have an associated record in the database
if finalTypesToCollection.contains(.orphanedProfileAvatars) {
if finalTypesToCollect.contains(.orphanedProfileAvatars) {
profileAvatarFilenames = try Profile
.select(.profilePictureFileName)
.filter(Profile.Columns.profilePictureFileName != nil)
@ -340,7 +346,7 @@ public enum GarbageCollectionJob: JobExecutor {
var deletionErrors: [Error] = []
// Orphaned attachment files (actual deletion)
if finalTypesToCollection.contains(.orphanedAttachmentFiles) {
if finalTypesToCollect.contains(.orphanedAttachmentFiles) {
// Note: Looks like in order to recursively look through files we need to use the
// enumerator method
let fileEnumerator = FileManager.default.enumerator(
@ -384,7 +390,7 @@ public enum GarbageCollectionJob: JobExecutor {
}
// Orphaned profile avatar files (actual deletion)
if finalTypesToCollection.contains(.orphanedProfileAvatars) {
if finalTypesToCollect.contains(.orphanedProfileAvatars) {
let allAvatarProfileFilenames: Set<String> = (try? FileManager.default
.contentsOfDirectory(atPath: ProfileManager.sharedDataProfileAvatarsDirPath))
.defaulting(to: [])
@ -442,6 +448,7 @@ extension GarbageCollectionJob {
case orphanedAttachments
case orphanedAttachmentFiles
case orphanedProfileAvatars
case expiredPendingReadReceipts
}
public struct Details: Codable {

@ -36,6 +36,7 @@ public enum MessageReceiveJob: JobExecutor {
try MessageReceiver.handle(
db,
message: messageInfo.message,
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
openGroupId: nil
)
@ -104,30 +105,36 @@ extension MessageReceiveJob {
private enum CodingKeys: String, CodingKey {
case message
case variant
case serverExpirationTimestamp
case serializedProtoData
}
public let message: Message
public let variant: Message.Variant
public let serverExpirationTimestamp: TimeInterval?
public let serializedProtoData: Data
public init(
message: Message,
variant: Message.Variant,
serverExpirationTimestamp: TimeInterval?,
proto: SNProtoContent
) throws {
self.message = message
self.variant = variant
self.serverExpirationTimestamp = serverExpirationTimestamp
self.serializedProtoData = try proto.serializedData()
}
private init(
message: Message,
variant: Message.Variant,
serverExpirationTimestamp: TimeInterval?,
serializedProtoData: Data
) {
self.message = message
self.variant = variant
self.serverExpirationTimestamp = serverExpirationTimestamp
self.serializedProtoData = serializedProtoData
}
@ -144,6 +151,7 @@ extension MessageReceiveJob {
self = MessageInfo(
message: try variant.decode(from: container, forKey: .message),
variant: variant,
serverExpirationTimestamp: try? container.decode(TimeInterval.self, forKey: .serverExpirationTimestamp),
serializedProtoData: try container.decode(Data.self, forKey: .serializedProtoData)
)
}
@ -158,6 +166,7 @@ extension MessageReceiveJob {
try container.encode(message, forKey: .message)
try container.encode(variant, forKey: .variant)
try container.encodeIfPresent(serverExpirationTimestamp, forKey: .serverExpirationTimestamp)
try container.encode(serializedProtoData, forKey: .serializedProtoData)
}
}

@ -105,6 +105,7 @@ public extension SendReadReceiptsJob {
/// ensure that is done correctly beforehand
@discardableResult static func createOrUpdateIfNeeded(_ db: Database, threadId: String, interactionIds: [Int64]) -> Job? {
guard db[.areReadReceiptsEnabled] == true else { return nil }
guard !interactionIds.isEmpty else { return nil }
// Retrieve the timestampMs values for the specified interactions
let timestampMsValues: [Int64] = (try? Interaction

@ -547,6 +547,7 @@ public extension Message {
try MessageReceiveJob.Details.MessageInfo(
message: message,
variant: variant,
serverExpirationTimestamp: serverExpirationTimestamp,
proto: proto
)
)

@ -576,6 +576,7 @@ public final class OpenGroupManager: NSObject {
try MessageReceiver.handle(
db,
message: messageInfo.message,
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
openGroupId: openGroup.id,
dependencies: dependencies
@ -739,6 +740,7 @@ public final class OpenGroupManager: NSObject {
try MessageReceiver.handle(
db,
message: messageInfo.message,
serverExpirationTimestamp: messageInfo.serverExpirationTimestamp,
associatedWithProto: try SNProtoContent.parseData(messageInfo.serializedProtoData),
openGroupId: nil, // Intentionally nil as they are technically not open group messages
dependencies: dependencies

@ -4,16 +4,32 @@ import Foundation
import GRDB
extension MessageReceiver {
internal static func handleReadReceipt(_ db: Database, message: ReadReceipt) throws {
internal static func handleReadReceipt(
_ db: Database,
message: ReadReceipt,
serverExpirationTimestamp: TimeInterval?
) throws {
guard let sender: String = message.sender else { return }
guard let timestampMsValues: [Double] = message.timestamps?.map({ Double($0) }) else { return }
guard let readTimestampMs: Double = message.receivedTimestamp.map({ Double($0) }) else { return }
guard let timestampMsValues: [Int64] = message.timestamps?.map({ Int64($0) }) else { return }
guard let readTimestampMs: Int64 = message.receivedTimestamp.map({ Int64($0) }) else { return }
try Interaction.markAsRead(
let pendingTimestampMs: Set<Int64> = try Interaction.markAsRead(
db,
recipientId: sender,
timestampMsValues: timestampMsValues,
readTimestampMs: readTimestampMs
)
guard !pendingTimestampMs.isEmpty else { return }
// We have some pending read receipts so store them in the database
try pendingTimestampMs.forEach { timestampMs in
try PendingReadReceipt(
threadId: sender,
interactionTimestampMs: timestampMs,
readTimestampMs: readTimestampMs,
serverExpirationTimestamp: (serverExpirationTimestamp ?? 0)
).save(db)
}
}
}

@ -161,6 +161,7 @@ extension MessageReceiver {
db,
thread: thread,
interactionId: existingInteractionId,
messageSentTimestamp: messageSentTimestamp,
variant: variant,
syncTarget: message.syncTarget
)
@ -178,6 +179,7 @@ extension MessageReceiver {
db,
thread: thread,
interactionId: interactionId,
messageSentTimestamp: messageSentTimestamp,
variant: variant,
syncTarget: message.syncTarget
)
@ -363,6 +365,7 @@ extension MessageReceiver {
_ db: Database,
thread: SessionThread,
interactionId: Int64,
messageSentTimestamp: TimeInterval,
variant: Interaction.Variant,
syncTarget: String?
) throws {
@ -371,6 +374,7 @@ extension MessageReceiver {
// Immediately update any existing outgoing message 'RecipientState' records to be 'sent'
_ = try? RecipientState
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state != RecipientState.State.sent)
.updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.sent))
// Create any addiitonal 'RecipientState' records as needed
@ -415,5 +419,22 @@ extension MessageReceiver {
includingOlder: true,
trySendReadReceipt: true
)
// Process any PendingReadReceipt values
let maybePendingReadReceipt: PendingReadReceipt? = try PendingReadReceipt
.filter(PendingReadReceipt.Columns.threadId == thread.id)
.filter(PendingReadReceipt.Columns.interactionTimestampMs == Int64(messageSentTimestamp * 1000))
.fetchOne(db)
if let pendingReadReceipt: PendingReadReceipt = maybePendingReadReceipt {
try Interaction.markAsRead(
db,
recipientId: thread.id,
timestampMsValues: [pendingReadReceipt.interactionTimestampMs],
readTimestampMs: pendingReadReceipt.readTimestampMs
)
_ = try pendingReadReceipt.delete(db)
}
}
}

@ -179,13 +179,18 @@ public enum MessageReceiver {
public static func handle(
_ db: Database,
message: Message,
serverExpirationTimestamp: TimeInterval?,
associatedWithProto proto: SNProtoContent,
openGroupId: String?,
dependencies: SMKDependencies = SMKDependencies()
) throws {
switch message {
case let message as ReadReceipt:
try MessageReceiver.handleReadReceipt(db, message: message)
try MessageReceiver.handleReadReceipt(
db,
message: message,
serverExpirationTimestamp: serverExpirationTimestamp
)
case let message as TypingIndicator:
try MessageReceiver.handleTypingIndicator(db, message: message)

@ -641,6 +641,7 @@ public final class MessageSender {
// Mark the message as sent
try interaction.recipientStates
.filter(RecipientState.Columns.state != RecipientState.State.sent)
.updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.sent))
// Start the disappearing messages timer if needed
@ -773,19 +774,21 @@ public final class MessageSender {
if let message = message as? VisibleMessage { message.syncTarget = publicKey }
if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey }
JobRunner.add(
db,
job: Job(
variant: .messageSend,
threadId: threadId,
interactionId: interactionId,
details: MessageSendJob.Details(
destination: .contact(publicKey: currentUserPublicKey),
message: message,
isSyncMessage: true
Storage.shared.write { db in
JobRunner.add(
db,
job: Job(
variant: .messageSend,
threadId: threadId,
interactionId: interactionId,
details: MessageSendJob.Details(
destination: .contact(publicKey: currentUserPublicKey),
message: message,
isSyncMessage: true
)
)
)
)
}
}
}
}

Loading…
Cancel
Save