use crate::daemon::{events::Event, signals::Signal, DaemonResult}; use crate::xpc::interface::SERVICE_NAME; use futures_util::StreamExt; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; static LOG_TARGET: &str = "xpc"; /// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] pub struct XpcAgent { event_sink: mpsc::Sender, signal_receiver: Arc>>>, } impl XpcAgent { /// Create a new XPC agent with an event sink and signal receiver. pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), } } /// Run the XPC agent and host the XPC service. Implements `GetVersion`. pub async fn run(self) { log::info!(target: LOG_TARGET, "XPCAgent running"); // Construct the Mach service name without a trailing NUL for CString. let service_name = SERVICE_NAME.trim_end_matches('\0'); let mach_port_name = match CString::new(service_name) { Ok(c) => c, Err(e) => { log::error!(target: LOG_TARGET, "Invalid XPC service name: {e}"); return; } }; log::info!( target: LOG_TARGET, "Waiting for XPC connections on {}", service_name ); let mut listener = XpcListener::listen(&mach_port_name); while let Some(client) = listener.next().await { tokio::spawn(handle_client(client)); } log::info!(target: LOG_TARGET, "XPC listener shutting down"); } /// Send an event to the daemon and await its reply. pub async fn send_event( &self, make_event: impl FnOnce(crate::daemon::events::Reply) -> Event, ) -> DaemonResult { let (tx, rx) = oneshot::channel(); self.event_sink .send(make_event(tx)) .await .map_err(|_| "Failed to send event")?; rx.await.map_err(|_| "Failed to receive reply".into()) } } async fn handle_client(mut client: XpcClient) { log::info!(target: LOG_TARGET, "New XPC connection"); while let Some(message) = client.next().await { match message { Message::Error(MessageError::ConnectionInterrupted) => { log::warn!(target: LOG_TARGET, "XPC connection interrupted"); } Message::Dictionary(map) => { // Try keys "method" or "type" to identify the call. let method_key = CString::new("method").unwrap(); let type_key = CString::new("type").unwrap(); let maybe_method = map .get(&method_key) .or_else(|| map.get(&type_key)) .and_then(|v| match v { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None, }); match maybe_method.as_deref() { Some("GetVersion") => { let mut reply: HashMap = HashMap::new(); reply.insert( CString::new("type").unwrap(), Message::String(CString::new("GetVersionResponse").unwrap()), ); reply.insert( CString::new("version").unwrap(), Message::String(CString::new(env!("CARGO_PKG_VERSION")).unwrap()), ); client.send_message(Message::Dictionary(reply)); } Some(other) => { log::warn!(target: LOG_TARGET, "Unknown XPC method: {}", other); let mut reply: HashMap = HashMap::new(); reply.insert( CString::new("type").unwrap(), Message::String(CString::new("Error").unwrap()), ); reply.insert( CString::new("error").unwrap(), Message::String(CString::new("UnknownMethod").unwrap()), ); reply.insert( CString::new("message").unwrap(), Message::String(CString::new(other).unwrap_or_else(|_| CString::new("").unwrap())), ); client.send_message(Message::Dictionary(reply)); } None => { log::warn!(target: LOG_TARGET, "XPC message missing method/type"); let mut reply: HashMap = HashMap::new(); reply.insert( CString::new("type").unwrap(), Message::String(CString::new("Error").unwrap()), ); reply.insert( CString::new("error").unwrap(), Message::String(CString::new("InvalidRequest").unwrap()), ); client.send_message(Message::Dictionary(reply)); } } } other => { // For now just echo any non-dictionary messages (useful for testing). log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); client.send_message(other); } } } log::info!(target: LOG_TARGET, "XPC connection closed"); }