From 1e9b5709933401990a9685647e81fe77c62588fe Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sun, 27 Apr 2025 22:44:05 -0700 Subject: [PATCH] devises a strategy for signals --- .../net.buzzert.kordophonecd.Server.xml | 9 ++++++ kordophoned/src/daemon/mod.rs | 32 ++++++++++++++++--- kordophoned/src/daemon/signals.rs | 4 +++ kordophoned/src/dbus/mod.rs | 4 +++ kordophoned/src/dbus/server_impl.rs | 1 + kordophoned/src/main.rs | 25 ++++++++++++--- kpcli/src/daemon/mod.rs | 24 ++++++++++++++ 7 files changed, 91 insertions(+), 8 deletions(-) create mode 100644 kordophoned/src/daemon/signals.rs diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index e5f4474..00e98f8 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -4,6 +4,8 @@ + @@ -21,7 +23,14 @@ + + + + + diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index e50fd7d..58e2740 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -4,6 +4,9 @@ use settings::Settings; pub mod events; use events::*; +pub mod signals; +use signals::*; + use anyhow::Result; use directories::ProjectDirs; use std::error::Error; @@ -57,6 +60,10 @@ mod target { pub struct Daemon { pub event_sender: Sender, event_receiver: Receiver, + + signal_receiver: Option>, + signal_sender: Sender, + version: String, database: Arc>, runtime: tokio::runtime::Runtime, @@ -73,7 +80,7 @@ impl Daemon { // Create event channels let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); - + let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); // Create background task runtime let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -82,7 +89,15 @@ impl Daemon { let database_impl = Database::new(&database_path.to_string_lossy())?; let database = Arc::new(Mutex::new(database_impl)); - Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, runtime }) + Ok(Self { + version: "0.1.0".to_string(), + database, + event_receiver, + event_sender, + signal_receiver: Some(signal_receiver), + signal_sender, + runtime + }) } pub async fn run(&mut self) { @@ -99,8 +114,9 @@ impl Daemon { Event::SyncAllConversations(reply) => { let db_clone = self.database.clone(); + let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { - let result = Self::sync_all_conversations_impl(db_clone).await; + let result = Self::sync_all_conversations_impl(db_clone, signal_sender).await; if let Err(e) = result { log::error!("Error handling sync event: {}", e); } @@ -136,11 +152,16 @@ impl Daemon { } } + /// Panics if the signal receiver has already been taken. + pub fn obtain_signal_receiver(&mut self) -> Receiver { + self.signal_receiver.take().unwrap() + } + async fn get_conversations(&mut self) -> Vec { self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await } - async fn sync_all_conversations_impl(mut database: Arc>) -> Result<()> { + async fn sync_all_conversations_impl(mut database: Arc>, signal_sender: Sender) -> Result<()> { log::info!(target: target::SYNC, "Starting conversation sync"); let mut client = Self::get_client_impl(database.clone()).await?; @@ -171,6 +192,9 @@ impl Daemon { database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?; } + // Send conversations updated signal. + signal_sender.send(Signal::ConversationsUpdated).await?; + log::info!(target: target::SYNC, "Synchronized {} conversations", num_conversations); Ok(()) } diff --git a/kordophoned/src/daemon/signals.rs b/kordophoned/src/daemon/signals.rs new file mode 100644 index 0000000..05bb75f --- /dev/null +++ b/kordophoned/src/daemon/signals.rs @@ -0,0 +1,4 @@ +#[derive(Debug, Clone)] +pub enum Signal { + ConversationsUpdated, +} diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index 66c8455..a901658 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -8,4 +8,8 @@ pub mod interface { pub const OBJECT_PATH: &str = "/net/buzzert/kordophonecd/daemon"; include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs")); + + pub mod signals { + pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; + } } \ No newline at end of file diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 8ab4cbf..63458a3 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -9,6 +9,7 @@ use crate::daemon::{ DaemonResult, events::{Event, Reply}, settings::Settings, + signals::Signal, }; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index a36f719..9b507aa 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -4,10 +4,9 @@ mod daemon; use std::future; use log::LevelFilter; -use std::sync::Arc; -use tokio::sync::Mutex; - use daemon::Daemon; +use daemon::signals::Signal; + use dbus::endpoint::Endpoint as DbusEndpoint; use dbus::interface; use dbus::server_impl::ServerImpl; @@ -35,7 +34,7 @@ async fn main() { let server = ServerImpl::new(daemon.event_sender.clone()); // Register DBus interfaces with endpoint - let endpoint = DbusEndpoint::new(server.clone()); + let endpoint = DbusEndpoint::new(server); endpoint.register( interface::NAME, interface::OBJECT_PATH, @@ -47,6 +46,24 @@ async fn main() { } ).await; + let mut signal_receiver = daemon.obtain_signal_receiver(); + tokio::spawn(async move { + use dbus::interface::signals as DbusSignals; + + while let Some(signal) = signal_receiver.recv().await { + match signal { + Signal::ConversationsUpdated => { + log::info!("Sending signal: ConversationsUpdated"); + endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated{}) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + } + } + }); + daemon.run().await; future::pending::<()>().await; diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index 17889aa..e2a6644 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -2,6 +2,7 @@ use anyhow::Result; use clap::Subcommand; use dbus::blocking::{Connection, Proxy}; use crate::printers::{ConversationPrinter, MessagePrinter}; +use std::future; const DBUS_NAME: &str = "net.buzzert.kordophonecd"; const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon"; @@ -30,6 +31,9 @@ pub enum Commands { #[command(subcommand)] command: ConfigCommands, }, + + /// Waits for signals from the daemon. + Signals, } #[derive(Subcommand)] @@ -61,6 +65,7 @@ impl Commands { Commands::Conversations => client.print_conversations().await, Commands::Sync => client.sync_conversations().await, Commands::Config { command } => client.config(command).await, + Commands::Signals => client.wait_for_signals().await, } } } @@ -102,6 +107,25 @@ impl DaemonCli { .map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e)) } + pub async fn wait_for_signals(&mut self) -> Result<()> { + use dbus::Message; + mod dbus_signals { + pub use super::dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; + } + + let _id = self.proxy().match_signal(|h: dbus_signals::ConversationsUpdated, _: &Connection, _: &Message| { + println!("Signal: Conversations updated"); + true + }); + + println!("Waiting for signals..."); + loop { + self.conn.process(std::time::Duration::from_millis(1000))?; + } + + Ok(()) + } + pub async fn config(&mut self, cmd: ConfigCommands) -> Result<()> { match cmd { ConfigCommands::Print => self.print_settings().await,