From 2314713bb43939f2b752a6cf4754b2784428aedc Mon Sep 17 00:00:00 2001 From: James Magahern Date: Thu, 1 May 2025 20:36:43 -0700 Subject: [PATCH] daemon: incorporate update monitor in daemon activities --- kordophone-db/src/repository.rs | 17 ++++ kordophone/src/api/http_client.rs | 2 +- .../net.buzzert.kordophonecd.Server.xml | 5 + kordophoned/src/daemon/events.rs | 3 + kordophoned/src/daemon/mod.rs | 61 +++++++++++- kordophoned/src/daemon/update_monitor.rs | 98 +++++++++++++++++++ kordophoned/src/dbus/server_impl.rs | 4 + kpcli/src/daemon/mod.rs | 11 ++- 8 files changed, 196 insertions(+), 5 deletions(-) create mode 100644 kordophoned/src/daemon/update_monitor.rs diff --git a/kordophone-db/src/repository.rs b/kordophone-db/src/repository.rs index 8189d62..dee4497 100644 --- a/kordophone-db/src/repository.rs +++ b/kordophone-db/src/repository.rs @@ -127,6 +127,9 @@ impl<'a> Repository<'a> { )) .execute(self.connection)?; + // Update conversation date + self.update_conversation_metadata(conversation_guid, &db_message)?; + Ok(()) } @@ -174,6 +177,9 @@ impl<'a> Repository<'a> { .values(&conv_msg_records) .execute(self.connection)?; + // Update conversation date + self.update_conversation_metadata(conversation_guid, &db_messages.last().unwrap())?; + Ok(()) } @@ -234,6 +240,17 @@ impl<'a> Repository<'a> { Ok(()) } + fn update_conversation_metadata(&mut self, conversation_guid: &str, last_message: &MessageRecord) -> Result<()> { + let conversation = self.get_conversation_by_guid(conversation_guid)?; + if let Some(mut conversation) = conversation { + conversation.date = last_message.date; + conversation.last_message_preview = Some(last_message.text.clone()); + self.insert_conversation(conversation)?; + } + + Ok(()) + } + // Helper function to get the last inserted row ID // This is a workaround since the Sqlite backend doesn't support `RETURNING` // Huge caveat with this is that it depends on whatever the last insert was, prevents concurrent inserts. diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index c7321b1..bfc2488 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -243,7 +243,7 @@ impl APIInterface for HTTPAPIClient { request.headers_mut().insert("Authorization", header_value); } - let (socket, response) = connect_async(request).await.unwrap(); + let (socket, response) = connect_async(request).await.map_err(Error::from)?; log::debug!("Websocket connected: {:?}", response.status()); if response.status() != StatusCode::SWITCHING_PROTOCOLS { diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index 37ad35f..b346742 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -24,6 +24,11 @@ + + + + diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 97427e9..b356992 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -9,6 +9,9 @@ pub enum Event { /// Get the version of the daemon. GetVersion(Reply), + /// Asynchronous event for syncing the conversation list with the server. + SyncConversationList(Reply<()>), + /// Asynchronous event for syncing all conversations with the server. SyncAllConversations(Reply<()>), diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index a7f9997..30e4266 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -30,6 +30,9 @@ use kordophone::api::{ AuthenticationStore, }; +mod update_monitor; +use update_monitor::UpdateMonitor; + #[derive(Debug, Error)] pub enum DaemonError { #[error("Client Not Configured")] @@ -87,11 +90,11 @@ impl AuthenticationStore for DatabaseAuthenticationStore { } } -mod target { +pub mod target { pub static SYNC: &str = "sync"; pub static EVENT: &str = "event"; + pub static UPDATES: &str = "updates"; } - pub struct Daemon { pub event_sender: Sender, event_receiver: Receiver, @@ -139,6 +142,13 @@ impl Daemon { log::info!("Starting daemon version {}", self.version); log::debug!("Debug logging enabled."); + let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone()); + + tokio::spawn(async move { + log::info!(target: target::UPDATES, "Starting update monitor"); + update_monitor.run().await; // should run indefinitely + }); + while let Some(event) = self.event_receiver.recv().await { log::debug!(target: target::EVENT, "Received event: {:?}", event); self.handle_event(event).await; @@ -151,6 +161,20 @@ impl Daemon { reply.send(self.version.clone()).unwrap(); }, + Event::SyncConversationList(reply) => { + let mut db_clone = self.database.clone(); + let signal_sender = self.signal_sender.clone(); + self.runtime.spawn(async move { + let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await; + if let Err(e) = result { + log::error!("Error handling sync event: {}", e); + } + }); + + // This is a background operation, so return right away. + reply.send(()).unwrap(); + }, + Event::SyncAllConversations(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); @@ -231,8 +255,32 @@ impl Daemon { self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await } + async fn sync_conversation_list(database: &mut Arc>, signal_sender: &Sender) -> Result<()> { + log::info!(target: target::SYNC, "Starting list conversation sync"); + + let mut client = Self::get_client_impl(database).await?; + + // Fetch conversations from server + let fetched_conversations = client.get_conversations().await?; + let db_conversations: Vec = fetched_conversations.into_iter() + .map(kordophone_db::models::Conversation::from) + .collect(); + + // Insert each conversation + let num_conversations = db_conversations.len(); + for conversation in db_conversations { + database.with_repository(|r| r.insert_conversation(conversation)).await?; + } + + // Send conversations updated signal + signal_sender.send(Signal::ConversationsUpdated).await?; + + log::info!(target: target::SYNC, "Synchronized {} conversations", num_conversations); + Ok(()) + } + async fn sync_all_conversations_impl(database: &mut Arc>, signal_sender: &Sender) -> Result<()> { - log::info!(target: target::SYNC, "Starting conversation sync"); + log::info!(target: target::SYNC, "Starting full conversation sync"); let mut client = Self::get_client_impl(database).await?; @@ -266,6 +314,13 @@ impl Daemon { let mut client = Self::get_client_impl(database).await?; + // Check if conversation exists in database. + let conversation = database.with_repository(|r| r.get_conversation_by_guid(&conversation_id)).await?; + if conversation.is_none() { + // If the conversation doesn't exist, first do a conversation list sync. + Self::sync_conversation_list(database, signal_sender).await?; + } + // Fetch and sync messages for this conversation let last_message_id = database.with_repository(|r| -> Option { r.get_last_message_for_conversation(&conversation_id) diff --git a/kordophoned/src/daemon/update_monitor.rs b/kordophoned/src/daemon/update_monitor.rs new file mode 100644 index 0000000..57ed1ca --- /dev/null +++ b/kordophoned/src/daemon/update_monitor.rs @@ -0,0 +1,98 @@ +use crate::daemon::{ + Daemon, + DaemonResult, + + events::{Event, Reply}, + target, +}; + +use kordophone::APIInterface; +use kordophone::api::event_socket::EventSocket; +use kordophone::model::event::Event as UpdateEvent; + +use kordophone_db::database::Database; + +use tokio::sync::mpsc::Sender; +use std::sync::Arc; +use tokio::sync::Mutex; + +pub struct UpdateMonitor { + database: Arc>, + event_sender: Sender, +} + +impl UpdateMonitor { + pub fn new(database: Arc>, event_sender: Sender) -> Self { + Self { database, event_sender } + } + + pub async fn send_event( + &self, + make_event: impl FnOnce(Reply) -> Event, + ) -> DaemonResult { + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + self.event_sender.send(make_event(reply_tx)) + .await + .map_err(|_| "Failed to send event")?; + + reply_rx.await.map_err(|_| "Failed to receive reply".into()) + } + + async fn handle_update(&mut self, update: UpdateEvent) { + match update { + UpdateEvent::ConversationChanged(conversation) => { + log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); + log::info!(target: target::UPDATES, "Triggering conversation list sync"); + self.send_event(Event::SyncConversationList).await + .unwrap_or_else(|e| { + log::error!("Failed to send daemon event: {}", e); + }); + } + + UpdateEvent::MessageReceived(conversation, message) => { + log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid); + log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid); + self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await + .unwrap_or_else(|e| { + log::error!("Failed to send daemon event: {}", e); + }); + } + } + } + + pub async fn run(&mut self) { + use futures_util::stream::StreamExt; + + log::info!(target: target::UPDATES, "Starting update monitor"); + + loop { + log::debug!(target: target::UPDATES, "Creating client"); + let mut client = match Daemon::get_client_impl(&mut self.database).await { + Ok(client) => client, + Err(e) => { + log::error!("Failed to get client: {}", e); + log::warn!("Retrying in 5 seconds..."); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } + }; + + log::debug!(target: target::UPDATES, "Opening event socket"); + let socket = match client.open_event_socket().await { + Ok(events) => events, + Err(e) => { + log::warn!("Failed to open event socket: {}", e); + log::warn!("Retrying in 5 seconds..."); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } + }; + + log::debug!(target: target::UPDATES, "Starting event stream"); + let mut event_stream = socket.events().await; + while let Some(Ok(event)) = event_stream.next().await { + self.handle_update(event).await; + } + } + } +} \ No newline at end of file diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 9bc0c75..5c00a58 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -67,6 +67,10 @@ impl DbusRepository for ServerImpl { }) } + fn sync_conversation_list(&mut self) -> Result<(), dbus::MethodErr> { + self.send_event_sync(Event::SyncConversationList) + } + fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { self.send_event_sync(Event::SyncAllConversations) } diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index 2212f8b..ad5b52f 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -20,11 +20,14 @@ pub enum Commands { /// Gets all known conversations. Conversations, - /// Runs a sync operation. + /// Runs a full sync operation for a conversation and its messages. Sync { conversation_id: Option, }, + /// Runs a sync operation for the conversation list. + SyncList, + /// Prints the server Kordophone version. Version, @@ -75,6 +78,7 @@ impl Commands { Commands::Version => client.print_version().await, Commands::Conversations => client.print_conversations().await, Commands::Sync { conversation_id } => client.sync_conversations(conversation_id).await, + Commands::SyncList => client.sync_conversations_list().await, Commands::Config { command } => client.config(command).await, Commands::Signals => client.wait_for_signals().await, Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await, @@ -125,6 +129,11 @@ impl DaemonCli { } } + pub async fn sync_conversations_list(&mut self) -> Result<()> { + KordophoneRepository::sync_conversation_list(&self.proxy()) + .map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e)) + } + pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option) -> Result<()> { let messages = KordophoneRepository::get_messages(&self.proxy(), &conversation_id, &last_message_id.unwrap_or_default())?; println!("Number of messages: {}", messages.len());