use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult}; use crate::xpc::interface::SERVICE_NAME; use futures_util::StreamExt; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use std::thread; use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; static LOG_TARGET: &str = "xpc"; /// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] pub struct XpcAgent { event_sink: mpsc::Sender, signal_receiver: Arc>>>, } impl XpcAgent { /// Create a new XPC agent with an event sink and signal receiver. pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), } } /// Run the XPC agent and host the XPC service. Implements generic dispatch. pub async fn run(self) { log::info!(target: LOG_TARGET, "XPCAgent running"); // Construct the Mach service name without a trailing NUL for CString. let service_name = SERVICE_NAME.trim_end_matches('\0'); let mach_port_name = match CString::new(service_name) { Ok(c) => c, Err(e) => { log::error!(target: LOG_TARGET, "Invalid XPC service name: {e}"); return; } }; log::info!( target: LOG_TARGET, "Waiting for XPC connections on {}", service_name ); let mut listener = XpcListener::listen(&mach_port_name); while let Some(client) = listener.next().await { let agent = self.clone(); thread::spawn(move || { let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() { Ok(rt) => rt, Err(e) => { log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e); return; } }; rt.block_on(handle_client(agent, client)); }); } log::info!(target: LOG_TARGET, "XPC listener shutting down"); } /// Send an event to the daemon and await its reply. pub async fn send_event( &self, make_event: impl FnOnce(kordophoned::daemon::events::Reply) -> Event, ) -> DaemonResult { let (tx, rx) = oneshot::channel(); self.event_sink .send(make_event(tx)) .await .map_err(|_| "Failed to send event")?; rx.await.map_err(|_| "Failed to receive reply".into()) } } fn cstr(s: &str) -> CString { CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) } fn get_string_field(map: &HashMap, key: &str) -> Option { let k = CString::new(key).ok()?; map.get(&k).and_then(|v| match v { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None, }) } fn get_dictionary_field<'a>(map: &'a HashMap, key: &str) -> Option<&'a HashMap> { let k = CString::new(key).ok()?; map.get(&k).and_then(|v| match v { Message::Dictionary(d) => Some(d), _ => None, }) } fn make_error_reply(code: &str, message: &str) -> Message { let mut reply: HashMap = HashMap::new(); reply.insert(cstr("type"), Message::String(cstr("Error"))); reply.insert(cstr("error"), Message::String(cstr(code))); reply.insert(cstr("message"), Message::String(cstr(message))); Message::Dictionary(reply) } async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message { // Standardized request: { method: String, arguments: Dictionary? } let method = match get_string_field(root, "method").or_else(|| get_string_field(root, "type")) { Some(m) => m, None => return make_error_reply("InvalidRequest", "Missing method/type"), }; let _arguments = get_dictionary_field(root, "arguments"); match method.as_str() { // Example implemented method: GetVersion "GetVersion" => { match agent.send_event(Event::GetVersion).await { Ok(version) => { let mut reply: HashMap = HashMap::new(); reply.insert(cstr("type"), Message::String(cstr("GetVersionResponse"))); reply.insert(cstr("version"), Message::String(cstr(&version))); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "GetConversations" => { // Defaults let mut limit: i32 = 100; let mut offset: i32 = 0; if let Some(args) = get_dictionary_field(root, "arguments") { if let Some(Message::String(v)) = args.get(&cstr("limit")) { limit = v.to_string_lossy().parse().unwrap_or(100); } if let Some(Message::String(v)) = args.get(&cstr("offset")) { offset = v.to_string_lossy().parse().unwrap_or(0); } } match agent.send_event(|r| Event::GetAllConversations(limit, offset, r)).await { Ok(conversations) => { // Build array of conversation dictionaries let mut items: Vec = Vec::with_capacity(conversations.len()); for conv in conversations { let mut m: HashMap = HashMap::new(); m.insert(cstr("guid"), Message::String(cstr(&conv.guid))); m.insert(cstr("display_name"), Message::String(cstr(&conv.display_name.unwrap_or_default()))); m.insert(cstr("unread_count"), Message::String(cstr(&(conv.unread_count as i64).to_string()))); m.insert(cstr("last_message_preview"), Message::String(cstr(&conv.last_message_preview.unwrap_or_default()))); // participants -> array of strings let participants: Vec = conv .participants .into_iter() .map(|p| Message::String(cstr(&p.display_name()))) .collect(); m.insert(cstr("participants"), Message::Array(participants)); // date as unix timestamp (i64) m.insert(cstr("date"), Message::String(cstr(&conv.date.and_utc().timestamp().to_string()))); items.push(Message::Dictionary(m)); } let mut reply: HashMap = HashMap::new(); reply.insert(cstr("type"), Message::String(cstr("GetConversationsResponse"))); reply.insert(cstr("conversations"), Message::Array(items)); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } // Unknown method fallback other => make_error_reply("UnknownMethod", other), } } async fn handle_client(agent: XpcAgent, mut client: XpcClient) { log::info!(target: LOG_TARGET, "New XPC connection"); while let Some(message) = client.next().await { match message { Message::Error(MessageError::ConnectionInterrupted) => { log::warn!(target: LOG_TARGET, "XPC connection interrupted"); } Message::Dictionary(map) => { let response = dispatch(&agent, &map).await; client.send_message(response); } other => { // For now just echo any non-dictionary messages (useful for testing). log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); client.send_message(other); } } } log::info!(target: LOG_TARGET, "XPC connection closed"); }