use crate::xpc::interface::SERVICE_NAME; use futures_util::StreamExt; use kordophoned::daemon::{events::Event, signals::Signal, DaemonResult}; use kordophoned::daemon::settings::Settings; use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; use std::thread; use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; static LOG_TARGET: &str = "xpc"; /// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] pub struct XpcAgent { event_sink: mpsc::Sender, signal_receiver: Arc>>>, } impl XpcAgent { /// Create a new XPC agent with an event sink and signal receiver. pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), } } /// Run the XPC agent and host the XPC service. Implements generic dispatch. pub async fn run(self) { log::info!(target: LOG_TARGET, "XPCAgent running"); // Construct the Mach service name without a trailing NUL for CString. let service_name = SERVICE_NAME.trim_end_matches('\0'); let mach_port_name = match CString::new(service_name) { Ok(c) => c, Err(e) => { log::error!(target: LOG_TARGET, "Invalid XPC service name: {e}"); return; } }; log::info!( target: LOG_TARGET, "Waiting for XPC connections on {}", service_name ); // Broadcast channel for signals to all connected clients let (signal_tx, _signal_rx) = broadcast::channel::(64); // Spawn a single distributor task that forwards daemon signals to broadcast { let receiver_arc = self.signal_receiver.clone(); let signal_tx_clone = signal_tx.clone(); tokio::spawn(async move { let mut receiver = receiver_arc .lock() .await .take() .expect("Signal receiver already taken"); while let Some(signal) = receiver.recv().await { let _ = signal_tx_clone.send(signal); } }); } let mut listener = XpcListener::listen(&mach_port_name); while let Some(client) = listener.next().await { let agent = self.clone(); let signal_rx = signal_tx.subscribe(); thread::spawn(move || { let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() { Ok(rt) => rt, Err(e) => { log::error!(target: LOG_TARGET, "Failed to build runtime for client: {}", e); return; } }; rt.block_on(handle_client(agent, client, signal_rx)); }); } log::info!(target: LOG_TARGET, "XPC listener shutting down"); } /// Send an event to the daemon and await its reply. pub async fn send_event( &self, make_event: impl FnOnce(kordophoned::daemon::events::Reply) -> Event, ) -> DaemonResult { let (tx, rx) = oneshot::channel(); self.event_sink .send(make_event(tx)) .await .map_err(|_| "Failed to send event")?; rx.await.map_err(|_| "Failed to receive reply".into()) } } fn cstr(s: &str) -> CString { CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) } fn get_string_field(map: &HashMap, key: &str) -> Option { let k = CString::new(key).ok()?; map.get(&k).and_then(|v| match v { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None, }) } fn get_dictionary_field<'a>( map: &'a HashMap, key: &str, ) -> Option<&'a HashMap> { let k = CString::new(key).ok()?; map.get(&k).and_then(|v| match v { Message::Dictionary(d) => Some(d), _ => None, }) } fn make_error_reply(code: &str, message: &str) -> Message { let mut reply: HashMap = HashMap::new(); reply.insert(cstr("type"), Message::String(cstr("Error"))); reply.insert(cstr("error"), Message::String(cstr(code))); reply.insert(cstr("message"), Message::String(cstr(message))); Message::Dictionary(reply) } type XpcMap = HashMap; fn dict_get_str(map: &XpcMap, key: &str) -> Option { let k = CString::new(key).ok()?; match map.get(&k) { Some(Message::String(v)) => Some(v.to_string_lossy().into_owned()), _ => None, } } fn dict_get_i64_from_str(map: &XpcMap, key: &str) -> Option { dict_get_str(map, key).and_then(|s| s.parse::().ok()) } fn dict_put_str(map: &mut XpcMap, key: &str, value: impl AsRef) { map.insert(cstr(key), Message::String(cstr(value.as_ref()))); } fn dict_put_i64_as_str(map: &mut XpcMap, key: &str, value: i64) { dict_put_str(map, key, value.to_string()); } fn array_from_strs(values: impl IntoIterator) -> Message { let arr = values .into_iter() .map(|s| Message::String(cstr(&s))) .collect(); Message::Array(arr) } fn make_ok_reply() -> Message { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "Ok"); Message::Dictionary(reply) } async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message { // Standardized request: { method: String, arguments: Dictionary? } let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) { Some(m) => m, None => return make_error_reply("InvalidRequest", "Missing method/type"), }; let _arguments = get_dictionary_field(root, "arguments"); match method.as_str() { // Example implemented method: GetVersion "GetVersion" => match agent.send_event(Event::GetVersion).await { Ok(version) => { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "GetVersionResponse"); dict_put_str(&mut reply, "version", &version); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), }, "GetConversations" => { // Defaults let mut limit: i32 = 100; let mut offset: i32 = 0; if let Some(args) = get_dictionary_field(root, "arguments") { if let Some(v) = dict_get_i64_from_str(args, "limit") { limit = v as i32; } if let Some(v) = dict_get_i64_from_str(args, "offset") { offset = v as i32; } } match agent .send_event(|r| Event::GetAllConversations(limit, offset, r)) .await { Ok(conversations) => { // Build array of conversation dictionaries let mut items: Vec = Vec::with_capacity(conversations.len()); for conv in conversations { let mut m: XpcMap = HashMap::new(); dict_put_str(&mut m, "guid", &conv.guid); dict_put_str( &mut m, "display_name", &conv.display_name.unwrap_or_default(), ); dict_put_i64_as_str(&mut m, "unread_count", conv.unread_count as i64); dict_put_str( &mut m, "last_message_preview", &conv.last_message_preview.unwrap_or_default(), ); // participants -> array of strings let participant_names: Vec = conv .participants .into_iter() .map(|p| p.display_name()) .collect(); m.insert(cstr("participants"), array_from_strs(participant_names)); // date as unix timestamp (i64) dict_put_i64_as_str(&mut m, "date", conv.date.and_utc().timestamp()); items.push(Message::Dictionary(m)); } let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "GetConversationsResponse"); reply.insert(cstr("conversations"), Message::Array(items)); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "SyncConversationList" => { match agent.send_event(Event::SyncConversationList).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "SyncAllConversations" => { match agent.send_event(Event::SyncAllConversations).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "SyncConversation" => { let conversation_id = match get_dictionary_field(root, "arguments").and_then(|m| dict_get_str(m, "conversation_id")) { Some(id) => id, None => return make_error_reply("InvalidRequest", "Missing conversation_id"), }; match agent.send_event(|r| Event::SyncConversation(conversation_id, r)).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "MarkConversationAsRead" => { let conversation_id = match get_dictionary_field(root, "arguments").and_then(|m| dict_get_str(m, "conversation_id")) { Some(id) => id, None => return make_error_reply("InvalidRequest", "Missing conversation_id"), }; match agent.send_event(|r| Event::MarkConversationAsRead(conversation_id, r)).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "GetMessages" => { let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let conversation_id = match dict_get_str(args, "conversation_id") { Some(id) => id, None => return make_error_reply("InvalidRequest", "Missing conversation_id") }; let last_message_id = dict_get_str(args, "last_message_id"); match agent.send_event(|r| Event::GetMessages(conversation_id, last_message_id, r)).await { Ok(messages) => { let mut items: Vec = Vec::with_capacity(messages.len()); for msg in messages { let mut m: XpcMap = HashMap::new(); dict_put_str(&mut m, "id", &msg.id); dict_put_str(&mut m, "text", &msg.text.replace('\u{FFFC}', "")); dict_put_i64_as_str(&mut m, "date", msg.date.and_utc().timestamp()); dict_put_str(&mut m, "sender", &msg.sender.display_name()); items.push(Message::Dictionary(m)); } let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "GetMessagesResponse"); reply.insert(cstr("messages"), Message::Array(items)); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "DeleteAllConversations" => { match agent.send_event(Event::DeleteAllConversations).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "SendMessage" => { let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let conversation_id = match dict_get_str(args, "conversation_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing conversation_id") }; let text = dict_get_str(args, "text").unwrap_or_default(); let attachment_guids: Vec = match args.get(&cstr("attachment_guids")) { Some(Message::Array(arr)) => arr.iter().filter_map(|m| match m { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None }).collect(), _ => Vec::new(), }; match agent.send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)).await { Ok(uuid) => { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "SendMessageResponse"); dict_put_str(&mut reply, "uuid", &uuid.to_string()); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "GetAttachmentInfo" => { let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let attachment_id = match dict_get_str(args, "attachment_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing attachment_id") }; match agent.send_event(|r| Event::GetAttachment(attachment_id, r)).await { Ok(attachment) => { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "GetAttachmentInfoResponse"); dict_put_str(&mut reply, "path", &attachment.get_path_for_preview(false).to_string_lossy()); dict_put_str(&mut reply, "preview_path", &attachment.get_path_for_preview(true).to_string_lossy()); dict_put_str(&mut reply, "downloaded", &attachment.is_downloaded(false).to_string()); dict_put_str(&mut reply, "preview_downloaded", &attachment.is_downloaded(true).to_string()); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "DownloadAttachment" => { let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let attachment_id = match dict_get_str(args, "attachment_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing attachment_id") }; let preview = dict_get_str(args, "preview").map(|s| s == "true").unwrap_or(false); match agent.send_event(|r| Event::DownloadAttachment(attachment_id, preview, r)).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "UploadAttachment" => { use std::path::PathBuf; let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let path = match dict_get_str(args, "path") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing path") }; match agent.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r)).await { Ok(upload_guid) => { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "UploadAttachmentResponse"); dict_put_str(&mut reply, "upload_guid", &upload_guid); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "GetAllSettings" => { match agent.send_event(Event::GetAllSettings).await { Ok(settings) => { let mut reply: XpcMap = HashMap::new(); dict_put_str(&mut reply, "type", "GetAllSettingsResponse"); dict_put_str(&mut reply, "server_url", &settings.server_url.unwrap_or_default()); dict_put_str(&mut reply, "username", &settings.username.unwrap_or_default()); Message::Dictionary(reply) } Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } "UpdateSettings" => { let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; let server_url = dict_get_str(args, "server_url"); let username = dict_get_str(args, "username"); let settings = Settings { server_url, username, token: None }; match agent.send_event(|r| Event::UpdateSettings(settings, r)).await { Ok(()) => make_ok_reply(), Err(e) => make_error_reply("DaemonError", &format!("{}", e)), } } // No-op used by clients to ensure the connection is established and subscribed "SubscribeSignals" => { make_ok_reply() } // Unknown method fallback other => make_error_reply("UnknownMethod", other), } } fn signal_to_message(signal: Signal) -> Message { let mut root: XpcMap = HashMap::new(); let mut args: XpcMap = HashMap::new(); match signal { Signal::ConversationsUpdated => { dict_put_str(&mut root, "name", "ConversationsUpdated"); } Signal::MessagesUpdated(conversation_id) => { dict_put_str(&mut root, "name", "MessagesUpdated"); dict_put_str(&mut args, "conversation_id", &conversation_id); } Signal::AttachmentDownloaded(attachment_id) => { dict_put_str(&mut root, "name", "AttachmentDownloadCompleted"); dict_put_str(&mut args, "attachment_id", &attachment_id); } Signal::AttachmentUploaded(upload_guid, attachment_guid) => { dict_put_str(&mut root, "name", "AttachmentUploadCompleted"); dict_put_str(&mut args, "upload_guid", &upload_guid); dict_put_str(&mut args, "attachment_guid", &attachment_guid); } Signal::UpdateStreamReconnected => { dict_put_str(&mut root, "name", "UpdateStreamReconnected"); } } if !args.is_empty() { root.insert(cstr("arguments"), Message::Dictionary(args)); } Message::Dictionary(root) } async fn handle_client(agent: XpcAgent, mut client: XpcClient, mut signal_rx: broadcast::Receiver) { log::info!(target: LOG_TARGET, "New XPC connection"); loop { tokio::select! { maybe_msg = client.next() => { match maybe_msg { Some(Message::Error(MessageError::ConnectionInterrupted)) => { log::warn!(target: LOG_TARGET, "XPC connection interrupted"); } Some(Message::Dictionary(map)) => { let response = dispatch(&agent, &map).await; client.send_message(response); } Some(other) => { log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); client.send_message(other); } None => break, } } recv = signal_rx.recv() => { match recv { Ok(signal) => { let msg = signal_to_message(signal); client.send_message(msg); } Err(broadcast::error::RecvError::Closed) => break, Err(broadcast::error::RecvError::Lagged(_)) => { log::warn!(target: LOG_TARGET, "Lagged behind on signals; dropping some events for this client"); } } } } } log::info!(target: LOG_TARGET, "XPC connection closed"); }