diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h index 66009908c..220e493bc 100644 --- a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.h @@ -7,6 +7,9 @@ NS_ASSUME_NONNULL_BEGIN @class OWSSignalServiceProtosEnvelope; @class YapDatabase; +// This class is used to write incoming (decrypted, unprocessed) +// messages to a durable queue and then process them in batches, +// in the order in which they were received. @interface OWSBatchMessageProcessor : NSObject + (instancetype)sharedInstance; diff --git a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m index f4109a3f2..176bde594 100644 --- a/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m +++ b/SignalServiceKit/src/Messages/OWSBatchMessageProcessor.m @@ -73,7 +73,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes @interface OWSBatchMessageProcessingJobFinder : NSObject -- (nullable OWSBatchMessageProcessingJob *)nextJob; +- (NSArray *)nextJobsForBatchSize:(NSUInteger)maxBatchSize; - (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData; - (void)removeJobWithId:(NSString *)uniqueId; @@ -105,16 +105,36 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes return self; } -- (nullable OWSBatchMessageProcessingJob *)nextJob +- (NSArray *)nextJobsForBatchSize:(NSUInteger)maxBatchSize { - __block OWSBatchMessageProcessingJob *_Nullable job; + NSMutableArray *jobs = [NSMutableArray new]; [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSBatchMessageProcessingJobFinderExtensionName]; OWSAssert(viewTransaction != nil); - job = [viewTransaction firstObjectInGroup:OWSBatchMessageProcessingJobFinderExtensionGroup]; + NSMutableArray *jobIds = [NSMutableArray new]; + [viewTransaction enumerateKeysInGroup:OWSBatchMessageProcessingJobFinderExtensionGroup + usingBlock:^(NSString *_Nonnull collection, + NSString *_Nonnull key, + NSUInteger index, + BOOL *_Nonnull stop) { + [jobIds addObject:key]; + if (jobIds.count >= maxBatchSize) { + *stop = YES; + } + }]; + + for (NSString *jobId in jobIds) { + OWSBatchMessageProcessingJob *_Nullable job = + [OWSBatchMessageProcessingJob fetchObjectWithUniqueID:jobId transaction:transaction]; + if (job) { + [jobs addObject:job]; + } else { + OWSFail(@"Could not load job: %@", jobId); + } + } }]; - return job; + return jobs; } - (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData @@ -279,32 +299,39 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes { AssertIsOnMainThread(); - OWSBatchMessageProcessingJob *_Nullable job = [self.finder nextJob]; - if (job == nil) { + const NSUInteger kMaxBatchSize = 16; + NSArray *jobs = [self.finder nextJobsForBatchSize:kMaxBatchSize]; + OWSAssert(jobs); + if (jobs.count < 1) { self.isDrainingQueue = NO; DDLogVerbose(@"%@ Queue is drained", self.tag); return; } - [self processJob:job - completion:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - DDLogVerbose(@"%@ completed job. %lu jobs left.", - self.tag, - (unsigned long)[OWSBatchMessageProcessingJob numberOfKeysInCollection]); - [self.finder removeJobWithId:job.uniqueId]; - [self drainQueueWorkStep]; - }); - }]; + [self processJobs:jobs + completion:^{ + dispatch_async(dispatch_get_main_queue(), ^{ + for (OWSBatchMessageProcessingJob *job in jobs) { + [self.finder removeJobWithId:job.uniqueId]; + } + DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", + self.tag, + jobs.count, + [OWSBatchMessageProcessingJob numberOfKeysInCollection]); + [self drainQueueWorkStep]; + }); + }]; } -- (void)processJob:(OWSBatchMessageProcessingJob *)job completion:(void (^)())completion +- (void)processJobs:(NSArray *)jobs completion:(void (^)())completion { dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ [self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) { - [self.messagesManager processEnvelope:job.envelopeProto - plaintextData:job.plaintextData - transaction:transaction]; + for (OWSBatchMessageProcessingJob *job in jobs) { + [self.messagesManager processEnvelope:job.envelopeProto + plaintextData:job.plaintextData + transaction:transaction]; + } }]; completion(); }); diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.h b/SignalServiceKit/src/Messages/OWSMessageReceiver.h index 5af585219..c73d500c8 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.h +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.h @@ -7,6 +7,10 @@ NS_ASSUME_NONNULL_BEGIN @class OWSSignalServiceProtosEnvelope; @class YapDatabase; +// This class is used to write incoming (encrypted, unprocessed) +// messages to a durable queue and then decrypt them in the order +// in which they were received. Successfully decrypted messages +// are forwarded to OWSBatchMessageProcessor. @interface OWSMessageReceiver : NSObject + (instancetype)sharedInstance; diff --git a/SignalServiceKit/src/Messages/OWSMessageReceiver.m b/SignalServiceKit/src/Messages/OWSMessageReceiver.m index 4f896d9de..7a79747da 100644 --- a/SignalServiceKit/src/Messages/OWSMessageReceiver.m +++ b/SignalServiceKit/src/Messages/OWSMessageReceiver.m @@ -71,7 +71,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces @interface OWSMessageProcessingJobFinder : NSObject -- (nullable OWSMessageProcessingJob *)nextJob; +- (NSArray *)nextJobsForBatchSize:(NSUInteger)maxBatchSize; - (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope; - (void)removeJobWithId:(NSString *)uniqueId; @@ -103,16 +103,37 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces return self; } -- (nullable OWSMessageProcessingJob *)nextJob +- (NSArray *)nextJobsForBatchSize:(NSUInteger)maxBatchSize { - __block OWSMessageProcessingJob *_Nullable job; + NSMutableArray *jobs = [NSMutableArray new]; [self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) { YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageProcessingJobFinderExtensionName]; OWSAssert(viewTransaction != nil); - job = [viewTransaction firstObjectInGroup:OWSMessageProcessingJobFinderExtensionGroup]; + NSMutableArray *jobIds = [NSMutableArray new]; + [viewTransaction enumerateKeysInGroup:OWSMessageProcessingJobFinderExtensionGroup + usingBlock:^(NSString *_Nonnull collection, + NSString *_Nonnull key, + NSUInteger index, + BOOL *_Nonnull stop) { + DDLogVerbose(@"key: %@", key); + [jobIds addObject:key]; + if (jobIds.count >= maxBatchSize) { + *stop = YES; + } + }]; + + for (NSString *jobId in jobIds) { + OWSMessageProcessingJob *_Nullable job = + [OWSMessageProcessingJob fetchObjectWithUniqueID:jobId transaction:transaction]; + if (job) { + [jobs addObject:job]; + } else { + OWSFail(@"Could not load job: %@", jobId); + } + } }]; - return job; + return jobs; } - (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope @@ -274,35 +295,74 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces { AssertIsOnMainThread(); - OWSMessageProcessingJob *_Nullable job = [self.finder nextJob]; - if (job == nil) { + const NSUInteger kMaxBatchSize = 16; + NSArray *jobs = [self.finder nextJobsForBatchSize:kMaxBatchSize]; + OWSAssert(jobs); + if (jobs.count < 1) { self.isDrainingQueue = NO; - DDLogVerbose(@"%@ Queue is drained", self.tag); + DDLogVerbose(@"%@ Queue is drained.", self.tag); return; } - [self processJob:job - completion:^{ - dispatch_async(dispatch_get_main_queue(), ^{ - DDLogVerbose(@"%@ completed job. %lu jobs left.", - self.tag, - (unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]); - [self.finder removeJobWithId:job.uniqueId]; - [self drainQueueWorkStep]; - }); - }]; + [self processJobs:jobs + completion:^{ + dispatch_async(dispatch_get_main_queue(), ^{ + for (OWSMessageProcessingJob *job in jobs) { + [self.finder removeJobWithId:job.uniqueId]; + } + DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.", + self.tag, + jobs.count, + [OWSMessageProcessingJob numberOfKeysInCollection]); + [self drainQueueWorkStep]; + }); + }]; +} + +- (void)processJobs:(NSArray *)jobs completion:(void (^)())completion +{ + [self processJobs:jobs + unprocessedJobs:[jobs mutableCopy] + plaintextDataMap:[NSMutableDictionary new] + completion:completion]; } -- (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion +- (void)processJobs:(NSArray *)jobs + unprocessedJobs:(NSMutableArray *)unprocessedJobs + plaintextDataMap:(NSMutableDictionary *)plaintextDataMap + completion:(void (^)())completion { + OWSAssert(jobs.count > 0); + OWSAssert(unprocessedJobs.count <= jobs.count); + + if (unprocessedJobs.count < 1) { + for (OWSMessageProcessingJob *job in jobs) { + NSData *_Nullable plaintextData = plaintextDataMap[job.uniqueId]; + [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; + } + completion(); + return; + } + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + OWSAssert(unprocessedJobs.count > 0); + OWSMessageProcessingJob *job = unprocessedJobs.firstObject; + [unprocessedJobs removeObjectAtIndex:0]; [self.messagesManager decryptEnvelope:job.envelopeProto successBlock:^(NSData *_Nullable plaintextData) { - [self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData]; - completion(); + if (plaintextData) { + plaintextDataMap[job.uniqueId] = plaintextData; + } + [self processJobs:jobs + unprocessedJobs:unprocessedJobs + plaintextDataMap:plaintextDataMap + completion:completion]; } failureBlock:^{ - completion(); + [self processJobs:jobs + unprocessedJobs:unprocessedJobs + plaintextDataMap:plaintextDataMap + completion:completion]; }]; }); }