From a93a77307149dc12e37a17f2003939b6a1ba3d82 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 24 Aug 2025 15:28:33 -0700 Subject: [PATCH] xpc: Use reply port when replying to RPC messages --- Cargo.lock | 123 +-------- kordophoned/Cargo.toml | 3 +- .../include/net.buzzert.kordophonecd.plist | 2 +- kordophoned/src/xpc/agent.rs | 249 ++++++++++++------ kpcli/src/daemon/xpc.rs | 40 ++- 5 files changed, 205 insertions(+), 212 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f90bab8..1bd2904 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,16 +745,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-async-runtime-preview" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c03035be1dae627b7e05c6984acb1f2086043fde5249ae51604f1ff20ed037" -dependencies = [ - "futures-core-preview", - "futures-stable-preview", -] - [[package]] name = "futures-channel" version = "0.3.31" @@ -765,30 +755,12 @@ dependencies = [ "futures-sink", ] -[[package]] -name = "futures-channel-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6f8aec6b0eb1d281843ec666fba2b71a49610181e3078fbef7a8cbed481821e" -dependencies = [ - "futures-core-preview", -] - [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-core-preview" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "098785413db44e5dbf3b1fc23c24039a9091bea5acb3eb0d293f386f18aff97d" -dependencies = [ - "either", -] - [[package]] name = "futures-executor" version = "0.3.31" @@ -800,35 +772,12 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-executor-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28ff61425699ca85de5c63c1f135278403518c3398bd15cf4b6fd1d21c9846e4" -dependencies = [ - "futures-channel-preview", - "futures-core-preview", - "futures-util-preview", - "lazy_static", - "num_cpus", -] - [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" -[[package]] -name = "futures-io-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaa769a6ac904912c1557b4dcf85b93db2bc9ba57c349f9ce43870e49d67f8e1" -dependencies = [ - "futures-core-preview", - "iovec", -] - [[package]] name = "futures-macro" version = "0.3.31" @@ -840,49 +789,12 @@ dependencies = [ "syn", ] -[[package]] -name = "futures-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4d575096a4e2cf458f309b5b7bce5c8aaad8e874b8d77f0aa26c08d7ac18f74" -dependencies = [ - "futures-async-runtime-preview", - "futures-channel-preview", - "futures-core-preview", - "futures-executor-preview", - "futures-io-preview", - "futures-sink-preview", - "futures-stable-preview", - "futures-util-preview", -] - [[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" -[[package]] -name = "futures-sink-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dc4cdc628b934f18a11ba070d589655f68cfec031a16381b0e7784ff0e9cc18" -dependencies = [ - "either", - "futures-channel-preview", - "futures-core-preview", -] - -[[package]] -name = "futures-stable-preview" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a6ba960b8bbbc14a9a741cc8ad9c26aff44538ea14be021db905b43f33854da" -dependencies = [ - "futures-core-preview", - "futures-executor-preview", -] - [[package]] name = "futures-task" version = "0.3.31" @@ -907,19 +819,6 @@ dependencies = [ "slab", ] -[[package]] -name = "futures-util-preview" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b29aa737dba9e2e47a5dcd4d58ec7c7c2d5f78e8460f609f857bcf04163235e" -dependencies = [ - "either", - "futures-channel-preview", - "futures-core-preview", - "futures-io-preview", - "futures-sink-preview", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -1144,15 +1043,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - [[package]] name = "is-terminal" version = "0.4.16" @@ -1270,6 +1160,7 @@ version = "1.0.0" dependencies = [ "anyhow", "async-trait", + "block", "chrono", "dbus", "dbus-codegen", @@ -1278,7 +1169,7 @@ dependencies = [ "dbus-tree", "directories", "env_logger 0.11.8", - "futures-preview", + "futures", "futures-util", "keyring", "kordophone", @@ -1556,16 +1447,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.0", - "libc", -] - [[package]] name = "object" version = "0.32.2" diff --git a/kordophoned/Cargo.toml b/kordophoned/Cargo.toml index 22b005f..dedfe31 100644 --- a/kordophoned/Cargo.toml +++ b/kordophoned/Cargo.toml @@ -35,7 +35,8 @@ dbus-crossroads = "0.5.1" # XPC (libxpc) interface for macOS IPC [target.'cfg(target_os = "macos")'.dependencies] -futures-preview = "=0.2.2" +block = "0.1.6" +futures = "0.3.31" xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" } xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" } serde = { version = "1.0", features = ["derive"] } diff --git a/kordophoned/include/net.buzzert.kordophonecd.plist b/kordophoned/include/net.buzzert.kordophonecd.plist index 976049b..df9c1a1 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.plist +++ b/kordophoned/include/net.buzzert.kordophonecd.plist @@ -7,7 +7,7 @@ ProgramArguments - /Users/buzzert/src/kordophone-rs/target/debug/kordophoned + /Users/buzzert/src/kordophone/kordophone-rs/target/debug/kordophoned EnvironmentVariables diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs index cd7b0b3..49079bb 100644 --- a/kordophoned/src/xpc/agent.rs +++ b/kordophoned/src/xpc/agent.rs @@ -1,16 +1,25 @@ use crate::xpc::interface::SERVICE_NAME; -use futures_util::StreamExt; use kordophoned::daemon::settings::Settings; use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult}; use std::collections::HashMap; use std::ffi::CString; +use std::os::raw::c_char; +use std::ptr; use std::sync::Arc; use std::thread; -use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; -use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; +use tokio::sync::{mpsc, oneshot, Mutex}; +use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError}; +use xpc_connection_sys as xpc_sys; static LOG_TARGET: &str = "xpc"; +/// Wrapper for raw XPC connection pointer to declare cross-thread usage. +/// Safety: libxpc connections are reference-counted and may be used to send from other threads. +#[derive(Copy, Clone)] +struct XpcConn(pub xpc_sys::xpc_connection_t); +unsafe impl Send for XpcConn {} +unsafe impl Sync for XpcConn {} + /// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] pub struct XpcAgent { @@ -21,14 +30,15 @@ pub struct XpcAgent { impl XpcAgent { /// Create a new XPC agent with an event sink and signal receiver. pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { - Self { - event_sink, - signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), - } + Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))) } } /// Run the XPC agent and host the XPC service. Implements generic dispatch. pub async fn run(self) { + use block::ConcreteBlock; + use std::ops::Deref; + use std::sync::Mutex as StdMutex; + log::info!(target: LOG_TARGET, "XPCAgent running"); // Construct the Mach service name without a trailing NUL for CString. @@ -47,47 +57,137 @@ impl XpcAgent { service_name ); - // Broadcast channel for signals to all connected clients - let (signal_tx, _signal_rx) = broadcast::channel::(64); + // Multi-thread runtime to drive async dispatch from XPC event handlers. + let rt = match tokio::runtime::Runtime::new() { + Ok(rt) => Arc::new(rt), + Err(e) => { + log::error!(target: LOG_TARGET, "Failed to create Tokio runtime: {}", e); + return; + } + }; - // Spawn a single distributor task that forwards daemon signals to broadcast + // Shared list of connected clients for signal fanout + let connections: Arc>> = Arc::new(StdMutex::new(Vec::new())); + // Forward daemon signals to all connected clients { let receiver_arc = self.signal_receiver.clone(); - let signal_tx_clone = signal_tx.clone(); - tokio::spawn(async move { + let conns = connections.clone(); + rt.spawn(async move { let mut receiver = receiver_arc .lock() .await .take() .expect("Signal receiver already taken"); - while let Some(signal) = receiver.recv().await { - let _ = signal_tx_clone.send(signal); + log::info!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal); + let msg = signal_to_message(signal); + let xobj = unsafe { message_to_xpc_object(msg) }; + let list = conns.lock().unwrap(); + log::info!(target: LOG_TARGET, "Active XPC clients: {}", list.len()); + for c in list.iter() { + log::info!(target: LOG_TARGET, "Sending signal to client"); + unsafe { xpc_sys::xpc_connection_send_message(c.0, xobj) }; + } + unsafe { xpc_sys::xpc_release(xobj) }; } }); } - let mut listener = XpcListener::listen(&mach_port_name); + // Create the XPC Mach service listener. + let service = unsafe { + xpc_sys::xpc_connection_create_mach_service( + mach_port_name.as_ptr(), + ptr::null_mut(), + xpc_sys::XPC_CONNECTION_MACH_SERVICE_LISTENER as u64, + ) + }; - while let Some(client) = listener.next().await { - let agent = self.clone(); - let signal_rx = signal_tx.subscribe(); - thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - { - Ok(rt) => rt, - Err(e) => { - log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e); - return; - } - }; - rt.block_on(handle_client(agent, client, signal_rx)); - }); + // Event handler for the service: accepts new client connections. + let agent = self.clone(); + let rt_accept = rt.clone(); + let conns_accept = connections.clone(); + let service_handler = ConcreteBlock::new(move |event: xpc_sys::xpc_object_t| { + unsafe { + // Treat incoming events as connections; ignore others + // We detect connections by trying to set a per-connection handler. + let client = event as xpc_sys::xpc_connection_t; + log::info!(target: LOG_TARGET, "New XPC connection accepted"); + // Do not register for signals until the client explicitly subscribes + + // Per-connection handler + let agent_conn = agent.clone(); + let rt_conn = rt_accept.clone(); + let conns_for_handler = conns_accept.clone(); + let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| { + unsafe { + // Convert to higher-level Message for type matching + match xpc_object_to_message(msg) { + Message::Dictionary(map) => { + // Trace inbound method + let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "".to_string()); + log::info!(target: LOG_TARGET, "XPC request received: {}", method); + let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map)); + let reply = xpc_sys::xpc_dictionary_create_reply(msg); + if !reply.is_null() { + let payload = message_to_xpc_object(response); + let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| { + xpc_sys::xpc_dictionary_set_value(reply, key, value); + }) + .copy(); + xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _); + xpc_sys::xpc_connection_send_message(client, reply); + xpc_sys::xpc_release(payload); + xpc_sys::xpc_release(reply); + log::info!(target: LOG_TARGET, "XPC reply sent for method: {}", method); + } else { + log::warn!(target: LOG_TARGET, "No reply port for method: {}", method); + } + } + Message::Error(e) => { + match e { + MessageError::ConnectionInvalid => { + // Normal for one-shot RPC connections; keep logs quiet + let mut list = conns_for_handler.lock().unwrap(); + let before = list.len(); + list.retain(|c| c.0 != client); + let after = list.len(); + if after < before { + log::info!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after); + } else { + log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)"); + } + } + other => { + log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other); + } + } + } + _ => {} + } + } + }) + .copy(); + + xpc_sys::xpc_connection_set_event_handler( + client, + conn_handler.deref() as *const _ as *mut _, + ); + xpc_sys::xpc_connection_resume(client); + } + } + ) + .copy(); + + unsafe { + xpc_sys::xpc_connection_set_event_handler( + service, + service_handler.deref() as *const _ as *mut _, + ); + xpc_sys::xpc_connection_resume(service); } - log::info!(target: LOG_TARGET, "XPC listener shutting down"); + // Keep this future alive forever. + futures_util::future::pending::<()>().await; } /// Send an event to the daemon and await its reply. @@ -128,6 +228,8 @@ fn get_dictionary_field<'a>( } fn make_error_reply(code: &str, message: &str) -> Message { + log::error!(target: LOG_TARGET, "XPC error: {code}: {message}"); + let mut reply: HashMap = HashMap::new(); reply.insert(cstr("type"), Message::String(cstr("Error"))); reply.insert(cstr("error"), Message::String(cstr(code))); @@ -172,16 +274,31 @@ fn make_ok_reply() -> Message { Message::Dictionary(reply) } -async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message { - // Standardized request: { method: String, arguments: Dictionary? } +/// Attach an optional request_id to a dictionary reply message. +fn attach_request_id(mut message: Message, request_id: Option) -> Message { + if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) { + dict_put_str(m, "request_id", &id); + } + message +} + +async fn dispatch( + agent: &XpcAgent, + subscribers: &std::sync::Mutex>, + current_client: xpc_sys::xpc_connection_t, + root: &HashMap, +) -> Message { + // Standardized request: { method: String, arguments: Dictionary?, request_id: String? } + let request_id = dict_get_str(root, "request_id"); + let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) { Some(m) => m, - None => return make_error_reply("InvalidRequest", "Missing method/type"), + None => return attach_request_id(make_error_reply("InvalidRequest", "Missing method/type"), request_id), }; let _arguments = get_dictionary_field(root, "arguments"); - match method.as_str() { + let mut response = match method.as_str() { // Example implemented method: GetVersion "GetVersion" => match agent.send_event(Event::GetVersion).await { Ok(version) => { @@ -493,12 +610,24 @@ async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message } } - // No-op used by clients to ensure the connection is established and subscribed - "SubscribeSignals" => make_ok_reply(), + // Subscribe and return immediately + "SubscribeSignals" => { + let mut list = subscribers.lock().unwrap(); + // Avoid duplicates + if !list.iter().any(|c| c.0 == current_client) { + list.push(XpcConn(current_client)); + log::info!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len()); + } + make_ok_reply() + }, // Unknown method fallback other => make_error_reply("UnknownMethod", other), - } + }; + + // Echo request_id back (if present) so clients can correlate replies + response = attach_request_id(response, request_id); + response } fn signal_to_message(signal: Signal) -> Message { @@ -531,45 +660,5 @@ fn signal_to_message(signal: Signal) -> Message { Message::Dictionary(root) } -async fn handle_client( - agent: XpcAgent, - mut client: XpcClient, - mut signal_rx: broadcast::Receiver, -) { - log::info!(target: LOG_TARGET, "New XPC connection"); +// legacy async client handler removed in reply-port implementation - loop { - tokio::select! { - maybe_msg = client.next() => { - match maybe_msg { - Some(Message::Error(MessageError::ConnectionInterrupted)) => { - log::warn!(target: LOG_TARGET, "XPC connection interrupted"); - } - Some(Message::Dictionary(map)) => { - let response = dispatch(&agent, &map).await; - client.send_message(response); - } - Some(other) => { - log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); - client.send_message(other); - } - None => break, - } - } - recv = signal_rx.recv() => { - match recv { - Ok(signal) => { - let msg = signal_to_message(signal); - client.send_message(msg); - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => { - log::warn!(target: LOG_TARGET, "Lagged behind on signals; dropping some events for this client"); - } - } - } - } - } - - log::info!(target: LOG_TARGET, "XPC connection closed"); -} diff --git a/kpcli/src/daemon/xpc.rs b/kpcli/src/daemon/xpc.rs index d67d9c2..bada46e 100644 --- a/kpcli/src/daemon/xpc.rs +++ b/kpcli/src/daemon/xpc.rs @@ -78,6 +78,23 @@ impl XPCClient { xpc_release(xpc_object); } } + + pub fn send_message_with_reply(&self, message: Message) -> Message { + use xpc_connection::message_to_xpc_object; + use xpc_connection::xpc_object_to_message; + use xpc_connection_sys::{xpc_connection_send_message_with_reply_sync, xpc_release}; + + unsafe { + let xobj = message_to_xpc_object(message); + let reply = xpc_connection_send_message_with_reply_sync(self.connection, xobj); + xpc_release(xobj); + let msg = xpc_object_to_message(reply); + if !reply.is_null() { + xpc_release(reply); + } + msg + } + } } impl Drop for XPCClient { @@ -147,12 +164,10 @@ impl XpcDaemonInterface { args: Option>, ) -> anyhow::Result> { let request = Self::build_request(method, args); - client.send_message(Message::Dictionary(request)); - - match client.next().await { - Some(Message::Dictionary(map)) => Ok(map), - Some(other) => Err(anyhow::anyhow!("Unexpected XPC reply: {:?}", other)), - None => Err(anyhow::anyhow!("No reply received from XPC daemon")), + let reply = client.send_message_with_reply(Message::Dictionary(request)); + match reply { + Message::Dictionary(map) => Ok(map), + other => Err(anyhow::anyhow!("Unexpected XPC reply: {:?}", other)), } } @@ -384,7 +399,8 @@ impl DaemonInterface for XpcDaemonInterface { let mach_port_name = Self::build_service_name()?; let mut client = XPCClient::connect(&mach_port_name); - // Send a subscription/warm-up message so the server loop starts selecting for this client + // Subscribe to begin receiving signals on this connection + eprintln!("[kpcli] Sending SubscribeSignals"); client.send_message(Message::Dictionary(Self::build_request( "SubscribeSignals", None, @@ -394,6 +410,7 @@ impl DaemonInterface for XpcDaemonInterface { while let Some(msg) = client.next().await { match msg { Message::Dictionary(map) => { + eprintln!("[kpcli] Received signal dictionary"); let name_key = Self::key("name"); let args_key = Self::key("arguments"); let name = match map.get(&name_key) { @@ -476,8 +493,13 @@ impl DaemonInterface for XpcDaemonInterface { _ => {} } } - Message::Error(xpc_connection::MessageError::ConnectionInvalid) => break, - _ => {} + Message::Error(xpc_connection::MessageError::ConnectionInvalid) => { + eprintln!("[kpcli] XPC connection invalid"); + break + } + other => { + eprintln!("[kpcli] Unexpected XPC message: {:?}", other); + } } } Ok(())