diff --git a/kordophoned/src/dbus/endpoint.rs b/kordophoned/src/dbus/endpoint.rs index ddef080..ec76030 100644 --- a/kordophoned/src/dbus/endpoint.rs +++ b/kordophoned/src/dbus/endpoint.rs @@ -1,5 +1,5 @@ use log::info; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use dbus::{ channel::{MatchingReceiver, Sender}, @@ -9,6 +9,72 @@ use dbus::{ }; use dbus_crossroads::Crossroads; +#[derive(Clone)] +pub struct DbusRegistry { + connection: Arc, + crossroads: Arc>, + message_handler_started: Arc>, +} + +impl DbusRegistry { + pub fn new(connection: Arc) -> Self { + let mut cr = Crossroads::new(); + // Enable async support for the crossroads instance. + // (Currently irrelevant since dbus generates sync code) + cr.set_async_support(Some(( + connection.clone(), + Box::new(|x| { + tokio::spawn(x); + }), + ))); + + Self { + connection, + crossroads: Arc::new(Mutex::new(cr)), + message_handler_started: Arc::new(Mutex::new(false)), + } + } + + pub fn register_object(&self, path: &str, implementation: T, register_fn: F) + where + T: Send + Clone + 'static, + F: Fn(&mut Crossroads) -> R, + R: IntoIterator>, + { + let dbus_path = String::from(path); + + let mut cr = self.crossroads.lock().unwrap(); + let tokens: Vec<_> = register_fn(&mut cr).into_iter().collect(); + cr.insert(dbus_path, &tokens, implementation); + + // Start message handler if not already started + let mut handler_started = self.message_handler_started.lock().unwrap(); + if !*handler_started { + let crossroads_clone = self.crossroads.clone(); + self.connection.start_receive( + MatchRule::new_method_call(), + Box::new(move |msg, conn| { + let mut cr = crossroads_clone.lock().unwrap(); + cr.handle_message(msg, conn).is_ok() + }), + ); + *handler_started = true; + info!(target: "dbus", "Started D-Bus message handler"); + } + + info!(target: "dbus", "Registered object at {} with {} interfaces", path, tokens.len()); + } + + pub fn send_signal(&self, path: &str, signal: S) -> Result + where + S: dbus::message::SignalArgs + dbus::arg::AppendAll, + { + let message = signal.to_emit_message(&Path::new(path).unwrap()); + self.connection.send(message) + } +} + +// Keep the old Endpoint struct for backward compatibility during transition #[derive(Clone)] pub struct Endpoint { connection: Arc, diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 8f853db..cb94674 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -1,8 +1,6 @@ use dbus::arg; -use dbus::nonblock::SyncConnection; use dbus_tree::MethodErr; use std::future::Future; -use std::sync::Arc; use std::thread; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -13,24 +11,22 @@ use crate::daemon::{ Attachment, DaemonResult, }; -use crate::dbus::endpoint::Endpoint; +use crate::dbus::endpoint::DbusRegistry; use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { - connection: Arc, event_sink: mpsc::Sender, - attachment_objects: Vec>, + dbus_registry: DbusRegistry, } impl ServerImpl { - pub fn new(connection: Arc, event_sink: mpsc::Sender) -> Self { + pub fn new(event_sink: mpsc::Sender, dbus_registry: DbusRegistry) -> Self { Self { - connection: connection, event_sink: event_sink, - attachment_objects: vec![], + dbus_registry: dbus_registry, } } @@ -175,13 +171,11 @@ impl DbusRepository for ServerImpl { let obj_path = format!("/net/buzzert/kordophonecd/attachments/{}", &id); log::trace!("Registering attachment at path: {}", &obj_path); - let endpoint = Endpoint::new(self.connection.clone(), attachment); - run_sync_future(endpoint.register_object(obj_path.as_str(), |cr| { - vec![interface::register_net_buzzert_kordophone_attachment(cr)] - }))?; - - self.attachment_objects.push(endpoint); - log::trace!("Attachment objects: {:?}", self.attachment_objects.len()); + self.dbus_registry.register_object( + &obj_path, + attachment, + |cr| vec![interface::register_net_buzzert_kordophone_attachment(cr)] + ); Ok(obj_path.into()) }) diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 4a0f65d..4cd6727 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -5,9 +5,9 @@ use log::LevelFilter; use std::future; use daemon::signals::Signal; -use daemon::{Attachment, Daemon}; +use daemon::Daemon; -use dbus::endpoint::Endpoint as DbusEndpoint; +use dbus::endpoint::DbusRegistry; use dbus::interface; use dbus::server_impl::ServerImpl; use dbus_tokio::connection; @@ -54,32 +54,20 @@ async fn main() { .await .expect("Unable to acquire dbus name"); - let attachment = Attachment { - guid: "asdf".into(), - path: "/dev/null".into(), - downloaded: false, - }; + // Create shared D-Bus registry + let dbus_registry = DbusRegistry::new(connection.clone()); - let att_endpoint = DbusEndpoint::new(connection.clone(), attachment); - att_endpoint - .register_object("/net/buzzert/kordophonecd/attachments/test", |cr| { - vec![interface::register_net_buzzert_kordophone_attachment(cr)] - }) - .await; + // Create and register server implementation + let server = ServerImpl::new(daemon.event_sender.clone(), dbus_registry.clone()); - // Create the server implementation - let server = ServerImpl::new(connection.clone(), daemon.event_sender.clone()); - - // Register DBus interfaces with endpoint - let endpoint = DbusEndpoint::new(connection.clone(), server); - endpoint - .register_object(interface::OBJECT_PATH, |cr| { - vec![ - interface::register_net_buzzert_kordophone_repository(cr), - interface::register_net_buzzert_kordophone_settings(cr), - ] - }) - .await; + dbus_registry.register_object( + interface::OBJECT_PATH, + server, + |cr| vec![ + interface::register_net_buzzert_kordophone_repository(cr), + interface::register_net_buzzert_kordophone_settings(cr), + ] + ); let mut signal_receiver = daemon.obtain_signal_receiver(); tokio::spawn(async move { @@ -89,7 +77,7 @@ async fn main() { match signal { Signal::ConversationsUpdated => { log::debug!("Sending signal: ConversationsUpdated"); - endpoint + dbus_registry .send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {}) .unwrap_or_else(|_| { log::error!("Failed to send signal"); @@ -102,7 +90,7 @@ async fn main() { "Sending signal: MessagesUpdated for conversation {}", conversation_id ); - endpoint + dbus_registry .send_signal( interface::OBJECT_PATH, DbusSignals::MessagesUpdated { conversation_id },