use dbus::arg; use dbus_tree::MethodErr; use tokio::sync::mpsc; use std::future::Future; use std::thread; use tokio::sync::oneshot; use crate::daemon::{ DaemonResult, events::{Event, Reply}, settings::Settings, }; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { event_sink: mpsc::Sender, } impl ServerImpl { pub fn new(event_sink: mpsc::Sender) -> Self { Self { event_sink } } pub async fn send_event( &self, make_event: impl FnOnce(Reply) -> Event, ) -> DaemonResult { let (reply_tx, reply_rx) = oneshot::channel(); self.event_sink.send(make_event(reply_tx)) .await .map_err(|_| "Failed to send event")?; reply_rx.await.map_err(|_| "Failed to receive reply".into()) } pub fn send_event_sync( &self, make_event: impl FnOnce(Reply) -> Event + Send, ) -> Result { run_sync_future(self.send_event(make_event)) .unwrap() .map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e))) } } impl DbusRepository for ServerImpl { fn get_version(&mut self) -> Result { self.send_event_sync(Event::GetVersion) } fn get_conversations(&mut self) -> Result, dbus::MethodErr> { self.send_event_sync(Event::GetAllConversations) .map(|conversations| { // Convert conversations to DBus property maps conversations.into_iter().map(|conv| { let mut map = arg::PropMap::new(); map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); map.insert("last_message_preview".into(), arg::Variant(Box::new(conv.last_message_preview.unwrap_or_default()))); map.insert("participants".into(), arg::Variant(Box::new(conv.participants.into_iter().map(|p| p.display_name()).collect::>()))); map.insert("date".into(), arg::Variant(Box::new(conv.date.and_utc().timestamp()))); map }).collect() }) } fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> { self.send_event_sync(Event::SyncAllConversations) } fn sync_conversation(&mut self, conversation_id: String) -> Result<(), dbus::MethodErr> { self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) } fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result, dbus::MethodErr> { let last_message_id_opt = if last_message_id.is_empty() { None } else { Some(last_message_id) }; self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) .map(|messages| { messages.into_iter().map(|msg| { let mut map = arg::PropMap::new(); map.insert("id".into(), arg::Variant(Box::new(msg.id))); map.insert("text".into(), arg::Variant(Box::new(msg.text))); map.insert("date".into(), arg::Variant(Box::new(msg.date.and_utc().timestamp()))); map.insert("sender".into(), arg::Variant(Box::new(msg.sender.display_name()))); map }).collect() }) } } impl DbusSettings for ServerImpl { fn set_server(&mut self, url: String, user: String) -> Result<(), dbus::MethodErr> { self.send_event_sync(|r| Event::UpdateSettings(Settings { server_url: Some(url), username: Some(user), credential_item: None, token: None, }, r) ) } fn server_url(&self) -> Result { self.send_event_sync(Event::GetAllSettings) .map(|settings| settings.server_url.unwrap_or_default()) } fn set_server_url(&self, value: String) -> Result<(), dbus::MethodErr> { self.send_event_sync(|r| Event::UpdateSettings(Settings { server_url: Some(value), username: None, credential_item: None, token: None, }, r) ) } fn username(&self) -> Result { self.send_event_sync(Event::GetAllSettings) .map(|settings| settings.username.unwrap_or_default()) } fn set_username(&self, value: String) -> Result<(), dbus::MethodErr> { self.send_event_sync(|r| Event::UpdateSettings(Settings { server_url: None, username: Some(value), credential_item: None, token: None, }, r) ) } fn credential_item(&self) -> Result, dbus::MethodErr> { self.send_event_sync(Event::GetAllSettings) .map(|settings| settings.credential_item.unwrap_or_default()).map(|item| dbus::Path::new(item).unwrap_or_default()) } fn set_credential_item(&self, value: dbus::Path<'static>) -> Result<(), dbus::MethodErr> { self.send_event_sync(|r| Event::UpdateSettings(Settings { server_url: None, username: None, credential_item: Some(value.to_string()), token: None, }, r) ) } } fn run_sync_future(f: F) -> Result where T: Send, F: Future + Send, { // We use `scope` here to ensure that the thread is joined before the // function returns. This allows us to capture references of values that // have lifetimes shorter than 'static, which is what thread::spawn requires. thread::scope(move |s| { s.spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .map_err(|_| MethodErr::failed("Unable to create tokio runtime"))?; let result = rt.block_on(f); Ok(result) }) .join() }) .expect("Error joining runtime thread") }