Private
Public Access
1
0

daemon: update monitor: implements ping/pong (required server changes)

This commit is contained in:
2025-06-13 16:45:28 -07:00
parent 4f40be205d
commit dece6f1abc
12 changed files with 202 additions and 55 deletions

View File

@@ -2,15 +2,31 @@ use crate::model::event::Event;
use crate::model::update::UpdateItem; use crate::model::update::UpdateItem;
use async_trait::async_trait; use async_trait::async_trait;
use futures_util::stream::Stream; use futures_util::stream::Stream;
use futures_util::Sink;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum SinkMessage {
Ping,
}
pub enum SocketUpdate {
Update(Vec<UpdateItem>),
Pong,
}
pub enum SocketEvent {
Update(Event),
Pong,
}
#[async_trait] #[async_trait]
pub trait EventSocket { pub trait EventSocket {
type Error; type Error;
type EventStream: Stream<Item = Result<Event, Self::Error>>; type EventStream: Stream<Item = Result<SocketEvent, Self::Error>>;
type UpdateStream: Stream<Item = Result<Vec<UpdateItem>, Self::Error>>; type UpdateStream: Stream<Item = Result<SocketUpdate, Self::Error>>;
/// Modern event pipeline /// Modern event pipeline
async fn events(self) -> Self::EventStream; async fn events(self) -> (Self::EventStream, impl Sink<SinkMessage, Error = Self::Error>);
/// Raw update items from the v1 API. /// Raw update items from the v1 API.
async fn raw_updates(self) -> Self::UpdateStream; async fn raw_updates(self) -> Self::UpdateStream;

View File

@@ -3,10 +3,9 @@ extern crate serde;
use std::{path::PathBuf, pin::Pin, str, task::Poll}; use std::{path::PathBuf, pin::Pin, str, task::Poll};
use crate::api::event_socket::EventSocket; use crate::api::event_socket::{EventSocket, SinkMessage, SocketEvent, SocketUpdate};
use crate::api::AuthenticationStore; use crate::api::AuthenticationStore;
use bytes::Bytes; use bytes::Bytes;
use hyper::body::HttpBody;
use hyper::{Body, Client, Method, Request, Uri}; use hyper::{Body, Client, Method, Request, Uri};
use async_trait::async_trait; use async_trait::async_trait;
@@ -16,7 +15,7 @@ use tokio::net::TcpStream;
use futures_util::stream::{BoxStream, Stream}; use futures_util::stream::{BoxStream, Stream};
use futures_util::task::Context; use futures_util::task::Context;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{Sink, SinkExt, StreamExt, TryStreamExt};
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
@@ -49,6 +48,7 @@ pub enum Error {
HTTPError(hyper::Error), HTTPError(hyper::Error),
SerdeError(serde_json::Error), SerdeError(serde_json::Error),
DecodeError(String), DecodeError(String),
PongError(tungstenite::Error),
Unauthorized, Unauthorized,
} }
@@ -124,34 +124,44 @@ impl<B> AuthSetting for hyper::http::Request<B> {
} }
} }
type WebsocketSink = futures_util::stream::SplitSink<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>;
type WebsocketStream = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
pub struct WebsocketEventSocket { pub struct WebsocketEventSocket {
socket: WebSocketStream<MaybeTlsStream<TcpStream>>, sink: Option<WebsocketSink>,
stream: WebsocketStream
} }
impl WebsocketEventSocket { impl WebsocketEventSocket {
pub fn new(socket: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self { pub fn new(socket: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
Self { socket } let (sink, stream) = socket.split();
Self { sink: Some(sink), stream }
} }
} }
impl WebsocketEventSocket { impl WebsocketEventSocket {
fn raw_update_stream(self) -> impl Stream<Item = Result<Vec<UpdateItem>, Error>> { fn raw_update_stream(self) -> impl Stream<Item = Result<SocketUpdate, Error>> {
let (_, stream) = self.socket.split(); self.stream
stream
.map_err(Error::from) .map_err(Error::from)
.try_filter_map(|msg| async move { .try_filter_map(|msg| async move {
match msg { match msg {
tungstenite::Message::Text(text) => { tungstenite::Message::Text(text) => {
serde_json::from_str::<Vec<UpdateItem>>(&text) match serde_json::from_str::<Vec<UpdateItem>>(&text) {
.map(Some) Ok(updates) => Ok(Some(SocketUpdate::Update(updates))),
.map_err(Error::from) Err(e) => {
log::error!("Error parsing update: {:?}", e);
Err(Error::from(e))
}
}
} }
tungstenite::Message::Ping(_) => { tungstenite::Message::Ping(_) => {
// Borrowing issue here with the sink, need to handle pings at the client level (whomever // We don't expect the server to send us pings.
// is consuming these updateitems, should be a union type of updateitem | ping).
Ok(None) Ok(None)
} }
tungstenite::Message::Pong(_) => {
Ok(Some(SocketUpdate::Pong))
}
tungstenite::Message::Close(_) => { tungstenite::Message::Close(_) => {
// Connection was closed cleanly // Connection was closed cleanly
Err(Error::ClientError("WebSocket connection closed".into())) Err(Error::ClientError("WebSocket connection closed".into()))
@@ -165,16 +175,37 @@ impl WebsocketEventSocket {
#[async_trait] #[async_trait]
impl EventSocket for WebsocketEventSocket { impl EventSocket for WebsocketEventSocket {
type Error = Error; type Error = Error;
type EventStream = BoxStream<'static, Result<Event, Error>>; type EventStream = BoxStream<'static, Result<SocketEvent, Error>>;
type UpdateStream = BoxStream<'static, Result<Vec<UpdateItem>, Error>>; type UpdateStream = BoxStream<'static, Result<SocketUpdate, Error>>;
async fn events(self) -> Self::EventStream { async fn events(mut self) -> (Self::EventStream, impl Sink<SinkMessage, Error = Self::Error>) {
use futures_util::stream::iter; use futures_util::stream::iter;
self.raw_update_stream() let sink = self.sink.take().unwrap().with(|f| {
.map_ok(|updates| iter(updates.into_iter().map(|update| Ok(Event::from(update))))) match f {
SinkMessage::Ping => futures_util::future::ready(Ok::<tungstenite::Message, Error>(tungstenite::Message::Ping(Bytes::new())))
}
});
let stream = self.raw_update_stream()
.map_ok(|updates| -> BoxStream<'static, Result<SocketEvent, Error>> {
match updates {
SocketUpdate::Update(updates) => {
let iter_stream = iter(
updates.into_iter().map(|u| Ok(SocketEvent::Update(Event::from(u))))
);
iter_stream.boxed()
}
SocketUpdate::Pong => {
iter(std::iter::once(Ok(SocketEvent::Pong))).boxed()
}
}
})
.try_flatten() .try_flatten()
.boxed() .boxed();
(stream, sink)
} }
async fn raw_updates(self) -> Self::UpdateStream { async fn raw_updates(self) -> Self::UpdateStream {

View File

@@ -12,6 +12,9 @@ pub struct UpdateItem {
#[serde(rename = "message")] #[serde(rename = "message")]
pub message: Option<Message>, pub message: Option<Message>,
#[serde(default)]
pub pong: bool,
} }
impl Default for UpdateItem { impl Default for UpdateItem {
@@ -20,6 +23,7 @@ impl Default for UpdateItem {
seq: 0, seq: 0,
conversation: None, conversation: None,
message: None, message: None,
pong: false,
} }
} }
} }

View File

@@ -6,7 +6,7 @@ use uuid::Uuid;
pub use crate::APIInterface; pub use crate::APIInterface;
use crate::{ use crate::{
api::event_socket::EventSocket, api::event_socket::{EventSocket, SinkMessage},
api::http_client::Credentials, api::http_client::Credentials,
model::{ model::{
Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage, Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage,
@@ -63,6 +63,10 @@ impl EventSocket for TestEventSocket {
let results: Vec<Result<Vec<UpdateItem>, TestError>> = vec![]; let results: Vec<Result<Vec<UpdateItem>, TestError>> = vec![];
futures_util::stream::iter(results.into_iter()).boxed() futures_util::stream::iter(results.into_iter()).boxed()
} }
fn get_sink(&mut self) -> impl futures_util::Sink<SinkMessage> {
todo!("")
}
} }
#[async_trait] #[async_trait]

View File

@@ -103,6 +103,11 @@
value="Emitted when the list of messages is updated."/> value="Emitted when the list of messages is updated."/>
</signal> </signal>
<signal name="UpdateStreamReconnected">
<annotation name="org.freedesktop.DBus.DocString"
value="Emitted when the update stream is reconnected after a timeout or configuration change."/>
</signal>
<!-- Attachments --> <!-- Attachments -->
<method name="GetAttachmentInfo"> <method name="GetAttachmentInfo">

View File

@@ -26,6 +26,9 @@ pub enum Event {
/// Asynchronous event for syncing a single conversation with the server. /// Asynchronous event for syncing a single conversation with the server.
SyncConversation(String, Reply<()>), SyncConversation(String, Reply<()>),
/// Sent when the update stream is reconnected after a timeout or configuration change.
UpdateStreamReconnected,
/// Returns all known conversations from the database. /// Returns all known conversations from the database.
/// Parameters: /// Parameters:
/// - limit: The maximum number of conversations to return. (-1 for no limit) /// - limit: The maximum number of conversations to return. (-1 for no limit)

View File

@@ -163,6 +163,17 @@ impl Daemon {
} }
} }
fn spawn_conversation_list_sync(&mut self) {
let mut db_clone = self.database.clone();
let signal_sender = self.signal_sender.clone();
self.runtime.spawn(async move {
let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await;
if let Err(e) = result {
log::error!(target: target::SYNC, "Error handling sync event: {}", e);
}
});
}
async fn handle_event(&mut self, event: Event) { async fn handle_event(&mut self, event: Event) {
match event { match event {
Event::GetVersion(reply) => { Event::GetVersion(reply) => {
@@ -170,14 +181,7 @@ impl Daemon {
} }
Event::SyncConversationList(reply) => { Event::SyncConversationList(reply) => {
let mut db_clone = self.database.clone(); self.spawn_conversation_list_sync();
let signal_sender = self.signal_sender.clone();
self.runtime.spawn(async move {
let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await;
if let Err(e) = result {
log::error!(target: target::SYNC, "Error handling sync event: {}", e);
}
});
// This is a background operation, so return right away. // This is a background operation, so return right away.
reply.send(()).unwrap(); reply.send(()).unwrap();
@@ -216,6 +220,19 @@ impl Daemon {
reply.send(()).unwrap(); reply.send(()).unwrap();
} }
Event::UpdateStreamReconnected => {
log::info!(target: target::UPDATES, "Update stream reconnected");
// The ui client will respond differently, but we'll almost certainly want to do a sync-list in response to this.
self.spawn_conversation_list_sync();
// Send signal to the client that the update stream has been reconnected.
self.signal_sender
.send(Signal::UpdateStreamReconnected)
.await
.unwrap();
}
Event::GetAllConversations(limit, offset, reply) => { Event::GetAllConversations(limit, offset, reply) => {
let conversations = self.get_conversations_limit_offset(limit, offset).await; let conversations = self.get_conversations_limit_offset(limit, offset).await;
reply.send(conversations).unwrap(); reply.send(conversations).unwrap();

View File

@@ -18,4 +18,7 @@ pub enum Signal {
/// - upload_guid: The GUID of the upload. /// - upload_guid: The GUID of the upload.
/// - attachment_guid: The GUID of the attachment on the server. /// - attachment_guid: The GUID of the attachment on the server.
AttachmentUploaded(String, String), AttachmentUploaded(String, String),
/// Emitted when the update stream is reconnected after a timeout or configuration change.
UpdateStreamReconnected,
} }

View File

@@ -3,7 +3,8 @@ use crate::daemon::{
target, Daemon, DaemonResult, target, Daemon, DaemonResult,
}; };
use kordophone::api::event_socket::EventSocket; use futures_util::SinkExt;
use kordophone::api::event_socket::{EventSocket, SinkMessage};
use kordophone::model::event::Event as UpdateEvent; use kordophone::model::event::Event as UpdateEvent;
use kordophone::model::event::EventData as UpdateEventData; use kordophone::model::event::EventData as UpdateEventData;
use kordophone::APIInterface; use kordophone::APIInterface;
@@ -22,6 +23,7 @@ pub struct UpdateMonitor {
event_sender: Sender<Event>, event_sender: Sender<Event>,
last_sync_times: HashMap<String, Instant>, last_sync_times: HashMap<String, Instant>,
update_seq: Option<u64>, update_seq: Option<u64>,
first_connection: bool,
} }
impl UpdateMonitor { impl UpdateMonitor {
@@ -31,6 +33,7 @@ impl UpdateMonitor {
event_sender, event_sender,
last_sync_times: HashMap::new(), last_sync_times: HashMap::new(),
update_seq: None, update_seq: None,
first_connection: false, // optimistic assumption that we're not reconnecting the first time.
} }
} }
@@ -48,8 +51,6 @@ impl UpdateMonitor {
} }
async fn handle_update(&mut self, update: UpdateEvent) { async fn handle_update(&mut self, update: UpdateEvent) {
self.update_seq = Some(update.update_seq);
match update.data { match update.data {
UpdateEventData::ConversationChanged(conversation) => { UpdateEventData::ConversationChanged(conversation) => {
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation); log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
@@ -134,24 +135,42 @@ impl UpdateMonitor {
}; };
log::debug!(target: target::UPDATES, "Starting event stream"); log::debug!(target: target::UPDATES, "Starting event stream");
let mut event_stream = socket.events().await; let (mut event_stream, mut sink) = socket.events().await;
// We won't know if the websocket is dead until we try to send a message, so time out waiting for // We won't know if the websocket is dead until we try to send a message, so time out waiting for
// a message every 30 seconds. // a message every 30 seconds.
let mut timeout = tokio::time::interval(Duration::from_secs(30)); let mut timeout = tokio::time::interval(Duration::from_secs(10));
timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// First tick will happen immediately // First tick will happen immediately
timeout.tick().await; timeout.tick().await;
// Track when the last ping was sent so we know when to give up
// waiting for the corresponding pong.
let mut ping_sent_at: Option<Instant> = None;
loop { loop {
tokio::select! { tokio::select! {
Some(result) = event_stream.next() => { Some(result) = event_stream.next() => {
match result { match result {
Ok(event) => { Ok(socket_event) => {
self.handle_update(event).await; match socket_event {
kordophone::api::event_socket::SocketEvent::Update(event) => {
self.handle_update(event).await;
}
// Reset the timeout since we got a message kordophone::api::event_socket::SocketEvent::Pong => {
log::debug!(target: target::UPDATES, "Received websocket pong");
}
}
if self.first_connection {
self.event_sender.send(Event::UpdateStreamReconnected).await.unwrap();
self.first_connection = false;
}
// Any successfully handled message (update or pong) keeps the connection alive.
ping_sent_at = None;
timeout.reset(); timeout.reset();
} }
Err(e) => { Err(e) => {
@@ -160,9 +179,27 @@ impl UpdateMonitor {
} }
} }
} }
_ = timeout.tick() => { _ = timeout.tick() => {
log::warn!("No messages received for 30 seconds, reconnecting..."); // If we previously sent a ping and haven't heard back since the timeout, we'll assume the connection is dead.
break; // Break inner loop to reconnect if let Some(_) = ping_sent_at {
log::error!(target: target::UPDATES, "Ping timed out. Restarting stream.");
self.first_connection = true;
break;
}
log::debug!("Sending websocket ping on timer");
match sink.send(SinkMessage::Ping).await {
Ok(_) => {
ping_sent_at = Some(Instant::now());
}
Err(e) => {
log::error!(target: target::UPDATES, "Error writing ping to event socket: {}, restarting stream.", e);
self.first_connection = true;
break;
}
}
} }
} }
} }

View File

@@ -14,5 +14,6 @@ pub mod interface {
pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated;
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted;
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted;
pub use crate::interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected;
} }
} }

View File

@@ -118,6 +118,16 @@ async fn main() {
0 0
}); });
} }
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
dbus_registry
.send_signal(interface::OBJECT_PATH, DbusSignals::UpdateStreamReconnected {})
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
} }
} }
}); });

View File

@@ -1,4 +1,4 @@
use kordophone::api::event_socket::EventSocket; use kordophone::api::event_socket::{EventSocket, SocketEvent, SocketUpdate};
use kordophone::api::http_client::Credentials; use kordophone::api::http_client::Credentials;
use kordophone::api::http_client::HTTPAPIClient; use kordophone::api::http_client::HTTPAPIClient;
use kordophone::api::InMemoryAuthenticationStore; use kordophone::api::InMemoryAuthenticationStore;
@@ -110,17 +110,24 @@ impl ClientCli {
pub async fn print_events(&mut self) -> Result<()> { pub async fn print_events(&mut self) -> Result<()> {
let socket = self.api.open_event_socket(None).await?; let socket = self.api.open_event_socket(None).await?;
let mut stream = socket.events().await; let (mut stream, _) = socket.events().await;
while let Some(Ok(event)) = stream.next().await { while let Some(Ok(socket_event)) = stream.next().await {
match event.data { match socket_event {
EventData::ConversationChanged(conversation) => { SocketEvent::Update(event) => {
println!("Conversation changed: {}", conversation.guid); match event.data {
EventData::ConversationChanged(conversation) => {
println!("Conversation changed: {}", conversation.guid);
}
EventData::MessageReceived(conversation, message) => {
println!(
"Message received: msg: {} conversation: {}",
message.guid, conversation.guid
);
}
}
} }
EventData::MessageReceived(conversation, message) => { SocketEvent::Pong => {
println!( println!("Pong");
"Message received: msg: {} conversation: {}",
message.guid, conversation.guid
);
} }
} }
} }
@@ -132,8 +139,17 @@ impl ClientCli {
println!("Listening for raw updates..."); println!("Listening for raw updates...");
let mut stream = socket.raw_updates().await; let mut stream = socket.raw_updates().await;
while let Some(update) = stream.next().await { while let Some(Ok(update)) = stream.next().await {
println!("Got update: {:?}", update); match update {
SocketUpdate::Update(updates) => {
for update in updates {
println!("Got update: {:?}", update);
}
}
SocketUpdate::Pong => {
println!("Pong");
}
}
} }
Ok(()) Ok(())