From ef74df9f28031b9f1fc40428dc645fac227650e7 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Fri, 25 Apr 2025 21:42:29 -0700 Subject: [PATCH] daemon: start working on events. notes: Probably need to make the locking mechanism more granular. Only lock the database during db writes, see if we can do multiple readers and a single writer. Otherwise, the daemon will not be able to service requests while an event is being handled, which is not good. --- Cargo.lock | 26 +++++-- kordophoned/Cargo.toml | 1 + .../net.buzzert.kordophonecd.Server.xml | 3 - kordophoned/src/daemon/mod.rs | 16 ++++- kordophoned/src/dbus/server_impl.rs | 72 ++++++++++--------- kordophoned/src/main.rs | 16 ++++- kpcli/src/daemon/mod.rs | 2 +- 7 files changed, 90 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a32f8de..8a78e5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,9 +574,20 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "futures-sink" @@ -586,20 +597,22 @@ checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -849,6 +862,7 @@ dependencies = [ "dbus-tree", "directories", "env_logger", + "futures-util", "kordophone", "kordophone-db", "log", diff --git a/kordophoned/Cargo.toml b/kordophoned/Cargo.toml index d5c6ac7..e14a322 100644 --- a/kordophoned/Cargo.toml +++ b/kordophoned/Cargo.toml @@ -11,6 +11,7 @@ dbus-tokio = "0.7.6" dbus-tree = "0.9.2" directories = "6.0.0" env_logger = "0.11.6" +futures-util = "0.3.31" kordophone = { path = "../kordophone" } kordophone-db = { path = "../kordophone-db" } log = "0.4.25" diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index 760eda9..9f8bc32 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -18,10 +18,7 @@ - - - diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 17282f1..d98882c 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -1,6 +1,7 @@ mod settings; use settings::Settings; +use std::sync::mpsc; use directories::ProjectDirs; use std::path::PathBuf; use anyhow::Result; @@ -19,6 +20,10 @@ use kordophone::api::{ TokenManagement, }; +pub enum Event { + SyncAllConversations, +} + #[derive(Debug, Error)] pub enum DaemonError { #[error("Client Not Configured")] @@ -87,6 +92,16 @@ impl Daemon { Ok(settings) } + pub async fn handle_event(&mut self, event: Event) { + match event { + Event::SyncAllConversations => { + self.sync_all_conversations().await.unwrap_or_else(|e| { + log::error!("Error handling sync event: {}", e); + }); + } + } + } + fn get_client(&mut self) -> Result { let settings = self.database.with_settings(|s| Settings::from_db(s) @@ -132,4 +147,3 @@ impl TokenManagement for &mut Daemon { self.database.set_token(token); } } - diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index c957c3a..0595d51 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -1,63 +1,69 @@ use dbus::arg; use dbus_tree::MethodErr; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; +use tokio::sync::{Mutex, MutexGuard}; use std::future::Future; use std::thread; +use std::sync::mpsc; +use futures_util::future::FutureExt; -use crate::daemon::Daemon; +use crate::daemon::{Daemon, Event}; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { daemon: Arc>, + event_sender: mpsc::Sender, } impl ServerImpl { - pub fn new(daemon: Arc>) -> Self { - Self { daemon } + pub fn new(daemon: Arc>, event_sender: mpsc::Sender) -> Self { + Self { daemon, event_sender } } - pub fn get_daemon(&self) -> Result, MethodErr> { - self.daemon.lock().map_err(|_| MethodErr::failed("Failed to lock daemon")) + pub async fn get_daemon(&self) -> MutexGuard<'_, Daemon> { + self.daemon.lock().await // .map_err(|_| MethodErr::failed("Failed to lock daemon")) + } + + pub fn daemon_then(&self, f: F) -> Result + where F: FnOnce(MutexGuard<'_, Daemon>) -> T + Send, + T: Send, + { + run_sync_future(self.get_daemon().then(|daemon| async move { + f(daemon) + })) } } impl DbusRepository for ServerImpl { fn get_version(&mut self) -> Result { - let daemon = self.get_daemon()?; - Ok(daemon.version.clone()) + self.daemon_then(|daemon| daemon.version.clone()) } fn get_conversations(&mut self) -> Result, dbus::MethodErr> { - // Get a repository instance and use it to fetch conversations - let mut daemon = self.get_daemon()?; - let conversations = daemon.get_conversations(); - - // Convert conversations to DBus property maps - let result = conversations.into_iter().map(|conv| { - let mut map = arg::PropMap::new(); - map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); - map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); - map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); - map - }).collect(); - - Ok(result) + self.daemon_then(|mut daemon| { + let conversations = daemon.get_conversations(); + + // Convert conversations to DBus property maps + let result = conversations.into_iter().map(|conv| { + let mut map = arg::PropMap::new(); + map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); + map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); + map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); + map + }).collect(); + + Ok(result) + })? } - fn sync_all_conversations(&mut self) -> Result { - let mut daemon = self.get_daemon()?; + fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { + self.event_sender.send(Event::SyncAllConversations).unwrap_or_else(|e| { + log::error!("Error sending sync event: {}", e); + }); - // TODO: We don't actually probably want to block here. - run_sync_future(daemon.sync_all_conversations()) - .unwrap() - .map_err(|e| { - log::error!("Failed to sync conversations: {}", e); - MethodErr::failed(&format!("Failed to sync conversations: {}", e)) - })?; - - Ok(true) + Ok(()) } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index e6772e5..0fd1987 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -2,9 +2,12 @@ mod dbus; mod daemon; use std::future; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc; use log::LevelFilter; +use std::sync::Arc; +use tokio::sync::Mutex; + use daemon::Daemon; use dbus::endpoint::Endpoint as DbusEndpoint; use dbus::interface; @@ -21,6 +24,8 @@ fn initialize_logging() { async fn main() { initialize_logging(); + let (sender, receiver) = mpsc::channel::(); + // Create the daemon let daemon = Arc::new( Mutex::new( @@ -34,7 +39,7 @@ async fn main() { ); // Create the server implementation - let server = ServerImpl::new(daemon); + let server = ServerImpl::new(daemon.clone(), sender); // Register DBus interfaces with endpoint let endpoint = DbusEndpoint::new(server.clone()); @@ -49,6 +54,13 @@ async fn main() { } ).await; + tokio::spawn(async move { + for event in receiver { + // Important! Only lock the daemon when handling events. + daemon.lock().await.handle_event(event).await; + } + }); + future::pending::<()>().await; unreachable!() } diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index 8c89165..7a96180 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -64,7 +64,7 @@ impl DaemonCli { pub async fn sync_conversations(&mut self) -> Result<()> { let success = KordophoneRepository::sync_all_conversations(&self.proxy())?; - println!("Synced conversations: {}", success); + println!("Initiated sync"); Ok(()) } } \ No newline at end of file