websocket: automatically reconnect if not heard from for a while
This commit is contained in:
@@ -12,7 +12,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|||||||
|
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
use futures_util::{StreamExt, TryStreamExt};
|
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
||||||
use futures_util::stream::{SplitStream, SplitSink, Stream};
|
use futures_util::stream::{SplitStream, SplitSink, Stream};
|
||||||
use futures_util::stream::BoxStream;
|
use futures_util::stream::BoxStream;
|
||||||
|
|
||||||
@@ -124,24 +124,21 @@ impl<B> AuthSetting for hyper::http::Request<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type WebsocketSink = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>;
|
|
||||||
type WebsocketStream = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
|
|
||||||
|
|
||||||
pub struct WebsocketEventSocket {
|
pub struct WebsocketEventSocket {
|
||||||
_sink: WebsocketSink,
|
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||||
stream: WebsocketStream,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebsocketEventSocket {
|
impl WebsocketEventSocket {
|
||||||
pub fn new(socket: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
|
pub fn new(socket: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
|
||||||
let (sink, stream) = socket.split();
|
Self { socket }
|
||||||
Self { _sink: sink, stream }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebsocketEventSocket {
|
impl WebsocketEventSocket {
|
||||||
fn raw_update_stream(self) -> impl Stream<Item = Result<Vec<UpdateItem>, Error>> {
|
fn raw_update_stream(self) -> impl Stream<Item = Result<Vec<UpdateItem>, Error>> {
|
||||||
self.stream
|
let (_, stream) = self.socket.split();
|
||||||
|
|
||||||
|
stream
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.try_filter_map(|msg| async move {
|
.try_filter_map(|msg| async move {
|
||||||
match msg {
|
match msg {
|
||||||
@@ -150,6 +147,15 @@ impl WebsocketEventSocket {
|
|||||||
.map(Some)
|
.map(Some)
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
}
|
}
|
||||||
|
tungstenite::Message::Ping(_) => {
|
||||||
|
// Borrowing issue here with the sink, need to handle pings at the client level (whomever
|
||||||
|
// is consuming these updateitems, should be a union type of updateitem | ping).
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
tungstenite::Message::Close(_) => {
|
||||||
|
// Connection was closed cleanly
|
||||||
|
Err(Error::ClientError("WebSocket connection closed".into()))
|
||||||
|
}
|
||||||
_ => Ok(None)
|
_ => Ok(None)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -246,11 +252,16 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
|||||||
Ok(message)
|
Ok(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn open_event_socket(&mut self) -> Result<WebsocketEventSocket, Self::Error> {
|
async fn open_event_socket(&mut self, update_seq: Option<u64>) -> Result<WebsocketEventSocket, Self::Error> {
|
||||||
use tungstenite::handshake::client::Request as TungsteniteRequest;
|
use tungstenite::handshake::client::Request as TungsteniteRequest;
|
||||||
use tungstenite::handshake::client::generate_key;
|
use tungstenite::handshake::client::generate_key;
|
||||||
|
|
||||||
let uri = self.uri_for_endpoint("updates", Some(self.websocket_scheme()));
|
let endpoint = match update_seq {
|
||||||
|
Some(seq) => format!("updates?seq={}", seq),
|
||||||
|
None => "updates".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let uri = self.uri_for_endpoint(&endpoint, Some(self.websocket_scheme()));
|
||||||
|
|
||||||
log::debug!("Connecting to websocket: {:?}", uri);
|
log::debug!("Connecting to websocket: {:?}", uri);
|
||||||
|
|
||||||
@@ -501,7 +512,7 @@ mod test {
|
|||||||
let mut client = local_mock_client();
|
let mut client = local_mock_client();
|
||||||
|
|
||||||
// We just want to see if the connection is established, we won't wait for any events
|
// We just want to see if the connection is established, we won't wait for any events
|
||||||
let _ = client.open_event_socket().await.unwrap();
|
let _ = client.open_event_socket(None).await.unwrap();
|
||||||
assert!(true);
|
assert!(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,5 +46,5 @@ pub trait APIInterface {
|
|||||||
async fn authenticate(&mut self, credentials: Credentials) -> Result<JwtToken, Self::Error>;
|
async fn authenticate(&mut self, credentials: Credentials) -> Result<JwtToken, Self::Error>;
|
||||||
|
|
||||||
// (WS) /updates
|
// (WS) /updates
|
||||||
async fn open_event_socket(&mut self) -> Result<impl EventSocket, Self::Error>;
|
async fn open_event_socket(&mut self, update_seq: Option<u64>) -> Result<impl EventSocket, Self::Error>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,13 @@
|
|||||||
use crate::model::{Conversation, Message, UpdateItem};
|
use crate::model::{Conversation, Message, UpdateItem};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum Event {
|
pub struct Event {
|
||||||
|
pub data: EventData,
|
||||||
|
pub update_seq: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum EventData {
|
||||||
ConversationChanged(Conversation),
|
ConversationChanged(Conversation),
|
||||||
MessageReceived(Conversation, Message),
|
MessageReceived(Conversation, Message),
|
||||||
}
|
}
|
||||||
@@ -9,8 +15,12 @@ pub enum Event {
|
|||||||
impl From<UpdateItem> for Event {
|
impl From<UpdateItem> for Event {
|
||||||
fn from(update: UpdateItem) -> Self {
|
fn from(update: UpdateItem) -> Self {
|
||||||
match update {
|
match update {
|
||||||
UpdateItem { conversation: Some(conversation), message: None, .. } => Event::ConversationChanged(conversation),
|
UpdateItem { conversation: Some(conversation), message: None, .. }
|
||||||
UpdateItem { conversation: Some(conversation), message: Some(message), .. } => Event::MessageReceived(conversation, message),
|
=> Event { data: EventData::ConversationChanged(conversation), update_seq: update.seq },
|
||||||
|
|
||||||
|
UpdateItem { conversation: Some(conversation), message: Some(message), .. }
|
||||||
|
=> Event { data: EventData::MessageReceived(conversation, message), update_seq: update.seq },
|
||||||
|
|
||||||
_ => panic!("Invalid update item: {:?}", update),
|
_ => panic!("Invalid update item: {:?}", update),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -105,7 +105,7 @@ impl APIInterface for TestClient {
|
|||||||
Ok(message)
|
Ok(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn open_event_socket(&mut self) -> Result<impl EventSocket, Self::Error> {
|
async fn open_event_socket(&mut self, _update_seq: Option<u64>) -> Result<impl EventSocket, Self::Error> {
|
||||||
Ok(TestEventSocket::new())
|
Ok(TestEventSocket::new())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use crate::daemon::{
|
|||||||
use kordophone::APIInterface;
|
use kordophone::APIInterface;
|
||||||
use kordophone::api::event_socket::EventSocket;
|
use kordophone::api::event_socket::EventSocket;
|
||||||
use kordophone::model::event::Event as UpdateEvent;
|
use kordophone::model::event::Event as UpdateEvent;
|
||||||
|
use kordophone::model::event::EventData as UpdateEventData;
|
||||||
|
|
||||||
use kordophone_db::database::Database;
|
use kordophone_db::database::Database;
|
||||||
use kordophone_db::database::DatabaseAccess;
|
use kordophone_db::database::DatabaseAccess;
|
||||||
@@ -23,6 +24,7 @@ pub struct UpdateMonitor {
|
|||||||
database: Arc<Mutex<Database>>,
|
database: Arc<Mutex<Database>>,
|
||||||
event_sender: Sender<Event>,
|
event_sender: Sender<Event>,
|
||||||
last_sync_times: HashMap<String, Instant>,
|
last_sync_times: HashMap<String, Instant>,
|
||||||
|
update_seq: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpdateMonitor {
|
impl UpdateMonitor {
|
||||||
@@ -31,6 +33,7 @@ impl UpdateMonitor {
|
|||||||
database,
|
database,
|
||||||
event_sender,
|
event_sender,
|
||||||
last_sync_times: HashMap::new(),
|
last_sync_times: HashMap::new(),
|
||||||
|
update_seq: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -47,8 +50,10 @@ impl UpdateMonitor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_update(&mut self, update: UpdateEvent) {
|
async fn handle_update(&mut self, update: UpdateEvent) {
|
||||||
match update {
|
self.update_seq = Some(update.update_seq);
|
||||||
UpdateEvent::ConversationChanged(conversation) => {
|
|
||||||
|
match update.data {
|
||||||
|
UpdateEventData::ConversationChanged(conversation) => {
|
||||||
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
||||||
|
|
||||||
// Check if we've synced this conversation recently (within 5 seconds)
|
// Check if we've synced this conversation recently (within 5 seconds)
|
||||||
@@ -84,7 +89,7 @@ impl UpdateMonitor {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateEvent::MessageReceived(conversation, message) => {
|
UpdateEventData::MessageReceived(conversation, message) => {
|
||||||
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
||||||
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
||||||
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
||||||
@@ -113,7 +118,7 @@ impl UpdateMonitor {
|
|||||||
};
|
};
|
||||||
|
|
||||||
log::debug!(target: target::UPDATES, "Opening event socket");
|
log::debug!(target: target::UPDATES, "Opening event socket");
|
||||||
let socket = match client.open_event_socket().await {
|
let socket = match client.open_event_socket(self.update_seq).await {
|
||||||
Ok(events) => events,
|
Ok(events) => events,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::warn!("Failed to open event socket: {}", e);
|
log::warn!("Failed to open event socket: {}", e);
|
||||||
@@ -125,9 +130,40 @@ impl UpdateMonitor {
|
|||||||
|
|
||||||
log::debug!(target: target::UPDATES, "Starting event stream");
|
log::debug!(target: target::UPDATES, "Starting event stream");
|
||||||
let mut event_stream = socket.events().await;
|
let mut event_stream = socket.events().await;
|
||||||
while let Some(Ok(event)) = event_stream.next().await {
|
|
||||||
self.handle_update(event).await;
|
// We won't know if the websocket is dead until we try to send a message, so time out waiting for
|
||||||
|
// a message every 30 seconds.
|
||||||
|
let mut timeout = tokio::time::interval(Duration::from_secs(30));
|
||||||
|
timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
// First tick will happen immediately
|
||||||
|
timeout.tick().await;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(result) = event_stream.next() => {
|
||||||
|
match result {
|
||||||
|
Ok(event) => {
|
||||||
|
self.handle_update(event).await;
|
||||||
|
|
||||||
|
// Reset the timeout since we got a message
|
||||||
|
timeout.reset();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Error in event stream: {}", e);
|
||||||
|
break; // Break inner loop to reconnect
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = timeout.tick() => {
|
||||||
|
log::warn!("No messages received for 30 seconds, reconnecting...");
|
||||||
|
break; // Break inner loop to reconnect
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add a small delay before reconnecting to avoid tight reconnection loops
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user