From 8d9251bfe2b181f5b7cd7b5cc059ab0d4b829bff Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 14 Dec 2025 17:09:41 -0800 Subject: [PATCH] first pass at platform agnostic notifications --- core/kordophoned/src/daemon/mod.rs | 162 +++++++++++++---------- core/kordophoned/src/daemon/notifier.rs | 102 +++++++++++++- core/kordophoned/src/daemon/signals.rs | 12 ++ core/kordophoned/src/dbus/agent.rs | 168 +++++++++++++----------- core/kordophoned/src/main.rs | 5 +- core/kordophoned/src/xpc/agent.rs | 37 +++++- 6 files changed, 332 insertions(+), 154 deletions(-) diff --git a/core/kordophoned/src/daemon/mod.rs b/core/kordophoned/src/daemon/mod.rs index 7f088aa..ec893a2 100644 --- a/core/kordophoned/src/daemon/mod.rs +++ b/core/kordophoned/src/daemon/mod.rs @@ -17,8 +17,11 @@ use std::path::PathBuf; use std::sync::Arc; use thiserror::Error; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::Mutex; +use tokio::sync::{ + broadcast, + mpsc::{Receiver, Sender}, + Mutex, +}; use uuid::Uuid; use kordophone_db::{ @@ -79,8 +82,7 @@ pub struct Daemon { pub event_sender: Sender, event_receiver: Receiver, - signal_receiver: Option>, - signal_sender: Sender, + signal_sender: broadcast::Sender, post_office_sink: Sender, post_office_source: Option>, @@ -107,7 +109,7 @@ impl Daemon { // Create event channels let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); - let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); + let (signal_sender, _) = tokio::sync::broadcast::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); // Create background task runtime @@ -126,7 +128,6 @@ impl Daemon { database, event_receiver, event_sender, - signal_receiver: Some(signal_receiver), signal_sender, post_office_sink, post_office_source: Some(post_office_source), @@ -171,6 +172,16 @@ impl Daemon { attachment_store.run().await; }); + // Notification listener + { + let notifier = self.notifier.clone(); + let mut signal_rx = self.signal_sender.subscribe(); + let database = self.database.clone(); + tokio::spawn(async move { + notifier.listen(signal_rx, database).await; + }); + } + while let Some(event) = self.event_receiver.recv().await { log::debug!(target: target::EVENT, "Received event: {:?}", event); self.handle_event(event).await; @@ -204,11 +215,9 @@ impl Daemon { Event::SyncAllConversations(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); - let notifier = self.notifier.clone(); self.runtime.spawn(async move { let result = - Self::sync_all_conversations_impl(&mut db_clone, &signal_sender, notifier) - .await; + Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } @@ -221,12 +230,10 @@ impl Daemon { Event::SyncConversation(conversation_id, reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); - let notifier = self.notifier.clone(); self.runtime.spawn(async move { let result = Self::sync_conversation_impl( &mut db_clone, &signal_sender, - notifier, conversation_id, ) .await; @@ -270,10 +277,11 @@ impl Daemon { self.spawn_conversation_list_sync(); // Send signal to the client that the update stream has been reconnected. - self.signal_sender - .send(Signal::UpdateStreamReconnected) - .await - .unwrap(); + Self::send_signal( + &self.signal_sender, + Signal::UpdateStreamReconnected, + target::UPDATES, + ); } Event::GetAllConversations(limit, offset, reply) => { @@ -342,7 +350,7 @@ impl Daemon { reply.send(uuid).unwrap(); // Notify clients that messages have changed (e.g., to refresh placeholders). - self.emit_messages_updated(conversation_id).await; + self.emit_messages_updated(conversation_id); } Event::MessageSent(message, outgoing_message, conversation_id) => { @@ -373,14 +381,18 @@ impl Daemon { .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); // Notify clients to refresh the conversation after the final message arrives. - self.emit_messages_updated(conversation_id).await; + self.emit_messages_updated(conversation_id); } Event::TestNotification(summary, body, reply) => { let result = self - .notifier - .send_manual(&summary, &body) - .map_err(|e| format!("Failed to display notification: {}", e)); + .signal_sender + .send(Signal::Internal(InternalSignal::TestNotification { + summary, + body, + })) + .map(|_| ()) + .map_err(|e| e.to_string()); reply.send(result).unwrap(); } @@ -413,10 +425,11 @@ impl Daemon { log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id); // Send signal to the client that the attachment has been downloaded. - self.signal_sender - .send(Signal::AttachmentDownloaded(attachment_id)) - .await - .unwrap(); + Self::send_signal( + &self.signal_sender, + Signal::AttachmentDownloaded(attachment_id), + target::ATTACHMENTS, + ); } Event::UploadAttachment(path, reply) => { @@ -431,17 +444,17 @@ impl Daemon { Event::AttachmentUploaded(upload_guid, attachment_guid) => { log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid); - self.signal_sender - .send(Signal::AttachmentUploaded(upload_guid, attachment_guid)) - .await - .unwrap(); + Self::send_signal( + &self.signal_sender, + Signal::AttachmentUploaded(upload_guid, attachment_guid), + target::ATTACHMENTS, + ); } } } - /// Panics if the signal receiver has already been taken. - pub fn obtain_signal_receiver(&mut self) -> Receiver { - self.signal_receiver.take().unwrap() + pub fn subscribe_signals(&self) -> broadcast::Receiver { + self.signal_sender.subscribe() } async fn get_conversations_limit_offset( @@ -456,18 +469,8 @@ impl Daemon { .await } - async fn emit_messages_updated(&self, conversation_id: String) { - self.notifier - .notify_new_messages(&self.database, &conversation_id) - .await; - - if let Err(e) = self - .signal_sender - .send(Signal::MessagesUpdated(conversation_id)) - .await - { - log::warn!(target: target::DAEMON, "Failed to send MessagesUpdated signal: {}", e); - } + fn emit_messages_updated(&self, conversation_id: String) { + Self::send_messages_updated(&self.signal_sender, conversation_id); } async fn get_messages( @@ -545,7 +548,7 @@ impl Daemon { async fn sync_conversation_list( database: &mut Arc>, - signal_sender: &Sender, + signal_sender: &broadcast::Sender, ) -> Result<()> { log::info!(target: target::SYNC, "Starting list conversation sync"); @@ -597,7 +600,7 @@ impl Daemon { } // Send conversations updated signal - signal_sender.send(Signal::ConversationsUpdated).await?; + Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC); log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations); Ok(()) @@ -605,8 +608,7 @@ impl Daemon { async fn sync_all_conversations_impl( database: &mut Arc>, - signal_sender: &Sender, - notifier: Arc, + signal_sender: &broadcast::Sender, ) -> Result<()> { log::info!(target: target::SYNC, "Starting full conversation sync"); @@ -630,17 +632,11 @@ impl Daemon { .await?; // Sync individual conversation. - Self::sync_conversation_impl( - database, - signal_sender, - notifier.clone(), - conversation_id, - ) - .await?; + Self::sync_conversation_impl(database, signal_sender, conversation_id).await?; } // Send conversations updated signal. - signal_sender.send(Signal::ConversationsUpdated).await?; + Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC); log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations); Ok(()) @@ -648,8 +644,7 @@ impl Daemon { async fn sync_conversation_impl( database: &mut Arc>, - signal_sender: &Sender, - notifier: Arc, + signal_sender: &broadcast::Sender, conversation_id: String, ) -> Result<()> { log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); @@ -707,12 +702,7 @@ impl Daemon { // Send messages updated signal, if we actually inserted any messages. if num_messages > 0 { - notifier - .notify_new_messages(database, &conversation_id) - .await; - signal_sender - .send(Signal::MessagesUpdated(conversation_id.clone())) - .await?; + Self::send_messages_updated(signal_sender, conversation_id.clone()); } log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); @@ -733,14 +723,14 @@ impl Daemon { async fn update_conversation_metadata_impl( database: &mut Arc>, conversation: Conversation, - signal_sender: &Sender, + signal_sender: &broadcast::Sender, ) -> Result<()> { log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid); let updated = database .with_repository(|r| r.merge_conversation_metadata(conversation)) .await?; if updated { - signal_sender.send(Signal::ConversationsUpdated).await?; + Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::DAEMON); } Ok(()) @@ -755,6 +745,40 @@ impl Daemon { self.database.with_settings(|s| settings.save(s)).await } + fn send_signal(sender: &broadcast::Sender, signal: Signal, log_target: &str) { + if let Err(error) = sender.send(signal) { + log::trace!( + target: log_target, + "Signal delivery skipped (no listeners?): {}", + error + ); + } + } + + fn send_internal(sender: &broadcast::Sender, signal: InternalSignal) { + if let Err(error) = sender.send(Signal::Internal(signal)) { + log::trace!( + target: target::DAEMON, + "Internal signal delivery skipped: {}", + error + ); + } + } + + fn send_messages_updated(sender: &broadcast::Sender, conversation_id: String) { + Self::send_internal( + sender, + InternalSignal::MessagesUpdated(conversation_id.clone()), + ); + if let Err(error) = sender.send(Signal::MessagesUpdated(conversation_id)) { + log::warn!( + target: target::DAEMON, + "Failed to send MessagesUpdated signal: {}", + error + ); + } + } + async fn get_client_impl( database: &mut Arc>, ) -> Result> { @@ -787,9 +811,11 @@ impl Daemon { }) .await?; - self.signal_sender - .send(Signal::ConversationsUpdated) - .await?; + Self::send_signal( + &self.signal_sender, + Signal::ConversationsUpdated, + target::SYNC, + ); Ok(()) } diff --git a/core/kordophoned/src/daemon/notifier.rs b/core/kordophoned/src/daemon/notifier.rs index bd3863a..0001a24 100644 --- a/core/kordophoned/src/daemon/notifier.rs +++ b/core/kordophoned/src/daemon/notifier.rs @@ -1,5 +1,6 @@ use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend}; use super::models::message::Participant; +use super::signals::{InternalSignal, Signal}; use super::{target, Message}; use kordophone_db::{ @@ -9,7 +10,7 @@ use kordophone_db::{ }; use notify::Notification; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{broadcast, Mutex}; /// Centralised notification helper used by platform transports (D-Bus, XPC, …). pub struct NotificationService { @@ -25,6 +26,57 @@ impl NotificationService { } } + pub async fn listen( + self: Arc, + mut signal_rx: broadcast::Receiver, + database: Arc>, + ) { + log::trace!(target: target::DAEMON, "NotificationService listener started"); + loop { + match signal_rx.recv().await { + Ok(Signal::Internal(InternalSignal::MessagesUpdated(conversation_id))) => { + log::trace!( + target: target::DAEMON, + "NotificationService received MessagesUpdated for {}", + conversation_id + ); + self.notify_new_messages(&database, &conversation_id).await; + } + Ok(Signal::Internal(InternalSignal::TestNotification { summary, body })) => { + log::trace!( + target: target::DAEMON, + "NotificationService received TestNotification" + ); + if let Err(error) = self.send_manual(&summary, &body) { + log::warn!( + target: target::DAEMON, + "Failed to display test notification: {}", + error + ); + } + } + Ok(other) => { + log::trace!( + target: target::DAEMON, + "NotificationService ignoring signal: {:?}", + other + ); + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + log::warn!( + target: target::DAEMON, + "NotificationService lagged; skipped {} signals", + skipped + ); + } + Err(broadcast::error::RecvError::Closed) => { + log::trace!(target: target::DAEMON, "NotificationService listener exiting"); + break; + } + } + } + } + /// Checks whether a new user-visible notification should be shown for the /// given conversation and displays it if appropriate. pub async fn notify_new_messages( @@ -32,7 +84,17 @@ impl NotificationService { database: &Arc>, conversation_id: &str, ) { + log::trace!( + target: target::DAEMON, + "NotificationService preparing payload for {}", + conversation_id + ); if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await { + log::trace!( + target: target::DAEMON, + "NotificationService displaying notification for {}", + conversation_id + ); if let Err(error) = self.show_notification(&summary, &body) { log::warn!( target: target::DAEMON, @@ -41,11 +103,21 @@ impl NotificationService { error ); } + } else { + log::trace!( + target: target::DAEMON, + "NotificationService skipping notification for {}", + conversation_id + ); } } /// Displays a manual test notification. pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> { + log::trace!( + target: target::DAEMON, + "NotificationService sending manual notification" + ); self.show_notification(summary, body) } @@ -62,7 +134,14 @@ impl NotificationService { let conversation = match conversation_opt { Ok(Some(conv)) => conv, - Ok(None) => return None, + Ok(None) => { + log::trace!( + target: target::DAEMON, + "NotificationService: conversation {} not found", + conversation_id + ); + return None; + } Err(err) => { log::warn!( target: target::DAEMON, @@ -75,6 +154,11 @@ impl NotificationService { }; if conversation.unread_count == 0 { + log::trace!( + target: target::DAEMON, + "NotificationService: conversation {} has no unread messages", + conversation_id + ); return None; } @@ -86,7 +170,14 @@ impl NotificationService { let last_message: Message = match last_message_opt { Ok(Some(message)) => message.into(), - Ok(None) => return None, + Ok(None) => { + log::trace!( + target: target::DAEMON, + "NotificationService: conversation {} has no messages", + conversation_id + ); + return None; + } Err(err) => { log::warn!( target: target::DAEMON, @@ -99,6 +190,11 @@ impl NotificationService { }; if matches!(last_message.sender, Participant::Me) { + log::trace!( + target: target::DAEMON, + "NotificationService: last message in {} was sent by self", + conversation_id + ); return None; } diff --git a/core/kordophoned/src/daemon/signals.rs b/core/kordophoned/src/daemon/signals.rs index d2a4cfa..916b48c 100644 --- a/core/kordophoned/src/daemon/signals.rs +++ b/core/kordophoned/src/daemon/signals.rs @@ -1,3 +1,12 @@ +#[derive(Debug, Clone)] +pub enum InternalSignal { + /// Notification that new messages are available for a conversation. + MessagesUpdated(String), + + /// Manual test notification request. + TestNotification { summary: String, body: String }, +} + #[derive(Debug, Clone)] pub enum Signal { /// Emitted when the list of conversations is updated. @@ -21,4 +30,7 @@ pub enum Signal { /// Emitted when the update stream is reconnected after a timeout or configuration change. UpdateStreamReconnected, + + /// Internal-only signals consumed by daemon components. + Internal(InternalSignal), } diff --git a/core/kordophoned/src/dbus/agent.rs b/core/kordophoned/src/dbus/agent.rs index 8a3bca0..04dc5ea 100644 --- a/core/kordophoned/src/dbus/agent.rs +++ b/core/kordophoned/src/dbus/agent.rs @@ -2,7 +2,7 @@ use dbus::arg; use dbus_tree::MethodErr; use std::sync::Arc; use std::{future::Future, thread}; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use kordophoned::daemon::{ contact_resolver::{ContactResolver, DefaultContactResolverBackend}, @@ -22,12 +22,15 @@ use dbus_tokio::connection; #[derive(Clone)] pub struct DBusAgent { event_sink: mpsc::Sender, - signal_receiver: Arc>>>, + signal_receiver: Arc>>>, contact_resolver: ContactResolver, } impl DBusAgent { - pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { + pub fn new( + event_sink: mpsc::Sender, + signal_receiver: broadcast::Receiver, + ) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), @@ -75,80 +78,95 @@ impl DBusAgent { .take() .expect("Signal receiver already taken"); - while let Some(signal) = receiver.recv().await { - match signal { - Signal::ConversationsUpdated => { - log::debug!("Sending signal: ConversationsUpdated"); - registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::ConversationsUpdated {}, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - Signal::MessagesUpdated(conversation_id) => { - log::debug!( - "Sending signal: MessagesUpdated for conversation {}", - conversation_id + loop { + match receiver.recv().await { + Ok(signal) => match signal { + Signal::ConversationsUpdated => { + log::debug!("Sending signal: ConversationsUpdated"); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::ConversationsUpdated {}, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::MessagesUpdated(conversation_id) => { + log::debug!( + "Sending signal: MessagesUpdated for conversation {}", + conversation_id + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::MessagesUpdated { conversation_id }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::AttachmentDownloaded(attachment_id) => { + log::debug!( + "Sending signal: AttachmentDownloaded for attachment {}", + attachment_id + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::AttachmentDownloadCompleted { attachment_id }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::AttachmentUploaded(upload_guid, attachment_guid) => { + log::debug!( + "Sending signal: AttachmentUploaded for upload {}, attachment {}", + upload_guid, + attachment_guid + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::AttachmentUploadCompleted { + upload_guid, + attachment_guid, + }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::UpdateStreamReconnected => { + log::debug!("Sending signal: UpdateStreamReconnected"); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::UpdateStreamReconnected {}, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::Internal(_) => { + log::trace!("Ignoring internal signal for D-Bus transport"); + } + }, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + log::warn!( + "Signal receiver lagged; skipped {} daemon signals", + skipped ); - registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::MessagesUpdated { conversation_id }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); } - Signal::AttachmentDownloaded(attachment_id) => { - log::debug!( - "Sending signal: AttachmentDownloaded for attachment {}", - attachment_id - ); - registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::AttachmentDownloadCompleted { attachment_id }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - Signal::AttachmentUploaded(upload_guid, attachment_guid) => { - log::debug!( - "Sending signal: AttachmentUploaded for upload {}, attachment {}", - upload_guid, - attachment_guid - ); - registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::AttachmentUploadCompleted { - upload_guid, - attachment_guid, - }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - Signal::UpdateStreamReconnected => { - log::debug!("Sending signal: UpdateStreamReconnected"); - registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::UpdateStreamReconnected {}, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); + Err(broadcast::error::RecvError::Closed) => { + log::warn!("Signal channel closed; stopping D-Bus forwarding"); + break; } } } diff --git a/core/kordophoned/src/main.rs b/core/kordophoned/src/main.rs index 3f1e8dd..dccccdd 100644 --- a/core/kordophoned/src/main.rs +++ b/core/kordophoned/src/main.rs @@ -26,7 +26,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) { use dbus::agent::DBusAgent; // Start the D-Bus agent (events in, signals out). - let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver()); + let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals()); tokio::spawn(async move { agent.run().await; }); @@ -35,8 +35,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) { #[cfg(target_os = "macos")] async fn start_ipc_agent(daemon: &mut Daemon) { // Start the macOS XPC agent (events in, signals out) on a dedicated thread. - let agent = - xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver()); + let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals()); std::thread::spawn(move || { // Use a single-threaded Tokio runtime for the XPC agent. let rt = tokio::runtime::Builder::new_current_thread() diff --git a/core/kordophoned/src/xpc/agent.rs b/core/kordophoned/src/xpc/agent.rs index 6ebdf79..41095da 100644 --- a/core/kordophoned/src/xpc/agent.rs +++ b/core/kordophoned/src/xpc/agent.rs @@ -4,7 +4,7 @@ use std::ffi::CString; use std::os::raw::c_char; use std::ptr; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError}; use xpc_connection_sys as xpc_sys; @@ -22,11 +22,14 @@ type Subscribers = Arc>>; #[derive(Clone)] pub struct XpcAgent { event_sink: mpsc::Sender, - signal_receiver: Arc>>>, + signal_receiver: Arc>>>, } impl XpcAgent { - pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { + pub fn new( + event_sink: mpsc::Sender, + signal_receiver: broadcast::Receiver, + ) -> Self { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), @@ -71,7 +74,31 @@ impl XpcAgent { .await .take() .expect("Signal receiver already taken"); - while let Some(signal) = receiver.recv().await { + loop { + let signal = match receiver.recv().await { + Ok(signal) => signal, + Err(broadcast::error::RecvError::Lagged(skipped)) => { + log::warn!( + target: LOG_TARGET, + "XPC agent lagged; skipped {} signals", + skipped + ); + continue; + } + Err(broadcast::error::RecvError::Closed) => { + log::warn!( + target: LOG_TARGET, + "Signal channel closed; stopping XPC forwarding" + ); + break; + } + }; + + if matches!(signal, Signal::Internal(_)) { + log::trace!(target: LOG_TARGET, "Skipping internal signal for XPC"); + continue; + } + log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal); let msg = super::util::signal_to_message(signal); let xobj = message_to_xpc_object(msg); @@ -127,7 +154,7 @@ impl XpcAgent { // Drop any cleanup resource now that payload is constructed and sent. drop(result.cleanup); - + log::trace!(target: LOG_TARGET, "XPC reply sent for method: {}", method); } else { log::warn!(target: LOG_TARGET, "No reply port for method: {}", method);