use crate::daemon::{ events::{Event, Reply}, target, Daemon, DaemonResult, }; use futures_util::SinkExt; use kordophone::api::event_socket::{EventSocket, SinkMessage}; use kordophone::model::event::Event as UpdateEvent; use kordophone::model::event::EventData as UpdateEventData; use kordophone::APIInterface; use kordophone_db::database::Database; use kordophone_db::database::DatabaseAccess; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; pub enum UpdateMonitorCommand { Restart, } pub struct UpdateMonitor { command_tx: Option>, command_rx: Receiver, database: Arc>, event_sender: Sender, last_sync_times: HashMap, update_seq: Option, first_connection: bool, } impl UpdateMonitor { pub fn new(database: Arc>, event_sender: Sender) -> Self { let (command_tx, command_rx) = tokio::sync::mpsc::channel(100); Self { database, event_sender, last_sync_times: HashMap::new(), update_seq: None, first_connection: false, // optimistic assumption that we're not reconnecting the first time. command_tx: Some(command_tx), command_rx, } } pub fn take_command_channel(&mut self) -> Sender { self.command_tx.take().unwrap() } async fn send_event(&self, make_event: impl FnOnce(Reply) -> Event) -> DaemonResult { let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); self.event_sender .send(make_event(reply_tx)) .await .map_err(|_| "Failed to send event")?; reply_rx.await.map_err(|_| "Failed to receive reply".into()) } async fn handle_update(&mut self, update: UpdateEvent) { match update.data { UpdateEventData::ConversationChanged(conversation) => { log::info!(target: target::UPDATES, "Conversation changed: {}", conversation.guid); // Explicitly update the unread count, we assume this is fresh from the notification. let db_conversation: kordophone_db::models::Conversation = conversation.clone().into(); self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r)) .await .unwrap_or_else(|e| { log::error!("Failed to send daemon event: {}", e); }); // Check if we've synced this conversation recently (within 5 seconds) // This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason // imagent will post a conversation changed notification when we call getMessages. if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) { if last_sync.elapsed() < Duration::from_secs(1) { log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.", conversation.guid, last_sync.elapsed().as_secs_f64()); return; } } // This is the non-hacky path once we can reason about chat items with associatedMessageGUIDs (e.g., reactions). let last_message = self .database .with_repository(|r| r.get_last_message_for_conversation(&conversation.guid)) .await .unwrap_or_default(); match (&last_message, &conversation.last_message) { (Some(message), Some(conversation_message)) => { if message.id == conversation_message.guid { log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", &conversation.guid); return; } } _ => {} }; // Update the last sync time and proceed with sync self.last_sync_times .insert(conversation.guid.clone(), Instant::now()); log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid); self.send_event(|r| Event::SyncConversation(conversation.guid, r)) .await .unwrap_or_else(|e| { log::error!("Failed to send daemon event: {}", e); }); } UpdateEventData::MessageReceived(conversation, message) => { log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid); log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid); self.send_event(|r| Event::SyncConversation(conversation.guid, r)) .await .unwrap_or_else(|e| { log::error!("Failed to send daemon event: {}", e); }); } } } pub async fn run(&mut self) { use futures_util::stream::StreamExt; log::info!(target: target::UPDATES, "Starting update monitor"); loop { log::debug!(target: target::UPDATES, "Creating client"); let mut client = match Daemon::get_client_impl(&mut self.database).await { Ok(client) => client, Err(e) => { log::error!("Failed to get client: {}", e); log::warn!("Retrying in 5 seconds..."); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } }; log::debug!(target: target::UPDATES, "Opening event socket"); let socket = match client.open_event_socket(self.update_seq).await { Ok(events) => events, Err(e) => { log::warn!("Failed to open event socket: {}", e); log::warn!("Retrying in 5 seconds..."); tokio::time::sleep(std::time::Duration::from_secs(5)).await; continue; } }; log::debug!(target: target::UPDATES, "Starting event stream"); let (mut event_stream, mut sink) = socket.events().await; // We won't know if the websocket is dead until we try to send a message, so time out waiting for // a message every 30 seconds. let mut timeout = tokio::time::interval(Duration::from_secs(10)); timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); // First tick will happen immediately timeout.tick().await; // Track when the last ping was sent so we know when to give up // waiting for the corresponding pong. let mut ping_sent_at: Option = None; loop { tokio::select! { Some(result) = event_stream.next() => { match result { Ok(socket_event) => { match socket_event { kordophone::api::event_socket::SocketEvent::Update(event) => { self.handle_update(event).await; } kordophone::api::event_socket::SocketEvent::Pong => { log::debug!(target: target::UPDATES, "Received websocket pong"); } } if self.first_connection { self.event_sender.send(Event::UpdateStreamReconnected).await.unwrap(); self.first_connection = false; } // Any successfully handled message (update or pong) keeps the connection alive. ping_sent_at = None; timeout.reset(); } Err(e) => { log::error!("Error in event stream: {}", e); self.first_connection = true; break; // Break inner loop to reconnect } } } _ = timeout.tick() => { // If we previously sent a ping and haven't heard back since the timeout, we'll assume the connection is dead. if let Some(_) = ping_sent_at { log::error!(target: target::UPDATES, "Ping timed out. Restarting stream."); self.first_connection = true; break; } log::debug!("Sending websocket ping on timer"); match sink.send(SinkMessage::Ping).await { Ok(_) => { ping_sent_at = Some(Instant::now()); } Err(e) => { log::error!(target: target::UPDATES, "Error writing ping to event socket: {}, restarting stream.", e); self.first_connection = true; break; } } } Some(command) = self.command_rx.recv() => { match command { UpdateMonitorCommand::Restart => { log::info!(target: target::UPDATES, "Restarting update monitor"); self.first_connection = true; break; } } } } } // Add a small delay before reconnecting to avoid tight reconnection loops tokio::time::sleep(Duration::from_secs(1)).await; } } }