Private
Public Access
1
0

broken: started working on attachment dbus object, but order of endpoint creation seems to matter, need to reuse more parts

This commit is contained in:
2025-05-25 18:52:18 -07:00
parent 0d4c2e5104
commit c02d4ecdf3
5 changed files with 112 additions and 62 deletions

View File

@@ -1,11 +1,12 @@
use tokio::sync::oneshot; use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
use kordophone_db::models::{Conversation, Message};
use kordophone::model::ConversationID; use kordophone::model::ConversationID;
use kordophone::model::OutgoingMessage; use kordophone::model::OutgoingMessage;
use kordophone_db::models::{Conversation, Message};
use crate::daemon::settings::Settings; use crate::daemon::settings::Settings;
use crate::daemon::Attachment;
pub type Reply<T> = oneshot::Sender<T>; pub type Reply<T> = oneshot::Sender<T>;
@@ -55,8 +56,12 @@ pub enum Event {
/// - conversation_id: The ID of the conversation that the message was sent to. /// - conversation_id: The ID of the conversation that the message was sent to.
MessageSent(Message, OutgoingMessage, ConversationID), MessageSent(Message, OutgoingMessage, ConversationID),
/// Gets an attachment object from the attachment store.
/// Parameters:
/// - guid: The attachment guid
/// - reply: Reply of the attachment object, if known.
GetAttachment(String, Reply<Attachment>),
/// Delete all conversations from the database. /// Delete all conversations from the database.
DeleteAllConversations(Reply<()>), DeleteAllConversations(Reply<()>),
} }

View File

@@ -279,6 +279,11 @@ impl Daemon {
.await .await
.unwrap(); .unwrap();
} }
Event::GetAttachment(guid, reply) => {
let attachment = self.attachment_store.get_attachment(&guid);
reply.send(attachment).unwrap();
}
} }
} }

View File

@@ -1,60 +1,38 @@
use log::info; use log::info;
use std::sync::Arc; use std::sync::Arc;
use dbus_crossroads::Crossroads;
use dbus_tokio::connection;
use dbus::{ use dbus::{
channel::{MatchingReceiver, Sender},
message::MatchRule, message::MatchRule,
nonblock::SyncConnection, nonblock::SyncConnection,
channel::{Sender, MatchingReceiver},
Path, Path,
}; };
use dbus_crossroads::Crossroads;
#[derive(Clone)]
pub struct Endpoint<T: Send + Clone + 'static> { pub struct Endpoint<T: Send + Clone + 'static> {
connection: Arc<SyncConnection>, connection: Arc<SyncConnection>,
implementation: T, implementation: T,
} }
impl<T: Send + Clone + 'static> Endpoint<T> { impl<T: Send + Clone + 'static> Endpoint<T> {
pub fn new(implementation: T) -> Self { pub fn new(connection: Arc<SyncConnection>, implementation: T) -> Self {
let (resource, connection) = connection::new_session_sync().unwrap();
// The resource is a task that should be spawned onto a tokio compatible
// reactor ASAP. If the resource ever finishes, you lost connection to D-Bus.
//
// To shut down the connection, both call _handle.abort() and drop the connection.
let _handle = tokio::spawn(async {
let err = resource.await;
panic!("Lost connection to D-Bus: {}", err);
});
Self { Self {
connection, connection,
implementation implementation,
} }
} }
pub async fn register<F, R>( pub async fn register_object<F, R>(&self, path: &str, register_fn: F)
&self,
name: &str,
path: &str,
register_fn: F
)
where where
F: Fn(&mut Crossroads) -> R, F: Fn(&mut Crossroads) -> R,
R: IntoIterator<Item = dbus_crossroads::IfaceToken<T>>, R: IntoIterator<Item = dbus_crossroads::IfaceToken<T>>,
{ {
let dbus_path = String::from(path); let dbus_path = String::from(path);
self.connection
.request_name(name, false, true, false)
.await
.expect("Unable to acquire dbus name");
let mut cr = Crossroads::new();
// Enable async support for the crossroads instance. // Enable async support for the crossroads instance.
// (Currently irrelevant since dbus generates sync code) // (Currently irrelevant since dbus generates sync code)
let mut cr = Crossroads::new();
cr.set_async_support(Some(( cr.set_async_support(Some((
self.connection.clone(), self.connection.clone(),
Box::new(|x| { Box::new(|x| {
@@ -69,9 +47,7 @@ impl<T: Send + Clone + 'static> Endpoint<T> {
// Start receiving messages. // Start receiving messages.
self.connection.start_receive( self.connection.start_receive(
MatchRule::new_method_call(), MatchRule::new_method_call(),
Box::new(move |msg, conn| Box::new(move |msg, conn| cr.handle_message(msg, conn).is_ok()),
cr.handle_message(msg, conn).is_ok()
),
); );
info!(target: "dbus", "Registered endpoint at {} with {} interfaces", path, tokens.len()); info!(target: "dbus", "Registered endpoint at {} with {} interfaces", path, tokens.len());

View File

@@ -1,6 +1,8 @@
use dbus::arg; use dbus::arg;
use dbus::nonblock::SyncConnection;
use dbus_tree::MethodErr; use dbus_tree::MethodErr;
use std::future::Future; use std::future::Future;
use std::sync::Arc;
use std::thread; use std::thread;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
@@ -11,18 +13,25 @@ use crate::daemon::{
Attachment, DaemonResult, Attachment, DaemonResult,
}; };
use crate::dbus::endpoint::Endpoint;
use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment; use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment;
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
#[derive(Clone)] #[derive(Clone)]
pub struct ServerImpl { pub struct ServerImpl {
connection: Arc<SyncConnection>,
event_sink: mpsc::Sender<Event>, event_sink: mpsc::Sender<Event>,
attachment_objects: Vec<Endpoint<Attachment>>,
} }
impl ServerImpl { impl ServerImpl {
pub fn new(event_sink: mpsc::Sender<Event>) -> Self { pub fn new(connection: Arc<SyncConnection>, event_sink: mpsc::Sender<Event>) -> Self {
Self { event_sink } Self {
connection: connection,
event_sink: event_sink,
attachment_objects: vec![],
}
} }
pub async fn send_event<T>( pub async fn send_event<T>(
@@ -158,7 +167,24 @@ impl DbusRepository for ServerImpl {
&mut self, &mut self,
attachment_id: String, attachment_id: String,
) -> Result<dbus::Path<'static>, dbus::MethodErr> { ) -> Result<dbus::Path<'static>, dbus::MethodErr> {
todo!() use crate::dbus::interface;
self.send_event_sync(|r| Event::GetAttachment(attachment_id.clone(), r))
.and_then(|attachment| {
let id: &str = attachment_id.split("-").take(1).last().unwrap();
let obj_path = format!("/net/buzzert/kordophonecd/attachments/{}", &id);
log::trace!("Registering attachment at path: {}", &obj_path);
let endpoint = Endpoint::new(self.connection.clone(), attachment);
run_sync_future(endpoint.register_object(obj_path.as_str(), |cr| {
vec![interface::register_net_buzzert_kordophone_attachment(cr)]
}))?;
self.attachment_objects.push(endpoint);
log::trace!("Attachment objects: {:?}", self.attachment_objects.len());
Ok(obj_path.into())
})
} }
} }

View File

@@ -1,15 +1,16 @@
mod dbus;
mod daemon; mod daemon;
mod dbus;
use std::future;
use log::LevelFilter; use log::LevelFilter;
use std::future;
use daemon::Daemon;
use daemon::signals::Signal; use daemon::signals::Signal;
use daemon::{Attachment, Daemon};
use dbus::endpoint::Endpoint as DbusEndpoint; use dbus::endpoint::Endpoint as DbusEndpoint;
use dbus::interface; use dbus::interface;
use dbus::server_impl::ServerImpl; use dbus::server_impl::ServerImpl;
use dbus_tokio::connection;
fn initialize_logging() { fn initialize_logging() {
// Weird: is this the best way to do this? // Weird: is this the best way to do this?
@@ -35,21 +36,50 @@ async fn main() {
}) })
.unwrap(); .unwrap();
// Initialize dbus session connection
let (resource, connection) = connection::new_session_sync().unwrap();
// The resource is a task that should be spawned onto a tokio compatible
// reactor ASAP. If the resource ever finishes, you lost connection to D-Bus.
//
// To shut down the connection, both call _handle.abort() and drop the connection.
let _handle = tokio::spawn(async {
let err = resource.await;
panic!("Lost connection to D-Bus: {}", err);
});
// Acquire the name
connection
.request_name(interface::NAME, false, true, false)
.await
.expect("Unable to acquire dbus name");
let attachment = Attachment {
guid: "asdf".into(),
path: "/dev/null".into(),
downloaded: false,
};
let att_endpoint = DbusEndpoint::new(connection.clone(), attachment);
att_endpoint
.register_object("/net/buzzert/kordophonecd/attachments/test", |cr| {
vec![interface::register_net_buzzert_kordophone_attachment(cr)]
})
.await;
// Create the server implementation // Create the server implementation
let server = ServerImpl::new(daemon.event_sender.clone()); let server = ServerImpl::new(connection.clone(), daemon.event_sender.clone());
// Register DBus interfaces with endpoint // Register DBus interfaces with endpoint
let endpoint = DbusEndpoint::new(server); let endpoint = DbusEndpoint::new(connection.clone(), server);
endpoint.register( endpoint
interface::NAME, .register_object(interface::OBJECT_PATH, |cr| {
interface::OBJECT_PATH,
|cr| {
vec![ vec![
interface::register_net_buzzert_kordophone_repository(cr), interface::register_net_buzzert_kordophone_repository(cr),
interface::register_net_buzzert_kordophone_settings(cr) interface::register_net_buzzert_kordophone_settings(cr),
] ]
} })
).await; .await;
let mut signal_receiver = daemon.obtain_signal_receiver(); let mut signal_receiver = daemon.obtain_signal_receiver();
tokio::spawn(async move { tokio::spawn(async move {
@@ -59,7 +89,8 @@ async fn main() {
match signal { match signal {
Signal::ConversationsUpdated => { Signal::ConversationsUpdated => {
log::debug!("Sending signal: ConversationsUpdated"); log::debug!("Sending signal: ConversationsUpdated");
endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated{}) endpoint
.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {})
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
log::error!("Failed to send signal"); log::error!("Failed to send signal");
0 0
@@ -67,8 +98,15 @@ async fn main() {
} }
Signal::MessagesUpdated(conversation_id) => { Signal::MessagesUpdated(conversation_id) => {
log::debug!("Sending signal: MessagesUpdated for conversation {}", conversation_id); log::debug!(
endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::MessagesUpdated{ conversation_id }) "Sending signal: MessagesUpdated for conversation {}",
conversation_id
);
endpoint
.send_signal(
interface::OBJECT_PATH,
DbusSignals::MessagesUpdated { conversation_id },
)
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
log::error!("Failed to send signal"); log::error!("Failed to send signal");
0 0