Private
Public Access
1
0

first pass at platform agnostic notifications

This commit is contained in:
2025-12-14 17:09:41 -08:00
parent 0cfa5e05d4
commit 8d9251bfe2
6 changed files with 332 additions and 154 deletions

View File

@@ -17,8 +17,11 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{
use tokio::sync::Mutex; broadcast,
mpsc::{Receiver, Sender},
Mutex,
};
use uuid::Uuid; use uuid::Uuid;
use kordophone_db::{ use kordophone_db::{
@@ -79,8 +82,7 @@ pub struct Daemon {
pub event_sender: Sender<Event>, pub event_sender: Sender<Event>,
event_receiver: Receiver<Event>, event_receiver: Receiver<Event>,
signal_receiver: Option<Receiver<Signal>>, signal_sender: broadcast::Sender<Signal>,
signal_sender: Sender<Signal>,
post_office_sink: Sender<PostOfficeEvent>, post_office_sink: Sender<PostOfficeEvent>,
post_office_source: Option<Receiver<PostOfficeEvent>>, post_office_source: Option<Receiver<PostOfficeEvent>>,
@@ -107,7 +109,7 @@ impl Daemon {
// Create event channels // Create event channels
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); let (signal_sender, _) = tokio::sync::broadcast::channel(100);
let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100);
// Create background task runtime // Create background task runtime
@@ -126,7 +128,6 @@ impl Daemon {
database, database,
event_receiver, event_receiver,
event_sender, event_sender,
signal_receiver: Some(signal_receiver),
signal_sender, signal_sender,
post_office_sink, post_office_sink,
post_office_source: Some(post_office_source), post_office_source: Some(post_office_source),
@@ -171,6 +172,16 @@ impl Daemon {
attachment_store.run().await; attachment_store.run().await;
}); });
// Notification listener
{
let notifier = self.notifier.clone();
let mut signal_rx = self.signal_sender.subscribe();
let database = self.database.clone();
tokio::spawn(async move {
notifier.listen(signal_rx, database).await;
});
}
while let Some(event) = self.event_receiver.recv().await { while let Some(event) = self.event_receiver.recv().await {
log::debug!(target: target::EVENT, "Received event: {:?}", event); log::debug!(target: target::EVENT, "Received event: {:?}", event);
self.handle_event(event).await; self.handle_event(event).await;
@@ -204,11 +215,9 @@ impl Daemon {
Event::SyncAllConversations(reply) => { Event::SyncAllConversations(reply) => {
let mut db_clone = self.database.clone(); let mut db_clone = self.database.clone();
let signal_sender = self.signal_sender.clone(); let signal_sender = self.signal_sender.clone();
let notifier = self.notifier.clone();
self.runtime.spawn(async move { self.runtime.spawn(async move {
let result = let result =
Self::sync_all_conversations_impl(&mut db_clone, &signal_sender, notifier) Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await;
.await;
if let Err(e) = result { if let Err(e) = result {
log::error!(target: target::SYNC, "Error handling sync event: {}", e); log::error!(target: target::SYNC, "Error handling sync event: {}", e);
} }
@@ -221,12 +230,10 @@ impl Daemon {
Event::SyncConversation(conversation_id, reply) => { Event::SyncConversation(conversation_id, reply) => {
let mut db_clone = self.database.clone(); let mut db_clone = self.database.clone();
let signal_sender = self.signal_sender.clone(); let signal_sender = self.signal_sender.clone();
let notifier = self.notifier.clone();
self.runtime.spawn(async move { self.runtime.spawn(async move {
let result = Self::sync_conversation_impl( let result = Self::sync_conversation_impl(
&mut db_clone, &mut db_clone,
&signal_sender, &signal_sender,
notifier,
conversation_id, conversation_id,
) )
.await; .await;
@@ -270,10 +277,11 @@ impl Daemon {
self.spawn_conversation_list_sync(); self.spawn_conversation_list_sync();
// Send signal to the client that the update stream has been reconnected. // Send signal to the client that the update stream has been reconnected.
self.signal_sender Self::send_signal(
.send(Signal::UpdateStreamReconnected) &self.signal_sender,
.await Signal::UpdateStreamReconnected,
.unwrap(); target::UPDATES,
);
} }
Event::GetAllConversations(limit, offset, reply) => { Event::GetAllConversations(limit, offset, reply) => {
@@ -342,7 +350,7 @@ impl Daemon {
reply.send(uuid).unwrap(); reply.send(uuid).unwrap();
// Notify clients that messages have changed (e.g., to refresh placeholders). // Notify clients that messages have changed (e.g., to refresh placeholders).
self.emit_messages_updated(conversation_id).await; self.emit_messages_updated(conversation_id);
} }
Event::MessageSent(message, outgoing_message, conversation_id) => { Event::MessageSent(message, outgoing_message, conversation_id) => {
@@ -373,14 +381,18 @@ impl Daemon {
.map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid));
// Notify clients to refresh the conversation after the final message arrives. // Notify clients to refresh the conversation after the final message arrives.
self.emit_messages_updated(conversation_id).await; self.emit_messages_updated(conversation_id);
} }
Event::TestNotification(summary, body, reply) => { Event::TestNotification(summary, body, reply) => {
let result = self let result = self
.notifier .signal_sender
.send_manual(&summary, &body) .send(Signal::Internal(InternalSignal::TestNotification {
.map_err(|e| format!("Failed to display notification: {}", e)); summary,
body,
}))
.map(|_| ())
.map_err(|e| e.to_string());
reply.send(result).unwrap(); reply.send(result).unwrap();
} }
@@ -413,10 +425,11 @@ impl Daemon {
log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id); log::debug!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id);
// Send signal to the client that the attachment has been downloaded. // Send signal to the client that the attachment has been downloaded.
self.signal_sender Self::send_signal(
.send(Signal::AttachmentDownloaded(attachment_id)) &self.signal_sender,
.await Signal::AttachmentDownloaded(attachment_id),
.unwrap(); target::ATTACHMENTS,
);
} }
Event::UploadAttachment(path, reply) => { Event::UploadAttachment(path, reply) => {
@@ -431,17 +444,17 @@ impl Daemon {
Event::AttachmentUploaded(upload_guid, attachment_guid) => { Event::AttachmentUploaded(upload_guid, attachment_guid) => {
log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid); log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);
self.signal_sender Self::send_signal(
.send(Signal::AttachmentUploaded(upload_guid, attachment_guid)) &self.signal_sender,
.await Signal::AttachmentUploaded(upload_guid, attachment_guid),
.unwrap(); target::ATTACHMENTS,
);
} }
} }
} }
/// Panics if the signal receiver has already been taken. pub fn subscribe_signals(&self) -> broadcast::Receiver<Signal> {
pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> { self.signal_sender.subscribe()
self.signal_receiver.take().unwrap()
} }
async fn get_conversations_limit_offset( async fn get_conversations_limit_offset(
@@ -456,18 +469,8 @@ impl Daemon {
.await .await
} }
async fn emit_messages_updated(&self, conversation_id: String) { fn emit_messages_updated(&self, conversation_id: String) {
self.notifier Self::send_messages_updated(&self.signal_sender, conversation_id);
.notify_new_messages(&self.database, &conversation_id)
.await;
if let Err(e) = self
.signal_sender
.send(Signal::MessagesUpdated(conversation_id))
.await
{
log::warn!(target: target::DAEMON, "Failed to send MessagesUpdated signal: {}", e);
}
} }
async fn get_messages( async fn get_messages(
@@ -545,7 +548,7 @@ impl Daemon {
async fn sync_conversation_list( async fn sync_conversation_list(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> { ) -> Result<()> {
log::info!(target: target::SYNC, "Starting list conversation sync"); log::info!(target: target::SYNC, "Starting list conversation sync");
@@ -597,7 +600,7 @@ impl Daemon {
} }
// Send conversations updated signal // Send conversations updated signal
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations); log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations);
Ok(()) Ok(())
@@ -605,8 +608,7 @@ impl Daemon {
async fn sync_all_conversations_impl( async fn sync_all_conversations_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
notifier: Arc<NotificationService>,
) -> Result<()> { ) -> Result<()> {
log::info!(target: target::SYNC, "Starting full conversation sync"); log::info!(target: target::SYNC, "Starting full conversation sync");
@@ -630,17 +632,11 @@ impl Daemon {
.await?; .await?;
// Sync individual conversation. // Sync individual conversation.
Self::sync_conversation_impl( Self::sync_conversation_impl(database, signal_sender, conversation_id).await?;
database,
signal_sender,
notifier.clone(),
conversation_id,
)
.await?;
} }
// Send conversations updated signal. // Send conversations updated signal.
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::SYNC);
log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations); log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations);
Ok(()) Ok(())
@@ -648,8 +644,7 @@ impl Daemon {
async fn sync_conversation_impl( async fn sync_conversation_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
notifier: Arc<NotificationService>,
conversation_id: String, conversation_id: String,
) -> Result<()> { ) -> Result<()> {
log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id);
@@ -707,12 +702,7 @@ impl Daemon {
// Send messages updated signal, if we actually inserted any messages. // Send messages updated signal, if we actually inserted any messages.
if num_messages > 0 { if num_messages > 0 {
notifier Self::send_messages_updated(signal_sender, conversation_id.clone());
.notify_new_messages(database, &conversation_id)
.await;
signal_sender
.send(Signal::MessagesUpdated(conversation_id.clone()))
.await?;
} }
log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id);
@@ -733,14 +723,14 @@ impl Daemon {
async fn update_conversation_metadata_impl( async fn update_conversation_metadata_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
conversation: Conversation, conversation: Conversation,
signal_sender: &Sender<Signal>, signal_sender: &broadcast::Sender<Signal>,
) -> Result<()> { ) -> Result<()> {
log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid); log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid);
let updated = database let updated = database
.with_repository(|r| r.merge_conversation_metadata(conversation)) .with_repository(|r| r.merge_conversation_metadata(conversation))
.await?; .await?;
if updated { if updated {
signal_sender.send(Signal::ConversationsUpdated).await?; Self::send_signal(signal_sender, Signal::ConversationsUpdated, target::DAEMON);
} }
Ok(()) Ok(())
@@ -755,6 +745,40 @@ impl Daemon {
self.database.with_settings(|s| settings.save(s)).await self.database.with_settings(|s| settings.save(s)).await
} }
fn send_signal(sender: &broadcast::Sender<Signal>, signal: Signal, log_target: &str) {
if let Err(error) = sender.send(signal) {
log::trace!(
target: log_target,
"Signal delivery skipped (no listeners?): {}",
error
);
}
}
fn send_internal(sender: &broadcast::Sender<Signal>, signal: InternalSignal) {
if let Err(error) = sender.send(Signal::Internal(signal)) {
log::trace!(
target: target::DAEMON,
"Internal signal delivery skipped: {}",
error
);
}
}
fn send_messages_updated(sender: &broadcast::Sender<Signal>, conversation_id: String) {
Self::send_internal(
sender,
InternalSignal::MessagesUpdated(conversation_id.clone()),
);
if let Err(error) = sender.send(Signal::MessagesUpdated(conversation_id)) {
log::warn!(
target: target::DAEMON,
"Failed to send MessagesUpdated signal: {}",
error
);
}
}
async fn get_client_impl( async fn get_client_impl(
database: &mut Arc<Mutex<Database>>, database: &mut Arc<Mutex<Database>>,
) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> { ) -> Result<HTTPAPIClient<DatabaseAuthenticationStore>> {
@@ -787,9 +811,11 @@ impl Daemon {
}) })
.await?; .await?;
self.signal_sender Self::send_signal(
.send(Signal::ConversationsUpdated) &self.signal_sender,
.await?; Signal::ConversationsUpdated,
target::SYNC,
);
Ok(()) Ok(())
} }

View File

@@ -1,5 +1,6 @@
use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend}; use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend};
use super::models::message::Participant; use super::models::message::Participant;
use super::signals::{InternalSignal, Signal};
use super::{target, Message}; use super::{target, Message};
use kordophone_db::{ use kordophone_db::{
@@ -9,7 +10,7 @@ use kordophone_db::{
}; };
use notify::Notification; use notify::Notification;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::{broadcast, Mutex};
/// Centralised notification helper used by platform transports (D-Bus, XPC, …). /// Centralised notification helper used by platform transports (D-Bus, XPC, …).
pub struct NotificationService { pub struct NotificationService {
@@ -25,6 +26,57 @@ impl NotificationService {
} }
} }
pub async fn listen(
self: Arc<Self>,
mut signal_rx: broadcast::Receiver<Signal>,
database: Arc<Mutex<Database>>,
) {
log::trace!(target: target::DAEMON, "NotificationService listener started");
loop {
match signal_rx.recv().await {
Ok(Signal::Internal(InternalSignal::MessagesUpdated(conversation_id))) => {
log::trace!(
target: target::DAEMON,
"NotificationService received MessagesUpdated for {}",
conversation_id
);
self.notify_new_messages(&database, &conversation_id).await;
}
Ok(Signal::Internal(InternalSignal::TestNotification { summary, body })) => {
log::trace!(
target: target::DAEMON,
"NotificationService received TestNotification"
);
if let Err(error) = self.send_manual(&summary, &body) {
log::warn!(
target: target::DAEMON,
"Failed to display test notification: {}",
error
);
}
}
Ok(other) => {
log::trace!(
target: target::DAEMON,
"NotificationService ignoring signal: {:?}",
other
);
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: target::DAEMON,
"NotificationService lagged; skipped {} signals",
skipped
);
}
Err(broadcast::error::RecvError::Closed) => {
log::trace!(target: target::DAEMON, "NotificationService listener exiting");
break;
}
}
}
}
/// Checks whether a new user-visible notification should be shown for the /// Checks whether a new user-visible notification should be shown for the
/// given conversation and displays it if appropriate. /// given conversation and displays it if appropriate.
pub async fn notify_new_messages( pub async fn notify_new_messages(
@@ -32,7 +84,17 @@ impl NotificationService {
database: &Arc<Mutex<Database>>, database: &Arc<Mutex<Database>>,
conversation_id: &str, conversation_id: &str,
) { ) {
log::trace!(
target: target::DAEMON,
"NotificationService preparing payload for {}",
conversation_id
);
if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await { if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await {
log::trace!(
target: target::DAEMON,
"NotificationService displaying notification for {}",
conversation_id
);
if let Err(error) = self.show_notification(&summary, &body) { if let Err(error) = self.show_notification(&summary, &body) {
log::warn!( log::warn!(
target: target::DAEMON, target: target::DAEMON,
@@ -41,11 +103,21 @@ impl NotificationService {
error error
); );
} }
} else {
log::trace!(
target: target::DAEMON,
"NotificationService skipping notification for {}",
conversation_id
);
} }
} }
/// Displays a manual test notification. /// Displays a manual test notification.
pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> { pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> {
log::trace!(
target: target::DAEMON,
"NotificationService sending manual notification"
);
self.show_notification(summary, body) self.show_notification(summary, body)
} }
@@ -62,7 +134,14 @@ impl NotificationService {
let conversation = match conversation_opt { let conversation = match conversation_opt {
Ok(Some(conv)) => conv, Ok(Some(conv)) => conv,
Ok(None) => return None, Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} not found",
conversation_id
);
return None;
}
Err(err) => { Err(err) => {
log::warn!( log::warn!(
target: target::DAEMON, target: target::DAEMON,
@@ -75,6 +154,11 @@ impl NotificationService {
}; };
if conversation.unread_count == 0 { if conversation.unread_count == 0 {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no unread messages",
conversation_id
);
return None; return None;
} }
@@ -86,7 +170,14 @@ impl NotificationService {
let last_message: Message = match last_message_opt { let last_message: Message = match last_message_opt {
Ok(Some(message)) => message.into(), Ok(Some(message)) => message.into(),
Ok(None) => return None, Ok(None) => {
log::trace!(
target: target::DAEMON,
"NotificationService: conversation {} has no messages",
conversation_id
);
return None;
}
Err(err) => { Err(err) => {
log::warn!( log::warn!(
target: target::DAEMON, target: target::DAEMON,
@@ -99,6 +190,11 @@ impl NotificationService {
}; };
if matches!(last_message.sender, Participant::Me) { if matches!(last_message.sender, Participant::Me) {
log::trace!(
target: target::DAEMON,
"NotificationService: last message in {} was sent by self",
conversation_id
);
return None; return None;
} }

View File

@@ -1,3 +1,12 @@
#[derive(Debug, Clone)]
pub enum InternalSignal {
/// Notification that new messages are available for a conversation.
MessagesUpdated(String),
/// Manual test notification request.
TestNotification { summary: String, body: String },
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Signal { pub enum Signal {
/// Emitted when the list of conversations is updated. /// Emitted when the list of conversations is updated.
@@ -21,4 +30,7 @@ pub enum Signal {
/// Emitted when the update stream is reconnected after a timeout or configuration change. /// Emitted when the update stream is reconnected after a timeout or configuration change.
UpdateStreamReconnected, UpdateStreamReconnected,
/// Internal-only signals consumed by daemon components.
Internal(InternalSignal),
} }

View File

@@ -2,7 +2,7 @@ use dbus::arg;
use dbus_tree::MethodErr; use dbus_tree::MethodErr;
use std::sync::Arc; use std::sync::Arc;
use std::{future::Future, thread}; use std::{future::Future, thread};
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use kordophoned::daemon::{ use kordophoned::daemon::{
contact_resolver::{ContactResolver, DefaultContactResolverBackend}, contact_resolver::{ContactResolver, DefaultContactResolverBackend},
@@ -22,12 +22,15 @@ use dbus_tokio::connection;
#[derive(Clone)] #[derive(Clone)]
pub struct DBusAgent { pub struct DBusAgent {
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>, signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
contact_resolver: ContactResolver<DefaultContactResolverBackend>, contact_resolver: ContactResolver<DefaultContactResolverBackend>,
} }
impl DBusAgent { impl DBusAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self { pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self { Self {
event_sink, event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -75,8 +78,9 @@ impl DBusAgent {
.take() .take()
.expect("Signal receiver already taken"); .expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await { loop {
match signal { match receiver.recv().await {
Ok(signal) => match signal {
Signal::ConversationsUpdated => { Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated"); log::debug!("Sending signal: ConversationsUpdated");
registry registry
@@ -150,6 +154,20 @@ impl DBusAgent {
0 0
}); });
} }
Signal::Internal(_) => {
log::trace!("Ignoring internal signal for D-Bus transport");
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
"Signal receiver lagged; skipped {} daemon signals",
skipped
);
}
Err(broadcast::error::RecvError::Closed) => {
log::warn!("Signal channel closed; stopping D-Bus forwarding");
break;
}
} }
} }
}); });

View File

@@ -26,7 +26,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
use dbus::agent::DBusAgent; use dbus::agent::DBusAgent;
// Start the D-Bus agent (events in, signals out). // Start the D-Bus agent (events in, signals out).
let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver()); let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
tokio::spawn(async move { tokio::spawn(async move {
agent.run().await; agent.run().await;
}); });
@@ -35,8 +35,7 @@ async fn start_ipc_agent(daemon: &mut Daemon) {
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
async fn start_ipc_agent(daemon: &mut Daemon) { async fn start_ipc_agent(daemon: &mut Daemon) {
// Start the macOS XPC agent (events in, signals out) on a dedicated thread. // Start the macOS XPC agent (events in, signals out) on a dedicated thread.
let agent = let agent = xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.subscribe_signals());
xpc::agent::XpcAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
std::thread::spawn(move || { std::thread::spawn(move || {
// Use a single-threaded Tokio runtime for the XPC agent. // Use a single-threaded Tokio runtime for the XPC agent.
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()

View File

@@ -4,7 +4,7 @@ use std::ffi::CString;
use std::os::raw::c_char; use std::os::raw::c_char;
use std::ptr; use std::ptr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError}; use xpc_connection::{message_to_xpc_object, xpc_object_to_message, Message, MessageError};
use xpc_connection_sys as xpc_sys; use xpc_connection_sys as xpc_sys;
@@ -22,11 +22,14 @@ type Subscribers = Arc<std::sync::Mutex<Vec<XpcConn>>>;
#[derive(Clone)] #[derive(Clone)]
pub struct XpcAgent { pub struct XpcAgent {
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>, signal_receiver: Arc<Mutex<Option<broadcast::Receiver<Signal>>>>,
} }
impl XpcAgent { impl XpcAgent {
pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self { pub fn new(
event_sink: mpsc::Sender<Event>,
signal_receiver: broadcast::Receiver<Signal>,
) -> Self {
Self { Self {
event_sink, event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
@@ -71,7 +74,31 @@ impl XpcAgent {
.await .await
.take() .take()
.expect("Signal receiver already taken"); .expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await { loop {
let signal = match receiver.recv().await {
Ok(signal) => signal,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
log::warn!(
target: LOG_TARGET,
"XPC agent lagged; skipped {} signals",
skipped
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
log::warn!(
target: LOG_TARGET,
"Signal channel closed; stopping XPC forwarding"
);
break;
}
};
if matches!(signal, Signal::Internal(_)) {
log::trace!(target: LOG_TARGET, "Skipping internal signal for XPC");
continue;
}
log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal); log::trace!(target: LOG_TARGET, "Broadcasting signal: {:?}", signal);
let msg = super::util::signal_to_message(signal); let msg = super::util::signal_to_message(signal);
let xobj = message_to_xpc_object(msg); let xobj = message_to_xpc_object(msg);