Started to factor out DbusRegistry from Endpoint
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
use log::info;
|
use log::info;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use dbus::{
|
use dbus::{
|
||||||
channel::{MatchingReceiver, Sender},
|
channel::{MatchingReceiver, Sender},
|
||||||
@@ -9,6 +9,72 @@ use dbus::{
|
|||||||
};
|
};
|
||||||
use dbus_crossroads::Crossroads;
|
use dbus_crossroads::Crossroads;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DbusRegistry {
|
||||||
|
connection: Arc<SyncConnection>,
|
||||||
|
crossroads: Arc<Mutex<Crossroads>>,
|
||||||
|
message_handler_started: Arc<Mutex<bool>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DbusRegistry {
|
||||||
|
pub fn new(connection: Arc<SyncConnection>) -> Self {
|
||||||
|
let mut cr = Crossroads::new();
|
||||||
|
// Enable async support for the crossroads instance.
|
||||||
|
// (Currently irrelevant since dbus generates sync code)
|
||||||
|
cr.set_async_support(Some((
|
||||||
|
connection.clone(),
|
||||||
|
Box::new(|x| {
|
||||||
|
tokio::spawn(x);
|
||||||
|
}),
|
||||||
|
)));
|
||||||
|
|
||||||
|
Self {
|
||||||
|
connection,
|
||||||
|
crossroads: Arc::new(Mutex::new(cr)),
|
||||||
|
message_handler_started: Arc::new(Mutex::new(false)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_object<T, F, R>(&self, path: &str, implementation: T, register_fn: F)
|
||||||
|
where
|
||||||
|
T: Send + Clone + 'static,
|
||||||
|
F: Fn(&mut Crossroads) -> R,
|
||||||
|
R: IntoIterator<Item = dbus_crossroads::IfaceToken<T>>,
|
||||||
|
{
|
||||||
|
let dbus_path = String::from(path);
|
||||||
|
|
||||||
|
let mut cr = self.crossroads.lock().unwrap();
|
||||||
|
let tokens: Vec<_> = register_fn(&mut cr).into_iter().collect();
|
||||||
|
cr.insert(dbus_path, &tokens, implementation);
|
||||||
|
|
||||||
|
// Start message handler if not already started
|
||||||
|
let mut handler_started = self.message_handler_started.lock().unwrap();
|
||||||
|
if !*handler_started {
|
||||||
|
let crossroads_clone = self.crossroads.clone();
|
||||||
|
self.connection.start_receive(
|
||||||
|
MatchRule::new_method_call(),
|
||||||
|
Box::new(move |msg, conn| {
|
||||||
|
let mut cr = crossroads_clone.lock().unwrap();
|
||||||
|
cr.handle_message(msg, conn).is_ok()
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
*handler_started = true;
|
||||||
|
info!(target: "dbus", "Started D-Bus message handler");
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(target: "dbus", "Registered object at {} with {} interfaces", path, tokens.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_signal<S>(&self, path: &str, signal: S) -> Result<u32, ()>
|
||||||
|
where
|
||||||
|
S: dbus::message::SignalArgs + dbus::arg::AppendAll,
|
||||||
|
{
|
||||||
|
let message = signal.to_emit_message(&Path::new(path).unwrap());
|
||||||
|
self.connection.send(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep the old Endpoint struct for backward compatibility during transition
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Endpoint<T: Send + Clone + 'static> {
|
pub struct Endpoint<T: Send + Clone + 'static> {
|
||||||
connection: Arc<SyncConnection>,
|
connection: Arc<SyncConnection>,
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
use dbus::arg;
|
use dbus::arg;
|
||||||
use dbus::nonblock::SyncConnection;
|
|
||||||
use dbus_tree::MethodErr;
|
use dbus_tree::MethodErr;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
@@ -13,24 +11,22 @@ use crate::daemon::{
|
|||||||
Attachment, DaemonResult,
|
Attachment, DaemonResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::dbus::endpoint::Endpoint;
|
use crate::dbus::endpoint::DbusRegistry;
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment;
|
use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment;
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
|
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ServerImpl {
|
pub struct ServerImpl {
|
||||||
connection: Arc<SyncConnection>,
|
|
||||||
event_sink: mpsc::Sender<Event>,
|
event_sink: mpsc::Sender<Event>,
|
||||||
attachment_objects: Vec<Endpoint<Attachment>>,
|
dbus_registry: DbusRegistry,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerImpl {
|
impl ServerImpl {
|
||||||
pub fn new(connection: Arc<SyncConnection>, event_sink: mpsc::Sender<Event>) -> Self {
|
pub fn new(event_sink: mpsc::Sender<Event>, dbus_registry: DbusRegistry) -> Self {
|
||||||
Self {
|
Self {
|
||||||
connection: connection,
|
|
||||||
event_sink: event_sink,
|
event_sink: event_sink,
|
||||||
attachment_objects: vec![],
|
dbus_registry: dbus_registry,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,13 +171,11 @@ impl DbusRepository for ServerImpl {
|
|||||||
let obj_path = format!("/net/buzzert/kordophonecd/attachments/{}", &id);
|
let obj_path = format!("/net/buzzert/kordophonecd/attachments/{}", &id);
|
||||||
log::trace!("Registering attachment at path: {}", &obj_path);
|
log::trace!("Registering attachment at path: {}", &obj_path);
|
||||||
|
|
||||||
let endpoint = Endpoint::new(self.connection.clone(), attachment);
|
self.dbus_registry.register_object(
|
||||||
run_sync_future(endpoint.register_object(obj_path.as_str(), |cr| {
|
&obj_path,
|
||||||
vec![interface::register_net_buzzert_kordophone_attachment(cr)]
|
attachment,
|
||||||
}))?;
|
|cr| vec![interface::register_net_buzzert_kordophone_attachment(cr)]
|
||||||
|
);
|
||||||
self.attachment_objects.push(endpoint);
|
|
||||||
log::trace!("Attachment objects: {:?}", self.attachment_objects.len());
|
|
||||||
|
|
||||||
Ok(obj_path.into())
|
Ok(obj_path.into())
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -5,9 +5,9 @@ use log::LevelFilter;
|
|||||||
use std::future;
|
use std::future;
|
||||||
|
|
||||||
use daemon::signals::Signal;
|
use daemon::signals::Signal;
|
||||||
use daemon::{Attachment, Daemon};
|
use daemon::Daemon;
|
||||||
|
|
||||||
use dbus::endpoint::Endpoint as DbusEndpoint;
|
use dbus::endpoint::DbusRegistry;
|
||||||
use dbus::interface;
|
use dbus::interface;
|
||||||
use dbus::server_impl::ServerImpl;
|
use dbus::server_impl::ServerImpl;
|
||||||
use dbus_tokio::connection;
|
use dbus_tokio::connection;
|
||||||
@@ -54,32 +54,20 @@ async fn main() {
|
|||||||
.await
|
.await
|
||||||
.expect("Unable to acquire dbus name");
|
.expect("Unable to acquire dbus name");
|
||||||
|
|
||||||
let attachment = Attachment {
|
// Create shared D-Bus registry
|
||||||
guid: "asdf".into(),
|
let dbus_registry = DbusRegistry::new(connection.clone());
|
||||||
path: "/dev/null".into(),
|
|
||||||
downloaded: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
let att_endpoint = DbusEndpoint::new(connection.clone(), attachment);
|
// Create and register server implementation
|
||||||
att_endpoint
|
let server = ServerImpl::new(daemon.event_sender.clone(), dbus_registry.clone());
|
||||||
.register_object("/net/buzzert/kordophonecd/attachments/test", |cr| {
|
|
||||||
vec![interface::register_net_buzzert_kordophone_attachment(cr)]
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// Create the server implementation
|
dbus_registry.register_object(
|
||||||
let server = ServerImpl::new(connection.clone(), daemon.event_sender.clone());
|
interface::OBJECT_PATH,
|
||||||
|
server,
|
||||||
// Register DBus interfaces with endpoint
|
|cr| vec![
|
||||||
let endpoint = DbusEndpoint::new(connection.clone(), server);
|
|
||||||
endpoint
|
|
||||||
.register_object(interface::OBJECT_PATH, |cr| {
|
|
||||||
vec![
|
|
||||||
interface::register_net_buzzert_kordophone_repository(cr),
|
interface::register_net_buzzert_kordophone_repository(cr),
|
||||||
interface::register_net_buzzert_kordophone_settings(cr),
|
interface::register_net_buzzert_kordophone_settings(cr),
|
||||||
]
|
]
|
||||||
})
|
);
|
||||||
.await;
|
|
||||||
|
|
||||||
let mut signal_receiver = daemon.obtain_signal_receiver();
|
let mut signal_receiver = daemon.obtain_signal_receiver();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -89,7 +77,7 @@ async fn main() {
|
|||||||
match signal {
|
match signal {
|
||||||
Signal::ConversationsUpdated => {
|
Signal::ConversationsUpdated => {
|
||||||
log::debug!("Sending signal: ConversationsUpdated");
|
log::debug!("Sending signal: ConversationsUpdated");
|
||||||
endpoint
|
dbus_registry
|
||||||
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
|
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
|
||||||
.unwrap_or_else(|_| {
|
.unwrap_or_else(|_| {
|
||||||
log::error!("Failed to send signal");
|
log::error!("Failed to send signal");
|
||||||
@@ -102,7 +90,7 @@ async fn main() {
|
|||||||
"Sending signal: MessagesUpdated for conversation {}",
|
"Sending signal: MessagesUpdated for conversation {}",
|
||||||
conversation_id
|
conversation_id
|
||||||
);
|
);
|
||||||
endpoint
|
dbus_registry
|
||||||
.send_signal(
|
.send_signal(
|
||||||
interface::OBJECT_PATH,
|
interface::OBJECT_PATH,
|
||||||
DbusSignals::MessagesUpdated { conversation_id },
|
DbusSignals::MessagesUpdated { conversation_id },
|
||||||
|
|||||||
Reference in New Issue
Block a user