Private
Public Access
1
0
Files
Kordophone/kordophone2/XPC/XPCClient.swift

364 lines
13 KiB
Swift
Raw Normal View History

2025-08-24 16:24:21 -07:00
//
// XPCClient.swift
// kordophone2
//
// Created by James Magahern on 8/24/25.
//
import Foundation
2025-08-29 19:45:27 -06:00
import ServiceManagement
2025-08-24 16:24:21 -07:00
import XPC
private let serviceName = "net.buzzert.kordophonecd"
final class XPCClient
{
2025-08-29 22:30:37 -06:00
private var connection: xpc_connection_t?
private let connectionQueue = DispatchQueue(label: "net.buzzert.kordophone.xpc.connection")
private var isReconnecting: Bool = false
private var reconnectAttempt: Int = 0
2025-08-24 18:41:42 -07:00
private let signalLock = NSLock()
private var signalSinks: [UUID: (Signal) -> Void] = [:]
private var didSubscribeSignals: Bool = false
2025-08-29 19:45:27 -06:00
/// Launch agent service for `net.buzzert.kordophonecd.plist`.
///
/// On first access the agent is registered unless its status is already `.enabled`.
/// A registration failure is printed and then the process traps.
static let appService: SMAppService = {
    let agent = SMAppService.agent(plistName: "net.buzzert.kordophonecd.plist")
    guard agent.status != .enabled else { return agent }

    do {
        try agent.register()
    } catch {
        print("Unable to register agent: \(error)")
        fatalError()
    }
    return agent
}()
2025-08-24 16:24:21 -07:00
/// Creates the client: touches `appService` (which registers the agent if needed)
/// and then opens the XPC connection.
init() {
    // Reading the static forces its one-time initializer to run.
    _ = XPCClient.appService
    connect()
}
2025-08-24 18:41:42 -07:00
/// Returns a new stream that receives every `Signal` dispatched by this client.
///
/// The first call also starts a task that sends the signal subscription request.
/// Each stream registers its own sink and removes it again when the stream terminates.
public func eventStream() -> AsyncStream<Signal> {
    // Subscribe lazily: only the first stream ever created triggers the request.
    let needsSubscription = !didSubscribeSignals
    if needsSubscription {
        didSubscribeSignals = true
        Task { try? await subscribeToSignals() }
    }

    return AsyncStream { continuation in
        let sinkID = UUID()
        let sink: (Signal) -> Void = { signal in
            continuation.yield(signal)
        }
        signalLock.withLock { signalSinks[sinkID] = sink }

        // Drop the sink once the consumer stops listening.
        continuation.onTermination = { [weak self] _ in
            guard let self else { return }
            _ = self.signalLock.withLock {
                self.signalSinks.removeValue(forKey: sinkID)
            }
        }
    }
}
2025-08-24 16:24:21 -07:00
/// Sends a `GetVersion` request and returns the reply's `version` string.
///
/// - Throws: `Error.typeError` when the reply is missing, is not a dictionary, or has no
///   `version` string; otherwise any error thrown by `sendSync`.
public func getVersion() async throws -> String {
    let request = makeRequest(method: "GetVersion")
    guard
        let reply = try await sendSync(request),
        xpc_get_type(reply) == XPC_TYPE_DICTIONARY,
        let version = xpc_dictionary_get_string(reply, "version")
    else {
        throw Error.typeError
    }
    return String(cString: version)
}
/// Sends a `GetConversations` request and decodes the reply's `conversations` array.
///
/// - Parameters:
///   - limit: Sent as the `limit` argument, encoded as a string.
///   - offset: Sent as the `offset` argument, encoded as a string.
/// - Returns: The decodable conversations; an empty array when the reply is missing or
///   not shaped as expected. Elements that fail to decode are skipped.
/// - Throws: Any error thrown by `sendSync`.
public func getConversations(limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Conversation] {
    let arguments: [String: xpc_object_t] = [
        "limit": xpcString(String(limit)),
        "offset": xpcString(String(offset)),
    ]
    let request = makeRequest(method: "GetConversations", arguments: arguments)

    guard
        let reply = try await sendSync(request),
        xpc_get_type(reply) == XPC_TYPE_DICTIONARY,
        let items = xpc_dictionary_get_value(reply, "conversations"),
        xpc_get_type(items) == XPC_TYPE_ARRAY
    else {
        return []
    }

    var conversations: [Serialized.Conversation] = []
    xpc_array_apply(items) { _, element in
        // Always continue iterating; undecodable elements are simply left out.
        guard xpc_get_type(element) == XPC_TYPE_DICTIONARY,
              let conversation = Serialized.Conversation(xpc: element)
        else { return true }
        conversations.append(conversation)
        return true
    }
    return conversations
}
2025-08-25 00:13:55 -07:00
/// Sends a `SyncConversation` request and awaits the reply, which is discarded.
///
/// - Parameter conversationId: Sent as the `conversation_id` argument.
/// - Throws: Any error thrown by `sendSync`.
public func syncConversation(conversationId: String) async throws {
let req = makeRequest(method: "SyncConversation", arguments: ["conversation_id": xpcString(conversationId)])
_ = try await sendSync(req)
}
2025-08-25 00:37:48 -07:00
/// Sends a `SyncConversationList` request (no arguments) and awaits the reply, which is discarded.
///
/// - Throws: Any error thrown by `sendSync`.
public func syncConversationList() async throws {
let req = makeRequest(method: "SyncConversationList")
_ = try await sendSync(req)
}
/// Sends a `MarkConversationAsRead` request and awaits the reply, which is discarded.
///
/// - Parameter conversationId: Sent as the `conversation_id` argument.
/// - Throws: Any error thrown by `sendSync`.
public func markConversationAsRead(conversationId: String) async throws {
let req = makeRequest(method: "MarkConversationAsRead", arguments: ["conversation_id": xpcString(conversationId)])
_ = try await sendSync(req)
}
2025-08-24 17:58:37 -07:00
/// Sends a `GetMessages` request and decodes the reply's `messages` array.
///
/// - Parameters:
///   - conversationId: Sent as the `conversation_id` argument.
///   - limit: Sent as the `limit` argument, encoded as a string.
///   - offset: Sent as the `offset` argument, encoded as a string.
/// - Returns: The decodable messages; an empty array when the reply is missing or
///   not shaped as expected. Elements that fail to decode are skipped.
/// - Throws: Any error thrown by `sendSync`.
public func getMessages(conversationId: String, limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Message] {
    let arguments: [String: xpc_object_t] = [
        "conversation_id": xpcString(conversationId),
        "limit": xpcString(String(limit)),
        "offset": xpcString(String(offset)),
    ]
    let request = makeRequest(method: "GetMessages", arguments: arguments)

    guard
        let reply = try await sendSync(request),
        xpc_get_type(reply) == XPC_TYPE_DICTIONARY,
        let items = xpc_dictionary_get_value(reply, "messages"),
        xpc_get_type(items) == XPC_TYPE_ARRAY
    else {
        return []
    }

    var messages: [Serialized.Message] = []
    xpc_array_apply(items) { _, element in
        // Always continue iterating; undecodable elements are simply left out.
        guard xpc_get_type(element) == XPC_TYPE_DICTIONARY,
              let message = Serialized.Message(xpc: element)
        else { return true }
        messages.append(message)
        return true
    }
    return messages
}
2025-08-24 17:58:37 -07:00
/// Sends a `SendMessage` request for the given conversation.
///
/// - Parameters:
///   - conversationId: Sent as the `conversation_id` argument.
///   - message: Sent as the `text` argument.
/// - Throws: `Error.typeError` when the reply is missing or not a dictionary;
///   otherwise any error thrown by `sendSync`.
public func sendMessage(conversationId: String, message: String) async throws {
    let request = makeRequest(
        method: "SendMessage",
        arguments: [
            "conversation_id": xpcString(conversationId),
            "text": xpcString(message),
        ]
    )
    let reply = try await sendSync(request)
    guard let reply, xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
        throw Error.typeError
    }
}
2025-08-24 23:38:35 -07:00
/// Sends a `DownloadAttachment` request and awaits the reply, which is discarded.
///
/// - Parameters:
///   - attachmentId: Sent as the `attachment_id` argument.
///   - preview: Sent as the `preview` argument, encoded as `"true"` or `"false"`.
/// - Throws: Any error thrown by `sendSync`.
public func downloadAttachment(attachmentId: String, preview: Bool) async throws {
    let previewFlag = preview ? "true" : "false"
    let request = makeRequest(
        method: "DownloadAttachment",
        arguments: [
            "attachment_id": xpcString(attachmentId),
            "preview": xpcString(previewFlag),
        ]
    )
    _ = try await sendSync(request)
}
/// Sends an `OpenAttachmentFd` request and wraps the file descriptor from the reply's `fd` key.
///
/// - Parameters:
///   - attachmentId: Sent as the `attachment_id` argument.
///   - preview: Sent as the `preview` argument, encoded as `"true"` or `"false"`.
/// - Returns: A `FileHandle` that owns the duplicated descriptor and closes it on deallocation.
/// - Throws: `Error.typeError` when the reply is missing or not a dictionary,
///   `Error.badFileHandle` when no valid descriptor could be duplicated from the reply,
///   otherwise any error thrown by `sendSync`.
public func openAttachmentFileHandle(attachmentId: String, preview: Bool) async throws -> FileHandle {
    var args: [String: xpc_object_t] = [:]
    args["attachment_id"] = xpcString(attachmentId)
    args["preview"] = xpcString(preview ? "true" : "false")

    let req = makeRequest(method: "OpenAttachmentFd", arguments: args)
    guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else { throw Error.typeError }

    // Validate the descriptor before handing it to FileHandle, so an invalid (negative)
    // descriptor is never wrapped in an owning handle that would later try to close it.
    let fd = xpc_dictionary_dup_fd(reply, "fd")
    guard fd >= 0 else { throw Error.badFileHandle }

    return FileHandle(fileDescriptor: fd, closeOnDealloc: true)
}
2025-08-25 00:13:55 -07:00
/// Sends a `GetAllSettings` request and decodes the reply into `Serialized.Settings`.
///
/// - Returns: The decoded settings, or settings with empty `serverUrl` and `username`
///   when the reply cannot be decoded.
/// - Throws: `Error.typeError` when the reply is missing or not a dictionary;
///   otherwise any error thrown by `sendSync`.
public func getSettings() async throws -> Serialized.Settings {
    let request = makeRequest(method: "GetAllSettings")
    guard let reply = try await sendSync(request), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
        throw Error.typeError
    }

    if let settings = Serialized.Settings.fromXPC(reply) {
        return settings
    }
    return Serialized.Settings(serverUrl: "", username: "")
}
/// Sends an `UpdateSettings` request and awaits the reply, which is discarded.
///
/// - Parameter settings: Its `serverUrl` and `username` are sent as the `server_url`
///   and `username` arguments.
/// - Throws: Any error thrown by `sendSync`.
public func setSettings(settings: Serialized.Settings) async throws {
    var arguments: [String: xpc_object_t] = [:]
    arguments["server_url"] = xpcString(settings.serverUrl)
    arguments["username"] = xpcString(settings.username)

    let request = makeRequest(method: "UpdateSettings", arguments: arguments)
    _ = try await sendSync(request)
}
2025-08-24 16:24:21 -07:00
// MARK: - Types
/// Failures thrown by `XPCClient` requests.
///
/// - `typeError`: a reply was missing, had an unexpected XPC type, or lacked an expected field.
/// - `encodingError`: not thrown by the code in this type's visible methods.
/// - `badFileHandle`: no valid file descriptor could be obtained from a reply.
/// - `connectionError`: there was no connection, or the reply was an interruption error.
enum Error: Swift.Error
{
    case typeError,
         encodingError,
         badFileHandle,
         connectionError
}
2025-08-24 18:41:42 -07:00
/// Events delivered to `eventStream()` consumers, decoded from incoming XPC event dictionaries
/// by their `name` field.
enum Signal
{
/// Decoded from the `ConversationsUpdated` event.
case conversationsUpdated
/// Decoded from the `MessagesUpdated` event; carries its `conversation_id` argument.
case messagesUpdated(conversationId: String)
/// Decoded from the `AttachmentDownloadCompleted` event; carries its `attachment_id` argument.
case attachmentDownloaded(attachmentId: String)
/// Decoded from the `AttachmentUploadCompleted` event; carries its `upload_guid` and
/// `attachment_guid` arguments.
case attachmentUploaded(uploadGuid: String, attachmentGuid: String)
/// Decoded from the `UpdateStreamReconnected` event.
case updateStreamReconnected
}
2025-08-24 16:24:21 -07:00
}
extension XPCClient
{
2025-08-29 22:30:37 -06:00
/// Tears down any existing XPC connection and opens a fresh one to the daemon's mach service.
///
/// The connection swap happens asynchronously on `connectionQueue`; once the new connection
/// is resumed, the reconnect bookkeeping (`isReconnecting`, `reconnectAttempt`) is reset.
/// If signals were subscribed previously, a task is started to send the subscription again.
private func connect() {
    connectionQueue.async { [weak self] in
        guard let self else { return }

        if let existing = self.connection {
            // Cancelling a connection delivers XPC_ERROR_CONNECTION_INVALID to its event
            // handler. Our handler treats that as a connection failure and schedules a
            // reconnect, which would cancel the connection created below and repeat forever.
            // Detach the handler first so a deliberate teardown is not reported as a failure.
            xpc_connection_set_event_handler(existing) { _ in }
            xpc_connection_cancel(existing)
            self.connection = nil
        }

        let newConn = xpc_connection_create_mach_service(serviceName, nil, 0)
        let handler: xpc_handler_t = { [weak self] event in
            self?.handleIncomingXPCEvent(event)
        }
        xpc_connection_set_event_handler(newConn, handler)
        xpc_connection_resume(newConn)

        self.connection = newConn
        self.isReconnecting = false
        self.reconnectAttempt = 0
    }

    // Re-send the subscription after a reconnect. The request reads the connection through
    // `connectionQueue`, so it is ordered after the swap enqueued above.
    if didSubscribeSignals {
        Task { try? await subscribeToSignals() }
    }
}
/// Schedules one delayed call to `connect()`, with a delay that grows with `reconnectAttempt`.
///
/// The bookkeeping runs on `connectionQueue`. If a reconnect is already pending
/// (`isReconnecting` is set), the call does nothing.
private func scheduleReconnect() {
connectionQueue.async { [weak self] in
guard let self else { return }
// Collapse repeated requests into a single pending reconnect.
if self.isReconnecting { return }
self.isReconnecting = true
let attempt = self.reconnectAttempt
self.reconnectAttempt += 1
// Delay is 2^attempt seconds (1, 2, 4, ...), capped at 30 seconds.
let delaySeconds = min(pow(2.0, Double(attempt)), 30.0)
self.connectionQueue.asyncAfter(deadline: .now() + delaySeconds) { [weak self] in
self?.connect()
}
}
}
2025-08-24 18:41:42 -07:00
/// Sends a `SubscribeSignals` request (no arguments) and awaits the reply, which is discarded.
///
/// - Throws: Any error thrown by `sendSync`.
private func subscribeToSignals() async throws {
let req = makeRequest(method: "SubscribeSignals")
_ = try await sendSync(req)
}
2025-08-24 16:24:21 -07:00
/// Builds an XPC request dictionary with a `method` string and, when arguments are given,
/// a nested `arguments` dictionary.
///
/// - Parameters:
///   - method: Stored under the `method` key.
///   - arguments: Optional key/value pairs copied into a dictionary stored under `arguments`.
///     When `nil`, the `arguments` key is omitted.
/// - Returns: The assembled XPC dictionary.
private func makeRequest(method: String, arguments: [String: xpc_object_t]? = nil) -> xpc_object_t {
    let request = xpc_dictionary_create(nil, nil, 0)
    xpc_dictionary_set_string(request, "method", method)

    guard let arguments else { return request }

    let payload = xpc_dictionary_create(nil, nil, 0)
    for (key, value) in arguments {
        // Swift bridges `key` to a temporary C string for the duration of the call.
        xpc_dictionary_set_value(payload, key, value)
    }
    xpc_dictionary_set_value(request, "arguments", payload)
    return request
}
/// Wraps a Swift string in an XPC string object.
private func xpcString(_ s: String) -> xpc_object_t {
    // `s` is bridged to a temporary C string for the duration of the call.
    xpc_string_create(s)
}
/// Sends `request` over the current connection and suspends until a reply arrives.
///
/// - Parameter request: The XPC message to send.
/// - Returns: The reply dictionary.
/// - Throws: `Error.connectionError` when there is no connection or the reply is an
///   interruption error (a reconnect is scheduled in both cases); `Error.typeError` for any
///   other error reply or a reply that is not a dictionary.
private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? {
    try await withCheckedThrowingContinuation { continuation in
        // `connection` is owned by `connectionQueue`, so read it there.
        let current: xpc_connection_t? = self.connectionQueue.sync { self.connection }
        guard let current else {
            self.scheduleReconnect()
            continuation.resume(throwing: Error.connectionError)
            return
        }

        let replyQueue = DispatchQueue.global(qos: .userInitiated)
        xpc_connection_send_message_with_reply(current, request, replyQueue) { reply in
            let replyType = xpc_get_type(reply)

            if replyType == XPC_TYPE_DICTIONARY {
                continuation.resume(returning: reply)
            } else if replyType == XPC_TYPE_ERROR, reply.isInterruptionError {
                self.scheduleReconnect()
                continuation.resume(throwing: Error.connectionError)
            } else {
                // Non-interruption errors and unexpected reply types are both type errors.
                continuation.resume(throwing: Error.typeError)
            }
        }
    }
}
2025-08-24 18:41:42 -07:00
/// Handles an event delivered by the XPC connection.
///
/// Dictionary events are decoded into a `Signal` by their `name` field and fanned out to all
/// registered sinks; events with an unknown name or missing arguments are dropped.
/// Error events that are interruption errors schedule a reconnect. Anything else is ignored.
private func handleIncomingXPCEvent(_ event: xpc_object_t) {
    let eventType = xpc_get_type(event)

    if eventType == XPC_TYPE_ERROR {
        if event.isInterruptionError {
            scheduleReconnect()
        }
        return
    }
    guard eventType == XPC_TYPE_DICTIONARY else { return }

    guard let eventDict = XPCDictionary(event), let name: String = eventDict["name"] else { return }
    let args = eventDict.object("arguments").flatMap { XPCDictionary($0) }

    // Maps the event name (plus its arguments, where required) onto a `Signal`.
    func decodeSignal() -> Signal? {
        switch name {
        case "ConversationsUpdated":
            return .conversationsUpdated
        case "MessagesUpdated":
            guard let args, let cid: String = args["conversation_id"] else { return nil }
            return .messagesUpdated(conversationId: cid)
        case "AttachmentDownloadCompleted":
            guard let args, let aid: String = args["attachment_id"] else { return nil }
            return .attachmentDownloaded(attachmentId: aid)
        case "AttachmentUploadCompleted":
            guard let args,
                  let uploadGuid: String = args["upload_guid"],
                  let attachmentGuid: String = args["attachment_guid"]
            else { return nil }
            return .attachmentUploaded(uploadGuid: uploadGuid, attachmentGuid: attachmentGuid)
        case "UpdateStreamReconnected":
            return .updateStreamReconnected
        default:
            return nil
        }
    }

    guard let signal = decodeSignal() else { return }

    // Snapshot the sinks under the lock, then invoke them outside of it.
    let sinks = signalLock.withLock { Array(signalSinks.values) }
    for deliver in sinks {
        deliver(signal)
    }
}
2025-08-24 16:24:21 -07:00
}
2025-08-29 22:30:37 -06:00
extension xpc_object_t
{
    /// Whether this object equals one of the XPC error constants for an interrupted
    /// connection, an invalid connection, or imminent termination.
    var isInterruptionError: Bool {
        let reconnectTriggers = [
            XPC_ERROR_CONNECTION_INTERRUPTED,
            XPC_ERROR_CONNECTION_INVALID,
            XPC_ERROR_TERMINATION_IMMINENT,
        ]
        return reconnectTriggers.contains { xpc_equal(self, $0) }
    }
}
2025-08-24 16:24:21 -07:00