mirror of https://github.com/oxen-io/session-ios
parent
bd360262cb
commit
183f0f1ccd
@ -1,42 +0,0 @@
|
||||
#import <Foundation/Foundation.h>
|
||||
#import "CollapsingFutures.h"
|
||||
#import "Queue.h"
|
||||
|
||||
typedef void (^LatestValueCallback)(id latestValue);
|
||||
|
||||
/**
|
||||
*
|
||||
* An ObservableValue represents an asynchronous stream of values, such as 'latest state of toggle' or 'latest sensor
|
||||
* reading'.
|
||||
*
|
||||
*/
|
||||
@interface ObservableValue : NSObject {
|
||||
@protected
|
||||
NSMutableSet *callbacks;
|
||||
@private
|
||||
Queue *queuedActionsToRun;
|
||||
@private
|
||||
bool isRunningActions;
|
||||
@protected
|
||||
bool sealed;
|
||||
}
|
||||
|
||||
@property (readonly, atomic) id currentValue;
|
||||
|
||||
- (void)watchLatestValueOnArbitraryThread:(LatestValueCallback)callback
|
||||
untilCancelled:(TOCCancelToken *)untilCancelledToken;
|
||||
|
||||
- (void)watchLatestValue:(LatestValueCallback)callback
|
||||
onThread:(NSThread *)thread
|
||||
untilCancelled:(TOCCancelToken *)untilCancelledToken;
|
||||
|
||||
@end
|
||||
|
||||
@interface ObservableValueController : ObservableValue
|
||||
|
||||
+ (ObservableValueController *)observableValueControllerWithInitialValue:(id)value;
|
||||
- (void)updateValue:(id)value;
|
||||
- (void)adjustValue:(id (^)(id))adjustment;
|
||||
- (void)sealValue;
|
||||
|
||||
@end
|
@ -1,136 +0,0 @@
|
||||
//
|
||||
// Copyright (c) 2017 Open Whisper Systems. All rights reserved.
|
||||
//
|
||||
|
||||
#import "Environment.h"
|
||||
#import "ObservableValue.h"
|
||||
#import "Util.h"
|
||||
|
||||
@implementation ObservableValue
|
||||
|
||||
@synthesize currentValue;
|
||||
|
||||
- (ObservableValue *)initWithValue:(id)value {
|
||||
callbacks = [NSMutableSet set];
|
||||
queuedActionsToRun = [Queue new];
|
||||
currentValue = value;
|
||||
return self;
|
||||
}
|
||||
|
||||
- (void)watchLatestValueOnArbitraryThread:(LatestValueCallback)callback
|
||||
untilCancelled:(TOCCancelToken *)untilCancelledToken {
|
||||
ows_require(callback != nil);
|
||||
if (untilCancelledToken.isAlreadyCancelled)
|
||||
return;
|
||||
|
||||
void (^callbackCopy)(id value) = [callback copy];
|
||||
[self queueRun:^{
|
||||
callbackCopy(self.currentValue);
|
||||
[callbacks addObject:callbackCopy];
|
||||
}];
|
||||
[untilCancelledToken whenCancelledDo:^{
|
||||
[self queueRun:^{
|
||||
[callbacks removeObject:callbackCopy];
|
||||
}];
|
||||
}];
|
||||
}
|
||||
- (void)watchLatestValue:(LatestValueCallback)callback
|
||||
onThread:(NSThread *)thread
|
||||
untilCancelled:(TOCCancelToken *)untilCancelledToken {
|
||||
ows_require(callback != nil);
|
||||
ows_require(thread != nil);
|
||||
|
||||
void (^callbackCopy)(id value) = [callback copy];
|
||||
void (^threadedCallback)(id value) = ^(id value) {
|
||||
[Operation asyncRun:^{
|
||||
callbackCopy(value);
|
||||
}
|
||||
onThread:thread];
|
||||
};
|
||||
|
||||
[self watchLatestValueOnArbitraryThread:threadedCallback untilCancelled:untilCancelledToken];
|
||||
}
|
||||
|
||||
/// used for avoiding re-entrancy issues (e.g. a callback registering another callback during enumeration)
|
||||
- (void)queueRun:(void (^)())action {
|
||||
@synchronized(self) {
|
||||
if (isRunningActions) {
|
||||
[queuedActionsToRun enqueue:[action copy]];
|
||||
return;
|
||||
}
|
||||
isRunningActions = true;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
@try {
|
||||
action();
|
||||
} @catch (id ex) {
|
||||
DDLogError(@"A queued action failed and may have stalled an ObservableValue.");
|
||||
@synchronized(self) {
|
||||
isRunningActions = false;
|
||||
}
|
||||
[ex raise];
|
||||
}
|
||||
|
||||
@
|
||||
synchronized(self) {
|
||||
action = [queuedActionsToRun tryDequeue];
|
||||
if (action == nil) {
|
||||
isRunningActions = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
- (void)updateValue:(id)value {
|
||||
[self queueRun:^{
|
||||
if (value == currentValue)
|
||||
return;
|
||||
requireState(!sealed);
|
||||
|
||||
currentValue = value;
|
||||
for (void (^callback)(id value) in callbacks) {
|
||||
callback(value);
|
||||
}
|
||||
}];
|
||||
}
|
||||
|
||||
- (void)adjustValue:(id (^)(id))adjustment {
|
||||
ows_require(adjustment != nil);
|
||||
[self queueRun:^{
|
||||
id oldValue = currentValue;
|
||||
id newValue = adjustment(oldValue);
|
||||
if (oldValue == newValue)
|
||||
return;
|
||||
requireState(!sealed);
|
||||
|
||||
currentValue = newValue;
|
||||
for (void (^callback)(id value) in callbacks) {
|
||||
callback(currentValue);
|
||||
}
|
||||
}];
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
@implementation ObservableValueController
|
||||
|
||||
+ (ObservableValueController *)observableValueControllerWithInitialValue:(id)value {
|
||||
return [[ObservableValueController alloc] initWithValue:value];
|
||||
}
|
||||
|
||||
- (void)updateValue:(id)value {
|
||||
[super updateValue:value];
|
||||
}
|
||||
- (void)adjustValue:(id (^)(id))adjustment {
|
||||
[super adjustValue:adjustment];
|
||||
}
|
||||
- (void)sealValue {
|
||||
[self queueRun:^{
|
||||
sealed = true;
|
||||
callbacks = nil;
|
||||
}];
|
||||
}
|
||||
|
||||
@end
|
@ -1,10 +0,0 @@
|
||||
#import <Foundation/Foundation.h>
|
||||
|
||||
@interface Queue : NSObject
|
||||
- (void)enqueue:(id)item;
|
||||
- (id)dequeue;
|
||||
- (id)tryDequeue;
|
||||
- (id)peek;
|
||||
- (id)peekAt:(NSUInteger)offset;
|
||||
- (NSUInteger)count;
|
||||
@end
|
@ -1,42 +0,0 @@
|
||||
//
|
||||
// Copyright (c) 2017 Open Whisper Systems. All rights reserved.
|
||||
//
|
||||
|
||||
#import "Queue.h"
|
||||
|
||||
@implementation Queue {
|
||||
@private
|
||||
NSMutableArray *items;
|
||||
}
|
||||
- (id)init {
|
||||
if (self = [super init]) {
|
||||
self->items = [NSMutableArray array];
|
||||
}
|
||||
return self;
|
||||
}
|
||||
- (void)enqueue:(id)item {
|
||||
[items addObject:item];
|
||||
}
|
||||
- (id)tryDequeue {
|
||||
if (self.count == 0)
|
||||
return nil;
|
||||
return [self dequeue];
|
||||
}
|
||||
- (id)dequeue {
|
||||
requireState(self.count > 0);
|
||||
id result = items[0];
|
||||
[items removeObjectAtIndex:0];
|
||||
return result;
|
||||
}
|
||||
- (id)peek {
|
||||
requireState(self.count > 0);
|
||||
return items[0];
|
||||
}
|
||||
- (id)peekAt:(NSUInteger)offset {
|
||||
ows_require(offset < self.count);
|
||||
return items[offset];
|
||||
}
|
||||
- (NSUInteger)count {
|
||||
return items.count;
|
||||
}
|
||||
@end
|
@ -1,134 +0,0 @@
|
||||
#import <XCTest/XCTest.h>
|
||||
#import "ObservableValue.h"
|
||||
#import "TestUtil.h"
|
||||
|
||||
@interface ObservableTest : XCTestCase
|
||||
@end
|
||||
|
||||
@implementation ObservableTest
|
||||
|
||||
-(void) testObservableAddRemove {
|
||||
ObservableValueController* s = [ObservableValueController observableValueControllerWithInitialValue:@""];
|
||||
ObservableValue* t = s;
|
||||
NSMutableArray* a = [NSMutableArray array];
|
||||
TOCCancelTokenSource* c = [TOCCancelTokenSource new];
|
||||
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {[a addObject:value];}
|
||||
untilCancelled:c.token];
|
||||
|
||||
test([a isEqualToArray:@[@""]]);
|
||||
[s updateValue:@5];
|
||||
test([a isEqualToArray:(@[@"", @5])]);
|
||||
[s updateValue:@7];
|
||||
test([a isEqualToArray:(@[@"", @5, @7])]);
|
||||
[c cancel];
|
||||
[s updateValue:@11];
|
||||
test([a isEqualToArray:(@[@"", @5, @7])]);
|
||||
}
|
||||
-(void) testObservableAddAdd {
|
||||
ObservableValueController* s = [ObservableValueController observableValueControllerWithInitialValue:@""];
|
||||
ObservableValue* t = s;
|
||||
NSMutableArray* a = [NSMutableArray array];
|
||||
TOCCancelTokenSource* c = [TOCCancelTokenSource new];
|
||||
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {[a addObject:value];}
|
||||
untilCancelled:c.token];
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {[a addObject:value];}
|
||||
untilCancelled:c.token];
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {[a addObject:value];}
|
||||
untilCancelled:c.token];
|
||||
|
||||
test([a isEqualToArray:(@[@"", @"", @""])]);
|
||||
[s updateValue:@5];
|
||||
test([a isEqualToArray:(@[@"", @"", @"", @5, @5, @5])]);
|
||||
}
|
||||
-(void) testObservableRedundantSetIgnored {
|
||||
id v1 = @"";
|
||||
id v2 = nil;
|
||||
id v3 = @1;
|
||||
|
||||
ObservableValueController* s = [ObservableValueController observableValueControllerWithInitialValue:v1];
|
||||
ObservableValue* t = s;
|
||||
__block id latest = nil;
|
||||
__block int count = 0;
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {latest = value;count++;}
|
||||
untilCancelled:nil];
|
||||
|
||||
test(latest == v1);
|
||||
test(count == 1);
|
||||
|
||||
[s updateValue:v1];
|
||||
test(latest == v1);
|
||||
test(count == 1);
|
||||
|
||||
[s updateValue:v2];
|
||||
test(latest == v2);
|
||||
test(count == 2);
|
||||
|
||||
[s updateValue:v2];
|
||||
test(latest == v2);
|
||||
test(count == 2);
|
||||
|
||||
[s updateValue:v1];
|
||||
test(latest == v1);
|
||||
test(count == 3);
|
||||
|
||||
[s updateValue:v3];
|
||||
test(latest == v3);
|
||||
test(count == 4);
|
||||
}
|
||||
-(void) testObservableReentrantAdd {
|
||||
ObservableValueController* s = [ObservableValueController observableValueControllerWithInitialValue:@""];
|
||||
ObservableValue* t = s;
|
||||
NSMutableArray* a = [NSMutableArray array];
|
||||
TOCCancelTokenSource* c = [TOCCancelTokenSource new];
|
||||
|
||||
__block void(^registerSelf)() = nil;
|
||||
void(^registerSelf_builder)() = ^{
|
||||
__block bool first = true;
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {
|
||||
if (!first) registerSelf();
|
||||
first = false;
|
||||
[a addObject:value];
|
||||
} untilCancelled:c.token];
|
||||
};
|
||||
registerSelf = [registerSelf_builder copy];
|
||||
registerSelf();
|
||||
|
||||
// adding during a callback counts as adding after the callback
|
||||
// so we should see a doubling each time
|
||||
test([a isEqualToArray:@[@""]]);
|
||||
[s updateValue:@1];
|
||||
test([a isEqualToArray:(@[@"", @1, @1])]);
|
||||
[s updateValue:@2];
|
||||
test([a isEqualToArray:(@[@"", @1, @1, @2, @2, @2, @2])]);
|
||||
[s updateValue:@3];
|
||||
test([a isEqualToArray:(@[@"", @1, @1, @2, @2, @2, @2, @3, @3, @3, @3, @3, @3, @3, @3])]);
|
||||
}
|
||||
-(void) testObservableReentrantRemove {
|
||||
ObservableValueController* s = [ObservableValueController observableValueControllerWithInitialValue:@""];
|
||||
ObservableValue* t = s;
|
||||
NSMutableArray* a = [NSMutableArray array];
|
||||
TOCCancelTokenSource* c = [TOCCancelTokenSource new];
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
__block bool first = true;
|
||||
[t watchLatestValueOnArbitraryThread:^(id value) {
|
||||
if (!first) {
|
||||
[c cancel];
|
||||
[a addObject:value];
|
||||
}
|
||||
first = false;
|
||||
} untilCancelled:c.token];
|
||||
}
|
||||
|
||||
// removing during a callback counts as removing after the callback
|
||||
// so we should see all the callbacks run, then they're all cancelled
|
||||
test([a isEqualToArray:(@[])]);
|
||||
[s updateValue:@1];
|
||||
test([a isEqualToArray:(@[@1, @1, @1])]);
|
||||
[s updateValue:@2];
|
||||
test([a isEqualToArray:(@[@1, @1, @1])]);
|
||||
}
|
||||
|
||||
@end
|
Loading…
Reference in New Issue