diff --git a/Cargo.lock b/Cargo.lock index 07b9d26..f90bab8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1301,13 +1301,14 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "block", "clap 4.5.20", "dbus", "dbus-codegen", "dbus-tree", "dotenv", "env_logger 0.11.8", - "futures-preview", + "futures", "futures-util", "kordophone", "kordophone-db", @@ -1318,6 +1319,7 @@ dependencies = [ "time", "tokio", "xpc-connection", + "xpc-connection-sys", ] [[package]] diff --git a/kordophoned/README.md b/kordophoned/README.md new file mode 100644 index 0000000..2026166 --- /dev/null +++ b/kordophoned/README.md @@ -0,0 +1,13 @@ +# kordophoned + +The daemon executable that exposes an IPC interface (Dbus on Linux, XPC on macoS) to the client. + + +## Running on macOS + +Before any client can talk to the kordophone daemon on macOS, the XPC service needs to be manually registered with launchd. + +- Copy `include/net.buzzert.kordophonecd.plist` to `~/Library/LaunchAgents` (note the `ProgramArguments` key/value). +- Register using `launchctl load ~/Library/LaunchAgents/net.buzzert.kordophonecd.plist` + + diff --git a/kordophoned/include/net.buzzert.kordophonecd.plist b/kordophoned/include/net.buzzert.kordophonecd.plist new file mode 100644 index 0000000..69617ae --- /dev/null +++ b/kordophoned/include/net.buzzert.kordophonecd.plist @@ -0,0 +1,29 @@ + + + + + Label + net.buzzert.kordophonecd + + ProgramArguments + + /Users/buzzert/src/kordophone-rs/target/debug/kordophoned + + + MachServices + + net.buzzert.kordophonecd + + + + RunAtLoad + + KeepAlive + + + StandardOutPath + /tmp/kordophoned.out.log + StandardErrorPath + /tmp/kordophoned.err.log + + \ No newline at end of file diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs index e49e405..6090bfc 100644 --- a/kordophoned/src/xpc/agent.rs +++ b/kordophoned/src/xpc/agent.rs @@ -1,6 +1,14 @@ use crate::daemon::{events::Event, signals::Signal, DaemonResult}; +use crate::xpc::interface::SERVICE_NAME; +use futures_util::StreamExt; +use std::collections::HashMap; +use std::ffi::CString; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; +use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; + + +static LOG_TARGET: &str = "xpc"; /// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] @@ -18,9 +26,33 @@ impl XpcAgent { } } - /// Run the XPC agent: perform a basic GetVersion IPC call to the daemon and print the result. + /// Run the XPC agent and host the XPC service. Implements `GetVersion`. pub async fn run(self) { - todo!() + log::info!(target: LOG_TARGET, "XPCAgent running"); + + // Construct the Mach service name without a trailing NUL for CString. + let service_name = SERVICE_NAME.trim_end_matches('\0'); + let mach_port_name = match CString::new(service_name) { + Ok(c) => c, + Err(e) => { + log::error!(target: LOG_TARGET, "Invalid XPC service name: {e}"); + return; + } + }; + + log::info!( + target: LOG_TARGET, + "Waiting for XPC connections on {}", + service_name + ); + + let mut listener = XpcListener::listen(&mach_port_name); + + while let Some(client) = listener.next().await { + tokio::spawn(handle_client(client)); + } + + log::info!(target: LOG_TARGET, "XPC listener shutting down"); } /// Send an event to the daemon and await its reply. @@ -36,3 +68,80 @@ impl XpcAgent { rx.await.map_err(|_| "Failed to receive reply".into()) } } + +async fn handle_client(mut client: XpcClient) { + log::info!(target: LOG_TARGET, "New XPC connection"); + + while let Some(message) = client.next().await { + match message { + Message::Error(MessageError::ConnectionInterrupted) => { + log::warn!(target: LOG_TARGET, "XPC connection interrupted"); + } + Message::Dictionary(map) => { + // Try keys "method" or "type" to identify the call. + let method_key = CString::new("method").unwrap(); + let type_key = CString::new("type").unwrap(); + + let maybe_method = map + .get(&method_key) + .or_else(|| map.get(&type_key)) + .and_then(|v| match v { + Message::String(s) => Some(s.to_string_lossy().into_owned()), + _ => None, + }); + + match maybe_method.as_deref() { + Some("GetVersion") => { + let mut reply: HashMap = HashMap::new(); + reply.insert( + CString::new("type").unwrap(), + Message::String(CString::new("GetVersionResponse").unwrap()), + ); + reply.insert( + CString::new("version").unwrap(), + Message::String(CString::new(env!("CARGO_PKG_VERSION")).unwrap()), + ); + client.send_message(Message::Dictionary(reply)); + } + Some(other) => { + log::warn!(target: LOG_TARGET, "Unknown XPC method: {}", other); + let mut reply: HashMap = HashMap::new(); + reply.insert( + CString::new("type").unwrap(), + Message::String(CString::new("Error").unwrap()), + ); + reply.insert( + CString::new("error").unwrap(), + Message::String(CString::new("UnknownMethod").unwrap()), + ); + reply.insert( + CString::new("message").unwrap(), + Message::String(CString::new(other).unwrap_or_else(|_| CString::new("").unwrap())), + ); + client.send_message(Message::Dictionary(reply)); + } + None => { + log::warn!(target: LOG_TARGET, "XPC message missing method/type"); + let mut reply: HashMap = HashMap::new(); + reply.insert( + CString::new("type").unwrap(), + Message::String(CString::new("Error").unwrap()), + ); + reply.insert( + CString::new("error").unwrap(), + Message::String(CString::new("InvalidRequest").unwrap()), + ); + client.send_message(Message::Dictionary(reply)); + } + } + } + other => { + // For now just echo any non-dictionary messages (useful for testing). + log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); + client.send_message(other); + } + } + } + + log::info!(target: LOG_TARGET, "XPC connection closed"); +} diff --git a/kordophoned/src/xpc/mod.rs b/kordophoned/src/xpc/mod.rs index 6c335d2..87689fb 100644 --- a/kordophoned/src/xpc/mod.rs +++ b/kordophoned/src/xpc/mod.rs @@ -1,6 +1,3 @@ -#![cfg(target_os = "macos")] -//! macOS XPC IPC interface modules. - pub mod agent; pub mod endpoint; pub mod interface; diff --git a/kpcli/Cargo.toml b/kpcli/Cargo.toml index 6ecef03..da2843e 100644 --- a/kpcli/Cargo.toml +++ b/kpcli/Cargo.toml @@ -32,5 +32,7 @@ dbus-codegen = "0.10.0" # XPC (libxpc) interface only on macOS [target.'cfg(target_os = "macos")'.dependencies] -futures-preview = "=0.2.2" +block = "0.1.6" +futures = "0.3.4" xpc-connection = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection" } +xpc-connection-sys = { git = "https://github.com/dfrankland/xpc-connection-rs.git", rev = "cd4fb3d", package = "xpc-connection-sys" } diff --git a/kpcli/src/daemon/xpc.rs b/kpcli/src/daemon/xpc.rs index 951d514..21fd956 100644 --- a/kpcli/src/daemon/xpc.rs +++ b/kpcli/src/daemon/xpc.rs @@ -1,16 +1,107 @@ -#![cfg(target_os = "macos")] -//! macOS XPC implementation of the DaemonInterface for kpcli. - use super::{ConfigCommands, DaemonInterface}; use anyhow::Result; use async_trait::async_trait; -use futures::stream::StreamExt; -use futures::executor::block_on; +use futures_util::StreamExt; use std::collections::HashMap; -use xpc_connection::{Message, XpcConnection}; +use std::ffi::{CStr, CString}; +use std::ops::Deref; +use std::{pin::Pin, task::Poll}; + +use xpc_connection::Message; + +use futures::{ + channel::mpsc::{unbounded as unbounded_channel, UnboundedReceiver, UnboundedSender}, + Stream, +}; + const SERVICE_NAME: &str = "net.buzzert.kordophonecd\0"; -const GET_VERSION_METHOD: &str = "GetVersion\0"; +const GET_VERSION_METHOD: &str = "GetVersion"; + +// We can't use XPCClient from xpc-connection because of some strange decisions with which flags +// are passed to xpc_connection_create_mach_service. +struct XPCClient { + connection: xpc_connection_sys::xpc_connection_t, + receiver: UnboundedReceiver, + sender: UnboundedSender, + event_handler_is_running: bool, +} + +impl XPCClient { + pub fn connect(name: impl AsRef) -> Self { + use block::ConcreteBlock; + use xpc_connection::xpc_object_to_message; + use xpc_connection_sys::xpc_connection_set_event_handler; + use xpc_connection_sys::xpc_connection_resume; + + let name = name.as_ref(); + let connection = unsafe { + xpc_connection_sys::xpc_connection_create_mach_service(name.as_ptr(), std::ptr::null_mut(), 0) + }; + + let (sender, receiver) = unbounded_channel(); + let sender_clone = sender.clone(); + + let block = ConcreteBlock::new(move |event| { + let message = xpc_object_to_message(event); + sender_clone.unbounded_send(message).ok() + }); + + let block = block.copy(); + + unsafe { + xpc_connection_set_event_handler(connection, block.deref() as *const _ as *mut _); + xpc_connection_resume(connection); + } + + Self { + connection, + receiver, + sender, + event_handler_is_running: true, + } + } + + pub fn send_message(&self, message: Message) { + use xpc_connection::message_to_xpc_object; + use xpc_connection_sys::xpc_connection_send_message; + use xpc_connection_sys::xpc_release; + + let xpc_object = message_to_xpc_object(message); + unsafe { + xpc_connection_send_message(self.connection, xpc_object); + xpc_release(xpc_object); + } + } +} + +impl Drop for XPCClient { + fn drop(&mut self) { + use xpc_connection_sys::xpc_release; + use xpc_connection_sys::xpc_object_t; + + unsafe { xpc_release(self.connection as xpc_object_t) }; + } +} + +impl Stream for XPCClient { + type Item = Message; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match Stream::poll_next(Pin::new(&mut self.receiver), cx) { + Poll::Ready(Some(Message::Error(xpc_connection::MessageError::ConnectionInvalid))) => { + self.event_handler_is_running = false; + Poll::Ready(None) + } + v => v, + } + } +} + +unsafe impl Send for XPCClient {} /// XPC-based implementation of DaemonInterface that sends method calls to the daemon over libxpc. pub struct XpcDaemonInterface; @@ -25,33 +116,43 @@ impl XpcDaemonInterface { #[async_trait] impl DaemonInterface for XpcDaemonInterface { async fn print_version(&mut self) -> Result<()> { + // Build service name CString (trim trailing NUL from const) + let service_name = SERVICE_NAME.trim_end_matches('\0'); + let mach_port_name = CString::new(service_name)?; + // Open an XPC connection to the daemon service - let mut conn = XpcConnection::new(SERVICE_NAME); - let mut incoming = conn.connect(); + let mut client = XPCClient::connect(&mach_port_name); - // Send a GetVersion request as a dictionary message - let mut dict = HashMap::new(); - dict.insert( - GET_VERSION_METHOD.to_string(), - Message::String(String::new()), - ); - conn.send_message(Message::Dictionary(dict)); - - // Wait for a single string reply (futures-preview StreamFuture returns (Option, Stream)) - let (opt_msg, _) = match block_on(incoming.next()) { - Ok(pair) => pair, - Err(e) => { - eprintln!("Error reading XPC reply: {:?}", e); - return Ok(()); - } - }; - if let Some(Message::String(ver_raw)) = opt_msg { - // Trim the trailing NUL if present - let version = ver_raw.trim_end_matches('\0'); - println!("Server version: {}", version); - } else { - eprintln!("Unexpected XPC reply for GetVersion"); + // Send a GetVersion request as a dictionary message: { method: "GetVersion" } + { + let mut request = HashMap::new(); + request.insert( + CString::new("method").unwrap(), + Message::String(CString::new(GET_VERSION_METHOD).unwrap()), + ); + client.send_message(Message::Dictionary(request)); } + + // Await a single reply and print the version + match client.next().await { + Some(Message::Dictionary(map)) => { + if let Some(Message::String(ver)) = map.get(&CString::new("version").unwrap()) { + println!("Server version: {}", ver.to_string_lossy()); + } else if let Some(Message::String(ty)) = map.get(&CString::new("type").unwrap()) + { + println!("XPC replied with type: {}", ty.to_string_lossy()); + } else { + eprintln!("Unexpected XPC reply payload for GetVersion"); + } + } + Some(other) => { + eprintln!("Unexpected XPC reply: {:?}", other); + } + None => { + eprintln!("No reply received from XPC daemon"); + } + } + Ok(()) }