Process messages in a single transaction (wherever possible).

// FREEBIE
pull/1/head
Matthew Chen 8 years ago
parent afc753e7ed
commit 6fce2c26b7

@ -2600,6 +2600,7 @@ typedef NS_ENUM(NSInteger, MessagesRangeSizeMode) {
[[OWSAttachmentsProcessor alloc] initWithAttachmentPointer:attachmentPointer
networkManager:self.networkManager];
[processor fetchAttachmentsForMessage:message
storageManager:self.storageManager
success:^(TSAttachmentStream *_Nonnull attachmentStream) {
DDLogInfo(
@"%@ Successfully redownloaded attachment in thread: %@", self.tag, message.thread);

@ -51,8 +51,9 @@
// If we're not going to cancel the pop/back, we need to call the super
// implementation since it has important side effects.
if (result) {
// NOTE: result might end up NO if the super implementation cancels the
// the pop/back.
result = [super navigationBar:navigationBar shouldPopItem:item];
OWSAssert(result);
}
return result;
}

@ -8,6 +8,7 @@
NS_ASSUME_NONNULL_BEGIN
@class TSAttachmentStream;
@class YapDatabaseReadWriteTransaction;
@interface TSGroupThread : TSThread
@ -18,6 +19,8 @@ NS_ASSUME_NONNULL_BEGIN
transaction:(YapDatabaseReadWriteTransaction *)transaction;
+ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId;
+ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId
transaction:(YapDatabaseReadWriteTransaction *)transaction;
+ (instancetype)threadWithGroupModel:(TSGroupModel *)groupModel transaction:(YapDatabaseReadTransaction *)transaction;
@ -27,6 +30,8 @@ NS_ASSUME_NONNULL_BEGIN
+ (NSArray<TSGroupThread *> *)groupThreadsWithRecipientId:(NSString *)recipientId;
- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream;
- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream
transaction:(YapDatabaseReadWriteTransaction *)transaction;
@end

@ -59,21 +59,35 @@ NS_ASSUME_NONNULL_BEGIN
}
+ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(groupId.length > 0);
OWSAssert(transaction);
TSGroupThread *thread = [self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupId]];
TSGroupThread *thread = [self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupId] transaction:transaction];
if (!thread) {
thread = [[self alloc] initWithGroupIdData:groupId];
[thread save];
[thread saveWithTransaction:transaction];
}
return thread;
}
+ (instancetype)getOrCreateThreadWithGroupIdData:(NSData *)groupId
{
OWSAssert(groupId.length > 0);
__block TSGroupThread *thread;
[[self dbReadWriteConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
thread = [self getOrCreateThreadWithGroupIdData:groupId transaction:transaction];
}];
return thread;
}
+ (instancetype)getOrCreateThreadWithGroupModel:(TSGroupModel *)groupModel
transaction:(YapDatabaseReadWriteTransaction *)transaction {
OWSAssert(groupModel);
OWSAssert(groupModel.groupId.length > 0);
OWSAssert(transaction);
TSGroupThread *thread =
[self fetchObjectWithUniqueID:[self threadIdFromGroupId:groupModel.groupId] transaction:transaction];
@ -162,12 +176,23 @@ NS_ASSUME_NONNULL_BEGIN
- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream
{
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self updateAvatarWithAttachmentStream:attachmentStream transaction:transaction];
}];
}
- (void)updateAvatarWithAttachmentStream:(TSAttachmentStream *)attachmentStream
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(attachmentStream);
OWSAssert(transaction);
self.groupModel.groupImage = [attachmentStream image];
[self save];
[self saveWithTransaction:transaction];
// Avatars are stored directly in the database, so there's no need
// to keep the attachment around after assigning the image.
[attachmentStream remove];
[attachmentStream removeWithTransaction:transaction];
}
@end

@ -1,4 +1,6 @@
// Copyright © 2016 Open Whisper Systems. All rights reserved.
//
// Copyright (c) 2017 Open Whisper Systems. All rights reserved.
//
NS_ASSUME_NONNULL_BEGIN
@ -6,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN
@class OWSReadReceipt;
@class TSIncomingMessage;
@class TSStorageManager;
@class YapDatabaseReadWriteTransaction;
extern NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification;
@ -30,6 +33,7 @@ extern NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification;
- (instancetype)init NS_UNAVAILABLE;
- (void)process;
- (void)processWithTransaction:(YapDatabaseReadWriteTransaction *)transaction;
@end

@ -80,10 +80,20 @@ NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification =
- (void)process
{
[[self.storageManager newDatabaseConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self processWithTransaction:transaction];
}];
}
- (void)processWithTransaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(transaction);
DDLogDebug(@"%@ Processing %ld read receipts.", self.tag, (unsigned long)self.readReceipts.count);
for (OWSReadReceipt *readReceipt in self.readReceipts) {
TSIncomingMessage *message =
[TSIncomingMessage findMessageWithAuthorId:readReceipt.senderId timestamp:readReceipt.timestamp];
TSIncomingMessage *message = [TSIncomingMessage findMessageWithAuthorId:readReceipt.senderId
timestamp:readReceipt.timestamp
transaction:transaction];
if (message) {
OWSAssert(message.thread);
@ -94,55 +104,52 @@ NSString *const OWSReadReceiptsProcessorMarkedMessageAsReadNotification =
// Always mark the message specified by the read receipt as read.
[interactionsToMarkAsRead addObject:message];
[self.storageManager.dbReadWriteConnection readWriteWithBlock:^(
YapDatabaseReadWriteTransaction *transaction) {
[[TSDatabaseView unseenDatabaseViewExtension:transaction]
enumerateRowsInGroup:message.uniqueThreadId
usingBlock:^(NSString *collection,
NSString *key,
id object,
id metadata,
NSUInteger index,
BOOL *stop) {
TSInteraction *interaction = object;
if (interaction.timestampForSorting > message.timestampForSorting) {
*stop = YES;
return;
}
id<OWSReadTracking> possiblyRead = (id<OWSReadTracking>)object;
OWSAssert(!possiblyRead.read);
[interactionsToMarkAsRead addObject:possiblyRead];
}];
for (id<OWSReadTracking> interaction in interactionsToMarkAsRead) {
// * Don't send a read receipt in response to a read receipt.
// * Don't update expiration; we'll do that in the next statement.
[interaction markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:NO];
if ([interaction isKindOfClass:[TSMessage class]]) {
TSMessage *otherMessage = (TSMessage *)interaction;
// Update expiration using the timestamp from the readReceipt.
[OWSDisappearingMessagesJob setExpirationForMessage:otherMessage
expirationStartedAt:readReceipt.timestamp];
// Fire event that will cancel any pending notifications for this message.
dispatch_async(dispatch_get_main_queue(), ^{
[[NSNotificationCenter defaultCenter]
postNotificationName:OWSReadReceiptsProcessorMarkedMessageAsReadNotification
object:otherMessage];
});
}
[[TSDatabaseView unseenDatabaseViewExtension:transaction]
enumerateRowsInGroup:message.uniqueThreadId
usingBlock:^(NSString *collection,
NSString *key,
id object,
id metadata,
NSUInteger index,
BOOL *stop) {
TSInteraction *interaction = object;
if (interaction.timestampForSorting > message.timestampForSorting) {
*stop = YES;
return;
}
id<OWSReadTracking> possiblyRead = (id<OWSReadTracking>)object;
OWSAssert(!possiblyRead.read);
[interactionsToMarkAsRead addObject:possiblyRead];
}];
for (id<OWSReadTracking> interaction in interactionsToMarkAsRead) {
// * Don't send a read receipt in response to a read receipt.
// * Don't update expiration; we'll do that in the next statement.
[interaction markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:NO];
if ([interaction isKindOfClass:[TSMessage class]]) {
TSMessage *otherMessage = (TSMessage *)interaction;
// Update expiration using the timestamp from the readReceipt.
[OWSDisappearingMessagesJob setExpirationForMessage:otherMessage
expirationStartedAt:readReceipt.timestamp];
// Fire event that will cancel any pending notifications for this message.
dispatch_async(dispatch_get_main_queue(), ^{
[[NSNotificationCenter defaultCenter]
postNotificationName:OWSReadReceiptsProcessorMarkedMessageAsReadNotification
object:otherMessage];
});
}
}];
}
// If it was previously saved, no need to keep it around any longer.
[readReceipt remove];
[readReceipt removeWithTransaction:transaction];
} else {
DDLogDebug(@"%@ Received read receipt for an unknown message. Saving it for later.", self.tag);
[readReceipt save];
[readReceipt saveWithTransaction:transaction];
}
}
}

@ -1,5 +1,6 @@
// Created by Michael Kirk on 9/23/16.
// Copyright © 2016 Open Whisper Systems. All rights reserved.
//
// Copyright (c) 2017 Open Whisper Systems. All rights reserved.
//
NS_ASSUME_NONNULL_BEGIN
@ -7,6 +8,7 @@ NS_ASSUME_NONNULL_BEGIN
@class OWSMessageSender;
@class TSNetworkManager;
@class TSAttachmentStream;
@class YapDatabaseReadWriteTransaction;
@interface OWSRecordTranscriptJob : NSObject
@ -15,7 +17,8 @@ NS_ASSUME_NONNULL_BEGIN
messageSender:(OWSMessageSender *)messageSender
networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER;
- (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler;
- (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler
transaction:(YapDatabaseReadWriteTransaction *)transaction;
@end

@ -41,16 +41,22 @@ NS_ASSUME_NONNULL_BEGIN
}
- (void)runWithAttachmentHandler:(void (^)(TSAttachmentStream *attachmentStream))attachmentHandler
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(transaction);
OWSIncomingSentMessageTranscript *transcript = self.incomingSentMessageTranscript;
DDLogDebug(@"%@ Recording transcript: %@", self.tag, transcript);
if (transcript.isEndSessionMessage) {
DDLogInfo(@"%@ EndSession was sent to recipient: %@.", self.tag, transcript.recipientId);
[self.storageManager deleteAllSessionsForContact:transcript.recipientId];
// NOTE: We dispatch_sync() here.
dispatch_sync([OWSDispatch sessionStoreQueue], ^{
[self.storageManager deleteAllSessionsForContact:transcript.recipientId];
});
[[[TSInfoMessage alloc] initWithTimestamp:transcript.timestamp
inThread:transcript.thread
messageType:TSInfoMessageTypeSessionDidEnd] save];
messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction];
// Don't continue processing lest we print a bubble for the session reset.
return;
@ -62,7 +68,8 @@ NS_ASSUME_NONNULL_BEGIN
timestamp:transcript.timestamp
relay:transcript.relay
thread:thread
networkManager:self.networkManager];
networkManager:self.networkManager
transaction:transaction];
// TODO group updates. Currently desktop doesn't support group updates, so not a problem yet.
TSOutgoingMessage *outgoingMessage =
@ -79,10 +86,13 @@ NS_ASSUME_NONNULL_BEGIN
return;
}
[self.messageSender handleMessageSentRemotely:outgoingMessage sentAt:transcript.expirationStartedAt];
[self.messageSender handleMessageSentRemotely:outgoingMessage
sentAt:transcript.expirationStartedAt
transaction:transaction];
[attachmentsProcessor
fetchAttachmentsForMessage:outgoingMessage
transaction:transaction
success:attachmentHandler
failure:^(NSError *_Nonnull error) {
DDLogError(@"%@ failed to fetch transcripts attachments for message: %@",
@ -101,7 +111,7 @@ NS_ASSUME_NONNULL_BEGIN
expiresInSeconds:transcript.expirationDuration
expireStartedAt:transcript.expirationStartedAt];
// Since textMessage is a new message, updateWithWasSentAndDelivered will save it.
[textMessage updateWithWasSentAndDelivered];
[textMessage updateWithWasSentFromLinkedDeviceWithTransaction:transaction];
}
}

@ -8,12 +8,14 @@ extern NSString *const kAttachmentDownloadProgressNotification;
extern NSString *const kAttachmentDownloadProgressKey;
extern NSString *const kAttachmentDownloadAttachmentIDKey;
@class TSAttachmentPointer;
@class TSAttachmentStream;
@class TSMessage;
@class TSThread;
@class TSNetworkManager;
@class TSStorageManager;
@class TSThread;
@class OWSSignalServiceProtosAttachmentPointer;
@class TSAttachmentStream;
@class TSAttachmentPointer;
@class YapDatabaseReadWriteTransaction;
/**
* Given incoming attachment protos, determines which we support.
@ -31,7 +33,8 @@ extern NSString *const kAttachmentDownloadAttachmentIDKey;
timestamp:(uint64_t)timestamp
relay:(nullable NSString *)relay
thread:(TSThread *)thread
networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER;
networkManager:(TSNetworkManager *)networkManager
transaction:(YapDatabaseReadWriteTransaction *)transaction NS_DESIGNATED_INITIALIZER;
/*
* Retry fetching failed attachment download
@ -40,6 +43,11 @@ extern NSString *const kAttachmentDownloadAttachmentIDKey;
networkManager:(TSNetworkManager *)networkManager NS_DESIGNATED_INITIALIZER;
- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message
storageManager:(TSStorageManager *)storageManager
success:(void (^)(TSAttachmentStream *attachmentStream))successHandler
failure:(void (^)(NSError *error))failureHandler;
- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message
transaction:(YapDatabaseReadWriteTransaction *)transaction
success:(void (^)(TSAttachmentStream *attachmentStream))successHandler
failure:(void (^)(NSError *error))failureHandler;
@end

@ -15,6 +15,7 @@
#import "TSInfoMessage.h"
#import "TSMessage.h"
#import "TSNetworkManager.h"
#import "TSStorageManager.h"
#import "TSThread.h"
#import <YapDatabase/YapDatabaseConnection.h>
@ -58,6 +59,7 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f;
relay:(nullable NSString *)relay
thread:(TSThread *)thread
networkManager:(TSNetworkManager *)networkManager
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
self = [super init];
if (!self) {
@ -97,7 +99,7 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f;
[attachmentIds addObject:pointer.uniqueId];
[pointer save];
[pointer saveWithTransaction:transaction];
[supportedAttachmentPointers addObject:pointer];
[supportedAttachmentIds addObject:pointer.uniqueId];
}
@ -110,31 +112,60 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f;
}
- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message
storageManager:(TSStorageManager *)storageManager
success:(void (^)(TSAttachmentStream *attachmentStream))successHandler
failure:(void (^)(NSError *error))failureHandler
{
[[storageManager newDatabaseConnection] readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self fetchAttachmentsForMessage:message
transaction:transaction
success:successHandler
failure:failureHandler];
}];
}
- (void)fetchAttachmentsForMessage:(nullable TSMessage *)message
transaction:(YapDatabaseReadWriteTransaction *)transaction
success:(void (^)(TSAttachmentStream *attachmentStream))successHandler
failure:(void (^)(NSError *error))failureHandler
{
OWSAssert(transaction);
for (TSAttachmentPointer *attachmentPointer in self.supportedAttachmentPointers) {
[self retrieveAttachment:attachmentPointer message:message success:successHandler failure:failureHandler];
[self retrieveAttachment:attachmentPointer
message:message
transaction:transaction
success:successHandler
failure:failureHandler];
}
}
- (void)retrieveAttachment:(TSAttachmentPointer *)attachment
message:(nullable TSMessage *)message
transaction:(YapDatabaseReadWriteTransaction *)transaction
success:(void (^)(TSAttachmentStream *attachmentStream))successHandler
failure:(void (^)(NSError *error))failureHandler
{
[self setAttachment:attachment isDownloadingInMessage:message];
OWSAssert(transaction);
[self setAttachment:attachment isDownloadingInMessage:message transaction:transaction];
void (^markAndHandleFailure)(NSError *) = ^(NSError *error) {
[self setAttachment:attachment didFailInMessage:message];
return failureHandler(error);
// Ensure enclosing transaction is complete.
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[self setAttachment:attachment didFailInMessage:message];
failureHandler(error);
});
};
void (^markAndHandleSuccess)(TSAttachmentStream *attachmentStream) = ^(TSAttachmentStream *attachmentStream) {
successHandler(attachmentStream);
if (message) {
[message touch];
}
// Ensure enclosing transaction is complete.
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
successHandler(attachmentStream);
if (message) {
[message touch];
}
});
};
if (attachment.serverId < 100) {
@ -352,12 +383,16 @@ static const CGFloat kAttachmentDownloadProgressTheta = 0.001f;
});
}
- (void)setAttachment:(TSAttachmentPointer *)pointer isDownloadingInMessage:(nullable TSMessage *)message
- (void)setAttachment:(TSAttachmentPointer *)pointer
isDownloadingInMessage:(nullable TSMessage *)message
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(transaction);
pointer.state = TSAttachmentPointerStateDownloading;
[pointer save];
[pointer saveWithTransaction:transaction];
if (message) {
[message touch];
[message touchWithTransaction:transaction];
}
}

@ -101,7 +101,9 @@ extern NSString *const TSIncomingMessageWasReadOnThisDeviceNotification;
* When the message was created in milliseconds since epoch
*
*/
+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId timestamp:(uint64_t)timestamp;
+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId
timestamp:(uint64_t)timestamp
transaction:(YapDatabaseReadWriteTransaction *)transaction;
@property (nonatomic, readonly) NSString *authorId;

@ -75,37 +75,39 @@ NSString *const TSIncomingMessageWasReadOnThisDeviceNotification = @"TSIncomingM
return self;
}
+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId timestamp:(uint64_t)timestamp
+ (nullable instancetype)findMessageWithAuthorId:(NSString *)authorId
timestamp:(uint64_t)timestamp
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(transaction);
__block TSIncomingMessage *foundMessage;
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
// In theory we could build a new secondaryIndex for (authorId,timestamp), but in practice there should
// be *very* few (millisecond) timestamps with multiple authors.
[TSDatabaseSecondaryIndexes
enumerateMessagesWithTimestamp:timestamp
withBlock:^(NSString *collection, NSString *key, BOOL *stop) {
TSInteraction *interaction =
[TSInteraction fetchObjectWithUniqueID:key transaction:transaction];
if ([interaction isKindOfClass:[TSIncomingMessage class]]) {
TSIncomingMessage *message = (TSIncomingMessage *)interaction;
// Only groupthread sets authorId, thus this crappy code.
// TODO ALL incoming messages should have an authorId.
NSString *messageAuthorId;
if (message.authorId) { // Group Thread
messageAuthorId = message.authorId;
} else { // Contact Thread
messageAuthorId =
[TSContactThread contactIdFromThreadId:message.uniqueThreadId];
}
if ([messageAuthorId isEqualToString:authorId]) {
foundMessage = message;
}
// In theory we could build a new secondaryIndex for (authorId,timestamp), but in practice there should
// be *very* few (millisecond) timestamps with multiple authors.
[TSDatabaseSecondaryIndexes
enumerateMessagesWithTimestamp:timestamp
withBlock:^(NSString *collection, NSString *key, BOOL *stop) {
TSInteraction *interaction =
[TSInteraction fetchObjectWithUniqueID:key transaction:transaction];
if ([interaction isKindOfClass:[TSIncomingMessage class]]) {
TSIncomingMessage *message = (TSIncomingMessage *)interaction;
// Only groupthread sets authorId, thus this crappy code.
// TODO ALL incoming messages should have an authorId.
NSString *messageAuthorId;
if (message.authorId) { // Group Thread
messageAuthorId = message.authorId;
} else { // Contact Thread
messageAuthorId =
[TSContactThread contactIdFromThreadId:message.uniqueThreadId];
}
if ([messageAuthorId isEqualToString:authorId]) {
foundMessage = message;
}
}
usingTransaction:transaction];
}];
}
usingTransaction:transaction];
return foundMessage;
}

@ -170,8 +170,7 @@ typedef NS_ENUM(NSInteger, TSGroupMetaMessage) {
- (void)updateWithCustomMessage:(NSString *)customMessage;
- (void)updateWithWasDeliveredWithTransaction:(YapDatabaseReadWriteTransaction *)transaction;
- (void)updateWithWasDelivered;
- (void)updateWithWasSentAndDelivered;
- (void)updateWithWasSentAndDeliveredFromLinkedDevice;
- (void)updateWithWasSentFromLinkedDeviceWithTransaction:(YapDatabaseReadWriteTransaction *)transaction;
- (void)updateWithSingleGroupRecipient:(NSString *)singleGroupRecipient
transaction:(YapDatabaseReadWriteTransaction *)transaction;

@ -323,27 +323,16 @@ NSString *const kTSOutgoingMessageSentRecipientAll = @"kTSOutgoingMessageSentRec
}];
}
- (void)updateWithWasSentAndDelivered
- (void)updateWithWasSentFromLinkedDeviceWithTransaction:(YapDatabaseReadWriteTransaction *)transaction
{
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self applyChangeToSelfAndLatestOutgoingMessage:transaction
changeBlock:^(TSOutgoingMessage *message) {
[message setMessageState:TSOutgoingMessageStateSentToService];
[message setWasDelivered:YES];
}];
}];
}
OWSAssert(transaction);
- (void)updateWithWasSentAndDeliveredFromLinkedDevice
{
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self applyChangeToSelfAndLatestOutgoingMessage:transaction
changeBlock:^(TSOutgoingMessage *message) {
[message setMessageState:TSOutgoingMessageStateSentToService];
[message setWasDelivered:YES];
[message setIsFromLinkedDevice:YES];
}];
}];
[self applyChangeToSelfAndLatestOutgoingMessage:transaction
changeBlock:^(TSOutgoingMessage *message) {
[message setMessageState:TSOutgoingMessageStateSentToService];
[message setWasDelivered:YES];
[message setIsFromLinkedDevice:YES];
}];
}
- (void)updateWithSingleGroupRecipient:(NSString *)singleGroupRecipient

@ -8,6 +8,7 @@
#import "TSMessagesManager.h"
#import "TSStorageManager.h"
#import "TSYapDatabaseObject.h"
#import "Threading.h"
#import <YapDatabase/YapDatabaseConnection.h>
#import <YapDatabase/YapDatabaseTransaction.h>
#import <YapDatabase/YapDatabaseView.h>
@ -198,10 +199,12 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
@interface OWSBatchMessageProcessingQueue : NSObject
@property (nonatomic, readonly) TSMessagesManager *messagesManager;
@property (nonatomic, readonly) YapDatabaseConnection *dbReadWriteConnection;
@property (nonatomic, readonly) OWSBatchMessageProcessingJobFinder *finder;
@property (nonatomic) BOOL isDrainingQueue;
- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager
storageManager:(TSStorageManager *)storageManager
finder:(OWSBatchMessageProcessingJobFinder *)finder NS_DESIGNATED_INITIALIZER;
- (instancetype)init NS_UNAVAILABLE;
@ -212,6 +215,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
@implementation OWSBatchMessageProcessingQueue
- (instancetype)initWithMessagesManager:(TSMessagesManager *)messagesManager
storageManager:(TSStorageManager *)storageManager
finder:(OWSBatchMessageProcessingJobFinder *)finder
{
OWSSingletonAssert();
@ -222,12 +226,28 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
}
_messagesManager = messagesManager;
_dbReadWriteConnection = [storageManager newDatabaseConnection];
_finder = finder;
_isDrainingQueue = NO;
[[NSNotificationCenter defaultCenter] addObserver:self
selector:@selector(databaseViewRegistrationComplete)
name:kNSNotificationName_DatabaseViewRegistrationComplete
object:nil];
return self;
}
- (void)dealloc
{
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
- (void)databaseViewRegistrationComplete
{
[self drainQueue];
}
#pragma mark - instance methods
- (void)enqueueEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData
@ -239,7 +259,13 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
- (void)drainQueue
{
dispatch_async(dispatch_get_main_queue(), ^{
DispatchMainThreadSafe(^{
if ([TSDatabaseView hasPendingViewRegistrations]) {
// We don't want to process incoming messages until database
// view registration is complete.
return;
}
if (self.isDrainingQueue) {
return;
}
@ -274,8 +300,12 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
- (void)processJob:(OWSBatchMessageProcessingJob *)job completion:(void (^)())completion
{
dispatch_async(dispatch_get_main_queue(), ^{
[self.messagesManager processEnvelope:job.envelopeProto plaintextData:job.plaintextData];
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self.messagesManager processEnvelope:job.envelopeProto
plaintextData:job.plaintextData
transaction:transaction];
}];
completion();
});
}
@ -309,6 +339,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
- (instancetype)initWithDBConnection:(YapDatabaseConnection *)dbConnection
messagesManager:(TSMessagesManager *)messagesManager
storageManager:(TSStorageManager *)storageManager
{
OWSSingletonAssert();
@ -320,7 +351,9 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
OWSBatchMessageProcessingJobFinder *finder =
[[OWSBatchMessageProcessingJobFinder alloc] initWithDBConnection:dbConnection];
OWSBatchMessageProcessingQueue *processingQueue =
[[OWSBatchMessageProcessingQueue alloc] initWithMessagesManager:messagesManager finder:finder];
[[OWSBatchMessageProcessingQueue alloc] initWithMessagesManager:messagesManager
storageManager:storageManager
finder:finder];
_processingQueue = processingQueue;
@ -332,8 +365,9 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
// For concurrency coherency we use the same dbConnection to persist and read the unprocessed envelopes
YapDatabaseConnection *dbConnection = [[TSStorageManager sharedManager].database newConnection];
TSMessagesManager *messagesManager = [TSMessagesManager sharedManager];
TSStorageManager *storageManager = [TSStorageManager sharedManager];
return [self initWithDBConnection:dbConnection messagesManager:messagesManager];
return [self initWithDBConnection:dbConnection messagesManager:messagesManager storageManager:storageManager];
}
+ (instancetype)sharedInstance

@ -9,6 +9,7 @@
#import "TSMessagesManager.h"
#import "TSStorageManager.h"
#import "TSYapDatabaseObject.h"
#import "Threading.h"
#import <YapDatabase/YapDatabaseConnection.h>
#import <YapDatabase/YapDatabaseTransaction.h>
#import <YapDatabase/YapDatabaseView.h>
@ -226,9 +227,24 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
_finder = finder;
_isDrainingQueue = NO;
[[NSNotificationCenter defaultCenter] addObserver:self
selector:@selector(databaseViewRegistrationComplete)
name:kNSNotificationName_DatabaseViewRegistrationComplete
object:nil];
return self;
}
- (void)dealloc
{
[[NSNotificationCenter defaultCenter] removeObserver:self];
}
- (void)databaseViewRegistrationComplete
{
[self drainQueue];
}
#pragma mark - instance methods
- (void)enqueueEnvelopeForProcessing:(OWSSignalServiceProtosEnvelope *)envelope
@ -238,14 +254,20 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
- (void)drainQueue
{
AssertIsOnMainThread();
if (self.isDrainingQueue) {
return;
}
self.isDrainingQueue = YES;
[self drainQueueWorkStep];
DispatchMainThreadSafe(^{
if ([TSDatabaseView hasPendingViewRegistrations]) {
// We don't want to process incoming messages until database
// view registration is complete.
return;
}
if (self.isDrainingQueue) {
return;
}
self.isDrainingQueue = YES;
[self drainQueueWorkStep];
});
}
- (void)drainQueueWorkStep
@ -369,9 +391,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
- (void)handleAnyUnprocessedEnvelopesAsync
{
dispatch_async(dispatch_get_main_queue(), ^{
[self.processingQueue drainQueue];
});
[self.processingQueue drainQueue];
}
- (void)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope

@ -15,6 +15,7 @@ NS_ASSUME_NONNULL_BEGIN
@class TSOutgoingMessage;
@class TSStorageManager;
@class TSThread;
@class YapDatabaseReadWriteTransaction;
@protocol ContactsManagerProtocol;
@ -67,6 +68,10 @@ NS_SWIFT_NAME(MessageSender)
- (void)sendMessage:(TSOutgoingMessage *)message
success:(void (^)())successHandler
failure:(void (^)(NSError *error))failureHandler;
- (void)sendMessage:(TSOutgoingMessage *)message
transaction:(YapDatabaseReadWriteTransaction *_Nullable)transaction
success:(void (^)())successHandler
failure:(void (^)(NSError *error))failureHandler;
/**
* Takes care of allocating and uploading the attachment, then sends the message.
@ -89,7 +94,9 @@ NS_SWIFT_NAME(MessageSender)
success:(void (^)())successHandler
failure:(void (^)(NSError *error))failureHandler;
- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message sentAt:(uint64_t)sentAt;
- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message
sentAt:(uint64_t)sentAt
transaction:(YapDatabaseReadWriteTransaction *)transaction;
/**
* Set local configuration to match that of the of `outgoingMessage`'s sender

@ -424,11 +424,22 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException";
- (void)sendMessage:(TSOutgoingMessage *)message
success:(void (^)())successHandler
failure:(void (^)(NSError *error))failureHandler
{
[self sendMessage:message transaction:nil success:successHandler failure:failureHandler];
}
- (void)sendMessage:(TSOutgoingMessage *)message
transaction:(YapDatabaseReadWriteTransaction *_Nullable)transaction
success:(void (^)())successHandler
failure:(void (^)(NSError *error))failureHandler
{
OWSAssert(message);
AssertIsOnMainThread();
[message updateWithMessageState:TSOutgoingMessageStateAttemptingOut];
if (transaction) {
[message updateWithMessageState:TSOutgoingMessageStateAttemptingOut transaction:transaction];
} else {
[message updateWithMessageState:TSOutgoingMessageStateAttemptingOut];
}
OWSSendMessageOperation *sendMessageOperation = [[OWSSendMessageOperation alloc] initWithMessage:message
messageSender:self
success:successHandler
@ -1082,9 +1093,14 @@ NSString *const OWSMessageSenderRateLimitedException = @"RateLimitedException";
[OWSDisappearingMessagesJob setExpirationForMessage:message];
}
- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message sentAt:(uint64_t)sentAt
- (void)handleMessageSentRemotely:(TSOutgoingMessage *)message
sentAt:(uint64_t)sentAt
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
[message updateWithWasSentAndDeliveredFromLinkedDevice];
OWSAssert(message);
OWSAssert(transaction);
[message updateWithWasSentFromLinkedDeviceWithTransaction:transaction];
[self becomeConsistentWithDisappearingConfigurationForMessage:message];
[OWSDisappearingMessagesJob setExpirationForMessage:message expirationStartedAt:sentAt];
}

@ -36,7 +36,9 @@ typedef void (^MessageManagerCompletionBlock)();
successBlock:(DecryptSuccessBlock)successBlock
failureBlock:(DecryptFailureBlock)failureBlock;
- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *_Nullable)plaintextData;
- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
plaintextData:(NSData *_Nullable)plaintextData
transaction:(YapDatabaseReadWriteTransaction *)transaction;
- (NSUInteger)unreadMessagesCount;
- (NSUInteger)unreadMessagesCountExcept:(TSThread *)thread;

@ -447,9 +447,12 @@ NS_ASSUME_NONNULL_BEGIN
#pragma mark - message handling
// TODO: Add transaction parameter.
- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *_Nullable)plaintextData
- (void)processEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
plaintextData:(NSData *_Nullable)plaintextData
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(envelope);
OWSAssert(transaction);
OWSAssert([TSAccountManager isRegistered]);
DDLogInfo(@"%@ received envelope: %@", self.tag, [self descriptionForEnvelope:envelope]);
@ -461,7 +464,7 @@ NS_ASSUME_NONNULL_BEGIN
case OWSSignalServiceProtosEnvelopeTypeCiphertext:
case OWSSignalServiceProtosEnvelopeTypePrekeyBundle:
if (plaintextData) {
[self handleEnvelope:envelope plaintextData:plaintextData];
[self handleEnvelope:envelope plaintextData:plaintextData transaction:transaction];
} else {
OWSFail(
@"%@ missing decrypted data for envelope: %@", self.tag, [self descriptionForEnvelope:envelope]);
@ -469,7 +472,7 @@ NS_ASSUME_NONNULL_BEGIN
break;
case OWSSignalServiceProtosEnvelopeTypeReceipt:
OWSAssert(!plaintextData);
[self handleDeliveryReceipt:envelope];
[self handleDeliveryReceipt:envelope transaction:transaction];
break;
// Other messages are just dismissed for now.
case OWSSignalServiceProtosEnvelopeTypeKeyExchange:
@ -485,26 +488,33 @@ NS_ASSUME_NONNULL_BEGIN
}
- (void)handleDeliveryReceipt:(OWSSignalServiceProtosEnvelope *)envelope
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
TSInteraction *interaction =
[TSInteraction interactionForTimestamp:envelope.timestamp withTransaction:transaction];
if ([interaction isKindOfClass:[TSOutgoingMessage class]]) {
TSOutgoingMessage *outgoingMessage = (TSOutgoingMessage *)interaction;
[outgoingMessage updateWithWasDeliveredWithTransaction:transaction];
}
}];
OWSAssert(envelope);
OWSAssert(transaction);
TSInteraction *interaction = [TSInteraction interactionForTimestamp:envelope.timestamp withTransaction:transaction];
if ([interaction isKindOfClass:[TSOutgoingMessage class]]) {
TSOutgoingMessage *outgoingMessage = (TSOutgoingMessage *)interaction;
[outgoingMessage updateWithWasDeliveredWithTransaction:transaction];
}
}
- (void)handleEnvelope:(OWSSignalServiceProtosEnvelope *)envelope plaintextData:(NSData *)plaintextData
- (void)handleEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
plaintextData:(NSData *)plaintextData
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(envelope);
OWSAssert(plaintextData);
OWSAssert(transaction);
OWSAssert(envelope.hasTimestamp && envelope.timestamp > 0);
OWSAssert(envelope.hasSource && envelope.source.length > 0);
OWSAssert(envelope.hasSourceDevice && envelope.sourceDevice > 0);
BOOL duplicateEnvelope = [self.incomingMessageFinder existsMessageWithTimestamp:envelope.timestamp
sourceId:envelope.source
sourceDeviceId:envelope.sourceDevice];
sourceDeviceId:envelope.sourceDevice
transaction:transaction];
if (duplicateEnvelope) {
DDLogInfo(@"%@ Ignoring previously received envelope from %@ with timestamp: %llu",
self.tag,
@ -518,9 +528,9 @@ NS_ASSUME_NONNULL_BEGIN
DDLogInfo(@"%@ handling content: <Content: %@>", self.tag, [self descriptionForContent:content]);
if (content.hasSyncMessage) {
[self handleIncomingEnvelope:envelope withSyncMessage:content.syncMessage];
[self handleIncomingEnvelope:envelope withSyncMessage:content.syncMessage transaction:transaction];
} else if (content.hasDataMessage) {
[self handleIncomingEnvelope:envelope withDataMessage:content.dataMessage];
[self handleIncomingEnvelope:envelope withDataMessage:content.dataMessage transaction:transaction];
} else if (content.hasCallMessage) {
[self handleIncomingEnvelope:envelope withCallMessage:content.callMessage];
} else if (content.hasNullMessage) {
@ -533,20 +543,23 @@ NS_ASSUME_NONNULL_BEGIN
[OWSSignalServiceProtosDataMessage parseFromData:plaintextData];
DDLogInfo(@"%@ handling message: <DataMessage: %@ />", self.tag, [self descriptionForDataMessage:dataMessage]);
[self handleIncomingEnvelope:envelope withDataMessage:dataMessage];
[self handleIncomingEnvelope:envelope withDataMessage:dataMessage transaction:transaction];
} else {
OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorEnvelopeNoActionablePayload], envelope);
}
}
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
withDataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
if ([dataMessage hasProfileKey]) {
NSData *profileKey = [dataMessage profileKey];
NSString *recipientId = incomingEnvelope.source;
NSString *recipientId = envelope.source;
if (profileKey.length == kAES256_KeyByteLength) {
[self.profileManager setProfileKeyData:profileKey forRecipientId:recipientId];
} else {
@ -556,64 +569,56 @@ NS_ASSUME_NONNULL_BEGIN
}
if (dataMessage.hasGroup) {
__block BOOL unknownGroup = NO;
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *transaction) {
TSGroupModel *emptyModelToFillOutId =
[[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction];
if (gThread == nil && dataMessage.group.type != OWSSignalServiceProtosGroupContextTypeUpdate) {
unknownGroup = YES;
}
}];
TSGroupModel *emptyModelToFillOutId =
[[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction];
BOOL unknownGroup = NO;
if (gThread == nil && dataMessage.group.type != OWSSignalServiceProtosGroupContextTypeUpdate) {
unknownGroup = YES;
}
if (unknownGroup) {
if (dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo) {
DDLogInfo(@"%@ Ignoring group info request for group I don't know about from: %@",
self.tag,
incomingEnvelope.source);
DDLogInfo(
@"%@ Ignoring group info request for group I don't know about from: %@", self.tag, envelope.source);
return;
}
// FIXME: https://github.com/WhisperSystems/Signal-iOS/issues/1340
DDLogInfo(@"%@ Received message from group that I left or don't know about from: %@",
self.tag,
envelopeAddress(incomingEnvelope));
envelopeAddress(envelope));
NSString *recipientId = incomingEnvelope.source;
NSString *recipientId = envelope.source;
__block TSThread *thread;
[[TSStorageManager sharedManager].dbReadWriteConnection
readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
thread = [TSContactThread getOrCreateThreadWithContactId:recipientId transaction:transaction];
}];
TSThread *thread = [TSContactThread getOrCreateThreadWithContactId:recipientId transaction:transaction];
NSData *groupId = dataMessage.group.id;
OWSAssert(groupId);
OWSSyncGroupsRequestMessage *syncGroupsRequestMessage =
[[OWSSyncGroupsRequestMessage alloc] initWithThread:thread groupId:groupId];
[self.messageSender sendMessage:syncGroupsRequestMessage
transaction:transaction
success:^{
DDLogWarn(@"%@ Successfully sent Request Group Info message.", self.tag);
}
failure:^(NSError *error) {
DDLogError(@"%@ Failed to send Request Group Info message with error: %@", self.tag, error);
}];
return;
}
}
if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsEndSession) != 0) {
[self handleEndSessionMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleEndSessionMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction];
} else if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsExpirationTimerUpdate) != 0) {
[self handleExpirationTimerUpdateMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleExpirationTimerUpdateMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction];
} else if ((dataMessage.flags & OWSSignalServiceProtosDataMessageFlagsProfileKey) != 0) {
[self handleProfileKeyMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleProfileKeyMessageWithEnvelope:envelope dataMessage:dataMessage];
} else if (dataMessage.attachments.count > 0) {
[self handleReceivedMediaWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleReceivedMediaWithEnvelope:envelope dataMessage:dataMessage transaction:transaction];
} else {
[self handleReceivedTextMessageWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleReceivedTextMessageWithEnvelope:envelope dataMessage:dataMessage transaction:transaction];
if ([self isDataMessageGroupAvatarUpdate:dataMessage]) {
DDLogVerbose(@"%@ Data message had group avatar attachment", self.tag);
[self handleReceivedGroupAvatarUpdateWithEnvelope:incomingEnvelope dataMessage:dataMessage];
[self handleReceivedGroupAvatarUpdateWithEnvelope:envelope dataMessage:dataMessage transaction:transaction];
}
}
}
@ -623,54 +628,64 @@ NS_ASSUME_NONNULL_BEGIN
return [TextSecureKitEnv sharedEnv].profileManager;
}
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
withCallMessage:(OWSSignalServiceProtosCallMessage *)callMessage
{
OWSAssert(incomingEnvelope);
OWSAssert(envelope);
OWSAssert(callMessage);
if ([callMessage hasProfileKey]) {
NSData *profileKey = [callMessage profileKey];
NSString *recipientId = incomingEnvelope.source;
NSString *recipientId = envelope.source;
[self.profileManager setProfileKeyData:profileKey forRecipientId:recipientId];
}
if (callMessage.hasOffer) {
[self.callMessageHandler receivedOffer:callMessage.offer fromCallerId:incomingEnvelope.source];
} else if (callMessage.hasAnswer) {
[self.callMessageHandler receivedAnswer:callMessage.answer fromCallerId:incomingEnvelope.source];
} else if (callMessage.iceUpdate.count > 0) {
for (OWSSignalServiceProtosCallMessageIceUpdate *iceUpdate in callMessage.iceUpdate) {
[self.callMessageHandler receivedIceUpdate:iceUpdate fromCallerId:incomingEnvelope.source];
// TODO: Should we do this synchronously?
dispatch_async(dispatch_get_main_queue(), ^{
if (callMessage.hasOffer) {
[self.callMessageHandler receivedOffer:callMessage.offer fromCallerId:envelope.source];
} else if (callMessage.hasAnswer) {
[self.callMessageHandler receivedAnswer:callMessage.answer fromCallerId:envelope.source];
} else if (callMessage.iceUpdate.count > 0) {
for (OWSSignalServiceProtosCallMessageIceUpdate *iceUpdate in callMessage.iceUpdate) {
[self.callMessageHandler receivedIceUpdate:iceUpdate fromCallerId:envelope.source];
}
} else if (callMessage.hasHangup) {
DDLogVerbose(@"%@ Received CallMessage with Hangup.", self.tag);
[self.callMessageHandler receivedHangup:callMessage.hangup fromCallerId:envelope.source];
} else if (callMessage.hasBusy) {
[self.callMessageHandler receivedBusy:callMessage.busy fromCallerId:envelope.source];
} else {
OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorCallMessageNoActionablePayload], envelope);
}
} else if (callMessage.hasHangup) {
DDLogVerbose(@"%@ Received CallMessage with Hangup.", self.tag);
[self.callMessageHandler receivedHangup:callMessage.hangup fromCallerId:incomingEnvelope.source];
} else if (callMessage.hasBusy) {
[self.callMessageHandler receivedBusy:callMessage.busy fromCallerId:incomingEnvelope.source];
} else {
OWSProdInfoWEnvelope([OWSAnalyticsEvents messageManagerErrorCallMessageNoActionablePayload], incomingEnvelope);
}
});
}
- (void)handleReceivedGroupAvatarUpdateWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
TSGroupThread *groupThread = [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id];
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
TSGroupThread *groupThread =
[TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id transaction:transaction];
OWSAssert(groupThread);
OWSAttachmentsProcessor *attachmentsProcessor =
[[OWSAttachmentsProcessor alloc] initWithAttachmentProtos:@[ dataMessage.group.avatar ]
timestamp:envelope.timestamp
relay:envelope.relay
thread:groupThread
networkManager:self.networkManager];
networkManager:self.networkManager
transaction:transaction];
if (!attachmentsProcessor.hasSupportedAttachments) {
DDLogWarn(@"%@ received unsupported group avatar envelope", self.tag);
return;
}
[attachmentsProcessor fetchAttachmentsForMessage:nil
transaction:transaction
success:^(TSAttachmentStream *attachmentStream) {
[groupThread updateAvatarWithAttachmentStream:attachmentStream];
}
@ -684,16 +699,21 @@ NS_ASSUME_NONNULL_BEGIN
- (void)handleReceivedMediaWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage];
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage transaction:transaction];
OWSAssert(thread);
OWSAttachmentsProcessor *attachmentsProcessor =
[[OWSAttachmentsProcessor alloc] initWithAttachmentProtos:dataMessage.attachments
timestamp:envelope.timestamp
relay:envelope.relay
thread:thread
networkManager:self.networkManager];
networkManager:self.networkManager
transaction:transaction];
if (!attachmentsProcessor.hasSupportedAttachments) {
DDLogWarn(@"%@ received unsupported media envelope", self.tag);
return;
@ -702,7 +722,8 @@ NS_ASSUME_NONNULL_BEGIN
TSIncomingMessage *_Nullable createdMessage =
[self handleReceivedEnvelope:envelope
withDataMessage:dataMessage
attachmentIds:attachmentsProcessor.supportedAttachmentIds];
attachmentIds:attachmentsProcessor.supportedAttachmentIds
transaction:transaction];
if (!createdMessage) {
return;
@ -711,6 +732,7 @@ NS_ASSUME_NONNULL_BEGIN
DDLogDebug(@"%@ incoming attachment message: %@", self.tag, createdMessage.debugDescription);
[attachmentsProcessor fetchAttachmentsForMessage:createdMessage
transaction:transaction
success:^(TSAttachmentStream *attachmentStream) {
DDLogDebug(
@"%@ successfully fetched attachment: %@ for message: %@", self.tag, attachmentStream, createdMessage);
@ -721,22 +743,25 @@ NS_ASSUME_NONNULL_BEGIN
}];
}
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)messageEnvelope
- (void)handleIncomingEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
withSyncMessage:(OWSSignalServiceProtosSyncMessage *)syncMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
OWSAssert(envelope);
OWSAssert(syncMessage);
OWSAssert(transaction);
OWSAssert([TSAccountManager isRegistered]);
NSString *localNumber = [TSAccountManager localNumber];
if (![localNumber isEqualToString:messageEnvelope.source]) {
if (![localNumber isEqualToString:envelope.source]) {
// Sync messages should only come from linked devices.
OWSProdErrorWEnvelope([OWSAnalyticsEvents messageManagerErrorSyncMessageFromUnknownSource], messageEnvelope);
OWSProdErrorWEnvelope([OWSAnalyticsEvents messageManagerErrorSyncMessageFromUnknownSource], envelope);
return;
}
if (syncMessage.hasSent) {
OWSIncomingSentMessageTranscript *transcript =
[[OWSIncomingSentMessageTranscript alloc] initWithProto:syncMessage.sent relay:messageEnvelope.relay];
[[OWSIncomingSentMessageTranscript alloc] initWithProto:syncMessage.sent relay:envelope.relay];
OWSRecordTranscriptJob *recordJob =
[[OWSRecordTranscriptJob alloc] initWithIncomingSentMessageTranscript:transcript
@ -759,13 +784,16 @@ NS_ASSUME_NONNULL_BEGIN
if ([self isDataMessageGroupAvatarUpdate:syncMessage.sent.message]) {
[recordJob runWithAttachmentHandler:^(TSAttachmentStream *attachmentStream) {
TSGroupThread *groupThread =
[TSGroupThread getOrCreateThreadWithGroupIdData:syncMessage.sent.message.group.id];
[groupThread updateAvatarWithAttachmentStream:attachmentStream];
}];
[TSGroupThread getOrCreateThreadWithGroupIdData:syncMessage.sent.message.group.id
transaction:transaction];
[groupThread updateAvatarWithAttachmentStream:attachmentStream transaction:transaction];
}
transaction:transaction];
} else {
[recordJob runWithAttachmentHandler:^(TSAttachmentStream *attachmentStream) {
DDLogDebug(@"%@ successfully fetched transcript attachment: %@", self.tag, attachmentStream);
}];
}
transaction:transaction];
}
} else if (syncMessage.hasRequest) {
if (syncMessage.request.type == OWSSignalServiceProtosSyncMessageRequestTypeContacts) {
@ -801,49 +829,57 @@ NS_ASSUME_NONNULL_BEGIN
DDLogWarn(@"%@ ignoring unsupported sync request message", self.tag);
}
} else if (syncMessage.hasBlocked) {
NSArray<NSString *> *blockedPhoneNumbers = [syncMessage.blocked.numbers copy];
[_blockingManager setBlockedPhoneNumbers:blockedPhoneNumbers sendSyncMessage:NO];
dispatch_async(dispatch_get_main_queue(), ^{
NSArray<NSString *> *blockedPhoneNumbers = [syncMessage.blocked.numbers copy];
[_blockingManager setBlockedPhoneNumbers:blockedPhoneNumbers sendSyncMessage:NO];
});
} else if (syncMessage.read.count > 0) {
DDLogInfo(@"%@ Received %ld read receipt(s)", self.tag, (u_long)syncMessage.read.count);
OWSReadReceiptsProcessor *readReceiptsProcessor =
[[OWSReadReceiptsProcessor alloc] initWithReadReceiptProtos:syncMessage.read
storageManager:self.storageManager];
[readReceiptsProcessor process];
[readReceiptsProcessor processWithTransaction:transaction];
} else if (syncMessage.hasVerified) {
DDLogInfo(@"%@ Received verification state for %@", self.tag, syncMessage.verified.destination);
[self.identityManager processIncomingSyncMessage:syncMessage.verified];
dispatch_async(dispatch_get_main_queue(), ^{
[self.identityManager processIncomingSyncMessage:syncMessage.verified];
});
} else {
DDLogWarn(@"%@ Ignoring unsupported sync message.", self.tag);
}
}
- (void)handleEndSessionMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)endSessionEnvelope
- (void)handleEndSessionMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
TSContactThread *thread =
[TSContactThread getOrCreateThreadWithContactId:endSessionEnvelope.source transaction:transaction];
uint64_t timeStamp = endSessionEnvelope.timestamp;
if (thread) { // TODO thread should always be nonnull.
[[[TSInfoMessage alloc] initWithTimestamp:timeStamp
inThread:thread
messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction];
}
}];
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
TSContactThread *thread = [TSContactThread getOrCreateThreadWithContactId:envelope.source transaction:transaction];
if (thread) { // TODO thread should always be nonnull.
[[[TSInfoMessage alloc] initWithTimestamp:envelope.timestamp
inThread:thread
messageType:TSInfoMessageTypeSessionDidEnd] saveWithTransaction:transaction];
}
dispatch_async([OWSDispatch sessionStoreQueue], ^{
[[TSStorageManager sharedManager] deleteAllSessionsForContact:endSessionEnvelope.source];
[[TSStorageManager sharedManager] deleteAllSessionsForContact:envelope.source];
});
}
- (void)handleExpirationTimerUpdateMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage];
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
TSThread *thread = [self threadForEnvelope:envelope dataMessage:dataMessage transaction:transaction];
OWSDisappearingMessagesConfiguration *disappearingMessagesConfiguration;
if (dataMessage.hasExpireTimer && dataMessage.expireTimer > 0) {
@ -863,24 +899,25 @@ NS_ASSUME_NONNULL_BEGIN
durationSeconds:OWSDisappearingMessagesConfigurationDefaultExpirationDuration];
}
OWSAssert(disappearingMessagesConfiguration);
[disappearingMessagesConfiguration save];
[disappearingMessagesConfiguration saveWithTransaction:transaction];
NSString *name = [self.contactsManager displayNameForPhoneIdentifier:envelope.source];
OWSDisappearingConfigurationUpdateInfoMessage *message =
[[OWSDisappearingConfigurationUpdateInfoMessage alloc] initWithTimestamp:envelope.timestamp
thread:thread
configuration:disappearingMessagesConfiguration
createdByRemoteName:name];
[message save];
[message saveWithTransaction:transaction];
}
- (void)handleProfileKeyMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)incomingEnvelope
- (void)handleProfileKeyMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
{
NSString *recipientId = incomingEnvelope.source;
OWSAssert(envelope);
OWSAssert(dataMessage);
NSString *recipientId = envelope.source;
if (!dataMessage.hasProfileKey) {
OWSFail(@"%@ received profile key message without profile key from: %@",
self.tag,
envelopeAddress(incomingEnvelope));
OWSFail(@"%@ received profile key message without profile key from: %@", self.tag, envelopeAddress(envelope));
return;
}
NSData *profileKey = dataMessage.profileKey;
@ -888,7 +925,7 @@ NS_ASSUME_NONNULL_BEGIN
OWSFail(@"%@ received profile key of unexpected length:%lu from:%@",
self.tag,
(unsigned long)profileKey.length,
envelopeAddress(incomingEnvelope));
envelopeAddress(envelope));
return;
}
@ -896,11 +933,15 @@ NS_ASSUME_NONNULL_BEGIN
[profileManager setProfileKeyData:profileKey forRecipientId:recipientId];
}
- (void)handleReceivedTextMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)textMessageEnvelope
- (void)handleReceivedTextMessageWithEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
[self handleReceivedEnvelope:textMessageEnvelope withDataMessage:dataMessage attachmentIds:@[]];
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
[self handleReceivedEnvelope:envelope withDataMessage:dataMessage attachmentIds:@[] transaction:transaction];
}
- (void)sendGroupUpdateForThread:(TSGroupThread *)gThread message:(TSOutgoingMessage *)message
@ -936,8 +977,11 @@ NS_ASSUME_NONNULL_BEGIN
- (void)handleGroupInfoRequest:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
OWSAssert(dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo);
NSData *groupId = dataMessage.hasGroup ? dataMessage.group.id : nil;
@ -948,42 +992,44 @@ NS_ASSUME_NONNULL_BEGIN
DDLogWarn(@"%@ Received 'Request Group Info' message for group: %@ from: %@", self.tag, groupId, envelope.source);
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
TSGroupModel *emptyModelToFillOutId =
[[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction];
if (!gThread) {
DDLogWarn(@"%@ Unknown group: %@", self.tag, groupId);
return;
}
if (![gThread.groupModel.groupMemberIds containsObject:envelope.source]) {
DDLogWarn(@"%@ Ignoring 'Request Group Info' message for non-member of group. %@ not in %@",
self.tag,
envelope.source,
gThread.groupModel.groupMemberIds);
}
TSGroupModel *emptyModelToFillOutId =
[[TSGroupModel alloc] initWithTitle:nil memberIds:nil image:nil groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread threadWithGroupModel:emptyModelToFillOutId transaction:transaction];
if (!gThread) {
DDLogWarn(@"%@ Unknown group: %@", self.tag, groupId);
return;
}
NSString *updateGroupInfo =
[gThread.groupModel getInfoStringAboutUpdateTo:gThread.groupModel contactsManager:self.contactsManager];
TSOutgoingMessage *message = [[TSOutgoingMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp]
inThread:gThread
groupMetaMessage:TSGroupMessageUpdate];
[message updateWithCustomMessage:updateGroupInfo transaction:transaction];
// Only send this group update to the requester.
[message updateWithSingleGroupRecipient:envelope.source transaction:transaction];
if (![gThread.groupModel.groupMemberIds containsObject:envelope.source]) {
DDLogWarn(@"%@ Ignoring 'Request Group Info' message for non-member of group. %@ not in %@",
self.tag,
envelope.source,
gThread.groupModel.groupMemberIds);
}
dispatch_async(dispatch_get_main_queue(), ^{
[self sendGroupUpdateForThread:gThread message:message];
});
}];
NSString *updateGroupInfo =
[gThread.groupModel getInfoStringAboutUpdateTo:gThread.groupModel contactsManager:self.contactsManager];
TSOutgoingMessage *message = [[TSOutgoingMessage alloc] initWithTimestamp:[NSDate ows_millisecondTimeStamp]
inThread:gThread
groupMetaMessage:TSGroupMessageUpdate];
[message updateWithCustomMessage:updateGroupInfo transaction:transaction];
// Only send this group update to the requester.
[message updateWithSingleGroupRecipient:envelope.source transaction:transaction];
dispatch_async(dispatch_get_main_queue(), ^{
[self sendGroupUpdateForThread:gThread message:message];
});
}
- (TSIncomingMessage *_Nullable)handleReceivedEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
withDataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
attachmentIds:(NSArray<NSString *> *)attachmentIds
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert([NSThread isMainThread]);
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
uint64_t timestamp = envelope.timestamp;
NSString *body = dataMessage.body;
NSData *groupId = dataMessage.hasGroup ? dataMessage.group.id : nil;
@ -996,152 +1042,152 @@ NS_ASSUME_NONNULL_BEGIN
NSString *localNumber = [TSAccountManager localNumber];
if (dataMessage.group.type == OWSSignalServiceProtosGroupContextTypeRequestInfo) {
[self handleGroupInfoRequest:envelope dataMessage:dataMessage];
[self handleGroupInfoRequest:envelope dataMessage:dataMessage transaction:transaction];
return nil;
}
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
if (groupId) {
NSMutableArray *uniqueMemberIds = [[[NSSet setWithArray:dataMessage.group.members] allObjects] mutableCopy];
TSGroupModel *model = [[TSGroupModel alloc] initWithTitle:dataMessage.group.name
memberIds:uniqueMemberIds
image:nil
groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread getOrCreateThreadWithGroupModel:model transaction:transaction];
[gThread saveWithTransaction:transaction];
switch (dataMessage.group.type) {
case OWSSignalServiceProtosGroupContextTypeUpdate: {
NSString *updateGroupInfo =
[gThread.groupModel getInfoStringAboutUpdateTo:model contactsManager:self.contactsManager];
gThread.groupModel = model;
[gThread saveWithTransaction:transaction];
[[[TSInfoMessage alloc] initWithTimestamp:timestamp
inThread:gThread
messageType:TSInfoMessageTypeGroupUpdate
customMessage:updateGroupInfo] saveWithTransaction:transaction];
break;
}
case OWSSignalServiceProtosGroupContextTypeQuit: {
NSString *nameString = [self.contactsManager displayNameForPhoneIdentifier:envelope.source];
NSString *updateGroupInfo =
[NSString stringWithFormat:NSLocalizedString(@"GROUP_MEMBER_LEFT", @""), nameString];
NSMutableArray *newGroupMembers = [NSMutableArray arrayWithArray:gThread.groupModel.groupMemberIds];
[newGroupMembers removeObject:envelope.source];
gThread.groupModel.groupMemberIds = newGroupMembers;
[gThread saveWithTransaction:transaction];
[[[TSInfoMessage alloc] initWithTimestamp:timestamp
inThread:gThread
messageType:TSInfoMessageTypeGroupUpdate
customMessage:updateGroupInfo] saveWithTransaction:transaction];
break;
}
case OWSSignalServiceProtosGroupContextTypeDeliver: {
if (body.length == 0 && attachmentIds.count < 1) {
DDLogWarn(@"%@ ignoring empty incoming message from: %@ for group: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
groupId,
(unsigned long)timestamp);
} else {
DDLogDebug(@"%@ incoming message from: %@ for group: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
groupId,
(unsigned long)timestamp);
incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp
inThread:gThread
authorId:envelope.source
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:attachmentIds
expiresInSeconds:dataMessage.expireTimer];
[incomingMessage saveWithTransaction:transaction];
}
break;
}
default: {
DDLogWarn(@"%@ Ignoring unknown group message type:%d", self.tag, (int)dataMessage.group.type);
}
}
thread = gThread;
} else {
if (body.length == 0 && attachmentIds.count < 1) {
DDLogWarn(@"%@ ignoring empty incoming message from: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
(unsigned long)timestamp);
} else {
DDLogDebug(@"%@ incoming message from: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
(unsigned long)timestamp);
TSContactThread *cThread = [TSContactThread getOrCreateThreadWithContactId:envelope.source
transaction:transaction
relay:envelope.relay];
incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp
inThread:cThread
authorId:[cThread contactIdentifier]
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:attachmentIds
expiresInSeconds:dataMessage.expireTimer];
[incomingMessage saveWithTransaction:transaction];
thread = cThread;
}
}
if (thread && incomingMessage) {
// Any messages sent from the current user - from this device or another - should be
// automatically marked as read.
BOOL shouldMarkMessageAsRead = [envelope.source isEqualToString:localNumber];
if (shouldMarkMessageAsRead) {
// Don't send a read receipt for messages sent by ourselves.
[incomingMessage markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:YES];
}
DDLogDebug(@"%@ shouldMarkMessageAsRead: %d (%@)", self.tag, shouldMarkMessageAsRead, envelope.source);
// Other clients allow attachments to be sent along with body, we want the text displayed as a separate
// message
if ([attachmentIds count] > 0 && body != nil && body.length > 0) {
// We want the text to be displayed under the attachment
uint64_t textMessageTimestamp = timestamp + 1;
TSIncomingMessage *textMessage = [[TSIncomingMessage alloc] initWithTimestamp:textMessageTimestamp
inThread:thread
authorId:envelope.source
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:@[]
expiresInSeconds:dataMessage.expireTimer];
DDLogDebug(@"%@ incoming extra text message: %@", self.tag, incomingMessage.debugDescription);
[textMessage saveWithTransaction:transaction];
}
}
}];
if (groupId) {
NSMutableArray *uniqueMemberIds = [[[NSSet setWithArray:dataMessage.group.members] allObjects] mutableCopy];
TSGroupModel *model = [[TSGroupModel alloc] initWithTitle:dataMessage.group.name
memberIds:uniqueMemberIds
image:nil
groupId:dataMessage.group.id];
TSGroupThread *gThread = [TSGroupThread getOrCreateThreadWithGroupModel:model transaction:transaction];
[gThread saveWithTransaction:transaction];
switch (dataMessage.group.type) {
case OWSSignalServiceProtosGroupContextTypeUpdate: {
NSString *updateGroupInfo =
[gThread.groupModel getInfoStringAboutUpdateTo:model contactsManager:self.contactsManager];
gThread.groupModel = model;
[gThread saveWithTransaction:transaction];
[[[TSInfoMessage alloc] initWithTimestamp:timestamp
inThread:gThread
messageType:TSInfoMessageTypeGroupUpdate
customMessage:updateGroupInfo] saveWithTransaction:transaction];
break;
}
case OWSSignalServiceProtosGroupContextTypeQuit: {
NSString *nameString = [self.contactsManager displayNameForPhoneIdentifier:envelope.source];
NSString *updateGroupInfo =
[NSString stringWithFormat:NSLocalizedString(@"GROUP_MEMBER_LEFT", @""), nameString];
NSMutableArray *newGroupMembers = [NSMutableArray arrayWithArray:gThread.groupModel.groupMemberIds];
[newGroupMembers removeObject:envelope.source];
gThread.groupModel.groupMemberIds = newGroupMembers;
[gThread saveWithTransaction:transaction];
[[[TSInfoMessage alloc] initWithTimestamp:timestamp
inThread:gThread
messageType:TSInfoMessageTypeGroupUpdate
customMessage:updateGroupInfo] saveWithTransaction:transaction];
break;
}
case OWSSignalServiceProtosGroupContextTypeDeliver: {
if (body.length == 0 && attachmentIds.count < 1) {
DDLogWarn(@"%@ ignoring empty incoming message from: %@ for group: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
groupId,
(unsigned long)timestamp);
} else {
DDLogDebug(@"%@ incoming message from: %@ for group: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
groupId,
(unsigned long)timestamp);
incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp
inThread:gThread
authorId:envelope.source
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:attachmentIds
expiresInSeconds:dataMessage.expireTimer];
[incomingMessage saveWithTransaction:transaction];
}
break;
}
default: {
DDLogWarn(@"%@ Ignoring unknown group message type:%d", self.tag, (int)dataMessage.group.type);
}
}
thread = gThread;
} else {
if (body.length == 0 && attachmentIds.count < 1) {
DDLogWarn(@"%@ ignoring empty incoming message from: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
(unsigned long)timestamp);
} else {
DDLogDebug(@"%@ incoming message from: %@ with timestampe: %lu",
self.tag,
envelopeAddress(envelope),
(unsigned long)timestamp);
TSContactThread *cThread = [TSContactThread getOrCreateThreadWithContactId:envelope.source
transaction:transaction
relay:envelope.relay];
incomingMessage = [[TSIncomingMessage alloc] initWithTimestamp:timestamp
inThread:cThread
authorId:[cThread contactIdentifier]
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:attachmentIds
expiresInSeconds:dataMessage.expireTimer];
[incomingMessage saveWithTransaction:transaction];
thread = cThread;
}
}
if (thread && incomingMessage) {
// In case we already have a read receipt for this new message (happens sometimes).
OWSReadReceiptsProcessor *readReceiptsProcessor =
[[OWSReadReceiptsProcessor alloc] initWithIncomingMessage:incomingMessage
storageManager:self.storageManager];
[readReceiptsProcessor process];
// Any messages sent from the current user - from this device or another - should be
// automatically marked as read.
BOOL shouldMarkMessageAsRead = [envelope.source isEqualToString:localNumber];
if (shouldMarkMessageAsRead) {
// Don't send a read receipt for messages sent by ourselves.
[incomingMessage markAsReadWithTransaction:transaction sendReadReceipt:NO updateExpiration:YES];
}
DDLogDebug(@"%@ shouldMarkMessageAsRead: %d (%@)", self.tag, shouldMarkMessageAsRead, envelope.source);
// Other clients allow attachments to be sent along with body, we want the text displayed as a separate
// message
if ([attachmentIds count] > 0 && body != nil && body.length > 0) {
// We want the text to be displayed under the attachment
uint64_t textMessageTimestamp = timestamp + 1;
TSIncomingMessage *textMessage = [[TSIncomingMessage alloc] initWithTimestamp:textMessageTimestamp
inThread:thread
authorId:envelope.source
sourceDeviceId:envelope.sourceDevice
messageBody:body
attachmentIds:@[]
expiresInSeconds:dataMessage.expireTimer];
DDLogDebug(@"%@ incoming extra text message: %@", self.tag, incomingMessage.debugDescription);
[textMessage saveWithTransaction:transaction];
}
}
[OWSDisappearingMessagesJob becomeConsistentWithConfigurationForMessage:incomingMessage
contactsManager:self.contactsManager];
if (thread && incomingMessage) {
dispatch_async(dispatch_get_main_queue(), ^{
// In case we already have a read receipt for this new message (happens sometimes).
OWSReadReceiptsProcessor *readReceiptsProcessor =
[[OWSReadReceiptsProcessor alloc] initWithIncomingMessage:incomingMessage
storageManager:self.storageManager];
[readReceiptsProcessor process];
// Update thread preview in inbox
[thread touch];
[OWSDisappearingMessagesJob becomeConsistentWithConfigurationForMessage:incomingMessage
contactsManager:self.contactsManager];
[[TextSecureKitEnv sharedEnv].notificationsManager notifyUserForIncomingMessage:incomingMessage
inThread:thread
contactsManager:self.contactsManager];
// Update thread preview in inbox
[thread touch];
[[TextSecureKitEnv sharedEnv].notificationsManager notifyUserForIncomingMessage:incomingMessage
inThread:thread
contactsManager:self.contactsManager];
});
}
return incomingMessage;
@ -1219,11 +1265,16 @@ NSString *envelopeAddress(OWSSignalServiceProtosEnvelope *envelope)
*/
- (TSThread *)threadForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
dataMessage:(OWSSignalServiceProtosDataMessage *)dataMessage
transaction:(YapDatabaseReadWriteTransaction *)transaction
{
OWSAssert(envelope);
OWSAssert(dataMessage);
OWSAssert(transaction);
if (dataMessage.hasGroup) {
return [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id];
return [TSGroupThread getOrCreateThreadWithGroupIdData:dataMessage.group.id transaction:transaction];
} else {
return [TSContactThread getOrCreateThreadWithContactId:envelope.source];
return [TSContactThread getOrCreateThreadWithContactId:envelope.source transaction:transaction];
}
}

@ -21,7 +21,8 @@ NS_ASSUME_NONNULL_BEGIN
*/
- (BOOL)existsMessageWithTimestamp:(uint64_t)timestamp
sourceId:(NSString *)sourceId
sourceDeviceId:(uint32_t)sourceDeviceId;
sourceDeviceId:(uint32_t)sourceDeviceId
transaction:(YapDatabaseReadTransaction *)transaction;
@end

@ -114,6 +114,7 @@ NSString *const OWSIncomingMessageFinderColumnSourceDeviceId = @"OWSIncomingMess
- (BOOL)existsMessageWithTimestamp:(uint64_t)timestamp
sourceId:(NSString *)sourceId
sourceDeviceId:(uint32_t)sourceDeviceId
transaction:(YapDatabaseReadTransaction *)transaction
{
if (![self.database registeredExtension:OWSIncomingMessageFinderExtensionName]) {
OWSFail(@"%@ in %s but extension is not registered", self.tag, __PRETTY_FUNCTION__);
@ -129,13 +130,8 @@ NSString *const OWSIncomingMessageFinderColumnSourceDeviceId = @"OWSIncomingMess
// YapDatabaseQuery params must be objects
YapDatabaseQuery *query = [YapDatabaseQuery queryWithFormat:queryFormat, @(timestamp), sourceId, @(sourceDeviceId)];
__block NSUInteger count;
__block BOOL success;
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
success = [[transaction ext:OWSIncomingMessageFinderExtensionName] getNumberOfRows:&count matchingQuery:query];
}];
NSUInteger count;
BOOL success = [[transaction ext:OWSIncomingMessageFinderExtensionName] getNumberOfRows:&count matchingQuery:query];
if (!success) {
OWSFail(@"%@ Could not execute query", self.tag);
return NO;

Loading…
Cancel
Save