use crate::daemon::{
    events::{Event, Reply},
    target, Daemon, DaemonResult,
};

use futures_util::SinkExt;

use kordophone::api::event_socket::{EventSocket, SinkMessage};
use kordophone::model::event::Event as UpdateEvent;
use kordophone::model::event::EventData as UpdateEventData;
use kordophone::APIInterface;

use kordophone_db::database::Database;
use kordophone_db::database::DatabaseAccess;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;

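/// Commands that can be sent to a running [`UpdateMonitor`] over its command
/// channel. Currently only `Restart`, which tears down the current connection
/// and forces a reconnect.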
pub enum UpdateMonitorCommand {
    Restart,
}

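/// Monitors the server's update stream over a websocket, translating incoming
/// updates into daemon [`Event`]s (metadata updates and conversation syncs).
/// The monitor owns the reconnect loop: on stream errors, ping timeouts, or an
/// explicit [`UpdateMonitorCommand::Restart`], it drops the socket and dials again.
///
/// A minimal usage sketch (hypothetical caller; assumes a `database` handle
/// already exists and that the daemon side consumes `event_rx`):
///
/// ```ignore
/// let (event_tx, mut _event_rx) = tokio::sync::mpsc::channel(100);
/// let mut monitor = UpdateMonitor::new(database, event_tx);
/// let commands = monitor.take_command_channel();
/// tokio::spawn(async move { monitor.run().await });
/// // Later, force a reconnect:
/// commands.send(UpdateMonitorCommand::Restart).await.ok();
/// ```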
pub struct UpdateMonitor {
    command_tx: Option<Sender<UpdateMonitorCommand>>,
    command_rx: Receiver<UpdateMonitorCommand>,
    database: Arc<Mutex<Database>>,
    event_sender: Sender<Event>,
    last_sync_times: HashMap<String, Instant>,
    update_seq: Option<u64>,
    first_connection: bool,
}

impl UpdateMonitor {
    pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
        let (command_tx, command_rx) = tokio::sync::mpsc::channel(100);
        Self {
            database,
            event_sender,
            last_sync_times: HashMap::new(),
            update_seq: None,
            first_connection: false, // optimistic assumption that we're not reconnecting the first time.
            command_tx: Some(command_tx),
            command_rx,
        }
    }

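    /// Hands the command sender to the caller. Can only be called once;
    /// panics if the channel has already been taken.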
    pub fn take_command_channel(&mut self) -> Sender<UpdateMonitorCommand> {
        self.command_tx.take().unwrap()
    }

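    /// Sends an event to the daemon and awaits its reply over a oneshot channel.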
    async fn send_event<T>(&self, make_event: impl FnOnce(Reply<T>) -> Event) -> DaemonResult<T> {
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
        self.event_sender
            .send(make_event(reply_tx))
            .await
            .map_err(|_| "Failed to send event")?;

        reply_rx.await.map_err(|_| "Failed to receive reply".into())
    }

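    /// Dispatches a single update from the event stream, deciding whether a
    /// conversation sync is warranted.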
    async fn handle_update(&mut self, update: UpdateEvent) {
        match update.data {
            UpdateEventData::ConversationChanged(conversation) => {
                log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);

                // Explicitly update the unread count; we assume this is fresh from the notification.
                let db_conversation: kordophone_db::models::Conversation = conversation.clone().into();
                self.send_event(|r| Event::UpdateConversationMetadata(db_conversation, r))
                    .await
                    .unwrap_or_else(|e| {
                        log::error!("Failed to send daemon event: {}", e);
                    });

                // Check if we've synced this conversation recently (within 1 second).
                // This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason
                // imagent will post a conversation changed notification when we call getMessages.
                if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) {
                    if last_sync.elapsed() < Duration::from_secs(1) {
                        log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.",
                            conversation.guid, last_sync.elapsed().as_secs_f64());
                        return;
                    }
                }

                // This is the non-hacky path once we can reason about chat items with associatedMessageGUIDs (e.g., reactions).
                let last_message = self
                    .database
                    .with_repository(|r| r.get_last_message_for_conversation(&conversation.guid))
                    .await
                    .unwrap_or_default();
                if let (Some(message), Some(conversation_message)) =
                    (&last_message, &conversation.last_message)
                {
                    if message.id == conversation_message.guid {
                        log::warn!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", &conversation.guid);
                        return;
                    }
                }

                // Update the last sync time and proceed with the sync.
                self.last_sync_times
                    .insert(conversation.guid.clone(), Instant::now());

                log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid);
                self.send_event(|r| Event::SyncConversation(conversation.guid, r))
                    .await
                    .unwrap_or_else(|e| {
                        log::error!("Failed to send daemon event: {}", e);
                    });
            }

            UpdateEventData::MessageReceived(conversation, message) => {
                log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
                log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
                self.send_event(|r| Event::SyncConversation(conversation.guid, r))
                    .await
                    .unwrap_or_else(|e| {
                        log::error!("Failed to send daemon event: {}", e);
                    });
            }
        }
    }

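    /// Main loop: connects a client, opens the event socket, and services the
    /// stream until it fails, a ping times out, or a restart command arrives,
    /// then reconnects after a short delay.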
    pub async fn run(&mut self) {
        use futures_util::stream::StreamExt;

        log::info!(target: target::UPDATES, "Starting update monitor");

        loop {
            log::debug!(target: target::UPDATES, "Creating client");
            let mut client = match Daemon::get_client_impl(&mut self.database).await {
                Ok(client) => client,
                Err(e) => {
                    log::error!("Failed to get client: {}", e);
                    log::warn!("Retrying in 5 seconds...");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
            };

            log::debug!(target: target::UPDATES, "Opening event socket");
            let socket = match client.open_event_socket(self.update_seq).await {
                Ok(events) => events,
                Err(e) => {
                    log::warn!("Failed to open event socket: {}", e);
                    log::warn!("Retrying in 5 seconds...");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
            };

            log::debug!(target: target::UPDATES, "Starting event stream");
            let (mut event_stream, mut sink) = socket.events().await;

            // We won't know if the websocket is dead until we try to send a message,
            // so ping on a 10-second timer.
            let mut timeout = tokio::time::interval(Duration::from_secs(10));
            timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            // First tick will happen immediately
            timeout.tick().await;

            // Track when the last ping was sent so we know when to give up
            // waiting for the corresponding pong.
            let mut ping_sent_at: Option<Instant> = None;

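            // Multiplex three sources: incoming socket events, the keepalive
            // timer, and external monitor commands.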
            loop {
                tokio::select! {
                    Some(result) = event_stream.next() => {
                        match result {
                            Ok(socket_event) => {
                                match socket_event {
                                    kordophone::api::event_socket::SocketEvent::Update(event) => {
                                        self.handle_update(event).await;
                                    }

                                    kordophone::api::event_socket::SocketEvent::Pong => {
                                        log::debug!(target: target::UPDATES, "Received websocket pong");
                                    }
                                }

                                if self.first_connection {
                                    self.event_sender.send(Event::UpdateStreamReconnected).await.unwrap();
                                    self.first_connection = false;
                                }

                                // Any successfully handled message (update or pong) keeps the connection alive.
                                ping_sent_at = None;
                                timeout.reset();
                            }
                            Err(e) => {
                                log::error!("Error in event stream: {}", e);
                                self.first_connection = true;
                                break; // Break inner loop to reconnect
                            }
                        }
                    }

                    _ = timeout.tick() => {
                        // If we previously sent a ping and haven't heard back since the timeout, we'll assume the connection is dead.
                        if ping_sent_at.is_some() {
                            log::error!(target: target::UPDATES, "Ping timed out. Restarting stream.");
                            self.first_connection = true;
                            break;
                        }

                        log::debug!("Sending websocket ping on timer");
                        match sink.send(SinkMessage::Ping).await {
                            Ok(_) => {
                                ping_sent_at = Some(Instant::now());
                            }

                            Err(e) => {
                                log::error!(target: target::UPDATES, "Error writing ping to event socket: {}, restarting stream.", e);
                                self.first_connection = true;
                                break;
                            }
                        }
                    }

                    Some(command) = self.command_rx.recv() => {
                        match command {
                            UpdateMonitorCommand::Restart => {
                                log::info!(target: target::UPDATES, "Restarting update monitor");
                                self.first_connection = true;
                                break;
                            }
                        }
                    }
                }
            }

            // Add a small delay before reconnecting to avoid tight reconnection loops.
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}