From 00bbc3b330da2e403656a74bc87c0d7ac0b61c44 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 24 Aug 2025 15:49:55 -0700 Subject: [PATCH] xpc: refactor -- separate rpc impl and xpc glue --- kordophoned/src/xpc/agent.rs | 590 ++++------------------------------- kordophoned/src/xpc/mod.rs | 2 + kordophoned/src/xpc/rpc.rs | 235 ++++++++++++++ kordophoned/src/xpc/util.rs | 87 ++++++ 4 files changed, 381 insertions(+), 533 deletions(-) create mode 100644 kordophoned/src/xpc/rpc.rs create mode 100644 kordophoned/src/xpc/util.rs diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs index 880334a..226f1b5 100644 --- a/kordophoned/src/xpc/agent.rs +++ b/kordophoned/src/xpc/agent.rs @@ -10,18 +10,17 @@ use tokio::sync::{mpsc, oneshot, Mutex}; use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError}; use xpc_connection_sys as xpc_sys; -static LOG_TARGET: &str = "xpc"; +pub(super) static LOG_TARGET: &str = "xpc"; /// Wrapper for raw XPC connection pointer to declare cross-thread usage. /// Safety: libxpc connections are reference-counted and may be used to send from other threads. #[derive(Copy, Clone)] -struct XpcConn(pub xpc_sys::xpc_connection_t); +pub(super) struct XpcConn(pub xpc_sys::xpc_connection_t); unsafe impl Send for XpcConn {} unsafe impl Sync for XpcConn {} type Subscribers = Arc>>; -/// XPC IPC agent that forwards daemon events and signals over libxpc. #[derive(Clone)] pub struct XpcAgent { event_sink: mpsc::Sender, @@ -29,12 +28,10 @@ pub struct XpcAgent { } impl XpcAgent { - /// Create a new XPC agent with an event sink and signal receiver. pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))) } } - /// Run the XPC agent and host the XPC service. Implements generic dispatch. pub async fn run(self) { use block::ConcreteBlock; use std::ops::Deref; @@ -55,7 +52,6 @@ impl XpcAgent { service_name ); - // Multi-thread runtime to drive async dispatch from XPC event handlers. let rt = match tokio::runtime::Runtime::new() { Ok(rt) => Arc::new(rt), Err(e) => { @@ -64,9 +60,7 @@ impl XpcAgent { } }; - // Shared list of connected clients for signal fanout let connections: Subscribers = Arc::new(std::sync::Mutex::new(Vec::new())); - // Forward daemon signals to all connected clients { let receiver_arc = self.signal_receiver.clone(); let conns = connections.clone(); @@ -78,7 +72,7 @@ impl XpcAgent { .expect("Signal receiver already taken"); while let Some(signal) = receiver.recv().await { log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal); - let msg = signal_to_message(signal); + let msg = super::util::signal_to_message(signal); let xobj = message_to_xpc_object(msg); let list = conns.lock().unwrap(); log::trace!(target: LOG_TARGET, "Active XPC clients: {}", list.len()); @@ -91,7 +85,6 @@ impl XpcAgent { }); } - // Create the XPC Mach service listener. let service = unsafe { xpc_sys::xpc_connection_create_mach_service( mach_port_name.as_ptr(), @@ -100,80 +93,71 @@ impl XpcAgent { ) }; - // Event handler for the service: accepts new client connections. let agent = self.clone(); let rt_accept = rt.clone(); let conns_accept = connections.clone(); let service_handler = ConcreteBlock::new(move |event: xpc_sys::xpc_object_t| { unsafe { - // Treat incoming events as connections; ignore others - // We detect connections by trying to set a per-connection handler. - let client = event as xpc_sys::xpc_connection_t; - log::trace!(target: LOG_TARGET, "New XPC connection accepted"); - // Do not register for signals until the client explicitly subscribes + let client = event as xpc_sys::xpc_connection_t; + log::trace!(target: LOG_TARGET, "New XPC connection accepted"); - // Per-connection handler - let agent_conn = agent.clone(); - let rt_conn = rt_accept.clone(); - let conns_for_handler = conns_accept.clone(); - let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| { - // Convert to higher-level Message for type matching - match xpc_object_to_message(msg) { - Message::Dictionary(map) => { - // Trace inbound method - let method = dict_get_str(&map, "method").or_else(|| dict_get_str(&map, "type")).unwrap_or_else(|| "".to_string()); - log::trace!(target: LOG_TARGET, "XPC request received: {}", method); - let response = rt_conn.block_on(dispatch(&agent_conn, &conns_for_handler, client, &map)); - let reply = unsafe { xpc_sys::xpc_dictionary_create_reply(msg) }; - if !reply.is_null() { - let payload = message_to_xpc_object(response); - let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| { - unsafe { xpc_sys::xpc_dictionary_set_value(reply, key, value); } - }) - .copy(); - unsafe { - xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _); - xpc_sys::xpc_connection_send_message(client, reply); - xpc_sys::xpc_release(payload); - xpc_sys::xpc_release(reply); - } - log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method); - } else { - log::warn!(target: LOG_TARGET, "No reply port for method: {}", method); + let agent_conn = agent.clone(); + let rt_conn = rt_accept.clone(); + let conns_for_handler = conns_accept.clone(); + let conn_handler = ConcreteBlock::new(move |msg: xpc_sys::xpc_object_t| { + match xpc_object_to_message(msg) { + Message::Dictionary(map) => { + let method = super::util::dict_get_str(&map, "method").or_else(|| super::util::dict_get_str(&map, "type")).unwrap_or_else(|| "".to_string()); + log::trace!(target: LOG_TARGET, "XPC request received: {}", method); + let response = rt_conn.block_on(super::rpc::dispatch(&agent_conn, &conns_for_handler, client, &map)); + let reply = unsafe { xpc_sys::xpc_dictionary_create_reply(msg) }; + if !reply.is_null() { + let payload = message_to_xpc_object(response); + let apply_block = ConcreteBlock::new(move |key: *const c_char, value: xpc_sys::xpc_object_t| { + unsafe { xpc_sys::xpc_dictionary_set_value(reply, key, value); } + }) + .copy(); + unsafe { + xpc_sys::xpc_dictionary_apply(payload, apply_block.deref() as *const _ as *mut _); + xpc_sys::xpc_connection_send_message(client, reply); + xpc_sys::xpc_release(payload); + xpc_sys::xpc_release(reply); } + log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method); + } else { + log::warn!(target: LOG_TARGET, "No reply port for method: {}", method); } - Message::Error(e) => { - match e { - MessageError::ConnectionInvalid => { - // Normal for one-shot RPC connections; keep logs quiet - let mut list = conns_for_handler.lock().unwrap(); - let before = list.len(); - list.retain(|c| c.0 != client); - let after = list.len(); - if after < before { - log::trace!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after); - } else { - log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)"); - } - } - other => { - log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other); - } - } - } - _ => {} } - }) - .copy(); + Message::Error(e) => { + match e { + MessageError::ConnectionInvalid => { + let mut list = conns_for_handler.lock().unwrap(); + let before = list.len(); + list.retain(|c| c.0 != client); + let after = list.len(); + if after < before { + log::trace!(target: LOG_TARGET, "Removed closed XPC client from subscribers ({} -> {})", before, after); + } else { + log::debug!(target: LOG_TARGET, "XPC connection closed (no subscription)"); + } + } + other => { + log::warn!(target: LOG_TARGET, "XPC error event: {:?}", other); + } + } + } + _ => {} + } + }) + .copy(); - xpc_sys::xpc_connection_set_event_handler( - client, - conn_handler.deref() as *const _ as *mut _, - ); - xpc_sys::xpc_connection_resume(client); - } + xpc_sys::xpc_connection_set_event_handler( + client, + conn_handler.deref() as *const _ as *mut _, + ); + xpc_sys::xpc_connection_resume(client); } - ) + }) .copy(); unsafe { @@ -184,11 +168,9 @@ impl XpcAgent { xpc_sys::xpc_connection_resume(service); } - // Keep this future alive forever. futures_util::future::pending::<()>().await; } - /// Send an event to the daemon and await its reply. pub async fn send_event( &self, make_event: impl FnOnce(kordophoned::daemon::events::Reply) -> Event, @@ -202,461 +184,3 @@ impl XpcAgent { } } -fn cstr(s: &str) -> CString { - CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) -} - -fn get_string_field(map: &HashMap, key: &str) -> Option { - let k = CString::new(key).ok()?; - map.get(&k).and_then(|v| match v { - Message::String(s) => Some(s.to_string_lossy().into_owned()), - _ => None, - }) -} - -fn get_dictionary_field<'a>( - map: &'a HashMap, - key: &str, -) -> Option<&'a HashMap> { - let k = CString::new(key).ok()?; - map.get(&k).and_then(|v| match v { - Message::Dictionary(d) => Some(d), - _ => None, - }) -} - -fn make_error_reply(code: &str, message: &str) -> Message { - log::error!(target: LOG_TARGET, "XPC error: {code}: {message}"); - - let mut reply: HashMap = HashMap::new(); - reply.insert(cstr("type"), Message::String(cstr("Error"))); - reply.insert(cstr("error"), Message::String(cstr(code))); - reply.insert(cstr("message"), Message::String(cstr(message))); - - Message::Dictionary(reply) -} - -type XpcMap = HashMap; - -fn dict_get_str(map: &XpcMap, key: &str) -> Option { - let k = CString::new(key).ok()?; - match map.get(&k) { - Some(Message::String(v)) => Some(v.to_string_lossy().into_owned()), - _ => None, - } -} - -fn dict_get_i64_from_str(map: &XpcMap, key: &str) -> Option { - dict_get_str(map, key).and_then(|s| s.parse::().ok()) -} - -fn dict_put_str(map: &mut XpcMap, key: &str, value: impl AsRef) { - map.insert(cstr(key), Message::String(cstr(value.as_ref()))); -} - -fn dict_put_i64_as_str(map: &mut XpcMap, key: &str, value: i64) { - dict_put_str(map, key, value.to_string()); -} - -fn array_from_strs(values: impl IntoIterator) -> Message { - let arr = values - .into_iter() - .map(|s| Message::String(cstr(&s))) - .collect(); - Message::Array(arr) -} - -fn make_ok_reply() -> Message { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "Ok"); - Message::Dictionary(reply) -} - -/// Attach an optional request_id to a dictionary reply message. -fn attach_request_id(mut message: Message, request_id: Option) -> Message { - if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) { - dict_put_str(m, "request_id", &id); - } - message -} - -async fn dispatch( - agent: &XpcAgent, - subscribers: &std::sync::Mutex>, - current_client: xpc_sys::xpc_connection_t, - root: &HashMap, -) -> Message { - // Standardized request: { method: String, arguments: Dictionary?, request_id: String? } - let request_id = dict_get_str(root, "request_id"); - - let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) { - Some(m) => m, - None => return attach_request_id(make_error_reply("InvalidRequest", "Missing method/type"), request_id), - }; - - let _arguments = get_dictionary_field(root, "arguments"); - - let mut response = match method.as_str() { - // Example implemented method: GetVersion - "GetVersion" => match agent.send_event(Event::GetVersion).await { - Ok(version) => { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "GetVersionResponse"); - dict_put_str(&mut reply, "version", &version); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - }, - - "GetConversations" => { - // Defaults - let mut limit: i32 = 100; - let mut offset: i32 = 0; - - if let Some(args) = get_dictionary_field(root, "arguments") { - if let Some(v) = dict_get_i64_from_str(args, "limit") { - limit = v as i32; - } - if let Some(v) = dict_get_i64_from_str(args, "offset") { - offset = v as i32; - } - } - - match agent - .send_event(|r| Event::GetAllConversations(limit, offset, r)) - .await - { - Ok(conversations) => { - // Build array of conversation dictionaries - let mut items: Vec = Vec::with_capacity(conversations.len()); - for conv in conversations { - let mut m: XpcMap = HashMap::new(); - dict_put_str(&mut m, "guid", &conv.guid); - dict_put_str( - &mut m, - "display_name", - &conv.display_name.unwrap_or_default(), - ); - dict_put_i64_as_str(&mut m, "unread_count", conv.unread_count as i64); - dict_put_str( - &mut m, - "last_message_preview", - &conv.last_message_preview.unwrap_or_default(), - ); - - // participants -> array of strings - let participant_names: Vec = conv - .participants - .into_iter() - .map(|p| p.display_name()) - .collect(); - m.insert(cstr("participants"), array_from_strs(participant_names)); - - // date as unix timestamp (i64) - dict_put_i64_as_str(&mut m, "date", conv.date.and_utc().timestamp()); - - items.push(Message::Dictionary(m)); - } - - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "GetConversationsResponse"); - reply.insert(cstr("conversations"), Message::Array(items)); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "SyncConversationList" => match agent.send_event(Event::SyncConversationList).await { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - }, - - "SyncAllConversations" => match agent.send_event(Event::SyncAllConversations).await { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - }, - - "SyncConversation" => { - let conversation_id = match get_dictionary_field(root, "arguments") - .and_then(|m| dict_get_str(m, "conversation_id")) - { - Some(id) => id, - None => return make_error_reply("InvalidRequest", "Missing conversation_id"), - }; - match agent - .send_event(|r| Event::SyncConversation(conversation_id, r)) - .await - { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "MarkConversationAsRead" => { - let conversation_id = match get_dictionary_field(root, "arguments") - .and_then(|m| dict_get_str(m, "conversation_id")) - { - Some(id) => id, - None => return make_error_reply("InvalidRequest", "Missing conversation_id"), - }; - match agent - .send_event(|r| Event::MarkConversationAsRead(conversation_id, r)) - .await - { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "GetMessages" => { - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let conversation_id = match dict_get_str(args, "conversation_id") { - Some(id) => id, - None => return make_error_reply("InvalidRequest", "Missing conversation_id"), - }; - let last_message_id = dict_get_str(args, "last_message_id"); - match agent - .send_event(|r| Event::GetMessages(conversation_id, last_message_id, r)) - .await - { - Ok(messages) => { - let mut items: Vec = Vec::with_capacity(messages.len()); - for msg in messages { - let mut m: XpcMap = HashMap::new(); - dict_put_str(&mut m, "id", &msg.id); - dict_put_str(&mut m, "text", &msg.text.replace('\u{FFFC}', "")); - dict_put_i64_as_str(&mut m, "date", msg.date.and_utc().timestamp()); - dict_put_str(&mut m, "sender", &msg.sender.display_name()); - items.push(Message::Dictionary(m)); - } - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "GetMessagesResponse"); - reply.insert(cstr("messages"), Message::Array(items)); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "DeleteAllConversations" => match agent.send_event(Event::DeleteAllConversations).await { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - }, - - "SendMessage" => { - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let conversation_id = match dict_get_str(args, "conversation_id") { - Some(v) => v, - None => return make_error_reply("InvalidRequest", "Missing conversation_id"), - }; - let text = dict_get_str(args, "text").unwrap_or_default(); - let attachment_guids: Vec = match args.get(&cstr("attachment_guids")) { - Some(Message::Array(arr)) => arr - .iter() - .filter_map(|m| match m { - Message::String(s) => Some(s.to_string_lossy().into_owned()), - _ => None, - }) - .collect(), - _ => Vec::new(), - }; - match agent - .send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)) - .await - { - Ok(uuid) => { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "SendMessageResponse"); - dict_put_str(&mut reply, "uuid", &uuid.to_string()); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "GetAttachmentInfo" => { - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let attachment_id = match dict_get_str(args, "attachment_id") { - Some(v) => v, - None => return make_error_reply("InvalidRequest", "Missing attachment_id"), - }; - match agent - .send_event(|r| Event::GetAttachment(attachment_id, r)) - .await - { - Ok(attachment) => { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "GetAttachmentInfoResponse"); - dict_put_str( - &mut reply, - "path", - &attachment.get_path_for_preview(false).to_string_lossy(), - ); - dict_put_str( - &mut reply, - "preview_path", - &attachment.get_path_for_preview(true).to_string_lossy(), - ); - dict_put_str( - &mut reply, - "downloaded", - &attachment.is_downloaded(false).to_string(), - ); - dict_put_str( - &mut reply, - "preview_downloaded", - &attachment.is_downloaded(true).to_string(), - ); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "DownloadAttachment" => { - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let attachment_id = match dict_get_str(args, "attachment_id") { - Some(v) => v, - None => return make_error_reply("InvalidRequest", "Missing attachment_id"), - }; - let preview = dict_get_str(args, "preview") - .map(|s| s == "true") - .unwrap_or(false); - match agent - .send_event(|r| Event::DownloadAttachment(attachment_id, preview, r)) - .await - { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "UploadAttachment" => { - use std::path::PathBuf; - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let path = match dict_get_str(args, "path") { - Some(v) => v, - None => return make_error_reply("InvalidRequest", "Missing path"), - }; - match agent - .send_event(|r| Event::UploadAttachment(PathBuf::from(path), r)) - .await - { - Ok(upload_guid) => { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "UploadAttachmentResponse"); - dict_put_str(&mut reply, "upload_guid", &upload_guid); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - "GetAllSettings" => match agent.send_event(Event::GetAllSettings).await { - Ok(settings) => { - let mut reply: XpcMap = HashMap::new(); - dict_put_str(&mut reply, "type", "GetAllSettingsResponse"); - dict_put_str( - &mut reply, - "server_url", - &settings.server_url.unwrap_or_default(), - ); - dict_put_str( - &mut reply, - "username", - &settings.username.unwrap_or_default(), - ); - Message::Dictionary(reply) - } - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - }, - - "UpdateSettings" => { - let args = match get_dictionary_field(root, "arguments") { - Some(a) => a, - None => return make_error_reply("InvalidRequest", "Missing arguments"), - }; - let server_url = dict_get_str(args, "server_url"); - let username = dict_get_str(args, "username"); - let settings = Settings { - server_url, - username, - token: None, - }; - match agent - .send_event(|r| Event::UpdateSettings(settings, r)) - .await - { - Ok(()) => make_ok_reply(), - Err(e) => make_error_reply("DaemonError", &format!("{}", e)), - } - } - - // Subscribe and return immediately - "SubscribeSignals" => { - let mut list = subscribers.lock().unwrap(); - // Avoid duplicates - if !list.iter().any(|c| c.0 == current_client) { - list.push(XpcConn(current_client)); - log::trace!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len()); - } - make_ok_reply() - }, - - // Unknown method fallback - other => make_error_reply("UnknownMethod", other), - }; - - // Echo request_id back (if present) so clients can correlate replies - response = attach_request_id(response, request_id); - response -} - -fn signal_to_message(signal: Signal) -> Message { - let mut root: XpcMap = HashMap::new(); - let mut args: XpcMap = HashMap::new(); - match signal { - Signal::ConversationsUpdated => { - dict_put_str(&mut root, "name", "ConversationsUpdated"); - } - Signal::MessagesUpdated(conversation_id) => { - dict_put_str(&mut root, "name", "MessagesUpdated"); - dict_put_str(&mut args, "conversation_id", &conversation_id); - } - Signal::AttachmentDownloaded(attachment_id) => { - dict_put_str(&mut root, "name", "AttachmentDownloadCompleted"); - dict_put_str(&mut args, "attachment_id", &attachment_id); - } - Signal::AttachmentUploaded(upload_guid, attachment_guid) => { - dict_put_str(&mut root, "name", "AttachmentUploadCompleted"); - dict_put_str(&mut args, "upload_guid", &upload_guid); - dict_put_str(&mut args, "attachment_guid", &attachment_guid); - } - Signal::UpdateStreamReconnected => { - dict_put_str(&mut root, "name", "UpdateStreamReconnected"); - } - } - if !args.is_empty() { - root.insert(cstr("arguments"), Message::Dictionary(args)); - } - Message::Dictionary(root) -} - -// legacy async client handler removed in reply-port implementation - diff --git a/kordophoned/src/xpc/mod.rs b/kordophoned/src/xpc/mod.rs index 8f189b2..81cbe3f 100644 --- a/kordophoned/src/xpc/mod.rs +++ b/kordophoned/src/xpc/mod.rs @@ -1,2 +1,4 @@ pub mod agent; pub mod interface; +pub mod rpc; +pub mod util; diff --git a/kordophoned/src/xpc/rpc.rs b/kordophoned/src/xpc/rpc.rs new file mode 100644 index 0000000..1647db1 --- /dev/null +++ b/kordophoned/src/xpc/rpc.rs @@ -0,0 +1,235 @@ +use super::agent::{XpcAgent, XpcConn, LOG_TARGET}; +use kordophoned::daemon::events::Event; +use kordophoned::daemon::settings::Settings; +use std::collections::HashMap; +use std::ffi::CString; +use xpc_connection::Message; +use xpc_connection_sys as xpc_sys; + +use super::util::*; + +pub async fn dispatch( + agent: &XpcAgent, + subscribers: &std::sync::Mutex>, + current_client: xpc_sys::xpc_connection_t, + root: &HashMap, +) -> Message { + let request_id = dict_get_str(root, "request_id"); + + let method = match dict_get_str(root, "method").or_else(|| dict_get_str(root, "type")) { + Some(m) => m, + None => return attach_request_id(make_error_reply("InvalidRequest", "Missing method/type"), request_id), + }; + + let _arguments = get_dictionary_field(root, "arguments"); + + let mut response = match method.as_str() { + // GetVersion + "GetVersion" => match agent.send_event(Event::GetVersion).await { + Ok(version) => { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "GetVersionResponse"); + dict_put_str(&mut reply, "version", &version); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + }, + + // GetConversations + "GetConversations" => { + let mut limit: i32 = 100; + let mut offset: i32 = 0; + if let Some(args) = get_dictionary_field(root, "arguments") { + if let Some(v) = dict_get_i64_from_str(args, "limit") { limit = v as i32; } + if let Some(v) = dict_get_i64_from_str(args, "offset") { offset = v as i32; } + } + match agent.send_event(|r| Event::GetAllConversations(limit, offset, r)).await { + Ok(conversations) => { + let mut items: Vec = Vec::with_capacity(conversations.len()); + for conv in conversations { + let mut m: XpcMap = HashMap::new(); + dict_put_str(&mut m, "guid", &conv.guid); + dict_put_str(&mut m, "display_name", &conv.display_name.unwrap_or_default()); + dict_put_i64_as_str(&mut m, "unread_count", conv.unread_count as i64); + dict_put_str(&mut m, "last_message_preview", &conv.last_message_preview.unwrap_or_default()); + let participant_names: Vec = conv.participants.into_iter().map(|p| p.display_name()).collect(); + m.insert(cstr("participants"), array_from_strs(participant_names)); + dict_put_i64_as_str(&mut m, "date", conv.date.and_utc().timestamp()); + items.push(Message::Dictionary(m)); + } + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "GetConversationsResponse"); + reply.insert(cstr("conversations"), Message::Array(items)); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Sync ops + "SyncConversationList" => match agent.send_event(Event::SyncConversationList).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + }, + "SyncAllConversations" => match agent.send_event(Event::SyncAllConversations).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + }, + "SyncConversation" => { + let conversation_id = match get_dictionary_field(root, "arguments").and_then(|m| dict_get_str(m, "conversation_id")) { + Some(id) => id, + None => return make_error_reply("InvalidRequest", "Missing conversation_id"), + }; + match agent.send_event(|r| Event::SyncConversation(conversation_id, r)).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Mark as read + "MarkConversationAsRead" => { + let conversation_id = match get_dictionary_field(root, "arguments").and_then(|m| dict_get_str(m, "conversation_id")) { + Some(id) => id, + None => return make_error_reply("InvalidRequest", "Missing conversation_id"), + }; + match agent.send_event(|r| Event::MarkConversationAsRead(conversation_id, r)).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // GetMessages + "GetMessages" => { + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let conversation_id = match dict_get_str(args, "conversation_id") { Some(id) => id, None => return make_error_reply("InvalidRequest", "Missing conversation_id") }; + let last_message_id = dict_get_str(args, "last_message_id"); + match agent.send_event(|r| Event::GetMessages(conversation_id, last_message_id, r)).await { + Ok(messages) => { + let mut items: Vec = Vec::with_capacity(messages.len()); + for msg in messages { + let mut m: XpcMap = HashMap::new(); + dict_put_str(&mut m, "id", &msg.id); + dict_put_str(&mut m, "text", &msg.text.replace('\u{FFFC}', "")); + dict_put_i64_as_str(&mut m, "date", msg.date.and_utc().timestamp()); + dict_put_str(&mut m, "sender", &msg.sender.display_name()); + items.push(Message::Dictionary(m)); + } + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "GetMessagesResponse"); + reply.insert(cstr("messages"), Message::Array(items)); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Delete all + "DeleteAllConversations" => match agent.send_event(Event::DeleteAllConversations).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + }, + + // SendMessage + "SendMessage" => { + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let conversation_id = match dict_get_str(args, "conversation_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing conversation_id") }; + let text = dict_get_str(args, "text").unwrap_or_default(); + let attachment_guids: Vec = match args.get(&cstr("attachment_guids")) { + Some(Message::Array(arr)) => arr.iter().filter_map(|m| match m { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None }).collect(), + _ => Vec::new(), + }; + match agent.send_event(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)).await { + Ok(uuid) => { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "SendMessageResponse"); + dict_put_str(&mut reply, "uuid", &uuid.to_string()); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // GetAttachmentInfo + "GetAttachmentInfo" => { + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let attachment_id = match dict_get_str(args, "attachment_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing attachment_id") }; + match agent.send_event(|r| Event::GetAttachment(attachment_id, r)).await { + Ok(attachment) => { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "GetAttachmentInfoResponse"); + dict_put_str(&mut reply, "path", &attachment.get_path_for_preview(false).to_string_lossy()); + dict_put_str(&mut reply, "preview_path", &attachment.get_path_for_preview(true).to_string_lossy()); + dict_put_str(&mut reply, "downloaded", &attachment.is_downloaded(false).to_string()); + dict_put_str(&mut reply, "preview_downloaded", &attachment.is_downloaded(true).to_string()); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // DownloadAttachment + "DownloadAttachment" => { + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let attachment_id = match dict_get_str(args, "attachment_id") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing attachment_id") }; + let preview = dict_get_str(args, "preview").map(|s| s == "true").unwrap_or(false); + match agent.send_event(|r| Event::DownloadAttachment(attachment_id, preview, r)).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // UploadAttachment + "UploadAttachment" => { + use std::path::PathBuf; + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let path = match dict_get_str(args, "path") { Some(v) => v, None => return make_error_reply("InvalidRequest", "Missing path") }; + match agent.send_event(|r| Event::UploadAttachment(PathBuf::from(path), r)).await { + Ok(upload_guid) => { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "UploadAttachmentResponse"); + dict_put_str(&mut reply, "upload_guid", &upload_guid); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Settings + "GetAllSettings" => match agent.send_event(Event::GetAllSettings).await { + Ok(settings) => { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "GetAllSettingsResponse"); + dict_put_str(&mut reply, "server_url", &settings.server_url.unwrap_or_default()); + dict_put_str(&mut reply, "username", &settings.username.unwrap_or_default()); + Message::Dictionary(reply) + } + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + }, + "UpdateSettings" => { + let args = match get_dictionary_field(root, "arguments") { Some(a) => a, None => return make_error_reply("InvalidRequest", "Missing arguments") }; + let server_url = dict_get_str(args, "server_url"); + let username = dict_get_str(args, "username"); + let settings = Settings { server_url, username, token: None }; + match agent.send_event(|r| Event::UpdateSettings(settings, r)).await { + Ok(()) => make_ok_reply(), + Err(e) => make_error_reply("DaemonError", &format!("{}", e)), + } + } + + // Subscribe + "SubscribeSignals" => { + let mut list = subscribers.lock().unwrap(); + if !list.iter().any(|c| c.0 == current_client) { + list.push(XpcConn(current_client)); + log::trace!(target: LOG_TARGET, "Client subscribed to signals (total subscribers: {})", list.len()); + } + make_ok_reply() + }, + + // Unknown method fallback + other => make_error_reply("UnknownMethod", other), + }; + + response = attach_request_id(response, request_id); + response +} diff --git a/kordophoned/src/xpc/util.rs b/kordophoned/src/xpc/util.rs new file mode 100644 index 0000000..2889c72 --- /dev/null +++ b/kordophoned/src/xpc/util.rs @@ -0,0 +1,87 @@ +use kordophoned::daemon::signals::Signal; +use std::collections::HashMap; +use std::ffi::CString; +use xpc_connection::Message; + +pub type XpcMap = HashMap; + +pub fn cstr(s: &str) -> CString { CString::new(s).unwrap_or_else(|_| CString::new("").unwrap()) } + +pub fn get_dictionary_field<'a>( + map: &'a HashMap, + key: &str, +) -> Option<&'a HashMap> { + let k = CString::new(key).ok()?; + map.get(&k).and_then(|v| match v { Message::Dictionary(d) => Some(d), _ => None }) +} + +pub fn dict_get_str(map: &HashMap, key: &str) -> Option { + let k = CString::new(key).ok()?; + match map.get(&k) { Some(Message::String(v)) => Some(v.to_string_lossy().into_owned()), _ => None } +} + +pub fn dict_get_i64_from_str(map: &HashMap, key: &str) -> Option { + dict_get_str(map, key).and_then(|s| s.parse::().ok()) +} + +pub fn dict_put_str(map: &mut XpcMap, key: &str, value: impl AsRef) { + map.insert(cstr(key), Message::String(cstr(value.as_ref()))); +} + +pub fn dict_put_i64_as_str(map: &mut XpcMap, key: &str, value: i64) { dict_put_str(map, key, value.to_string()); } + +pub fn array_from_strs(values: impl IntoIterator) -> Message { + let arr = values.into_iter().map(|s| Message::String(cstr(&s))).collect(); + Message::Array(arr) +} + +pub fn make_ok_reply() -> Message { + let mut reply: XpcMap = HashMap::new(); + dict_put_str(&mut reply, "type", "Ok"); + Message::Dictionary(reply) +} + +pub fn make_error_reply(code: &str, message: &str) -> Message { + let mut reply: HashMap = HashMap::new(); + reply.insert(cstr("type"), Message::String(cstr("Error"))); + reply.insert(cstr("error"), Message::String(cstr(code))); + reply.insert(cstr("message"), Message::String(cstr(message))); + Message::Dictionary(reply) +} + +pub fn attach_request_id(mut message: Message, request_id: Option) -> Message { + if let (Some(id), Message::Dictionary(ref mut m)) = (request_id, &mut message) { + dict_put_str(m, "request_id", &id); + } + message +} + +pub fn signal_to_message(signal: Signal) -> Message { + let mut root: XpcMap = HashMap::new(); + let mut args: XpcMap = HashMap::new(); + match signal { + Signal::ConversationsUpdated => { + dict_put_str(&mut root, "name", "ConversationsUpdated"); + } + Signal::MessagesUpdated(conversation_id) => { + dict_put_str(&mut root, "name", "MessagesUpdated"); + dict_put_str(&mut args, "conversation_id", &conversation_id); + } + Signal::AttachmentDownloaded(attachment_id) => { + dict_put_str(&mut root, "name", "AttachmentDownloadCompleted"); + dict_put_str(&mut args, "attachment_id", &attachment_id); + } + Signal::AttachmentUploaded(upload_guid, attachment_guid) => { + dict_put_str(&mut root, "name", "AttachmentUploadCompleted"); + dict_put_str(&mut args, "upload_guid", &upload_guid); + dict_put_str(&mut args, "attachment_guid", &attachment_guid); + } + Signal::UpdateStreamReconnected => { + dict_put_str(&mut root, "name", "UpdateStreamReconnected"); + } + } + if !args.is_empty() { + root.insert(cstr("arguments"), Message::Dictionary(args)); + } + Message::Dictionary(root) +}