2025-05-01 20:36:43 -07:00
|
|
|
use crate::daemon::{
|
|
|
|
|
Daemon,
|
|
|
|
|
DaemonResult,
|
|
|
|
|
|
|
|
|
|
events::{Event, Reply},
|
|
|
|
|
target,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
use kordophone::APIInterface;
|
|
|
|
|
use kordophone::api::event_socket::EventSocket;
|
|
|
|
|
use kordophone::model::event::Event as UpdateEvent;
|
|
|
|
|
|
|
|
|
|
use kordophone_db::database::Database;
|
2025-05-04 00:15:13 -07:00
|
|
|
use kordophone_db::database::DatabaseAccess;
|
2025-05-01 20:36:43 -07:00
|
|
|
|
|
|
|
|
use tokio::sync::mpsc::Sender;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::Mutex;
|
|
|
|
|
|
|
|
|
|
pub struct UpdateMonitor {
|
|
|
|
|
database: Arc<Mutex<Database>>,
|
|
|
|
|
event_sender: Sender<Event>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl UpdateMonitor {
|
|
|
|
|
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
|
|
|
|
Self { database, event_sender }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn send_event<T>(
|
|
|
|
|
&self,
|
|
|
|
|
make_event: impl FnOnce(Reply<T>) -> Event,
|
|
|
|
|
) -> DaemonResult<T> {
|
|
|
|
|
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
|
|
|
|
|
self.event_sender.send(make_event(reply_tx))
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|_| "Failed to send event")?;
|
|
|
|
|
|
|
|
|
|
reply_rx.await.map_err(|_| "Failed to receive reply".into())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_update(&mut self, update: UpdateEvent) {
|
|
|
|
|
match update {
|
|
|
|
|
UpdateEvent::ConversationChanged(conversation) => {
|
|
|
|
|
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
2025-05-04 00:15:13 -07:00
|
|
|
|
|
|
|
|
// Weird. We can get in a loop because calling getMessages triggers a conversation changed
|
|
|
|
|
// event for some reason. Check to see if the change event says the last message id is the same
|
|
|
|
|
// as the last message id in the database. If so, skip the sync.
|
|
|
|
|
let last_message = self.database.with_repository(|r| r.get_last_message_for_conversation(&conversation.guid)).await.unwrap_or_default();
|
|
|
|
|
let should_sync_conversation = match (&last_message, &conversation.last_message) {
|
|
|
|
|
(Some(message), Some(conversation_message)) => {
|
|
|
|
|
if message.id == conversation_message.guid {
|
|
|
|
|
false
|
|
|
|
|
} else {
|
|
|
|
|
true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_ => true
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if !should_sync_conversation {
|
|
|
|
|
log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. We already have this message.", conversation.guid);
|
|
|
|
|
return;
|
2025-05-03 18:19:48 -07:00
|
|
|
}
|
2025-05-04 00:15:13 -07:00
|
|
|
|
|
|
|
|
log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}, last message: {:?}", conversation.guid, last_message);
|
|
|
|
|
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
|
|
|
|
.unwrap_or_else(|e| {
|
|
|
|
|
log::error!("Failed to send daemon event: {}", e);
|
|
|
|
|
});
|
2025-05-01 20:36:43 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
UpdateEvent::MessageReceived(conversation, message) => {
|
|
|
|
|
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
|
|
|
|
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
|
|
|
|
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
|
|
|
|
.unwrap_or_else(|e| {
|
|
|
|
|
log::error!("Failed to send daemon event: {}", e);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn run(&mut self) {
|
|
|
|
|
use futures_util::stream::StreamExt;
|
|
|
|
|
|
|
|
|
|
log::info!(target: target::UPDATES, "Starting update monitor");
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
log::debug!(target: target::UPDATES, "Creating client");
|
|
|
|
|
let mut client = match Daemon::get_client_impl(&mut self.database).await {
|
|
|
|
|
Ok(client) => client,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::error!("Failed to get client: {}", e);
|
|
|
|
|
log::warn!("Retrying in 5 seconds...");
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
log::debug!(target: target::UPDATES, "Opening event socket");
|
|
|
|
|
let socket = match client.open_event_socket().await {
|
|
|
|
|
Ok(events) => events,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::warn!("Failed to open event socket: {}", e);
|
|
|
|
|
log::warn!("Retrying in 5 seconds...");
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
log::debug!(target: target::UPDATES, "Starting event stream");
|
|
|
|
|
let mut event_stream = socket.events().await;
|
|
|
|
|
while let Some(Ok(event)) = event_stream.next().await {
|
|
|
|
|
self.handle_update(event).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|