diff --git a/Cargo.lock b/Cargo.lock index 8a78e5e..7b619fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", @@ -565,9 +565,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", ] @@ -591,9 +591,9 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" @@ -840,6 +840,7 @@ name = "kordophone-db" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bincode", "chrono", "diesel", @@ -847,6 +848,7 @@ dependencies = [ "kordophone", "serde", "time", + "tokio", "uuid", ] @@ -896,9 +898,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libdbus-sys" @@ -1511,9 +1513,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.1" +version = "1.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" dependencies = [ "backtrace", "bytes", @@ -1529,9 +1531,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/kordophone-db/Cargo.toml b/kordophone-db/Cargo.toml index 0696e36..65e7bba 100644 --- a/kordophone-db/Cargo.toml +++ b/kordophone-db/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.94" +async-trait = "0.1.88" bincode = "1.3.3" chrono = "0.4.38" diesel = { version = "2.2.6", features = ["chrono", "sqlite", "time"] } @@ -12,4 +13,5 @@ diesel_migrations = { version = "2.2.0", features = ["sqlite"] } kordophone = { path = "../kordophone" } serde = { version = "1.0.215", features = ["derive"] } time = "0.3.37" +tokio = "1.44.2" uuid = { version = "1.11.0", features = ["v4"] } diff --git a/kordophone-db/src/database.rs b/kordophone-db/src/database.rs index 55f99c1..3841140 100644 --- a/kordophone-db/src/database.rs +++ b/kordophone-db/src/database.rs @@ -1,5 +1,9 @@ use anyhow::Result; use diesel::prelude::*; +use async_trait::async_trait; + +pub use std::sync::Arc; +pub use tokio::sync::Mutex; use crate::repository::Repository; use crate::settings::Settings; @@ -10,6 +14,19 @@ use kordophone::model::JwtToken; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); +#[async_trait] +pub trait DatabaseAccess { + async fn with_repository(&mut self, f: F) -> R + where + F: FnOnce(&mut Repository) -> R + Send, + R: Send; + + async fn with_settings(&mut self, f: F) -> R + where + F: FnOnce(&mut Settings) -> R + Send, + R: Send; +} + pub struct Database { pub connection: SqliteConnection, } @@ -26,38 +43,64 @@ impl Database { pub fn new_in_memory() -> Result { Self::new(":memory:") } +} - pub fn with_repository(&mut self, f: F) -> R +#[async_trait] +impl DatabaseAccess for Database { + async fn with_repository(&mut self, f: F) -> R where - F: FnOnce(&mut Repository) -> R, + F: FnOnce(&mut Repository) -> R + Send, + R: Send, { let mut repository = Repository::new(&mut self.connection); f(&mut repository) } - pub fn with_settings(&mut self, f: F) -> R + async fn with_settings(&mut self, f: F) -> R where - F: FnOnce(&mut Settings) -> R, + F: FnOnce(&mut Settings) -> R + Send, + R: Send, { let mut settings = Settings::new(&mut self.connection); f(&mut settings) } } +#[async_trait] +impl DatabaseAccess for Arc> { + async fn with_repository(&mut self, f: F) -> R + where + F: FnOnce(&mut Repository) -> R + Send, + R: Send, + { + let mut database = self.lock().await; + database.with_repository(f).await + } + + async fn with_settings(&mut self, f: F) -> R + where + F: FnOnce(&mut Settings) -> R + Send, + R: Send, + { + let mut database = self.lock().await; + database.with_settings(f).await + } +} + static TOKEN_KEY: &str = "token"; impl TokenManagement for Database { - fn get_token(&mut self) -> Option { + async fn get_token(&mut self) -> Option { self.with_settings(|settings| { let token: Result> = settings.get(TOKEN_KEY); match token { Ok(data) => data, Err(_) => None, } - }) + }).await } - fn set_token(&mut self, token: JwtToken) { - self.with_settings(|settings| settings.put(TOKEN_KEY, &token).unwrap()); + async fn set_token(&mut self, token: JwtToken) { + self.with_settings(|settings| settings.put(TOKEN_KEY, &token).unwrap()).await; } } diff --git a/kordophone-db/src/tests/mod.rs b/kordophone-db/src/tests/mod.rs index 535becc..3918875 100644 --- a/kordophone-db/src/tests/mod.rs +++ b/kordophone-db/src/tests/mod.rs @@ -1,5 +1,5 @@ use crate::{ - database::Database, + database::{Database, DatabaseAccess}, repository::Repository, models::{ conversation::{Conversation, ConversationBuilder}, diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index 6a337b2..55ea9db 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -27,8 +27,8 @@ pub trait APIInterface { } pub trait TokenManagement { - fn get_token(&mut self) -> Option; - fn set_token(&mut self, token: JwtToken); + async fn get_token(&mut self) -> Option; + async fn set_token(&mut self, token: JwtToken); } pub struct InMemoryTokenManagement { @@ -42,11 +42,11 @@ impl InMemoryTokenManagement { } impl TokenManagement for InMemoryTokenManagement { - fn get_token(&mut self) -> Option { + async fn get_token(&mut self) -> Option { self.token.clone() } - fn set_token(&mut self, token: JwtToken) { + async fn set_token(&mut self, token: JwtToken) { self.token = Some(token); } } diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index d7342df..3d0e2ba 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -1,8 +1,9 @@ use tokio::sync::oneshot; use kordophone_db::models::Conversation; -pub type Reply = oneshot::Sender; +pub type Reply = oneshot::Sender; +#[derive(Debug)] pub enum Event { /// Get the version of the daemon. GetVersion(Reply), diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 0f5e9c3..6b10f8a 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -10,9 +10,12 @@ use std::error::Error; use std::path::PathBuf; use thiserror::Error; use tokio::sync::mpsc::{Sender, Receiver}; +use std::sync::Arc; +use tokio::sync::Mutex; +use futures_util::FutureExt; use kordophone_db::{ - database::Database, + database::{Database, DatabaseAccess}, models::Conversation, repository::Repository, }; @@ -36,7 +39,7 @@ pub struct Daemon { pub event_sender: Sender, event_receiver: Receiver, version: String, - database: Database, + database: Arc>, runtime: tokio::runtime::Runtime, } @@ -58,7 +61,8 @@ impl Daemon { .build() .unwrap(); - let database = Database::new(&database_path.to_string_lossy())?; + let database_impl = Database::new(&database_path.to_string_lossy())?; + let database = Arc::new(Mutex::new(database_impl)); Ok(Self { version: "0.1.0".to_string(), database, event_receiver, event_sender, runtime }) } @@ -75,70 +79,100 @@ impl Daemon { }, Event::SyncAllConversations(reply) => { - self.sync_all_conversations().await.unwrap_or_else(|e| { - log::error!("Error handling sync event: {}", e); + let db_clone = self.database.clone(); + self.runtime.spawn(async move { + let result = Self::sync_all_conversations_impl(db_clone).await; + if let Err(e) = result { + log::error!("Error handling sync event: {}", e); + } }); + // This is a background operation, so return right away. reply.send(()).unwrap(); - }, + }, Event::GetAllConversations(reply) => { - let conversations = self.get_conversations(); + let conversations = self.get_conversations().await; reply.send(conversations).unwrap(); }, } } - fn get_conversations(&mut self) -> Vec { - self.database.with_repository(|r| r.all_conversations().unwrap()) + async fn get_conversations(&mut self) -> Vec { + self.database.lock().await.with_repository(|r| r.all_conversations().unwrap()).await } - async fn sync_all_conversations(&mut self) -> Result<()> { - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + async fn sync_all_conversations_impl(mut database: Arc>) -> Result<()> { + log::info!("Starting conversation sync"); + + // Get client from the database + let settings = database.with_settings(|s| Settings::from_db(s)) + .await?; - let mut client = self.get_client() - .map_err(|_| DaemonError::ClientNotConfigured)?; + let server_url = settings.server_url + .ok_or(DaemonError::ClientNotConfigured)?; + let mut client = HTTPAPIClient::new( + server_url.parse().unwrap(), + match (settings.username, settings.credential_item) { + (Some(username), Some(password)) => Some( + Credentials { + username, + password, + } + ), + _ => None, + } + ); + + // This function needed to implement TokenManagement + // let token = database.lock().await.get_token(); + // TODO: Clent.token = token + + // Fetch conversations from server let fetched_conversations = client.get_conversations().await?; let db_conversations: Vec = fetched_conversations.into_iter() .map(|c| kordophone_db::models::Conversation::from(c)) .collect(); - + // Process each conversation - let mut repository = Repository::new(&mut self.database.connection); for conversation in db_conversations { let conversation_id = conversation.guid.clone(); // Insert the conversation - repository.insert_conversation(conversation)?; - + database.with_repository(|r| r.insert_conversation(conversation)).await?; + // Fetch and sync messages for this conversation let messages = client.get_messages(&conversation_id).await?; let db_messages: Vec = messages.into_iter() .map(|m| kordophone_db::models::Message::from(m)) .collect(); - + // Insert each message - for message in db_messages { - repository.insert_message(&conversation_id, message)?; - } + database.with_repository(|r| -> Result<()> { + for message in db_messages { + r.insert_message(&conversation_id, message)?; + } + + Ok(()) + }).await?; } - + Ok(()) - } + } - fn get_settings(&mut self) -> Result { + async fn get_settings(&mut self) -> Result { let settings = self.database.with_settings(|s| Settings::from_db(s) - )?; + ).await?; Ok(settings) } - fn get_client(&mut self) -> Result { + async fn get_client(&mut self) -> Result { let settings = self.database.with_settings(|s| Settings::from_db(s) - )?; + ).await?; let server_url = settings.server_url .ok_or(DaemonError::ClientNotConfigured)?; @@ -170,13 +204,3 @@ impl Daemon { } } } - -impl TokenManagement for &mut Daemon { - fn get_token(&mut self) -> Option { - self.database.get_token() - } - - fn set_token(&mut self, token: JwtToken) { - self.database.set_token(token); - } -} diff --git a/kpcli/src/db/mod.rs b/kpcli/src/db/mod.rs index a183938..829ca74 100644 --- a/kpcli/src/db/mod.rs +++ b/kpcli/src/db/mod.rs @@ -3,7 +3,10 @@ use clap::Subcommand; use kordophone::APIInterface; use std::{env, path::PathBuf}; -use kordophone_db::database::Database; +use kordophone_db::{ + database::{Database, DatabaseAccess}, + models::{Conversation, Message}, +}; use crate::{client, printers::{ConversationPrinter, MessagePrinter}}; #[derive(Subcommand)] @@ -72,16 +75,16 @@ impl Commands { let mut db = DbClient::new()?; match cmd { Commands::Conversations { command: cmd } => match cmd { - ConversationCommands::List => db.print_conversations(), + ConversationCommands::List => db.print_conversations().await, ConversationCommands::Sync => db.sync_with_client().await, }, Commands::Messages { command: cmd } => match cmd { MessageCommands::List { conversation_id } => db.print_messages(&conversation_id).await, }, Commands::Settings { command: cmd } => match cmd { - SettingsCommands::Get { key } => db.get_setting(key), - SettingsCommands::Put { key, value } => db.put_setting(key, value), - SettingsCommands::Delete { key } => db.delete_setting(key), + SettingsCommands::Get { key } => db.get_setting(key).await, + SettingsCommands::Put { key, value } => db.put_setting(key, value).await, + SettingsCommands::Delete { key } => db.delete_setting(key).await, }, } } @@ -107,10 +110,10 @@ impl DbClient { Ok( Self { database: db }) } - pub fn print_conversations(&mut self) -> Result<()> { + pub async fn print_conversations(&mut self) -> Result<()> { let all_conversations = self.database.with_repository(|repository| { repository.all_conversations() - })?; + }).await?; println!("{} Conversations: ", all_conversations.len()); for conversation in all_conversations { @@ -123,7 +126,7 @@ impl DbClient { pub async fn print_messages(&mut self, conversation_id: &str) -> Result<()> { let messages = self.database.with_repository(|repository| { repository.get_messages_for_conversation(conversation_id) - })?; + }).await?; for message in messages { println!("{}", MessagePrinter::new(&message.into())); @@ -145,7 +148,7 @@ impl DbClient { // Insert the conversation self.database.with_repository(|repository| { repository.insert_conversation(conversation) - })?; + }).await?; // Fetch and sync messages for this conversation let messages = client.get_messages(&conversation_id).await?; @@ -160,13 +163,13 @@ impl DbClient { } Ok(()) - })?; + }).await?; } Ok(()) } - pub fn get_setting(&mut self, key: Option) -> Result<()> { + pub async fn get_setting(&mut self, key: Option) -> Result<()> { self.database.with_settings(|settings| { match key { Some(key) => { @@ -196,23 +199,23 @@ impl DbClient { } Ok(()) - }) + }).await } - pub fn put_setting(&mut self, key: String, value: String) -> Result<()> { + pub async fn put_setting(&mut self, key: String, value: String) -> Result<()> { self.database.with_settings(|settings| { settings.put(&key, &value)?; Ok(()) - }) + }).await } - pub fn delete_setting(&mut self, key: String) -> Result<()> { + pub async fn delete_setting(&mut self, key: String) -> Result<()> { self.database.with_settings(|settings| { let count = settings.del(&key)?; if count == 0 { println!("Setting '{}' not found", key); } Ok(()) - }) + }).await } }