//
//  XPCClient.swift
//  kordophone2
//
//  Created by James Magahern on 8/24/25.
//

import Foundation
import XPC

/// Mach service name the client connects to.
private let serviceName = "net.buzzert.kordophonecd"

/// Client for the `net.buzzert.kordophonecd` XPC mach service.
///
/// Requests are sent as XPC dictionaries with a `"method"` key and an optional
/// `"arguments"` dictionary. Incoming dictionary events carrying a `"name"` key
/// are translated into `Signal` values and fanned out to every stream returned
/// by `eventStream()`.
final class XPCClient {
    private let connection: xpc_connection_t

    /// Guards `signalSinks` and `didSubscribeSignals`.
    private let signalLock = NSLock()
    private var signalSinks: [UUID: (Signal) -> Void] = [:]
    private var didSubscribeSignals: Bool = false

    /// Creates the mach-service connection, installs the event handler and resumes it.
    init() {
        self.connection = xpc_connection_create_mach_service(serviceName, nil, 0)

        // Weak capture: the handler is owned by the connection, which is owned by `self`.
        let handler: xpc_handler_t = { [weak self] event in
            self?.handleIncomingXPCEvent(event)
        }

        xpc_connection_set_event_handler(connection, handler)
        xpc_connection_resume(connection)
    }

    /// Returns a stream of signals received over the connection.
    ///
    /// The first call also sends a `SubscribeSignals` request in the background.
    /// Each returned stream registers its own sink, which is removed when the
    /// stream terminates.
    public func eventStream() -> AsyncStream<Signal> {
        // Auto-subscribe on first stream creation. The test-and-set happens under
        // the lock so concurrent first callers cannot both send the request.
        let shouldSubscribe = signalLock.withLock { () -> Bool in
            guard !didSubscribeSignals else { return false }
            didSubscribeSignals = true
            return true
        }

        if shouldSubscribe {
            Task {
                do {
                    try await subscribeToSignals()
                } catch {
                    // Best effort: the stream is still returned, but surface the failure.
                    print("XPC signal subscription failed: \(error)")
                }
            }
        }

        return AsyncStream<Signal> { continuation in
            let id = UUID()
            signalLock.withLock {
                signalSinks[id] = { signal in
                    continuation.yield(signal)
                }
            }

            continuation.onTermination = { [weak self] _ in
                guard let self else { return }
                _ = self.signalLock.withLock {
                    self.signalSinks.removeValue(forKey: id)
                }
            }
        }
    }

    /// Sends `GetVersion` and returns the reply's `"version"` string.
    ///
    /// - Throws: `Error.typeError` when the request fails, the reply is not a
    ///   dictionary, or it has no `"version"` string.
    public func getVersion() async throws -> String {
        let req = makeRequest(method: "GetVersion")
        guard let reply = try await sendSync(req),
              xpc_get_type(reply) == XPC_TYPE_DICTIONARY,
              let cstr = xpc_dictionary_get_string(reply, "version")
        else {
            throw Error.typeError
        }

        return String(cString: cstr)
    }

    /// Sends `GetConversations` and decodes the reply's `"conversations"` array.
    ///
    /// - Parameters:
    ///   - limit: Sent as the string argument `"limit"`.
    ///   - offset: Sent as the string argument `"offset"`.
    /// - Returns: The decodable conversations; empty when the reply has no
    ///   `"conversations"` array. Elements that fail to decode are skipped.
    /// - Throws: `Error.typeError` when the request itself fails.
    public func getConversations(limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Conversation] {
        let args: [String: xpc_object_t] = [
            "limit": xpcString(String(limit)),
            "offset": xpcString(String(offset)),
        ]

        let req = makeRequest(method: "GetConversations", arguments: args)
        let reply = try await sendSync(req)

        return dictionaryElements(of: reply, forKey: "conversations")
            .compactMap { Serialized.Conversation(xpc: $0) }
    }

    /// Sends `GetMessages` and decodes the reply's `"messages"` array.
    ///
    /// - Parameters:
    ///   - conversationId: Sent as the string argument `"conversation_id"`.
    ///   - limit: Sent as the string argument `"limit"`.
    ///   - offset: Sent as the string argument `"offset"`.
    /// - Returns: The decodable messages; empty when the reply has no
    ///   `"messages"` array. Elements that fail to decode are skipped.
    /// - Throws: `Error.typeError` when the request itself fails.
    public func getMessages(conversationId: String, limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Message] {
        let args: [String: xpc_object_t] = [
            "conversation_id": xpcString(conversationId),
            "limit": xpcString(String(limit)),
            "offset": xpcString(String(offset)),
        ]

        let req = makeRequest(method: "GetMessages", arguments: args)
        let reply = try await sendSync(req)

        return dictionaryElements(of: reply, forKey: "messages")
            .compactMap { Serialized.Message(xpc: $0) }
    }

    /// Sends `SendMessage` with the given conversation id and text.
    ///
    /// - Throws: `Error.typeError` when the request fails or the reply is not a dictionary.
    public func sendMessage(conversationId: String, message: String) async throws {
        let args: [String: xpc_object_t] = [
            "conversation_id": xpcString(conversationId),
            "text": xpcString(message),
        ]

        let req = makeRequest(method: "SendMessage", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }
    }

    // MARK: - Types

    enum Error: Swift.Error {
        case typeError
        case encodingError
    }

    /// Events delivered through `eventStream()`.
    enum Signal {
        case conversationsUpdated
        case messagesUpdated(conversationId: String)
        case attachmentDownloaded(attachmentId: String)
        case attachmentUploaded(uploadGuid: String, attachmentGuid: String)
        case updateStreamReconnected
    }
}

// MARK: - Private helpers

extension XPCClient {
    /// Sends the `SubscribeSignals` request; the reply is discarded.
    private func subscribeToSignals() async throws {
        let req = makeRequest(method: "SubscribeSignals")
        _ = try await sendSync(req)
    }

    /// Builds a request dictionary: `"method"` plus, when given, an `"arguments"` dictionary.
    private func makeRequest(method: String, arguments: [String: xpc_object_t]? = nil) -> xpc_object_t {
        let dict = xpc_dictionary_create(nil, nil, 0)
        xpc_dictionary_set_string(dict, "method", method)

        if let arguments {
            let argsDict = xpc_dictionary_create(nil, nil, 0)
            for (key, value) in arguments {
                key.withCString { cKey in
                    xpc_dictionary_set_value(argsDict, cKey, value)
                }
            }
            xpc_dictionary_set_value(dict, "arguments", argsDict)
        }

        return dict
    }

    /// Wraps a Swift string in an XPC string object.
    private func xpcString(_ s: String) -> xpc_object_t {
        return s.withCString { ptr in xpc_string_create(ptr) }
    }

    /// Returns the dictionary-typed elements of the array stored under `key` in `reply`.
    ///
    /// Yields an empty array when `reply` is nil or not a dictionary, or when the
    /// value under `key` is missing or not an array.
    private func dictionaryElements(of reply: xpc_object_t?, forKey key: String) -> [xpc_object_t] {
        guard let reply,
              xpc_get_type(reply) == XPC_TYPE_DICTIONARY,
              let items = xpc_dictionary_get_value(reply, key),
              xpc_get_type(items) == XPC_TYPE_ARRAY
        else {
            return []
        }

        var elements: [xpc_object_t] = []
        xpc_array_apply(items) { _, element in
            if xpc_get_type(element) == XPC_TYPE_DICTIONARY {
                elements.append(element)
            }
            return true // keep iterating
        }
        return elements
    }

    /// Sends `request` and suspends until the reply handler fires.
    ///
    /// - Returns: The reply when it is an XPC dictionary.
    /// - Throws: `Error.typeError` for an XPC error reply or any other reply type.
    private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? {
        try await withCheckedThrowingContinuation { continuation in
            xpc_connection_send_message_with_reply(connection, request, DispatchQueue.global(qos: .userInitiated)) { reply in
                switch xpc_get_type(reply) {
                case XPC_TYPE_DICTIONARY:
                    continuation.resume(returning: reply)

                case XPC_TYPE_ERROR:
                    // XPC error objects expose their text under XPC_ERROR_KEY_DESCRIPTION.
                    if let description = xpc_dictionary_get_string(reply, XPC_ERROR_KEY_DESCRIPTION) {
                        print("XPC error: \(String(cString: description))")
                    }
                    continuation.resume(throwing: Error.typeError)

                default:
                    continuation.resume(throwing: Error.typeError)
                }
            }
        }
    }

    /// Translates a dictionary event into a `Signal`.
    ///
    /// Returns nil when the event has no `"name"`, the name is not recognised,
    /// or a required string argument is missing.
    private func makeSignal(from event: xpc_object_t) -> Signal? {
        guard let namePtr = xpc_dictionary_get_string(event, "name") else { return nil }
        let name = String(cString: namePtr)

        // Reads one string out of the event's "arguments" dictionary.
        func argument(_ key: String) -> String? {
            guard let args = xpc_dictionary_get_value(event, "arguments"),
                  xpc_get_type(args) == XPC_TYPE_DICTIONARY,
                  let valuePtr = xpc_dictionary_get_string(args, key)
            else {
                return nil
            }
            return String(cString: valuePtr)
        }

        switch name {
        case "ConversationsUpdated":
            return .conversationsUpdated

        case "MessagesUpdated":
            guard let conversationId = argument("conversation_id") else { return nil }
            return .messagesUpdated(conversationId: conversationId)

        case "AttachmentDownloadCompleted":
            guard let attachmentId = argument("attachment_id") else { return nil }
            return .attachmentDownloaded(attachmentId: attachmentId)

        case "AttachmentUploadCompleted":
            guard let uploadGuid = argument("upload_guid"),
                  let attachmentGuid = argument("attachment_guid")
            else {
                return nil
            }
            return .attachmentUploaded(uploadGuid: uploadGuid, attachmentGuid: attachmentGuid)

        case "UpdateStreamReconnected":
            return .updateStreamReconnected

        default:
            return nil
        }
    }

    /// Connection event handler: forwards signals to every registered sink and logs XPC errors.
    private func handleIncomingXPCEvent(_ event: xpc_object_t) {
        switch xpc_get_type(event) {
        case XPC_TYPE_DICTIONARY:
            guard let signal = makeSignal(from: event) else { return }

            // Snapshot under the lock, deliver outside it so sinks never run while it is held.
            let sinks = signalLock.withLock { Array(signalSinks.values) }
            for sink in sinks {
                sink(signal)
            }

        case XPC_TYPE_ERROR:
            // The event is an error object, not an XPC string; read its description key.
            if let description = xpc_dictionary_get_string(event, XPC_ERROR_KEY_DESCRIPTION) {
                print("XPC event error: \(String(cString: description))")
            }

        default:
            break
        }
    }
}