Private
Public Access
1
0
Files
Kordophone/kordophone/Bridge/MBIMUpdateQueue.m

204 lines
6.4 KiB
Mathematica
Raw Normal View History

2018-11-17 01:07:55 -08:00
//
// MBIMUpdateQueue.m
// kordophoned
//
// Created by James Magahern on 11/16/18.
// Copyright © 2018 James Magahern. All rights reserved.
//
#import "MBIMUpdateQueue.h"
#import "IMMessageItem+Encoded.h"
#import "IMChat+Encoded.h"
#import "MBIMHTTPConnection.h"
#import "MBIMURLUtilities.h"
#import "MBIMPingPongWebSocket.h"
#import <CocoaHTTPServer/GCDAsyncSocket.h>
#import <CocoaHTTPServer/HTTPMessage.h>
#import <CocoaHTTPServer/WebSocket.h>
static const NSUInteger kUpdateItemsCullingLength = 100;
@interface MBIMUpdateItem (/*INTERNAL*/) <WebSocketDelegate>
@property (nonatomic, assign) NSUInteger messageSequenceNumber;
@end
2018-11-17 01:07:55 -08:00
@implementation MBIMUpdateQueue {
NSUInteger _messageSequenceNumber;
2018-11-17 01:07:55 -08:00
dispatch_queue_t _accessQueue;
NSMutableArray *_longPollConsumers;
// Maps message sequence number to update item
NSMutableDictionary<NSNumber *, MBIMUpdateItem *> *_updateItemHistory;
// WebSocket consumers
NSMutableDictionary<NSString *, MBIMUpdateConsumer> *_websocketConsumers;
2018-11-17 01:07:55 -08:00
}
+ (instancetype)sharedInstance
{
static MBIMUpdateQueue *sharedInstance = nil;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
sharedInstance = [[self alloc] init];
});
return sharedInstance;
}
- (instancetype)init
{
self = [super init];
if (self) {
_accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL);
_messageSequenceNumber = 0;
_updateItemHistory = [[NSMutableDictionary alloc] init];
_websocketConsumers = [[NSMutableDictionary alloc] init];
_longPollConsumers = [[NSMutableArray alloc] init];
2018-11-17 01:07:55 -08:00
}
return self;
}
- (void)addPollingConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq
2018-11-17 01:07:55 -08:00
{
if (![self _syncConsumer:consumer fromLastMessageSeq:messageSeq]) {
__weak NSMutableArray *consumers = _longPollConsumers;
dispatch_async(_accessQueue, ^{
[consumers addObject:consumer];
});
}
2018-11-17 01:07:55 -08:00
}
- (void)removePollingConsumer:(MBIMUpdateConsumer)consumer
{
__weak NSMutableArray *consumers = _longPollConsumers;
dispatch_async(_accessQueue, ^{
[consumers removeObject:consumer];
});
}
2018-11-17 01:07:55 -08:00
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item
{
__weak __auto_type pollingConsumers = _longPollConsumers;
__weak __auto_type websocketConsumers = _websocketConsumers;
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
2018-11-17 01:07:55 -08:00
dispatch_async(_accessQueue, ^{
self->_messageSequenceNumber++;
item.messageSequenceNumber = self->_messageSequenceNumber;
// Notify polling consumers
for (MBIMUpdateConsumer consumer in pollingConsumers) {
consumer(@[ item ]);
}
[pollingConsumers removeAllObjects];
// Notify websocket consumers
for (MBIMUpdateConsumer consumer in [websocketConsumers allValues]) {
consumer(@[ item ]);
2018-11-17 01:07:55 -08:00
}
[updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)];
[self _cullUpdateItems];
});
}
- (WebSocket *)vendUpdateWebSocketConsumerForRequest:(HTTPMessage *)request socket:(GCDAsyncSocket *)gcdSocket
{
WebSocket *socket = [[MBIMPingPongWebSocket alloc] initWithRequest:request socket:gcdSocket];
socket.delegate = self;
MBIMUpdateConsumer consumer = ^(NSArray<MBIMUpdateItem *> *updates) {
NSMutableArray *encodedUpdates = [NSMutableArray array];
for (MBIMUpdateItem *item in updates) {
NSDictionary *updateDict = [item dictionaryRepresentation];
[encodedUpdates addObject:updateDict];
}
NSData *data = [NSJSONSerialization dataWithJSONObject:encodedUpdates options:0 error:NULL];
[socket sendData:data];
};
NSString *messageSeqString = [[request url] valueForQueryItemWithName:@"seq"];
[self _syncConsumer:consumer fromLastMessageSeq:(messageSeqString ? [messageSeqString integerValue] : -1)];
__weak __auto_type websocketConsumers = _websocketConsumers;
dispatch_async(_accessQueue, ^{
NSString *websocketKey = [socket description];
[websocketConsumers setObject:consumer forKey:websocketKey];
});
return socket;
}
- (BOOL)_syncConsumer:(MBIMUpdateConsumer)consumer fromLastMessageSeq:(NSInteger)messageSeq
{
const BOOL needsSync = (messageSeq >= 0) && messageSeq < self->_messageSequenceNumber;
if (needsSync) {
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
dispatch_async(_accessQueue, ^{
NSMutableArray *batchedUpdates = [NSMutableArray array];
for (NSUInteger seq = messageSeq + 1; seq <= self->_messageSequenceNumber; seq++) {
MBIMUpdateItem *item = [updateItemHistory objectForKey:@(seq)];
if (item) {
[batchedUpdates addObject:item];
}
}
consumer(batchedUpdates);
});
}
return needsSync;
}
- (void)_cullUpdateItems
{
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
dispatch_async(_accessQueue, ^{
if ([updateItemHistory count] > kUpdateItemsCullingLength) {
NSArray *sortedKeys = [[updateItemHistory allKeys] sortedArrayUsingSelector:@selector(compare:)];
for (NSValue *key in sortedKeys) {
[updateItemHistory removeObjectForKey:key];
if ([updateItemHistory count] <= kUpdateItemsCullingLength) {
break;
}
}
}
2018-11-17 01:07:55 -08:00
});
}
#pragma mark - <WebSocketDelegate>
- (void)webSocketDidClose:(WebSocket *)ws
{
// xxx: not great, but works.
NSString *websocketKey = [ws description];
__weak __auto_type websocketConsumers = _websocketConsumers;
dispatch_async(_accessQueue, ^{
[websocketConsumers removeObjectForKey:websocketKey];
});
}
2018-11-17 01:07:55 -08:00
@end
@implementation MBIMUpdateItem
- (NSDictionary *)dictionaryRepresentation
{
NSMutableDictionary *updateDict = [NSMutableDictionary dictionary];
updateDict[@"messageSequenceNumber"] = @(self.messageSequenceNumber);
if ([self changedChat]) {
updateDict[@"conversation"] = [[self changedChat] mbim_dictionaryRepresentation];
}
if ([self addedMessage]) {
updateDict[@"message"] = [[self addedMessage] mbim_dictionaryRepresentation];
}
return updateDict;
}
2018-11-17 01:07:55 -08:00
@end