From fa6c7c50b7eb1222bb0bc2cb7bd54f4e6a26cc91 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 18 Jun 2025 01:03:14 -0700 Subject: [PATCH] Refactor: serverimpl -> dbus::agent, clean up main.rs --- .../src/dbus/{server_impl.rs => agent.rs} | 277 ++++++++++++------ kordophoned/src/dbus/mod.rs | 12 +- kordophoned/src/main.rs | 126 +------- 3 files changed, 203 insertions(+), 212 deletions(-) rename kordophoned/src/dbus/{server_impl.rs => agent.rs} (54%) diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/agent.rs similarity index 54% rename from kordophoned/src/dbus/server_impl.rs rename to kordophoned/src/dbus/agent.rs index 6bcc241..21708bd 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/agent.rs @@ -1,35 +1,155 @@ use dbus::arg; use dbus_tree::MethodErr; -use std::future::Future; -use std::thread; -use tokio::sync::mpsc; -use tokio::sync::oneshot; +use std::{future::Future, thread}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot, Mutex}; use crate::daemon::{ events::{Event, Reply}, settings::Settings, + signals::Signal, DaemonResult, }; -use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; -use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; +use crate::dbus::endpoint::DbusRegistry; +use crate::dbus::interface; +use crate::dbus::interface::signals as DbusSignals; +use dbus_tokio::connection; #[derive(Clone)] -pub struct ServerImpl { +pub struct DBusAgent { event_sink: mpsc::Sender, + signal_receiver: Arc>>>, } -impl ServerImpl { - pub fn new(event_sink: mpsc::Sender) -> Self { +impl DBusAgent { + pub fn new(event_sink: mpsc::Sender, signal_receiver: mpsc::Receiver) -> Self { Self { - event_sink: event_sink, + event_sink, + signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))), } } - pub async fn send_event( - &self, - make_event: impl FnOnce(Reply) -> Event, - ) -> DaemonResult { + pub async fn run(self) { + // Establish a session bus connection. + let (resource, connection) = connection::new_session_sync().expect("Failed to connect to session bus"); + + // Ensure the D-Bus resource is polled. + tokio::spawn(async move { + let err = resource.await; + panic!("Lost connection to D-Bus: {:?}", err); + }); + + // Claim well-known bus name. + connection + .request_name(interface::NAME, false, true, false) + .await + .expect("Unable to acquire D-Bus name"); + + // Registry for objects & signals. + let dbus_registry = DbusRegistry::new(connection.clone()); + + // Register our object implementation. + let implementation = self.clone(); + dbus_registry.register_object(interface::OBJECT_PATH, implementation, |cr| { + vec![ + interface::register_net_buzzert_kordophone_repository(cr), + interface::register_net_buzzert_kordophone_settings(cr), + ] + }); + + // Spawn task that forwards daemon signals to D-Bus. + { + let registry = dbus_registry.clone(); + let receiver_arc = self.signal_receiver.clone(); + tokio::spawn(async move { + let mut receiver = receiver_arc + .lock() + .await + .take() + .expect("Signal receiver already taken"); + + while let Some(signal) = receiver.recv().await { + match signal { + Signal::ConversationsUpdated => { + log::debug!("Sending signal: ConversationsUpdated"); + registry + .send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {}) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::MessagesUpdated(conversation_id) => { + log::debug!( + "Sending signal: MessagesUpdated for conversation {}", + conversation_id + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::MessagesUpdated { conversation_id }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::AttachmentDownloaded(attachment_id) => { + log::debug!( + "Sending signal: AttachmentDownloaded for attachment {}", + attachment_id + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::AttachmentDownloadCompleted { attachment_id }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::AttachmentUploaded(upload_guid, attachment_guid) => { + log::debug!( + "Sending signal: AttachmentUploaded for upload {}, attachment {}", + upload_guid, attachment_guid + ); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::AttachmentUploadCompleted { + upload_guid, + attachment_guid, + }, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + Signal::UpdateStreamReconnected => { + log::debug!("Sending signal: UpdateStreamReconnected"); + registry + .send_signal( + interface::OBJECT_PATH, + DbusSignals::UpdateStreamReconnected {}, + ) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } + } + } + }); + } + + // Keep running forever. + std::future::pending::<()>().await; + } + + pub async fn send_event(&self, make_event: impl FnOnce(Reply) -> Event) -> DaemonResult { let (reply_tx, reply_rx) = oneshot::channel(); self.event_sink .send(make_event(reply_tx)) @@ -49,17 +169,21 @@ impl ServerImpl { } } -impl DbusRepository for ServerImpl { +// +// D-Bus repository interface implementation +// + +use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; +use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; + +impl DbusRepository for DBusAgent { fn get_version(&mut self) -> Result { self.send_event_sync(Event::GetVersion) } - fn get_conversations( - &mut self, - limit: i32, - offset: i32, - ) -> Result, dbus::MethodErr> { - self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r)) + fn get_conversations(&mut self, limit: i32, offset: i32) -> Result, MethodErr> { + self + .send_event_sync(|r| Event::GetAllConversations(limit, offset, r)) .map(|conversations| { conversations .into_iter() @@ -97,30 +221,27 @@ impl DbusRepository for ServerImpl { }) } - fn sync_conversation_list(&mut self) -> Result<(), dbus::MethodErr> { + fn sync_conversation_list(&mut self) -> Result<(), MethodErr> { self.send_event_sync(Event::SyncConversationList) } - fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { + fn sync_all_conversations(&mut self) -> Result<(), MethodErr> { self.send_event_sync(Event::SyncAllConversations) } - fn sync_conversation(&mut self, conversation_id: String) -> Result<(), dbus::MethodErr> { + fn sync_conversation(&mut self, conversation_id: String) -> Result<(), MethodErr> { self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) } - fn get_messages( - &mut self, - conversation_id: String, - last_message_id: String, - ) -> Result, dbus::MethodErr> { + fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result, MethodErr> { let last_message_id_opt = if last_message_id.is_empty() { None } else { Some(last_message_id) }; - self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) + self + .send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) .map(|messages| { messages .into_iter() @@ -128,9 +249,7 @@ impl DbusRepository for ServerImpl { let mut map = arg::PropMap::new(); map.insert("id".into(), arg::Variant(Box::new(msg.id))); - // xxx: Remove the attachment placeholder here. - // This is not the ideal place to do this, but once we start using ChatItems instead of IMMessages - // from the server, we shouldn't be seeing these placeholders. + // Remove the attachment placeholder here. let text = msg.text.replace("\u{FFFC}", ""); map.insert("text".into(), arg::Variant(Box::new(text))); @@ -143,7 +262,7 @@ impl DbusRepository for ServerImpl { arg::Variant(Box::new(msg.sender.display_name())), ); - // Add attachments array + // Attachments array let attachments: Vec = msg .attachments .into_iter() @@ -154,7 +273,7 @@ impl DbusRepository for ServerImpl { arg::Variant(Box::new(attachment.guid.clone())), ); - // Get attachment paths and download status + // Paths and download status let path = attachment.get_path_for_preview(false); let preview_path = attachment.get_path_for_preview(true); let downloaded = attachment.is_downloaded(false); @@ -166,9 +285,7 @@ impl DbusRepository for ServerImpl { ); attachment_map.insert( "preview_path".into(), - arg::Variant(Box::new( - preview_path.to_string_lossy().to_string(), - )), + arg::Variant(Box::new(preview_path.to_string_lossy().to_string())), ); attachment_map.insert( "downloaded".into(), @@ -179,14 +296,12 @@ impl DbusRepository for ServerImpl { arg::Variant(Box::new(preview_downloaded)), ); - // Add metadata if present + // Metadata if let Some(ref metadata) = attachment.metadata { let mut metadata_map = arg::PropMap::new(); - // Add attribution_info if present if let Some(ref attribution_info) = metadata.attribution_info { let mut attribution_map = arg::PropMap::new(); - if let Some(width) = attribution_info.width { attribution_map.insert( "width".into(), @@ -199,7 +314,6 @@ impl DbusRepository for ServerImpl { arg::Variant(Box::new(height as i32)), ); } - metadata_map.insert( "attribution_info".into(), arg::Variant(Box::new(attribution_map)), @@ -217,14 +331,13 @@ impl DbusRepository for ServerImpl { .collect(); map.insert("attachments".into(), arg::Variant(Box::new(attachments))); - map }) .collect() }) } - fn delete_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { + fn delete_all_conversations(&mut self) -> Result<(), MethodErr> { self.send_event_sync(Event::DeleteAllConversations) } @@ -233,55 +346,44 @@ impl DbusRepository for ServerImpl { conversation_id: String, text: String, attachment_guids: Vec, - ) -> Result { - self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)) + ) -> Result { + self + .send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)) .map(|uuid| uuid.to_string()) } - fn get_attachment_info( - &mut self, - attachment_id: String, - ) -> Result<(String, String, bool, bool), dbus::MethodErr> { - self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)) - .map(|attachment| { - let path = attachment.get_path_for_preview(false); - let downloaded = attachment.is_downloaded(false); - - let preview_path = attachment.get_path_for_preview(true); - let preview_downloaded = attachment.is_downloaded(true); - - ( - // - path: string - path.to_string_lossy().to_string(), - // - preview_path: string - preview_path.to_string_lossy().to_string(), - // - downloaded: boolean - downloaded, - // - preview_downloaded: boolean - preview_downloaded, - ) - }) + fn get_attachment_info(&mut self, attachment_id: String) -> Result<(String, String, bool, bool), MethodErr> { + self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)).map(|attachment| { + let path = attachment.get_path_for_preview(false); + let downloaded = attachment.is_downloaded(false); + let preview_path = attachment.get_path_for_preview(true); + let preview_downloaded = attachment.is_downloaded(true); + ( + path.to_string_lossy().to_string(), + preview_path.to_string_lossy().to_string(), + downloaded, + preview_downloaded, + ) + }) } - fn download_attachment( - &mut self, - attachment_id: String, - preview: bool, - ) -> Result<(), dbus::MethodErr> { - // For now, just trigger the download event - we'll implement the actual download logic later + fn download_attachment(&mut self, attachment_id: String, preview: bool) -> Result<(), MethodErr> { self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r)) } - fn upload_attachment(&mut self, path: String) -> Result { + fn upload_attachment(&mut self, path: String) -> Result { use std::path::PathBuf; - let path = PathBuf::from(path); self.send_event_sync(|r| Event::UploadAttachment(path, r)) } } -impl DbusSettings for ServerImpl { - fn set_server(&mut self, url: String, user: String) -> Result<(), dbus::MethodErr> { +// +// D-Bus settings interface implementation. +// + +impl DbusSettings for DBusAgent { + fn set_server(&mut self, url: String, user: String) -> Result<(), MethodErr> { self.send_event_sync(|r| { Event::UpdateSettings( Settings { @@ -294,12 +396,12 @@ impl DbusSettings for ServerImpl { }) } - fn server_url(&self) -> Result { + fn server_url(&self) -> Result { self.send_event_sync(Event::GetAllSettings) .map(|settings| settings.server_url.unwrap_or_default()) } - fn set_server_url(&self, value: String) -> Result<(), dbus::MethodErr> { + fn set_server_url(&self, value: String) -> Result<(), MethodErr> { self.send_event_sync(|r| { Event::UpdateSettings( Settings { @@ -312,12 +414,12 @@ impl DbusSettings for ServerImpl { }) } - fn username(&self) -> Result { + fn username(&self) -> Result { self.send_event_sync(Event::GetAllSettings) .map(|settings| settings.username.unwrap_or_default()) } - fn set_username(&self, value: String) -> Result<(), dbus::MethodErr> { + fn set_username(&self, value: String) -> Result<(), MethodErr> { self.send_event_sync(|r| { Event::UpdateSettings( Settings { @@ -331,14 +433,15 @@ impl DbusSettings for ServerImpl { } } +// +// Helper utilities. +// + fn run_sync_future(f: F) -> Result where T: Send, F: Future + Send, { - // We use `scope` here to ensure that the thread is joined before the - // function returns. This allows us to capture references of values that - // have lifetimes shorter than 'static, which is what thread::spawn requires. thread::scope(move |s| { s.spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() @@ -352,4 +455,4 @@ where .join() }) .expect("Error joining runtime thread") -} +} \ No newline at end of file diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index 5efd0d2..1bc9930 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -1,5 +1,5 @@ pub mod endpoint; -pub mod server_impl; +pub mod agent; pub mod interface { #![allow(unused)] @@ -10,10 +10,10 @@ pub mod interface { include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs")); pub mod signals { - pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; - pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; - pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; - pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; - pub use crate::interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected; + pub use super::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; + pub use super::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; + pub use super::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; + pub use super::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; + pub use super::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected; } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 3ee1608..43a1b29 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -4,13 +4,9 @@ mod dbus; use log::LevelFilter; use std::future; -use daemon::signals::Signal; use daemon::Daemon; -use dbus::endpoint::DbusRegistry; -use dbus::interface; -use dbus::server_impl::ServerImpl; -use dbus_tokio::connection; +use dbus::agent::DBusAgent; fn initialize_logging() { // Weird: is this the best way to do this? @@ -31,128 +27,20 @@ async fn main() { // Create the daemon let mut daemon = Daemon::new() .map_err(|e| { - log::error!("Failed to start daemon: {}", e); + log::error!("Failed to initialize daemon: {}", e); std::process::exit(1); }) .unwrap(); - // Initialize dbus session connection - let (resource, connection) = connection::new_session_sync().unwrap(); - - // The resource is a task that should be spawned onto a tokio compatible - // reactor ASAP. If the resource ever finishes, you lost connection to D-Bus. - // - // To shut down the connection, both call _handle.abort() and drop the connection. - let _handle = tokio::spawn(async { - let err = resource.await; - panic!("Lost connection to D-Bus: {}", err); - }); - - // Acquire the name - connection - .request_name(interface::NAME, false, true, false) - .await - .expect("Unable to acquire dbus name"); - - // Create shared D-Bus registry - let dbus_registry = DbusRegistry::new(connection.clone()); - - // Create and register server implementation - let server = ServerImpl::new(daemon.event_sender.clone()); - - dbus_registry.register_object(interface::OBJECT_PATH, server, |cr| { - vec![ - interface::register_net_buzzert_kordophone_repository(cr), - interface::register_net_buzzert_kordophone_settings(cr), - ] - }); - - let mut signal_receiver = daemon.obtain_signal_receiver(); + // Start the D-Bus agent (events in, signals out). + let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver()); tokio::spawn(async move { - use dbus::interface::signals as DbusSignals; - - while let Some(signal) = signal_receiver.recv().await { - match signal { - Signal::ConversationsUpdated => { - log::debug!("Sending signal: ConversationsUpdated"); - dbus_registry - .send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {}) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - - Signal::MessagesUpdated(conversation_id) => { - log::debug!( - "Sending signal: MessagesUpdated for conversation {}", - conversation_id - ); - dbus_registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::MessagesUpdated { conversation_id }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - - Signal::AttachmentDownloaded(attachment_id) => { - log::debug!( - "Sending signal: AttachmentDownloaded for attachment {}", - attachment_id - ); - dbus_registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::AttachmentDownloadCompleted { attachment_id }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - - Signal::AttachmentUploaded(upload_guid, attachment_guid) => { - log::debug!( - "Sending signal: AttachmentUploaded for upload {}, attachment {}", - upload_guid, - attachment_guid - ); - dbus_registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::AttachmentUploadCompleted { - upload_guid, - attachment_guid, - }, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - - Signal::UpdateStreamReconnected => { - log::debug!("Sending signal: UpdateStreamReconnected"); - dbus_registry - .send_signal( - interface::OBJECT_PATH, - DbusSignals::UpdateStreamReconnected {}, - ) - .unwrap_or_else(|_| { - log::error!("Failed to send signal"); - 0 - }); - } - } - } + agent.run().await; }); + // Run the main daemon loop. daemon.run().await; + // Keep the process alive as long as any background tasks are running. future::pending::<()>().await; - unreachable!() }