diff --git a/Cargo.lock b/Cargo.lock index bdbc538..026e141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1036,6 +1036,8 @@ dependencies = [ "log", "thiserror 2.0.12", "tokio", + "tokio-condvar", + "uuid", ] [[package]] @@ -1386,35 +1388,14 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.4", -] - [[package]] name = "rand" version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" dependencies = [ - "rand_chacha 0.9.0", - "rand_core 0.9.3", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.4", + "rand_chacha", + "rand_core", ] [[package]] @@ -1424,16 +1405,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.3", -] - -[[package]] -name = "rand_core" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" -dependencies = [ - "getrandom 0.2.14", + "rand_core", ] [[package]] @@ -1821,6 +1793,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-condvar" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8530e402d24f6a65019baa57593f1769557c670302f493cdf8fa3dfbe4d85ac" +dependencies = [ + "tokio", +] + [[package]] name = "tokio-macros" version = "2.5.0" @@ -1944,7 +1925,7 @@ dependencies = [ "http 1.3.1", "httparse", "log", - "rand 0.9.1", + "rand", "sha1", "thiserror 2.0.12", "utf-8", @@ -1988,20 +1969,20 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" dependencies = [ - "getrandom 0.2.14", - "rand 0.8.5", + "getrandom 0.3.2", + "rand", "uuid-macro-internal", ] [[package]] name = "uuid-macro-internal" -version = "1.11.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b91f57fe13a38d0ce9e28a03463d8d3c2468ed03d75375110ec71d93b449a08" +checksum = "72dcd78c4f979627a754f5522cea6e6a25e55139056535fe6e69c506cd64a862" dependencies = [ "proc-macro2", "quote", diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index 1a9c3c4..63d2522 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -227,7 +227,7 @@ impl APIInterface for HTTPAPIClient { async fn send_message( &mut self, - outgoing_message: OutgoingMessage, + outgoing_message: &OutgoingMessage, ) -> Result { let message: Message = self.request_with_body( "sendMessage", diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index 81f8fae..d711948 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -15,10 +15,11 @@ pub mod event_socket; pub use event_socket::EventSocket; use self::http_client::Credentials; +use std::fmt::Debug; #[async_trait] pub trait APIInterface { - type Error; + type Error: Debug; // (GET) /version async fn get_version(&mut self) -> Result; @@ -38,7 +39,7 @@ pub trait APIInterface { // (POST) /sendMessage async fn send_message( &mut self, - outgoing_message: OutgoingMessage, + outgoing_message: &OutgoingMessage, ) -> Result; // (POST) /authenticate diff --git a/kordophone/src/model/outgoing_message.rs b/kordophone/src/model/outgoing_message.rs index 1f13d2f..530b741 100644 --- a/kordophone/src/model/outgoing_message.rs +++ b/kordophone/src/model/outgoing_message.rs @@ -1,8 +1,12 @@ use serde::Serialize; use super::conversation::ConversationID; +use uuid::Uuid; #[derive(Debug, Clone, Serialize)] pub struct OutgoingMessage { + #[serde(skip)] + pub guid: Uuid, + #[serde(rename = "body")] pub text: String, @@ -21,6 +25,7 @@ impl OutgoingMessage { #[derive(Default)] pub struct OutgoingMessageBuilder { + guid: Option, text: Option, conversation_id: Option, file_transfer_guids: Option>, @@ -31,6 +36,11 @@ impl OutgoingMessageBuilder { Self::default() } + pub fn guid(mut self, guid: Uuid) -> Self { + self.guid = Some(guid); + self + } + pub fn text(mut self, text: String) -> Self { self.text = Some(text); self @@ -48,6 +58,7 @@ impl OutgoingMessageBuilder { pub fn build(self) -> OutgoingMessage { OutgoingMessage { + guid: self.guid.unwrap_or_else(|| Uuid::new_v4()), text: self.text.unwrap(), conversation_id: self.conversation_id.unwrap(), file_transfer_guids: self.file_transfer_guids.unwrap_or_default(), diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index fdc3eff..4edd627 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -93,15 +93,15 @@ impl APIInterface for TestClient { async fn send_message( &mut self, - outgoing_message: OutgoingMessage, + outgoing_message: &OutgoingMessage, ) -> Result { let message = Message::builder() .guid(Uuid::new_v4().to_string()) - .text(outgoing_message.text) + .text(outgoing_message.text.clone()) .date(OffsetDateTime::now_utc()) .build(); - self.messages.entry(outgoing_message.conversation_id).or_insert(vec![]).push(message.clone()); + self.messages.entry(outgoing_message.conversation_id.clone()).or_insert(vec![]).push(message.clone()); Ok(message) } diff --git a/kordophoned/Cargo.toml b/kordophoned/Cargo.toml index 74c2556..7891e35 100644 --- a/kordophoned/Cargo.toml +++ b/kordophoned/Cargo.toml @@ -18,6 +18,8 @@ kordophone-db = { path = "../kordophone-db" } log = "0.4.25" thiserror = "2.0.12" tokio = { version = "1", features = ["full"] } +tokio-condvar = "0.3.0" +uuid = "1.16.0" [build-dependencies] dbus-codegen = "0.10.0" diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index b346742..01fb1c3 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -58,6 +58,15 @@ + + + + + + + + , Reply>), + /// Enqueues a message to be sent to the server. + /// Parameters: + /// - conversation_id: The ID of the conversation to send the message to. + /// - text: The text of the message to send. + /// - reply: The outgoing message ID (not the server-assigned message ID). + SendMessage(String, String, Reply), + /// Delete all conversations from the database. DeleteAllConversations(Reply<()>), } diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 4af1875..f4eeda0 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -16,6 +16,7 @@ use thiserror::Error; use tokio::sync::mpsc::{Sender, Receiver}; use std::sync::Arc; use tokio::sync::Mutex; +use uuid::Uuid; use kordophone_db::{ database::{Database, DatabaseAccess}, @@ -24,6 +25,7 @@ use kordophone_db::{ use kordophone::api::APIInterface; use kordophone::api::http_client::HTTPAPIClient; +use kordophone::model::outgoing_message::OutgoingMessage; mod update_monitor; use update_monitor::UpdateMonitor; @@ -31,6 +33,10 @@ use update_monitor::UpdateMonitor; mod auth_store; use auth_store::DatabaseAuthenticationStore; +mod post_office; +use post_office::PostOffice; +use post_office::Event as PostOfficeEvent; + #[derive(Debug, Error)] pub enum DaemonError { #[error("Client Not Configured")] @@ -52,6 +58,9 @@ pub struct Daemon { signal_receiver: Option>, signal_sender: Sender, + post_office_sink: Sender, + post_office_source: Option>, + version: String, database: Arc>, runtime: tokio::runtime::Runtime, @@ -69,6 +78,7 @@ impl Daemon { // Create event channels let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); + let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); // Create background task runtime let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -84,6 +94,8 @@ impl Daemon { event_sender, signal_receiver: Some(signal_receiver), signal_sender, + post_office_sink, + post_office_source: Some(post_office_source), runtime }) } @@ -92,12 +104,23 @@ impl Daemon { log::info!("Starting daemon version {}", self.version); log::debug!("Debug logging enabled."); + // Update monitor let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone()); - tokio::spawn(async move { update_monitor.run().await; // should run indefinitely }); + // Post office + { + let mut database = self.database.clone(); + let event_sender = self.event_sender.clone(); + let post_office_source = self.post_office_source.take().unwrap(); + tokio::spawn(async move { + let mut post_office = PostOffice::new(post_office_source, event_sender, async move || Self::get_client_impl(&mut database).await ); + post_office.run().await; + }); + } + while let Some(event) = self.event_receiver.recv().await { log::debug!(target: target::EVENT, "Received event: {:?}", event); self.handle_event(event).await; @@ -188,6 +211,11 @@ impl Daemon { reply.send(()).unwrap(); }, + + Event::SendMessage(conversation_id, text, reply) => { + let uuid = self.enqueue_outgoing_message(text, conversation_id).await; + reply.send(uuid).unwrap(); + }, } } @@ -204,6 +232,18 @@ impl Daemon { self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await } + async fn enqueue_outgoing_message(&mut self, text: String, conversation_id: String) -> Uuid { + let outgoing_message = OutgoingMessage::builder() + .text(text) + .conversation_id(conversation_id) + .build(); + + let guid = outgoing_message.guid.clone(); + self.post_office_sink.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)).await.unwrap(); + + guid + } + async fn sync_conversation_list(database: &mut Arc>, signal_sender: &Sender) -> Result<()> { log::info!(target: target::SYNC, "Starting list conversation sync"); diff --git a/kordophoned/src/daemon/post_office.rs b/kordophoned/src/daemon/post_office.rs new file mode 100644 index 0000000..88ebe30 --- /dev/null +++ b/kordophoned/src/daemon/post_office.rs @@ -0,0 +1,115 @@ +use std::collections::VecDeque; +use std::time::Duration; + +use tokio::sync::mpsc::{Sender, Receiver}; +use tokio::sync::{Mutex, MutexGuard}; +use tokio_condvar::Condvar; + +use crate::daemon::events::Event as DaemonEvent; +use kordophone::model::outgoing_message::OutgoingMessage; +use kordophone::api::APIInterface; + +use anyhow::Result; + +mod target { + pub static POST_OFFICE: &str = "post_office"; +} + +#[derive(Debug)] +pub enum Event { + EnqueueOutgoingMessage(OutgoingMessage), +} + +pub struct PostOffice Result> { + event_source: Receiver, + event_sink: Sender, + make_client: F, + message_queue: Mutex>, + message_available: Condvar, +} + +impl Result> PostOffice { + pub fn new(event_source: Receiver, event_sink: Sender, make_client: F) -> Self { + Self { + event_source, + event_sink, + make_client, + message_queue: Mutex::new(VecDeque::new()), + message_available: Condvar::new(), + } + } + + pub async fn queue_message(&mut self, message: &OutgoingMessage) { + self.message_queue.lock().await.push_back(message.clone()); + self.message_available.notify_one(); + } + + pub async fn run(&mut self) { + log::info!(target: target::POST_OFFICE, "Starting post office"); + + loop { + let mut retry_messages = Vec::new(); + + log::debug!(target: target::POST_OFFICE, "Waiting for event"); + + tokio::select! { + // Incoming events + Some(event) = self.event_source.recv() => { + match event { + Event::EnqueueOutgoingMessage(message) => { + log::debug!(target: target::POST_OFFICE, "Received enqueue outgoing message event"); + self.message_queue.lock().await.push_back(message); + self.message_available.notify_one(); + } + } + } + + // Message queue + mut lock = self.message_available.wait(self.message_queue.lock().await) => { + log::debug!(target: target::POST_OFFICE, "Message available in queue"); + retry_messages = Self::try_send_message_impl(&mut lock, &mut self.make_client).await; + } + } + + if !retry_messages.is_empty() { + log::debug!(target: target::POST_OFFICE, "Queueing {} messages for retry", retry_messages.len()); + for message in retry_messages { + self.queue_message(&message).await; + } + } + } + } + + async fn try_send_message_impl(message_queue: &mut MutexGuard<'_, VecDeque>, make_client: &mut F) -> Vec { + log::debug!(target: target::POST_OFFICE, "Trying to send enqueued messages"); + + let mut retry_messages = Vec::new(); + while let Some(message) = message_queue.pop_front() { + match (make_client)().await { + Ok(mut client) => { + log::debug!(target: target::POST_OFFICE, "Obtained client, sending message."); + match client.send_message(&message).await { + Ok(message) => { + log::info!(target: target::POST_OFFICE, "Message sent successfully: {}", message.guid); + // TODO: Notify the daemon via the event sink. + } + Err(e) => { + log::error!(target: target::POST_OFFICE, "Error sending message: {:?}", e); + log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + retry_messages.push(message); + } + } + } + + Err(e) => { + log::error!(target: target::POST_OFFICE, "Error creating client: {:?}", e); + log::warn!(target: target::POST_OFFICE, "Retrying in 5 seconds"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + + retry_messages + } +} \ No newline at end of file diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 5c00a58..2dc8a38 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -102,6 +102,11 @@ impl DbusRepository for ServerImpl { fn delete_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { self.send_event_sync(Event::DeleteAllConversations) } + + fn send_message(&mut self, conversation_id: String, text: String) -> Result { + self.send_event_sync(|r| Event::SendMessage(conversation_id, text, r)) + .map(|uuid| uuid.to_string()) + } } impl DbusSettings for ServerImpl { diff --git a/kpcli/src/client/mod.rs b/kpcli/src/client/mod.rs index d805f9e..9356d33 100644 --- a/kpcli/src/client/mod.rs +++ b/kpcli/src/client/mod.rs @@ -138,7 +138,7 @@ impl ClientCli { .text(message) .build(); - let message = self.api.send_message(outgoing_message).await?; + let message = self.api.send_message(&outgoing_message).await?; println!("Message sent: {}", message.guid); Ok(()) } diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index ad5b52f..26bafd9 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -48,6 +48,12 @@ pub enum Commands { /// Deletes all conversations. DeleteAllConversations, + + /// Enqueues an outgoing message to be sent to a conversation. + SendMessage { + conversation_id: String, + text: String, + }, } #[derive(Subcommand)] @@ -83,6 +89,7 @@ impl Commands { Commands::Signals => client.wait_for_signals().await, Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await, Commands::DeleteAllConversations => client.delete_all_conversations().await, + Commands::SendMessage { conversation_id, text } => client.enqueue_outgoing_message(conversation_id, text).await, } } } @@ -145,6 +152,12 @@ impl DaemonCli { Ok(()) } + pub async fn enqueue_outgoing_message(&mut self, conversation_id: String, text: String) -> Result<()> { + let outgoing_message_id = KordophoneRepository::send_message(&self.proxy(), &conversation_id, &text)?; + println!("Outgoing message ID: {}", outgoing_message_id); + Ok(()) + } + pub async fn wait_for_signals(&mut self) -> Result<()> { use dbus::Message; mod dbus_signals {