diff --git a/core/kordophoned/src/daemon/events.rs b/core/kordophoned/src/daemon/events.rs index 4fa5ad7..edec4c1 100644 --- a/core/kordophoned/src/daemon/events.rs +++ b/core/kordophoned/src/daemon/events.rs @@ -41,12 +41,6 @@ pub enum Event { /// - offset: The offset into the conversation list to start returning conversations from. GetAllConversations(i32, i32, Reply>), - /// Returns a conversation by its ID. - GetConversation(String, Reply>), - - /// Returns the most recent message for a conversation. - GetLastMessage(String, Reply>), - /// Returns all known settings from the database. GetAllSettings(Reply), @@ -67,6 +61,9 @@ pub enum Event { /// - reply: The outgoing message ID (not the server-assigned message ID). SendMessage(String, String, Vec, Reply), + /// Triggers a manual test notification. + TestNotification(String, String, Reply>), + /// Notifies the daemon that a message has been sent. /// Parameters: /// - message: The message that was sent. diff --git a/core/kordophoned/src/daemon/mod.rs b/core/kordophoned/src/daemon/mod.rs index 346f715..7f088aa 100644 --- a/core/kordophoned/src/daemon/mod.rs +++ b/core/kordophoned/src/daemon/mod.rs @@ -41,6 +41,9 @@ mod post_office; use post_office::Event as PostOfficeEvent; use post_office::PostOffice; +mod notifier; +use notifier::NotificationService; + mod models; pub use models::Attachment; pub use models::Message; @@ -87,6 +90,7 @@ pub struct Daemon { attachment_store_sink: Option>, update_monitor_command_tx: Option>, + notifier: Arc, version: String, database: Arc>, runtime: tokio::runtime::Runtime, @@ -114,9 +118,11 @@ impl Daemon { let database_impl = Database::new(&database_path.to_string_lossy())?; let database = Arc::new(Mutex::new(database_impl)); + let notifier = Arc::new(NotificationService::new()); Ok(Self { version: env!("CARGO_PKG_VERSION").to_string(), + notifier, database, event_receiver, event_sender, @@ -198,9 +204,11 @@ impl Daemon { Event::SyncAllConversations(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); + let notifier = self.notifier.clone(); self.runtime.spawn(async move { let result = - Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await; + Self::sync_all_conversations_impl(&mut db_clone, &signal_sender, notifier) + .await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } @@ -213,10 +221,12 @@ impl Daemon { Event::SyncConversation(conversation_id, reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); + let notifier = self.notifier.clone(); self.runtime.spawn(async move { let result = Self::sync_conversation_impl( &mut db_clone, &signal_sender, + notifier, conversation_id, ) .await; @@ -271,16 +281,6 @@ impl Daemon { reply.send(conversations).unwrap(); } - Event::GetConversation(conversation_id, reply) => { - let conversation = self.get_conversation(conversation_id).await; - reply.send(conversation).unwrap(); - } - - Event::GetLastMessage(conversation_id, reply) => { - let message = self.get_last_message(conversation_id).await; - reply.send(message).unwrap(); - } - Event::GetAllSettings(reply) => { let settings = self.get_settings().await.unwrap_or_else(|e| { log::error!(target: target::SETTINGS, "Failed to get settings: {:#?}", e); @@ -336,17 +336,13 @@ impl Daemon { } Event::SendMessage(conversation_id, text, attachment_guids, reply) => { - let conversation_id = conversation_id.clone(); let uuid = self .enqueue_outgoing_message(text, conversation_id.clone(), attachment_guids) .await; reply.send(uuid).unwrap(); - // Send message updated signal, we have a placeholder message we will return. - self.signal_sender - .send(Signal::MessagesUpdated(conversation_id.clone())) - .await - .unwrap(); + // Notify clients that messages have changed (e.g., to refresh placeholders). + self.emit_messages_updated(conversation_id).await; } Event::MessageSent(message, outgoing_message, conversation_id) => { @@ -376,11 +372,16 @@ impl Daemon { .get_mut(&conversation_id) .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); - // Send message updated signal. - self.signal_sender - .send(Signal::MessagesUpdated(conversation_id)) - .await - .unwrap(); + // Notify clients to refresh the conversation after the final message arrives. + self.emit_messages_updated(conversation_id).await; + } + + Event::TestNotification(summary, body, reply) => { + let result = self + .notifier + .send_manual(&summary, &body) + .map_err(|e| format!("Failed to display notification: {}", e)); + reply.send(result).unwrap(); } Event::GetAttachment(guid, reply) => { @@ -443,14 +444,6 @@ impl Daemon { self.signal_receiver.take().unwrap() } - async fn get_conversation(&mut self, conversation_id: String) -> Option { - self.database - .lock() - .await - .with_repository(|r| r.get_conversation_by_guid(&conversation_id).unwrap()) - .await - } - async fn get_conversations_limit_offset( &mut self, limit: i32, @@ -463,16 +456,18 @@ impl Daemon { .await } - async fn get_last_message(&mut self, conversation_id: String) -> Option { - self.database - .lock() - .await - .with_repository(|r| { - r.get_last_message_for_conversation(&conversation_id) - .unwrap() - .map(|message| message.into()) - }) + async fn emit_messages_updated(&self, conversation_id: String) { + self.notifier + .notify_new_messages(&self.database, &conversation_id) + .await; + + if let Err(e) = self + .signal_sender + .send(Signal::MessagesUpdated(conversation_id)) .await + { + log::warn!(target: target::DAEMON, "Failed to send MessagesUpdated signal: {}", e); + } } async fn get_messages( @@ -611,6 +606,7 @@ impl Daemon { async fn sync_all_conversations_impl( database: &mut Arc>, signal_sender: &Sender, + notifier: Arc, ) -> Result<()> { log::info!(target: target::SYNC, "Starting full conversation sync"); @@ -634,7 +630,13 @@ impl Daemon { .await?; // Sync individual conversation. - Self::sync_conversation_impl(database, signal_sender, conversation_id).await?; + Self::sync_conversation_impl( + database, + signal_sender, + notifier.clone(), + conversation_id, + ) + .await?; } // Send conversations updated signal. @@ -647,6 +649,7 @@ impl Daemon { async fn sync_conversation_impl( database: &mut Arc>, signal_sender: &Sender, + notifier: Arc, conversation_id: String, ) -> Result<()> { log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); @@ -704,6 +707,9 @@ impl Daemon { // Send messages updated signal, if we actually inserted any messages. if num_messages > 0 { + notifier + .notify_new_messages(database, &conversation_id) + .await; signal_sender .send(Signal::MessagesUpdated(conversation_id.clone())) .await?; diff --git a/core/kordophoned/src/daemon/notifier.rs b/core/kordophoned/src/daemon/notifier.rs new file mode 100644 index 0000000..bd3863a --- /dev/null +++ b/core/kordophoned/src/daemon/notifier.rs @@ -0,0 +1,192 @@ +use super::contact_resolver::{ContactResolver, DefaultContactResolverBackend}; +use super::models::message::Participant; +use super::{target, Message}; + +use kordophone_db::{ + database::{Database, DatabaseAccess}, + models::Conversation, + models::Participant as DbParticipant, +}; +use notify::Notification; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Centralised notification helper used by platform transports (D-Bus, XPC, …). +pub struct NotificationService { + resolver: Mutex>, +} + +impl NotificationService { + pub fn new() -> Self { + Self { + resolver: Mutex::new(ContactResolver::new( + DefaultContactResolverBackend::default(), + )), + } + } + + /// Checks whether a new user-visible notification should be shown for the + /// given conversation and displays it if appropriate. + pub async fn notify_new_messages( + &self, + database: &Arc>, + conversation_id: &str, + ) { + if let Some((summary, body)) = self.prepare_payload(database, conversation_id).await { + if let Err(error) = self.show_notification(&summary, &body) { + log::warn!( + target: target::DAEMON, + "Failed to display notification for conversation {}: {}", + conversation_id, + error + ); + } + } + } + + /// Displays a manual test notification. + pub fn send_manual(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> { + self.show_notification(summary, body) + } + + async fn prepare_payload( + &self, + database: &Arc>, + conversation_id: &str, + ) -> Option<(String, String)> { + let conversation_opt = database + .lock() + .await + .with_repository(|r| r.get_conversation_by_guid(conversation_id)) + .await; + + let conversation = match conversation_opt { + Ok(Some(conv)) => conv, + Ok(None) => return None, + Err(err) => { + log::warn!( + target: target::DAEMON, + "Notification lookup failed for conversation {}: {}", + conversation_id, + err + ); + return None; + } + }; + + if conversation.unread_count == 0 { + return None; + } + + let last_message_opt = database + .lock() + .await + .with_repository(|r| r.get_last_message_for_conversation(conversation_id)) + .await; + + let last_message: Message = match last_message_opt { + Ok(Some(message)) => message.into(), + Ok(None) => return None, + Err(err) => { + log::warn!( + target: target::DAEMON, + "Notification lookup failed for conversation {}: {}", + conversation_id, + err + ); + return None; + } + }; + + if matches!(last_message.sender, Participant::Me) { + return None; + } + + let mut resolver = self.resolver.lock().await; + let summary = self.conversation_display_name(&conversation, &mut resolver); + let sender_display_name = + self.resolve_participant_display_name(&last_message.sender, &mut resolver); + + let mut message_text = last_message.text.replace('\u{FFFC}', ""); + if message_text.trim().is_empty() { + if !last_message.attachments.is_empty() { + message_text = "Sent an attachment".to_string(); + } else { + message_text = "Sent a message".to_string(); + } + } + + let body = if sender_display_name.is_empty() { + message_text + } else { + format!("{}: {}", sender_display_name, message_text) + }; + + Some((summary, body)) + } + + fn conversation_display_name( + &self, + conversation: &Conversation, + resolver: &mut ContactResolver, + ) -> String { + if let Some(display_name) = &conversation.display_name { + if !display_name.trim().is_empty() { + return display_name.clone(); + } + } + + let names: Vec = conversation + .participants + .iter() + .filter_map(|participant| match participant { + DbParticipant::Me => None, + DbParticipant::Remote { handle, contact_id } => { + if let Some(contact_id) = contact_id { + Some( + resolver + .get_contact_display_name(contact_id) + .unwrap_or_else(|| handle.clone()), + ) + } else { + Some(handle.clone()) + } + } + }) + .collect(); + + if names.is_empty() { + "Kordophone".to_string() + } else { + names.join(", ") + } + } + + fn resolve_participant_display_name( + &self, + participant: &Participant, + resolver: &mut ContactResolver, + ) -> String { + match participant { + Participant::Me => "".to_string(), + Participant::Remote { handle, contact_id } => { + if let Some(contact_id) = contact_id { + resolver + .get_contact_display_name(contact_id) + .unwrap_or_else(|| handle.clone()) + } else { + handle.clone() + } + } + } + } + + fn show_notification(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> { + Notification::new() + .appname("Kordophone") + .summary(summary) + .body(body) + .show() + .map(|_| ()) + } +} diff --git a/core/kordophoned/src/dbus/agent.rs b/core/kordophoned/src/dbus/agent.rs index 62022c6..8a3bca0 100644 --- a/core/kordophoned/src/dbus/agent.rs +++ b/core/kordophoned/src/dbus/agent.rs @@ -1,7 +1,6 @@ use dbus::arg; use dbus_tree::MethodErr; -use notify::Notification; -use std::sync::{Arc, Mutex as StdMutex}; +use std::sync::Arc; use std::{future::Future, thread}; use tokio::sync::{mpsc, oneshot, Mutex}; @@ -10,11 +9,10 @@ use kordophoned::daemon::{ events::{Event, Reply}, settings::Settings, signals::Signal, - DaemonResult, Message, + DaemonResult, }; use kordophone_db::models::participant::Participant; -use kordophone_db::models::Conversation; use crate::dbus::endpoint::DbusRegistry; use crate::dbus::interface; @@ -25,7 +23,7 @@ use dbus_tokio::connection; pub struct DBusAgent { event_sink: mpsc::Sender, signal_receiver: Arc>>>, - contact_resolver: Arc>>, + contact_resolver: ContactResolver, } impl DBusAgent { @@ -33,9 +31,7 @@ impl DBusAgent { Self { event_sink, signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), - contact_resolver: Arc::new(StdMutex::new(ContactResolver::new( - DefaultContactResolverBackend::default(), - ))), + contact_resolver: ContactResolver::new(DefaultContactResolverBackend::default()), } } @@ -72,7 +68,6 @@ impl DBusAgent { { let registry = dbus_registry.clone(); let receiver_arc = self.signal_receiver.clone(); - let agent_clone = self.clone(); tokio::spawn(async move { let mut receiver = receiver_arc .lock() @@ -99,7 +94,6 @@ impl DBusAgent { "Sending signal: MessagesUpdated for conversation {}", conversation_id ); - let conversation_id_for_notification = conversation_id.clone(); registry .send_signal( interface::OBJECT_PATH, @@ -109,10 +103,6 @@ impl DBusAgent { log::error!("Failed to send signal"); 0 }); - - agent_clone - .maybe_notify_on_messages_updated(&conversation_id_for_notification) - .await; } Signal::AttachmentDownloaded(attachment_id) => { log::debug!( @@ -191,7 +181,7 @@ impl DBusAgent { .map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e))) } - fn resolve_participant_display_name(&self, participant: &Participant) -> String { + fn resolve_participant_display_name(&mut self, participant: &Participant) -> String { match participant { // Me (we should use a special string here...) Participant::Me => "(Me)".to_string(), @@ -201,15 +191,10 @@ impl DBusAgent { handle, contact_id: Some(contact_id), .. - } => { - if let Ok(mut resolver) = self.contact_resolver.lock() { - resolver - .get_contact_display_name(contact_id) - .unwrap_or_else(|| handle.clone()) - } else { - handle.clone() - } - } + } => self + .contact_resolver + .get_contact_display_name(contact_id) + .unwrap_or_else(|| handle.clone()), // Remote participant without a resolved contact_id Participant::Remote { handle, .. } => handle.clone(), @@ -217,113 +202,6 @@ impl DBusAgent { } } -impl DBusAgent { - fn conversation_display_name(&self, conversation: &Conversation) -> String { - if let Some(display_name) = &conversation.display_name { - if !display_name.trim().is_empty() { - return display_name.clone(); - } - } - - let names: Vec = conversation - .participants - .iter() - .filter(|participant| !matches!(participant, Participant::Me)) - .map(|participant| self.resolve_participant_display_name(participant)) - .collect(); - - if names.is_empty() { - "Kordophone".to_string() - } else { - names.join(", ") - } - } - - async fn prepare_incoming_message_notification( - &self, - conversation_id: &str, - ) -> DaemonResult> { - let conversation = match self - .send_event(|reply| Event::GetConversation(conversation_id.to_string(), reply)) - .await? - { - Some(conv) => conv, - None => return Ok(None), - }; - - if conversation.unread_count == 0 { - return Ok(None); - } - - let last_message: Option = self - .send_event(|reply| Event::GetLastMessage(conversation_id.to_string(), reply)) - .await?; - - let last_message = match last_message { - Some(message) => message, - None => return Ok(None), - }; - - let sender_participant: Participant = Participant::from(last_message.sender.clone()); - if matches!(sender_participant, Participant::Me) { - return Ok(None); - } - - let summary = self.conversation_display_name(&conversation); - let sender_display_name = self.resolve_participant_display_name(&sender_participant); - - let mut message_text = last_message.text.replace('\u{FFFC}', ""); - if message_text.trim().is_empty() { - if !last_message.attachments.is_empty() { - message_text = "Sent an attachment".to_string(); - } else { - message_text = "Sent a message".to_string(); - } - } - - let body = if sender_display_name.is_empty() { - message_text - } else { - format!("{}: {}", sender_display_name, message_text) - }; - - Ok(Some((summary, body))) - } - fn show_notification(&self, summary: &str, body: &str) -> Result<(), notify::error::Error> { - Notification::new() - .appname("Kordophone") - .summary(summary) - .body(body) - .show() - .map(|_| ()) - } - - async fn maybe_notify_on_messages_updated(&self, conversation_id: &str) { - match self - .prepare_incoming_message_notification(conversation_id) - .await - { - Ok(Some((summary, body))) => { - if let Err(error) = self.show_notification(&summary, &body) { - log::warn!( - "Failed to display notification for conversation {}: {}", - conversation_id, - error - ); - } - } - Ok(None) => {} - Err(error) => { - log::warn!( - "Unable to prepare notification for conversation {}: {}", - conversation_id, - error - ); - } - } - } -} - // // D-Bus repository interface implementation // @@ -521,8 +399,10 @@ impl DbusRepository for DBusAgent { } fn test_notification(&mut self, summary: String, body: String) -> Result<(), MethodErr> { - self.show_notification(&summary, &body) - .map_err(|e| MethodErr::failed(&format!("Failed to display notification: {}", e))) + match self.send_event_sync(|r| Event::TestNotification(summary, body, r))? { + Ok(()) => Ok(()), + Err(message) => Err(MethodErr::failed(&message)), + } } fn get_attachment_info(