Private
Public Access
1
0

Refactor: serverimpl -> dbus::agent, clean up main.rs

This commit is contained in:
2025-06-18 01:03:14 -07:00
parent 032573d23b
commit fa6c7c50b7
3 changed files with 203 additions and 212 deletions

View File

@@ -1,35 +1,155 @@
use dbus::arg; use dbus::arg;
use dbus_tree::MethodErr; use dbus_tree::MethodErr;
use std::future::Future; use std::{future::Future, thread};
use std::thread; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::oneshot;
use crate::daemon::{ use crate::daemon::{
events::{Event, Reply}, events::{Event, Reply},
settings::Settings, settings::Settings,
signals::Signal,
DaemonResult, DaemonResult,
}; };
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::endpoint::DbusRegistry;
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; use crate::dbus::interface;
use crate::dbus::interface::signals as DbusSignals;
use dbus_tokio::connection;
#[derive(Clone)] #[derive(Clone)]
pub struct ServerImpl { pub struct DBusAgent {
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
signal_receiver: Arc<Mutex<Option<mpsc::Receiver<Signal>>>>,
} }
impl ServerImpl { impl DBusAgent {
pub fn new(event_sink: mpsc::Sender<Event>) -> Self { pub fn new(event_sink: mpsc::Sender<Event>, signal_receiver: mpsc::Receiver<Signal>) -> Self {
Self { Self {
event_sink: event_sink, event_sink,
signal_receiver: Arc::new(Mutex::new(Some(signal_receiver))),
} }
} }
pub async fn send_event<T>( pub async fn run(self) {
&self, // Establish a session bus connection.
make_event: impl FnOnce(Reply<T>) -> Event, let (resource, connection) = connection::new_session_sync().expect("Failed to connect to session bus");
) -> DaemonResult<T> {
// Ensure the D-Bus resource is polled.
tokio::spawn(async move {
let err = resource.await;
panic!("Lost connection to D-Bus: {:?}", err);
});
// Claim well-known bus name.
connection
.request_name(interface::NAME, false, true, false)
.await
.expect("Unable to acquire D-Bus name");
// Registry for objects & signals.
let dbus_registry = DbusRegistry::new(connection.clone());
// Register our object implementation.
let implementation = self.clone();
dbus_registry.register_object(interface::OBJECT_PATH, implementation, |cr| {
vec![
interface::register_net_buzzert_kordophone_repository(cr),
interface::register_net_buzzert_kordophone_settings(cr),
]
});
// Spawn task that forwards daemon signals to D-Bus.
{
let registry = dbus_registry.clone();
let receiver_arc = self.signal_receiver.clone();
tokio::spawn(async move {
let mut receiver = receiver_arc
.lock()
.await
.take()
.expect("Signal receiver already taken");
while let Some(signal) = receiver.recv().await {
match signal {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
registry
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::MessagesUpdated(conversation_id) => {
log::debug!(
"Sending signal: MessagesUpdated for conversation {}",
conversation_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentDownloaded(attachment_id) => {
log::debug!(
"Sending signal: AttachmentDownloaded for attachment {}",
attachment_id
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid, attachment_guid
);
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
}
}
});
}
// Keep running forever.
std::future::pending::<()>().await;
}
pub async fn send_event<T>(&self, make_event: impl FnOnce(Reply<T>) -> Event) -> DaemonResult<T> {
let (reply_tx, reply_rx) = oneshot::channel(); let (reply_tx, reply_rx) = oneshot::channel();
self.event_sink self.event_sink
.send(make_event(reply_tx)) .send(make_event(reply_tx))
@@ -49,17 +169,21 @@ impl ServerImpl {
} }
} }
impl DbusRepository for ServerImpl { //
// D-Bus repository interface implementation
//
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
impl DbusRepository for DBusAgent {
fn get_version(&mut self) -> Result<String, MethodErr> { fn get_version(&mut self) -> Result<String, MethodErr> {
self.send_event_sync(Event::GetVersion) self.send_event_sync(Event::GetVersion)
} }
fn get_conversations( fn get_conversations(&mut self, limit: i32, offset: i32) -> Result<Vec<arg::PropMap>, MethodErr> {
&mut self, self
limit: i32, .send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
offset: i32,
) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r))
.map(|conversations| { .map(|conversations| {
conversations conversations
.into_iter() .into_iter()
@@ -97,30 +221,27 @@ impl DbusRepository for ServerImpl {
}) })
} }
fn sync_conversation_list(&mut self) -> Result<(), dbus::MethodErr> { fn sync_conversation_list(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::SyncConversationList) self.send_event_sync(Event::SyncConversationList)
} }
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { fn sync_all_conversations(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::SyncAllConversations) self.send_event_sync(Event::SyncAllConversations)
} }
fn sync_conversation(&mut self, conversation_id: String) -> Result<(), dbus::MethodErr> { fn sync_conversation(&mut self, conversation_id: String) -> Result<(), MethodErr> {
self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) self.send_event_sync(|r| Event::SyncConversation(conversation_id, r))
} }
fn get_messages( fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result<Vec<arg::PropMap>, MethodErr> {
&mut self,
conversation_id: String,
last_message_id: String,
) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
let last_message_id_opt = if last_message_id.is_empty() { let last_message_id_opt = if last_message_id.is_empty() {
None None
} else { } else {
Some(last_message_id) Some(last_message_id)
}; };
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) self
.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))
.map(|messages| { .map(|messages| {
messages messages
.into_iter() .into_iter()
@@ -128,9 +249,7 @@ impl DbusRepository for ServerImpl {
let mut map = arg::PropMap::new(); let mut map = arg::PropMap::new();
map.insert("id".into(), arg::Variant(Box::new(msg.id))); map.insert("id".into(), arg::Variant(Box::new(msg.id)));
// xxx: Remove the attachment placeholder here. // Remove the attachment placeholder here.
// This is not the ideal place to do this, but once we start using ChatItems instead of IMMessages
// from the server, we shouldn't be seeing these placeholders.
let text = msg.text.replace("\u{FFFC}", ""); let text = msg.text.replace("\u{FFFC}", "");
map.insert("text".into(), arg::Variant(Box::new(text))); map.insert("text".into(), arg::Variant(Box::new(text)));
@@ -143,7 +262,7 @@ impl DbusRepository for ServerImpl {
arg::Variant(Box::new(msg.sender.display_name())), arg::Variant(Box::new(msg.sender.display_name())),
); );
// Add attachments array // Attachments array
let attachments: Vec<arg::PropMap> = msg let attachments: Vec<arg::PropMap> = msg
.attachments .attachments
.into_iter() .into_iter()
@@ -154,7 +273,7 @@ impl DbusRepository for ServerImpl {
arg::Variant(Box::new(attachment.guid.clone())), arg::Variant(Box::new(attachment.guid.clone())),
); );
// Get attachment paths and download status // Paths and download status
let path = attachment.get_path_for_preview(false); let path = attachment.get_path_for_preview(false);
let preview_path = attachment.get_path_for_preview(true); let preview_path = attachment.get_path_for_preview(true);
let downloaded = attachment.is_downloaded(false); let downloaded = attachment.is_downloaded(false);
@@ -166,9 +285,7 @@ impl DbusRepository for ServerImpl {
); );
attachment_map.insert( attachment_map.insert(
"preview_path".into(), "preview_path".into(),
arg::Variant(Box::new( arg::Variant(Box::new(preview_path.to_string_lossy().to_string())),
preview_path.to_string_lossy().to_string(),
)),
); );
attachment_map.insert( attachment_map.insert(
"downloaded".into(), "downloaded".into(),
@@ -179,14 +296,12 @@ impl DbusRepository for ServerImpl {
arg::Variant(Box::new(preview_downloaded)), arg::Variant(Box::new(preview_downloaded)),
); );
// Add metadata if present // Metadata
if let Some(ref metadata) = attachment.metadata { if let Some(ref metadata) = attachment.metadata {
let mut metadata_map = arg::PropMap::new(); let mut metadata_map = arg::PropMap::new();
// Add attribution_info if present
if let Some(ref attribution_info) = metadata.attribution_info { if let Some(ref attribution_info) = metadata.attribution_info {
let mut attribution_map = arg::PropMap::new(); let mut attribution_map = arg::PropMap::new();
if let Some(width) = attribution_info.width { if let Some(width) = attribution_info.width {
attribution_map.insert( attribution_map.insert(
"width".into(), "width".into(),
@@ -199,7 +314,6 @@ impl DbusRepository for ServerImpl {
arg::Variant(Box::new(height as i32)), arg::Variant(Box::new(height as i32)),
); );
} }
metadata_map.insert( metadata_map.insert(
"attribution_info".into(), "attribution_info".into(),
arg::Variant(Box::new(attribution_map)), arg::Variant(Box::new(attribution_map)),
@@ -217,14 +331,13 @@ impl DbusRepository for ServerImpl {
.collect(); .collect();
map.insert("attachments".into(), arg::Variant(Box::new(attachments))); map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
map map
}) })
.collect() .collect()
}) })
} }
fn delete_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { fn delete_all_conversations(&mut self) -> Result<(), MethodErr> {
self.send_event_sync(Event::DeleteAllConversations) self.send_event_sync(Event::DeleteAllConversations)
} }
@@ -233,55 +346,44 @@ impl DbusRepository for ServerImpl {
conversation_id: String, conversation_id: String,
text: String, text: String,
attachment_guids: Vec<String>, attachment_guids: Vec<String>,
) -> Result<String, dbus::MethodErr> { ) -> Result<String, MethodErr> {
self.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r)) self
.send_event_sync(|r| Event::SendMessage(conversation_id, text, attachment_guids, r))
.map(|uuid| uuid.to_string()) .map(|uuid| uuid.to_string())
} }
fn get_attachment_info( fn get_attachment_info(&mut self, attachment_id: String) -> Result<(String, String, bool, bool), MethodErr> {
&mut self, self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)).map(|attachment| {
attachment_id: String,
) -> Result<(String, String, bool, bool), dbus::MethodErr> {
self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))
.map(|attachment| {
let path = attachment.get_path_for_preview(false); let path = attachment.get_path_for_preview(false);
let downloaded = attachment.is_downloaded(false); let downloaded = attachment.is_downloaded(false);
let preview_path = attachment.get_path_for_preview(true); let preview_path = attachment.get_path_for_preview(true);
let preview_downloaded = attachment.is_downloaded(true); let preview_downloaded = attachment.is_downloaded(true);
( (
// - path: string
path.to_string_lossy().to_string(), path.to_string_lossy().to_string(),
// - preview_path: string
preview_path.to_string_lossy().to_string(), preview_path.to_string_lossy().to_string(),
// - downloaded: boolean
downloaded, downloaded,
// - preview_downloaded: boolean
preview_downloaded, preview_downloaded,
) )
}) })
} }
fn download_attachment( fn download_attachment(&mut self, attachment_id: String, preview: bool) -> Result<(), MethodErr> {
&mut self,
attachment_id: String,
preview: bool,
) -> Result<(), dbus::MethodErr> {
// For now, just trigger the download event - we'll implement the actual download logic later
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r)) self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
} }
fn upload_attachment(&mut self, path: String) -> Result<String, dbus::MethodErr> { fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> {
use std::path::PathBuf; use std::path::PathBuf;
let path = PathBuf::from(path); let path = PathBuf::from(path);
self.send_event_sync(|r| Event::UploadAttachment(path, r)) self.send_event_sync(|r| Event::UploadAttachment(path, r))
} }
} }
impl DbusSettings for ServerImpl { //
fn set_server(&mut self, url: String, user: String) -> Result<(), dbus::MethodErr> { // D-Bus settings interface implementation.
//
impl DbusSettings for DBusAgent {
fn set_server(&mut self, url: String, user: String) -> Result<(), MethodErr> {
self.send_event_sync(|r| { self.send_event_sync(|r| {
Event::UpdateSettings( Event::UpdateSettings(
Settings { Settings {
@@ -294,12 +396,12 @@ impl DbusSettings for ServerImpl {
}) })
} }
fn server_url(&self) -> Result<String, dbus::MethodErr> { fn server_url(&self) -> Result<String, MethodErr> {
self.send_event_sync(Event::GetAllSettings) self.send_event_sync(Event::GetAllSettings)
.map(|settings| settings.server_url.unwrap_or_default()) .map(|settings| settings.server_url.unwrap_or_default())
} }
fn set_server_url(&self, value: String) -> Result<(), dbus::MethodErr> { fn set_server_url(&self, value: String) -> Result<(), MethodErr> {
self.send_event_sync(|r| { self.send_event_sync(|r| {
Event::UpdateSettings( Event::UpdateSettings(
Settings { Settings {
@@ -312,12 +414,12 @@ impl DbusSettings for ServerImpl {
}) })
} }
fn username(&self) -> Result<String, dbus::MethodErr> { fn username(&self) -> Result<String, MethodErr> {
self.send_event_sync(Event::GetAllSettings) self.send_event_sync(Event::GetAllSettings)
.map(|settings| settings.username.unwrap_or_default()) .map(|settings| settings.username.unwrap_or_default())
} }
fn set_username(&self, value: String) -> Result<(), dbus::MethodErr> { fn set_username(&self, value: String) -> Result<(), MethodErr> {
self.send_event_sync(|r| { self.send_event_sync(|r| {
Event::UpdateSettings( Event::UpdateSettings(
Settings { Settings {
@@ -331,14 +433,15 @@ impl DbusSettings for ServerImpl {
} }
} }
//
// Helper utilities.
//
fn run_sync_future<F, T>(f: F) -> Result<T, MethodErr> fn run_sync_future<F, T>(f: F) -> Result<T, MethodErr>
where where
T: Send, T: Send,
F: Future<Output = T> + Send, F: Future<Output = T> + Send,
{ {
// We use `scope` here to ensure that the thread is joined before the
// function returns. This allows us to capture references of values that
// have lifetimes shorter than 'static, which is what thread::spawn requires.
thread::scope(move |s| { thread::scope(move |s| {
s.spawn(move || { s.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()

View File

@@ -1,5 +1,5 @@
pub mod endpoint; pub mod endpoint;
pub mod server_impl; pub mod agent;
pub mod interface { pub mod interface {
#![allow(unused)] #![allow(unused)]
@@ -10,10 +10,10 @@ pub mod interface {
include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs")); include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs"));
pub mod signals { pub mod signals {
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; pub use super::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted;
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; pub use super::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted;
pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; pub use super::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; pub use super::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated;
pub use crate::interface::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected; pub use super::NetBuzzertKordophoneRepositoryUpdateStreamReconnected as UpdateStreamReconnected;
} }
} }

View File

@@ -4,13 +4,9 @@ mod dbus;
use log::LevelFilter; use log::LevelFilter;
use std::future; use std::future;
use daemon::signals::Signal;
use daemon::Daemon; use daemon::Daemon;
use dbus::endpoint::DbusRegistry; use dbus::agent::DBusAgent;
use dbus::interface;
use dbus::server_impl::ServerImpl;
use dbus_tokio::connection;
fn initialize_logging() { fn initialize_logging() {
// Weird: is this the best way to do this? // Weird: is this the best way to do this?
@@ -31,128 +27,20 @@ async fn main() {
// Create the daemon // Create the daemon
let mut daemon = Daemon::new() let mut daemon = Daemon::new()
.map_err(|e| { .map_err(|e| {
log::error!("Failed to start daemon: {}", e); log::error!("Failed to initialize daemon: {}", e);
std::process::exit(1); std::process::exit(1);
}) })
.unwrap(); .unwrap();
// Initialize dbus session connection // Start the D-Bus agent (events in, signals out).
let (resource, connection) = connection::new_session_sync().unwrap(); let agent = DBusAgent::new(daemon.event_sender.clone(), daemon.obtain_signal_receiver());
// The resource is a task that should be spawned onto a tokio compatible
// reactor ASAP. If the resource ever finishes, you lost connection to D-Bus.
//
// To shut down the connection, both call _handle.abort() and drop the connection.
let _handle = tokio::spawn(async {
let err = resource.await;
panic!("Lost connection to D-Bus: {}", err);
});
// Acquire the name
connection
.request_name(interface::NAME, false, true, false)
.await
.expect("Unable to acquire dbus name");
// Create shared D-Bus registry
let dbus_registry = DbusRegistry::new(connection.clone());
// Create and register server implementation
let server = ServerImpl::new(daemon.event_sender.clone());
dbus_registry.register_object(interface::OBJECT_PATH, server, |cr| {
vec![
interface::register_net_buzzert_kordophone_repository(cr),
interface::register_net_buzzert_kordophone_settings(cr),
]
});
let mut signal_receiver = daemon.obtain_signal_receiver();
tokio::spawn(async move { tokio::spawn(async move {
use dbus::interface::signals as DbusSignals; agent.run().await;
while let Some(signal) = signal_receiver.recv().await {
match signal {
Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated");
dbus_registry
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::MessagesUpdated(conversation_id) => {
log::debug!(
"Sending signal: MessagesUpdated for conversation {}",
conversation_id
);
dbus_registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentDownloaded(attachment_id) => {
log::debug!(
"Sending signal: AttachmentDownloaded for attachment {}",
attachment_id
);
dbus_registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentDownloadCompleted { attachment_id },
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!(
"Sending signal: AttachmentUploaded for upload {}, attachment {}",
upload_guid,
attachment_guid
);
dbus_registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::AttachmentUploadCompleted {
upload_guid,
attachment_guid,
},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
Signal::UpdateStreamReconnected => {
log::debug!("Sending signal: UpdateStreamReconnected");
dbus_registry
.send_signal(
interface::OBJECT_PATH,
DbusSignals::UpdateStreamReconnected {},
)
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
}
}
}); });
// Run the main daemon loop.
daemon.run().await; daemon.run().await;
// Keep the process alive as long as any background tasks are running.
future::pending::<()>().await; future::pending::<()>().await;
unreachable!()
} }