From 22554a764455d43cc3e6e3ce01e36321938f2328 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 27 Apr 2025 12:53:45 -0700 Subject: [PATCH] daemon: reorg: use channels for comms instead of copying daemon arc/mutex --- kordophoned/src/daemon/events.rs | 17 +++++++ kordophoned/src/daemon/mod.rs | 79 ++++++++++++++++++++--------- kordophoned/src/dbus/server_impl.rs | 78 +++++++++++++++------------- kordophoned/src/main.rs | 28 +++------- 4 files changed, 124 insertions(+), 78 deletions(-) create mode 100644 kordophoned/src/daemon/events.rs diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs new file mode 100644 index 0000000..d7342df --- /dev/null +++ b/kordophoned/src/daemon/events.rs @@ -0,0 +1,17 @@ +use tokio::sync::oneshot; +use kordophone_db::models::Conversation; + +pub type Reply = oneshot::Sender; + +pub enum Event { + /// Get the version of the daemon. + GetVersion(Reply), + + /// Asynchronous event for syncing all conversations with the server. + SyncAllConversations(Reply<()>), + + /// Returns all known conversations from the database. + GetAllConversations(Reply>), +} + + diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index d98882c..0f5e9c3 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -1,11 +1,15 @@ -mod settings; +pub mod settings; use settings::Settings; -use std::sync::mpsc; -use directories::ProjectDirs; -use std::path::PathBuf; +pub mod events; +use events::*; + use anyhow::Result; +use directories::ProjectDirs; +use std::error::Error; +use std::path::PathBuf; use thiserror::Error; +use tokio::sync::mpsc::{Sender, Receiver}; use kordophone_db::{ database::Database, @@ -20,19 +24,20 @@ use kordophone::api::{ TokenManagement, }; -pub enum Event { - SyncAllConversations, -} - #[derive(Debug, Error)] pub enum DaemonError { #[error("Client Not Configured")] ClientNotConfigured, } +pub type DaemonResult = Result>; + pub struct Daemon { - pub version: String, + pub event_sender: Sender, + event_receiver: Receiver, + version: String, database: Database, + runtime: tokio::runtime::Runtime, } impl Daemon { @@ -44,15 +49,53 @@ impl Daemon { let database_dir = database_path.parent().unwrap(); std::fs::create_dir_all(database_dir)?; + // Create event channels + let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); + + // Create background task runtime + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + let database = Database::new(&database_path.to_string_lossy())?; - Ok(Self { version: "0.1.0".to_string(), database }) + Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, runtime }) } - pub fn get_conversations(&mut self) -> Vec { + pub async fn run(&mut self) { + while let Some(event) = self.event_receiver.recv().await { + self.handle_event(event).await; + } + } + + async fn handle_event(&mut self, event: Event) { + match event { + Event::GetVersion(reply) => { + reply.send(self.version.clone()).unwrap(); + }, + + Event::SyncAllConversations(reply) => { + self.sync_all_conversations().await.unwrap_or_else(|e| { + log::error!("Error handling sync event: {}", e); + }); + + reply.send(()).unwrap(); + }, + + Event::GetAllConversations(reply) => { + let conversations = self.get_conversations(); + reply.send(conversations).unwrap(); + }, + } + } + + fn get_conversations(&mut self) -> Vec { self.database.with_repository(|r| r.all_conversations().unwrap()) } - pub async fn sync_all_conversations(&mut self) -> Result<()> { + async fn sync_all_conversations(&mut self) -> Result<()> { + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + let mut client = self.get_client() .map_err(|_| DaemonError::ClientNotConfigured)?; @@ -84,7 +127,7 @@ impl Daemon { Ok(()) } - pub fn get_settings(&mut self) -> Result { + fn get_settings(&mut self) -> Result { let settings = self.database.with_settings(|s| Settings::from_db(s) )?; @@ -92,16 +135,6 @@ impl Daemon { Ok(settings) } - pub async fn handle_event(&mut self, event: Event) { - match event { - Event::SyncAllConversations => { - self.sync_all_conversations().await.unwrap_or_else(|e| { - log::error!("Error handling sync event: {}", e); - }); - } - } - } - fn get_client(&mut self) -> Result { let settings = self.database.with_settings(|s| Settings::from_db(s) diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 0595d51..469507c 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -4,66 +4,74 @@ use std::sync::Arc; use tokio::sync::{Mutex, MutexGuard}; use std::future::Future; use std::thread; -use std::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::mpsc; use futures_util::future::FutureExt; -use crate::daemon::{Daemon, Event}; +use crate::daemon::{ + Daemon, + DaemonResult, + events::{Event, Reply}, +}; + use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { - daemon: Arc>, - event_sender: mpsc::Sender, + event_sink: mpsc::Sender, } impl ServerImpl { - pub fn new(daemon: Arc>, event_sender: mpsc::Sender) -> Self { - Self { daemon, event_sender } + pub fn new(event_sink: mpsc::Sender) -> Self { + Self { event_sink } } - pub async fn get_daemon(&self) -> MutexGuard<'_, Daemon> { - self.daemon.lock().await // .map_err(|_| MethodErr::failed("Failed to lock daemon")) + pub async fn send_event( + &self, + make_event: impl FnOnce(Reply) -> Event, + ) -> DaemonResult { + let (reply_tx, reply_rx) = oneshot::channel(); + self.event_sink.send(make_event(reply_tx)) + .await + .map_err(|_| "Failed to send event")?; + + reply_rx.await.map_err(|_| "Failed to receive reply".into()) } - pub fn daemon_then(&self, f: F) -> Result - where F: FnOnce(MutexGuard<'_, Daemon>) -> T + Send, - T: Send, - { - run_sync_future(self.get_daemon().then(|daemon| async move { - f(daemon) - })) + pub fn send_event_sync( + &self, + make_event: impl FnOnce(Reply) -> Event + Send, + ) -> Result { + run_sync_future(self.send_event(make_event)) + .unwrap() + .map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e))) } } impl DbusRepository for ServerImpl { fn get_version(&mut self) -> Result { - self.daemon_then(|daemon| daemon.version.clone()) + self.send_event_sync(Event::GetVersion) } fn get_conversations(&mut self) -> Result, dbus::MethodErr> { - self.daemon_then(|mut daemon| { - let conversations = daemon.get_conversations(); - - // Convert conversations to DBus property maps - let result = conversations.into_iter().map(|conv| { - let mut map = arg::PropMap::new(); - map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); - map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); - map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); - map - }).collect(); - - Ok(result) - })? + self.send_event_sync(Event::GetAllConversations) + .and_then(|conversations| { + // Convert conversations to DBus property maps + let result = conversations.into_iter().map(|conv| { + let mut map = arg::PropMap::new(); + map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); + map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); + map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); + map + }).collect(); + + Ok(result) + }) } fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { - self.event_sender.send(Event::SyncAllConversations).unwrap_or_else(|e| { - log::error!("Error sending sync event: {}", e); - }); - - Ok(()) + self.send_event_sync(Event::SyncAllConversations) } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 0fd1987..a36f719 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -2,7 +2,6 @@ mod dbus; mod daemon; use std::future; -use std::sync::mpsc; use log::LevelFilter; use std::sync::Arc; @@ -24,22 +23,16 @@ fn initialize_logging() { async fn main() { initialize_logging(); - let (sender, receiver) = mpsc::channel::(); - // Create the daemon - let daemon = Arc::new( - Mutex::new( - Daemon::new() - .map_err(|e| { - log::error!("Failed to start daemon: {}", e); - std::process::exit(1); - }) - .unwrap() - ) - ); + let mut daemon = Daemon::new() + .map_err(|e| { + log::error!("Failed to start daemon: {}", e); + std::process::exit(1); + }) + .unwrap(); // Create the server implementation - let server = ServerImpl::new(daemon.clone(), sender); + let server = ServerImpl::new(daemon.event_sender.clone()); // Register DBus interfaces with endpoint let endpoint = DbusEndpoint::new(server.clone()); @@ -54,12 +47,7 @@ async fn main() { } ).await; - tokio::spawn(async move { - for event in receiver { - // Important! Only lock the daemon when handling events. - daemon.lock().await.handle_event(event).await; - } - }); + daemon.run().await; future::pending::<()>().await; unreachable!()