From f6ac3b5a5894b84485daf7380189394dad9c2d6b Mon Sep 17 00:00:00 2001 From: James Magahern Date: Thu, 1 May 2025 18:07:18 -0700 Subject: [PATCH] client: implements event/updates websocket --- Cargo.lock | 224 ++++++++++++++++++++++++++-- kordophone/Cargo.toml | 3 + kordophone/src/api/auth.rs | 45 ++++++ kordophone/src/api/event_socket.rs | 17 +++ kordophone/src/api/http_client.rs | 140 ++++++++++++++++- kordophone/src/api/mod.rs | 52 ++----- kordophone/src/model/event.rs | 17 +++ kordophone/src/model/mod.rs | 6 + kordophone/src/model/update.rs | 21 +++ kordophone/src/tests/test_client.rs | 40 ++++- kordophoned/src/dbus/server_impl.rs | 5 - kpcli/Cargo.toml | 2 + kpcli/src/client/mod.rs | 41 +++++ kpcli/src/main.rs | 15 ++ 14 files changed, 561 insertions(+), 67 deletions(-) create mode 100644 kordophone/src/api/auth.rs create mode 100644 kordophone/src/api/event_socket.rs create mode 100644 kordophone/src/model/event.rs create mode 100644 kordophone/src/model/update.rs diff --git a/Cargo.lock b/Cargo.lock index f29747f..bdbc538 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -189,9 +198,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytes" -version = "1.6.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" @@ -297,6 +306,25 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "csv" version = "1.3.1" @@ -363,6 +391,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "dbus" version = "0.9.7" @@ -471,6 +505,16 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "directories" version = "6.0.0" @@ -657,12 +701,23 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", "futures-macro", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.14" @@ -671,7 +726,19 @@ checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", ] [[package]] @@ -691,7 +758,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -743,6 +810,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -750,7 +828,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -777,7 +855,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", @@ -907,6 +985,7 @@ dependencies = [ "chrono", "ctor", "env_logger", + "futures-util", "hyper", "hyper-tls", "log", @@ -915,6 +994,8 @@ dependencies = [ "serde_plain", "time", "tokio", + "tokio-tungstenite", + "tungstenite", "uuid", ] @@ -967,6 +1048,8 @@ dependencies = [ "dbus-codegen", "dbus-tree", "dotenv", + "env_logger", + "futures-util", "kordophone", "kordophone-db", "log", @@ -1083,7 +1166,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi 0.3.9", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -1297,6 +1380,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "rand" version = "0.8.5" @@ -1304,8 +1393,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -1315,7 +1414,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1324,7 +1433,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.14", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", ] [[package]] @@ -1342,7 +1460,7 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ - "getrandom", + "getrandom 0.2.14", "libredox", "thiserror 1.0.69", ] @@ -1353,7 +1471,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" dependencies = [ - "getrandom", + "getrandom 0.2.14", "libredox", "thiserror 2.0.12", ] @@ -1505,6 +1623,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1713,6 +1842,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -1792,12 +1933,35 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.9.1", + "sha1", + "thiserror 2.0.12", + "utf-8", +] + [[package]] name = "typed-arena" version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a" +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicode-ident" version = "1.0.12" @@ -1810,6 +1974,12 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1822,8 +1992,8 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ - "getrandom", - "rand", + "getrandom 0.2.14", + "rand 0.8.5", "uuid-macro-internal", ] @@ -1850,6 +2020,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "want" version = "0.3.1" @@ -1865,6 +2041,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.95" @@ -2108,6 +2293,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "xml-rs" version = "0.8.25" diff --git a/kordophone/Cargo.toml b/kordophone/Cargo.toml index 4d0d2aa..9a64c68 100644 --- a/kordophone/Cargo.toml +++ b/kordophone/Cargo.toml @@ -11,6 +11,7 @@ base64 = "0.22.1" chrono = { version = "0.4.38", features = ["serde"] } ctor = "0.2.8" env_logger = "0.11.5" +futures-util = "0.3.31" hyper = { version = "0.14", features = ["full"] } hyper-tls = "0.5.0" log = { version = "0.4.21", features = [] } @@ -19,4 +20,6 @@ serde_json = "1.0.91" serde_plain = "1.0.2" time = { version = "0.3.17", features = ["parsing", "serde"] } tokio = { version = "1.37.0", features = ["full"] } +tokio-tungstenite = "0.26.2" +tungstenite = "0.26.2" uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/kordophone/src/api/auth.rs b/kordophone/src/api/auth.rs new file mode 100644 index 0000000..d192c25 --- /dev/null +++ b/kordophone/src/api/auth.rs @@ -0,0 +1,45 @@ +use crate::api::Credentials; +use crate::api::JwtToken; +use async_trait::async_trait; + +#[async_trait] +pub trait AuthenticationStore { + async fn get_credentials(&mut self) -> Option; + async fn get_token(&mut self) -> Option; + async fn set_token(&mut self, token: JwtToken); +} + +pub struct InMemoryAuthenticationStore { + credentials: Option, + token: Option, +} + +impl Default for InMemoryAuthenticationStore { + fn default() -> Self { + Self::new(None) + } +} + +impl InMemoryAuthenticationStore { + pub fn new(credentials: Option) -> Self { + Self { + credentials, + token: None, + } + } +} + +#[async_trait] +impl AuthenticationStore for InMemoryAuthenticationStore { + async fn get_credentials(&mut self) -> Option { + self.credentials.clone() + } + + async fn get_token(&mut self) -> Option { + self.token.clone() + } + + async fn set_token(&mut self, token: JwtToken) { + self.token = Some(token); + } +} diff --git a/kordophone/src/api/event_socket.rs b/kordophone/src/api/event_socket.rs new file mode 100644 index 0000000..8896c3b --- /dev/null +++ b/kordophone/src/api/event_socket.rs @@ -0,0 +1,17 @@ +use async_trait::async_trait; +use crate::model::update::UpdateItem; +use crate::model::event::Event; +use futures_util::stream::Stream; + +#[async_trait] +pub trait EventSocket { + type Error; + type EventStream: Stream>; + type UpdateStream: Stream, Self::Error>>; + + /// Modern event pipeline + async fn events(self) -> Self::EventStream; + + /// Raw update items from the v1 API. + async fn raw_updates(self) -> Self::UpdateStream; +} \ No newline at end of file diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index 86c61de..e72af03 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -4,13 +4,25 @@ extern crate serde; use std::{path::PathBuf, str}; use crate::api::AuthenticationStore; +use crate::api::event_socket::EventSocket; use hyper::{Body, Client, Method, Request, Uri}; use async_trait::async_trait; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::net::TcpStream; + +use futures_util::{StreamExt, TryStreamExt}; +use futures_util::stream::{SplitStream, SplitSink, TryFilterMap, MapErr, Stream}; +use futures_util::stream::Map; +use futures_util::stream::BoxStream; +use std::future::Future; + +use tokio_tungstenite::connect_async; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; + use crate::{ - model::{Conversation, ConversationID, JwtToken, Message, MessageID}, + model::{Conversation, ConversationID, JwtToken, Message, MessageID, UpdateItem, Event}, APIInterface }; @@ -63,6 +75,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: tungstenite::Error) -> Error { + Error::ClientError(err.to_string()) + } +} + trait AuthBuilder { fn with_auth(self, token: &Option) -> Self; } @@ -90,6 +108,58 @@ impl AuthSetting for hyper::http::Request { } } +type WebsocketSink = SplitSink>, tungstenite::Message>; +type WebsocketStream = SplitStream>>; + +pub struct WebsocketEventSocket { + _sink: WebsocketSink, + stream: WebsocketStream, +} + +impl WebsocketEventSocket { + pub fn new(socket: WebSocketStream>) -> Self { + let (sink, stream) = socket.split(); + Self { _sink: sink, stream } + } +} + +impl WebsocketEventSocket { + fn raw_update_stream(self) -> impl Stream, Error>> { + self.stream + .map_err(Error::from) + .try_filter_map(|msg| async move { + match msg { + tungstenite::Message::Text(text) => { + serde_json::from_str::>(&text) + .map(Some) + .map_err(Error::from) + } + _ => Ok(None) + } + }) + } +} + +#[async_trait] +impl EventSocket for WebsocketEventSocket { + type Error = Error; + type EventStream = BoxStream<'static, Result>; + type UpdateStream = BoxStream<'static, Result, Error>>; + + async fn events(self) -> Self::EventStream { + use futures_util::stream::iter; + + self.raw_update_stream() + .map_ok(|updates| iter(updates.into_iter().map(|update| Ok(Event::from(update))))) + .try_flatten() + .boxed() + } + + async fn raw_updates(self) -> Self::UpdateStream { + self.raw_update_stream().boxed() + } +} + #[async_trait] impl APIInterface for HTTPAPIClient { type Error = Error; @@ -146,6 +216,44 @@ impl APIInterface for HTTPAPIClient { let messages: Vec = self.request(&endpoint, Method::GET).await?; Ok(messages) } + + async fn open_event_socket(&mut self) -> Result { + use tungstenite::http::StatusCode; + use tungstenite::handshake::client::Request as TungsteniteRequest; + use tungstenite::handshake::client::generate_key; + + let uri = self.uri_for_endpoint("updates", Some(self.websocket_scheme())); + + log::debug!("Connecting to websocket: {:?}", uri); + + let auth = self.auth_store.get_token().await; + let host = uri.authority().unwrap().host(); + let mut request = TungsteniteRequest::builder() + .header("Host", host) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", generate_key()) + .uri(uri.to_string()) + .body(()) + .expect("Unable to build websocket request"); + + log::debug!("Websocket request: {:?}", request); + + if let Some(token) = &auth { + let header_value = token.to_header_value().to_str().unwrap().parse().unwrap(); // ugh + request.headers_mut().insert("Authorization", header_value); + } + + let (socket, response) = connect_async(request).await.unwrap(); + log::debug!("Websocket connected: {:?}", response.status()); + + if response.status() != StatusCode::SWITCHING_PROTOCOLS { + return Err(Error::ClientError("Websocket connection failed".into())); + } + + Ok(WebsocketEventSocket::new(socket)) + } } impl HTTPAPIClient { @@ -157,15 +265,27 @@ impl HTTPAPIClient { } } - fn uri_for_endpoint(&self, endpoint: &str) -> Uri { + fn uri_for_endpoint(&self, endpoint: &str, scheme: Option<&str>) -> Uri { let mut parts = self.base_url.clone().into_parts(); let root_path: PathBuf = parts.path_and_query.unwrap().path().into(); let path = root_path.join(endpoint); parts.path_and_query = Some(path.to_str().unwrap().parse().unwrap()); + if let Some(scheme) = scheme { + parts.scheme = Some(scheme.parse().unwrap()); + } + Uri::try_from(parts).unwrap() } + fn websocket_scheme(&self) -> &str { + if self.base_url.scheme().unwrap() == "https" { + "wss" + } else { + "ws" + } + } + async fn request(&mut self, endpoint: &str, method: Method) -> Result { self.request_with_body(endpoint, method, || { Body::empty() }).await } @@ -188,7 +308,7 @@ impl HTTPAPIClient { { use hyper::StatusCode; - let uri = self.uri_for_endpoint(endpoint); + let uri = self.uri_for_endpoint(endpoint, None); log::debug!("Requesting {:?} {:?}", method, uri); let build_request = move |auth: &Option| { @@ -320,4 +440,18 @@ mod test { let messages = client.get_messages(&conversation.guid, None, None, None).await.unwrap(); assert!(!messages.is_empty()); } + + #[tokio::test] + async fn test_updates() { + if !mock_client_is_reachable().await { + log::warn!("Skipping http_client tests (mock server not reachable)"); + return; + } + + let mut client = local_mock_client(); + + // We just want to see if the connection is established, we won't wait for any events + let _ = client.open_event_socket().await.unwrap(); + assert!(true); + } } diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index 159a657..5f35bfd 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -2,11 +2,18 @@ use async_trait::async_trait; pub use crate::model::{ Conversation, Message, ConversationID, MessageID, }; + +pub mod auth; +pub use crate::api::auth::{AuthenticationStore, InMemoryAuthenticationStore}; + use crate::model::JwtToken; pub mod http_client; pub use http_client::HTTPAPIClient; +pub mod event_socket; +pub use event_socket::EventSocket; + use self::http_client::Credentials; #[async_trait] @@ -30,46 +37,7 @@ pub trait APIInterface { // (POST) /authenticate async fn authenticate(&mut self, credentials: Credentials) -> Result; -} - -#[async_trait] -pub trait AuthenticationStore { - async fn get_credentials(&mut self) -> Option; - async fn get_token(&mut self) -> Option; - async fn set_token(&mut self, token: JwtToken); -} - -pub struct InMemoryAuthenticationStore { - credentials: Option, - token: Option, -} - -impl Default for InMemoryAuthenticationStore { - fn default() -> Self { - Self::new(None) - } -} - -impl InMemoryAuthenticationStore { - pub fn new(credentials: Option) -> Self { - Self { - credentials, - token: None, - } - } -} - -#[async_trait] -impl AuthenticationStore for InMemoryAuthenticationStore { - async fn get_credentials(&mut self) -> Option { - self.credentials.clone() - } - - async fn get_token(&mut self) -> Option { - self.token.clone() - } - - async fn set_token(&mut self, token: JwtToken) { - self.token = Some(token); - } + + // (WS) /updates + async fn open_event_socket(&mut self) -> Result; } diff --git a/kordophone/src/model/event.rs b/kordophone/src/model/event.rs new file mode 100644 index 0000000..aca7cb1 --- /dev/null +++ b/kordophone/src/model/event.rs @@ -0,0 +1,17 @@ +use crate::model::{Conversation, Message, UpdateItem}; + +#[derive(Debug, Clone)] +pub enum Event { + ConversationChanged(Conversation), + MessageReceived(Conversation, Message), +} + +impl From for Event { + fn from(update: UpdateItem) -> Self { + match update { + UpdateItem { conversation: Some(conversation), message: None, .. } => Event::ConversationChanged(conversation), + UpdateItem { conversation: Some(conversation), message: Some(message), .. } => Event::MessageReceived(conversation, message), + _ => panic!("Invalid update item: {:?}", update), + } + } +} \ No newline at end of file diff --git a/kordophone/src/model/mod.rs b/kordophone/src/model/mod.rs index f07d6f6..b54ce2e 100644 --- a/kordophone/src/model/mod.rs +++ b/kordophone/src/model/mod.rs @@ -1,5 +1,7 @@ pub mod conversation; +pub mod event; pub mod message; +pub mod update; pub use conversation::Conversation; pub use conversation::ConversationID; @@ -7,6 +9,10 @@ pub use conversation::ConversationID; pub use message::Message; pub use message::MessageID; +pub use update::UpdateItem; + +pub use event::Event; + pub mod jwt; pub use jwt::JwtToken; diff --git a/kordophone/src/model/update.rs b/kordophone/src/model/update.rs new file mode 100644 index 0000000..4d89896 --- /dev/null +++ b/kordophone/src/model/update.rs @@ -0,0 +1,21 @@ +use serde::Deserialize; +use super::conversation::Conversation; +use super::message::Message; + +#[derive(Debug, Clone, Deserialize)] +pub struct UpdateItem { + #[serde(rename = "messageSequenceNumber")] + pub seq: u64, + + #[serde(rename = "conversation")] + pub conversation: Option, + + #[serde(rename = "message")] + pub message: Option, +} + +impl Default for UpdateItem { + fn default() -> Self { + Self { seq: 0, conversation: None, message: None } + } +} \ No newline at end of file diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index 1ddfd35..3e06f3a 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -4,8 +4,12 @@ use std::collections::HashMap; pub use crate::APIInterface; use crate::{ api::http_client::Credentials, - model::{Conversation, ConversationID, JwtToken, Message, MessageID} -}; + model::{Conversation, ConversationID, JwtToken, Message, MessageID, UpdateItem, Event}, + api::event_socket::EventSocket, +}; + +use futures_util::StreamExt; +use futures_util::stream::BoxStream; pub struct TestClient { pub version: &'static str, @@ -28,6 +32,32 @@ impl TestClient { } } +pub struct TestEventSocket { + pub events: Vec, +} + +impl TestEventSocket { + pub fn new() -> Self { + Self { events: vec![] } + } +} + +#[async_trait] +impl EventSocket for TestEventSocket { + type Error = TestError; + type EventStream = BoxStream<'static, Result>; + type UpdateStream = BoxStream<'static, Result, TestError>>; + + async fn events(self) -> Self::EventStream { + futures_util::stream::iter(self.events.into_iter().map(Ok)).boxed() + } + + async fn raw_updates(self) -> Self::UpdateStream { + let results: Vec, TestError>> = vec![]; + futures_util::stream::iter(results.into_iter()).boxed() + } +} + #[async_trait] impl APIInterface for TestClient { type Error = TestError; @@ -57,4 +87,10 @@ impl APIInterface for TestClient { Err(TestError::ConversationNotFound) } + + async fn open_event_socket(&mut self) -> Result { + Ok(TestEventSocket::new()) + } } + + diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 04632e2..9bc0c75 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -54,9 +54,6 @@ impl DbusRepository for ServerImpl { fn get_conversations(&mut self) -> Result, dbus::MethodErr> { self.send_event_sync(Event::GetAllConversations) .map(|conversations| { - // Convert conversations to DBus property maps - - conversations.into_iter().map(|conv| { let mut map = arg::PropMap::new(); map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); @@ -87,8 +84,6 @@ impl DbusRepository for ServerImpl { self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) .map(|messages| { - - messages.into_iter().map(|msg| { let mut map = arg::PropMap::new(); map.insert("id".into(), arg::Variant(Box::new(msg.id))); diff --git a/kpcli/Cargo.toml b/kpcli/Cargo.toml index 420baf6..cda9f57 100644 --- a/kpcli/Cargo.toml +++ b/kpcli/Cargo.toml @@ -11,6 +11,8 @@ clap = { version = "4.5.20", features = ["derive"] } dbus = "0.9.7" dbus-tree = "0.9.2" dotenv = "0.15.0" +env_logger = "0.11.8" +futures-util = "0.3.31" kordophone = { path = "../kordophone" } kordophone-db = { path = "../kordophone-db" } log = "0.4.22" diff --git a/kpcli/src/client/mod.rs b/kpcli/src/client/mod.rs index 1057601..87a0e08 100644 --- a/kpcli/src/client/mod.rs +++ b/kpcli/src/client/mod.rs @@ -2,10 +2,14 @@ use kordophone::APIInterface; use kordophone::api::http_client::HTTPAPIClient; use kordophone::api::http_client::Credentials; use kordophone::api::InMemoryAuthenticationStore; +use kordophone::api::event_socket::EventSocket; use anyhow::Result; use clap::Subcommand; use crate::printers::{ConversationPrinter, MessagePrinter}; +use kordophone::model::event::Event; + +use futures_util::StreamExt; pub fn make_api_client_from_env() -> HTTPAPIClient { dotenv::dotenv().ok(); @@ -37,6 +41,12 @@ pub enum Commands { /// Prints the server Kordophone version. Version, + + /// Prints all events from the server. + Events, + + /// Prints all raw updates from the server. + RawUpdates, } impl Commands { @@ -46,6 +56,8 @@ impl Commands { Commands::Version => client.print_version().await, Commands::Conversations => client.print_conversations().await, Commands::Messages { conversation_id } => client.print_messages(conversation_id).await, + Commands::RawUpdates => client.print_raw_updates().await, + Commands::Events => client.print_events().await, } } } @@ -82,6 +94,35 @@ impl ClientCli { } Ok(()) } + + pub async fn print_events(&mut self) -> Result<()> { + let socket = self.api.open_event_socket().await?; + + let mut stream = socket.events().await; + while let Some(Ok(event)) = stream.next().await { + match event { + Event::ConversationChanged(conversation) => { + println!("Conversation changed: {}", conversation.guid); + } + Event::MessageReceived(conversation, message) => { + println!("Message received: msg: {} conversation: {}", message.guid, conversation.guid); + } + } + } + Ok(()) + } + + pub async fn print_raw_updates(&mut self) -> Result<()> { + let socket = self.api.open_event_socket().await?; + + println!("Listening for raw updates..."); + let mut stream = socket.raw_updates().await; + while let Some(update) = stream.next().await { + println!("Got update: {:?}", update); + } + + Ok(()) + } } diff --git a/kpcli/src/main.rs b/kpcli/src/main.rs index e0f7743..2cf4ba8 100644 --- a/kpcli/src/main.rs +++ b/kpcli/src/main.rs @@ -5,6 +5,7 @@ mod daemon; use anyhow::Result; use clap::{Parser, Subcommand}; +use log::LevelFilter; /// A command line interface for the Kordophone library and daemon #[derive(Parser)] @@ -43,8 +44,22 @@ async fn run_command(command: Commands) -> Result<()> { } } +fn initialize_logging() { + // Weird: is this the best way to do this? + let log_level = std::env::var("RUST_LOG") + .map(|s| s.parse::().unwrap_or(LevelFilter::Info)) + .unwrap_or(LevelFilter::Info); + + env_logger::Builder::from_default_env() + .format_timestamp_secs() + .filter_level(log_level) + .init(); +} + #[tokio::main] async fn main() { + initialize_logging(); + let cli = Cli::parse(); run_command(cli.command).await