daemon: incorporate update monitor in daemon activities
This commit is contained in:
@@ -127,6 +127,9 @@ impl<'a> Repository<'a> {
|
|||||||
))
|
))
|
||||||
.execute(self.connection)?;
|
.execute(self.connection)?;
|
||||||
|
|
||||||
|
// Update conversation date
|
||||||
|
self.update_conversation_metadata(conversation_guid, &db_message)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,6 +177,9 @@ impl<'a> Repository<'a> {
|
|||||||
.values(&conv_msg_records)
|
.values(&conv_msg_records)
|
||||||
.execute(self.connection)?;
|
.execute(self.connection)?;
|
||||||
|
|
||||||
|
// Update conversation date
|
||||||
|
self.update_conversation_metadata(conversation_guid, &db_messages.last().unwrap())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,6 +240,17 @@ impl<'a> Repository<'a> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_conversation_metadata(&mut self, conversation_guid: &str, last_message: &MessageRecord) -> Result<()> {
|
||||||
|
let conversation = self.get_conversation_by_guid(conversation_guid)?;
|
||||||
|
if let Some(mut conversation) = conversation {
|
||||||
|
conversation.date = last_message.date;
|
||||||
|
conversation.last_message_preview = Some(last_message.text.clone());
|
||||||
|
self.insert_conversation(conversation)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// Helper function to get the last inserted row ID
|
// Helper function to get the last inserted row ID
|
||||||
// This is a workaround since the Sqlite backend doesn't support `RETURNING`
|
// This is a workaround since the Sqlite backend doesn't support `RETURNING`
|
||||||
// Huge caveat with this is that it depends on whatever the last insert was, prevents concurrent inserts.
|
// Huge caveat with this is that it depends on whatever the last insert was, prevents concurrent inserts.
|
||||||
|
|||||||
@@ -243,7 +243,7 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
|
|||||||
request.headers_mut().insert("Authorization", header_value);
|
request.headers_mut().insert("Authorization", header_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (socket, response) = connect_async(request).await.unwrap();
|
let (socket, response) = connect_async(request).await.map_err(Error::from)?;
|
||||||
log::debug!("Websocket connected: {:?}", response.status());
|
log::debug!("Websocket connected: {:?}", response.status());
|
||||||
|
|
||||||
if response.status() != StatusCode::SWITCHING_PROTOCOLS {
|
if response.status() != StatusCode::SWITCHING_PROTOCOLS {
|
||||||
|
|||||||
@@ -24,6 +24,11 @@
|
|||||||
</arg>
|
</arg>
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
|
<method name="SyncConversationList">
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Initiates a background sync of the conversation list with the server."/>
|
||||||
|
</method>
|
||||||
|
|
||||||
<method name="SyncAllConversations">
|
<method name="SyncAllConversations">
|
||||||
<annotation name="org.freedesktop.DBus.DocString"
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
value="Initiates a background sync of all conversations with the server."/>
|
value="Initiates a background sync of all conversations with the server."/>
|
||||||
|
|||||||
@@ -9,6 +9,9 @@ pub enum Event {
|
|||||||
/// Get the version of the daemon.
|
/// Get the version of the daemon.
|
||||||
GetVersion(Reply<String>),
|
GetVersion(Reply<String>),
|
||||||
|
|
||||||
|
/// Asynchronous event for syncing the conversation list with the server.
|
||||||
|
SyncConversationList(Reply<()>),
|
||||||
|
|
||||||
/// Asynchronous event for syncing all conversations with the server.
|
/// Asynchronous event for syncing all conversations with the server.
|
||||||
SyncAllConversations(Reply<()>),
|
SyncAllConversations(Reply<()>),
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,9 @@ use kordophone::api::{
|
|||||||
AuthenticationStore,
|
AuthenticationStore,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
mod update_monitor;
|
||||||
|
use update_monitor::UpdateMonitor;
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum DaemonError {
|
pub enum DaemonError {
|
||||||
#[error("Client Not Configured")]
|
#[error("Client Not Configured")]
|
||||||
@@ -87,11 +90,11 @@ impl AuthenticationStore for DatabaseAuthenticationStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod target {
|
pub mod target {
|
||||||
pub static SYNC: &str = "sync";
|
pub static SYNC: &str = "sync";
|
||||||
pub static EVENT: &str = "event";
|
pub static EVENT: &str = "event";
|
||||||
|
pub static UPDATES: &str = "updates";
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Daemon {
|
pub struct Daemon {
|
||||||
pub event_sender: Sender<Event>,
|
pub event_sender: Sender<Event>,
|
||||||
event_receiver: Receiver<Event>,
|
event_receiver: Receiver<Event>,
|
||||||
@@ -139,6 +142,13 @@ impl Daemon {
|
|||||||
log::info!("Starting daemon version {}", self.version);
|
log::info!("Starting daemon version {}", self.version);
|
||||||
log::debug!("Debug logging enabled.");
|
log::debug!("Debug logging enabled.");
|
||||||
|
|
||||||
|
let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone());
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
log::info!(target: target::UPDATES, "Starting update monitor");
|
||||||
|
update_monitor.run().await; // should run indefinitely
|
||||||
|
});
|
||||||
|
|
||||||
while let Some(event) = self.event_receiver.recv().await {
|
while let Some(event) = self.event_receiver.recv().await {
|
||||||
log::debug!(target: target::EVENT, "Received event: {:?}", event);
|
log::debug!(target: target::EVENT, "Received event: {:?}", event);
|
||||||
self.handle_event(event).await;
|
self.handle_event(event).await;
|
||||||
@@ -151,6 +161,20 @@ impl Daemon {
|
|||||||
reply.send(self.version.clone()).unwrap();
|
reply.send(self.version.clone()).unwrap();
|
||||||
},
|
},
|
||||||
|
|
||||||
|
Event::SyncConversationList(reply) => {
|
||||||
|
let mut db_clone = self.database.clone();
|
||||||
|
let signal_sender = self.signal_sender.clone();
|
||||||
|
self.runtime.spawn(async move {
|
||||||
|
let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await;
|
||||||
|
if let Err(e) = result {
|
||||||
|
log::error!("Error handling sync event: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// This is a background operation, so return right away.
|
||||||
|
reply.send(()).unwrap();
|
||||||
|
},
|
||||||
|
|
||||||
Event::SyncAllConversations(reply) => {
|
Event::SyncAllConversations(reply) => {
|
||||||
let mut db_clone = self.database.clone();
|
let mut db_clone = self.database.clone();
|
||||||
let signal_sender = self.signal_sender.clone();
|
let signal_sender = self.signal_sender.clone();
|
||||||
@@ -231,8 +255,32 @@ impl Daemon {
|
|||||||
self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await
|
self.database.lock().await.with_repository(|r| r.get_messages_for_conversation(&conversation_id).unwrap()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn sync_conversation_list(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>) -> Result<()> {
|
||||||
|
log::info!(target: target::SYNC, "Starting list conversation sync");
|
||||||
|
|
||||||
|
let mut client = Self::get_client_impl(database).await?;
|
||||||
|
|
||||||
|
// Fetch conversations from server
|
||||||
|
let fetched_conversations = client.get_conversations().await?;
|
||||||
|
let db_conversations: Vec<kordophone_db::models::Conversation> = fetched_conversations.into_iter()
|
||||||
|
.map(kordophone_db::models::Conversation::from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Insert each conversation
|
||||||
|
let num_conversations = db_conversations.len();
|
||||||
|
for conversation in db_conversations {
|
||||||
|
database.with_repository(|r| r.insert_conversation(conversation)).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send conversations updated signal
|
||||||
|
signal_sender.send(Signal::ConversationsUpdated).await?;
|
||||||
|
|
||||||
|
log::info!(target: target::SYNC, "Synchronized {} conversations", num_conversations);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn sync_all_conversations_impl(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>) -> Result<()> {
|
async fn sync_all_conversations_impl(database: &mut Arc<Mutex<Database>>, signal_sender: &Sender<Signal>) -> Result<()> {
|
||||||
log::info!(target: target::SYNC, "Starting conversation sync");
|
log::info!(target: target::SYNC, "Starting full conversation sync");
|
||||||
|
|
||||||
let mut client = Self::get_client_impl(database).await?;
|
let mut client = Self::get_client_impl(database).await?;
|
||||||
|
|
||||||
@@ -266,6 +314,13 @@ impl Daemon {
|
|||||||
|
|
||||||
let mut client = Self::get_client_impl(database).await?;
|
let mut client = Self::get_client_impl(database).await?;
|
||||||
|
|
||||||
|
// Check if conversation exists in database.
|
||||||
|
let conversation = database.with_repository(|r| r.get_conversation_by_guid(&conversation_id)).await?;
|
||||||
|
if conversation.is_none() {
|
||||||
|
// If the conversation doesn't exist, first do a conversation list sync.
|
||||||
|
Self::sync_conversation_list(database, signal_sender).await?;
|
||||||
|
}
|
||||||
|
|
||||||
// Fetch and sync messages for this conversation
|
// Fetch and sync messages for this conversation
|
||||||
let last_message_id = database.with_repository(|r| -> Option<String> {
|
let last_message_id = database.with_repository(|r| -> Option<String> {
|
||||||
r.get_last_message_for_conversation(&conversation_id)
|
r.get_last_message_for_conversation(&conversation_id)
|
||||||
|
|||||||
98
kordophoned/src/daemon/update_monitor.rs
Normal file
98
kordophoned/src/daemon/update_monitor.rs
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
use crate::daemon::{
|
||||||
|
Daemon,
|
||||||
|
DaemonResult,
|
||||||
|
|
||||||
|
events::{Event, Reply},
|
||||||
|
target,
|
||||||
|
};
|
||||||
|
|
||||||
|
use kordophone::APIInterface;
|
||||||
|
use kordophone::api::event_socket::EventSocket;
|
||||||
|
use kordophone::model::event::Event as UpdateEvent;
|
||||||
|
|
||||||
|
use kordophone_db::database::Database;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub struct UpdateMonitor {
|
||||||
|
database: Arc<Mutex<Database>>,
|
||||||
|
event_sender: Sender<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateMonitor {
|
||||||
|
pub fn new(database: Arc<Mutex<Database>>, event_sender: Sender<Event>) -> Self {
|
||||||
|
Self { database, event_sender }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send_event<T>(
|
||||||
|
&self,
|
||||||
|
make_event: impl FnOnce(Reply<T>) -> Event,
|
||||||
|
) -> DaemonResult<T> {
|
||||||
|
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
|
||||||
|
self.event_sender.send(make_event(reply_tx))
|
||||||
|
.await
|
||||||
|
.map_err(|_| "Failed to send event")?;
|
||||||
|
|
||||||
|
reply_rx.await.map_err(|_| "Failed to receive reply".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_update(&mut self, update: UpdateEvent) {
|
||||||
|
match update {
|
||||||
|
UpdateEvent::ConversationChanged(conversation) => {
|
||||||
|
log::info!(target: target::UPDATES, "Conversation changed: {:?}", conversation);
|
||||||
|
log::info!(target: target::UPDATES, "Triggering conversation list sync");
|
||||||
|
self.send_event(Event::SyncConversationList).await
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
log::error!("Failed to send daemon event: {}", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
UpdateEvent::MessageReceived(conversation, message) => {
|
||||||
|
log::info!(target: target::UPDATES, "Message received: msgid:{:?}, convid:{:?}", message.guid, conversation.guid);
|
||||||
|
log::info!(target: target::UPDATES, "Triggering message sync for conversation id: {}", conversation.guid);
|
||||||
|
self.send_event(|r| Event::SyncConversation(conversation.guid, r)).await
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
log::error!("Failed to send daemon event: {}", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) {
|
||||||
|
use futures_util::stream::StreamExt;
|
||||||
|
|
||||||
|
log::info!(target: target::UPDATES, "Starting update monitor");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
log::debug!(target: target::UPDATES, "Creating client");
|
||||||
|
let mut client = match Daemon::get_client_impl(&mut self.database).await {
|
||||||
|
Ok(client) => client,
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to get client: {}", e);
|
||||||
|
log::warn!("Retrying in 5 seconds...");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!(target: target::UPDATES, "Opening event socket");
|
||||||
|
let socket = match client.open_event_socket().await {
|
||||||
|
Ok(events) => events,
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Failed to open event socket: {}", e);
|
||||||
|
log::warn!("Retrying in 5 seconds...");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!(target: target::UPDATES, "Starting event stream");
|
||||||
|
let mut event_stream = socket.events().await;
|
||||||
|
while let Some(Ok(event)) = event_stream.next().await {
|
||||||
|
self.handle_update(event).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -67,6 +67,10 @@ impl DbusRepository for ServerImpl {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sync_conversation_list(&mut self) -> Result<(), dbus::MethodErr> {
|
||||||
|
self.send_event_sync(Event::SyncConversationList)
|
||||||
|
}
|
||||||
|
|
||||||
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
|
fn sync_all_conversations(&mut self) -> Result<(), dbus::MethodErr> {
|
||||||
self.send_event_sync(Event::SyncAllConversations)
|
self.send_event_sync(Event::SyncAllConversations)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,11 +20,14 @@ pub enum Commands {
|
|||||||
/// Gets all known conversations.
|
/// Gets all known conversations.
|
||||||
Conversations,
|
Conversations,
|
||||||
|
|
||||||
/// Runs a sync operation.
|
/// Runs a full sync operation for a conversation and its messages.
|
||||||
Sync {
|
Sync {
|
||||||
conversation_id: Option<String>,
|
conversation_id: Option<String>,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Runs a sync operation for the conversation list.
|
||||||
|
SyncList,
|
||||||
|
|
||||||
/// Prints the server Kordophone version.
|
/// Prints the server Kordophone version.
|
||||||
Version,
|
Version,
|
||||||
|
|
||||||
@@ -75,6 +78,7 @@ impl Commands {
|
|||||||
Commands::Version => client.print_version().await,
|
Commands::Version => client.print_version().await,
|
||||||
Commands::Conversations => client.print_conversations().await,
|
Commands::Conversations => client.print_conversations().await,
|
||||||
Commands::Sync { conversation_id } => client.sync_conversations(conversation_id).await,
|
Commands::Sync { conversation_id } => client.sync_conversations(conversation_id).await,
|
||||||
|
Commands::SyncList => client.sync_conversations_list().await,
|
||||||
Commands::Config { command } => client.config(command).await,
|
Commands::Config { command } => client.config(command).await,
|
||||||
Commands::Signals => client.wait_for_signals().await,
|
Commands::Signals => client.wait_for_signals().await,
|
||||||
Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await,
|
Commands::Messages { conversation_id, last_message_id } => client.print_messages(conversation_id, last_message_id).await,
|
||||||
@@ -125,6 +129,11 @@ impl DaemonCli {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn sync_conversations_list(&mut self) -> Result<()> {
|
||||||
|
KordophoneRepository::sync_conversation_list(&self.proxy())
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to sync conversations: {}", e))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Result<()> {
|
pub async fn print_messages(&mut self, conversation_id: String, last_message_id: Option<String>) -> Result<()> {
|
||||||
let messages = KordophoneRepository::get_messages(&self.proxy(), &conversation_id, &last_message_id.unwrap_or_default())?;
|
let messages = KordophoneRepository::get_messages(&self.proxy(), &conversation_id, &last_message_id.unwrap_or_default())?;
|
||||||
println!("Number of messages: {}", messages.len());
|
println!("Number of messages: {}", messages.len());
|
||||||
|
|||||||
Reference in New Issue
Block a user