Adds websocket updates via the /updates endpoint
This commit is contained in:
@@ -9,20 +9,29 @@
|
||||
#import "MBIMUpdateQueue.h"
|
||||
#import "IMMessageItem+Encoded.h"
|
||||
#import "IMChat+Encoded.h"
|
||||
#import "MBIMHTTPConnection.h"
|
||||
#import "MBIMURLUtilities.h"
|
||||
|
||||
#import <CocoaHTTPServer/GCDAsyncSocket.h>
|
||||
#import <CocoaHTTPServer/HTTPMessage.h>
|
||||
#import <CocoaHTTPServer/WebSocket.h>
|
||||
|
||||
static const NSUInteger kUpdateItemsCullingLength = 100;
|
||||
|
||||
@interface MBIMUpdateItem (/*INTERNAL*/)
|
||||
@interface MBIMUpdateItem (/*INTERNAL*/) <WebSocketDelegate>
|
||||
@property (nonatomic, assign) NSUInteger messageSequenceNumber;
|
||||
@end
|
||||
|
||||
@implementation MBIMUpdateQueue {
|
||||
NSUInteger _messageSequenceNumber;
|
||||
dispatch_queue_t _accessQueue;
|
||||
NSMutableArray *_consumers;
|
||||
NSMutableArray *_longPollConsumers;
|
||||
|
||||
// Maps message sequence number to update item
|
||||
NSMutableDictionary<NSNumber *, MBIMUpdateItem *> *_updateItemHistory;
|
||||
|
||||
// WebSocket consumers
|
||||
NSMutableDictionary<NSString *, MBIMUpdateConsumer> *_websocketConsumers;
|
||||
}
|
||||
|
||||
+ (instancetype)sharedInstance
|
||||
@@ -41,20 +50,93 @@ static const NSUInteger kUpdateItemsCullingLength = 100;
|
||||
self = [super init];
|
||||
if (self) {
|
||||
_accessQueue = dispatch_queue_create("net.buzzert.MBIMUpdateQueue", DISPATCH_QUEUE_SERIAL);
|
||||
_consumers = [[NSMutableArray alloc] init];
|
||||
_messageSequenceNumber = 0;
|
||||
_updateItemHistory = [[NSMutableDictionary alloc] init];
|
||||
_websocketConsumers = [[NSMutableDictionary alloc] init];
|
||||
_longPollConsumers = [[NSMutableArray alloc] init];
|
||||
}
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
- (void)addConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq
|
||||
- (void)addPollingConsumer:(MBIMUpdateConsumer)consumer withLastSyncedMessageSeq:(NSInteger)messageSeq
|
||||
{
|
||||
__weak NSMutableArray *consumers = _consumers;
|
||||
if (![self _syncConsumer:consumer fromLastMessageSeq:messageSeq]) {
|
||||
__weak NSMutableArray *consumers = _longPollConsumers;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
[consumers addObject:consumer];
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
- (void)removePollingConsumer:(MBIMUpdateConsumer)consumer
|
||||
{
|
||||
__weak NSMutableArray *consumers = _longPollConsumers;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
[consumers removeObject:consumer];
|
||||
});
|
||||
}
|
||||
|
||||
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item
|
||||
{
|
||||
__weak __auto_type pollingConsumers = _longPollConsumers;
|
||||
__weak __auto_type websocketConsumers = _websocketConsumers;
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
if ((messageSeq >= 0) && messageSeq < self->_messageSequenceNumber) {
|
||||
self->_messageSequenceNumber++;
|
||||
item.messageSequenceNumber = self->_messageSequenceNumber;
|
||||
|
||||
// Notify polling consumers
|
||||
for (MBIMUpdateConsumer consumer in pollingConsumers) {
|
||||
consumer(@[ item ]);
|
||||
}
|
||||
[pollingConsumers removeAllObjects];
|
||||
|
||||
// Notify websocket consumers
|
||||
for (MBIMUpdateConsumer consumer in [websocketConsumers allValues]) {
|
||||
consumer(@[ item ]);
|
||||
}
|
||||
|
||||
[updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)];
|
||||
|
||||
[self _cullUpdateItems];
|
||||
});
|
||||
}
|
||||
|
||||
- (WebSocket *)vendUpdateWebSocketConsumerForRequest:(HTTPMessage *)request socket:(GCDAsyncSocket *)gcdSocket
|
||||
{
|
||||
WebSocket *socket = [[WebSocket alloc] initWithRequest:request socket:gcdSocket];
|
||||
socket.delegate = self;
|
||||
|
||||
MBIMUpdateConsumer consumer = ^(NSArray<MBIMUpdateItem *> *updates) {
|
||||
NSMutableArray *encodedUpdates = [NSMutableArray array];
|
||||
for (MBIMUpdateItem *item in updates) {
|
||||
NSDictionary *updateDict = [item dictionaryRepresentation];
|
||||
[encodedUpdates addObject:updateDict];
|
||||
}
|
||||
|
||||
NSData *data = [NSJSONSerialization dataWithJSONObject:encodedUpdates options:0 error:NULL];
|
||||
[socket sendData:data];
|
||||
};
|
||||
|
||||
NSString *messageSeqString = [[request url] valueForQueryItemWithName:@"seq"];
|
||||
[self _syncConsumer:consumer fromLastMessageSeq:(messageSeqString ? [messageSeqString integerValue] : -1)];
|
||||
|
||||
__weak __auto_type websocketConsumers = _websocketConsumers;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
NSString *websocketKey = [socket description];
|
||||
[websocketConsumers setObject:consumer forKey:websocketKey];
|
||||
});
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
- (BOOL)_syncConsumer:(MBIMUpdateConsumer)consumer fromLastMessageSeq:(NSInteger)messageSeq
|
||||
{
|
||||
const BOOL needsSync = (messageSeq >= 0) && messageSeq < self->_messageSequenceNumber;
|
||||
if (needsSync) {
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
NSMutableArray *batchedUpdates = [NSMutableArray array];
|
||||
for (NSUInteger seq = messageSeq + 1; seq <= self->_messageSequenceNumber; seq++) {
|
||||
MBIMUpdateItem *item = [updateItemHistory objectForKey:@(seq)];
|
||||
@@ -64,37 +146,10 @@ static const NSUInteger kUpdateItemsCullingLength = 100;
|
||||
}
|
||||
|
||||
consumer(batchedUpdates);
|
||||
} else {
|
||||
[consumers addObject:consumer];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
- (void)removeConsumer:(MBIMUpdateConsumer)consumer
|
||||
{
|
||||
__weak NSMutableArray *consumers = _consumers;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
[consumers removeObject:consumer];
|
||||
});
|
||||
}
|
||||
|
||||
- (void)enqueueUpdateItem:(MBIMUpdateItem *)item
|
||||
{
|
||||
__weak NSMutableArray *consumers = _consumers;
|
||||
__weak NSMutableDictionary *updateItemHistory = _updateItemHistory;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
self->_messageSequenceNumber++;
|
||||
item.messageSequenceNumber = self->_messageSequenceNumber;
|
||||
|
||||
for (MBIMUpdateConsumer consumer in consumers) {
|
||||
consumer(@[ item ]);
|
||||
}
|
||||
|
||||
[consumers removeAllObjects];
|
||||
[updateItemHistory setObject:item forKey:@(item.messageSequenceNumber)];
|
||||
|
||||
[self _cullUpdateItems];
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return needsSync;
|
||||
}
|
||||
|
||||
- (void)_cullUpdateItems
|
||||
@@ -113,6 +168,18 @@ static const NSUInteger kUpdateItemsCullingLength = 100;
|
||||
});
|
||||
}
|
||||
|
||||
#pragma mark - <WebSocketDelegate>
|
||||
|
||||
- (void)webSocketDidClose:(WebSocket *)ws
|
||||
{
|
||||
// xxx: not great, but works.
|
||||
NSString *websocketKey = [ws description];
|
||||
__weak __auto_type websocketConsumers = _websocketConsumers;
|
||||
dispatch_async(_accessQueue, ^{
|
||||
[websocketConsumers removeObjectForKey:websocketKey];
|
||||
});
|
||||
}
|
||||
|
||||
@end
|
||||
|
||||
@implementation MBIMUpdateItem
|
||||
|
||||
Reference in New Issue
Block a user