cargo fmt
This commit is contained in:
@@ -1,24 +1,21 @@
|
||||
use crate::daemon::{
|
||||
Daemon,
|
||||
DaemonResult,
|
||||
|
||||
events::{Event, Reply},
|
||||
target,
|
||||
target, Daemon, DaemonResult,
|
||||
};
|
||||
|
||||
use kordophone::APIInterface;
|
||||
use kordophone::api::event_socket::EventSocket;
|
||||
use kordophone::model::event::Event as UpdateEvent;
|
||||
use kordophone::model::event::EventData as UpdateEventData;
|
||||
use kordophone::APIInterface;
|
||||
|
||||
use kordophone_db::database::Database;
|
||||
use kordophone_db::database::DatabaseAccess;
|
||||
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub struct UpdateMonitor {
|
||||
database: Arc<Mutex<Database>>,
|
||||
@@ -29,8 +26,8 @@ pub struct UpdateMonitor {
|
||||
|
||||
impl UpdateMonitor {
|
||||
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
||||
Self {
|
||||
database,
|
||||
Self {
|
||||
database,
|
||||
event_sender,
|
||||
last_sync_times: HashMap::new(),
|
||||
update_seq: None,
|
||||
@@ -42,23 +39,24 @@ impl UpdateMonitor {
|
||||
make_event: impl FnOnce(Reply<T>) -> Event,
|
||||
) -> DaemonResult<T> {
|
||||
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
|
||||
self.event_sender.send(make_event(reply_tx))
|
||||
self.event_sender
|
||||
.send(make_event(reply_tx))
|
||||
.await
|
||||
.map_err(|_| "Failed to send event")?;
|
||||
|
||||
|
||||
reply_rx.await.map_err(|_| "Failed to receive reply".into())
|
||||
}
|
||||
|
||||
|
||||
async fn handle_update(&mut self, update: UpdateEvent) {
|
||||
self.update_seq = Some(update.update_seq);
|
||||
|
||||
|
||||
match update.data {
|
||||
UpdateEventData::ConversationChanged(conversation) => {
|
||||
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
||||
|
||||
// Check if we've synced this conversation recently (within 5 seconds)
|
||||
// This is currently a hack/workaround to prevent an infinite loop of sync events, because for some reason
|
||||
// imagent will post a conversation changed notification when we call getMessages.
|
||||
// imagent will post a conversation changed notification when we call getMessages.
|
||||
if let Some(last_sync) = self.last_sync_times.get(&conversation.guid) {
|
||||
if last_sync.elapsed() < Duration::from_secs(5) {
|
||||
log::info!(target: target::UPDATES, "Skipping sync for conversation id: {}. Last sync was {} seconds ago.",
|
||||
@@ -67,8 +65,12 @@ impl UpdateMonitor {
|
||||
}
|
||||
}
|
||||
|
||||
// This is the non-hacky path once we can reason about chat items with associatedMessageGUIDs (e.g., reactions).
|
||||
let last_message = self.database.with_repository(|r| r.get_last_message_for_conversation(&conversation.guid)).await.unwrap_or_default();
|
||||
// This is the non-hacky path once we can reason about chat items with associatedMessageGUIDs (e.g., reactions).
|
||||
let last_message = self
|
||||
.database
|
||||
.with_repository(|r| r.get_last_message_for_conversation(&conversation.guid))
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
match (&last_message, &conversation.last_message) {
|
||||
(Some(message), Some(conversation_message)) => {
|
||||
if message.id == conversation_message.guid {
|
||||
@@ -80,10 +82,12 @@ impl UpdateMonitor {
|
||||
};
|
||||
|
||||
// Update the last sync time and proceed with sync
|
||||
self.last_sync_times.insert(conversation.guid.clone(), Instant::now());
|
||||
|
||||
self.last_sync_times
|
||||
.insert(conversation.guid.clone(), Instant::now());
|
||||
|
||||
log::info!(target: target::UPDATES, "Syncing new messages for conversation id: {}", conversation.guid);
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
@@ -92,14 +96,15 @@ impl UpdateMonitor {
|
||||
UpdateEventData::MessageReceived(conversation, message) => {
|
||||
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
||||
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
log::error!("Failed to send daemon event: {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
use futures_util::stream::StreamExt;
|
||||
|
||||
@@ -130,15 +135,15 @@ impl UpdateMonitor {
|
||||
|
||||
log::debug!(target: target::UPDATES, "Starting event stream");
|
||||
let mut event_stream = socket.events().await;
|
||||
|
||||
// We won't know if the websocket is dead until we try to send a message, so time out waiting for
|
||||
// a message every 30 seconds.
|
||||
|
||||
// We won't know if the websocket is dead until we try to send a message, so time out waiting for
|
||||
// a message every 30 seconds.
|
||||
let mut timeout = tokio::time::interval(Duration::from_secs(30));
|
||||
timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
// First tick will happen immediately
|
||||
timeout.tick().await;
|
||||
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(result) = event_stream.next() => {
|
||||
@@ -161,9 +166,9 @@ impl UpdateMonitor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Add a small delay before reconnecting to avoid tight reconnection loops
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user