devises a strategy for signals
This commit is contained in:
@@ -4,6 +4,8 @@
|
|||||||
<interface name="net.buzzert.kordophone.Repository">
|
<interface name="net.buzzert.kordophone.Repository">
|
||||||
<method name="GetVersion">
|
<method name="GetVersion">
|
||||||
<arg type="s" name="version" direction="out" />
|
<arg type="s" name="version" direction="out" />
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Returns the version of the client daemon."/>
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
<method name="GetConversations">
|
<method name="GetConversations">
|
||||||
@@ -21,7 +23,14 @@
|
|||||||
</method>
|
</method>
|
||||||
|
|
||||||
<method name="SyncAllConversations">
|
<method name="SyncAllConversations">
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Initiates a background sync of all conversations with the server."/>
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
|
<signal name="ConversationsUpdated">
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Emitted when the list of conversations is updated."/>
|
||||||
|
</signal>
|
||||||
</interface>
|
</interface>
|
||||||
|
|
||||||
<interface name="net.buzzert.kordophone.Settings">
|
<interface name="net.buzzert.kordophone.Settings">
|
||||||
|
|||||||
@@ -4,6 +4,9 @@ use settings::Settings;
|
|||||||
pub mod events;
|
pub mod events;
|
||||||
use events::*;
|
use events::*;
|
||||||
|
|
||||||
|
pub mod signals;
|
||||||
|
use signals::*;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use directories::ProjectDirs;
|
use directories::ProjectDirs;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
@@ -57,6 +60,10 @@ mod target {
|
|||||||
pub struct Daemon {
|
pub struct Daemon {
|
||||||
pub event_sender: Sender<Event>,
|
pub event_sender: Sender<Event>,
|
||||||
event_receiver: Receiver<Event>,
|
event_receiver: Receiver<Event>,
|
||||||
|
|
||||||
|
signal_receiver: Option<Receiver<Signal>>,
|
||||||
|
signal_sender: Sender<Signal>,
|
||||||
|
|
||||||
version: String,
|
version: String,
|
||||||
database: Arc<Mutex<Database>>,
|
database: Arc<Mutex<Database>>,
|
||||||
runtime: tokio::runtime::Runtime,
|
runtime: tokio::runtime::Runtime,
|
||||||
@@ -73,7 +80,7 @@ impl Daemon {
|
|||||||
|
|
||||||
// Create event channels
|
// Create event channels
|
||||||
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
|
let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
|
let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
// Create background task runtime
|
// Create background task runtime
|
||||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
@@ -82,7 +89,15 @@ impl Daemon {
|
|||||||
|
|
||||||
let database_impl = Database::new(&database_path.to_string_lossy())?;
|
let database_impl = Database::new(&database_path.to_string_lossy())?;
|
||||||
let database = Arc::new(Mutex::new(database_impl));
|
let database = Arc::new(Mutex::new(database_impl));
|
||||||
Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, runtime })
|
Ok(Self {
|
||||||
|
version: "0.1.0".to_string(),
|
||||||
|
database,
|
||||||
|
event_receiver,
|
||||||
|
event_sender,
|
||||||
|
signal_receiver: Some(signal_receiver),
|
||||||
|
signal_sender,
|
||||||
|
runtime
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self) {
|
pub async fn run(&mut self) {
|
||||||
@@ -99,8 +114,9 @@ impl Daemon {
|
|||||||
|
|
||||||
Event::SyncAllConversations(reply) => {
|
Event::SyncAllConversations(reply) => {
|
||||||
let db_clone = self.database.clone();
|
let db_clone = self.database.clone();
|
||||||
|
let signal_sender = self.signal_sender.clone();
|
||||||
self.runtime.spawn(async move {
|
self.runtime.spawn(async move {
|
||||||
let result = Self::sync_all_conversations_impl(db_clone).await;
|
let result = Self::sync_all_conversations_impl(db_clone, signal_sender).await;
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
log::error!("Error handling sync event: {}", e);
|
log::error!("Error handling sync event: {}", e);
|
||||||
}
|
}
|
||||||
@@ -136,11 +152,16 @@ impl Daemon {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Panics if the signal receiver has already been taken.
|
||||||
|
pub fn obtain_signal_receiver(&mut self) -> Receiver<Signal> {
|
||||||
|
self.signal_receiver.take().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_conversations(&mut self) -> Vec<Conversation> {
|
async fn get_conversations(&mut self) -> Vec<Conversation> {
|
||||||
self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await
|
self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync_all_conversations_impl(mut database: Arc<Mutex<Database>>) -> Result<()> {
|
async fn sync_all_conversations_impl(mut database: Arc<Mutex<Database>>, signal_sender: Sender<Signal>) -> Result<()> {
|
||||||
log::info!(target: target::SYNC, "Starting conversation sync");
|
log::info!(target: target::SYNC, "Starting conversation sync");
|
||||||
|
|
||||||
let mut client = Self::get_client_impl(database.clone()).await?;
|
let mut client = Self::get_client_impl(database.clone()).await?;
|
||||||
@@ -171,6 +192,9 @@ impl Daemon {
|
|||||||
database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?;
|
database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send conversations updated signal.
|
||||||
|
signal_sender.send(Signal::ConversationsUpdated).await?;
|
||||||
|
|
||||||
log::info!(target: target::SYNC, "Synchronized {} conversations", num_conversations);
|
log::info!(target: target::SYNC, "Synchronized {} conversations", num_conversations);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
4
kordophoned/src/daemon/signals.rs
Normal file
4
kordophoned/src/daemon/signals.rs
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum Signal {
|
||||||
|
ConversationsUpdated,
|
||||||
|
}
|
||||||
@@ -8,4 +8,8 @@ pub mod interface {
|
|||||||
pub const OBJECT_PATH: &str = "/net/buzzert/kordophonecd/daemon";
|
pub const OBJECT_PATH: &str = "/net/buzzert/kordophonecd/daemon";
|
||||||
|
|
||||||
include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs"));
|
include!(concat!(env!("OUT_DIR"), "/kordophone-server.rs"));
|
||||||
|
|
||||||
|
pub mod signals {
|
||||||
|
pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -9,6 +9,7 @@ use crate::daemon::{
|
|||||||
DaemonResult,
|
DaemonResult,
|
||||||
events::{Event, Reply},
|
events::{Event, Reply},
|
||||||
settings::Settings,
|
settings::Settings,
|
||||||
|
signals::Signal,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository;
|
||||||
|
|||||||
@@ -4,10 +4,9 @@ mod daemon;
|
|||||||
use std::future;
|
use std::future;
|
||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use daemon::Daemon;
|
use daemon::Daemon;
|
||||||
|
use daemon::signals::Signal;
|
||||||
|
|
||||||
use dbus::endpoint::Endpoint as DbusEndpoint;
|
use dbus::endpoint::Endpoint as DbusEndpoint;
|
||||||
use dbus::interface;
|
use dbus::interface;
|
||||||
use dbus::server_impl::ServerImpl;
|
use dbus::server_impl::ServerImpl;
|
||||||
@@ -35,7 +34,7 @@ async fn main() {
|
|||||||
let server = ServerImpl::new(daemon.event_sender.clone());
|
let server = ServerImpl::new(daemon.event_sender.clone());
|
||||||
|
|
||||||
// Register DBus interfaces with endpoint
|
// Register DBus interfaces with endpoint
|
||||||
let endpoint = DbusEndpoint::new(server.clone());
|
let endpoint = DbusEndpoint::new(server);
|
||||||
endpoint.register(
|
endpoint.register(
|
||||||
interface::NAME,
|
interface::NAME,
|
||||||
interface::OBJECT_PATH,
|
interface::OBJECT_PATH,
|
||||||
@@ -47,6 +46,24 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
let mut signal_receiver = daemon.obtain_signal_receiver();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
use dbus::interface::signals as DbusSignals;
|
||||||
|
|
||||||
|
while let Some(signal) = signal_receiver.recv().await {
|
||||||
|
match signal {
|
||||||
|
Signal::ConversationsUpdated => {
|
||||||
|
log::info!("Sending signal: ConversationsUpdated");
|
||||||
|
endpoint.send_signal(interface::OBJECT_PATH, DbusSignals::ConversationsUpdated{})
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
log::error!("Failed to send signal");
|
||||||
|
0
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
daemon.run().await;
|
daemon.run().await;
|
||||||
|
|
||||||
future::pending::<()>().await;
|
future::pending::<()>().await;
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
|||||||
use clap::Subcommand;
|
use clap::Subcommand;
|
||||||
use dbus::blocking::{Connection, Proxy};
|
use dbus::blocking::{Connection, Proxy};
|
||||||
use crate::printers::{ConversationPrinter, MessagePrinter};
|
use crate::printers::{ConversationPrinter, MessagePrinter};
|
||||||
|
use std::future;
|
||||||
|
|
||||||
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
|
const DBUS_NAME: &str = "net.buzzert.kordophonecd";
|
||||||
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
|
const DBUS_PATH: &str = "/net/buzzert/kordophonecd/daemon";
|
||||||
@@ -30,6 +31,9 @@ pub enum Commands {
|
|||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
command: ConfigCommands,
|
command: ConfigCommands,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Waits for signals from the daemon.
|
||||||
|
Signals,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
@@ -61,6 +65,7 @@ impl Commands {
|
|||||||
Commands::Conversations => client.print_conversations().await,
|
Commands::Conversations => client.print_conversations().await,
|
||||||
Commands::Sync => client.sync_conversations().await,
|
Commands::Sync => client.sync_conversations().await,
|
||||||
Commands::Config { command } => client.config(command).await,
|
Commands::Config { command } => client.config(command).await,
|
||||||
|
Commands::Signals => client.wait_for_signals().await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,6 +107,25 @@ impl DaemonCli {
|
|||||||
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
|
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn wait_for_signals(&mut self) -> Result<()> {
|
||||||
|
use dbus::Message;
|
||||||
|
mod dbus_signals {
|
||||||
|
pub use super::dbus_interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _id = self.proxy().match_signal(|h: dbus_signals::ConversationsUpdated, _: &Connection, _: &Message| {
|
||||||
|
println!("Signal: Conversations updated");
|
||||||
|
true
|
||||||
|
});
|
||||||
|
|
||||||
|
println!("Waiting for signals...");
|
||||||
|
loop {
|
||||||
|
self.conn.process(std::time::Duration::from_millis(1000))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn config(&mut self, cmd: ConfigCommands) -> Result<()> {
|
pub async fn config(&mut self, cmd: ConfigCommands) -> Result<()> {
|
||||||
match cmd {
|
match cmd {
|
||||||
ConfigCommands::Print => self.print_settings().await,
|
ConfigCommands::Print => self.print_settings().await,
|
||||||
|
|||||||
Reference in New Issue
Block a user