diff --git a/kordophoned/src/xpc/agent.rs b/kordophoned/src/xpc/agent.rs index 50177a4..f9bb2aa 100644 --- a/kordophoned/src/xpc/agent.rs +++ b/kordophoned/src/xpc/agent.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use std::ffi::CString; use std::sync::Arc; use std::thread; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use xpc_connection::{Message, MessageError, XpcClient, XpcListener}; static LOG_TARGET: &str = "xpc"; @@ -47,10 +47,31 @@ impl XpcAgent { service_name ); + // Broadcast channel for signals to all connected clients + let (signal_tx, _signal_rx) = broadcast::channel::(64); + + // Spawn a single distributor task that forwards daemon signals to broadcast + { + let receiver_arc = self.signal_receiver.clone(); + let signal_tx_clone = signal_tx.clone(); + tokio::spawn(async move { + let mut receiver = receiver_arc + .lock() + .await + .take() + .expect("Signal receiver already taken"); + + while let Some(signal) = receiver.recv().await { + let _ = signal_tx_clone.send(signal); + } + }); + } + let mut listener = XpcListener::listen(&mach_port_name); while let Some(client) = listener.next().await { let agent = self.clone(); + let signal_rx = signal_tx.subscribe(); thread::spawn(move || { let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() @@ -62,7 +83,7 @@ impl XpcAgent { return; } }; - rt.block_on(handle_client(agent, client)); + rt.block_on(handle_client(agent, client, signal_rx)); }); } @@ -383,27 +404,76 @@ async fn dispatch(agent: &XpcAgent, root: &HashMap) -> Message } } + // No-op used by clients to ensure the connection is established and subscribed + "SubscribeSignals" => { + make_ok_reply() + } + // Unknown method fallback other => make_error_reply("UnknownMethod", other), } } -async fn handle_client(agent: XpcAgent, mut client: XpcClient) { +fn signal_to_message(signal: Signal) -> Message { + let mut root: XpcMap = HashMap::new(); + let mut args: XpcMap = HashMap::new(); + match signal { + Signal::ConversationsUpdated => { + dict_put_str(&mut root, "name", "ConversationsUpdated"); + } + Signal::MessagesUpdated(conversation_id) => { + dict_put_str(&mut root, "name", "MessagesUpdated"); + dict_put_str(&mut args, "conversation_id", &conversation_id); + } + Signal::AttachmentDownloaded(attachment_id) => { + dict_put_str(&mut root, "name", "AttachmentDownloadCompleted"); + dict_put_str(&mut args, "attachment_id", &attachment_id); + } + Signal::AttachmentUploaded(upload_guid, attachment_guid) => { + dict_put_str(&mut root, "name", "AttachmentUploadCompleted"); + dict_put_str(&mut args, "upload_guid", &upload_guid); + dict_put_str(&mut args, "attachment_guid", &attachment_guid); + } + Signal::UpdateStreamReconnected => { + dict_put_str(&mut root, "name", "UpdateStreamReconnected"); + } + } + if !args.is_empty() { root.insert(cstr("arguments"), Message::Dictionary(args)); } + Message::Dictionary(root) +} + +async fn handle_client(agent: XpcAgent, mut client: XpcClient, mut signal_rx: broadcast::Receiver) { log::info!(target: LOG_TARGET, "New XPC connection"); - while let Some(message) = client.next().await { - match message { - Message::Error(MessageError::ConnectionInterrupted) => { - log::warn!(target: LOG_TARGET, "XPC connection interrupted"); + loop { + tokio::select! { + maybe_msg = client.next() => { + match maybe_msg { + Some(Message::Error(MessageError::ConnectionInterrupted)) => { + log::warn!(target: LOG_TARGET, "XPC connection interrupted"); + } + Some(Message::Dictionary(map)) => { + let response = dispatch(&agent, &map).await; + client.send_message(response); + } + Some(other) => { + log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); + client.send_message(other); + } + None => break, + } } - Message::Dictionary(map) => { - let response = dispatch(&agent, &map).await; - client.send_message(response); - } - other => { - // For now just echo any non-dictionary messages (useful for testing). - log::info!(target: LOG_TARGET, "Echoing message: {:?}", other); - client.send_message(other); + recv = signal_rx.recv() => { + match recv { + Ok(signal) => { + let msg = signal_to_message(signal); + client.send_message(msg); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => { + log::warn!(target: LOG_TARGET, "Lagged behind on signals; dropping some events for this client"); + } + } } } } diff --git a/kpcli/src/daemon/xpc.rs b/kpcli/src/daemon/xpc.rs index e302f1c..fae71d0 100644 --- a/kpcli/src/daemon/xpc.rs +++ b/kpcli/src/daemon/xpc.rs @@ -347,7 +347,66 @@ impl DaemonInterface for XpcDaemonInterface { Ok(()) } async fn wait_for_signals(&mut self) -> Result<()> { - Err(anyhow::anyhow!("Feature not implemented for XPC")) + let mach_port_name = Self::build_service_name()?; + let mut client = XPCClient::connect(&mach_port_name); + + // Send a subscription/warm-up message so the server loop starts selecting for this client + client.send_message(Message::Dictionary(Self::build_request("SubscribeSignals", None))); + + println!("Waiting for XPC signals..."); + while let Some(msg) = client.next().await { + match msg { + Message::Dictionary(map) => { + let name_key = Self::key("name"); + let args_key = Self::key("arguments"); + let name = match map.get(&name_key) { Some(Message::String(s)) => s.to_string_lossy().into_owned(), _ => continue }; + + match name.as_str() { + "ConversationsUpdated" => { + println!("Signal: Conversations updated"); + } + "MessagesUpdated" => { + if let Some(Message::Dictionary(args)) = map.get(&args_key) { + if let Some(Message::String(cid)) = args.get(&Self::key("conversation_id")) { + println!("Signal: Messages updated for conversation {}", cid.to_string_lossy()); + } + } + } + "UpdateStreamReconnected" => { + println!("Signal: Update stream reconnected"); + } + "AttachmentDownloadCompleted" => { + if let Some(Message::Dictionary(args)) = map.get(&args_key) { + if let Some(Message::String(aid)) = args.get(&Self::key("attachment_id")) { + println!("Signal: Attachment downloaded: {}", aid.to_string_lossy()); + } + } + } + "AttachmentDownloadFailed" => { + if let Some(Message::Dictionary(args)) = map.get(&args_key) { + if let Some(Message::String(aid)) = args.get(&Self::key("attachment_id")) { + eprintln!("Signal: Attachment download failed: {}", aid.to_string_lossy()); + } + } + } + "AttachmentUploadCompleted" => { + if let Some(Message::Dictionary(args)) = map.get(&args_key) { + let upload = args.get(&Self::key("upload_guid")).and_then(|v| match v { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None }).unwrap_or_default(); + let attachment = args.get(&Self::key("attachment_guid")).and_then(|v| match v { Message::String(s) => Some(s.to_string_lossy().into_owned()), _ => None }).unwrap_or_default(); + println!("Signal: Attachment uploaded: upload={}, attachment={}", upload, attachment); + } + } + "ConfigChanged" => { + println!("Signal: Config changed"); + } + _ => {} + } + } + Message::Error(xpc_connection::MessageError::ConnectionInvalid) => break, + _ => {} + } + } + Ok(()) } async fn config(&mut self, _cmd: ConfigCommands) -> Result<()> { let mach_port_name = Self::build_service_name()?;