daemon: reorg: use channels for comms instead of copying daemon arc/mutex
This commit is contained in:
17
kordophoned/src/daemon/events.rs
Normal file
17
kordophoned/src/daemon/events.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use tokio::sync::oneshot;
|
||||
use kordophone_db::models::Conversation;
|
||||
|
||||
pub type Reply<T: Send> = oneshot::Sender<T>;
|
||||
|
||||
pub enum Event {
|
||||
/// Get the version of the daemon.
|
||||
GetVersion(Reply<String>),
|
||||
|
||||
/// Asynchronous event for syncing all conversations with the server.
|
||||
SyncAllConversations(Reply<()>),
|
||||
|
||||
/// Returns all known conversations from the database.
|
||||
GetAllConversations(Reply<Vec<Conversation>>),
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
mod settings;
|
||||
pub mod settings;
|
||||
use settings::Settings;
|
||||
|
||||
use std::sync::mpsc;
|
||||
use directories::ProjectDirs;
|
||||
use std::path::PathBuf;
|
||||
pub mod events;
|
||||
use events::*;
|
||||
|
||||
use anyhow::Result;
|
||||
use directories::ProjectDirs;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::{Sender, Receiver};
|
||||
|
||||
use kordophone_db::{
|
||||
database::Database,
|
||||
@@ -20,19 +24,20 @@ use kordophone::api::{
|
||||
TokenManagement,
|
||||
};
|
||||
|
||||
pub enum Event {
|
||||
SyncAllConversations,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DaemonError {
|
||||
#[error("Client Not Configured")]
|
||||
ClientNotConfigured,
|
||||
}
|
||||
|
||||
pub type DaemonResult<T> = Result<T, Box<dyn Error + Send + Sync>>;
|
||||
|
||||
pub struct Daemon {
|
||||
pub version: String,
|
||||
pub event_sender: Sender<Event>,
|
||||
event_receiver: Receiver<Event>,
|
||||
version: String,
|
||||
database: Database,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
}
|
||||
|
||||
impl Daemon {
|
||||
@@ -44,15 +49,53 @@ impl Daemon {
|
||||
let database_dir = database_path.parent().unwrap();
|
||||
std::fs::create_dir_all(database_dir)?;
|
||||
|
||||
// Create event channels
|
||||
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
|
||||
|
||||
// Create background task runtime
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let database = Database::new(&database_path.to_string_lossy())?;
|
||||
Ok(Self { version: "0.1.0".to_string(), database })
|
||||
Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, runtime })
|
||||
}
|
||||
|
||||
pub fn get_conversations(&mut self) -> Vec<Conversation> {
|
||||
pub async fn run(&mut self) {
|
||||
while let Some(event) = self.event_receiver.recv().await {
|
||||
self.handle_event(event).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(&mut self, event: Event) {
|
||||
match event {
|
||||
Event::GetVersion(reply) => {
|
||||
reply.send(self.version.clone()).unwrap();
|
||||
},
|
||||
|
||||
Event::SyncAllConversations(reply) => {
|
||||
self.sync_all_conversations().await.unwrap_or_else(|e| {
|
||||
log::error!("Error handling sync event: {}", e);
|
||||
});
|
||||
|
||||
reply.send(()).unwrap();
|
||||
},
|
||||
|
||||
Event::GetAllConversations(reply) => {
|
||||
let conversations = self.get_conversations();
|
||||
reply.send(conversations).unwrap();
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn get_conversations(&mut self) -> Vec<Conversation> {
|
||||
self.database.with_repository(|r| r.all_conversations().unwrap())
|
||||
}
|
||||
|
||||
pub async fn sync_all_conversations(&mut self) -> Result<()> {
|
||||
async fn sync_all_conversations(&mut self) -> Result<()> {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
|
||||
|
||||
let mut client = self.get_client()
|
||||
.map_err(|_| DaemonError::ClientNotConfigured)?;
|
||||
|
||||
@@ -84,7 +127,7 @@ impl Daemon {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_settings(&mut self) -> Result<Settings> {
|
||||
fn get_settings(&mut self) -> Result<Settings> {
|
||||
let settings = self.database.with_settings(|s|
|
||||
Settings::from_db(s)
|
||||
)?;
|
||||
@@ -92,16 +135,6 @@ impl Daemon {
|
||||
Ok(settings)
|
||||
}
|
||||
|
||||
pub async fn handle_event(&mut self, event: Event) {
|
||||
match event {
|
||||
Event::SyncAllConversations => {
|
||||
self.sync_all_conversations().await.unwrap_or_else(|e| {
|
||||
log::error!("Error handling sync event: {}", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_client(&mut self) -> Result<HTTPAPIClient> {
|
||||
let settings = self.database.with_settings(|s|
|
||||
Settings::from_db(s)
|
||||
|
||||
@@ -4,66 +4,74 @@ use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use std::future::Future;
|
||||
use std::thread;
|
||||
use std::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::mpsc;
|
||||
use futures_util::future::FutureExt;
|
||||
|
||||
use crate::daemon::{Daemon, Event};
|
||||
use crate::daemon::{
|
||||
Daemon,
|
||||
DaemonResult,
|
||||
events::{Event, Reply},
|
||||
};
|
||||
|
||||
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
||||
use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ServerImpl {
|
||||
daemon: Arc<Mutex<Daemon>>,
|
||||
event_sender: mpsc::Sender<Event>,
|
||||
event_sink: mpsc::Sender<Event>,
|
||||
}
|
||||
|
||||
impl ServerImpl {
|
||||
pub fn new(daemon: Arc<Mutex<Daemon>>, event_sender: mpsc::Sender<Event>) -> Self {
|
||||
Self { daemon, event_sender }
|
||||
pub fn new(event_sink: mpsc::Sender<Event>) -> Self {
|
||||
Self { event_sink }
|
||||
}
|
||||
|
||||
pub async fn get_daemon(&self) -> MutexGuard<'_, Daemon> {
|
||||
self.daemon.lock().await // .map_err(|_| MethodErr::failed("Failed to lock daemon"))
|
||||
pub async fn send_event<T>(
|
||||
&self,
|
||||
make_event: impl FnOnce(Reply<T>) -> Event,
|
||||
) -> DaemonResult<T> {
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
self.event_sink.send(make_event(reply_tx))
|
||||
.await
|
||||
.map_err(|_| "Failed to send event")?;
|
||||
|
||||
reply_rx.await.map_err(|_| "Failed to receive reply".into())
|
||||
}
|
||||
|
||||
pub fn daemon_then<F, T>(&self, f: F) -> Result<T, MethodErr>
|
||||
where F: FnOnce(MutexGuard<'_, Daemon>) -> T + Send,
|
||||
T: Send,
|
||||
{
|
||||
run_sync_future(self.get_daemon().then(|daemon| async move {
|
||||
f(daemon)
|
||||
}))
|
||||
pub fn send_event_sync<T: Send>(
|
||||
&self,
|
||||
make_event: impl FnOnce(Reply<T>) -> Event + Send,
|
||||
) -> Result<T, MethodErr> {
|
||||
run_sync_future(self.send_event(make_event))
|
||||
.unwrap()
|
||||
.map_err(|e| MethodErr::failed(&format!("Daemon error: {}", e)))
|
||||
}
|
||||
}
|
||||
|
||||
impl DbusRepository for ServerImpl {
|
||||
fn get_version(&mut self) -> Result<String, MethodErr> {
|
||||
self.daemon_then(|daemon| daemon.version.clone())
|
||||
self.send_event_sync(Event::GetVersion)
|
||||
}
|
||||
|
||||
fn get_conversations(&mut self) -> Result<Vec<arg::PropMap>, dbus::MethodErr> {
|
||||
self.daemon_then(|mut daemon| {
|
||||
let conversations = daemon.get_conversations();
|
||||
self.send_event_sync(Event::GetAllConversations)
|
||||
.and_then(|conversations| {
|
||||
// Convert conversations to DBus property maps
|
||||
let result = conversations.into_iter().map(|conv| {
|
||||
let mut map = arg::PropMap::new();
|
||||
map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
|
||||
map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
|
||||
map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
|
||||
map
|
||||
}).collect();
|
||||
|
||||
// Convert conversations to DBus property maps
|
||||
let result = conversations.into_iter().map(|conv| {
|
||||
let mut map = arg::PropMap::new();
|
||||
map.insert("guid".into(), arg::Variant(Box::new(conv.guid)));
|
||||
map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default())));
|
||||
map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32)));
|
||||
map
|
||||
}).collect();
|
||||
|
||||
Ok(result)
|
||||
})?
|
||||
Ok(result)
|
||||
})
|
||||
}
|
||||
|
||||
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
|
||||
self.event_sender.send(Event::SyncAllConversations).unwrap_or_else(|e| {
|
||||
log::error!("Error sending sync event: {}", e);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
self.send_event_sync(Event::SyncAllConversations)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ mod dbus;
|
||||
mod daemon;
|
||||
|
||||
use std::future;
|
||||
use std::sync::mpsc;
|
||||
use log::LevelFilter;
|
||||
|
||||
use std::sync::Arc;
|
||||
@@ -24,22 +23,16 @@ fn initialize_logging() {
|
||||
async fn main() {
|
||||
initialize_logging();
|
||||
|
||||
let (sender, receiver) = mpsc::channel::<daemon::Event>();
|
||||
|
||||
// Create the daemon
|
||||
let daemon = Arc::new(
|
||||
Mutex::new(
|
||||
Daemon::new()
|
||||
.map_err(|e| {
|
||||
log::error!("Failed to start daemon: {}", e);
|
||||
std::process::exit(1);
|
||||
})
|
||||
.unwrap()
|
||||
)
|
||||
);
|
||||
let mut daemon = Daemon::new()
|
||||
.map_err(|e| {
|
||||
log::error!("Failed to start daemon: {}", e);
|
||||
std::process::exit(1);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Create the server implementation
|
||||
let server = ServerImpl::new(daemon.clone(), sender);
|
||||
let server = ServerImpl::new(daemon.event_sender.clone());
|
||||
|
||||
// Register DBus interfaces with endpoint
|
||||
let endpoint = DbusEndpoint::new(server.clone());
|
||||
@@ -54,12 +47,7 @@ async fn main() {
|
||||
}
|
||||
).await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
for event in receiver {
|
||||
// Important! Only lock the daemon when handling events.
|
||||
daemon.lock().await.handle_event(event).await;
|
||||
}
|
||||
});
|
||||
daemon.run().await;
|
||||
|
||||
future::pending::<()>().await;
|
||||
unreachable!()
|
||||
|
||||
Reference in New Issue
Block a user