//
//  MBIMUpdateQueue.m
//  kordophoned
//
//  Created by James Magahern on 11/16/18.
//  Copyright © 2018 James Magahern. All rights reserved.
//

#import "MBIMUpdateQueue.h"
#import "IMMessageItem+Encoded.h"
#import "IMChat+Encoded.h"
#import "MBIMHTTPConnection.h"
#import "MBIMURLUtilities.h"

// NOTE(review): the targets of the next three imports are missing in this copy of the file
// (presumably angle-bracket framework headers lost in extraction) -- restore them before building.
#import
#import
#import

// Once the history holds more than this many update items, the oldest ones are dropped.
static const NSUInteger kUpdateItemsCullingLength = 100;

// Lets the queue stamp each item with the sequence number it was enqueued under.
@interface MBIMUpdateItem (/*INTERNAL*/)
@property (nonatomic, assign) NSUInteger messageSequenceNumber;
@end

// Fans update items out to long-poll and WebSocket consumers and keeps a bounded history
// so that a consumer reporting an older sequence number can be caught up.
//
// All mutable state below is only touched from blocks running on _accessQueue (a serial
// queue); consumer blocks are therefore also invoked on that queue.
@implementation MBIMUpdateQueue {
    // Sequence number assigned to the most recently enqueued item (0 = nothing enqueued yet).
    NSUInteger _messageSequenceNumber;

    // Serial queue guarding every ivar in this block.
    dispatch_queue_t _accessQueue;

    // One-shot consumers waiting for the next update.
    NSMutableArray<MBIMUpdateConsumer> *_longPollConsumers;

    // Maps message sequence number to update item
    NSMutableDictionary<NSNumber *, MBIMUpdateItem *> *_updateItemHistory;

    // WebSocket consumers, keyed by the owning socket's -description.
    NSMutableDictionary<NSString *, MBIMUpdateConsumer> *_websocketConsumers;
}

// Returns the process-wide queue, created lazily exactly once.
+ (instancetype)sharedInstance {
    static MBIMUpdateQueue *shared = nil;
    static dispatch_once_t once;
    dispatch_once(&once, ^{
        shared = [[self alloc] init];
    });
    return shared;
}

- (instancetype)init {
    self = [super init];
    if (self) {
        // _messageSequenceNumber starts at 0 courtesy of +alloc.
        _accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL);
        _updateItemHistory = [[NSMutableDictionary alloc] init];
        _websocketConsumers = [[NSMutableDictionary alloc] init];
        _longPollConsumers = [[NSMutableArray alloc] init];
    }
    return self;
}

// Registers a one-shot long-poll consumer.
//
// If `messageSeq` is non-negative and behind the current sequence number, the consumer is
// called once with whatever newer items are still in the history and is NOT kept around.
// Otherwise it is parked until the next -enqueueUpdateItem:.
//
// The check, the replay and the registration all happen in a single block on the access
// queue, so an update enqueued concurrently can neither be missed nor race with the
// sequence-number read.
- (void)addPollingConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq {
    if (!consumer) {
        return; // nothing to call, and -addObject: would throw on nil
    }

    dispatch_async(_accessQueue, ^{
        const BOOL caughtUp = [self _syncConsumer:consumer fromLastMessageSeq:messageSeq];
        if (!caughtUp) {
            [self->_longPollConsumers addObject:consumer];
        }
    });
}

// Unregisters a parked long-poll consumer. Matching is done by -removeObject:, i.e. the
// same block object that was registered must be passed in.
- (void)removePollingConsumer:(MBIMUpdateConsumer)consumer {
    if (!consumer) {
        return;
    }

    dispatch_async(_accessQueue, ^{
        [self->_longPollConsumers removeObject:consumer];
    });
}

// Assigns the next sequence number to `item`, delivers it to every parked long-poll
// consumer (which are then dropped) and every WebSocket consumer, and records it in the
// history. A nil item is ignored.
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item {
    if (!item) {
        return; // @[ item ] below would throw
    }

    dispatch_async(_accessQueue, ^{
        self->_messageSequenceNumber++;
        item.messageSequenceNumber = self->_messageSequenceNumber;

        NSArray<MBIMUpdateItem *> *updates = @[ item ];

        // Notify polling consumers. They are one-shot, so take a snapshot and clear the
        // list before calling out.
        NSArray<MBIMUpdateConsumer> *waitingPollers = [self->_longPollConsumers copy];
        [self->_longPollConsumers removeAllObjects];
        for (MBIMUpdateConsumer poller in waitingPollers) {
            poller(updates);
        }

        // Notify websocket consumers
        for (MBIMUpdateConsumer listener in [self->_websocketConsumers allValues]) {
            listener(updates);
        }

        self->_updateItemHistory[@(item.messageSequenceNumber)] = item;
        [self _cullUpdateItems];
    });
}

// Creates a WebSocket for `request` whose delegate is this queue, and registers a consumer
// that JSON-encodes each batch of updates and sends it over that socket.
//
// If the request URL carries a `seq` query item, updates newer than that sequence number
// are replayed first. Replay and registration happen in one block on the access queue so
// no update can slip in between them.
//
// Returns the new socket (nil if the socket could not be created).
- (WebSocket *)vendUpdateWebSocketConsumerForRequest:(HTTPMessage *)request socket:(GCDAsyncSocket *)gcdSocket {
    WebSocket *socket = [[WebSocket alloc] initWithRequest:request socket:gcdSocket];
    if (!socket) {
        return nil; // a nil socket would yield a nil dictionary key below
    }
    socket.delegate = self;

    MBIMUpdateConsumer consumer = ^(NSArray *updates) {
        NSMutableArray<NSDictionary *> *encodedUpdates = [NSMutableArray arrayWithCapacity:[updates count]];
        for (MBIMUpdateItem *update in updates) {
            [encodedUpdates addObject:[update dictionaryRepresentation]];
        }

        // -dataWithJSONObject: raises on objects that are not valid JSON, and returns nil
        // with an error on other failures; in both cases there is nothing sensible to send.
        NSError *encodingError = nil;
        NSData *payload = nil;
        if ([NSJSONSerialization isValidJSONObject:encodedUpdates]) {
            payload = [NSJSONSerialization dataWithJSONObject:encodedUpdates options:0 error:&encodingError];
        }
        if (!payload) {
            NSLog(@"MBIMUpdateQueue: unable to JSON-encode updates for websocket: %@", encodingError);
            return;
        }

        [socket sendData:payload];
    };

    NSString *messageSeqString = [[request url] valueForQueryItemWithName:@"seq"];
    const NSInteger lastSyncedSeq = (messageSeqString ? [messageSeqString integerValue] : -1);

    // xxx: keyed by -description, mirroring -webSocketDidClose:.
    NSString *websocketKey = [socket description];

    dispatch_async(_accessQueue, ^{
        [self _syncConsumer:consumer fromLastMessageSeq:lastSyncedSeq];
        self->_websocketConsumers[websocketKey] = consumer;
    });

    return socket;
}

// Must be called on _accessQueue.
//
// If `messageSeq` is non-negative and behind the current sequence number, calls `consumer`
// once with every history item newer than `messageSeq`, in ascending sequence order, and
// returns YES. The batch can be empty if those items have already been culled. Returns NO
// (without calling the consumer) when no catch-up is needed.
- (BOOL)_syncConsumer:(MBIMUpdateConsumer)consumer fromLastMessageSeq:(NSInteger)messageSeq {
    if (messageSeq < 0) {
        return NO;
    }

    const NSUInteger lastSeenSeq = (NSUInteger)messageSeq;
    if (lastSeenSeq >= _messageSequenceNumber) {
        return NO;
    }

    // Walk the (bounded) history rather than every number between the two sequence
    // numbers; the gap can be arbitrarily large while the history is capped.
    NSArray<NSNumber *> *ascendingSeqs =
        [[_updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)];

    NSMutableArray<MBIMUpdateItem *> *batchedUpdates = [NSMutableArray array];
    for (NSNumber *seq in ascendingSeqs) {
        if ([seq unsignedIntegerValue] > lastSeenSeq) {
            [batchedUpdates addObject:_updateItemHistory[seq]];
        }
    }

    consumer(batchedUpdates);
    return YES;
}

// Must be called on _accessQueue.
//
// Trims the history down to kUpdateItemsCullingLength entries by removing the items with
// the lowest sequence numbers.
- (void)_cullUpdateItems {
    const NSUInteger historyCount = [_updateItemHistory count];
    if (historyCount <= kUpdateItemsCullingLength) {
        return;
    }

    NSArray<NSNumber *> *ascendingSeqs =
        [[_updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)];
    const NSUInteger excess = historyCount - kUpdateItemsCullingLength;
    NSArray<NSNumber *> *oldestSeqs = [ascendingSeqs subarrayWithRange:NSMakeRange(0, excess)];
    [_updateItemHistory removeObjectsForKeys:oldestSeqs];
}

#pragma mark - WebSocket delegate

// Drops the consumer that was registered for `ws` when it was vended.
- (void)webSocketDidClose:(WebSocket *)ws {
    // xxx: not great, but works.
    NSString *websocketKey = [ws description];
    if (!websocketKey) {
        return;
    }

    dispatch_async(_accessQueue, ^{
        [self->_websocketConsumers removeObjectForKey:websocketKey];
    });
}

@end

@implementation MBIMUpdateItem

// Builds the dictionary form of this update: always "messageSequenceNumber", plus
// "conversation" when there is a changed chat and "message" when there is an added message.
- (NSDictionary *)dictionaryRepresentation {
    NSMutableDictionary *updateDict = [NSMutableDictionary dictionary];
    updateDict[@"messageSequenceNumber"] = @(self.messageSequenceNumber);

    id chat = [self changedChat];
    if (chat) {
        updateDict[@"conversation"] = [chat mbim_dictionaryRepresentation];
    }

    id message = [self addedMessage];
    if (message) {
        updateDict[@"message"] = [message mbim_dictionaryRepresentation];
    }

    return updateDict;
}

@end