Decrypt and process messages in batches.

// FREEBIE
pull/1/head
Matthew Chen 8 years ago
parent fa353259c3
commit 023c804a61

@ -7,6 +7,9 @@ NS_ASSUME_NONNULL_BEGIN
@class OWSSignalServiceProtosEnvelope;
@class YapDatabase;
// This class is used to write incoming (decrypted, unprocessed)
// messages to a durable queue and then process them in batches,
// in the order in which they were received.
@interface OWSBatchMessageProcessor : NSObject
+ (instancetype)sharedInstance;

@ -73,7 +73,7 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
@interface OWSBatchMessageProcessingJobFinder : NSObject
- (nullable OWSBatchMessageProcessingJob *)nextJob;
- (NSArray<OWSBatchMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize;
- (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData;
- (void)removeJobWithId:(NSString *)uniqueId;
@ -105,16 +105,36 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
return self;
}
- (nullable OWSBatchMessageProcessingJob *)nextJob
- (NSArray<OWSBatchMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize
{
__block OWSBatchMessageProcessingJob *_Nullable job;
NSMutableArray<OWSBatchMessageProcessingJob *> *jobs = [NSMutableArray new];
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSBatchMessageProcessingJobFinderExtensionName];
OWSAssert(viewTransaction != nil);
job = [viewTransaction firstObjectInGroup:OWSBatchMessageProcessingJobFinderExtensionGroup];
NSMutableArray<NSString *> *jobIds = [NSMutableArray new];
[viewTransaction enumerateKeysInGroup:OWSBatchMessageProcessingJobFinderExtensionGroup
usingBlock:^(NSString *_Nonnull collection,
NSString *_Nonnull key,
NSUInteger index,
BOOL *_Nonnull stop) {
[jobIds addObject:key];
if (jobIds.count >= maxBatchSize) {
*stop = YES;
}
}];
for (NSString *jobId in jobIds) {
OWSBatchMessageProcessingJob *_Nullable job =
[OWSBatchMessageProcessingJob fetchObjectWithUniqueID:jobId transaction:transaction];
if (job) {
[jobs addObject:job];
} else {
OWSFail(@"Could not load job: %@", jobId);
}
}
}];
return job;
return jobs;
}
- (void)addJobWithEnvelopeData:(NSData *)envelopeData plaintextData:(NSData *_Nullable)plaintextData
@ -279,32 +299,39 @@ NSString *const OWSBatchMessageProcessingJobFinderExtensionGroup = @"OWSBatchMes
{
AssertIsOnMainThread();
OWSBatchMessageProcessingJob *_Nullable job = [self.finder nextJob];
if (job == nil) {
const NSUInteger kMaxBatchSize = 16;
NSArray<OWSBatchMessageProcessingJob *> *jobs = [self.finder nextJobsForBatchSize:kMaxBatchSize];
OWSAssert(jobs);
if (jobs.count < 1) {
self.isDrainingQueue = NO;
DDLogVerbose(@"%@ Queue is drained", self.tag);
return;
}
[self processJob:job
completion:^{
dispatch_async(dispatch_get_main_queue(), ^{
DDLogVerbose(@"%@ completed job. %lu jobs left.",
self.tag,
(unsigned long)[OWSBatchMessageProcessingJob numberOfKeysInCollection]);
[self.finder removeJobWithId:job.uniqueId];
[self drainQueueWorkStep];
});
}];
[self processJobs:jobs
completion:^{
dispatch_async(dispatch_get_main_queue(), ^{
for (OWSBatchMessageProcessingJob *job in jobs) {
[self.finder removeJobWithId:job.uniqueId];
}
DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.",
self.tag,
jobs.count,
[OWSBatchMessageProcessingJob numberOfKeysInCollection]);
[self drainQueueWorkStep];
});
}];
}
- (void)processJob:(OWSBatchMessageProcessingJob *)job completion:(void (^)())completion
- (void)processJobs:(NSArray<OWSBatchMessageProcessingJob *> *)jobs completion:(void (^)())completion
{
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
[self.dbReadWriteConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self.messagesManager processEnvelope:job.envelopeProto
plaintextData:job.plaintextData
transaction:transaction];
for (OWSBatchMessageProcessingJob *job in jobs) {
[self.messagesManager processEnvelope:job.envelopeProto
plaintextData:job.plaintextData
transaction:transaction];
}
}];
completion();
});

@ -7,6 +7,10 @@ NS_ASSUME_NONNULL_BEGIN
@class OWSSignalServiceProtosEnvelope;
@class YapDatabase;
// This class is used to write incoming (encrypted, unprocessed)
// messages to a durable queue and then decrypt them in the order
// in which they were received. Successfully decrypted messages
// are forwarded to OWSBatchMessageProcessor.
@interface OWSMessageReceiver : NSObject
+ (instancetype)sharedInstance;

@ -71,7 +71,7 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
@interface OWSMessageProcessingJobFinder : NSObject
- (nullable OWSMessageProcessingJob *)nextJob;
- (NSArray<OWSMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize;
- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope;
- (void)removeJobWithId:(NSString *)uniqueId;
@ -103,16 +103,37 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
return self;
}
- (nullable OWSMessageProcessingJob *)nextJob
- (NSArray<OWSMessageProcessingJob *> *)nextJobsForBatchSize:(NSUInteger)maxBatchSize
{
__block OWSMessageProcessingJob *_Nullable job;
NSMutableArray<OWSMessageProcessingJob *> *jobs = [NSMutableArray new];
[self.dbConnection readWithBlock:^(YapDatabaseReadTransaction *_Nonnull transaction) {
YapDatabaseViewTransaction *viewTransaction = [transaction ext:OWSMessageProcessingJobFinderExtensionName];
OWSAssert(viewTransaction != nil);
job = [viewTransaction firstObjectInGroup:OWSMessageProcessingJobFinderExtensionGroup];
NSMutableArray<NSString *> *jobIds = [NSMutableArray new];
[viewTransaction enumerateKeysInGroup:OWSMessageProcessingJobFinderExtensionGroup
usingBlock:^(NSString *_Nonnull collection,
NSString *_Nonnull key,
NSUInteger index,
BOOL *_Nonnull stop) {
DDLogVerbose(@"key: %@", key);
[jobIds addObject:key];
if (jobIds.count >= maxBatchSize) {
*stop = YES;
}
}];
for (NSString *jobId in jobIds) {
OWSMessageProcessingJob *_Nullable job =
[OWSMessageProcessingJob fetchObjectWithUniqueID:jobId transaction:transaction];
if (job) {
[jobs addObject:job];
} else {
OWSFail(@"Could not load job: %@", jobId);
}
}
}];
return job;
return jobs;
}
- (void)addJobForEnvelope:(OWSSignalServiceProtosEnvelope *)envelope
@ -274,35 +295,74 @@ NSString *const OWSMessageProcessingJobFinderExtensionGroup = @"OWSMessageProces
{
AssertIsOnMainThread();
OWSMessageProcessingJob *_Nullable job = [self.finder nextJob];
if (job == nil) {
const NSUInteger kMaxBatchSize = 16;
NSArray<OWSMessageProcessingJob *> *jobs = [self.finder nextJobsForBatchSize:kMaxBatchSize];
OWSAssert(jobs);
if (jobs.count < 1) {
self.isDrainingQueue = NO;
DDLogVerbose(@"%@ Queue is drained", self.tag);
DDLogVerbose(@"%@ Queue is drained.", self.tag);
return;
}
[self processJob:job
completion:^{
dispatch_async(dispatch_get_main_queue(), ^{
DDLogVerbose(@"%@ completed job. %lu jobs left.",
self.tag,
(unsigned long)[OWSMessageProcessingJob numberOfKeysInCollection]);
[self.finder removeJobWithId:job.uniqueId];
[self drainQueueWorkStep];
});
}];
[self processJobs:jobs
completion:^{
dispatch_async(dispatch_get_main_queue(), ^{
for (OWSMessageProcessingJob *job in jobs) {
[self.finder removeJobWithId:job.uniqueId];
}
DDLogVerbose(@"%@ completed %zd jobs. %zd jobs left.",
self.tag,
jobs.count,
[OWSMessageProcessingJob numberOfKeysInCollection]);
[self drainQueueWorkStep];
});
}];
}
- (void)processJobs:(NSArray<OWSMessageProcessingJob *> *)jobs completion:(void (^)())completion
{
[self processJobs:jobs
unprocessedJobs:[jobs mutableCopy]
plaintextDataMap:[NSMutableDictionary new]
completion:completion];
}
- (void)processJob:(OWSMessageProcessingJob *)job completion:(void (^)())completion
- (void)processJobs:(NSArray<OWSMessageProcessingJob *> *)jobs
unprocessedJobs:(NSMutableArray<OWSMessageProcessingJob *> *)unprocessedJobs
plaintextDataMap:(NSMutableDictionary<NSString *, NSData *> *)plaintextDataMap
completion:(void (^)())completion
{
OWSAssert(jobs.count > 0);
OWSAssert(unprocessedJobs.count <= jobs.count);
if (unprocessedJobs.count < 1) {
for (OWSMessageProcessingJob *job in jobs) {
NSData *_Nullable plaintextData = plaintextDataMap[job.uniqueId];
[self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData];
}
completion();
return;
}
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
OWSAssert(unprocessedJobs.count > 0);
OWSMessageProcessingJob *job = unprocessedJobs.firstObject;
[unprocessedJobs removeObjectAtIndex:0];
[self.messagesManager decryptEnvelope:job.envelopeProto
successBlock:^(NSData *_Nullable plaintextData) {
[self.batchMessageProcessor enqueueEnvelopeData:job.envelopeData plaintextData:plaintextData];
completion();
if (plaintextData) {
plaintextDataMap[job.uniqueId] = plaintextData;
}
[self processJobs:jobs
unprocessedJobs:unprocessedJobs
plaintextDataMap:plaintextDataMap
completion:completion];
}
failureBlock:^{
completion();
[self processJobs:jobs
unprocessedJobs:unprocessedJobs
plaintextDataMap:plaintextDataMap
completion:completion];
}];
});
}

Loading…
Cancel
Save