pub mod settings; use settings::keys as SettingsKey; use settings::Settings; pub mod events; use events::*; pub mod signals; use signals::*; use anyhow::Result; use directories::ProjectDirs; use std::collections::HashMap; use std::error::Error; use std::path::{Path, PathBuf}; use std::sync::Arc; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use uuid::Uuid; use kordophone_db::{ database::{Database, DatabaseAccess}, models::Conversation, }; use kordophone::api::http_client::HTTPAPIClient; use kordophone::api::APIInterface; use kordophone::model::outgoing_message::OutgoingMessage; use kordophone::model::ConversationID; mod update_monitor; use update_monitor::UpdateMonitor; mod auth_store; use auth_store::DatabaseAuthenticationStore; mod post_office; use post_office::Event as PostOfficeEvent; use post_office::PostOffice; mod models; pub use models::Attachment; pub use models::Message; mod attachment_store; pub use attachment_store::AttachmentStore; pub use attachment_store::AttachmentStoreEvent; #[derive(Debug, Error)] pub enum DaemonError { #[error("Client Not Configured")] ClientNotConfigured, } pub type DaemonResult = Result>; type DaemonClient = HTTPAPIClient; pub mod target { pub static SYNC: &str = "sync"; pub static EVENT: &str = "event"; pub static SETTINGS: &str = "settings"; pub static UPDATES: &str = "updates"; pub static ATTACHMENTS: &str = "attachments"; } pub struct Daemon { pub event_sender: Sender, event_receiver: Receiver, signal_receiver: Option>, signal_sender: Sender, post_office_sink: Sender, post_office_source: Option>, outgoing_messages: HashMap>, attachment_store_sink: Option>, version: String, database: Arc>, runtime: tokio::runtime::Runtime, } impl Daemon { pub fn new() -> Result { let database_path = Self::get_database_path(); log::info!("Database path: {}", database_path.display()); // Create the database directory if it doesn't exist let database_dir = database_path.parent().unwrap(); std::fs::create_dir_all(database_dir)?; // Create event channels let (event_sender, event_receiver) = tokio::sync::mpsc::channel(100); let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); // Create background task runtime let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); let database_impl = Database::new(&database_path.to_string_lossy())?; let database = Arc::new(Mutex::new(database_impl)); Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, signal_receiver: Some(signal_receiver), signal_sender, post_office_sink, post_office_source: Some(post_office_source), outgoing_messages: HashMap::new(), attachment_store_sink: None, runtime, }) } pub async fn run(&mut self) { log::info!("Starting daemon version {}", self.version); log::debug!("Debug logging enabled."); // Update monitor let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone()); tokio::spawn(async move { update_monitor.run().await; // should run indefinitely }); // Post office { let mut database = self.database.clone(); let event_sender = self.event_sender.clone(); let post_office_source = self.post_office_source.take().unwrap(); tokio::spawn(async move { let mut post_office = PostOffice::new(post_office_source, event_sender, async move || { Self::get_client_impl(&mut database).await }); post_office.run().await; }); } // Attachment store let mut attachment_store = AttachmentStore::new(self.database.clone(), self.event_sender.clone()); self.attachment_store_sink = Some(attachment_store.get_event_sink()); tokio::spawn(async move { attachment_store.run().await; }); while let Some(event) = self.event_receiver.recv().await { log::debug!(target: target::EVENT, "Received event: {:?}", event); self.handle_event(event).await; } } async fn handle_event(&mut self, event: Event) { match event { Event::GetVersion(reply) => { reply.send(self.version.clone()).unwrap(); } Event::SyncConversationList(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { let result = Self::sync_conversation_list(&mut db_clone, &signal_sender).await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } }); // This is a background operation, so return right away. reply.send(()).unwrap(); } Event::SyncAllConversations(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { let result = Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } }); // This is a background operation, so return right away. reply.send(()).unwrap(); } Event::SyncConversation(conversation_id, reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { let result = Self::sync_conversation_impl( &mut db_clone, &signal_sender, conversation_id, ) .await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } }); reply.send(()).unwrap(); } Event::GetAllConversations(limit, offset, reply) => { let conversations = self.get_conversations_limit_offset(limit, offset).await; reply.send(conversations).unwrap(); } Event::GetAllSettings(reply) => { let settings = self.get_settings().await.unwrap_or_else(|e| { log::error!(target: target::SETTINGS, "Failed to get settings: {:#?}", e); Settings::default() }); reply.send(settings).unwrap(); } Event::UpdateSettings(settings, reply) => { self.update_settings(&settings).await.unwrap_or_else(|e| { log::error!(target: target::SETTINGS, "Failed to update settings: {}", e); }); reply.send(()).unwrap(); } Event::GetMessages(conversation_id, last_message_id, reply) => { let messages = self.get_messages(conversation_id, last_message_id).await; reply.send(messages).unwrap(); } Event::DeleteAllConversations(reply) => { self.delete_all_conversations().await.unwrap_or_else(|e| { log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e); }); reply.send(()).unwrap(); } Event::SendMessage(conversation_id, text, reply) => { let conversation_id = conversation_id.clone(); let uuid = self .enqueue_outgoing_message(text, conversation_id.clone()) .await; reply.send(uuid).unwrap(); // Send message updated signal, we have a placeholder message we will return. self.signal_sender .send(Signal::MessagesUpdated(conversation_id.clone())) .await .unwrap(); } Event::MessageSent(message, outgoing_message, conversation_id) => { log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id); // Insert the message into the database. log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id); self.database .lock() .await .with_repository(|r| r.insert_message(&conversation_id, message.into())) .await .unwrap(); // Remove from outgoing messages. log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid); self.outgoing_messages .get_mut(&conversation_id) .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); // Send message updated signal. self.signal_sender .send(Signal::MessagesUpdated(conversation_id)) .await .unwrap(); } Event::GetAttachment(guid, reply) => { self.attachment_store_sink .as_ref() .unwrap() .send(AttachmentStoreEvent::GetAttachmentInfo(guid, reply)) .await .unwrap(); } Event::DownloadAttachment(attachment_id, preview, reply) => { log::info!(target: target::ATTACHMENTS, "Download requested for attachment: {}, preview: {}", &attachment_id, preview); self.attachment_store_sink .as_ref() .unwrap() .send(AttachmentStoreEvent::QueueDownloadAttachment(attachment_id, preview)) .await .unwrap(); reply.send(()).unwrap(); } } } /// Panics if the signal receiver has already been taken. pub fn obtain_signal_receiver(&mut self) -> Receiver { self.signal_receiver.take().unwrap() } async fn get_conversations(&mut self) -> Vec { self.database .lock() .await .with_repository(|r| r.all_conversations(i32::MAX, 0).unwrap()) .await } async fn get_conversations_limit_offset( &mut self, limit: i32, offset: i32, ) -> Vec { self.database .lock() .await .with_repository(|r| r.all_conversations(limit, offset).unwrap()) .await } async fn get_messages( &mut self, conversation_id: String, last_message_id: Option, ) -> Vec { // Get outgoing messages for this conversation. let empty_vec: Vec = vec![]; let outgoing_messages: &Vec = self .outgoing_messages .get(&conversation_id) .unwrap_or(&empty_vec); self.database .lock() .await .with_repository(|r| { r.get_messages_for_conversation(&conversation_id) .unwrap() .into_iter() .map(|m| m.into()) // Convert db::Message to daemon::Message .chain(outgoing_messages.into_iter().map(|m| m.into())) .collect() }) .await } async fn enqueue_outgoing_message(&mut self, text: String, conversation_id: String) -> Uuid { let conversation_id = conversation_id.clone(); let outgoing_message = OutgoingMessage::builder() .text(text) .conversation_id(conversation_id.clone()) .build(); // Keep a record of this so we can provide a consistent model to the client. self.outgoing_messages .entry(conversation_id) .or_insert(vec![]) .push(outgoing_message.clone()); let guid = outgoing_message.guid.clone(); self.post_office_sink .send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)) .await .unwrap(); guid } async fn sync_conversation_list( database: &mut Arc>, signal_sender: &Sender, ) -> Result<()> { log::info!(target: target::SYNC, "Starting list conversation sync"); let mut client = Self::get_client_impl(database).await?; // Fetch conversations from server let fetched_conversations = client.get_conversations().await?; let db_conversations: Vec = fetched_conversations .into_iter() .map(kordophone_db::models::Conversation::from) .collect(); // Insert each conversation let num_conversations = db_conversations.len(); for conversation in db_conversations { database .with_repository(|r| r.insert_conversation(conversation)) .await?; } // Send conversations updated signal signal_sender.send(Signal::ConversationsUpdated).await?; log::info!(target: target::SYNC, "List synchronized: {} conversations", num_conversations); Ok(()) } async fn sync_all_conversations_impl( database: &mut Arc>, signal_sender: &Sender, ) -> Result<()> { log::info!(target: target::SYNC, "Starting full conversation sync"); let mut client = Self::get_client_impl(database).await?; // Fetch conversations from server let fetched_conversations = client.get_conversations().await?; let db_conversations: Vec = fetched_conversations .into_iter() .map(kordophone_db::models::Conversation::from) .collect(); // Process each conversation let num_conversations = db_conversations.len(); for conversation in db_conversations { let conversation_id = conversation.guid.clone(); // Insert the conversation database .with_repository(|r| r.insert_conversation(conversation)) .await?; // Sync individual conversation. Self::sync_conversation_impl(database, signal_sender, conversation_id).await?; } // Send conversations updated signal. signal_sender.send(Signal::ConversationsUpdated).await?; log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations); Ok(()) } async fn sync_conversation_impl( database: &mut Arc>, signal_sender: &Sender, conversation_id: String, ) -> Result<()> { log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); let mut client = Self::get_client_impl(database).await?; // Check if conversation exists in database. let conversation = database .with_repository(|r| r.get_conversation_by_guid(&conversation_id)) .await?; if conversation.is_none() { // If the conversation doesn't exist, first do a conversation list sync. log::warn!(target: target::SYNC, "Conversation {} not found, performing list sync", conversation_id); Self::sync_conversation_list(database, signal_sender).await?; } // Fetch and sync messages for this conversation let last_message_id = database .with_repository(|r| -> Option { r.get_last_message_for_conversation(&conversation_id) .unwrap_or(None) .map(|m| m.id) }) .await; log::debug!(target: target::SYNC, "Fetching messages for conversation {}", &conversation_id); log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id); let messages = client .get_messages(&conversation_id, None, None, last_message_id) .await?; let db_messages: Vec = messages .into_iter() .map(kordophone_db::models::Message::from) .collect(); // Insert each message let num_messages = db_messages.len(); log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", num_messages, &conversation_id); database .with_repository(|r| r.insert_messages(&conversation_id, db_messages)) .await?; // Send messages updated signal, if we actually inserted any messages. if num_messages > 0 { signal_sender .send(Signal::MessagesUpdated(conversation_id.clone())) .await?; } log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); Ok(()) } async fn get_settings(&mut self) -> Result { let settings = self.database.with_settings(Settings::from_db).await?; Ok(settings) } async fn update_settings(&mut self, settings: &Settings) -> Result<()> { self.database.with_settings(|s| settings.save(s)).await } async fn get_client(&mut self) -> Result> { Self::get_client_impl(&mut self.database).await } async fn get_client_impl( database: &mut Arc>, ) -> Result> { let settings = database.with_settings(Settings::from_db).await?; let server_url = settings .server_url .ok_or(DaemonError::ClientNotConfigured)?; let client = HTTPAPIClient::new( server_url.parse().unwrap(), DatabaseAuthenticationStore::new(database.clone()), ); Ok(client) } async fn delete_all_conversations(&mut self) -> Result<()> { self.database .with_repository(|r| -> Result<()> { r.delete_all_conversations()?; r.delete_all_messages()?; Ok(()) }) .await?; self.signal_sender .send(Signal::ConversationsUpdated) .await?; Ok(()) } fn get_data_dir() -> Option { ProjectDirs::from("net", "buzzert", "kordophonecd").map(|p| PathBuf::from(p.data_dir())) } fn get_database_path() -> PathBuf { if let Some(data_dir) = Self::get_data_dir() { data_dir.join("database.db") } else { // Fallback to a local path if we can't get the system directories PathBuf::from("database.db") } } }