Private
Public Access
1
0

Handle xpc connection interruptions

This commit is contained in:
2025-08-29 22:30:37 -06:00
parent 7992c03fb6
commit 8257b8dbd6
2 changed files with 76 additions and 20 deletions

Binary file not shown.

View File

@@ -11,10 +11,12 @@ import XPC
private let serviceName = "net.buzzert.kordophonecd" private let serviceName = "net.buzzert.kordophonecd"
final class XPCClient final class XPCClient
{ {
private let connection: xpc_connection_t private var connection: xpc_connection_t?
private let connectionQueue = DispatchQueue(label: "net.buzzert.kordophone.xpc.connection")
private var isReconnecting: Bool = false
private var reconnectAttempt: Int = 0
private let signalLock = NSLock() private let signalLock = NSLock()
private var signalSinks: [UUID: (Signal) -> Void] = [:] private var signalSinks: [UUID: (Signal) -> Void] = [:]
private var didSubscribeSignals: Bool = false private var didSubscribeSignals: Bool = false
@@ -28,21 +30,14 @@ final class XPCClient
return service return service
} catch { } catch {
fatalError("Unable to register agent: \(error)") print("Unable to register agent: \(error)")
fatalError()
} }
}() }()
init() { init() {
_ = Self.appService _ = Self.appService
connect()
self.connection = xpc_connection_create_mach_service(serviceName, nil, 0)
let handler: xpc_handler_t = { [weak self] event in
self?.handleIncomingXPCEvent(event)
}
xpc_connection_set_event_handler(connection, handler)
xpc_connection_resume(connection)
} }
public func eventStream() -> AsyncStream<Signal> { public func eventStream() -> AsyncStream<Signal> {
@@ -197,6 +192,7 @@ final class XPCClient
case typeError case typeError
case encodingError case encodingError
case badFileHandle case badFileHandle
case connectionError
} }
enum Signal enum Signal
@@ -211,6 +207,49 @@ final class XPCClient
extension XPCClient extension XPCClient
{ {
private func connect() {
connectionQueue.async { [weak self] in
guard let self else { return }
if let existing = connection {
xpc_connection_cancel(existing)
connection = nil
}
let newConn = xpc_connection_create_mach_service(serviceName, nil, 0)
let handler: xpc_handler_t = { [weak self] event in
self?.handleIncomingXPCEvent(event)
}
xpc_connection_set_event_handler(newConn, handler)
xpc_connection_resume(newConn)
self.connection = newConn
self.isReconnecting = false
self.reconnectAttempt = 0
}
if didSubscribeSignals {
Task { try? await subscribeToSignals() }
}
}
private func scheduleReconnect() {
connectionQueue.async { [weak self] in
guard let self else { return }
if self.isReconnecting { return }
self.isReconnecting = true
let attempt = self.reconnectAttempt
self.reconnectAttempt += 1
let delaySeconds = min(pow(2.0, Double(attempt)), 30.0)
self.connectionQueue.asyncAfter(deadline: .now() + delaySeconds) { [weak self] in
self?.connect()
}
}
}
private func subscribeToSignals() async throws { private func subscribeToSignals() async throws {
let req = makeRequest(method: "SubscribeSignals") let req = makeRequest(method: "SubscribeSignals")
_ = try await sendSync(req) _ = try await sendSync(req)
@@ -237,15 +276,22 @@ extension XPCClient
private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? { private func sendSync(_ request: xpc_object_t) async throws -> xpc_object_t? {
try await withCheckedThrowingContinuation { continuation in try await withCheckedThrowingContinuation { continuation in
xpc_connection_send_message_with_reply(connection, request, DispatchQueue.global(qos: .userInitiated)) { r in let conn: xpc_connection_t? = self.connectionQueue.sync { self.connection }
switch xpc_get_type(r) { guard let conn else {
case XPC_TYPE_ERROR: self.scheduleReconnect()
let error = xpc_dictionary_get_value(r, "error") continuation.resume(throwing: Error.connectionError)
if let error = error, let errorString = xpc_string_get_string_ptr(error) { return
print("XPC error: \(String(cString: errorString))")
} }
xpc_connection_send_message_with_reply(conn, request, DispatchQueue.global(qos: .userInitiated)) { r in
switch xpc_get_type(r) {
case XPC_TYPE_ERROR:
if r.isInterruptionError {
self.scheduleReconnect()
continuation.resume(throwing: Error.connectionError)
} else {
continuation.resume(throwing: Error.typeError) continuation.resume(throwing: Error.typeError)
}
case XPC_TYPE_DICTIONARY: case XPC_TYPE_DICTIONARY:
continuation.resume(returning: r) continuation.resume(returning: r)
@@ -295,8 +341,8 @@ extension XPCClient
} }
case XPC_TYPE_ERROR: case XPC_TYPE_ERROR:
if let errStr = xpc_string_get_string_ptr(event) { if event.isInterruptionError {
print("XPC event error: \(String(cString: errStr))") scheduleReconnect()
} }
default: default:
break break
@@ -304,4 +350,14 @@ extension XPCClient
} }
} }
extension xpc_object_t
{
var isInterruptionError: Bool {
return (
xpc_equal(self, XPC_ERROR_CONNECTION_INTERRUPTED) ||
xpc_equal(self, XPC_ERROR_CONNECTION_INVALID) ||
xpc_equal(self, XPC_ERROR_TERMINATION_IMMINENT)
)
}
}