diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index cbe2060..ca2cb2c 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -15,8 +15,8 @@ + + + + + @@ -43,6 +49,7 @@ + diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 598a7a0..b7839cb 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -12,6 +12,9 @@ pub enum Event { /// Asynchronous event for syncing all conversations with the server. SyncAllConversations(Reply<()>), + /// Asynchronous event for syncing a single conversation with the server. + SyncConversation(String, Reply<()>), + /// Returns all known conversations from the database. GetAllConversations(Reply>), diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 1e0375c..9cf182a 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -130,6 +130,19 @@ impl Daemon { reply.send(()).unwrap(); }, + Event::SyncConversation(conversation_id, reply) => { + let mut db_clone = self.database.clone(); + let signal_sender = self.signal_sender.clone(); + self.runtime.spawn(async move { + let result = Self::sync_conversation_impl(&mut db_clone, &signal_sender, conversation_id).await; + if let Err(e) = result { + log::error!("Error handling sync event: {}", e); + } + }); + + reply.send(()).unwrap(); + }, + Event::GetAllConversations(reply) => { let conversations = self.get_conversations().await; reply.send(conversations).unwrap(); @@ -193,24 +206,8 @@ impl Daemon { // Insert the conversation database.with_repository(|r| r.insert_conversation(conversation)).await?; - // Fetch and sync messages for this conversation - let last_message_id = database.with_repository(|r| -> Option { - r.get_last_message_for_conversation(&conversation_id) - .unwrap_or(None) - .map(|m| m.id) - }).await; - - log::debug!(target: target::SYNC, "Fetching messages for conversation {}", conversation_id); - log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id); - - let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?; - let db_messages: Vec = messages.into_iter() - .map(kordophone_db::models::Message::from) - .collect(); - - // Insert each message - log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", db_messages.len(), conversation_id); - database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?; + // Sync individual conversation. + Self::sync_conversation_impl(database, signal_sender, conversation_id).await?; } // Send conversations updated signal. @@ -220,6 +217,40 @@ impl Daemon { Ok(()) } + async fn sync_conversation_impl(database: &mut Arc>, signal_sender: &Sender, conversation_id: String) -> Result<()> { + log::info!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); + + let mut client = Self::get_client_impl(database).await?; + + // Fetch and sync messages for this conversation + let last_message_id = database.with_repository(|r| -> Option { + r.get_last_message_for_conversation(&conversation_id) + .unwrap_or(None) + .map(|m| m.id) + }).await; + + log::debug!(target: target::SYNC, "Fetching messages for conversation {}", &conversation_id); + log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id); + + let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?; + let db_messages: Vec = messages.into_iter() + .map(kordophone_db::models::Message::from) + .collect(); + + // Insert each message + let num_messages = db_messages.len(); + log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", num_messages, &conversation_id); + database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?; + + // Send messages updated signal, if we actually inserted any messages. + if num_messages > 0 { + signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await?; + } + + log::info!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); + Ok(()) + } + async fn get_settings(&mut self) -> Result { let settings = self.database.with_settings(Settings::from_db ).await?; diff --git a/kordophoned/src/daemon/signals.rs b/kordophoned/src/daemon/signals.rs index 05bb75f..c4fb715 100644 --- a/kordophoned/src/daemon/signals.rs +++ b/kordophoned/src/daemon/signals.rs @@ -1,4 +1,10 @@ #[derive(Debug, Clone)] pub enum Signal { + /// Emitted when the list of conversations is updated. ConversationsUpdated, + + /// Emitted when the list of messages for a conversation is updated. + /// Parameters: + /// - conversation_id: The ID of the conversation that was updated. + MessagesUpdated(String), } diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index a901658..143fb83 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -11,5 +11,6 @@ pub mod interface { pub mod signals { pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; + pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; } } \ No newline at end of file diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 4c7a5b6..a8d01b1 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -74,6 +74,10 @@ impl DbusRepository for ServerImpl { self.send_event_sync(Event::SyncAllConversations) } + fn sync_conversation(&mut self, conversation_id: String) -> Result<(), dbus::MethodErr> { + self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) + } + fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result, dbus::MethodErr> { let last_message_id_opt = if last_message_id.is_empty() { None diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index fa48fef..56d3399 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -65,6 +65,15 @@ async fn main() { 0 }); } + + Signal::MessagesUpdated(conversation_id) => { + log::info!("Sending signal: MessagesUpdated for conversation {}", conversation_id); + endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::MessagesUpdated{ conversation_id }) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } } } }); diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index e159c3f..561c60a 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -21,7 +21,9 @@ pub enum Commands { Conversations, /// Runs a sync operation. - Sync, + Sync { + conversation_id: Option, + }, /// Prints the server Kordophone version. Version, @@ -69,7 +71,7 @@ impl Commands { match cmd { Commands::Version => client.print_version().await, Commands::Conversations => client.print_conversations().await, - Commands::Sync => client.sync_conversations().await, + Commands::Sync { conversation_id } => client.sync_conversations(conversation_id).await, Commands::Config { command } => client.config(command).await, Commands::Signals => client.wait_for_signals().await, Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await, @@ -109,9 +111,14 @@ impl DaemonCli { Ok(()) } - pub async fn sync_conversations(&mut self) -> Result<()> { - KordophoneRepository::sync_all_conversations(&self.proxy()) - .map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e)) + pub async fn sync_conversations(&mut self, conversation_id: Option) -> Result<()> { + if let Some(conversation_id) = conversation_id { + KordophoneRepository::sync_conversation(&self.proxy(), &conversation_id) + .map_err(|e| anyhow::anyhow!("Failed to sync conversation: {}", e)) + } else { + KordophoneRepository::sync_all_conversations(&self.proxy()) + .map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e)) + } } pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option) -> Result<()> {