From 6fce2c26b71982ee33475690cb3fc5f8d6ecca93 Mon Sep 17 00:00:00 2001 From: Matthew Chen Date: Wed, 13 Sep 2017 15:35:33 -0400 Subject: [PATCH] Process messages in a single transaction (wherever possible). // FREEBIE --- .../ConversationViewController.m | 1 + .../ViewControllers/OWSNavigationController.m | 3 +- .../src/Contacts/Threads/TSGroupThread.h | 5 + .../src/Contacts/Threads/TSGroupThread.m | 33 +- .../src/Devices/OWSReadReceiptsProcessor.h | 6 +- .../src/Devices/OWSReadReceiptsProcessor.m | 99 +-- .../src/Devices/OWSRecordTranscriptJob.h | 9 +- .../src/Devices/OWSRecordTranscriptJob.m | 20 +- .../Attachments/OWSAttachmentsProcessor.h | 16 +- .../Attachments/OWSAttachmentsProcessor.m | 59 +- .../Messages/Interactions/TSIncomingMessage.h | 4 +- .../Messages/Interactions/TSIncomingMessage.m | 56 +- .../Messages/Interactions/TSOutgoingMessage.h | 3 +- .../Messages/Interactions/TSOutgoingMessage.m | 27 +- .../src/Messages/OWSBatchMessageProcessor.m | 44 +- .../src/Messages/OWSMessageReceiver.m | 42 +- .../src/Messages/OWSMessageSender.h | 9 +- .../src/Messages/OWSMessageSender.m | 24 +- .../src/Messages/TSMessagesManager.h | 4 +- .../src/Messages/TSMessagesManager.m | 615 ++++++++++-------- .../src/Storage/OWSIncomingMessageFinder.h | 3 +- .../src/Storage/OWSIncomingMessageFinder.m | 10 +- 22 files changed, 655 insertions(+), 437 deletions(-) diff --git a/Signal/src/ViewControllers/ConversationView/ConversationViewController.m b/Signal/src/ViewControllers/ConversationView/ConversationViewController.m index 92abc6a46..37fc15bb1 100644 --- a/Signal/src/ViewControllers/ConversationView/ConversationViewController.m +++ b/Signal/src/ViewControllers/ConversationView/ConversationViewController.m @@ -2600,6 +2600,7 @@ typedef NS_ENUM(NSInteger, MessagesRangeSizeMode) { [[OWSAttachmentsProcessor alloc] initWithAttachmentPointer:attachmentPointer networkManager:self.networkManager]; [processor fetchAttachmentsForMessage:message + storageManager:self.storageManager success:^(TSAttachmentStream *_Nonnull attachmentStream) { DDLogInfo( @"%@ Successfully redownloaded attachment in thread: %@", self.tag, message.thread); diff --git a/Signal/src/ViewControllers/OWSNavigationController.m b/Signal/src/ViewControllers/OWSNavigationController.m index 0f21f71d0..87bdffd41 100644 --- a/Signal/src/ViewControllers/OWSNavigationController.m +++ b/Signal/src/ViewControllers/OWSNavigationController.m @@ -51,8 +51,9 @@ // If we're not going to cancel the pop/back, we need to call the super // implementation since it has important side effects. if (result) { + // NOTE: result might end up NO if the super implementation cancels the + // the pop/back. result = [super navigationBar:navigationBar shouldPopItem:item]; - OWSAssert(result); } return result; } diff --git a/SignalServiceKit/src/Contacts/Threads/TSGroupThread.h b/SignalServiceKit/src/Contacts/Threads/TSGroupThread.h index 68a5a2004..fff927604 100644 --- a/SignalServiceKit/src/Contacts/Threads/TSGroupThread.h +++ b/SignalServiceKit/src/Contacts/Threads/TSGroupThread.h @@ -8,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN @class TSAttachmentStream; +@class YapDatabaseReadWriteTransaction; @interface TSGroupThread : TSThread @@ -18,6 +19,8 @@ NS_ASSUME_NONNULL_BEGIN transaction:(YapDatabaseReadWriteTransaction *)transaction; + (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId; ++ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId + transaction:(YapDatabaseReadWriteTransaction *)transaction; + (instancetype)threadWithGroupModel:(TSGroupModel *)groupModel transaction:(YapDatabaseReadTransaction *)transaction; @@ -27,6 +30,8 @@ NS_ASSUME_NONNULL_BEGIN + (NSArray *)groupThreadsWithRecipientId:(NSString *)recipientId; - (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream; +- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream + transaction:(YapDatabaseReadWriteTransaction *)transaction; @end diff --git a/SignalServiceKit/src/Contacts/Threads/TSGroupThread.m b/SignalServiceKit/src/Contacts/Threads/TSGroupThread.m index d5448c731..1549ac1a9 100644 --- a/SignalServiceKit/src/Contacts/Threads/TSGroupThread.m +++ b/SignalServiceKit/src/Contacts/Threads/TSGroupThread.m @@ -59,21 +59,35 @@ NS_ASSUME_NONNULL_BEGIN } + (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId + transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssert(groupId.length > 0); + OWSAssert(transaction); - TSGroupThread *thread = [self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupId]]; + TSGroupThread *thread = [self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupId] transaction:transaction]; if (!thread) { thread = [[self alloc] initWithGroupIdData:groupId]; - [thread save]; + [thread saveWithTransaction:transaction]; } return thread; } ++ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId +{ + OWSAssert(groupId.length > 0); + + __block TSGroupThread *thread; + [[self dbReadWriteConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + thread = [self getOrCreateThreadWithGroupIdData:groupId transaction:transaction]; + }]; + return thread; +} + + (instancetype)getOrCreateThreadWithGroupModel:(TSGroupModel *)groupModel transaction:(YapDatabaseReadWriteTransaction *)transaction { OWSAssert(groupModel); OWSAssert(groupModel.groupId.length > 0); + OWSAssert(transaction); TSGroupThread *thread = [self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupModel.groupId] transaction:transaction]; @@ -162,12 +176,23 @@ NS_ASSUME_NONNULL_BEGIN - (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream { + [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self updateAvatarWithAttachmentStream:attachmentStream transaction:transaction]; + }]; +} + +- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream + transaction:(YapDatabaseReadWriteTransaction *)transaction +{ + OWSAssert(attachmentStream); + OWSAssert(transaction); + self.groupModel.groupImage = [attachmentStream image]; - [self save]; + [self saveWithTransaction:transaction]; // Avatars are stored directly in the database, so there's no need // to keep the attachment around after assigning the image. - [attachmentStream remove]; + [attachmentStream removeWithTransaction:transaction]; } @end diff --git a/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.h b/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.h index 5f7c6b2d4..39b1663db 100644 --- a/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.h +++ b/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.h @@ -1,4 +1,6 @@ -// Copyright © 2016 Open Whisper Systems. All rights reserved. +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// NS_ASSUME_NONNULL_BEGIN @@ -6,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN @class OWSReadReceipt; @class TSIncomingMessage; @class TSStorageManager; +@class YapDatabaseReadWriteTransaction; extern NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification; @@ -30,6 +33,7 @@ extern NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification; - (instancetype)init NS_UNAVAILABLE; - (void)process; +- (void)processWithTransaction:(YapDatabaseReadWriteTransaction *)transaction; @end diff --git a/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.m b/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.m index dd31e8285..1beb207fa 100644 --- a/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.m +++ b/SignalServiceKit/src/Devices/OWSReadReceiptsProcessor.m @@ -80,10 +80,20 @@ NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification = - (void)process { + [[self.storageManager newDatabaseConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self processWithTransaction:transaction]; + }]; +} + +- (void)processWithTransaction:(YapDatabaseReadWriteTransaction *)transaction +{ + OWSAssert(transaction); + DDLogDebug(@"%@ Processing %ld read receipts.", self.tag, (unsigned long)self.readReceipts.count); for (OWSReadReceipt *readReceipt in self.readReceipts) { - TSIncomingMessage *message = - [TSIncomingMessage findMessageWithAuthorId:readReceipt.senderId timestamp:readReceipt.timestamp]; + TSIncomingMessage *message = [TSIncomingMessage findMessageWithAuthorId:readReceipt.senderId + timestamp:readReceipt.timestamp + transaction:transaction]; if (message) { OWSAssert(message.thread); @@ -94,55 +104,52 @@ NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification = // Always mark the message specified by the read receipt as read. [interactionsToMarkAsRead addObject:message]; - [self.storageManager.dbReadWriteConnection readWriteWithBlock:^( - YapDatabaseReadWriteTransaction *transaction) { - [[TSDatabaseView unseenDatabaseViewExtension:transaction] - enumerateRowsInGroup:message.uniqueThreadId - usingBlock:^(NSString *collection, - NSString *key, - id object, - id metadata, - NSUInteger index, - BOOL *stop) { - - TSInteraction *interaction = object; - if (interaction.timestampForSorting > message.timestampForSorting) { - *stop = YES; - return; - } - - id possiblyRead = (id)object; - OWSAssert(!possiblyRead.read); - [interactionsToMarkAsRead addObject:possiblyRead]; - }]; - - for (id interaction in interactionsToMarkAsRead) { - // * Don't send a read receipt in response to a read receipt. - // * Don't update expiration; we'll do that in the next statement. - [interaction markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:NO]; - - if ([interaction isKindOfClass:[TSMessage class]]) { - TSMessage *otherMessage = (TSMessage *)interaction; - - // Update expiration using the timestamp from the readReceipt. - [OWSDisappearingMessagesJob setExpirationForMessage:otherMessage - expirationStartedAt:readReceipt.timestamp]; - - // Fire event that will cancel any pending notifications for this message. - dispatch_async(dispatch_get_main_queue(), ^{ - [[NSNotificationCenter defaultCenter] - postNotificationName:OWSReadReceiptsProcessorMarkedMessageAsReadNotification - object:otherMessage]; - }); - } + [[TSDatabaseView unseenDatabaseViewExtension:transaction] + enumerateRowsInGroup:message.uniqueThreadId + usingBlock:^(NSString *collection, + NSString *key, + id object, + id metadata, + NSUInteger index, + BOOL *stop) { + + TSInteraction *interaction = object; + if (interaction.timestampForSorting > message.timestampForSorting) { + *stop = YES; + return; + } + + id possiblyRead = (id)object; + OWSAssert(!possiblyRead.read); + [interactionsToMarkAsRead addObject:possiblyRead]; + }]; + + for (id interaction in interactionsToMarkAsRead) { + // * Don't send a read receipt in response to a read receipt. + // * Don't update expiration; we'll do that in the next statement. + [interaction markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:NO]; + + if ([interaction isKindOfClass:[TSMessage class]]) { + TSMessage *otherMessage = (TSMessage *)interaction; + + // Update expiration using the timestamp from the readReceipt. + [OWSDisappearingMessagesJob setExpirationForMessage:otherMessage + expirationStartedAt:readReceipt.timestamp]; + + // Fire event that will cancel any pending notifications for this message. + dispatch_async(dispatch_get_main_queue(), ^{ + [[NSNotificationCenter defaultCenter] + postNotificationName:OWSReadReceiptsProcessorMarkedMessageAsReadNotification + object:otherMessage]; + }); } - }]; + } // If it was previously saved, no need to keep it around any longer. - [readReceipt remove]; + [readReceipt removeWithTransaction:transaction]; } else { DDLogDebug(@"%@ Received read receipt for an unknown message. Saving it for later.", self.tag); - [readReceipt save]; + [readReceipt saveWithTransaction:transaction]; } } } diff --git a/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.h b/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.h index b32a07212..35e780e9c 100644 --- a/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.h +++ b/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.h @@ -1,5 +1,6 @@ -// Created by Michael Kirk on 9/23/16. -// Copyright © 2016 Open Whisper Systems. All rights reserved. +// +// Copyright (c) 2017 Open Whisper Systems. All rights reserved. +// NS_ASSUME_NONNULL_BEGIN @@ -7,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN @class OWSMessageSender; @class TSNetworkManager; @class TSAttachmentStream; +@class YapDatabaseReadWriteTransaction; @interface OWSRecordTranscriptJob : NSObject @@ -15,7 +17,8 @@ NS_ASSUME_NONNULL_BEGIN messageSender:(OWSMessageSender *)messageSender networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER; -- (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler; +- (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler + transaction:(YapDatabaseReadWriteTransaction *)transaction; @end diff --git a/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.m b/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.m index 2a6bcea7b..29e20c273 100644 --- a/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.m +++ b/SignalServiceKit/src/Devices/OWSRecordTranscriptJob.m @@ -41,16 +41,22 @@ NS_ASSUME_NONNULL_BEGIN } - (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(transaction); + OWSIncomingSentMessageTranscript *transcript = self.incomingSentMessageTranscript; DDLogDebug(@"%@ Recording transcript: %@", self.tag, transcript); if (transcript.isEndSessionMessage) { DDLogInfo(@"%@ EndSession was sent to recipient: %@.", self.tag, transcript.recipientId); - [self.storageManager deleteAllSessionsForContact:transcript.recipientId]; + // NOTE: We dispatch_sync() here. + dispatch_sync([OWSDispatch sessionStoreQueue], ^{ + [self.storageManager deleteAllSessionsForContact:transcript.recipientId]; + }); [[[TSInfoMessage alloc] initWithTimestamp:transcript.timestamp inThread:transcript.thread - messageType:TSInfoMessageTypeSessionDidEnd] save]; + messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction]; // Don't continue processing lest we print a bubble for the session reset. return; @@ -62,7 +68,8 @@ NS_ASSUME_NONNULL_BEGIN timestamp:transcript.timestamp relay:transcript.relay thread:thread - networkManager:self.networkManager]; + networkManager:self.networkManager + transaction:transaction]; // TODO group updates. Currently desktop doesn't support group updates, so not a problem yet. TSOutgoingMessage *outgoingMessage = @@ -79,10 +86,13 @@ NS_ASSUME_NONNULL_BEGIN return; } - [self.messageSender handleMessageSentRemotely:outgoingMessage sentAt:transcript.expirationStartedAt]; + [self.messageSender handleMessageSentRemotely:outgoingMessage + sentAt:transcript.expirationStartedAt + transaction:transaction]; [attachmentsProcessor fetchAttachmentsForMessage:outgoingMessage + transaction:transaction success:attachmentHandler failure:^(NSError *_Nonnull error) { DDLogError(@"%@ failed to fetch transcripts attachments for message: %@", @@ -101,7 +111,7 @@ NS_ASSUME_NONNULL_BEGIN expiresInSeconds:transcript.expirationDuration expireStartedAt:transcript.expirationStartedAt]; // Since textMessage is a new message, updateWithWasSentAndDelivered will save it. - [textMessage updateWithWasSentAndDelivered]; + [textMessage updateWithWasSentFromLinkedDeviceWithTransaction:transaction]; } } diff --git a/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.h b/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.h index 1fb3ef4f2..4f5a8713a 100644 --- a/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.h +++ b/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.h @@ -8,12 +8,14 @@ extern NSString *const kAttachmentDownloadProgressNotification; extern NSString *const kAttachmentDownloadProgressKey; extern NSString *const kAttachmentDownloadAttachmentIDKey; +@class TSAttachmentPointer; +@class TSAttachmentStream; @class TSMessage; -@class TSThread; @class TSNetworkManager; +@class TSStorageManager; +@class TSThread; @class OWSSignalServiceProtosAttachmentPointer; -@class TSAttachmentStream; -@class TSAttachmentPointer; +@class YapDatabaseReadWriteTransaction; /** * Given incoming attachment protos, determines which we support. @@ -31,7 +33,8 @@ extern NSString *const kAttachmentDownloadAttachmentIDKey; timestamp:(uint64_t)timestamp relay:(nullable NSString *)relay thread:(TSThread *)thread - networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER; + networkManager:(TSNetworkManager *)networkManager + transaction:(YapDatabaseReadWriteTransaction *)transaction NS_DESIGNATED_INITIALIZER; /* * Retry fetching failed attachment download @@ -40,6 +43,11 @@ extern NSString *const kAttachmentDownloadAttachmentIDKey; networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER; - (void)fetchAttachmentsForMessage:(nullable TSMessage *)message + storageManager:(TSStorageManager *)storageManager + success:(void (^)(TSAttachmentStream *attachmentStream))successHandler + failure:(void (^)(NSError *error))failureHandler; +- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message + transaction:(YapDatabaseReadWriteTransaction *)transaction success:(void (^)(TSAttachmentStream *attachmentStream))successHandler failure:(void (^)(NSError *error))failureHandler; @end diff --git a/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.m b/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.m index 4cd4415f4..842084e8f 100644 --- a/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.m +++ b/SignalServiceKit/src/Messages/Attachments/OWSAttachmentsProcessor.m @@ -15,6 +15,7 @@ #import "TSInfoMessage.h" #import "TSMessage.h" #import "TSNetworkManager.h" +#import "TSStorageManager.h" #import "TSThread.h" #import @@ -58,6 +59,7 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f; relay:(nullable NSString *)relay thread:(TSThread *)thread networkManager:(TSNetworkManager *)networkManager + transaction:(YapDatabaseReadWriteTransaction *)transaction { self = [super init]; if (!self) { @@ -97,7 +99,7 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f; [attachmentIds addObject:pointer.uniqueId]; - [pointer save]; + [pointer saveWithTransaction:transaction]; [supportedAttachmentPointers addObject:pointer]; [supportedAttachmentIds addObject:pointer.uniqueId]; } @@ -110,31 +112,60 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f; } - (void)fetchAttachmentsForMessage:(nullable TSMessage *)message + storageManager:(TSStorageManager *)storageManager success:(void (^)(TSAttachmentStream *attachmentStream))successHandler failure:(void (^)(NSError *error))failureHandler { + [[storageManager newDatabaseConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self fetchAttachmentsForMessage:message + transaction:transaction + success:successHandler + failure:failureHandler]; + }]; +} + +- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message + transaction:(YapDatabaseReadWriteTransaction *)transaction + success:(void (^)(TSAttachmentStream *attachmentStream))successHandler + failure:(void (^)(NSError *error))failureHandler +{ + OWSAssert(transaction); + for (TSAttachmentPointer *attachmentPointer in self.supportedAttachmentPointers) { - [self retrieveAttachment:attachmentPointer message:message success:successHandler failure:failureHandler]; + [self retrieveAttachment:attachmentPointer + message:message + transaction:transaction + success:successHandler + failure:failureHandler]; } } - (void)retrieveAttachment:(TSAttachmentPointer *)attachment message:(nullable TSMessage *)message + transaction:(YapDatabaseReadWriteTransaction *)transaction success:(void (^)(TSAttachmentStream *attachmentStream))successHandler failure:(void (^)(NSError *error))failureHandler { - [self setAttachment:attachment isDownloadingInMessage:message]; + OWSAssert(transaction); + + [self setAttachment:attachment isDownloadingInMessage:message transaction:transaction]; void (^markAndHandleFailure)(NSError *) = ^(NSError *error) { - [self setAttachment:attachment didFailInMessage:message]; - return failureHandler(error); + // Ensure enclosing transaction is complete. + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + [self setAttachment:attachment didFailInMessage:message]; + failureHandler(error); + }); }; void (^markAndHandleSuccess)(TSAttachmentStream *attachmentStream) = ^(TSAttachmentStream *attachmentStream) { - successHandler(attachmentStream); - if (message) { - [message touch]; - } + // Ensure enclosing transaction is complete. + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + successHandler(attachmentStream); + if (message) { + [message touch]; + } + }); }; if (attachment.serverId < 100) { @@ -352,12 +383,16 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f; }); } -- (void)setAttachment:(TSAttachmentPointer *)pointer isDownloadingInMessage:(nullable TSMessage *)message +- (void)setAttachment:(TSAttachmentPointer *)pointer + isDownloadingInMessage:(nullable TSMessage *)message + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(transaction); + pointer.state = TSAttachmentPointerStateDownloading; - [pointer save]; + [pointer saveWithTransaction:transaction]; if (message) { - [message touch]; + [message touchWithTransaction:transaction]; } } diff --git a/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.h b/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.h index 262852131..b8b543a10 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.h +++ b/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.h @@ -101,7 +101,9 @@ extern NSString *const TSIncomingMessageWasReadOnThisDeviceNotification; * When the message was created in milliseconds since epoch * */ -+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId timestamp:(uint64_t)timestamp; ++ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId + timestamp:(uint64_t)timestamp + transaction:(YapDatabaseReadWriteTransaction *)transaction; @property (nonatomic, readonly) NSString *authorId; diff --git a/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.m b/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.m index 240065142..3812b3425 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.m +++ b/SignalServiceKit/src/Messages/Interactions/TSIncomingMessage.m @@ -75,37 +75,39 @@ NSString *const TSIncomingMessageWasReadOnThisDeviceNotification = @"TSIncomingM return self; } -+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId timestamp:(uint64_t)timestamp ++ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId + timestamp:(uint64_t)timestamp + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(transaction); + __block TSIncomingMessage *foundMessage; - [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - // In theory we could build a new secondaryIndex for (authorId,timestamp), but in practice there should - // be *very* few (millisecond) timestamps with multiple authors. - [TSDatabaseSecondaryIndexes - enumerateMessagesWithTimestamp:timestamp - withBlock:^(NSString *collection, NSString *key, BOOL *stop) { - TSInteraction *interaction = - [TSInteraction fetchObjectWithUniqueID:key transaction:transaction]; - if ([interaction isKindOfClass:[TSIncomingMessage class]]) { - TSIncomingMessage *message = (TSIncomingMessage *)interaction; - - // Only groupthread sets authorId, thus this crappy code. - // TODO ALL incoming messages should have an authorId. - NSString *messageAuthorId; - if (message.authorId) { // Group Thread - messageAuthorId = message.authorId; - } else { // Contact Thread - messageAuthorId = - [TSContactThread contactIdFromThreadId:message.uniqueThreadId]; - } - - if ([messageAuthorId isEqualToString:authorId]) { - foundMessage = message; - } + // In theory we could build a new secondaryIndex for (authorId,timestamp), but in practice there should + // be *very* few (millisecond) timestamps with multiple authors. + [TSDatabaseSecondaryIndexes + enumerateMessagesWithTimestamp:timestamp + withBlock:^(NSString *collection, NSString *key, BOOL *stop) { + TSInteraction *interaction = + [TSInteraction fetchObjectWithUniqueID:key transaction:transaction]; + if ([interaction isKindOfClass:[TSIncomingMessage class]]) { + TSIncomingMessage *message = (TSIncomingMessage *)interaction; + + // Only groupthread sets authorId, thus this crappy code. + // TODO ALL incoming messages should have an authorId. + NSString *messageAuthorId; + if (message.authorId) { // Group Thread + messageAuthorId = message.authorId; + } else { // Contact Thread + messageAuthorId = + [TSContactThread contactIdFromThreadId:message.uniqueThreadId]; + } + + if ([messageAuthorId isEqualToString:authorId]) { + foundMessage = message; } } - usingTransaction:transaction]; - }]; + } + usingTransaction:transaction]; return foundMessage; } diff --git a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h index ce38af07e..0774d90cc 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h +++ b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.h @@ -170,8 +170,7 @@ typedef NS_ENUM(NSInteger, TSGroupMetaMessage) { - (void)updateWithCustomMessage:(NSString *)customMessage; - (void)updateWithWasDeliveredWithTransaction:(YapDatabaseReadWriteTransaction *)transaction; - (void)updateWithWasDelivered; -- (void)updateWithWasSentAndDelivered; -- (void)updateWithWasSentAndDeliveredFromLinkedDevice; +- (void)updateWithWasSentFromLinkedDeviceWithTransaction:(YapDatabaseReadWriteTransaction *)transaction; - (void)updateWithSingleGroupRecipient:(NSString *)singleGroupRecipient transaction:(YapDatabaseReadWriteTransaction *)transaction; diff --git a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m index 61a1a8139..86b540b8c 100644 --- a/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m +++ b/SignalServiceKit/src/Messages/Interactions/TSOutgoingMessage.m @@ -323,27 +323,16 @@ NSString *const kTSOutgoingMessageSentRecipientAll = @"kTSOutgoingMessageSentRec }]; } -- (void)updateWithWasSentAndDelivered +- (void)updateWithWasSentFromLinkedDeviceWithTransaction:(YapDatabaseReadWriteTransaction *)transaction { - [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [self applyChangeToSelfAndLatestOutgoingMessage:transaction - changeBlock:^(TSOutgoingMessage *message) { - [message setMessageState:TSOutgoingMessageStateSentToService]; - [message setWasDelivered:YES]; - }]; - }]; -} + OWSAssert(transaction); -- (void)updateWithWasSentAndDeliveredFromLinkedDevice -{ - [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [self applyChangeToSelfAndLatestOutgoingMessage:transaction - changeBlock:^(TSOutgoingMessage *message) { - [message setMessageState:TSOutgoingMessageStateSentToService]; - [message setWasDelivered:YES]; - [message setIsFromLinkedDevice:YES]; - }]; - }]; + [self applyChangeToSelfAndLatestOutgoingMessage:transaction + changeBlock:^(TSOutgoingMessage *message) { + [message setMessageState:TSOutgoingMessageStateSentToService]; + [message setWasDelivered:YES]; + [message setIsFromLinkedDevice:YES]; + }]; } - (void)updateWithSingleGroupRecipient:(NSString *)singleGroupRecipient diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m index 45fdb55e0..f4109a3f2 100644 --- a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m @@ -8,6 +8,7 @@ #import "TSMessagesManager.h" #import "TSStorageManager.h" #import "TSYapDatabaseObject.h" +#import "Threading.h" #import #import #import @@ -198,10 +199,12 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes @interface OWSBatchMessageProcessingQueue : NSObject @property (nonatomic, readonly) TSMessagesManager *messagesManager; +@property (nonatomic, readonly) YapDatabaseConnection *dbReadWriteConnection; @property (nonatomic, readonly) OWSBatchMessageProcessingJobFinder *finder; @property (nonatomic) BOOL isDrainingQueue; - (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + storageManager:(TSStorageManager *)storageManager finder:(OWSBatchMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER; - (instancetype)init NS_UNAVAILABLE; @@ -212,6 +215,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes @implementation OWSBatchMessageProcessingQueue - (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager + storageManager:(TSStorageManager *)storageManager finder:(OWSBatchMessageProcessingJobFinder *)finder { OWSSingletonAssert(); @@ -222,12 +226,28 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes } _messagesManager = messagesManager; + _dbReadWriteConnection = [storageManager newDatabaseConnection]; _finder = finder; _isDrainingQueue = NO; + [[NSNotificationCenter defaultCenter] addObserver:self + selector:@selector(databaseViewRegistrationComplete) + name:kNSNotificationName_DatabaseViewRegistrationComplete + object:nil]; + return self; } +- (void)dealloc +{ + [[NSNotificationCenter defaultCenter] removeObserver:self]; +} + +- (void)databaseViewRegistrationComplete +{ + [self drainQueue]; +} + #pragma mark - instance methods - (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData @@ -239,7 +259,13 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes - (void)drainQueue { - dispatch_async(dispatch_get_main_queue(), ^{ + DispatchMainThreadSafe(^{ + if ([TSDatabaseView hasPendingViewRegistrations]) { + // We don't want to process incoming messages until database + // view registration is complete. + return; + } + if (self.isDrainingQueue) { return; } @@ -274,8 +300,12 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes - (void)processJob:(OWSBatchMessageProcessingJob *)job completion:(void (^)())completion { - dispatch_async(dispatch_get_main_queue(), ^{ - [self.messagesManager processEnvelope:job.envelopeProto plaintextData:job.plaintextData]; + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { + [self.messagesManager processEnvelope:job.envelopeProto + plaintextData:job.plaintextData + transaction:transaction]; + }]; completion(); }); } @@ -309,6 +339,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes - (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection messagesManager:(TSMessagesManager *)messagesManager + storageManager:(TSStorageManager *)storageManager { OWSSingletonAssert(); @@ -320,7 +351,9 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes OWSBatchMessageProcessingJobFinder *finder = [[OWSBatchMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection]; OWSBatchMessageProcessingQueue *processingQueue = - [[OWSBatchMessageProcessingQueue alloc] initWithMessagesManager:messagesManager finder:finder]; + [[OWSBatchMessageProcessingQueue alloc] initWithMessagesManager:messagesManager + storageManager:storageManager + finder:finder]; _processingQueue = processingQueue; @@ -332,8 +365,9 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes // For concurrency coherency we use the same dbConnection to persist and read the unprocessed envelopes YapDatabaseConnection *dbConnection = [[TSStorageManager sharedManager].database newConnection]; TSMessagesManager *messagesManager = [TSMessagesManager sharedManager]; + TSStorageManager *storageManager = [TSStorageManager sharedManager]; - return [self initWithDBConnection:dbConnection messagesManager:messagesManager]; + return [self initWithDBConnection:dbConnection messagesManager:messagesManager storageManager:storageManager]; } + (instancetype)sharedInstance diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index cde7ade6b..4f896d9de 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -9,6 +9,7 @@ #import "TSMessagesManager.h" #import "TSStorageManager.h" #import "TSYapDatabaseObject.h" +#import "Threading.h" #import #import #import @@ -226,9 +227,24 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces _finder = finder; _isDrainingQueue = NO; + [[NSNotificationCenter defaultCenter] addObserver:self + selector:@selector(databaseViewRegistrationComplete) + name:kNSNotificationName_DatabaseViewRegistrationComplete + object:nil]; + return self; } +- (void)dealloc +{ + [[NSNotificationCenter defaultCenter] removeObserver:self]; +} + +- (void)databaseViewRegistrationComplete +{ + [self drainQueue]; +} + #pragma mark - instance methods - (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope @@ -238,14 +254,20 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces - (void)drainQueue { - AssertIsOnMainThread(); - - if (self.isDrainingQueue) { - return; - } - self.isDrainingQueue = YES; - - [self drainQueueWorkStep]; + DispatchMainThreadSafe(^{ + if ([TSDatabaseView hasPendingViewRegistrations]) { + // We don't want to process incoming messages until database + // view registration is complete. + return; + } + + if (self.isDrainingQueue) { + return; + } + self.isDrainingQueue = YES; + + [self drainQueueWorkStep]; + }); } - (void)drainQueueWorkStep @@ -369,9 +391,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces - (void)handleAnyUnprocessedEnvelopesAsync { - dispatch_async(dispatch_get_main_queue(), ^{ - [self.processingQueue drainQueue]; - }); + [self.processingQueue drainQueue]; } - (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope diff --git a/SignalServiceKit/src/Messages/OWSMessageSender.h b/SignalServiceKit/src/Messages/OWSMessageSender.h index 0739cf209..d789c5fce 100644 --- a/SignalServiceKit/src/Messages/OWSMessageSender.h +++ b/SignalServiceKit/src/Messages/OWSMessageSender.h @@ -15,6 +15,7 @@ NS_ASSUME_NONNULL_BEGIN @class TSOutgoingMessage; @class TSStorageManager; @class TSThread; +@class YapDatabaseReadWriteTransaction; @protocol ContactsManagerProtocol; @@ -67,6 +68,10 @@ NS_SWIFT_NAME(MessageSender) - (void)sendMessage:(TSOutgoingMessage *)message success:(void (^)())successHandler failure:(void (^)(NSError *error))failureHandler; +- (void)sendMessage:(TSOutgoingMessage *)message + transaction:(YapDatabaseReadWriteTransaction *_Nullable)transaction + success:(void (^)())successHandler + failure:(void (^)(NSError *error))failureHandler; /** * Takes care of allocating and uploading the attachment, then sends the message. @@ -89,7 +94,9 @@ NS_SWIFT_NAME(MessageSender) success:(void (^)())successHandler failure:(void (^)(NSError *error))failureHandler; -- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message sentAt:(uint64_t)sentAt; +- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message + sentAt:(uint64_t)sentAt + transaction:(YapDatabaseReadWriteTransaction *)transaction; /** * Set local configuration to match that of the of `outgoingMessage`'s sender diff --git a/SignalServiceKit/src/Messages/OWSMessageSender.m b/SignalServiceKit/src/Messages/OWSMessageSender.m index 5175683ba..7d008334e 100644 --- a/SignalServiceKit/src/Messages/OWSMessageSender.m +++ b/SignalServiceKit/src/Messages/OWSMessageSender.m @@ -424,11 +424,22 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; - (void)sendMessage:(TSOutgoingMessage *)message success:(void (^)())successHandler failure:(void (^)(NSError *error))failureHandler +{ + [self sendMessage:message transaction:nil success:successHandler failure:failureHandler]; +} + +- (void)sendMessage:(TSOutgoingMessage *)message + transaction:(YapDatabaseReadWriteTransaction *_Nullable)transaction + success:(void (^)())successHandler + failure:(void (^)(NSError *error))failureHandler { OWSAssert(message); - AssertIsOnMainThread(); - [message updateWithMessageState:TSOutgoingMessageStateAttemptingOut]; + if (transaction) { + [message updateWithMessageState:TSOutgoingMessageStateAttemptingOut transaction:transaction]; + } else { + [message updateWithMessageState:TSOutgoingMessageStateAttemptingOut]; + } OWSSendMessageOperation *sendMessageOperation = [[OWSSendMessageOperation alloc] initWithMessage:message messageSender:self success:successHandler @@ -1082,9 +1093,14 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException"; [OWSDisappearingMessagesJob setExpirationForMessage:message]; } -- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message sentAt:(uint64_t)sentAt +- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message + sentAt:(uint64_t)sentAt + transaction:(YapDatabaseReadWriteTransaction *)transaction { - [message updateWithWasSentAndDeliveredFromLinkedDevice]; + OWSAssert(message); + OWSAssert(transaction); + + [message updateWithWasSentFromLinkedDeviceWithTransaction:transaction]; [self becomeConsistentWithDisappearingConfigurationForMessage:message]; [OWSDisappearingMessagesJob setExpirationForMessage:message expirationStartedAt:sentAt]; } diff --git a/SignalServiceKit/src/Messages/TSMessagesManager.h b/SignalServiceKit/src/Messages/TSMessagesManager.h index bf83d0d0b..ce1e88c92 100644 --- a/SignalServiceKit/src/Messages/TSMessagesManager.h +++ b/SignalServiceKit/src/Messages/TSMessagesManager.h @@ -36,7 +36,9 @@ typedef void (^MessageManagerCompletionBlock)(); successBlock:(DecryptSuccessBlock)successBlock failureBlock:(DecryptFailureBlock)failureBlock; -- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *_Nullable)plaintextData; +- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope + plaintextData:(NSData *_Nullable)plaintextData + transaction:(YapDatabaseReadWriteTransaction *)transaction; - (NSUInteger)unreadMessagesCount; - (NSUInteger)unreadMessagesCountExcept:(TSThread *)thread; diff --git a/SignalServiceKit/src/Messages/TSMessagesManager.m b/SignalServiceKit/src/Messages/TSMessagesManager.m index abd7b185e..1643ceaab 100644 --- a/SignalServiceKit/src/Messages/TSMessagesManager.m +++ b/SignalServiceKit/src/Messages/TSMessagesManager.m @@ -447,9 +447,12 @@ NS_ASSUME_NONNULL_BEGIN #pragma mark - message handling -// TODO: Add transaction parameter. -- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *_Nullable)plaintextData +- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope + plaintextData:(NSData *_Nullable)plaintextData + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(envelope); + OWSAssert(transaction); OWSAssert([TSAccountManager isRegistered]); DDLogInfo(@"%@ received envelope: %@", self.tag, [self descriptionForEnvelope:envelope]); @@ -461,7 +464,7 @@ NS_ASSUME_NONNULL_BEGIN case OWSSignalServiceProtosEnvelopeTypeCiphertext: case OWSSignalServiceProtosEnvelopeTypePrekeyBundle: if (plaintextData) { - [self handleEnvelope:envelope plaintextData:plaintextData]; + [self handleEnvelope:envelope plaintextData:plaintextData transaction:transaction]; } else { OWSFail( @"%@ missing decrypted data for envelope: %@", self.tag, [self descriptionForEnvelope:envelope]); @@ -469,7 +472,7 @@ NS_ASSUME_NONNULL_BEGIN break; case OWSSignalServiceProtosEnvelopeTypeReceipt: OWSAssert(!plaintextData); - [self handleDeliveryReceipt:envelope]; + [self handleDeliveryReceipt:envelope transaction:transaction]; break; // Other messages are just dismissed for now. case OWSSignalServiceProtosEnvelopeTypeKeyExchange: @@ -485,26 +488,33 @@ NS_ASSUME_NONNULL_BEGIN } - (void)handleDeliveryReceipt:(OWSSignalServiceProtosEnvelope *)envelope + transaction:(YapDatabaseReadWriteTransaction *)transaction { - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - TSInteraction *interaction = - [TSInteraction interactionForTimestamp:envelope.timestamp withTransaction:transaction]; - if ([interaction isKindOfClass:[TSOutgoingMessage class]]) { - TSOutgoingMessage *outgoingMessage = (TSOutgoingMessage *)interaction; - [outgoingMessage updateWithWasDeliveredWithTransaction:transaction]; - } - }]; + OWSAssert(envelope); + OWSAssert(transaction); + + TSInteraction *interaction = [TSInteraction interactionForTimestamp:envelope.timestamp withTransaction:transaction]; + if ([interaction isKindOfClass:[TSOutgoingMessage class]]) { + TSOutgoingMessage *outgoingMessage = (TSOutgoingMessage *)interaction; + [outgoingMessage updateWithWasDeliveredWithTransaction:transaction]; + } } -- (void)handleEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *)plaintextData +- (void)handleEnvelope:(OWSSignalServiceProtosEnvelope *)envelope + plaintextData:(NSData *)plaintextData + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(envelope); + OWSAssert(plaintextData); + OWSAssert(transaction); OWSAssert(envelope.hasTimestamp && envelope.timestamp > 0); OWSAssert(envelope.hasSource && envelope.source.length > 0); OWSAssert(envelope.hasSourceDevice && envelope.sourceDevice > 0); BOOL duplicateEnvelope = [self.incomingMessageFinder existsMessageWithTimestamp:envelope.timestamp sourceId:envelope.source - sourceDeviceId:envelope.sourceDevice]; + sourceDeviceId:envelope.sourceDevice + transaction:transaction]; if (duplicateEnvelope) { DDLogInfo(@"%@ Ignoring previously received envelope from %@ with timestamp: %llu", self.tag, @@ -518,9 +528,9 @@ NS_ASSUME_NONNULL_BEGIN DDLogInfo(@"%@ handling content: ", self.tag, [self descriptionForContent:content]); if (content.hasSyncMessage) { - [self handleIncomingEnvelope:envelope withSyncMessage:content.syncMessage]; + [self handleIncomingEnvelope:envelope withSyncMessage:content.syncMessage transaction:transaction]; } else if (content.hasDataMessage) { - [self handleIncomingEnvelope:envelope withDataMessage:content.dataMessage]; + [self handleIncomingEnvelope:envelope withDataMessage:content.dataMessage transaction:transaction]; } else if (content.hasCallMessage) { [self handleIncomingEnvelope:envelope withCallMessage:content.callMessage]; } else if (content.hasNullMessage) { @@ -533,20 +543,23 @@ NS_ASSUME_NONNULL_BEGIN [OWSSignalServiceProtosDataMessage parseFromData:plaintextData]; DDLogInfo(@"%@ handling message: ", self.tag, [self descriptionForDataMessage:dataMessage]); - [self handleIncomingEnvelope:envelope withDataMessage:dataMessage]; + [self handleIncomingEnvelope:envelope withDataMessage:dataMessage transaction:transaction]; } else { OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorEnvelopeNoActionablePayload], envelope); } } -- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope +- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope withDataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); if ([dataMessage hasProfileKey]) { NSData *profileKey = [dataMessage profileKey]; - NSString *recipientId = incomingEnvelope.source; + NSString *recipientId = envelope.source; if (profileKey.length == kAES256_KeyByteLength) { [self.profileManager setProfileKeyData:profileKey forRecipientId:recipientId]; } else { @@ -556,64 +569,56 @@ NS_ASSUME_NONNULL_BEGIN } if (dataMessage.hasGroup) { - __block BOOL unknownGroup = NO; - [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) { - TSGroupModel *emptyModelToFillOutId = - [[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id]; - TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction]; - if (gThread == nil && dataMessage.group.type != OWSSignalServiceProtosGroupContextTypeUpdate) { - unknownGroup = YES; - } - }]; + TSGroupModel *emptyModelToFillOutId = + [[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id]; + TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction]; + BOOL unknownGroup = NO; + if (gThread == nil && dataMessage.group.type != OWSSignalServiceProtosGroupContextTypeUpdate) { + unknownGroup = YES; + } if (unknownGroup) { if (dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo) { - DDLogInfo(@"%@ Ignoring group info request for group I don't know about from: %@", - self.tag, - incomingEnvelope.source); + DDLogInfo( + @"%@ Ignoring group info request for group I don't know about from: %@", self.tag, envelope.source); return; } // FIXME: https://github.com/WhisperSystems/Signal-iOS/issues/1340 DDLogInfo(@"%@ Received message from group that I left or don't know about from: %@", self.tag, - envelopeAddress(incomingEnvelope)); + envelopeAddress(envelope)); - NSString *recipientId = incomingEnvelope.source; + NSString *recipientId = envelope.source; - __block TSThread *thread; - [[TSStorageManager sharedManager].dbReadWriteConnection - readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - thread = [TSContactThread getOrCreateThreadWithContactId:recipientId transaction:transaction]; - }]; + TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId transaction:transaction]; NSData *groupId = dataMessage.group.id; OWSAssert(groupId); OWSSyncGroupsRequestMessage *syncGroupsRequestMessage = [[OWSSyncGroupsRequestMessage alloc] initWithThread:thread groupId:groupId]; [self.messageSender sendMessage:syncGroupsRequestMessage + transaction:transaction success:^{ DDLogWarn(@"%@ Successfully sent Request Group Info message.", self.tag); } failure:^(NSError *error) { DDLogError(@"%@ Failed to send Request Group Info message with error: %@", self.tag, error); }]; - - return; } } if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsEndSession) != 0) { - [self handleEndSessionMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleEndSessionMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction]; } else if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsExpirationTimerUpdate) != 0) { - [self handleExpirationTimerUpdateMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleExpirationTimerUpdateMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction]; } else if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsProfileKey) != 0) { - [self handleProfileKeyMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleProfileKeyMessageWithEnvelope:envelope dataMessage:dataMessage]; } else if (dataMessage.attachments.count > 0) { - [self handleReceivedMediaWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleReceivedMediaWithEnvelope:envelope dataMessage:dataMessage transaction:transaction]; } else { - [self handleReceivedTextMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleReceivedTextMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction]; if ([self isDataMessageGroupAvatarUpdate:dataMessage]) { DDLogVerbose(@"%@ Data message had group avatar attachment", self.tag); - [self handleReceivedGroupAvatarUpdateWithEnvelope:incomingEnvelope dataMessage:dataMessage]; + [self handleReceivedGroupAvatarUpdateWithEnvelope:envelope dataMessage:dataMessage transaction:transaction]; } } } @@ -623,54 +628,64 @@ NS_ASSUME_NONNULL_BEGIN return [TextSecureKitEnv sharedEnv].profileManager; } -- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope +- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope withCallMessage:(OWSSignalServiceProtosCallMessage *)callMessage { - OWSAssert(incomingEnvelope); + OWSAssert(envelope); OWSAssert(callMessage); if ([callMessage hasProfileKey]) { NSData *profileKey = [callMessage profileKey]; - NSString *recipientId = incomingEnvelope.source; + NSString *recipientId = envelope.source; [self.profileManager setProfileKeyData:profileKey forRecipientId:recipientId]; } - if (callMessage.hasOffer) { - [self.callMessageHandler receivedOffer:callMessage.offer fromCallerId:incomingEnvelope.source]; - } else if (callMessage.hasAnswer) { - [self.callMessageHandler receivedAnswer:callMessage.answer fromCallerId:incomingEnvelope.source]; - } else if (callMessage.iceUpdate.count > 0) { - for (OWSSignalServiceProtosCallMessageIceUpdate *iceUpdate in callMessage.iceUpdate) { - [self.callMessageHandler receivedIceUpdate:iceUpdate fromCallerId:incomingEnvelope.source]; + // TODO: Should we do this synchronously? + dispatch_async(dispatch_get_main_queue(), ^{ + if (callMessage.hasOffer) { + [self.callMessageHandler receivedOffer:callMessage.offer fromCallerId:envelope.source]; + } else if (callMessage.hasAnswer) { + [self.callMessageHandler receivedAnswer:callMessage.answer fromCallerId:envelope.source]; + } else if (callMessage.iceUpdate.count > 0) { + for (OWSSignalServiceProtosCallMessageIceUpdate *iceUpdate in callMessage.iceUpdate) { + [self.callMessageHandler receivedIceUpdate:iceUpdate fromCallerId:envelope.source]; + } + } else if (callMessage.hasHangup) { + DDLogVerbose(@"%@ Received CallMessage with Hangup.", self.tag); + [self.callMessageHandler receivedHangup:callMessage.hangup fromCallerId:envelope.source]; + } else if (callMessage.hasBusy) { + [self.callMessageHandler receivedBusy:callMessage.busy fromCallerId:envelope.source]; + } else { + OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorCallMessageNoActionablePayload], envelope); } - } else if (callMessage.hasHangup) { - DDLogVerbose(@"%@ Received CallMessage with Hangup.", self.tag); - [self.callMessageHandler receivedHangup:callMessage.hangup fromCallerId:incomingEnvelope.source]; - } else if (callMessage.hasBusy) { - [self.callMessageHandler receivedBusy:callMessage.busy fromCallerId:incomingEnvelope.source]; - } else { - OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorCallMessageNoActionablePayload], incomingEnvelope); - } + }); } - (void)handleReceivedGroupAvatarUpdateWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - TSGroupThread *groupThread = [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id]; + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + + TSGroupThread *groupThread = + [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id transaction:transaction]; OWSAssert(groupThread); OWSAttachmentsProcessor *attachmentsProcessor = [[OWSAttachmentsProcessor alloc] initWithAttachmentProtos:@[ dataMessage.group.avatar ] timestamp:envelope.timestamp relay:envelope.relay thread:groupThread - networkManager:self.networkManager]; + networkManager:self.networkManager + transaction:transaction]; if (!attachmentsProcessor.hasSupportedAttachments) { DDLogWarn(@"%@ received unsupported group avatar envelope", self.tag); return; } [attachmentsProcessor fetchAttachmentsForMessage:nil + transaction:transaction success:^(TSAttachmentStream *attachmentStream) { [groupThread updateAvatarWithAttachmentStream:attachmentStream]; } @@ -684,16 +699,21 @@ NS_ASSUME_NONNULL_BEGIN - (void)handleReceivedMediaWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage]; + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + + TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage transaction:transaction]; OWSAssert(thread); OWSAttachmentsProcessor *attachmentsProcessor = [[OWSAttachmentsProcessor alloc] initWithAttachmentProtos:dataMessage.attachments timestamp:envelope.timestamp relay:envelope.relay thread:thread - networkManager:self.networkManager]; + networkManager:self.networkManager + transaction:transaction]; if (!attachmentsProcessor.hasSupportedAttachments) { DDLogWarn(@"%@ received unsupported media envelope", self.tag); return; @@ -702,7 +722,8 @@ NS_ASSUME_NONNULL_BEGIN TSIncomingMessage *_Nullable createdMessage = [self handleReceivedEnvelope:envelope withDataMessage:dataMessage - attachmentIds:attachmentsProcessor.supportedAttachmentIds]; + attachmentIds:attachmentsProcessor.supportedAttachmentIds + transaction:transaction]; if (!createdMessage) { return; @@ -711,6 +732,7 @@ NS_ASSUME_NONNULL_BEGIN DDLogDebug(@"%@ incoming attachment message: %@", self.tag, createdMessage.debugDescription); [attachmentsProcessor fetchAttachmentsForMessage:createdMessage + transaction:transaction success:^(TSAttachmentStream *attachmentStream) { DDLogDebug( @"%@ successfully fetched attachment: %@ for message: %@", self.tag, attachmentStream, createdMessage); @@ -721,22 +743,25 @@ NS_ASSUME_NONNULL_BEGIN }]; } -- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)messageEnvelope +- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope withSyncMessage:(OWSSignalServiceProtosSyncMessage *)syncMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - + OWSAssert(envelope); + OWSAssert(syncMessage); + OWSAssert(transaction); OWSAssert([TSAccountManager isRegistered]); + NSString *localNumber = [TSAccountManager localNumber]; - if (![localNumber isEqualToString:messageEnvelope.source]) { + if (![localNumber isEqualToString:envelope.source]) { // Sync messages should only come from linked devices. - OWSProdErrorWEnvelope([OWSAnalyticsEvents messageManagerErrorSyncMessageFromUnknownSource], messageEnvelope); + OWSProdErrorWEnvelope([OWSAnalyticsEvents messageManagerErrorSyncMessageFromUnknownSource], envelope); return; } if (syncMessage.hasSent) { OWSIncomingSentMessageTranscript *transcript = - [[OWSIncomingSentMessageTranscript alloc] initWithProto:syncMessage.sent relay:messageEnvelope.relay]; + [[OWSIncomingSentMessageTranscript alloc] initWithProto:syncMessage.sent relay:envelope.relay]; OWSRecordTranscriptJob *recordJob = [[OWSRecordTranscriptJob alloc] initWithIncomingSentMessageTranscript:transcript @@ -759,13 +784,16 @@ NS_ASSUME_NONNULL_BEGIN if ([self isDataMessageGroupAvatarUpdate:syncMessage.sent.message]) { [recordJob runWithAttachmentHandler:^(TSAttachmentStream *attachmentStream) { TSGroupThread *groupThread = - [TSGroupThread getOrCreateThreadWithGroupIdData:syncMessage.sent.message.group.id]; - [groupThread updateAvatarWithAttachmentStream:attachmentStream]; - }]; + [TSGroupThread getOrCreateThreadWithGroupIdData:syncMessage.sent.message.group.id + transaction:transaction]; + [groupThread updateAvatarWithAttachmentStream:attachmentStream transaction:transaction]; + } + transaction:transaction]; } else { [recordJob runWithAttachmentHandler:^(TSAttachmentStream *attachmentStream) { DDLogDebug(@"%@ successfully fetched transcript attachment: %@", self.tag, attachmentStream); - }]; + } + transaction:transaction]; } } else if (syncMessage.hasRequest) { if (syncMessage.request.type == OWSSignalServiceProtosSyncMessageRequestTypeContacts) { @@ -801,49 +829,57 @@ NS_ASSUME_NONNULL_BEGIN DDLogWarn(@"%@ ignoring unsupported sync request message", self.tag); } } else if (syncMessage.hasBlocked) { - NSArray *blockedPhoneNumbers = [syncMessage.blocked.numbers copy]; - [_blockingManager setBlockedPhoneNumbers:blockedPhoneNumbers sendSyncMessage:NO]; + dispatch_async(dispatch_get_main_queue(), ^{ + NSArray *blockedPhoneNumbers = [syncMessage.blocked.numbers copy]; + [_blockingManager setBlockedPhoneNumbers:blockedPhoneNumbers sendSyncMessage:NO]; + }); } else if (syncMessage.read.count > 0) { DDLogInfo(@"%@ Received %ld read receipt(s)", self.tag, (u_long)syncMessage.read.count); OWSReadReceiptsProcessor *readReceiptsProcessor = [[OWSReadReceiptsProcessor alloc] initWithReadReceiptProtos:syncMessage.read storageManager:self.storageManager]; - [readReceiptsProcessor process]; + [readReceiptsProcessor processWithTransaction:transaction]; } else if (syncMessage.hasVerified) { DDLogInfo(@"%@ Received verification state for %@", self.tag, syncMessage.verified.destination); - [self.identityManager processIncomingSyncMessage:syncMessage.verified]; + dispatch_async(dispatch_get_main_queue(), ^{ + [self.identityManager processIncomingSyncMessage:syncMessage.verified]; + }); } else { DDLogWarn(@"%@ Ignoring unsupported sync message.", self.tag); } } -- (void)handleEndSessionMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)endSessionEnvelope +- (void)handleEndSessionMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - TSContactThread *thread = - [TSContactThread getOrCreateThreadWithContactId:endSessionEnvelope.source transaction:transaction]; - uint64_t timeStamp = endSessionEnvelope.timestamp; - - if (thread) { // TODO thread should always be nonnull. - [[[TSInfoMessage alloc] initWithTimestamp:timeStamp - inThread:thread - messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction]; - } - }]; + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + + TSContactThread *thread = [TSContactThread getOrCreateThreadWithContactId:envelope.source transaction:transaction]; + + if (thread) { // TODO thread should always be nonnull. + [[[TSInfoMessage alloc] initWithTimestamp:envelope.timestamp + inThread:thread + messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction]; + } dispatch_async([OWSDispatch sessionStoreQueue], ^{ - [[TSStorageManager sharedManager] deleteAllSessionsForContact:endSessionEnvelope.source]; + [[TSStorageManager sharedManager] deleteAllSessionsForContact:envelope.source]; }); } - (void)handleExpirationTimerUpdateMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage]; + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + + TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage transaction:transaction]; OWSDisappearingMessagesConfiguration *disappearingMessagesConfiguration; if (dataMessage.hasExpireTimer && dataMessage.expireTimer > 0) { @@ -863,24 +899,25 @@ NS_ASSUME_NONNULL_BEGIN durationSeconds:OWSDisappearingMessagesConfigurationDefaultExpirationDuration]; } OWSAssert(disappearingMessagesConfiguration); - [disappearingMessagesConfiguration save]; + [disappearingMessagesConfiguration saveWithTransaction:transaction]; NSString *name = [self.contactsManager displayNameForPhoneIdentifier:envelope.source]; OWSDisappearingConfigurationUpdateInfoMessage *message = [[OWSDisappearingConfigurationUpdateInfoMessage alloc] initWithTimestamp:envelope.timestamp thread:thread configuration:disappearingMessagesConfiguration createdByRemoteName:name]; - [message save]; + [message saveWithTransaction:transaction]; } -- (void)handleProfileKeyMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope +- (void)handleProfileKeyMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage { - NSString *recipientId = incomingEnvelope.source; + OWSAssert(envelope); + OWSAssert(dataMessage); + + NSString *recipientId = envelope.source; if (!dataMessage.hasProfileKey) { - OWSFail(@"%@ received profile key message without profile key from: %@", - self.tag, - envelopeAddress(incomingEnvelope)); + OWSFail(@"%@ received profile key message without profile key from: %@", self.tag, envelopeAddress(envelope)); return; } NSData *profileKey = dataMessage.profileKey; @@ -888,7 +925,7 @@ NS_ASSUME_NONNULL_BEGIN OWSFail(@"%@ received profile key of unexpected length:%lu from:%@", self.tag, (unsigned long)profileKey.length, - envelopeAddress(incomingEnvelope)); + envelopeAddress(envelope)); return; } @@ -896,11 +933,15 @@ NS_ASSUME_NONNULL_BEGIN [profileManager setProfileKeyData:profileKey forRecipientId:recipientId]; } -- (void)handleReceivedTextMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)textMessageEnvelope +- (void)handleReceivedTextMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); - [self handleReceivedEnvelope:textMessageEnvelope withDataMessage:dataMessage attachmentIds:@[]]; + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + + [self handleReceivedEnvelope:envelope withDataMessage:dataMessage attachmentIds:@[] transaction:transaction]; } - (void)sendGroupUpdateForThread:(TSGroupThread *)gThread message:(TSOutgoingMessage *)message @@ -936,8 +977,11 @@ NS_ASSUME_NONNULL_BEGIN - (void)handleGroupInfoRequest:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); OWSAssert(dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo); NSData *groupId = dataMessage.hasGroup ? dataMessage.group.id : nil; @@ -948,42 +992,44 @@ NS_ASSUME_NONNULL_BEGIN DDLogWarn(@"%@ Received 'Request Group Info' message for group: %@ from: %@", self.tag, groupId, envelope.source); - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - TSGroupModel *emptyModelToFillOutId = - [[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id]; - TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction]; - if (!gThread) { - DDLogWarn(@"%@ Unknown group: %@", self.tag, groupId); - return; - } - - if (![gThread.groupModel.groupMemberIds containsObject:envelope.source]) { - DDLogWarn(@"%@ Ignoring 'Request Group Info' message for non-member of group. %@ not in %@", - self.tag, - envelope.source, - gThread.groupModel.groupMemberIds); - } + TSGroupModel *emptyModelToFillOutId = + [[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id]; + TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction]; + if (!gThread) { + DDLogWarn(@"%@ Unknown group: %@", self.tag, groupId); + return; + } - NSString *updateGroupInfo = - [gThread.groupModel getInfoStringAboutUpdateTo:gThread.groupModel contactsManager:self.contactsManager]; - TSOutgoingMessage *message = [[TSOutgoingMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp] - inThread:gThread - groupMetaMessage:TSGroupMessageUpdate]; - [message updateWithCustomMessage:updateGroupInfo transaction:transaction]; - // Only send this group update to the requester. - [message updateWithSingleGroupRecipient:envelope.source transaction:transaction]; + if (![gThread.groupModel.groupMemberIds containsObject:envelope.source]) { + DDLogWarn(@"%@ Ignoring 'Request Group Info' message for non-member of group. %@ not in %@", + self.tag, + envelope.source, + gThread.groupModel.groupMemberIds); + } - dispatch_async(dispatch_get_main_queue(), ^{ - [self sendGroupUpdateForThread:gThread message:message]; - }); - }]; + NSString *updateGroupInfo = + [gThread.groupModel getInfoStringAboutUpdateTo:gThread.groupModel contactsManager:self.contactsManager]; + TSOutgoingMessage *message = [[TSOutgoingMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp] + inThread:gThread + groupMetaMessage:TSGroupMessageUpdate]; + [message updateWithCustomMessage:updateGroupInfo transaction:transaction]; + // Only send this group update to the requester. + [message updateWithSingleGroupRecipient:envelope.source transaction:transaction]; + + dispatch_async(dispatch_get_main_queue(), ^{ + [self sendGroupUpdateForThread:gThread message:message]; + }); } - (TSIncomingMessage *_Nullable)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope withDataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage attachmentIds:(NSArray *)attachmentIds + transaction:(YapDatabaseReadWriteTransaction *)transaction { - OWSAssert([NSThread isMainThread]); + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + uint64_t timestamp = envelope.timestamp; NSString *body = dataMessage.body; NSData *groupId = dataMessage.hasGroup ? dataMessage.group.id : nil; @@ -996,152 +1042,152 @@ NS_ASSUME_NONNULL_BEGIN NSString *localNumber = [TSAccountManager localNumber]; if (dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo) { - [self handleGroupInfoRequest:envelope dataMessage:dataMessage]; + [self handleGroupInfoRequest:envelope dataMessage:dataMessage transaction:transaction]; return nil; } - [self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - if (groupId) { - NSMutableArray *uniqueMemberIds = [[[NSSet setWithArray:dataMessage.group.members] allObjects] mutableCopy]; - TSGroupModel *model = [[TSGroupModel alloc] initWithTitle:dataMessage.group.name - memberIds:uniqueMemberIds - image:nil - groupId:dataMessage.group.id]; - TSGroupThread *gThread = [TSGroupThread getOrCreateThreadWithGroupModel:model transaction:transaction]; - [gThread saveWithTransaction:transaction]; - - switch (dataMessage.group.type) { - case OWSSignalServiceProtosGroupContextTypeUpdate: { - NSString *updateGroupInfo = - [gThread.groupModel getInfoStringAboutUpdateTo:model contactsManager:self.contactsManager]; - gThread.groupModel = model; - [gThread saveWithTransaction:transaction]; - [[[TSInfoMessage alloc] initWithTimestamp:timestamp - inThread:gThread - messageType:TSInfoMessageTypeGroupUpdate - customMessage:updateGroupInfo] saveWithTransaction:transaction]; - break; - } - case OWSSignalServiceProtosGroupContextTypeQuit: { - NSString *nameString = [self.contactsManager displayNameForPhoneIdentifier:envelope.source]; - - NSString *updateGroupInfo = - [NSString stringWithFormat:NSLocalizedString(@"GROUP_MEMBER_LEFT", @""), nameString]; - NSMutableArray *newGroupMembers = [NSMutableArray arrayWithArray:gThread.groupModel.groupMemberIds]; - [newGroupMembers removeObject:envelope.source]; - gThread.groupModel.groupMemberIds = newGroupMembers; - - [gThread saveWithTransaction:transaction]; - [[[TSInfoMessage alloc] initWithTimestamp:timestamp - inThread:gThread - messageType:TSInfoMessageTypeGroupUpdate - customMessage:updateGroupInfo] saveWithTransaction:transaction]; - break; - } - case OWSSignalServiceProtosGroupContextTypeDeliver: { - if (body.length == 0 && attachmentIds.count < 1) { - DDLogWarn(@"%@ ignoring empty incoming message from: %@ for group: %@ with timestampe: %lu", - self.tag, - envelopeAddress(envelope), - groupId, - (unsigned long)timestamp); - } else { - DDLogDebug(@"%@ incoming message from: %@ for group: %@ with timestampe: %lu", - self.tag, - envelopeAddress(envelope), - groupId, - (unsigned long)timestamp); - incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp - inThread:gThread - authorId:envelope.source - sourceDeviceId:envelope.sourceDevice - messageBody:body - attachmentIds:attachmentIds - expiresInSeconds:dataMessage.expireTimer]; - - [incomingMessage saveWithTransaction:transaction]; - } - break; - } - default: { - DDLogWarn(@"%@ Ignoring unknown group message type:%d", self.tag, (int)dataMessage.group.type); - } - } - - thread = gThread; - } else { - if (body.length == 0 && attachmentIds.count < 1) { - DDLogWarn(@"%@ ignoring empty incoming message from: %@ with timestampe: %lu", - self.tag, - envelopeAddress(envelope), - (unsigned long)timestamp); - } else { - DDLogDebug(@"%@ incoming message from: %@ with timestampe: %lu", - self.tag, - envelopeAddress(envelope), - (unsigned long)timestamp); - TSContactThread *cThread = [TSContactThread getOrCreateThreadWithContactId:envelope.source - transaction:transaction - relay:envelope.relay]; - - incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp - inThread:cThread - authorId:[cThread contactIdentifier] - sourceDeviceId:envelope.sourceDevice - messageBody:body - attachmentIds:attachmentIds - expiresInSeconds:dataMessage.expireTimer]; - - [incomingMessage saveWithTransaction:transaction]; - thread = cThread; - } - } - - if (thread && incomingMessage) { - // Any messages sent from the current user - from this device or another - should be - // automatically marked as read. - BOOL shouldMarkMessageAsRead = [envelope.source isEqualToString:localNumber]; - if (shouldMarkMessageAsRead) { - // Don't send a read receipt for messages sent by ourselves. - [incomingMessage markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:YES]; - } - - DDLogDebug(@"%@ shouldMarkMessageAsRead: %d (%@)", self.tag, shouldMarkMessageAsRead, envelope.source); - - // Other clients allow attachments to be sent along with body, we want the text displayed as a separate - // message - if ([attachmentIds count] > 0 && body != nil && body.length > 0) { - // We want the text to be displayed under the attachment - uint64_t textMessageTimestamp = timestamp + 1; - TSIncomingMessage *textMessage = [[TSIncomingMessage alloc] initWithTimestamp:textMessageTimestamp - inThread:thread - authorId:envelope.source - sourceDeviceId:envelope.sourceDevice - messageBody:body - attachmentIds:@[] - expiresInSeconds:dataMessage.expireTimer]; - DDLogDebug(@"%@ incoming extra text message: %@", self.tag, incomingMessage.debugDescription); - [textMessage saveWithTransaction:transaction]; - } - } - }]; + if (groupId) { + NSMutableArray *uniqueMemberIds = [[[NSSet setWithArray:dataMessage.group.members] allObjects] mutableCopy]; + TSGroupModel *model = [[TSGroupModel alloc] initWithTitle:dataMessage.group.name + memberIds:uniqueMemberIds + image:nil + groupId:dataMessage.group.id]; + TSGroupThread *gThread = [TSGroupThread getOrCreateThreadWithGroupModel:model transaction:transaction]; + [gThread saveWithTransaction:transaction]; + + switch (dataMessage.group.type) { + case OWSSignalServiceProtosGroupContextTypeUpdate: { + NSString *updateGroupInfo = + [gThread.groupModel getInfoStringAboutUpdateTo:model contactsManager:self.contactsManager]; + gThread.groupModel = model; + [gThread saveWithTransaction:transaction]; + [[[TSInfoMessage alloc] initWithTimestamp:timestamp + inThread:gThread + messageType:TSInfoMessageTypeGroupUpdate + customMessage:updateGroupInfo] saveWithTransaction:transaction]; + break; + } + case OWSSignalServiceProtosGroupContextTypeQuit: { + NSString *nameString = [self.contactsManager displayNameForPhoneIdentifier:envelope.source]; + + NSString *updateGroupInfo = + [NSString stringWithFormat:NSLocalizedString(@"GROUP_MEMBER_LEFT", @""), nameString]; + NSMutableArray *newGroupMembers = [NSMutableArray arrayWithArray:gThread.groupModel.groupMemberIds]; + [newGroupMembers removeObject:envelope.source]; + gThread.groupModel.groupMemberIds = newGroupMembers; + + [gThread saveWithTransaction:transaction]; + [[[TSInfoMessage alloc] initWithTimestamp:timestamp + inThread:gThread + messageType:TSInfoMessageTypeGroupUpdate + customMessage:updateGroupInfo] saveWithTransaction:transaction]; + break; + } + case OWSSignalServiceProtosGroupContextTypeDeliver: { + if (body.length == 0 && attachmentIds.count < 1) { + DDLogWarn(@"%@ ignoring empty incoming message from: %@ for group: %@ with timestampe: %lu", + self.tag, + envelopeAddress(envelope), + groupId, + (unsigned long)timestamp); + } else { + DDLogDebug(@"%@ incoming message from: %@ for group: %@ with timestampe: %lu", + self.tag, + envelopeAddress(envelope), + groupId, + (unsigned long)timestamp); + incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp + inThread:gThread + authorId:envelope.source + sourceDeviceId:envelope.sourceDevice + messageBody:body + attachmentIds:attachmentIds + expiresInSeconds:dataMessage.expireTimer]; + + [incomingMessage saveWithTransaction:transaction]; + } + break; + } + default: { + DDLogWarn(@"%@ Ignoring unknown group message type:%d", self.tag, (int)dataMessage.group.type); + } + } + + thread = gThread; + } else { + if (body.length == 0 && attachmentIds.count < 1) { + DDLogWarn(@"%@ ignoring empty incoming message from: %@ with timestampe: %lu", + self.tag, + envelopeAddress(envelope), + (unsigned long)timestamp); + } else { + DDLogDebug(@"%@ incoming message from: %@ with timestampe: %lu", + self.tag, + envelopeAddress(envelope), + (unsigned long)timestamp); + TSContactThread *cThread = [TSContactThread getOrCreateThreadWithContactId:envelope.source + transaction:transaction + relay:envelope.relay]; + + incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp + inThread:cThread + authorId:[cThread contactIdentifier] + sourceDeviceId:envelope.sourceDevice + messageBody:body + attachmentIds:attachmentIds + expiresInSeconds:dataMessage.expireTimer]; + + [incomingMessage saveWithTransaction:transaction]; + thread = cThread; + } + } if (thread && incomingMessage) { - // In case we already have a read receipt for this new message (happens sometimes). - OWSReadReceiptsProcessor *readReceiptsProcessor = - [[OWSReadReceiptsProcessor alloc] initWithIncomingMessage:incomingMessage - storageManager:self.storageManager]; - [readReceiptsProcessor process]; + // Any messages sent from the current user - from this device or another - should be + // automatically marked as read. + BOOL shouldMarkMessageAsRead = [envelope.source isEqualToString:localNumber]; + if (shouldMarkMessageAsRead) { + // Don't send a read receipt for messages sent by ourselves. + [incomingMessage markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:YES]; + } + + DDLogDebug(@"%@ shouldMarkMessageAsRead: %d (%@)", self.tag, shouldMarkMessageAsRead, envelope.source); + + // Other clients allow attachments to be sent along with body, we want the text displayed as a separate + // message + if ([attachmentIds count] > 0 && body != nil && body.length > 0) { + // We want the text to be displayed under the attachment + uint64_t textMessageTimestamp = timestamp + 1; + TSIncomingMessage *textMessage = [[TSIncomingMessage alloc] initWithTimestamp:textMessageTimestamp + inThread:thread + authorId:envelope.source + sourceDeviceId:envelope.sourceDevice + messageBody:body + attachmentIds:@[] + expiresInSeconds:dataMessage.expireTimer]; + DDLogDebug(@"%@ incoming extra text message: %@", self.tag, incomingMessage.debugDescription); + [textMessage saveWithTransaction:transaction]; + } + } - [OWSDisappearingMessagesJob becomeConsistentWithConfigurationForMessage:incomingMessage - contactsManager:self.contactsManager]; + if (thread && incomingMessage) { + dispatch_async(dispatch_get_main_queue(), ^{ + // In case we already have a read receipt for this new message (happens sometimes). + OWSReadReceiptsProcessor *readReceiptsProcessor = + [[OWSReadReceiptsProcessor alloc] initWithIncomingMessage:incomingMessage + storageManager:self.storageManager]; + [readReceiptsProcessor process]; - // Update thread preview in inbox - [thread touch]; + [OWSDisappearingMessagesJob becomeConsistentWithConfigurationForMessage:incomingMessage + contactsManager:self.contactsManager]; - [[TextSecureKitEnv sharedEnv].notificationsManager notifyUserForIncomingMessage:incomingMessage - inThread:thread - contactsManager:self.contactsManager]; + // Update thread preview in inbox + [thread touch]; + + [[TextSecureKitEnv sharedEnv].notificationsManager notifyUserForIncomingMessage:incomingMessage + inThread:thread + contactsManager:self.contactsManager]; + }); } return incomingMessage; @@ -1219,11 +1265,16 @@ NSString *envelopeAddress(OWSSignalServiceProtosEnvelope *envelope) */ - (TSThread *)threadForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage + transaction:(YapDatabaseReadWriteTransaction *)transaction { + OWSAssert(envelope); + OWSAssert(dataMessage); + OWSAssert(transaction); + if (dataMessage.hasGroup) { - return [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id]; + return [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id transaction:transaction]; } else { - return [TSContactThread getOrCreateThreadWithContactId:envelope.source]; + return [TSContactThread getOrCreateThreadWithContactId:envelope.source transaction:transaction]; } } diff --git a/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.h b/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.h index 49c16c5de..a75d43e44 100644 --- a/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.h +++ b/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.h @@ -21,7 +21,8 @@ NS_ASSUME_NONNULL_BEGIN */ - (BOOL)existsMessageWithTimestamp:(uint64_t)timestamp sourceId:(NSString *)sourceId - sourceDeviceId:(uint32_t)sourceDeviceId; + sourceDeviceId:(uint32_t)sourceDeviceId + transaction:(YapDatabaseReadTransaction *)transaction; @end diff --git a/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.m b/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.m index 74a9ab70f..29c2a3d92 100644 --- a/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.m +++ b/SignalServiceKit/src/Storage/OWSIncomingMessageFinder.m @@ -114,6 +114,7 @@ NSString *const OWSIncomingMessageFinderColumnSourceDeviceId = @"OWSIncomingMess - (BOOL)existsMessageWithTimestamp:(uint64_t)timestamp sourceId:(NSString *)sourceId sourceDeviceId:(uint32_t)sourceDeviceId + transaction:(YapDatabaseReadTransaction *)transaction { if (![self.database registeredExtension:OWSIncomingMessageFinderExtensionName]) { OWSFail(@"%@ in %s but extension is not registered", self.tag, __PRETTY_FUNCTION__); @@ -129,13 +130,8 @@ NSString *const OWSIncomingMessageFinderColumnSourceDeviceId = @"OWSIncomingMess // YapDatabaseQuery params must be objects YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:queryFormat, @(timestamp), sourceId, @(sourceDeviceId)]; - __block NSUInteger count; - __block BOOL success; - - [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { - success = [[transaction ext:OWSIncomingMessageFinderExtensionName] getNumberOfRows:&count matchingQuery:query]; - }]; - + NSUInteger count; + BOOL success = [[transaction ext:OWSIncomingMessageFinderExtensionName] getNumberOfRows:&count matchingQuery:query]; if (!success) { OWSFail(@"%@ Could not execute query", self.tag); return NO;