diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 2b20d6b..287f188 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -1,11 +1,12 @@ use tokio::sync::oneshot; use uuid::Uuid; -use kordophone_db::models::{Conversation, Message}; use kordophone::model::ConversationID; use kordophone::model::OutgoingMessage; +use kordophone_db::models::{Conversation, Message}; use crate::daemon::settings::Settings; +use crate::daemon::Attachment; pub type Reply = oneshot::Sender; @@ -17,14 +18,14 @@ pub enum Event { /// Asynchronous event for syncing the conversation list with the server. SyncConversationList(Reply<()>), - /// Asynchronous event for syncing all conversations with the server. + /// Asynchronous event for syncing all conversations with the server. SyncAllConversations(Reply<()>), /// Asynchronous event for syncing a single conversation with the server. SyncConversation(String, Reply<()>), /// Returns all known conversations from the database. - /// Parameters: + /// Parameters: /// - limit: The maximum number of conversations to return. (-1 for no limit) /// - offset: The offset into the conversation list to start returning conversations from. GetAllConversations(i32, i32, Reply>), @@ -36,27 +37,31 @@ pub enum Event { UpdateSettings(Settings, Reply<()>), /// Returns all messages for a conversation from the database. - /// Parameters: + /// Parameters: /// - conversation_id: The ID of the conversation to get messages for. /// - last_message_id: (optional) The ID of the last message to get. If None, all messages are returned. GetMessages(String, Option, Reply>), /// Enqueues a message to be sent to the server. - /// Parameters: + /// Parameters: /// - conversation_id: The ID of the conversation to send the message to. /// - text: The text of the message to send. /// - reply: The outgoing message ID (not the server-assigned message ID). SendMessage(String, String, Reply), /// Notifies the daemon that a message has been sent. - /// Parameters: + /// Parameters: /// - message: The message that was sent. /// - outgoing_message: The outgoing message that was sent. /// - conversation_id: The ID of the conversation that the message was sent to. MessageSent(Message, OutgoingMessage, ConversationID), + /// Gets an attachment object from the attachment store. + /// Parameters: + /// - guid: The attachment guid + /// - reply: Reply of the attachment object, if known. + GetAttachment(String, Reply), + /// Delete all conversations from the database. DeleteAllConversations(Reply<()>), } - - diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index ce950b8..5cee910 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -279,6 +279,11 @@ impl Daemon { .await .unwrap(); } + + Event::GetAttachment(guid, reply) => { + let attachment = self.attachment_store.get_attachment(&guid); + reply.send(attachment).unwrap(); + } } } diff --git a/kordophoned/src/dbus/endpoint.rs b/kordophoned/src/dbus/endpoint.rs index a35f371..ddef080 100644 --- a/kordophoned/src/dbus/endpoint.rs +++ b/kordophoned/src/dbus/endpoint.rs @@ -1,60 +1,38 @@ use log::info; use std::sync::Arc; -use dbus_crossroads::Crossroads; -use dbus_tokio::connection; use dbus::{ + channel::{MatchingReceiver, Sender}, message::MatchRule, nonblock::SyncConnection, - channel::{Sender, MatchingReceiver}, Path, }; +use dbus_crossroads::Crossroads; +#[derive(Clone)] pub struct Endpoint { connection: Arc, implementation: T, } impl Endpoint { - pub fn new(implementation: T) -> Self { - let (resource, connection) = connection::new_session_sync().unwrap(); - - // The resource is a task that should be spawned onto a tokio compatible - // reactor ASAP. If the resource ever finishes, you lost connection to D-Bus. - // - // To shut down the connection, both call _handle.abort() and drop the connection. - let _handle = tokio::spawn(async { - let err = resource.await; - panic!("Lost connection to D-Bus: {}", err); - }); - - Self { - connection, - implementation + pub fn new(connection: Arc, implementation: T) -> Self { + Self { + connection, + implementation, } } - pub async fn register( - &self, - name: &str, - path: &str, - register_fn: F - ) + pub async fn register_object(&self, path: &str, register_fn: F) where F: Fn(&mut Crossroads) -> R, R: IntoIterator>, { let dbus_path = String::from(path); - self.connection - .request_name(name, false, true, false) - .await - .expect("Unable to acquire dbus name"); - - let mut cr = Crossroads::new(); - - // Enable async support for the crossroads instance. + // Enable async support for the crossroads instance. // (Currently irrelevant since dbus generates sync code) + let mut cr = Crossroads::new(); cr.set_async_support(Some(( self.connection.clone(), Box::new(|x| { @@ -69,9 +47,7 @@ impl Endpoint { // Start receiving messages. self.connection.start_receive( MatchRule::new_method_call(), - Box::new(move |msg, conn| - cr.handle_message(msg, conn).is_ok() - ), + Box::new(move |msg, conn| cr.handle_message(msg, conn).is_ok()), ); info!(target: "dbus", "Registered endpoint at {} with {} interfaces", path, tokens.len()); diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 955f325..8f853db 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -1,6 +1,8 @@ use dbus::arg; +use dbus::nonblock::SyncConnection; use dbus_tree::MethodErr; use std::future::Future; +use std::sync::Arc; use std::thread; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -11,18 +13,25 @@ use crate::daemon::{ Attachment, DaemonResult, }; +use crate::dbus::endpoint::Endpoint; use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { + connection: Arc, event_sink: mpsc::Sender, + attachment_objects: Vec>, } impl ServerImpl { - pub fn new(event_sink: mpsc::Sender) -> Self { - Self { event_sink } + pub fn new(connection: Arc, event_sink: mpsc::Sender) -> Self { + Self { + connection: connection, + event_sink: event_sink, + attachment_objects: vec![], + } } pub async fn send_event( @@ -158,7 +167,24 @@ impl DbusRepository for ServerImpl { &mut self, attachment_id: String, ) -> Result, dbus::MethodErr> { - todo!() + use crate::dbus::interface; + + self.send_event_sync(|r| Event::GetAttachment(attachment_id.clone(), r)) + .and_then(|attachment| { + let id: &str = attachment_id.split("-").take(1).last().unwrap(); + let obj_path = format!("/net/buzzert/kordophonecd/attachments/{}", &id); + log::trace!("Registering attachment at path: {}", &obj_path); + + let endpoint = Endpoint::new(self.connection.clone(), attachment); + run_sync_future(endpoint.register_object(obj_path.as_str(), |cr| { + vec![interface::register_net_buzzert_kordophone_attachment(cr)] + }))?; + + self.attachment_objects.push(endpoint); + log::trace!("Attachment objects: {:?}", self.attachment_objects.len()); + + Ok(obj_path.into()) + }) } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index d3806da..4a0f65d 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -1,15 +1,16 @@ -mod dbus; mod daemon; +mod dbus; -use std::future; use log::LevelFilter; +use std::future; -use daemon::Daemon; use daemon::signals::Signal; +use daemon::{Attachment, Daemon}; use dbus::endpoint::Endpoint as DbusEndpoint; use dbus::interface; use dbus::server_impl::ServerImpl; +use dbus_tokio::connection; fn initialize_logging() { // Weird: is this the best way to do this? @@ -17,7 +18,7 @@ fn initialize_logging() { .map(|s| s.parse::().unwrap_or(LevelFilter::Info)) .unwrap_or(LevelFilter::Info); - env_logger::Builder::from_default_env() + env_logger::Builder::from_default_env() .format_timestamp_secs() .filter_level(log_level) .init(); @@ -35,21 +36,50 @@ async fn main() { }) .unwrap(); + // Initialize dbus session connection + let (resource, connection) = connection::new_session_sync().unwrap(); + + // The resource is a task that should be spawned onto a tokio compatible + // reactor ASAP. If the resource ever finishes, you lost connection to D-Bus. + // + // To shut down the connection, both call _handle.abort() and drop the connection. + let _handle = tokio::spawn(async { + let err = resource.await; + panic!("Lost connection to D-Bus: {}", err); + }); + + // Acquire the name + connection + .request_name(interface::NAME, false, true, false) + .await + .expect("Unable to acquire dbus name"); + + let attachment = Attachment { + guid: "asdf".into(), + path: "/dev/null".into(), + downloaded: false, + }; + + let att_endpoint = DbusEndpoint::new(connection.clone(), attachment); + att_endpoint + .register_object("/net/buzzert/kordophonecd/attachments/test", |cr| { + vec![interface::register_net_buzzert_kordophone_attachment(cr)] + }) + .await; + // Create the server implementation - let server = ServerImpl::new(daemon.event_sender.clone()); + let server = ServerImpl::new(connection.clone(), daemon.event_sender.clone()); // Register DBus interfaces with endpoint - let endpoint = DbusEndpoint::new(server); - endpoint.register( - interface::NAME, - interface::OBJECT_PATH, - |cr| { + let endpoint = DbusEndpoint::new(connection.clone(), server); + endpoint + .register_object(interface::OBJECT_PATH, |cr| { vec![ interface::register_net_buzzert_kordophone_repository(cr), - interface::register_net_buzzert_kordophone_settings(cr) + interface::register_net_buzzert_kordophone_settings(cr), ] - } - ).await; + }) + .await; let mut signal_receiver = daemon.obtain_signal_receiver(); tokio::spawn(async move { @@ -59,7 +89,8 @@ async fn main() { match signal { Signal::ConversationsUpdated => { log::debug!("Sending signal: ConversationsUpdated"); - endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated{}) + endpoint + .send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated {}) .unwrap_or_else(|_| { log::error!("Failed to send signal"); 0 @@ -67,8 +98,15 @@ async fn main() { } Signal::MessagesUpdated(conversation_id) => { - log::debug!("Sending signal: MessagesUpdated for conversation {}", conversation_id); - endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::MessagesUpdated{ conversation_id }) + log::debug!( + "Sending signal: MessagesUpdated for conversation {}", + conversation_id + ); + endpoint + .send_signal( + interface::OBJECT_PATH, + DbusSignals::MessagesUpdated { conversation_id }, + ) .unwrap_or_else(|_| { log::error!("Failed to send signal"); 0