diff --git a/kordophone2/Daemon/kordophoned b/kordophone2/Daemon/kordophoned index 8d02d59..2276fba 100755 Binary files a/kordophone2/Daemon/kordophoned and b/kordophone2/Daemon/kordophoned differ diff --git a/kordophone2/XPC/XPCClient.swift b/kordophone2/XPC/XPCClient.swift index 068b298..8725944 100644 --- a/kordophone2/XPC/XPCClient.swift +++ b/kordophone2/XPC/XPCClient.swift @@ -11,10 +11,12 @@ import XPC private let serviceName = "net.buzzert.kordophonecd" - final class XPCClient { - private let connection: xpc_connection_t + private var connection: xpc_connection_t? + private let connectionQueue = DispatchQueue(label: "net.buzzert.kordophone.xpc.connection") + private var isReconnecting: Bool = false + private var reconnectAttempt: Int = 0 private let signalLock = NSLock() private var signalSinks: [UUID: (Signal) -> Void] = [:] private var didSubscribeSignals: Bool = false @@ -28,21 +30,14 @@ final class XPCClient return service } catch { - fatalError("Unable to register agent: \(error)") + print("Unable to register agent: \(error)") + fatalError() } }() init() { _ = Self.appService - - self.connection = xpc_connection_create_mach_service(serviceName, nil, 0) - - let handler: xpc_handler_t = { [weak self] event in - self?.handleIncomingXPCEvent(event) - } - - xpc_connection_set_event_handler(connection, handler) - xpc_connection_resume(connection) + connect() } public func eventStream() -> AsyncStream { @@ -197,6 +192,7 @@ final class XPCClient case typeError case encodingError case badFileHandle + case connectionError } enum Signal @@ -211,6 +207,49 @@ final class XPCClient extension XPCClient { + private func connect() { + connectionQueue.async { [weak self] in + guard let self else { return } + + if let existing = connection { + xpc_connection_cancel(existing) + connection = nil + } + + let newConn = xpc_connection_create_mach_service(serviceName, nil, 0) + let handler: xpc_handler_t = { [weak self] event in + self?.handleIncomingXPCEvent(event) + } + xpc_connection_set_event_handler(newConn, handler) + xpc_connection_resume(newConn) + + self.connection = newConn + self.isReconnecting = false + self.reconnectAttempt = 0 + } + + if didSubscribeSignals { + Task { try? await subscribeToSignals() } + } + } + + private func scheduleReconnect() { + connectionQueue.async { [weak self] in + guard let self else { return } + + if self.isReconnecting { return } + self.isReconnecting = true + + let attempt = self.reconnectAttempt + self.reconnectAttempt += 1 + + let delaySeconds = min(pow(2.0, Double(attempt)), 30.0) + self.connectionQueue.asyncAfter(deadline: .now() + delaySeconds) { [weak self] in + self?.connect() + } + } + } + private func subscribeToSignals() async throws { let req = makeRequest(method: "SubscribeSignals") _ = try await sendSync(req) @@ -237,15 +276,22 @@ extension XPCClient private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? { try await withCheckedThrowingContinuation { continuation in - xpc_connection_send_message_with_reply(connection, request, DispatchQueue.global(qos: .userInitiated)) { r in + let conn: xpc_connection_t? = self.connectionQueue.sync { self.connection } + guard let conn else { + self.scheduleReconnect() + continuation.resume(throwing: Error.connectionError) + return + } + + xpc_connection_send_message_with_reply(conn, request, DispatchQueue.global(qos: .userInitiated)) { r in switch xpc_get_type(r) { case XPC_TYPE_ERROR: - let error = xpc_dictionary_get_value(r, "error") - if let error = error, let errorString = xpc_string_get_string_ptr(error) { - print("XPC error: \(String(cString: errorString))") + if r.isInterruptionError { + self.scheduleReconnect() + continuation.resume(throwing: Error.connectionError) + } else { + continuation.resume(throwing: Error.typeError) } - - continuation.resume(throwing: Error.typeError) case XPC_TYPE_DICTIONARY: continuation.resume(returning: r) @@ -295,8 +341,8 @@ extension XPCClient } case XPC_TYPE_ERROR: - if let errStr = xpc_string_get_string_ptr(event) { - print("XPC event error: \(String(cString: errStr))") + if event.isInterruptionError { + scheduleReconnect() } default: break @@ -304,4 +350,14 @@ extension XPCClient } } +extension xpc_object_t +{ + var isInterruptionError: Bool { + return ( + xpc_equal(self, XPC_ERROR_CONNECTION_INTERRUPTED) || + xpc_equal(self, XPC_ERROR_CONNECTION_INVALID) || + xpc_equal(self, XPC_ERROR_TERMINATION_IMMINENT) + ) + } +}