//
//  XPCClient.swift
//  kordophone2
//
//  Created by James Magahern on 8/24/25.
//

import Foundation
import ServiceManagement
import XPC

private let serviceName = "net.buzzert.kordophonecd"

/// Client for the `net.buzzert.kordophonecd` XPC mach service.
///
/// Requests are dictionaries with a `"method"` string and an optional
/// `"arguments"` dictionary. Incoming dictionaries on the connection are
/// decoded into `Signal` values and fanned out to every live `eventStream()`.
final class XPCClient {
    // Connection state. Only read or written on `connectionQueue`.
    private var connection: xpc_connection_t?
    private let connectionQueue = DispatchQueue(label: "net.buzzert.kordophone.xpc.connection")
    private var isReconnecting: Bool = false
    private var reconnectAttempt: Int = 0
    /// Incremented every time a new connection is created. Events and replies carry
    /// the generation of the connection they came from, so errors from a connection
    /// that has already been replaced can be ignored.
    private var connectionGeneration: Int = 0

    // Signal fan-out state. Only read or written while holding `signalLock`.
    private let signalLock = NSLock()
    private var signalSinks: [UUID: (Signal) -> Void] = [:]
    private var didSubscribeSignals: Bool = false

    /// The launch agent backing the daemon. Registered on first access if its
    /// status is not `.enabled`; a registration failure is fatal.
    static let appService: SMAppService = {
        do {
            let service = SMAppService.agent(plistName: "net.buzzert.kordophonecd.plist")
            if service.status != .enabled {
                try service.register()
            }
            return service
        } catch {
            print("Unable to register agent: \(error)")
            fatalError()
        }
    }()

    init() {
        _ = Self.appService
        connect()
    }

    /// Returns a stream of signals received from the daemon.
    ///
    /// The first call also sends `SubscribeSignals` to the daemon. The stream's
    /// sink is registered before this method returns and removed when the stream
    /// terminates.
    public func eventStream() -> AsyncStream<Signal> {
        // Auto-subscribe on first stream creation. The check-and-set happens under
        // the lock so that concurrent callers cannot both trigger a subscription.
        let shouldSubscribe = signalLock.withLock { () -> Bool in
            guard !didSubscribeSignals else { return false }
            didSubscribeSignals = true
            return true
        }
        if shouldSubscribe {
            Task { try? await subscribeToSignals() }
        }

        return AsyncStream { continuation in
            let id = UUID()
            signalLock.withLock {
                signalSinks[id] = { signal in continuation.yield(signal) }
            }

            continuation.onTermination = { [weak self] _ in
                guard let self else { return }
                _ = self.signalLock.withLock { self.signalSinks.removeValue(forKey: id) }
            }
        }
    }

    /// Sends `GetVersion` and returns the `"version"` string from the reply.
    /// - Throws: `Error.typeError` if the reply is not a dictionary or has no version string.
    public func getVersion() async throws -> String {
        let req = makeRequest(method: "GetVersion")
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }
        if let cstr = xpc_dictionary_get_string(reply, "version") {
            return String(cString: cstr)
        }
        throw Error.typeError
    }

    /// Sends `GetConversations` and decodes the `"conversations"` array of the reply.
    ///
    /// Returns an empty array if the reply does not have the expected shape;
    /// elements that fail to decode are skipped.
    public func getConversations(limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Conversation] {
        var args: [String: xpc_object_t] = [:]
        args["limit"] = xpcString(String(limit))
        args["offset"] = xpcString(String(offset))

        let req = makeRequest(method: "GetConversations", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            return []
        }
        guard let items = xpc_dictionary_get_value(reply, "conversations"),
              xpc_get_type(items) == XPC_TYPE_ARRAY else {
            return []
        }

        var results: [Serialized.Conversation] = []
        xpc_array_apply(items) { _, element in
            if xpc_get_type(element) == XPC_TYPE_DICTIONARY, let conv = Serialized.Conversation(xpc: element) {
                results.append(conv)
            }
            return true
        }
        return results
    }

    /// Sends `SyncConversation` for the given conversation.
    public func syncConversation(conversationId: String) async throws {
        let req = makeRequest(method: "SyncConversation", arguments: ["conversation_id": xpcString(conversationId)])
        _ = try await sendSync(req)
    }

    /// Sends `SyncConversationList`.
    public func syncConversationList() async throws {
        let req = makeRequest(method: "SyncConversationList")
        _ = try await sendSync(req)
    }

    /// Sends `MarkConversationAsRead` for the given conversation.
    public func markConversationAsRead(conversationId: String) async throws {
        let req = makeRequest(method: "MarkConversationAsRead", arguments: ["conversation_id": xpcString(conversationId)])
        _ = try await sendSync(req)
    }

    /// Sends `GetMessages` and decodes the `"messages"` array of the reply.
    ///
    /// Returns an empty array if the reply does not have the expected shape;
    /// elements that fail to decode are skipped.
    public func getMessages(conversationId: String, limit: Int = 100, offset: Int = 0) async throws -> [Serialized.Message] {
        var args: [String: xpc_object_t] = [:]
        args["conversation_id"] = xpcString(conversationId)
        args["limit"] = xpcString(String(limit))
        args["offset"] = xpcString(String(offset))

        let req = makeRequest(method: "GetMessages", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            return []
        }
        guard let items = xpc_dictionary_get_value(reply, "messages"),
              xpc_get_type(items) == XPC_TYPE_ARRAY else {
            return []
        }

        var results: [Serialized.Message] = []
        xpc_array_apply(items) { _, element in
            if xpc_get_type(element) == XPC_TYPE_DICTIONARY, let msg = Serialized.Message(xpc: element) {
                results.append(msg)
            }
            return true
        }
        return results
    }

    /// Sends `SendMessage` with the given text and, if non-empty, attachment GUIDs.
    /// - Throws: `Error.typeError` if the reply is not a dictionary.
    public func sendMessage(conversationId: String, message: String, transferGuids: Set<String>) async throws {
        var args: [String: xpc_object_t] = [:]
        args["conversation_id"] = xpcString(conversationId)
        args["text"] = xpcString(message)
        if !transferGuids.isEmpty {
            args["attachment_guids"] = xpcStringArray(transferGuids)
        }

        let req = makeRequest(method: "SendMessage", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }
    }

    /// Sends `GetAttachmentInfo` and builds an `AttachmentInfo` from the reply.
    ///
    /// Missing fields default to an empty path / `false`.
    /// - Throws: `Error.typeError` if the reply is not a dictionary.
    public func getAttachmentInfo(attachmentId: String) async throws -> AttachmentInfo {
        var args: [String: xpc_object_t] = [:]
        args["attachment_id"] = xpcString(attachmentId)

        let req = makeRequest(method: "GetAttachmentInfo", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }

        return AttachmentInfo(
            path: reply["path"] ?? "",
            previewPath: reply["preview_path"] ?? "",
            isDownloaded: reply["is_downloaded"] ?? false,
            isPreviewDownloaded: reply["is_preview_downloaded"] ?? false
        )
    }

    /// Sends `DownloadAttachment`.
    /// - Parameter awaitCompletion: When `true`, does not return until an
    ///   `.attachmentDownloaded` signal for `attachmentId` has been received.
    public func downloadAttachment(attachmentId: String, preview: Bool, awaitCompletion: Bool = false) async throws {
        var args: [String: xpc_object_t] = [:]
        args["attachment_id"] = xpcString(attachmentId)
        args["preview"] = xpcString(preview ? "true" : "false")

        let req = makeRequest(method: "DownloadAttachment", arguments: args)

        // Open the event stream *before* sending the request. The stream registers
        // its sink immediately and buffers signals, so a completion signal that
        // arrives before the reply has been processed is not lost.
        let events: AsyncStream<Signal>? = awaitCompletion ? eventStream() : nil

        _ = try await sendSync(req)

        if let events {
            // Wait for downloaded event
            _ = await events.first { $0 == .attachmentDownloaded(attachmentId: attachmentId) }
        }
    }

    /// Sends `UploadAttachment` for the file at `path` and returns the `"upload_guid"` from the reply.
    /// - Throws: `Error.typeError` if the reply is not a dictionary,
    ///   `Error.encodingError` if it carries no upload GUID.
    public func uploadAttachment(path: String) async throws -> String {
        var args: [String: xpc_object_t] = [:]
        args["path"] = xpcString(path)

        let req = makeRequest(method: "UploadAttachment", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }
        guard let guid: String = reply["upload_guid"] else {
            throw Error.encodingError
        }
        return guid
    }

    /// Sends `OpenAttachmentFd` and wraps the file descriptor from the reply.
    ///
    /// The returned handle closes the descriptor when it is deallocated.
    /// - Throws: `Error.typeError` if the reply is not a dictionary,
    ///   `Error.badFileHandle` if the reply carries no usable descriptor.
    public func openAttachmentFileHandle(attachmentId: String, preview: Bool) async throws -> FileHandle {
        var args: [String: xpc_object_t] = [:]
        args["attachment_id"] = xpcString(attachmentId)
        args["preview"] = xpcString(preview ? "true" : "false")

        let req = makeRequest(method: "OpenAttachmentFd", arguments: args)
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }

        // Validate the descriptor before handing ownership to FileHandle, so an
        // invalid descriptor is never wrapped (and later closed) by a handle.
        let fd = xpc_dictionary_dup_fd(reply, "fd")
        guard fd >= 0 else {
            throw Error.badFileHandle
        }
        return FileHandle(fileDescriptor: fd, closeOnDealloc: true)
    }

    /// Sends `GetAllSettings` and decodes the reply.
    ///
    /// Falls back to settings with an empty server URL and username if decoding fails.
    /// - Throws: `Error.typeError` if the reply is not a dictionary.
    public func getSettings() async throws -> Serialized.Settings {
        let req = makeRequest(method: "GetAllSettings")
        guard let reply = try await sendSync(req), xpc_get_type(reply) == XPC_TYPE_DICTIONARY else {
            throw Error.typeError
        }
        return Serialized.Settings.fromXPC(reply) ?? Serialized.Settings(serverUrl: "", username: "")
    }

    /// Sends `UpdateSettings` with the server URL and username from `settings`.
    public func setSettings(settings: Serialized.Settings) async throws {
        let req = makeRequest(
            method: "UpdateSettings",
            arguments: [
                "server_url": xpcString(settings.serverUrl),
                "username": xpcString(settings.username),
            ]
        )
        _ = try await sendSync(req)
    }

    // MARK: - Types

    struct AttachmentInfo: Decodable {
        let path: String
        let previewPath: String
        let isDownloaded: Bool
        let isPreviewDownloaded: Bool
    }

    enum Error: Swift.Error {
        case typeError
        case encodingError
        case badFileHandle
        case connectionError
    }

    enum Signal: Equatable {
        case conversationsUpdated
        case messagesUpdated(conversationId: String)
        case attachmentDownloaded(attachmentId: String)
        case attachmentUploaded(uploadGuid: String, attachmentGuid: String)
        case updateStreamReconnected
    }
}

extension XPCClient {
    /// Replaces the current connection (if any) with a fresh one to the mach
    /// service, and re-sends `SubscribeSignals` if a subscription was requested before.
    private func connect() {
        connectionQueue.async { [weak self] in
            guard let self else { return }

            if let existing = self.connection {
                xpc_connection_cancel(existing)
                self.connection = nil
            }

            // Cancelling the old connection delivers a final error to its event
            // handler. Bumping the generation lets that (now stale) error be
            // recognised and ignored instead of triggering another reconnect.
            self.connectionGeneration += 1
            let generation = self.connectionGeneration

            let newConn = xpc_connection_create_mach_service(serviceName, nil, 0)
            let handler: xpc_handler_t = { [weak self] event in
                self?.handleIncomingXPCEvent(event, generation: generation)
            }
            xpc_connection_set_event_handler(newConn, handler)
            xpc_connection_resume(newConn)

            self.connection = newConn
            self.isReconnecting = false
            // `reconnectAttempt` is deliberately NOT reset here: creating the
            // connection says nothing about whether the service is reachable.
            // It is reset once a reply is actually received (see `sendSync`).
        }

        let shouldResubscribe = signalLock.withLock { didSubscribeSignals }
        if shouldResubscribe {
            Task { try? await subscribeToSignals() }
        }
    }

    /// Schedules a reconnect after an exponentially growing delay (1s, 2s, 4s, … capped at 30s).
    ///
    /// - Parameter generation: Generation of the connection that reported the
    ///   failure. If it no longer matches the current connection the request is
    ///   ignored. Pass `nil` to reconnect unconditionally.
    private func scheduleReconnect(generation: Int? = nil) {
        connectionQueue.async { [weak self] in
            guard let self else { return }
            if let generation, generation != self.connectionGeneration { return }
            if self.isReconnecting { return }

            self.isReconnecting = true
            let attempt = self.reconnectAttempt
            self.reconnectAttempt += 1

            let delaySeconds = min(pow(2.0, Double(attempt)), 30.0)
            self.connectionQueue.asyncAfter(deadline: .now() + delaySeconds) { [weak self] in
                self?.connect()
            }
        }
    }

    /// Resets the reconnect backoff, provided `generation` is still the current connection.
    private func resetReconnectBackoff(generation: Int) {
        connectionQueue.async { [weak self] in
            guard let self, generation == self.connectionGeneration else { return }
            self.reconnectAttempt = 0
        }
    }

    /// Sends `SubscribeSignals` to the daemon.
    private func subscribeToSignals() async throws {
        let req = makeRequest(method: "SubscribeSignals")
        _ = try await sendSync(req)
    }

    /// Builds a request dictionary with a `"method"` string and, when given, an `"arguments"` dictionary.
    private func makeRequest(method: String, arguments: [String: xpc_object_t]? = nil) -> xpc_object_t {
        let dict = xpc_dictionary_create(nil, nil, 0)
        xpc_dictionary_set_string(dict, "method", method)

        if let args = arguments {
            let argsDict = xpc_dictionary_create(nil, nil, 0)
            for (k, v) in args {
                k.withCString { cKey in
                    xpc_dictionary_set_value(argsDict, cKey, v)
                }
            }
            xpc_dictionary_set_value(dict, "arguments", argsDict)
        }

        return dict
    }

    /// Wraps a Swift string in an XPC string object.
    private func xpcString(_ s: String) -> xpc_object_t {
        return s.withCString { ptr in xpc_string_create(ptr) }
    }

    /// Wraps a set of Swift strings in an XPC array of XPC strings.
    private func xpcStringArray(_ set: Set<String>) -> xpc_object_t {
        let array = xpc_array_create(nil, 0)
        for str in set {
            xpc_array_append_value(array, xpcString(str))
        }
        return array
    }

    /// Sends `request` on the current connection and suspends until the reply arrives.
    ///
    /// - Returns: The reply dictionary.
    /// - Throws: `Error.connectionError` if there is no connection or the reply is
    ///   an interruption error (a reconnect is scheduled in both cases);
    ///   `Error.typeError` for any other non-dictionary reply.
    private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? {
        try await withCheckedThrowingContinuation { continuation in
            let (conn, generation): (xpc_connection_t?, Int) = self.connectionQueue.sync {
                (self.connection, self.connectionGeneration)
            }

            guard let conn else {
                self.scheduleReconnect()
                continuation.resume(throwing: Error.connectionError)
                return
            }

            xpc_connection_send_message_with_reply(conn, request, DispatchQueue.global(qos: .userInitiated)) { r in
                switch xpc_get_type(r) {
                case XPC_TYPE_ERROR:
                    if r.isInterruptionError {
                        // Tagged with the generation so that a failure on a
                        // connection that was already replaced does not tear
                        // down its successor.
                        self.scheduleReconnect(generation: generation)
                        continuation.resume(throwing: Error.connectionError)
                    } else {
                        continuation.resume(throwing: Error.typeError)
                    }
                case XPC_TYPE_DICTIONARY:
                    // A real reply proves the service is reachable again.
                    self.resetReconnectBackoff(generation: generation)
                    continuation.resume(returning: r)
                default:
                    continuation.resume(throwing: Error.typeError)
                }
            }
        }
    }

    /// Handles an event delivered to the connection's event handler.
    ///
    /// Dictionaries are decoded into a `Signal` (by their `"name"` and
    /// `"arguments"`) and delivered to every registered sink; unknown names and
    /// signals with missing arguments are dropped. Interruption errors schedule a reconnect.
    ///
    /// - Parameter generation: Generation of the connection the event came from.
    private func handleIncomingXPCEvent(_ event: xpc_object_t, generation: Int) {
        switch xpc_get_type(event) {
        case XPC_TYPE_DICTIONARY:
            guard let eventDict = XPCDictionary(event), let name: String = eventDict["name"] else {
                return
            }
            let args = eventDict.object("arguments").flatMap { XPCDictionary($0) }

            let signal: Signal? = {
                switch name {
                case "ConversationsUpdated":
                    return .conversationsUpdated
                case "MessagesUpdated":
                    if let args, let cid: String = args["conversation_id"] {
                        return .messagesUpdated(conversationId: cid)
                    }
                    return nil
                case "AttachmentDownloadCompleted":
                    if let args, let aid: String = args["attachment_id"] {
                        return .attachmentDownloaded(attachmentId: aid)
                    }
                    return nil
                case "AttachmentUploadCompleted":
                    if let args,
                       let uploadGuid: String = args["upload_guid"],
                       let attachmentGuid: String = args["attachment_guid"] {
                        return .attachmentUploaded(uploadGuid: uploadGuid, attachmentGuid: attachmentGuid)
                    }
                    return nil
                case "UpdateStreamReconnected":
                    return .updateStreamReconnected
                default:
                    return nil
                }
            }()

            if let signal {
                // Snapshot the sinks under the lock, then call them outside it so
                // a sink can never block or re-enter the lock.
                let sinks = signalLock.withLock { Array(signalSinks.values) }
                for sink in sinks {
                    sink(signal)
                }
            }

        case XPC_TYPE_ERROR:
            if event.isInterruptionError {
                scheduleReconnect(generation: generation)
            }

        default:
            break
        }
    }
}

extension xpc_object_t {
    /// `true` if this object is one of the XPC connection errors
    /// (interrupted, invalid, or termination imminent).
    var isInterruptionError: Bool {
        return (
            xpc_equal(self, XPC_ERROR_CONNECTION_INTERRUPTED) ||
            xpc_equal(self, XPC_ERROR_CONNECTION_INVALID) ||
            xpc_equal(self, XPC_ERROR_TERMINATION_IMMINENT)
        )
    }
}