// // MBIMUpdateQueue.m // kordophoned // // Created by James Magahern on 11/16/18. // Copyright © 2018 James Magahern. All rights reserved. // #import "MBIMUpdateQueue.h" #import "IMMessageItem+Encoded.h" #import "IMChat+Encoded.h" static const NSUInteger kUpdateItemsCullingLength = 100; @interface MBIMUpdateItem (/*INTERNAL*/) @property (nonatomic, assign) NSUInteger messageSequenceNumber; @end @implementation MBIMUpdateQueue { NSUInteger _messageSequenceNumber; dispatch_queue_t _accessQueue; NSMutableArray *_consumers; // Maps message sequence number to update item NSMutableDictionary *_updateItemHistory; } + (instancetype)sharedInstance { static MBIMUpdateQueue *sharedInstance = nil; static dispatch_once_t onceToken; dispatch_once(&onceToken, ^{ sharedInstance = [[self alloc] init]; }); return sharedInstance; } - (instancetype)init { self = [super init]; if (self) { _accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL); _consumers = [[NSMutableArray alloc] init]; _messageSequenceNumber = 0; _updateItemHistory = [[NSMutableDictionary alloc] init]; } return self; } - (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq { __weak NSMutableArray *consumers = _consumers; __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; dispatch_async(_accessQueue, ^{ if ((messageSeq >= 0) && messageSeq < self->_messageSequenceNumber) { NSMutableArray *batchedUpdates = [NSMutableArray array]; for (NSUInteger seq = messageSeq + 1; seq <= self->_messageSequenceNumber; seq++) { MBIMUpdateItem *item = [updateItemHistory objectForKey:@(seq)]; if (item) { [batchedUpdates addObject:item]; } } consumer(batchedUpdates); } else { [consumers addObject:consumer]; } }); } - (void)removeConsumer:(MBIMUpdateConsumer)consumer { __weak NSMutableArray *consumers = _consumers; dispatch_async(_accessQueue, ^{ [consumers removeObject:consumer]; }); } - (void)enqueueUpdateItem:(MBIMUpdateItem *)item { __weak NSMutableArray *consumers = _consumers; __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; dispatch_async(_accessQueue, ^{ self->_messageSequenceNumber++; item.messageSequenceNumber = self->_messageSequenceNumber; for (MBIMUpdateConsumer consumer in consumers) { consumer(@[ item ]); } [consumers removeAllObjects]; [updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)]; [self _cullUpdateItems]; }); } - (void)_cullUpdateItems { __weak NSMutableDictionary *updateItemHistory = _updateItemHistory; dispatch_async(_accessQueue, ^{ if ([updateItemHistory count] > kUpdateItemsCullingLength) { NSArray *sortedKeys = [[updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)]; for (NSValue *key in sortedKeys) { [updateItemHistory removeObjectForKey:key]; if ([updateItemHistory count] <= kUpdateItemsCullingLength) { break; } } } }); } @end @implementation MBIMUpdateItem - (NSDictionary *)dictionaryRepresentation { NSMutableDictionary *updateDict = [NSMutableDictionary dictionary]; updateDict[@"messageSequenceNumber"] = @(self.messageSequenceNumber); if ([self changedChat]) { updateDict[@"conversation"] = [[self changedChat] mbim_dictionaryRepresentation]; } if ([self addedMessage]) { updateDict[@"message"] = [[self addedMessage] mbim_dictionaryRepresentation]; } return updateDict; } @end