From 8ff95f4bf9296e4774499dd91836435d5f4d3a98 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sat, 23 Aug 2025 19:24:42 -0700 Subject: [PATCH] xpc: generic interface for dispatching methods --- .../include/net.buzzert.kordophonecd.plist | 8 +- kordophoned/src/xpc/agent.rs | 133 ++++++++++-------- kordophoned/src/xpc/endpoint.rs | 27 ---- kordophoned/src/xpc/mod.rs | 1 - 4 files changed, 81 insertions(+), 88 deletions(-) delete mode 100644 kordophoned/src/xpc/endpoint.rs diff --git a/kordophoned/include/net.buzzert.kordophonecd.plist b/kordophoned/include/net.buzzert.kordophonecd.plist index 69617ae..976049b 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.plist +++ b/kordophoned/include/net.buzzert.kordophonecd.plist @@ -10,6 +10,12 @@ /Users/buzzert/src/kordophone-rs/target/debug/kordophoned + EnvironmentVariables + + RUST_LOG + info + + MachServices net.buzzert.kordophonecd @@ -26,4 +32,4 @@ StandardErrorPath /tmp/kordophoned.err.log - \ No newline at end of file + diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs index 3a1209d..a069cfa 100644 --- a/kordophoned/src/xpc/agent.rs +++ b/kordophoned/src/xpc/agent.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; +use std::thread; use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; @@ -26,7 +27,7 @@ impl XpcAgent { } } - /// Run the XPC agent and host the XPC service. Implements `GetVersion`. + /// Run the XPC agent and host the XPC service. Implements generic dispatch. pub async fn run(self) { log::info!(target: LOG_TARGET, "XPCAgent running"); @@ -49,7 +50,20 @@ impl XpcAgent { let mut listener = XpcListener::listen(&mach_port_name); while let Some(client) = listener.next().await { - tokio::spawn(handle_client(client)); + let agent = self.clone(); + thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e); + return; + } + }; + rt.block_on(handle_client(agent, client)); + }); } log::info!(target: LOG_TARGET, "XPC listener shutting down"); @@ -69,7 +83,62 @@ impl XpcAgent { } } -async fn handle_client(mut client: XpcClient) { +fn cstr(s: &str) -> CString { CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) } + +fn get_string_field(map: &HashMap, key: &str) -> Option { + let k = CString::new(key).ok()?; + map.get(&k).and_then(|v| match v { + Message::String(s) => Some(s.to_string_lossy().into_owned()), + _ => None, + }) +} + +fn get_dictionary_field<'a>(map: &'a HashMap, key: &str) -> Option<&'a HashMap> { + let k = CString::new(key).ok()?; + map.get(&k).and_then(|v| match v { + Message::Dictionary(d) => Some(d), + _ => None, + }) +} + +fn make_error_reply(code: &str, message: &str) -> Message { + let mut reply: HashMap = HashMap::new(); + reply.insert(cstr("type"), Message::String(cstr("Error"))); + reply.insert(cstr("error"), Message::String(cstr(code))); + reply.insert(cstr("message"), Message::String(cstr(message))); + + Message::Dictionary(reply) +} + +async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message { + // Standardized request: { method: String, arguments: Dictionary? } + let method = match get_string_field(root, "method").or_else(|| get_string_field(root, "type")) { + Some(m) => m, + None => return make_error_reply("InvalidRequest", "Missing method/type"), + }; + + let _arguments = get_dictionary_field(root, "arguments"); + + match method.as_str() { + // Example implemented method: GetVersion + "GetVersion" => { + match agent.send_event(Event::GetVersion).await { + Ok(version) => { + let mut reply: HashMap = HashMap::new(); + reply.insert(cstr("type"), Message::String(cstr("GetVersionResponse"))); + reply.insert(cstr("version"), Message::String(cstr(&version))); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Unknown method fallback + other => make_error_reply("UnknownMethod", other), + } +} + +async fn handle_client(agent: XpcAgent, mut client: XpcClient) { log::info!(target: LOG_TARGET, "New XPC connection"); while let Some(message) = client.next().await { @@ -78,62 +147,8 @@ async fn handle_client(mut client: XpcClient) { log::warn!(target: LOG_TARGET, "XPC connection interrupted"); } Message::Dictionary(map) => { - // Try keys "method" or "type" to identify the call. - let method_key = CString::new("method").unwrap(); - let type_key = CString::new("type").unwrap(); - - let maybe_method = map - .get(&method_key) - .or_else(|| map.get(&type_key)) - .and_then(|v| match v { - Message::String(s) => Some(s.to_string_lossy().into_owned()), - _ => None, - }); - - match maybe_method.as_deref() { - Some("GetVersion") => { - let mut reply: HashMap = HashMap::new(); - reply.insert( - CString::new("type").unwrap(), - Message::String(CString::new("GetVersionResponse").unwrap()), - ); - reply.insert( - CString::new("version").unwrap(), - Message::String(CString::new(env!("CARGO_PKG_VERSION")).unwrap()), - ); - client.send_message(Message::Dictionary(reply)); - } - Some(other) => { - log::warn!(target: LOG_TARGET, "Unknown XPC method: {}", other); - let mut reply: HashMap = HashMap::new(); - reply.insert( - CString::new("type").unwrap(), - Message::String(CString::new("Error").unwrap()), - ); - reply.insert( - CString::new("error").unwrap(), - Message::String(CString::new("UnknownMethod").unwrap()), - ); - reply.insert( - CString::new("message").unwrap(), - Message::String(CString::new(other).unwrap_or_else(|_| CString::new("").unwrap())), - ); - client.send_message(Message::Dictionary(reply)); - } - None => { - log::warn!(target: LOG_TARGET, "XPC message missing method/type"); - let mut reply: HashMap = HashMap::new(); - reply.insert( - CString::new("type").unwrap(), - Message::String(CString::new("Error").unwrap()), - ); - reply.insert( - CString::new("error").unwrap(), - Message::String(CString::new("InvalidRequest").unwrap()), - ); - client.send_message(Message::Dictionary(reply)); - } - } + let response = dispatch(&agent, &map).await; + client.send_message(response); } other => { // For now just echo any non-dictionary messages (useful for testing). diff --git a/kordophoned/src/xpc/endpoint.rs b/kordophoned/src/xpc/endpoint.rs deleted file mode 100644 index 459e5c4..0000000 --- a/kordophoned/src/xpc/endpoint.rs +++ /dev/null @@ -1,27 +0,0 @@ -#![cfg(target_os = "macos")] -//! XPC registry for registering handlers and emitting signals. - -/// Registry for XPC message handlers and signal emission. -pub struct XpcRegistry; - -impl XpcRegistry { - /// Create a new XPC registry for the service. - pub fn new() -> Self { - XpcRegistry - } - - /// Register a handler for incoming messages at a given endpoint. - pub fn register_handler(&self, _name: &str, _handler: F) - where - F: Fn(&[u8]) -> Vec + Send + Sync + 'static, - { - // TODO: Implement handler registration over libxpc using SERVICE_NAME - let _ = (_name, _handler); - } - - /// Send a signal (notification) to connected clients. - pub fn send_signal(&self, _signal: &str, _data: &T) { - // TODO: Serialize and send signal over XPC - let _ = (_signal, _data); - } -} diff --git a/kordophoned/src/xpc/mod.rs b/kordophoned/src/xpc/mod.rs index 87689fb..8f189b2 100644 --- a/kordophoned/src/xpc/mod.rs +++ b/kordophoned/src/xpc/mod.rs @@ -1,3 +1,2 @@ pub mod agent; -pub mod endpoint; pub mod interface;