diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index c89b74f..7b7eaa9 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -12,7 +12,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::net::TcpStream; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::{SinkExt, StreamExt, TryStreamExt}; use futures_util::stream::{SplitStream, SplitSink, Stream}; use futures_util::stream::BoxStream; @@ -124,24 +124,21 @@ impl AuthSetting for hyper::http::Request { } } -type WebsocketSink = SplitSink>, tungstenite::Message>; -type WebsocketStream = SplitStream>>; - pub struct WebsocketEventSocket { - _sink: WebsocketSink, - stream: WebsocketStream, + socket: WebSocketStream>, } impl WebsocketEventSocket { pub fn new(socket: WebSocketStream>) -> Self { - let (sink, stream) = socket.split(); - Self { _sink: sink, stream } + Self { socket } } } impl WebsocketEventSocket { fn raw_update_stream(self) -> impl Stream, Error>> { - self.stream + let (_, stream) = self.socket.split(); + + stream .map_err(Error::from) .try_filter_map(|msg| async move { match msg { @@ -150,6 +147,15 @@ impl WebsocketEventSocket { .map(Some) .map_err(Error::from) } + tungstenite::Message::Ping(_) => { + // Borrowing issue here with the sink, need to handle pings at the client level (whomever + // is consuming these updateitems, should be a union type of updateitem | ping). + Ok(None) + } + tungstenite::Message::Close(_) => { + // Connection was closed cleanly + Err(Error::ClientError("WebSocket connection closed".into())) + } _ => Ok(None) } }) @@ -246,11 +252,16 @@ impl APIInterface for HTTPAPIClient { Ok(message) } - async fn open_event_socket(&mut self) -> Result { + async fn open_event_socket(&mut self, update_seq: Option) -> Result { use tungstenite::handshake::client::Request as TungsteniteRequest; use tungstenite::handshake::client::generate_key; - let uri = self.uri_for_endpoint("updates", Some(self.websocket_scheme())); + let endpoint = match update_seq { + Some(seq) => format!("updates?seq={}", seq), + None => "updates".to_string(), + }; + + let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme())); log::debug!("Connecting to websocket: {:?}", uri); @@ -501,7 +512,7 @@ mod test { let mut client = local_mock_client(); // We just want to see if the connection is established, we won't wait for any events - let _ = client.open_event_socket().await.unwrap(); + let _ = client.open_event_socket(None).await.unwrap(); assert!(true); } } diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index d711948..e7fa0ae 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -46,5 +46,5 @@ pub trait APIInterface { async fn authenticate(&mut self, credentials: Credentials) -> Result; // (WS) /updates - async fn open_event_socket(&mut self) -> Result; + async fn open_event_socket(&mut self, update_seq: Option) -> Result; } diff --git a/kordophone/src/model/event.rs b/kordophone/src/model/event.rs index aca7cb1..f44e4c7 100644 --- a/kordophone/src/model/event.rs +++ b/kordophone/src/model/event.rs @@ -1,7 +1,13 @@ use crate::model::{Conversation, Message, UpdateItem}; #[derive(Debug, Clone)] -pub enum Event { +pub struct Event { + pub data: EventData, + pub update_seq: u64, +} + +#[derive(Debug, Clone)] +pub enum EventData { ConversationChanged(Conversation), MessageReceived(Conversation, Message), } @@ -9,8 +15,12 @@ pub enum Event { impl From for Event { fn from(update: UpdateItem) -> Self { match update { - UpdateItem { conversation: Some(conversation), message: None, .. } => Event::ConversationChanged(conversation), - UpdateItem { conversation: Some(conversation), message: Some(message), .. } => Event::MessageReceived(conversation, message), + UpdateItem { conversation: Some(conversation), message: None, .. } + => Event { data: EventData::ConversationChanged(conversation), update_seq: update.seq }, + + UpdateItem { conversation: Some(conversation), message: Some(message), .. } + => Event { data: EventData::MessageReceived(conversation, message), update_seq: update.seq }, + _ => panic!("Invalid update item: {:?}", update), } } diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index 4edd627..4d945ca 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -105,7 +105,7 @@ impl APIInterface for TestClient { Ok(message) } - async fn open_event_socket(&mut self) -> Result { + async fn open_event_socket(&mut self, _update_seq: Option) -> Result { Ok(TestEventSocket::new()) } } diff --git a/kordophoned/src/daemon/update_monitor.rs b/kordophoned/src/daemon/update_monitor.rs index 1888ad9..8447619 100644 --- a/kordophoned/src/daemon/update_monitor.rs +++ b/kordophoned/src/daemon/update_monitor.rs @@ -9,6 +9,7 @@ use crate::daemon::{ use kordophone::APIInterface; use kordophone::api::event_socket::EventSocket; use kordophone::model::event::Event as UpdateEvent; +use kordophone::model::event::EventData as UpdateEventData; use kordophone_db::database::Database; use kordophone_db::database::DatabaseAccess; @@ -23,6 +24,7 @@ pub struct UpdateMonitor { database: Arc>, event_sender: Sender, last_sync_times: HashMap, + update_seq: Option, } impl UpdateMonitor { @@ -31,6 +33,7 @@ impl UpdateMonitor { database, event_sender, last_sync_times: HashMap::new(), + update_seq: None, } } @@ -47,8 +50,10 @@ impl UpdateMonitor { } async fn handle_update(&mut self, update: UpdateEvent) { - match update { - UpdateEvent::ConversationChanged(conversation) => { + self.update_seq = Some(update.update_seq); + + match update.data { + UpdateEventData::ConversationChanged(conversation) => { log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); // Check if we've synced this conversation recently (within 5 seconds) @@ -84,7 +89,7 @@ impl UpdateMonitor { }); } - UpdateEvent::MessageReceived(conversation, message) => { + UpdateEventData::MessageReceived(conversation, message) => { log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid); log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid); self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await @@ -113,7 +118,7 @@ impl UpdateMonitor { }; log::debug!(target: target::UPDATES, "Opening event socket"); - let socket = match client.open_event_socket().await { + let socket = match client.open_event_socket(self.update_seq).await { Ok(events) => events, Err(e) => { log::warn!("Failed to open event socket: {}", e); @@ -125,9 +130,40 @@ impl UpdateMonitor { log::debug!(target: target::UPDATES, "Starting event stream"); let mut event_stream = socket.events().await; - while let Some(Ok(event)) = event_stream.next().await { - self.handle_update(event).await; + + // We won't know if the websocket is dead until we try to send a message, so time out waiting for + // a message every 30 seconds. + let mut timeout = tokio::time::interval(Duration::from_secs(30)); + timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // First tick will happen immediately + timeout.tick().await; + + loop { + tokio::select! { + Some(result) = event_stream.next() => { + match result { + Ok(event) => { + self.handle_update(event).await; + + // Reset the timeout since we got a message + timeout.reset(); + } + Err(e) => { + log::error!("Error in event stream: {}", e); + break; // Break inner loop to reconnect + } + } + } + _ = timeout.tick() => { + log::warn!("No messages received for 30 seconds, reconnecting..."); + break; // Break inner loop to reconnect + } + } } + + // Add a small delay before reconnecting to avoid tight reconnection loops + tokio::time::sleep(Duration::from_secs(1)).await; } } } \ No newline at end of file