Batch updates, and fixing bug where daemon would crash when accessing IMCore stuff from different threads
This commit is contained in:
@@ -18,13 +18,13 @@ NS_ASSUME_NONNULL_BEGIN
|
||||
- (NSDictionary *)dictionaryRepresentation;
|
||||
@end
|
||||
|
||||
typedef void (^MBIMUpdateConsumer)(MBIMUpdateItem *item);
|
||||
typedef void (^MBIMUpdateConsumer)(NSArray<MBIMUpdateItem *> *items);
|
||||
|
||||
@interface MBIMUpdateQueue : NSObject
|
||||
|
||||
+ (instancetype)sharedInstance;
|
||||
|
||||
- (void)addConsumer:(MBIMUpdateConsumer)consumer;
|
||||
- (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq;
|
||||
- (void)removeConsumer:(MBIMUpdateConsumer)consumer;
|
||||
|
||||
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item;
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
#import "IMMessageItem+Encoded.h"
|
||||
#import "IMChat+Encoded.h"
|
||||
|
||||
static const NSUInteger kUpdateItemsCullingLength = 100;
|
||||
|
||||
@interface MBIMUpdateItem (/*INTERNAL*/)
|
||||
@property (nonatomic, assign) NSUInteger messageSequenceNumber;
|
||||
@end
|
||||
@@ -18,6 +20,9 @@
|
||||
NSUInteger _messageSequenceNumber;
|
||||
dispatch_queue_t _accessQueue;
|
||||
NSMutableArray *_consumers;
|
||||
|
||||
// Maps message sequence number to update item
|
||||
NSMutableDictionary<NSNumber *, MBIMUpdateItem *> *_updateItemHistory;
|
||||
}
|
||||
|
||||
+ (instancetype)sharedInstance
|
||||
@@ -38,16 +43,30 @@
|
||||
_accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL);
|
||||
_consumers = [[NSMutableArray alloc] init];
|
||||
_messageSequenceNumber = 0;
|
||||
_updateItemHistory = [[NSMutableDictionary alloc] init];
|
||||
}
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
- (void)addConsumer:(MBIMUpdateConsumer)consumer
|
||||
- (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq
|
||||
{
|
||||
__weak NSMutableArray *consumers = _consumers;
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
[consumers addObject:consumer];
|
||||
if ((messageSeq >= 0) && messageSeq < self->_messageSequenceNumber) {
|
||||
NSMutableArray *batchedUpdates = [NSMutableArray array];
|
||||
for (NSUInteger seq = messageSeq + 1; seq <= self->_messageSequenceNumber; seq++) {
|
||||
MBIMUpdateItem *item = [updateItemHistory objectForKey:@(seq)];
|
||||
if (item) {
|
||||
[batchedUpdates addObject:item];
|
||||
}
|
||||
}
|
||||
|
||||
consumer(batchedUpdates);
|
||||
} else {
|
||||
[consumers addObject:consumer];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -61,16 +80,36 @@
|
||||
|
||||
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item
|
||||
{
|
||||
_messageSequenceNumber++;
|
||||
item.messageSequenceNumber = _messageSequenceNumber;
|
||||
|
||||
__weak NSMutableArray *consumers = _consumers;
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
self->_messageSequenceNumber++;
|
||||
item.messageSequenceNumber = self->_messageSequenceNumber;
|
||||
|
||||
for (MBIMUpdateConsumer consumer in consumers) {
|
||||
consumer(item);
|
||||
consumer(@[ item ]);
|
||||
}
|
||||
|
||||
[consumers removeAllObjects];
|
||||
[updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)];
|
||||
|
||||
[self _cullUpdateItems];
|
||||
});
|
||||
}
|
||||
|
||||
- (void)_cullUpdateItems
|
||||
{
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
if ([updateItemHistory count] > kUpdateItemsCullingLength) {
|
||||
NSArray *sortedKeys = [[updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)];
|
||||
for (NSValue *key in sortedKeys) {
|
||||
[updateItemHistory removeObjectForKey:key];
|
||||
if ([updateItemHistory count] <= kUpdateItemsCullingLength) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -80,7 +119,7 @@
|
||||
- (NSDictionary *)dictionaryRepresentation
|
||||
{
|
||||
NSMutableDictionary *updateDict = [NSMutableDictionary dictionary];
|
||||
updateDict[@"messageSequenceNumber"] = @(_messageSequenceNumber);
|
||||
updateDict[@"messageSequenceNumber"] = @(self.messageSequenceNumber);
|
||||
|
||||
if ([self changedChat]) {
|
||||
updateDict[@"conversation"] = [[self changedChat] mbim_dictionaryRepresentation];
|
||||
|
||||
@@ -23,11 +23,16 @@ typedef void (^MBIMBridgeOperationCompletionBlock)(NSObject<HTTPResponse> * _Nul
|
||||
@property (nonatomic, readonly) NSURL *requestURL;
|
||||
@property (nonatomic, readonly) MBIMBridgeOperationCompletionBlock serverCompletionBlock;
|
||||
|
||||
+ (dispatch_queue_t)sharedIMAccessQueue;
|
||||
|
||||
+ (nullable Class)operationClassForEndpointName:(NSString *)endpointName;
|
||||
- (instancetype)initWithRequestURL:(NSURL *)requestURL completion:(MBIMBridgeOperationCompletionBlock)completionBlock;
|
||||
|
||||
- (NSObject<HTTPResponse> *)cancelAndReturnTimeoutResponse;
|
||||
|
||||
// convenience
|
||||
- (nullable NSString *)valueForQueryItemWithName:(NSString *)queryItemName;
|
||||
|
||||
@end
|
||||
|
||||
NS_ASSUME_NONNULL_END
|
||||
|
||||
@@ -32,6 +32,17 @@
|
||||
return operationClassMapping;
|
||||
}
|
||||
|
||||
+ (dispatch_queue_t)sharedIMAccessQueue
|
||||
{
|
||||
static dispatch_once_t onceToken;
|
||||
static dispatch_queue_t accessQueue = nil;
|
||||
dispatch_once(&onceToken, ^{
|
||||
accessQueue = dispatch_queue_create("IMAccessQueue", DISPATCH_QUEUE_SERIAL);
|
||||
});
|
||||
|
||||
return accessQueue;
|
||||
}
|
||||
|
||||
+ (void)load
|
||||
{
|
||||
if ([self class] != [MBIMBridgeOperation class]) {
|
||||
@@ -61,4 +72,20 @@
|
||||
return [[HTTPErrorResponse alloc] initWithErrorCode:500];
|
||||
}
|
||||
|
||||
- (NSString *)valueForQueryItemWithName:(NSString *)queryItemName
|
||||
{
|
||||
NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL
|
||||
resolvingAgainstBaseURL:NO];
|
||||
|
||||
NSString *value = nil;
|
||||
for (NSURLQueryItem *queryItem in [urlComponents queryItems]) {
|
||||
if ([[queryItem name] isEqualToString:queryItemName]) {
|
||||
value = [queryItem value];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
@@ -23,13 +23,15 @@
|
||||
|
||||
- (void)main
|
||||
{
|
||||
NSArray<IMChat *> *chats = [sChatRegistry allExistingChats];
|
||||
__block NSMutableArray *conversations = [NSMutableArray array];
|
||||
|
||||
NSMutableArray *conversations = [NSMutableArray array];
|
||||
for (IMChat *chat in chats) {
|
||||
NSDictionary *chatDict = [chat mbim_dictionaryRepresentation];
|
||||
[conversations addObject:chatDict];
|
||||
}
|
||||
dispatch_sync([[self class] sharedIMAccessQueue], ^{
|
||||
NSArray<IMChat *> *chats = [sChatRegistry allExistingChats];
|
||||
for (IMChat *chat in chats) {
|
||||
NSDictionary *chatDict = [chat mbim_dictionaryRepresentation];
|
||||
[conversations addObject:chatDict];
|
||||
}
|
||||
});
|
||||
|
||||
MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:conversations];
|
||||
self.serverCompletionBlock(response);
|
||||
|
||||
@@ -24,15 +24,7 @@
|
||||
{
|
||||
NSObject<HTTPResponse> *response = nil;
|
||||
do {
|
||||
NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO];
|
||||
|
||||
NSString *guid = nil;
|
||||
for (NSURLQueryItem *queryItem in [urlComponents queryItems]) {
|
||||
if ([[queryItem name] isEqualToString:@"guid"]) {
|
||||
guid = [queryItem value];
|
||||
break;
|
||||
}
|
||||
}
|
||||
NSString *guid = [self valueForQueryItemWithName:@"guid"];
|
||||
|
||||
if (!guid) {
|
||||
NSLog(@"No query item provided");
|
||||
|
||||
@@ -20,17 +20,9 @@
|
||||
|
||||
- (void)main
|
||||
{
|
||||
NSObject<HTTPResponse> *response = nil;
|
||||
__block NSObject<HTTPResponse> *response = nil;
|
||||
do {
|
||||
NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO];
|
||||
|
||||
NSString *guid = nil;
|
||||
for (NSURLQueryItem *queryItem in [urlComponents queryItems]) {
|
||||
if ([[queryItem name] isEqualToString:@"guid"]) {
|
||||
guid = [queryItem value];
|
||||
break;
|
||||
}
|
||||
}
|
||||
NSString *guid = [self valueForQueryItemWithName:@"guid"];
|
||||
|
||||
if (!guid) {
|
||||
NSLog(@"No query item provided");
|
||||
@@ -38,18 +30,19 @@
|
||||
break;
|
||||
}
|
||||
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:guid];
|
||||
if (!chat) {
|
||||
NSLog(@"Chat with guid: %@ not found", guid);
|
||||
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO: be smarter about this and mark individual messages as read? Could lead
|
||||
// to a race condition
|
||||
if ([chat unreadMessageCount] > 0) {
|
||||
[chat markAllMessagesAsRead];
|
||||
}
|
||||
dispatch_sync([[self class] sharedIMAccessQueue], ^{
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:guid];
|
||||
if (!chat) {
|
||||
NSLog(@"Chat with guid: %@ not found", guid);
|
||||
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
|
||||
} else {
|
||||
// TODO: be smarter about this and mark individual messages as read? Could lead
|
||||
// to a race condition
|
||||
if ([chat unreadMessageCount] > 0) {
|
||||
[chat markAllMessagesAsRead];
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
response = [[HTTPErrorResponse alloc] initWithErrorCode:200];
|
||||
} while (0);
|
||||
|
||||
@@ -23,17 +23,9 @@
|
||||
|
||||
- (void)main
|
||||
{
|
||||
NSObject<HTTPResponse> *response = nil;
|
||||
__block NSObject<HTTPResponse> *response = nil;
|
||||
do {
|
||||
NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO];
|
||||
|
||||
NSString *guid = nil;
|
||||
for (NSURLQueryItem *queryItem in [urlComponents queryItems]) {
|
||||
if ([[queryItem name] isEqualToString:@"guid"]) {
|
||||
guid = [queryItem value];
|
||||
break;
|
||||
}
|
||||
}
|
||||
NSString *guid = [self valueForQueryItemWithName:@"guid"];
|
||||
|
||||
if (!guid) {
|
||||
NSLog(@"No query item provided");
|
||||
@@ -41,21 +33,22 @@
|
||||
break;
|
||||
}
|
||||
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:guid];
|
||||
if (!chat) {
|
||||
NSLog(@"Chat with guid: %@ not found", guid);
|
||||
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
|
||||
break;
|
||||
}
|
||||
__block NSMutableArray *messages = [NSMutableArray array];
|
||||
dispatch_sync([[self class] sharedIMAccessQueue], ^{
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:guid];
|
||||
if (!chat) {
|
||||
NSLog(@"Chat with guid: %@ not found", guid);
|
||||
response = [[HTTPErrorResponse alloc] initWithErrorCode:500];
|
||||
} else {
|
||||
// Load messages
|
||||
[chat loadMessagesBeforeDate:[NSDate date] limit:50 loadImmediately:YES];
|
||||
|
||||
// Load messages
|
||||
[chat loadMessagesBeforeDate:[NSDate date] limit:50 loadImmediately:YES];
|
||||
|
||||
NSMutableArray *messages = [NSMutableArray array];
|
||||
for (IMMessageItem *imMessage in [[chat chatItems] messages]) {
|
||||
NSDictionary *messageDict = [imMessage mbim_dictionaryRepresentation];
|
||||
[messages addObject:messageDict];
|
||||
}
|
||||
for (IMMessageItem *imMessage in [[chat chatItems] messages]) {
|
||||
NSDictionary *messageDict = [imMessage mbim_dictionaryRepresentation];
|
||||
[messages addObject:messageDict];
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
response = [MBIMJSONDataResponse responseWithJSONObject:messages];
|
||||
} while (0);
|
||||
|
||||
@@ -22,23 +22,27 @@
|
||||
|
||||
- (BOOL)_sendMessage:(NSString *)messageBody toChatWithGUID:(NSString *)chatGUID
|
||||
{
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:chatGUID];
|
||||
__block BOOL result = YES;
|
||||
|
||||
// TODO: chat might not be an iMessage chat!
|
||||
IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
|
||||
IMHandle *senderHandle = [iMessageAccount loginIMHandle];
|
||||
dispatch_sync([[self class] sharedIMAccessQueue], ^{
|
||||
IMChat *chat = [sChatRegistry existingChatWithGUID:chatGUID];
|
||||
|
||||
NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody];
|
||||
IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle withText:replyAttrString fileTransferGUIDs:@[] flags:kIMMessageFinished];
|
||||
// TODO: chat might not be an iMessage chat!
|
||||
IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]];
|
||||
IMHandle *senderHandle = [iMessageAccount loginIMHandle];
|
||||
|
||||
if (!chat) {
|
||||
NSLog(@"Chat does not exist: %@", chatGUID);
|
||||
return NO;
|
||||
}
|
||||
NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody];
|
||||
IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle withText:replyAttrString fileTransferGUIDs:@[] flags:kIMMessageFinished];
|
||||
|
||||
[chat sendMessage:reply];
|
||||
if (!chat) {
|
||||
NSLog(@"Chat does not exist: %@", chatGUID);
|
||||
result = NO;
|
||||
} else {
|
||||
[chat sendMessage:reply];
|
||||
}
|
||||
});
|
||||
|
||||
return YES;
|
||||
return result;
|
||||
}
|
||||
|
||||
- (void)main
|
||||
|
||||
@@ -22,15 +22,25 @@
|
||||
|
||||
- (void)main
|
||||
{
|
||||
__weak __auto_type weakSelf = self;
|
||||
_updateConsumer = ^(MBIMUpdateItem *nextUpdateItem) {
|
||||
NSDictionary *updateDict = [nextUpdateItem dictionaryRepresentation];
|
||||
NSInteger messageSeq = -1;
|
||||
NSString *messageSeqString = [self valueForQueryItemWithName:@"seq"];
|
||||
if (messageSeqString) {
|
||||
messageSeq = [messageSeqString integerValue];
|
||||
}
|
||||
|
||||
MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:updateDict];
|
||||
__weak __auto_type weakSelf = self;
|
||||
_updateConsumer = ^(NSArray<MBIMUpdateItem *> *updates) {
|
||||
NSMutableArray *encodedUpdates = [NSMutableArray array];
|
||||
for (MBIMUpdateItem *item in updates) {
|
||||
NSDictionary *updateDict = [item dictionaryRepresentation];
|
||||
[encodedUpdates addObject:updateDict];
|
||||
}
|
||||
|
||||
MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:encodedUpdates];
|
||||
weakSelf.serverCompletionBlock(response);
|
||||
};
|
||||
|
||||
[[MBIMUpdateQueue sharedInstance] addConsumer:_updateConsumer];
|
||||
[[MBIMUpdateQueue sharedInstance] addConsumer:_updateConsumer withLastSyncedMessageSeq:messageSeq];
|
||||
}
|
||||
|
||||
- (void)cancel
|
||||
|
||||
Reference in New Issue
Block a user