daemon: start working on events. notes:
Probably need to make the locking mechanism more granular. Only lock the database during db writes, see if we can do multiple readers and a single writer. Otherwise, the daemon will not be able to service requests while an event is being handled, which is not good.
This commit is contained in:
26
Cargo.lock
generated
26
Cargo.lock
generated
@@ -574,9 +574,20 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-core"
|
name = "futures-core"
|
||||||
version = "0.3.30"
|
version = "0.3.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
|
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-macro"
|
||||||
|
version = "0.3.31"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-sink"
|
name = "futures-sink"
|
||||||
@@ -586,20 +597,22 @@ checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-task"
|
name = "futures-task"
|
||||||
version = "0.3.30"
|
version = "0.3.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
|
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-util"
|
name = "futures-util"
|
||||||
version = "0.3.30"
|
version = "0.3.31"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-macro",
|
||||||
"futures-task",
|
"futures-task",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pin-utils",
|
"pin-utils",
|
||||||
|
"slab",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -849,6 +862,7 @@ dependencies = [
|
|||||||
"dbus-tree",
|
"dbus-tree",
|
||||||
"directories",
|
"directories",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
"futures-util",
|
||||||
"kordophone",
|
"kordophone",
|
||||||
"kordophone-db",
|
"kordophone-db",
|
||||||
"log",
|
"log",
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ dbus-tokio = "0.7.6"
|
|||||||
dbus-tree = "0.9.2"
|
dbus-tree = "0.9.2"
|
||||||
directories = "6.0.0"
|
directories = "6.0.0"
|
||||||
env_logger = "0.11.6"
|
env_logger = "0.11.6"
|
||||||
|
futures-util = "0.3.31"
|
||||||
kordophone = { path = "../kordophone" }
|
kordophone = { path = "../kordophone" }
|
||||||
kordophone-db = { path = "../kordophone-db" }
|
kordophone-db = { path = "../kordophone-db" }
|
||||||
log = "0.4.25"
|
log = "0.4.25"
|
||||||
|
|||||||
@@ -18,10 +18,7 @@
|
|||||||
</method>
|
</method>
|
||||||
|
|
||||||
<method name="SyncAllConversations">
|
<method name="SyncAllConversations">
|
||||||
<arg type="b" name="success" direction="out" />
|
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
|
|
||||||
</interface>
|
</interface>
|
||||||
|
|
||||||
<interface name="net.buzzert.kordophone.Settings">
|
<interface name="net.buzzert.kordophone.Settings">
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
mod settings;
|
mod settings;
|
||||||
use settings::Settings;
|
use settings::Settings;
|
||||||
|
|
||||||
|
use std::sync::mpsc;
|
||||||
use directories::ProjectDirs;
|
use directories::ProjectDirs;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
@@ -19,6 +20,10 @@ use kordophone::api::{
|
|||||||
TokenManagement,
|
TokenManagement,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub enum Event {
|
||||||
|
SyncAllConversations,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum DaemonError {
|
pub enum DaemonError {
|
||||||
#[error("Client Not Configured")]
|
#[error("Client Not Configured")]
|
||||||
@@ -87,6 +92,16 @@ impl Daemon {
|
|||||||
Ok(settings)
|
Ok(settings)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn handle_event(&mut self, event: Event) {
|
||||||
|
match event {
|
||||||
|
Event::SyncAllConversations => {
|
||||||
|
self.sync_all_conversations().await.unwrap_or_else(|e| {
|
||||||
|
log::error!("Error handling sync event: {}", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn get_client(&mut self) -> Result<HTTPAPIClient> {
|
fn get_client(&mut self) -> Result<HTTPAPIClient> {
|
||||||
let settings = self.database.with_settings(|s|
|
let settings = self.database.with_settings(|s|
|
||||||
Settings::from_db(s)
|
Settings::from_db(s)
|
||||||
@@ -132,4 +147,3 @@ impl TokenManagement for &mut Daemon {
|
|||||||
self.database.set_token(token);
|
self.database.set_token(token);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,63 +1,69 @@
|
|||||||
use dbus::arg;
|
use dbus::arg;
|
||||||
use dbus_tree::MethodErr;
|
use dbus_tree::MethodErr;
|
||||||
use std::sync::{Arc, Mutex, MutexGuard};
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::{Mutex, MutexGuard};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
use std::sync::mpsc;
|
||||||
|
use futures_util::future::FutureExt;
|
||||||
|
|
||||||
use crate::daemon::Daemon;
|
use crate::daemon::{Daemon, Event};
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
|
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ServerImpl {
|
pub struct ServerImpl {
|
||||||
daemon: Arc<Mutex<Daemon>>,
|
daemon: Arc<Mutex<Daemon>>,
|
||||||
|
event_sender: mpsc::Sender<Event>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerImpl {
|
impl ServerImpl {
|
||||||
pub fn new(daemon: Arc<Mutex<Daemon>>) -> Self {
|
pub fn new(daemon: Arc<Mutex<Daemon>>, event_sender: mpsc::Sender<Event>) -> Self {
|
||||||
Self { daemon }
|
Self { daemon, event_sender }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_daemon(&self) -> Result<MutexGuard<'_, Daemon>, MethodErr> {
|
pub async fn get_daemon(&self) -> MutexGuard<'_, Daemon> {
|
||||||
self.daemon.lock().map_err(|_| MethodErr::failed("Failed to lock daemon"))
|
self.daemon.lock().await // .map_err(|_| MethodErr::failed("Failed to lock daemon"))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn daemon_then<F, T>(&self, f: F) -> Result<T, MethodErr>
|
||||||
|
where F: FnOnce(MutexGuard<'_, Daemon>) -> T + Send,
|
||||||
|
T: Send,
|
||||||
|
{
|
||||||
|
run_sync_future(self.get_daemon().then(|daemon| async move {
|
||||||
|
f(daemon)
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DbusRepository for ServerImpl {
|
impl DbusRepository for ServerImpl {
|
||||||
fn get_version(&mut self) -> Result<String, MethodErr> {
|
fn get_version(&mut self) -> Result<String, MethodErr> {
|
||||||
let daemon = self.get_daemon()?;
|
self.daemon_then(|daemon| daemon.version.clone())
|
||||||
Ok(daemon.version.clone())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_conversations(&mut self) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
|
fn get_conversations(&mut self) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
|
||||||
// Get a repository instance and use it to fetch conversations
|
self.daemon_then(|mut daemon| {
|
||||||
let mut daemon = self.get_daemon()?;
|
let conversations = daemon.get_conversations();
|
||||||
let conversations = daemon.get_conversations();
|
|
||||||
|
// Convert conversations to DBus property maps
|
||||||
// Convert conversations to DBus property maps
|
let result = conversations.into_iter().map(|conv| {
|
||||||
let result = conversations.into_iter().map(|conv| {
|
let mut map = arg::PropMap::new();
|
||||||
let mut map = arg::PropMap::new();
|
map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
|
||||||
map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
|
map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
|
||||||
map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
|
map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
|
||||||
map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
|
map
|
||||||
map
|
}).collect();
|
||||||
}).collect();
|
|
||||||
|
Ok(result)
|
||||||
Ok(result)
|
})?
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sync_all_conversations(&mut self) -> Result<bool, dbus::MethodErr> {
|
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
|
||||||
let mut daemon = self.get_daemon()?;
|
self.event_sender.send(Event::SyncAllConversations).unwrap_or_else(|e| {
|
||||||
|
log::error!("Error sending sync event: {}", e);
|
||||||
|
});
|
||||||
|
|
||||||
// TODO: We don't actually probably want to block here.
|
Ok(())
|
||||||
run_sync_future(daemon.sync_all_conversations())
|
|
||||||
.unwrap()
|
|
||||||
.map_err(|e| {
|
|
||||||
log::error!("Failed to sync conversations: {}", e);
|
|
||||||
MethodErr::failed(&format!("Failed to sync conversations: {}", e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(true)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,9 +2,12 @@ mod dbus;
|
|||||||
mod daemon;
|
mod daemon;
|
||||||
|
|
||||||
use std::future;
|
use std::future;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::mpsc;
|
||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use daemon::Daemon;
|
use daemon::Daemon;
|
||||||
use dbus::endpoint::Endpoint as DbusEndpoint;
|
use dbus::endpoint::Endpoint as DbusEndpoint;
|
||||||
use dbus::interface;
|
use dbus::interface;
|
||||||
@@ -21,6 +24,8 @@ fn initialize_logging() {
|
|||||||
async fn main() {
|
async fn main() {
|
||||||
initialize_logging();
|
initialize_logging();
|
||||||
|
|
||||||
|
let (sender, receiver) = mpsc::channel::<daemon::Event>();
|
||||||
|
|
||||||
// Create the daemon
|
// Create the daemon
|
||||||
let daemon = Arc::new(
|
let daemon = Arc::new(
|
||||||
Mutex::new(
|
Mutex::new(
|
||||||
@@ -34,7 +39,7 @@ async fn main() {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Create the server implementation
|
// Create the server implementation
|
||||||
let server = ServerImpl::new(daemon);
|
let server = ServerImpl::new(daemon.clone(), sender);
|
||||||
|
|
||||||
// Register DBus interfaces with endpoint
|
// Register DBus interfaces with endpoint
|
||||||
let endpoint = DbusEndpoint::new(server.clone());
|
let endpoint = DbusEndpoint::new(server.clone());
|
||||||
@@ -49,6 +54,13 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
for event in receiver {
|
||||||
|
// Important! Only lock the daemon when handling events.
|
||||||
|
daemon.lock().await.handle_event(event).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
future::pending::<()>().await;
|
future::pending::<()>().await;
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ impl DaemonCli {
|
|||||||
|
|
||||||
pub async fn sync_conversations(&mut self) -> Result<()> {
|
pub async fn sync_conversations(&mut self) -> Result<()> {
|
||||||
let success = KordophoneRepository::sync_all_conversations(&self.proxy())?;
|
let success = KordophoneRepository::sync_all_conversations(&self.proxy())?;
|
||||||
println!("Synced conversations: {}", success);
|
println!("Initiated sync");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user