diff --git a/kordophone-db/src/lib.rs b/kordophone-db/src/lib.rs index e026a7d..3abd200 100644 --- a/kordophone-db/src/lib.rs +++ b/kordophone-db/src/lib.rs @@ -7,4 +7,8 @@ pub mod settings; #[cfg(test)] mod tests; +pub mod target { + pub static REPOSITORY: &str = "repository"; +} + pub use repository::Repository; diff --git a/kordophone-db/src/models/conversation.rs b/kordophone-db/src/models/conversation.rs index c7de2de..2df10a7 100644 --- a/kordophone-db/src/models/conversation.rs +++ b/kordophone-db/src/models/conversation.rs @@ -1,4 +1,4 @@ -use crate::models::participant::Participant; +use crate::models::{message::Message, participant::Participant}; use chrono::{DateTime, NaiveDateTime}; use uuid::Uuid; @@ -27,6 +27,36 @@ impl Conversation { display_name: self.display_name.clone(), } } + + pub fn merge(&self, other: &Conversation, last_message: Option<&Message>) -> Conversation { + let mut new_conversation = self.clone(); + new_conversation.unread_count = other.unread_count; + new_conversation.participants = other.participants.clone(); + new_conversation.display_name = other.display_name.clone(); + + if let Some(last_message) = last_message { + if last_message.date > self.date { + new_conversation.date = last_message.date; + } + + if !last_message.text.is_empty() && !last_message.text.trim().is_empty() { + new_conversation.last_message_preview = Some(last_message.text.clone()); + } + } + + new_conversation + } +} + +impl PartialEq for Conversation { + fn eq(&self, other: &Self) -> bool { + self.guid == other.guid && + self.unread_count == other.unread_count && + self.display_name == other.display_name && + self.last_message_preview == other.last_message_preview && + self.date == other.date && + self.participants == other.participants + } } impl From for Conversation { diff --git a/kordophone-db/src/repository.rs b/kordophone-db/src/repository.rs index c37e956..0f104ad 100644 --- a/kordophone-db/src/repository.rs +++ b/kordophone-db/src/repository.rs @@ -14,6 +14,7 @@ use crate::{ Conversation, Message, Participant, }, schema, + target, }; pub struct Repository<'a> { @@ -323,25 +324,35 @@ impl<'a> Repository<'a> { Ok(()) } + pub fn merge_conversation_metadata(&mut self, in_conversation: Conversation) -> Result { + let mut updated = false; + let conversation = self.get_conversation_by_guid(&in_conversation.guid)?; + if let Some(conversation) = conversation { + let merged_conversation = conversation.merge(&in_conversation, None); + + if merged_conversation != conversation { + self.insert_conversation(merged_conversation)?; + updated = true; + } + } + + log::debug!(target: target::REPOSITORY, "Merged conversation metadata: {} updated: {}", in_conversation.guid, updated); + Ok(updated) + } + fn update_conversation_metadata(&mut self, conversation_guid: &str) -> Result<()> { let conversation = self.get_conversation_by_guid(conversation_guid)?; - if let Some(mut conversation) = conversation { + if let Some(conversation) = conversation { if let Some(last_message) = self.get_last_message_for_conversation(conversation_guid)? { log::debug!( + target: target::REPOSITORY, "Updating conversation metadata: {} message: {:?}", conversation_guid, last_message ); - if last_message.date > conversation.date { - conversation.date = last_message.date; - } - - if !last_message.text.is_empty() && !last_message.text.trim().is_empty() { - conversation.last_message_preview = Some(last_message.text.clone()); - } - - self.insert_conversation(conversation)?; + let merged_conversation = conversation.merge(&conversation, Some(&last_message)); + self.insert_conversation(merged_conversation)?; } } diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index 265d72e..f940cc3 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -271,6 +271,13 @@ impl APIInterface for HTTPAPIClient { Ok(token) } + async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error> { + // SERVER JANK: This should be POST, but it's GET for some reason. + let endpoint = format!("markConversation?guid={}", conversation_id); + self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true).await?; + Ok(()) + } + async fn get_messages( &mut self, conversation_id: &ConversationID, diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index f092307..5da1ced 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -64,6 +64,9 @@ pub trait APIInterface { // (POST) /authenticate async fn authenticate(&mut self, credentials: Credentials) -> Result; + // (GET) /markConversation + async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error>; + // (WS) /updates async fn open_event_socket( &mut self, diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index 3709855..c51260e 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -148,4 +148,8 @@ impl APIInterface for TestClient { { Ok(String::from("test")) } + + async fn mark_conversation_as_read(&mut self, conversation_id: &ConversationID) -> Result<(), Self::Error> { + Ok(()) + } } diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index a0898e3..cdef983 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -43,6 +43,12 @@ value="Initiates a background sync of a single conversation with the server."/> + + + + + diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 56c082a..debdbd6 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -26,6 +26,12 @@ pub enum Event { /// Asynchronous event for syncing a single conversation with the server. SyncConversation(String, Reply<()>), + /// Asynchronous event for marking a conversation as read. + MarkConversationAsRead(String, Reply<()>), + + /// Asynchronous event for updating the metadata for a conversation. + UpdateConversationMetadata(Conversation, Reply<()>), + /// Sent when the update stream is reconnected after a timeout or configuration change. UpdateStreamReconnected, diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index ed2edb9..3d10d3c 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -63,6 +63,7 @@ pub mod target { pub static SETTINGS: &str = "settings"; pub static UPDATES: &str = "updates"; pub static ATTACHMENTS: &str = "attachments"; + pub static DAEMON: &str = "daemon"; } pub struct Daemon { @@ -221,6 +222,31 @@ impl Daemon { reply.send(()).unwrap(); } + Event::MarkConversationAsRead(conversation_id, reply) => { + let mut db_clone = self.database.clone(); + self.runtime.spawn(async move { + let result = Self::mark_conversation_as_read_impl(&mut db_clone, conversation_id).await; + if let Err(e) = result { + log::error!(target: target::DAEMON, "Error handling mark conversation as read event: {}", e); + } + }); + + reply.send(()).unwrap(); + } + + Event::UpdateConversationMetadata(conversation, reply) => { + let mut db_clone = self.database.clone(); + let signal_sender = self.signal_sender.clone(); + self.runtime.spawn(async move { + let result = Self::update_conversation_metadata_impl(&mut db_clone, conversation, &signal_sender).await; + if let Err(e) = result { + log::error!(target: target::DAEMON, "Error handling update conversation metadata event: {}", e); + } + }); + + reply.send(()).unwrap(); + } + Event::UpdateStreamReconnected => { log::info!(target: target::UPDATES, "Update stream reconnected"); @@ -590,6 +616,33 @@ impl Daemon { Ok(()) } + async fn mark_conversation_as_read_impl( + database: &mut Arc>, + conversation_id: String, + ) -> Result<()> { + log::debug!(target: target::DAEMON, "Marking conversation as read: {}", conversation_id); + + let mut client = Self::get_client_impl(database).await?; + client.mark_conversation_as_read(&conversation_id).await?; + Ok(()) + } + + async fn update_conversation_metadata_impl( + database: &mut Arc>, + conversation: Conversation, + signal_sender: &Sender, + ) -> Result<()> { + log::debug!(target: target::DAEMON, "Updating conversation metadata: {}", conversation.guid); + let updated = database.with_repository(|r| r.merge_conversation_metadata(conversation)).await?; + if updated { + signal_sender + .send(Signal::ConversationsUpdated) + .await?; + } + + Ok(()) + } + async fn get_settings(&mut self) -> Result { let settings = self.database.with_settings(Settings::from_db).await?; Ok(settings) diff --git a/kordophoned/src/daemon/update_monitor.rs b/kordophoned/src/daemon/update_monitor.rs index 3b0da7c..329954b 100644 --- a/kordophoned/src/daemon/update_monitor.rs +++ b/kordophoned/src/daemon/update_monitor.rs @@ -65,11 +65,19 @@ impl UpdateMonitor { UpdateEventData::ConversationChanged(conversation) => { log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); + // Explicitly update the unread count, we assume this is fresh from the notification. + let db_conversation: kordophone_db::models::Conversation = conversation.clone().into(); + self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r)) + .await + .unwrap_or_else(|e| { + log::error!("Failed to send daemon event: {}", e); + }); + // Check if we've synced this conversation recently (within 5 seconds) // This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason // imagent will post a conversation changed notification when we call getMessages. if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) { - if last_sync.elapsed() < Duration::from_secs(5) { + if last_sync.elapsed() < Duration::from_secs(1) { log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.", conversation.guid, last_sync.elapsed().as_secs_f64()); return; @@ -85,7 +93,7 @@ impl UpdateMonitor { match (&last_message, &conversation.last_message) { (Some(message), Some(conversation_message)) => { if message.id == conversation_message.guid { - log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", conversation.guid); + log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", &conversation.guid); return; } } diff --git a/kordophoned/src/dbus/agent.rs b/kordophoned/src/dbus/agent.rs index 21708bd..cb42995 100644 --- a/kordophoned/src/dbus/agent.rs +++ b/kordophoned/src/dbus/agent.rs @@ -233,6 +233,10 @@ impl DbusRepository for DBusAgent { self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) } + fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<(), MethodErr> { + self.send_event_sync(|r| Event::MarkConversationAsRead(conversation_id, r)) + } + fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result, MethodErr> { let last_message_id_opt = if last_message_id.is_empty() { None diff --git a/kpcli/src/client/mod.rs b/kpcli/src/client/mod.rs index 4ee5eaa..679e795 100644 --- a/kpcli/src/client/mod.rs +++ b/kpcli/src/client/mod.rs @@ -52,6 +52,9 @@ pub enum Commands { conversation_id: String, message: String, }, + + /// Marks a conversation as read. + Mark { conversation_id: String }, } impl Commands { @@ -67,6 +70,9 @@ impl Commands { conversation_id, message, } => client.send_message(conversation_id, message).await, + Commands::Mark { conversation_id } => { + client.mark_conversation_as_read(conversation_id).await + } } } } @@ -163,4 +169,10 @@ impl ClientCli { println!("Message sent: {}", message.guid); Ok(()) } + + pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { + self.api.mark_conversation_as_read(&conversation_id).await?; + println!("Conversation marked as read: {}", conversation_id); + Ok(()) + } } diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index b67fb3d..e154bde 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -58,6 +58,9 @@ pub enum Commands { /// Uploads an attachment to the server, returns upload guid. UploadAttachment { path: String }, + + /// Marks a conversation as read. + MarkConversationAsRead { conversation_id: String }, } #[derive(Subcommand)] @@ -99,6 +102,9 @@ impl Commands { Commands::DownloadAttachment { attachment_id } => { client.download_attachment(attachment_id).await } + Commands::MarkConversationAsRead { conversation_id } => { + client.mark_conversation_as_read(conversation_id).await + } } } } @@ -289,4 +295,9 @@ impl DaemonCli { println!("Upload GUID: {}", upload_guid); Ok(()) } + + pub async fn mark_conversation_as_read(&mut self, conversation_id: String) -> Result<()> { + KordophoneRepository::mark_conversation_as_read(&self.proxy(), &conversation_id) + .map_err(|e| anyhow::anyhow!("Failed to mark conversation as read: {}", e)) + } }