diff --git a/Cargo.lock b/Cargo.lock
index a32f8de..8a78e5e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -574,9 +574,20 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
+checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
[[package]]
name = "futures-sink"
@@ -586,20 +597,22 @@ checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
+checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
-version = "0.3.30"
+version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
+checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-core",
+ "futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
+ "slab",
]
[[package]]
@@ -849,6 +862,7 @@ dependencies = [
"dbus-tree",
"directories",
"env_logger",
+ "futures-util",
"kordophone",
"kordophone-db",
"log",
diff --git a/kordophoned/Cargo.toml b/kordophoned/Cargo.toml
index d5c6ac7..e14a322 100644
--- a/kordophoned/Cargo.toml
+++ b/kordophoned/Cargo.toml
@@ -11,6 +11,7 @@ dbus-tokio = "0.7.6"
dbus-tree = "0.9.2"
directories = "6.0.0"
env_logger = "0.11.6"
+futures-util = "0.3.31"
kordophone = { path = "../kordophone" }
kordophone-db = { path = "../kordophone-db" }
log = "0.4.25"
diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml
index 760eda9..9f8bc32 100644
--- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml
+++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml
@@ -18,10 +18,7 @@
-
-
-
diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs
index 17282f1..d98882c 100644
--- a/kordophoned/src/daemon/mod.rs
+++ b/kordophoned/src/daemon/mod.rs
@@ -1,6 +1,7 @@
mod settings;
use settings::Settings;
+use std::sync::mpsc;
use directories::ProjectDirs;
use std::path::PathBuf;
use anyhow::Result;
@@ -19,6 +20,10 @@ use kordophone::api::{
TokenManagement,
};
+pub enum Event {
+ SyncAllConversations,
+}
+
#[derive(Debug, Error)]
pub enum DaemonError {
#[error("Client Not Configured")]
@@ -87,6 +92,16 @@ impl Daemon {
Ok(settings)
}
+ pub async fn handle_event(&mut self, event: Event) {
+ match event {
+ Event::SyncAllConversations => {
+ self.sync_all_conversations().await.unwrap_or_else(|e| {
+ log::error!("Error handling sync event: {}", e);
+ });
+ }
+ }
+ }
+
fn get_client(&mut self) -> Result {
let settings = self.database.with_settings(|s|
Settings::from_db(s)
@@ -132,4 +147,3 @@ impl TokenManagement for &mut Daemon {
self.database.set_token(token);
}
}
-
diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs
index c957c3a..0595d51 100644
--- a/kordophoned/src/dbus/server_impl.rs
+++ b/kordophoned/src/dbus/server_impl.rs
@@ -1,63 +1,69 @@
use dbus::arg;
use dbus_tree::MethodErr;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::Arc;
+use tokio::sync::{Mutex, MutexGuard};
use std::future::Future;
use std::thread;
+use std::sync::mpsc;
+use futures_util::future::FutureExt;
-use crate::daemon::Daemon;
+use crate::daemon::{Daemon, Event};
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
#[derive(Clone)]
pub struct ServerImpl {
daemon: Arc>,
+ event_sender: mpsc::Sender,
}
impl ServerImpl {
- pub fn new(daemon: Arc>) -> Self {
- Self { daemon }
+ pub fn new(daemon: Arc>, event_sender: mpsc::Sender) -> Self {
+ Self { daemon, event_sender }
}
- pub fn get_daemon(&self) -> Result, MethodErr> {
- self.daemon.lock().map_err(|_| MethodErr::failed("Failed to lock daemon"))
+ pub async fn get_daemon(&self) -> MutexGuard<'_, Daemon> {
+ self.daemon.lock().await // .map_err(|_| MethodErr::failed("Failed to lock daemon"))
+ }
+
+ pub fn daemon_then(&self, f: F) -> Result
+ where F: FnOnce(MutexGuard<'_, Daemon>) -> T + Send,
+ T: Send,
+ {
+ run_sync_future(self.get_daemon().then(|daemon| async move {
+ f(daemon)
+ }))
}
}
impl DbusRepository for ServerImpl {
fn get_version(&mut self) -> Result {
- let daemon = self.get_daemon()?;
- Ok(daemon.version.clone())
+ self.daemon_then(|daemon| daemon.version.clone())
}
fn get_conversations(&mut self) -> Result, dbus::MethodErr> {
- // Get a repository instance and use it to fetch conversations
- let mut daemon = self.get_daemon()?;
- let conversations = daemon.get_conversations();
-
- // Convert conversations to DBus property maps
- let result = conversations.into_iter().map(|conv| {
- let mut map = arg::PropMap::new();
- map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
- map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
- map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
- map
- }).collect();
-
- Ok(result)
+ self.daemon_then(|mut daemon| {
+ let conversations = daemon.get_conversations();
+
+ // Convert conversations to DBus property maps
+ let result = conversations.into_iter().map(|conv| {
+ let mut map = arg::PropMap::new();
+ map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
+ map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
+ map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
+ map
+ }).collect();
+
+ Ok(result)
+ })?
}
- fn sync_all_conversations(&mut self) -> Result {
- let mut daemon = self.get_daemon()?;
+ fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
+ self.event_sender.send(Event::SyncAllConversations).unwrap_or_else(|e| {
+ log::error!("Error sending sync event: {}", e);
+ });
- // TODO: We don't actually probably want to block here.
- run_sync_future(daemon.sync_all_conversations())
- .unwrap()
- .map_err(|e| {
- log::error!("Failed to sync conversations: {}", e);
- MethodErr::failed(&format!("Failed to sync conversations: {}", e))
- })?;
-
- Ok(true)
+ Ok(())
}
}
diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs
index e6772e5..0fd1987 100644
--- a/kordophoned/src/main.rs
+++ b/kordophoned/src/main.rs
@@ -2,9 +2,12 @@ mod dbus;
mod daemon;
use std::future;
-use std::sync::{Arc, Mutex};
+use std::sync::mpsc;
use log::LevelFilter;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+
use daemon::Daemon;
use dbus::endpoint::Endpoint as DbusEndpoint;
use dbus::interface;
@@ -21,6 +24,8 @@ fn initialize_logging() {
async fn main() {
initialize_logging();
+ let (sender, receiver) = mpsc::channel::();
+
// Create the daemon
let daemon = Arc::new(
Mutex::new(
@@ -34,7 +39,7 @@ async fn main() {
);
// Create the server implementation
- let server = ServerImpl::new(daemon);
+ let server = ServerImpl::new(daemon.clone(), sender);
// Register DBus interfaces with endpoint
let endpoint = DbusEndpoint::new(server.clone());
@@ -49,6 +54,13 @@ async fn main() {
}
).await;
+ tokio::spawn(async move {
+ for event in receiver {
+ // Important! Only lock the daemon when handling events.
+ daemon.lock().await.handle_event(event).await;
+ }
+ });
+
future::pending::<()>().await;
unreachable!()
}
diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs
index 8c89165..7a96180 100644
--- a/kpcli/src/daemon/mod.rs
+++ b/kpcli/src/daemon/mod.rs
@@ -64,7 +64,7 @@ impl DaemonCli {
pub async fn sync_conversations(&mut self) -> Result<()> {
let success = KordophoneRepository::sync_all_conversations(&self.proxy())?;
- println!("Synced conversations: {}", success);
+ println!("Initiated sync");
Ok(())
}
}
\ No newline at end of file