From dece6f1abc0c1a45dc4bf3f2c593f0080936f4f3 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Fri, 13 Jun 2025 16:45:28 -0700 Subject: [PATCH] daemon: update monitor: implements ping/pong (required server changes) --- kordophone/src/api/event_socket.rs | 22 +++++- kordophone/src/api/http_client.rs | 71 +++++++++++++------ kordophone/src/model/update.rs | 4 ++ kordophone/src/tests/test_client.rs | 6 +- .../net.buzzert.kordophonecd.Server.xml | 5 ++ kordophoned/src/daemon/events.rs | 3 + kordophoned/src/daemon/mod.rs | 33 ++++++--- kordophoned/src/daemon/signals.rs | 3 + kordophoned/src/daemon/update_monitor.rs | 57 ++++++++++++--- kordophoned/src/dbus/mod.rs | 1 + kordophoned/src/main.rs | 10 +++ kpcli/src/client/mod.rs | 42 +++++++---- 12 files changed, 202 insertions(+), 55 deletions(-) diff --git a/kordophone/src/api/event_socket.rs b/kordophone/src/api/event_socket.rs index 636677d..31ae740 100644 --- a/kordophone/src/api/event_socket.rs +++ b/kordophone/src/api/event_socket.rs @@ -2,15 +2,31 @@ use crate::model::event::Event; use crate::model::update::UpdateItem; use async_trait::async_trait; use futures_util::stream::Stream; +use futures_util::Sink; + +#[derive(Debug, Eq, PartialEq, Clone)] +pub enum SinkMessage { + Ping, +} + +pub enum SocketUpdate { + Update(Vec), + Pong, +} + +pub enum SocketEvent { + Update(Event), + Pong, +} #[async_trait] pub trait EventSocket { type Error; - type EventStream: Stream>; - type UpdateStream: Stream, Self::Error>>; + type EventStream: Stream>; + type UpdateStream: Stream>; /// Modern event pipeline - async fn events(self) -> Self::EventStream; + async fn events(self) -> (Self::EventStream, impl Sink); /// Raw update items from the v1 API. async fn raw_updates(self) -> Self::UpdateStream; diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index c604c6e..0f379f2 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -3,10 +3,9 @@ extern crate serde; use std::{path::PathBuf, pin::Pin, str, task::Poll}; -use crate::api::event_socket::EventSocket; +use crate::api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate}; use crate::api::AuthenticationStore; use bytes::Bytes; -use hyper::body::HttpBody; use hyper::{Body, Client, Method, Request, Uri}; use async_trait::async_trait; @@ -16,7 +15,7 @@ use tokio::net::TcpStream; use futures_util::stream::{BoxStream, Stream}; use futures_util::task::Context; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::{Sink, SinkExt, StreamExt, TryStreamExt}; use tokio_tungstenite::connect_async; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; @@ -49,6 +48,7 @@ pub enum Error { HTTPError(hyper::Error), SerdeError(serde_json::Error), DecodeError(String), + PongError(tungstenite::Error), Unauthorized, } @@ -124,34 +124,44 @@ impl AuthSetting for hyper::http::Request { } } +type WebsocketSink = futures_util::stream::SplitSink>, tungstenite::Message>; +type WebsocketStream = futures_util::stream::SplitStream>>; + pub struct WebsocketEventSocket { - socket: WebSocketStream>, + sink: Option, + stream: WebsocketStream } impl WebsocketEventSocket { pub fn new(socket: WebSocketStream>) -> Self { - Self { socket } + let (sink, stream) = socket.split(); + + Self { sink: Some(sink), stream } } } impl WebsocketEventSocket { - fn raw_update_stream(self) -> impl Stream, Error>> { - let (_, stream) = self.socket.split(); - - stream + fn raw_update_stream(self) -> impl Stream> { + self.stream .map_err(Error::from) .try_filter_map(|msg| async move { match msg { tungstenite::Message::Text(text) => { - serde_json::from_str::>(&text) - .map(Some) - .map_err(Error::from) + match serde_json::from_str::>(&text) { + Ok(updates) => Ok(Some(SocketUpdate::Update(updates))), + Err(e) => { + log::error!("Error parsing update: {:?}", e); + Err(Error::from(e)) + } + } } tungstenite::Message::Ping(_) => { - // Borrowing issue here with the sink, need to handle pings at the client level (whomever - // is consuming these updateitems, should be a union type of updateitem | ping). + // We don't expect the server to send us pings. Ok(None) } + tungstenite::Message::Pong(_) => { + Ok(Some(SocketUpdate::Pong)) + } tungstenite::Message::Close(_) => { // Connection was closed cleanly Err(Error::ClientError("WebSocket connection closed".into())) @@ -165,16 +175,37 @@ impl WebsocketEventSocket { #[async_trait] impl EventSocket for WebsocketEventSocket { type Error = Error; - type EventStream = BoxStream<'static, Result>; - type UpdateStream = BoxStream<'static, Result, Error>>; + type EventStream = BoxStream<'static, Result>; + type UpdateStream = BoxStream<'static, Result>; - async fn events(self) -> Self::EventStream { + async fn events(mut self) -> (Self::EventStream, impl Sink) { use futures_util::stream::iter; - self.raw_update_stream() - .map_ok(|updates| iter(updates.into_iter().map(|update| Ok(Event::from(update))))) + let sink = self.sink.take().unwrap().with(|f| { + match f { + SinkMessage::Ping => futures_util::future::ready(Ok::(tungstenite::Message::Ping(Bytes::new()))) + } + }); + + let stream = self.raw_update_stream() + .map_ok(|updates| -> BoxStream<'static, Result> { + match updates { + SocketUpdate::Update(updates) => { + let iter_stream = iter( + updates.into_iter().map(|u| Ok(SocketEvent::Update(Event::from(u)))) + ); + iter_stream.boxed() + } + SocketUpdate::Pong => { + iter(std::iter::once(Ok(SocketEvent::Pong))).boxed() + } + } + }) .try_flatten() - .boxed() + .boxed(); + + + (stream, sink) } async fn raw_updates(self) -> Self::UpdateStream { diff --git a/kordophone/src/model/update.rs b/kordophone/src/model/update.rs index 69889fa..e92f857 100644 --- a/kordophone/src/model/update.rs +++ b/kordophone/src/model/update.rs @@ -12,6 +12,9 @@ pub struct UpdateItem { #[serde(rename = "message")] pub message: Option, + + #[serde(default)] + pub pong: bool, } impl Default for UpdateItem { @@ -20,6 +23,7 @@ impl Default for UpdateItem { seq: 0, conversation: None, message: None, + pong: false, } } } diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index 122e74c..c514bbd 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -6,7 +6,7 @@ use uuid::Uuid; pub use crate::APIInterface; use crate::{ - api::event_socket::EventSocket, + api::event_socket::{EventSocket, SinkMessage}, api::http_client::Credentials, model::{ Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage, @@ -63,6 +63,10 @@ impl EventSocket for TestEventSocket { let results: Vec, TestError>> = vec![]; futures_util::stream::iter(results.into_iter()).boxed() } + + fn get_sink(&mut self) -> impl futures_util::Sink { + todo!("") + } } #[async_trait] diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index 8e93ca5..a0898e3 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -103,6 +103,11 @@ value="Emitted when the list of messages is updated."/> + + + + diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index d14f371..56c082a 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -26,6 +26,9 @@ pub enum Event { /// Asynchronous event for syncing a single conversation with the server. SyncConversation(String, Reply<()>), + /// Sent when the update stream is reconnected after a timeout or configuration change. + UpdateStreamReconnected, + /// Returns all known conversations from the database. /// Parameters: /// - limit: The maximum number of conversations to return. (-1 for no limit) diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 7606b90..b673f9b 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -163,6 +163,17 @@ impl Daemon { } } + fn spawn_conversation_list_sync(&mut self) { + let mut db_clone = self.database.clone(); + let signal_sender = self.signal_sender.clone(); + self.runtime.spawn(async move { + let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await; + if let Err(e) = result { + log::error!(target: target::SYNC, "Error handling sync event: {}", e); + } + }); + } + async fn handle_event(&mut self, event: Event) { match event { Event::GetVersion(reply) => { @@ -170,14 +181,7 @@ impl Daemon { } Event::SyncConversationList(reply) => { - let mut db_clone = self.database.clone(); - let signal_sender = self.signal_sender.clone(); - self.runtime.spawn(async move { - let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await; - if let Err(e) = result { - log::error!(target: target::SYNC, "Error handling sync event: {}", e); - } - }); + self.spawn_conversation_list_sync(); // This is a background operation, so return right away. reply.send(()).unwrap(); @@ -216,6 +220,19 @@ impl Daemon { reply.send(()).unwrap(); } + Event::UpdateStreamReconnected => { + log::info!(target: target::UPDATES, "Update stream reconnected"); + + // The ui client will respond differently, but we'll almost certainly want to do a sync-list in response to this. + self.spawn_conversation_list_sync(); + + // Send signal to the client that the update stream has been reconnected. + self.signal_sender + .send(Signal::UpdateStreamReconnected) + .await + .unwrap(); + } + Event::GetAllConversations(limit, offset, reply) => { let conversations = self.get_conversations_limit_offset(limit, offset).await; reply.send(conversations).unwrap(); diff --git a/kordophoned/src/daemon/signals.rs b/kordophoned/src/daemon/signals.rs index 6fc5cd2..d2a4cfa 100644 --- a/kordophoned/src/daemon/signals.rs +++ b/kordophoned/src/daemon/signals.rs @@ -18,4 +18,7 @@ pub enum Signal { /// - upload_guid: The GUID of the upload. /// - attachment_guid: The GUID of the attachment on the server. AttachmentUploaded(String, String), + + /// Emitted when the update stream is reconnected after a timeout or configuration change. + UpdateStreamReconnected, } diff --git a/kordophoned/src/daemon/update_monitor.rs b/kordophoned/src/daemon/update_monitor.rs index 184832a..39ee875 100644 --- a/kordophoned/src/daemon/update_monitor.rs +++ b/kordophoned/src/daemon/update_monitor.rs @@ -3,7 +3,8 @@ use crate::daemon::{ target, Daemon, DaemonResult, }; -use kordophone::api::event_socket::EventSocket; +use futures_util::SinkExt; +use kordophone::api::event_socket::{EventSocket, SinkMessage}; use kordophone::model::event::Event as UpdateEvent; use kordophone::model::event::EventData as UpdateEventData; use kordophone::APIInterface; @@ -22,6 +23,7 @@ pub struct UpdateMonitor { event_sender: Sender, last_sync_times: HashMap, update_seq: Option, + first_connection: bool, } impl UpdateMonitor { @@ -31,6 +33,7 @@ impl UpdateMonitor { event_sender, last_sync_times: HashMap::new(), update_seq: None, + first_connection: false, // optimistic assumption that we're not reconnecting the first time. } } @@ -48,8 +51,6 @@ impl UpdateMonitor { } async fn handle_update(&mut self, update: UpdateEvent) { - self.update_seq = Some(update.update_seq); - match update.data { UpdateEventData::ConversationChanged(conversation) => { log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); @@ -134,24 +135,42 @@ impl UpdateMonitor { }; log::debug!(target: target::UPDATES, "Starting event stream"); - let mut event_stream = socket.events().await; + let (mut event_stream, mut sink) = socket.events().await; // We won't know if the websocket is dead until we try to send a message, so time out waiting for // a message every 30 seconds. - let mut timeout = tokio::time::interval(Duration::from_secs(30)); + let mut timeout = tokio::time::interval(Duration::from_secs(10)); timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); // First tick will happen immediately timeout.tick().await; + // Track when the last ping was sent so we know when to give up + // waiting for the corresponding pong. + let mut ping_sent_at: Option = None; + loop { tokio::select! { Some(result) = event_stream.next() => { match result { - Ok(event) => { - self.handle_update(event).await; + Ok(socket_event) => { + match socket_event { + kordophone::api::event_socket::SocketEvent::Update(event) => { + self.handle_update(event).await; + } - // Reset the timeout since we got a message + kordophone::api::event_socket::SocketEvent::Pong => { + log::debug!(target: target::UPDATES, "Received websocket pong"); + } + } + + if self.first_connection { + self.event_sender.send(Event::UpdateStreamReconnected).await.unwrap(); + self.first_connection = false; + } + + // Any successfully handled message (update or pong) keeps the connection alive. + ping_sent_at = None; timeout.reset(); } Err(e) => { @@ -160,9 +179,27 @@ impl UpdateMonitor { } } } + _ = timeout.tick() => { - log::warn!("No messages received for 30 seconds, reconnecting..."); - break; // Break inner loop to reconnect + // If we previously sent a ping and haven't heard back since the timeout, we'll assume the connection is dead. + if let Some(_) = ping_sent_at { + log::error!(target: target::UPDATES, "Ping timed out. Restarting stream."); + self.first_connection = true; + break; + } + + log::debug!("Sending websocket ping on timer"); + match sink.send(SinkMessage::Ping).await { + Ok(_) => { + ping_sent_at = Some(Instant::now()); + } + + Err(e) => { + log::error!(target: target::UPDATES, "Error writing ping to event socket: {}, restarting stream.", e); + self.first_connection = true; + break; + } + } } } } diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index 2cf6189..4edc275 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -14,5 +14,6 @@ pub mod interface { pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; + pub use crate::interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected; } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 18228d0..59db0fb 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -118,6 +118,16 @@ async fn main() { 0 }); } + + Signal::UpdateStreamReconnected => { + log::debug!("Sending signal: UpdateStreamReconnected"); + dbus_registry + .send_signal(interface::OBJECT_PATH, DbusSignals::UpdateStreamReconnected {}) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } } } }); diff --git a/kpcli/src/client/mod.rs b/kpcli/src/client/mod.rs index edfece7..8709d89 100644 --- a/kpcli/src/client/mod.rs +++ b/kpcli/src/client/mod.rs @@ -1,4 +1,4 @@ -use kordophone::api::event_socket::EventSocket; +use kordophone::api::event_socket::{EventSocket, SocketEvent, SocketUpdate}; use kordophone::api::http_client::Credentials; use kordophone::api::http_client::HTTPAPIClient; use kordophone::api::InMemoryAuthenticationStore; @@ -110,17 +110,24 @@ impl ClientCli { pub async fn print_events(&mut self) -> Result<()> { let socket = self.api.open_event_socket(None).await?; - let mut stream = socket.events().await; - while let Some(Ok(event)) = stream.next().await { - match event.data { - EventData::ConversationChanged(conversation) => { - println!("Conversation changed: {}", conversation.guid); + let (mut stream, _) = socket.events().await; + while let Some(Ok(socket_event)) = stream.next().await { + match socket_event { + SocketEvent::Update(event) => { + match event.data { + EventData::ConversationChanged(conversation) => { + println!("Conversation changed: {}", conversation.guid); + } + EventData::MessageReceived(conversation, message) => { + println!( + "Message received: msg: {} conversation: {}", + message.guid, conversation.guid + ); + } + } } - EventData::MessageReceived(conversation, message) => { - println!( - "Message received: msg: {} conversation: {}", - message.guid, conversation.guid - ); + SocketEvent::Pong => { + println!("Pong"); } } } @@ -132,8 +139,17 @@ impl ClientCli { println!("Listening for raw updates..."); let mut stream = socket.raw_updates().await; - while let Some(update) = stream.next().await { - println!("Got update: {:?}", update); + while let Some(Ok(update)) = stream.next().await { + match update { + SocketUpdate::Update(updates) => { + for update in updates { + println!("Got update: {:?}", update); + } + } + SocketUpdate::Pong => { + println!("Pong"); + } + } } Ok(())