From dba4910a821ac8408466b20e5ddb34626b0930dc Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 21 Nov 2018 15:51:51 -0700 Subject: [PATCH] Batch updates, and fixing bug where daemon would crash when accessing IMCore stuff from different threads --- kordophone/Bridge/MBIMUpdateQueue.h | 4 +- kordophone/Bridge/MBIMUpdateQueue.m | 53 ++++++++++++++++--- .../Bridge/Operations/MBIMBridgeOperation.h | 5 ++ .../Bridge/Operations/MBIMBridgeOperation.m | 27 ++++++++++ .../MBIMConversationListOperation.m | 14 ++--- .../Operations/MBIMFetchAttachmentOperation.m | 10 +--- .../Bridge/Operations/MBIMMarkOperation.m | 37 ++++++------- .../Operations/MBIMMessagesListOperation.m | 43 +++++++-------- .../Operations/MBIMSendMessageOperation.m | 34 ++++++------ .../Operations/MBIMUpdatePollOperation.m | 18 +++++-- 10 files changed, 155 insertions(+), 90 deletions(-) diff --git a/kordophone/Bridge/MBIMUpdateQueue.h b/kordophone/Bridge/MBIMUpdateQueue.h index b66ed94..64d7a1a 100644 --- a/kordophone/Bridge/MBIMUpdateQueue.h +++ b/kordophone/Bridge/MBIMUpdateQueue.h @@ -18,13 +18,13 @@ NS_ASSUME_NONNULL_BEGIN - (NSDictionary *)dictionaryRepresentation; @end -typedef void (^MBIMUpdateConsumer)(MBIMUpdateItem *item); +typedef void (^MBIMUpdateConsumer)(NSArray *items); @interface MBIMUpdateQueue : NSObject + (instancetype)sharedInstance; -- (void)addConsumer:(MBIMUpdateConsumer)consumer; +- (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq; - (void)removeConsumer:(MBIMUpdateConsumer)consumer; - (void)enqueueUpdateItem:(MBIMUpdateItem *)item; diff --git a/kordophone/Bridge/MBIMUpdateQueue.m b/kordophone/Bridge/MBIMUpdateQueue.m index d8869ae..c8636aa 100644 --- a/kordophone/Bridge/MBIMUpdateQueue.m +++ b/kordophone/Bridge/MBIMUpdateQueue.m @@ -10,6 +10,8 @@ #import "IMMessageItem+Encoded.h" #import "IMChat+Encoded.h" +static const NSUInteger kUpdateItemsCullingLength = 100; + @interface MBIMUpdateItem (/*INTERNAL*/) @property (nonatomic, assign) NSUInteger messageSequenceNumber; @end @@ -18,6 +20,9 @@ NSUInteger _messageSequenceNumber; dispatch_queue_t _accessQueue; NSMutableArray *_consumers; + + // Maps message sequence number to update item + NSMutableDictionary *_updateItemHistory; } + (instancetype)sharedInstance @@ -38,16 +43,30 @@ _accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL); _consumers = [[NSMutableArray alloc] init]; _messageSequenceNumber = 0; + _updateItemHistory = [[NSMutableDictionary alloc] init]; } return self; } -- (void)addConsumer:(MBIMUpdateConsumer)consumer +- (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq { __weak NSMutableArray *consumers = _consumers; + __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; dispatch_async(_accessQueue, ^{ - [consumers addObject:consumer]; + if ((messageSeq >= 0) && messageSeq < self->_messageSequenceNumber) { + NSMutableArray *batchedUpdates = [NSMutableArray array]; + for (NSUInteger seq = messageSeq + 1; seq <= self->_messageSequenceNumber; seq++) { + MBIMUpdateItem *item = [updateItemHistory objectForKey:@(seq)]; + if (item) { + [batchedUpdates addObject:item]; + } + } + + consumer(batchedUpdates); + } else { + [consumers addObject:consumer]; + } }); } @@ -61,16 +80,36 @@ - (void)enqueueUpdateItem:(MBIMUpdateItem *)item { - _messageSequenceNumber++; - item.messageSequenceNumber = _messageSequenceNumber; - __weak NSMutableArray *consumers = _consumers; + __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; dispatch_async(_accessQueue, ^{ + self->_messageSequenceNumber++; + item.messageSequenceNumber = self->_messageSequenceNumber; + for (MBIMUpdateConsumer consumer in consumers) { - consumer(item); + consumer(@[ item ]); } [consumers removeAllObjects]; + [updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)]; + + [self _cullUpdateItems]; + }); +} + +- (void)_cullUpdateItems +{ + __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; + dispatch_async(_accessQueue, ^{ + if ([updateItemHistory count] > kUpdateItemsCullingLength) { + NSArray *sortedKeys = [[updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)]; + for (NSValue *key in sortedKeys) { + [updateItemHistory removeObjectForKey:key]; + if ([updateItemHistory count] <= kUpdateItemsCullingLength) { + break; + } + } + } }); } @@ -80,7 +119,7 @@ - (NSDictionary *)dictionaryRepresentation { NSMutableDictionary *updateDict = [NSMutableDictionary dictionary]; - updateDict[@"messageSequenceNumber"] = @(_messageSequenceNumber); + updateDict[@"messageSequenceNumber"] = @(self.messageSequenceNumber); if ([self changedChat]) { updateDict[@"conversation"] = [[self changedChat] mbim_dictionaryRepresentation]; diff --git a/kordophone/Bridge/Operations/MBIMBridgeOperation.h b/kordophone/Bridge/Operations/MBIMBridgeOperation.h index 6b09950..b07d1bc 100644 --- a/kordophone/Bridge/Operations/MBIMBridgeOperation.h +++ b/kordophone/Bridge/Operations/MBIMBridgeOperation.h @@ -23,11 +23,16 @@ typedef void (^MBIMBridgeOperationCompletionBlock)(NSObject * _Nul @property (nonatomic, readonly) NSURL *requestURL; @property (nonatomic, readonly) MBIMBridgeOperationCompletionBlock serverCompletionBlock; ++ (dispatch_queue_t)sharedIMAccessQueue; + + (nullable Class)operationClassForEndpointName:(NSString *)endpointName; - (instancetype)initWithRequestURL:(NSURL *)requestURL completion:(MBIMBridgeOperationCompletionBlock)completionBlock; - (NSObject *)cancelAndReturnTimeoutResponse; +// convenience +- (nullable NSString *)valueForQueryItemWithName:(NSString *)queryItemName; + @end NS_ASSUME_NONNULL_END diff --git a/kordophone/Bridge/Operations/MBIMBridgeOperation.m b/kordophone/Bridge/Operations/MBIMBridgeOperation.m index 40ee4f2..5e1ae26 100644 --- a/kordophone/Bridge/Operations/MBIMBridgeOperation.m +++ b/kordophone/Bridge/Operations/MBIMBridgeOperation.m @@ -32,6 +32,17 @@ return operationClassMapping; } ++ (dispatch_queue_t)sharedIMAccessQueue +{ + static dispatch_once_t onceToken; + static dispatch_queue_t accessQueue = nil; + dispatch_once(&onceToken, ^{ + accessQueue = dispatch_queue_create("IMAccessQueue", DISPATCH_QUEUE_SERIAL); + }); + + return accessQueue; +} + + (void)load { if ([self class] != [MBIMBridgeOperation class]) { @@ -61,4 +72,20 @@ return [[HTTPErrorResponse alloc] initWithErrorCode:500]; } +- (NSString *)valueForQueryItemWithName:(NSString *)queryItemName +{ + NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL + resolvingAgainstBaseURL:NO]; + + NSString *value = nil; + for (NSURLQueryItem *queryItem in [urlComponents queryItems]) { + if ([[queryItem name] isEqualToString:queryItemName]) { + value = [queryItem value]; + break; + } + } + + return value; +} + @end diff --git a/kordophone/Bridge/Operations/MBIMConversationListOperation.m b/kordophone/Bridge/Operations/MBIMConversationListOperation.m index 06c8473..076346b 100644 --- a/kordophone/Bridge/Operations/MBIMConversationListOperation.m +++ b/kordophone/Bridge/Operations/MBIMConversationListOperation.m @@ -23,13 +23,15 @@ - (void)main { - NSArray *chats = [sChatRegistry allExistingChats]; + __block NSMutableArray *conversations = [NSMutableArray array]; - NSMutableArray *conversations = [NSMutableArray array]; - for (IMChat *chat in chats) { - NSDictionary *chatDict = [chat mbim_dictionaryRepresentation]; - [conversations addObject:chatDict]; - } + dispatch_sync([[self class] sharedIMAccessQueue], ^{ + NSArray *chats = [sChatRegistry allExistingChats]; + for (IMChat *chat in chats) { + NSDictionary *chatDict = [chat mbim_dictionaryRepresentation]; + [conversations addObject:chatDict]; + } + }); MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:conversations]; self.serverCompletionBlock(response); diff --git a/kordophone/Bridge/Operations/MBIMFetchAttachmentOperation.m b/kordophone/Bridge/Operations/MBIMFetchAttachmentOperation.m index 8ef6426..8d167a0 100644 --- a/kordophone/Bridge/Operations/MBIMFetchAttachmentOperation.m +++ b/kordophone/Bridge/Operations/MBIMFetchAttachmentOperation.m @@ -24,15 +24,7 @@ { NSObject *response = nil; do { - NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO]; - - NSString *guid = nil; - for (NSURLQueryItem *queryItem in [urlComponents queryItems]) { - if ([[queryItem name] isEqualToString:@"guid"]) { - guid = [queryItem value]; - break; - } - } + NSString *guid = [self valueForQueryItemWithName:@"guid"]; if (!guid) { NSLog(@"No query item provided"); diff --git a/kordophone/Bridge/Operations/MBIMMarkOperation.m b/kordophone/Bridge/Operations/MBIMMarkOperation.m index 31c5cd1..5e5f091 100644 --- a/kordophone/Bridge/Operations/MBIMMarkOperation.m +++ b/kordophone/Bridge/Operations/MBIMMarkOperation.m @@ -20,17 +20,9 @@ - (void)main { - NSObject *response = nil; + __block NSObject *response = nil; do { - NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO]; - - NSString *guid = nil; - for (NSURLQueryItem *queryItem in [urlComponents queryItems]) { - if ([[queryItem name] isEqualToString:@"guid"]) { - guid = [queryItem value]; - break; - } - } + NSString *guid = [self valueForQueryItemWithName:@"guid"]; if (!guid) { NSLog(@"No query item provided"); @@ -38,18 +30,19 @@ break; } - IMChat *chat = [sChatRegistry existingChatWithGUID:guid]; - if (!chat) { - NSLog(@"Chat with guid: %@ not found", guid); - response = [[HTTPErrorResponse alloc] initWithErrorCode:500]; - break; - } - - // TODO: be smarter about this and mark individual messages as read? Could lead - // to a race condition - if ([chat unreadMessageCount] > 0) { - [chat markAllMessagesAsRead]; - } + dispatch_sync([[self class] sharedIMAccessQueue], ^{ + IMChat *chat = [sChatRegistry existingChatWithGUID:guid]; + if (!chat) { + NSLog(@"Chat with guid: %@ not found", guid); + response = [[HTTPErrorResponse alloc] initWithErrorCode:500]; + } else { + // TODO: be smarter about this and mark individual messages as read? Could lead + // to a race condition + if ([chat unreadMessageCount] > 0) { + [chat markAllMessagesAsRead]; + } + } + }); response = [[HTTPErrorResponse alloc] initWithErrorCode:200]; } while (0); diff --git a/kordophone/Bridge/Operations/MBIMMessagesListOperation.m b/kordophone/Bridge/Operations/MBIMMessagesListOperation.m index a50c282..f58355c 100644 --- a/kordophone/Bridge/Operations/MBIMMessagesListOperation.m +++ b/kordophone/Bridge/Operations/MBIMMessagesListOperation.m @@ -23,17 +23,9 @@ - (void)main { - NSObject *response = nil; + __block NSObject *response = nil; do { - NSURLComponents *urlComponents = [NSURLComponents componentsWithURL:self.requestURL resolvingAgainstBaseURL:NO]; - - NSString *guid = nil; - for (NSURLQueryItem *queryItem in [urlComponents queryItems]) { - if ([[queryItem name] isEqualToString:@"guid"]) { - guid = [queryItem value]; - break; - } - } + NSString *guid = [self valueForQueryItemWithName:@"guid"]; if (!guid) { NSLog(@"No query item provided"); @@ -41,21 +33,22 @@ break; } - IMChat *chat = [sChatRegistry existingChatWithGUID:guid]; - if (!chat) { - NSLog(@"Chat with guid: %@ not found", guid); - response = [[HTTPErrorResponse alloc] initWithErrorCode:500]; - break; - } - - // Load messages - [chat loadMessagesBeforeDate:[NSDate date] limit:50 loadImmediately:YES]; - - NSMutableArray *messages = [NSMutableArray array]; - for (IMMessageItem *imMessage in [[chat chatItems] messages]) { - NSDictionary *messageDict = [imMessage mbim_dictionaryRepresentation]; - [messages addObject:messageDict]; - } + __block NSMutableArray *messages = [NSMutableArray array]; + dispatch_sync([[self class] sharedIMAccessQueue], ^{ + IMChat *chat = [sChatRegistry existingChatWithGUID:guid]; + if (!chat) { + NSLog(@"Chat with guid: %@ not found", guid); + response = [[HTTPErrorResponse alloc] initWithErrorCode:500]; + } else { + // Load messages + [chat loadMessagesBeforeDate:[NSDate date] limit:50 loadImmediately:YES]; + + for (IMMessageItem *imMessage in [[chat chatItems] messages]) { + NSDictionary *messageDict = [imMessage mbim_dictionaryRepresentation]; + [messages addObject:messageDict]; + } + } + }); response = [MBIMJSONDataResponse responseWithJSONObject:messages]; } while (0); diff --git a/kordophone/Bridge/Operations/MBIMSendMessageOperation.m b/kordophone/Bridge/Operations/MBIMSendMessageOperation.m index b84c655..cfb545c 100644 --- a/kordophone/Bridge/Operations/MBIMSendMessageOperation.m +++ b/kordophone/Bridge/Operations/MBIMSendMessageOperation.m @@ -22,23 +22,27 @@ - (BOOL)_sendMessage:(NSString *)messageBody toChatWithGUID:(NSString *)chatGUID { - IMChat *chat = [sChatRegistry existingChatWithGUID:chatGUID]; + __block BOOL result = YES; - // TODO: chat might not be an iMessage chat! - IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]]; - IMHandle *senderHandle = [iMessageAccount loginIMHandle]; + dispatch_sync([[self class] sharedIMAccessQueue], ^{ + IMChat *chat = [sChatRegistry existingChatWithGUID:chatGUID]; + + // TODO: chat might not be an iMessage chat! + IMAccount *iMessageAccount = [[IMAccountController sharedInstance] bestAccountForService:[IMServiceImpl iMessageService]]; + IMHandle *senderHandle = [iMessageAccount loginIMHandle]; + + NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody]; + IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle withText:replyAttrString fileTransferGUIDs:@[] flags:kIMMessageFinished]; + + if (!chat) { + NSLog(@"Chat does not exist: %@", chatGUID); + result = NO; + } else { + [chat sendMessage:reply]; + } + }); - NSAttributedString *replyAttrString = [[NSAttributedString alloc] initWithString:messageBody]; - IMMessage *reply = [IMMessage fromMeIMHandle:senderHandle withText:replyAttrString fileTransferGUIDs:@[] flags:kIMMessageFinished]; - - if (!chat) { - NSLog(@"Chat does not exist: %@", chatGUID); - return NO; - } - - [chat sendMessage:reply]; - - return YES; + return result; } - (void)main diff --git a/kordophone/Bridge/Operations/MBIMUpdatePollOperation.m b/kordophone/Bridge/Operations/MBIMUpdatePollOperation.m index 5c3d40d..939743a 100644 --- a/kordophone/Bridge/Operations/MBIMUpdatePollOperation.m +++ b/kordophone/Bridge/Operations/MBIMUpdatePollOperation.m @@ -22,15 +22,25 @@ - (void)main { + NSInteger messageSeq = -1; + NSString *messageSeqString = [self valueForQueryItemWithName:@"seq"]; + if (messageSeqString) { + messageSeq = [messageSeqString integerValue]; + } + __weak __auto_type weakSelf = self; - _updateConsumer = ^(MBIMUpdateItem *nextUpdateItem) { - NSDictionary *updateDict = [nextUpdateItem dictionaryRepresentation]; + _updateConsumer = ^(NSArray *updates) { + NSMutableArray *encodedUpdates = [NSMutableArray array]; + for (MBIMUpdateItem *item in updates) { + NSDictionary *updateDict = [item dictionaryRepresentation]; + [encodedUpdates addObject:updateDict]; + } - MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:updateDict]; + MBIMJSONDataResponse *response = [MBIMJSONDataResponse responseWithJSONObject:encodedUpdates]; weakSelf.serverCompletionBlock(response); }; - [[MBIMUpdateQueue sharedInstance] addConsumer:_updateConsumer]; + [[MBIMUpdateQueue sharedInstance] addConsumer:_updateConsumer withLastSyncedMessageSeq:messageSeq]; } - (void)cancel