diff --git a/Cargo.lock b/Cargo.lock index 0172a63..2e4b394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1005,6 +1005,7 @@ version = "0.1.0" dependencies = [ "async-trait", "base64", + "bytes", "chrono", "ctor", "env_logger", diff --git a/kordophone/Cargo.toml b/kordophone/Cargo.toml index 9a64c68..131ca83 100644 --- a/kordophone/Cargo.toml +++ b/kordophone/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] async-trait = "0.1.80" base64 = "0.22.1" +bytes = "1.10.1" chrono = { version = "0.4.38", features = ["serde"] } ctor = "0.2.8" env_logger = "0.11.5" diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index 7b7eaa9..d69318a 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -1,10 +1,12 @@ extern crate hyper; extern crate serde; -use std::{path::PathBuf, str}; +use std::{path::PathBuf, pin::Pin, str, task::Poll}; -use crate::api::AuthenticationStore; use crate::api::event_socket::EventSocket; +use crate::api::AuthenticationStore; +use bytes::Bytes; +use hyper::body::HttpBody; use hyper::{Body, Client, Method, Request, Uri}; use async_trait::async_trait; @@ -12,26 +14,19 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::net::TcpStream; +use futures_util::stream::{BoxStream, Stream}; +use futures_util::task::Context; use futures_util::{SinkExt, StreamExt, TryStreamExt}; -use futures_util::stream::{SplitStream, SplitSink, Stream}; -use futures_util::stream::BoxStream; use tokio_tungstenite::connect_async; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; use crate::{ model::{ - Conversation, - ConversationID, - JwtToken, - Message, - MessageID, - UpdateItem, - Event, - OutgoingMessage, - }, - - APIInterface + Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage, + UpdateItem, + }, + APIInterface, }; type HttpClient = Client; @@ -72,19 +67,19 @@ impl std::fmt::Display for Error { } } -impl From for Error { +impl From for Error { fn from(err: hyper::Error) -> Error { Error::HTTPError(err) } } -impl From for Error { +impl From for Error { fn from(err: serde_json::Error) -> Error { Error::SerdeError(err) } } -impl From for Error { +impl From for Error { fn from(err: tungstenite::Error) -> Error { Error::ClientError(err.to_string()) } @@ -99,13 +94,17 @@ impl AuthBuilder for hyper::http::request::Builder { fn with_auth(self, token: &Option) -> Self { if let Some(token) = &token { self.header("Authorization", token.to_header_value()) - } else { self } + } else { + self + } } fn with_auth_string(self, token: &Option) -> Self { if let Some(token) = &token { self.header("Authorization", format!("Bearer: {}", token)) - } else { self } + } else { + self + } } } @@ -119,7 +118,8 @@ trait AuthSetting { impl AuthSetting for hyper::http::Request { fn authenticate(&mut self, token: &Option) { if let Some(token) = &token { - self.headers_mut().insert("Authorization", token.to_header_value()); + self.headers_mut() + .insert("Authorization", token.to_header_value()); } } } @@ -156,7 +156,7 @@ impl WebsocketEventSocket { // Connection was closed cleanly Err(Error::ClientError("WebSocket connection closed".into())) } - _ => Ok(None) + _ => Ok(None), } }) } @@ -182,17 +182,40 @@ impl EventSocket for WebsocketEventSocket { } } +pub struct ResponseStream { + body: hyper::Body, +} + +impl Stream for ResponseStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.body + .poll_next_unpin(cx) + .map_err(|e| Error::HTTPError(e)) + } +} + +impl From for ResponseStream { + fn from(value: hyper::Body) -> Self { + ResponseStream { body: value } + } +} + #[async_trait] impl APIInterface for HTTPAPIClient { type Error = Error; + type ResponseStream = ResponseStream; async fn get_version(&mut self) -> Result { - let version: String = self.request("version", Method::GET).await?; + let version: String = self.deserialized_response("version", Method::GET).await?; Ok(version) } async fn get_conversations(&mut self) -> Result, Self::Error> { - let conversations: Vec = self.request("conversations", Method::GET).await?; + let conversations: Vec = self + .deserialized_response("conversations", Method::GET) + .await?; Ok(conversations) } @@ -205,7 +228,9 @@ impl APIInterface for HTTPAPIClient { log::debug!("Authenticating with username: {:?}", credentials.username); let body = || -> Body { serde_json::to_string(&credentials).unwrap().into() }; - let token: AuthResponse = self.request_with_body_retry("authenticate", Method::POST, body, false).await?; + let token: AuthResponse = self + .deserialized_response_with_body_retry("authenticate", Method::POST, body, false) + .await?; let token = JwtToken::new(&token.jwt).map_err(|e| Error::DecodeError(e.to_string()))?; log::debug!("Saving token: {:?}", token); @@ -215,46 +240,60 @@ impl APIInterface for HTTPAPIClient { } async fn get_messages( - &mut self, + &mut self, conversation_id: &ConversationID, limit: Option, before: Option, after: Option, ) -> Result, Self::Error> { let mut endpoint = format!("messages?guid={}", conversation_id); - + if let Some(limit_val) = limit { endpoint.push_str(&format!("&limit={}", limit_val)); } - + if let Some(before_id) = before { endpoint.push_str(&format!("&beforeMessageGUID={}", before_id)); } - + if let Some(after_id) = after { endpoint.push_str(&format!("&afterMessageGUID={}", after_id)); } - let messages: Vec = self.request(&endpoint, Method::GET).await?; + let messages: Vec = self.deserialized_response(&endpoint, Method::GET).await?; Ok(messages) } async fn send_message( - &mut self, + &mut self, outgoing_message: &OutgoingMessage, ) -> Result { - let message: Message = self.request_with_body( - "sendMessage", - Method::POST, - || serde_json::to_string(&outgoing_message).unwrap().into() - ).await?; + let message: Message = self + .deserialized_response_with_body("sendMessage", Method::POST, || { + serde_json::to_string(&outgoing_message).unwrap().into() + }) + .await?; Ok(message) } - async fn open_event_socket(&mut self, update_seq: Option) -> Result { - use tungstenite::handshake::client::Request as TungsteniteRequest; + async fn fetch_attachment_data( + &mut self, + guid: &String, + ) -> Result { + let endpoint = format!("attachment?guid={}", guid); + self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true) + .await + .map(hyper::Response::into_body) + .map(ResponseStream::from) + } + + async fn open_event_socket( + &mut self, + update_seq: Option, + ) -> Result { use tungstenite::handshake::client::generate_key; + use tungstenite::handshake::client::Request as TungsteniteRequest; let endpoint = match update_seq { Some(seq) => format!("updates?seq={}", seq), @@ -279,7 +318,10 @@ impl APIInterface for HTTPAPIClient { match &auth { Some(token) => { - request.headers_mut().insert("Authorization", format!("Bearer: {}", token).parse().unwrap()); + request.headers_mut().insert( + "Authorization", + format!("Bearer: {}", token).parse().unwrap(), + ); } None => { log::warn!(target: "websocket", "Proceeding without auth token."); @@ -306,21 +348,23 @@ impl APIInterface for HTTPAPIClient { return Err(Error::Unauthorized); } else { log::error!("Websocket unauthorized, no credentials provided"); - return Err(Error::ClientError("Unauthorized, no credentials provided".into())); + return Err(Error::ClientError( + "Unauthorized, no credentials provided".into(), + )); } } - _ => Err(e) - } + _ => Err(e), + }, - _ => Err(e) - } + _ => Err(e), + }, } } } impl HTTPAPIClient { pub fn new(base_url: Uri, auth_store: K) -> HTTPAPIClient { - HTTPAPIClient { + HTTPAPIClient { base_url, auth_store, client: Client::new(), @@ -348,25 +392,67 @@ impl HTTPAPIClient { } } - async fn request(&mut self, endpoint: &str, method: Method) -> Result { - self.request_with_body(endpoint, method, Body::empty).await + async fn deserialized_response( + &mut self, + endpoint: &str, + method: Method, + ) -> Result { + self.deserialized_response_with_body(endpoint, method, Body::empty) + .await } - async fn request_with_body(&mut self, endpoint: &str, method: Method, body_fn: impl Fn() -> Body) -> Result - where T: DeserializeOwned + async fn deserialized_response_with_body( + &mut self, + endpoint: &str, + method: Method, + body_fn: impl Fn() -> Body, + ) -> Result + where + T: DeserializeOwned, { - self.request_with_body_retry(endpoint, method, body_fn, true).await + self.deserialized_response_with_body_retry(endpoint, method, body_fn, true) + .await } - async fn request_with_body_retry( - &mut self, - endpoint: &str, - method: Method, - body_fn: impl Fn() -> Body, - retry_auth: bool) -> Result - where - T: DeserializeOwned, + async fn deserialized_response_with_body_retry( + &mut self, + endpoint: &str, + method: Method, + body_fn: impl Fn() -> Body, + retry_auth: bool, + ) -> Result + where + T: DeserializeOwned, { + let response = self + .response_with_body_retry(endpoint, method, body_fn, retry_auth) + .await?; + + // Read and parse response body + let body = hyper::body::to_bytes(response.into_body()).await?; + let parsed: T = match serde_json::from_slice(&body) { + Ok(result) => Ok(result), + Err(json_err) => { + log::error!("Error deserializing JSON: {:?}", json_err); + log::error!("Body: {:?}", String::from_utf8_lossy(&body)); + + // If JSON deserialization fails, try to interpret it as plain text + // Unfortunately the server does return things like this... + let s = str::from_utf8(&body).map_err(|e| Error::DecodeError(e.to_string()))?; + serde_plain::from_str(s).map_err(|_| json_err) + } + }?; + + Ok(parsed) + } + + async fn response_with_body_retry( + &mut self, + endpoint: &str, + method: Method, + body_fn: impl Fn() -> Body, + retry_auth: bool, + ) -> Result, Error> { use hyper::StatusCode; let uri = self.uri_for_endpoint(endpoint, None); @@ -389,48 +475,38 @@ impl HTTPAPIClient { log::debug!("-> Response: {:}", response.status()); match response.status() { - StatusCode::OK => { /* cool */ }, + StatusCode::OK => { /* cool */ } - // 401: Unauthorized. Token may have expired or is invalid. Attempt to renew. + // 401: Unauthorized. Token may have expired or is invalid. Attempt to renew. StatusCode::UNAUTHORIZED => { if !retry_auth { return Err(Error::ClientError("Unauthorized".into())); } if let Some(credentials) = &self.auth_store.get_credentials().await { - log::debug!("Renewing token using credentials: u: {:?}", credentials.username); + log::debug!( + "Renewing token using credentials: u: {:?}", + credentials.username + ); let new_token = self.authenticate(credentials.clone()).await?; let request = build_request(&Some(new_token.to_string())); response = self.client.request(request).await?; } else { - return Err(Error::ClientError("Unauthorized, no credentials provided".into())); + return Err(Error::ClientError( + "Unauthorized, no credentials provided".into(), + )); } - }, + } - // Other errors: bubble up. + // Other errors: bubble up. _ => { let message = format!("Request failed ({:})", response.status()); - return Err(Error::ClientError(message)); + return Err(Error::ClientError(message)); } } - // Read and parse response body - let body = hyper::body::to_bytes(response.into_body()).await?; - let parsed: T = match serde_json::from_slice(&body) { - Ok(result) => Ok(result), - Err(json_err) => { - log::error!("Error deserializing JSON: {:?}", json_err); - log::error!("Body: {:?}", String::from_utf8_lossy(&body)); - - // If JSON deserialization fails, try to interpret it as plain text - // Unfortunately the server does return things like this... - let s = str::from_utf8(&body).map_err(|e| Error::DecodeError(e.to_string()))?; - serde_plain::from_str(s).map_err(|_| json_err) - } - }?; - - Ok(parsed) + Ok(response) } } @@ -438,7 +514,7 @@ impl HTTPAPIClient { mod test { use super::*; use crate::api::InMemoryAuthenticationStore; - + #[cfg(test)] fn local_mock_client() -> HTTPAPIClient { let base_url = "http://localhost:5738".parse().unwrap(); @@ -447,7 +523,10 @@ mod test { password: "test".to_string(), }; - HTTPAPIClient::new(base_url, InMemoryAuthenticationStore::new(Some(credentials))) + HTTPAPIClient::new( + base_url, + InMemoryAuthenticationStore::new(Some(credentials)), + ) } #[cfg(test)] @@ -459,7 +538,7 @@ mod test { Ok(_) => true, Err(e) => { log::error!("Mock client error: {:?}", e); - false + false } } } @@ -498,7 +577,10 @@ mod test { let mut client = local_mock_client(); let conversations = client.get_conversations().await.unwrap(); let conversation = conversations.first().unwrap(); - let messages = client.get_messages(&conversation.guid, None, None, None).await.unwrap(); + let messages = client + .get_messages(&conversation.guid, None, None, None) + .await + .unwrap(); assert!(!messages.is_empty()); } diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index e7fa0ae..c25f124 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -1,7 +1,8 @@ +pub use crate::model::{Conversation, ConversationID, Message, MessageID, OutgoingMessage}; + use async_trait::async_trait; -pub use crate::model::{ - Conversation, Message, ConversationID, MessageID, OutgoingMessage, -}; +use bytes::Bytes; +use futures_util::Stream; pub mod auth; pub use crate::api::auth::{AuthenticationStore, InMemoryAuthenticationStore}; @@ -15,21 +16,23 @@ pub mod event_socket; pub use event_socket::EventSocket; use self::http_client::Credentials; -use std::fmt::Debug; +use core::error::Error as StdError; +use std::{fmt::Debug, io::BufRead}; #[async_trait] pub trait APIInterface { type Error: Debug; + type ResponseStream: Stream>; // (GET) /version async fn get_version(&mut self) -> Result; - + // (GET) /conversations async fn get_conversations(&mut self) -> Result, Self::Error>; // (GET) /messages async fn get_messages( - &mut self, + &mut self, conversation_id: &ConversationID, limit: Option, before: Option, @@ -38,13 +41,22 @@ pub trait APIInterface { // (POST) /sendMessage async fn send_message( - &mut self, + &mut self, outgoing_message: &OutgoingMessage, ) -> Result; + // (GET) /attachment + async fn fetch_attachment_data( + &mut self, + guid: &String, + ) -> Result; + // (POST) /authenticate async fn authenticate(&mut self, credentials: Credentials) -> Result; // (WS) /updates - async fn open_event_socket(&mut self, update_seq: Option) -> Result; + async fn open_event_socket( + &mut self, + update_seq: Option, + ) -> Result; } diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index 4d945ca..a9f548e 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -6,13 +6,16 @@ use uuid::Uuid; pub use crate::APIInterface; use crate::{ - api::http_client::Credentials, - model::{Conversation, ConversationID, JwtToken, Message, MessageID, UpdateItem, Event, OutgoingMessage}, api::event_socket::EventSocket, -}; + api::http_client::Credentials, + model::{ + Conversation, ConversationID, Event, JwtToken, Message, MessageID, OutgoingMessage, + UpdateItem, + }, +}; -use futures_util::StreamExt; use futures_util::stream::BoxStream; +use futures_util::StreamExt; pub struct TestClient { pub version: &'static str, @@ -59,7 +62,7 @@ impl EventSocket for TestEventSocket { let results: Vec, TestError>> = vec![]; futures_util::stream::iter(results.into_iter()).boxed() } -} +} #[async_trait] impl APIInterface for TestClient { @@ -78,21 +81,21 @@ impl APIInterface for TestClient { } async fn get_messages( - &mut self, - conversation_id: &ConversationID, - limit: Option, - before: Option, - after: Option + &mut self, + conversation_id: &ConversationID, + limit: Option, + before: Option, + after: Option, ) -> Result, Self::Error> { if let Some(messages) = self.messages.get(conversation_id) { - return Ok(messages.clone()) + return Ok(messages.clone()); } Err(TestError::ConversationNotFound) } async fn send_message( - &mut self, + &mut self, outgoing_message: &OutgoingMessage, ) -> Result { let message = Message::builder() @@ -101,13 +104,21 @@ impl APIInterface for TestClient { .date(OffsetDateTime::now_utc()) .build(); - self.messages.entry(outgoing_message.conversation_id.clone()).or_insert(vec![]).push(message.clone()); + self.messages + .entry(outgoing_message.conversation_id.clone()) + .or_insert(vec![]) + .push(message.clone()); Ok(message) } - async fn open_event_socket(&mut self, _update_seq: Option) -> Result { + async fn open_event_socket( + &mut self, + _update_seq: Option, + ) -> Result { Ok(TestEventSocket::new()) } + + async fn fetch_attachment_data(&mut self, guid: &String) -> Result, Self::Error> { + Ok(vec![]) + } } - - diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index 8198155..b29ed0c 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -4,7 +4,7 @@ - @@ -13,9 +13,9 @@ - + - - - - - - @@ -66,16 +66,34 @@ - - - + + + + + + + + + + + + + + + + + + @@ -90,6 +108,6 @@ - + diff --git a/kordophoned/src/daemon/attachment_store.rs b/kordophoned/src/daemon/attachment_store.rs new file mode 100644 index 0000000..38f8324 --- /dev/null +++ b/kordophoned/src/daemon/attachment_store.rs @@ -0,0 +1,104 @@ +use std::{ + io::{BufReader, BufWriter, Read, Write}, + path::{Path, PathBuf}, +}; + +use anyhow::{Error, Result}; +use futures_util::{poll, StreamExt}; +use kordophone::APIInterface; +use thiserror::Error; +use tokio::pin; + +mod target { + pub static ATTACHMENTS: &str = "attachments"; +} + +#[derive(Debug, Clone)] +pub struct Attachment { + pub guid: String, + pub path: PathBuf, + pub downloaded: bool, +} + +#[derive(Debug, Error)] +enum AttachmentStoreError { + #[error("attachment has already been downloaded")] + AttachmentAlreadyDownloaded, + + #[error("Client error: {0}")] + APIClientError(String), +} + +pub struct AttachmentStore { + store_path: PathBuf, +} + +impl AttachmentStore { + pub fn new(data_dir: &PathBuf) -> AttachmentStore { + let store_path = data_dir.join("attachments"); + log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); + + // Create the attachment store if it doesn't exist + std::fs::create_dir_all(&store_path) + .expect("Wasn't able to create the attachment store path"); + + AttachmentStore { + store_path: store_path, + } + } + + pub fn get_attachment(&self, guid: &String) -> Attachment { + let path = self.store_path.join(guid); + let path_exists = std::fs::exists(&path).expect( + format!( + "Wasn't able to check for the existence of an attachment file path at {}", + &path.display() + ) + .as_str(), + ); + + Attachment { + guid: guid.to_owned(), + path: path, + downloaded: path_exists, + } + } + + pub async fn download_attachent( + &mut self, + attachment: &Attachment, + mut client_factory: F, + ) -> Result<()> + where + C: APIInterface, + F: AsyncFnMut() -> Result, + { + if attachment.downloaded { + log::error!(target: target::ATTACHMENTS, "Attempted to download existing attachment."); + return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); + } + + // Create temporary file first, we'll atomically swap later. + assert!(!std::fs::exists(&attachment.path).unwrap()); + let file = std::fs::File::create(&attachment.path)?; + let mut writer = BufWriter::new(&file); + + log::trace!(target: target::ATTACHMENTS, "Created attachment file at {}", &attachment.path.display()); + + let mut client = (client_factory)().await?; + let stream = client + .fetch_attachment_data(&attachment.guid) + .await + .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; + + // Since we're async, we need to pin this. + pin!(stream); + + log::trace!(target: target::ATTACHMENTS, "Writing attachment data to disk"); + while let Some(Ok(data)) = stream.next().await { + writer.write(data.as_ref())?; + } + + Ok(()) + } +} diff --git a/kordophoned/src/daemon/auth_store.rs b/kordophoned/src/daemon/auth_store.rs index c4b3945..283b169 100644 --- a/kordophoned/src/daemon/auth_store.rs +++ b/kordophoned/src/daemon/auth_store.rs @@ -1,10 +1,10 @@ use crate::daemon::SettingsKey; +use keyring::{Entry, Result}; use std::sync::Arc; use tokio::sync::Mutex; -use keyring::{Entry, Result}; -use kordophone::api::{AuthenticationStore, http_client::Credentials}; +use kordophone::api::{http_client::Credentials, AuthenticationStore}; use kordophone::model::JwtToken; use kordophone_db::database::{Database, DatabaseAccess}; @@ -25,52 +25,67 @@ impl AuthenticationStore for DatabaseAuthenticationStore { async fn get_credentials(&mut self) -> Option { use keyring::secret_service::SsCredential; - self.database.lock().await.with_settings(|settings| { - let username: Option = settings.get::(SettingsKey::USERNAME) - .unwrap_or_else(|e| { - log::warn!("error getting username from database: {}", e); - None - }); + self.database + .lock() + .await + .with_settings(|settings| { + let username: Option = settings + .get::(SettingsKey::USERNAME) + .unwrap_or_else(|e| { + log::warn!("error getting username from database: {}", e); + None + }); - match username { - Some(username) => { - let credential = SsCredential::new_with_target(None, "net.buzzert.kordophonecd", &username).unwrap(); + match username { + Some(username) => { + let credential = SsCredential::new_with_target( + None, + "net.buzzert.kordophonecd", + &username, + ) + .unwrap(); - let password: Result = Entry::new_with_credential(Box::new(credential)) - .get_password(); + let password: Result = + Entry::new_with_credential(Box::new(credential)).get_password(); - log::debug!("password: {:?}", password); - - match password { - Ok(password) => Some(Credentials { username, password }), - Err(e) => { - log::error!("error getting password from keyring: {}", e); - None + match password { + Ok(password) => Some(Credentials { username, password }), + Err(e) => { + log::error!("error getting password from keyring: {}", e); + None + } } } + None => None, } - None => None, - } - }).await + }) + .await } async fn get_token(&mut self) -> Option { - self.database.lock().await - .with_settings(|settings| { - match settings.get::(SettingsKey::TOKEN) { + self.database + .lock() + .await + .with_settings( + |settings| match settings.get::(SettingsKey::TOKEN) { Ok(token) => token, Err(e) => { log::warn!("Failed to get token from settings: {}", e); None } - } - }).await + }, + ) + .await } async fn set_token(&mut self, token: String) { - self.database.lock().await - .with_settings(|settings| settings.put(SettingsKey::TOKEN, &token)).await.unwrap_or_else(|e| { + self.database + .lock() + .await + .with_settings(|settings| settings.put(SettingsKey::TOKEN, &token)) + .await + .unwrap_or_else(|e| { log::error!("Failed to set token: {}", e); }); } -} \ No newline at end of file +} diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 4cb3bcb..ce950b8 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -1,6 +1,6 @@ pub mod settings; -use settings::Settings; use settings::keys as SettingsKey; +use settings::Settings; pub mod events; use events::*; @@ -11,13 +11,13 @@ use signals::*; use anyhow::Result; use directories::ProjectDirs; -use std::error::Error; -use std::path::PathBuf; use std::collections::HashMap; +use std::error::Error; +use std::path::{Path, PathBuf}; use std::sync::Arc; use thiserror::Error; -use tokio::sync::mpsc::{Sender, Receiver}; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use uuid::Uuid; @@ -26,8 +26,8 @@ use kordophone_db::{ models::{Conversation, Message}, }; -use kordophone::api::APIInterface; use kordophone::api::http_client::HTTPAPIClient; +use kordophone::api::APIInterface; use kordophone::model::outgoing_message::OutgoingMessage; use kordophone::model::ConversationID; @@ -38,8 +38,12 @@ mod auth_store; use auth_store::DatabaseAuthenticationStore; mod post_office; -use post_office::PostOffice; use post_office::Event as PostOfficeEvent; +use post_office::PostOffice; + +mod attachment_store; +pub use attachment_store::Attachment; +use attachment_store::AttachmentStore; #[derive(Debug, Error)] pub enum DaemonError { @@ -49,6 +53,8 @@ pub enum DaemonError { pub type DaemonResult = Result>; +type DaemonClient = HTTPAPIClient; + pub mod target { pub static SYNC: &str = "sync"; pub static EVENT: &str = "event"; @@ -68,6 +74,8 @@ pub struct Daemon { outgoing_messages: HashMap>, + attachment_store: AttachmentStore, + version: String, database: Arc>, runtime: tokio::runtime::Runtime, @@ -87,7 +95,7 @@ impl Daemon { let (signal_sender, signal_receiver) = tokio::sync::mpsc::channel(100); let (post_office_sink, post_office_source) = tokio::sync::mpsc::channel(100); - // Create background task runtime + // Create background task runtime let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -95,17 +103,22 @@ impl Daemon { let database_impl = Database::new(&database_path.to_string_lossy())?; let database = Arc::new(Mutex::new(database_impl)); - Ok(Self { - version: "0.1.0".to_string(), - database, - event_receiver, - event_sender, + + let data_path = Self::get_data_dir().expect("Unable to get data path"); + let attachment_store = AttachmentStore::new(&data_path); + + Ok(Self { + version: "0.1.0".to_string(), + database, + event_receiver, + event_sender, signal_receiver: Some(signal_receiver), - signal_sender, + signal_sender, post_office_sink, post_office_source: Some(post_office_source), outgoing_messages: HashMap::new(), - runtime + attachment_store: attachment_store, + runtime, }) } @@ -114,7 +127,8 @@ impl Daemon { log::debug!("Debug logging enabled."); // Update monitor - let mut update_monitor = UpdateMonitor::new(self.database.clone(), self.event_sender.clone()); + let mut update_monitor = + UpdateMonitor::new(self.database.clone(), self.event_sender.clone()); tokio::spawn(async move { update_monitor.run().await; // should run indefinitely }); @@ -125,7 +139,10 @@ impl Daemon { let event_sender = self.event_sender.clone(); let post_office_source = self.post_office_source.take().unwrap(); tokio::spawn(async move { - let mut post_office = PostOffice::new(post_office_source, event_sender, async move || Self::get_client_impl(&mut database).await ); + let mut post_office = + PostOffice::new(post_office_source, event_sender, async move || { + Self::get_client_impl(&mut database).await + }); post_office.run().await; }); } @@ -140,7 +157,7 @@ impl Daemon { match event { Event::GetVersion(reply) => { reply.send(self.version.clone()).unwrap(); - }, + } Event::SyncConversationList(reply) => { let mut db_clone = self.database.clone(); @@ -152,132 +169,166 @@ impl Daemon { } }); - // This is a background operation, so return right away. + // This is a background operation, so return right away. reply.send(()).unwrap(); - }, + } Event::SyncAllConversations(reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { - let result = Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await; + let result = + Self::sync_all_conversations_impl(&mut db_clone, &signal_sender).await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } }); - // This is a background operation, so return right away. + // This is a background operation, so return right away. reply.send(()).unwrap(); - }, + } Event::SyncConversation(conversation_id, reply) => { let mut db_clone = self.database.clone(); let signal_sender = self.signal_sender.clone(); self.runtime.spawn(async move { - let result = Self::sync_conversation_impl(&mut db_clone, &signal_sender, conversation_id).await; + let result = Self::sync_conversation_impl( + &mut db_clone, + &signal_sender, + conversation_id, + ) + .await; if let Err(e) = result { log::error!(target: target::SYNC, "Error handling sync event: {}", e); } }); reply.send(()).unwrap(); - }, + } Event::GetAllConversations(limit, offset, reply) => { let conversations = self.get_conversations_limit_offset(limit, offset).await; reply.send(conversations).unwrap(); - }, + } Event::GetAllSettings(reply) => { - let settings = self.get_settings().await - .unwrap_or_else(|e| { - log::error!(target: target::SETTINGS, "Failed to get settings: {:#?}", e); - Settings::default() - }); + let settings = self.get_settings().await.unwrap_or_else(|e| { + log::error!(target: target::SETTINGS, "Failed to get settings: {:#?}", e); + Settings::default() + }); reply.send(settings).unwrap(); - }, + } Event::UpdateSettings(settings, reply) => { - self.update_settings(&settings).await - .unwrap_or_else(|e| { - log::error!(target: target::SETTINGS, "Failed to update settings: {}", e); - }); + self.update_settings(&settings).await.unwrap_or_else(|e| { + log::error!(target: target::SETTINGS, "Failed to update settings: {}", e); + }); reply.send(()).unwrap(); - }, + } Event::GetMessages(conversation_id, last_message_id, reply) => { let messages = self.get_messages(conversation_id, last_message_id).await; reply.send(messages).unwrap(); - }, + } Event::DeleteAllConversations(reply) => { - self.delete_all_conversations().await - .unwrap_or_else(|e| { - log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e); - }); + self.delete_all_conversations().await.unwrap_or_else(|e| { + log::error!(target: target::SYNC, "Failed to delete all conversations: {}", e); + }); reply.send(()).unwrap(); - }, + } Event::SendMessage(conversation_id, text, reply) => { let conversation_id = conversation_id.clone(); - let uuid = self.enqueue_outgoing_message(text, conversation_id.clone()).await; + let uuid = self + .enqueue_outgoing_message(text, conversation_id.clone()) + .await; reply.send(uuid).unwrap(); - // Send message updated signal, we have a placeholder message we will return. - self.signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await.unwrap(); - }, + // Send message updated signal, we have a placeholder message we will return. + self.signal_sender + .send(Signal::MessagesUpdated(conversation_id.clone())) + .await + .unwrap(); + } Event::MessageSent(message, outgoing_message, conversation_id) => { log::info!(target: target::EVENT, "Daemon: message sent: {}", message.id); - // Insert the message into the database. + // Insert the message into the database. log::debug!(target: target::EVENT, "inserting sent message into database: {}", message.id); - self.database.lock().await - .with_repository(|r| - r.insert_message( &conversation_id, message) - ).await.unwrap(); + self.database + .lock() + .await + .with_repository(|r| r.insert_message(&conversation_id, message)) + .await + .unwrap(); // Remove from outgoing messages. log::debug!(target: target::EVENT, "Removing message from outgoing messages: {}", outgoing_message.guid); - self.outgoing_messages.get_mut(&conversation_id) + self.outgoing_messages + .get_mut(&conversation_id) .map(|messages| messages.retain(|m| m.guid != outgoing_message.guid)); // Send message updated signal. - self.signal_sender.send(Signal::MessagesUpdated(conversation_id)).await.unwrap(); - }, + self.signal_sender + .send(Signal::MessagesUpdated(conversation_id)) + .await + .unwrap(); + } } } - /// Panics if the signal receiver has already been taken. - pub fn obtain_signal_receiver(&mut self) -> Receiver { + /// Panics if the signal receiver has already been taken. + pub fn obtain_signal_receiver(&mut self) -> Receiver { self.signal_receiver.take().unwrap() } async fn get_conversations(&mut self) -> Vec { - self.database.lock().await.with_repository(|r| r.all_conversations(i32::MAX, 0).unwrap()).await + self.database + .lock() + .await + .with_repository(|r| r.all_conversations(i32::MAX, 0).unwrap()) + .await } - async fn get_conversations_limit_offset(&mut self, limit: i32, offset: i32) -> Vec { - self.database.lock().await.with_repository(|r| r.all_conversations(limit, offset).unwrap()).await + async fn get_conversations_limit_offset( + &mut self, + limit: i32, + offset: i32, + ) -> Vec { + self.database + .lock() + .await + .with_repository(|r| r.all_conversations(limit, offset).unwrap()) + .await } - async fn get_messages(&mut self, conversation_id: String, last_message_id: Option) -> Vec { - // Get outgoing messages for this conversation. + async fn get_messages( + &mut self, + conversation_id: String, + last_message_id: Option, + ) -> Vec { + // Get outgoing messages for this conversation. let empty_vec: Vec = vec![]; - let outgoing_messages: &Vec = self.outgoing_messages.get(&conversation_id) + let outgoing_messages: &Vec = self + .outgoing_messages + .get(&conversation_id) .unwrap_or(&empty_vec); - self.database.lock().await - .with_repository(|r| + self.database + .lock() + .await + .with_repository(|r| { r.get_messages_for_conversation(&conversation_id) - .unwrap() - .into_iter() - .chain(outgoing_messages.into_iter().map(|m| m.into())) - .collect() - ) + .unwrap() + .into_iter() + .chain(outgoing_messages.into_iter().map(|m| m.into())) + .collect() + }) .await } @@ -289,31 +340,41 @@ impl Daemon { .build(); // Keep a record of this so we can provide a consistent model to the client. - self.outgoing_messages.entry(conversation_id) + self.outgoing_messages + .entry(conversation_id) .or_insert(vec![]) .push(outgoing_message.clone()); let guid = outgoing_message.guid.clone(); - self.post_office_sink.send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)).await.unwrap(); + self.post_office_sink + .send(PostOfficeEvent::EnqueueOutgoingMessage(outgoing_message)) + .await + .unwrap(); guid } - async fn sync_conversation_list(database: &mut Arc>, signal_sender: &Sender) -> Result<()> { + async fn sync_conversation_list( + database: &mut Arc>, + signal_sender: &Sender, + ) -> Result<()> { log::info!(target: target::SYNC, "Starting list conversation sync"); let mut client = Self::get_client_impl(database).await?; // Fetch conversations from server let fetched_conversations = client.get_conversations().await?; - let db_conversations: Vec = fetched_conversations.into_iter() + let db_conversations: Vec = fetched_conversations + .into_iter() .map(kordophone_db::models::Conversation::from) .collect(); // Insert each conversation let num_conversations = db_conversations.len(); for conversation in db_conversations { - database.with_repository(|r| r.insert_conversation(conversation)).await?; + database + .with_repository(|r| r.insert_conversation(conversation)) + .await?; } // Send conversations updated signal @@ -323,25 +384,31 @@ impl Daemon { Ok(()) } - async fn sync_all_conversations_impl(database: &mut Arc>, signal_sender: &Sender) -> Result<()> { + async fn sync_all_conversations_impl( + database: &mut Arc>, + signal_sender: &Sender, + ) -> Result<()> { log::info!(target: target::SYNC, "Starting full conversation sync"); let mut client = Self::get_client_impl(database).await?; - + // Fetch conversations from server let fetched_conversations = client.get_conversations().await?; - let db_conversations: Vec = fetched_conversations.into_iter() + let db_conversations: Vec = fetched_conversations + .into_iter() .map(kordophone_db::models::Conversation::from) .collect(); - + // Process each conversation let num_conversations = db_conversations.len(); for conversation in db_conversations { let conversation_id = conversation.guid.clone(); - + // Insert the conversation - database.with_repository(|r| r.insert_conversation(conversation)).await?; - + database + .with_repository(|r| r.insert_conversation(conversation)) + .await?; + // Sync individual conversation. Self::sync_conversation_impl(database, signal_sender, conversation_id).await?; } @@ -351,44 +418,59 @@ impl Daemon { log::info!(target: target::SYNC, "Full sync complete, {} conversations processed", num_conversations); Ok(()) - } + } - async fn sync_conversation_impl(database: &mut Arc>, signal_sender: &Sender, conversation_id: String) -> Result<()> { + async fn sync_conversation_impl( + database: &mut Arc>, + signal_sender: &Sender, + conversation_id: String, + ) -> Result<()> { log::debug!(target: target::SYNC, "Starting conversation sync for {}", conversation_id); let mut client = Self::get_client_impl(database).await?; // Check if conversation exists in database. - let conversation = database.with_repository(|r| r.get_conversation_by_guid(&conversation_id)).await?; + let conversation = database + .with_repository(|r| r.get_conversation_by_guid(&conversation_id)) + .await?; if conversation.is_none() { - // If the conversation doesn't exist, first do a conversation list sync. + // If the conversation doesn't exist, first do a conversation list sync. log::warn!(target: target::SYNC, "Conversation {} not found, performing list sync", conversation_id); Self::sync_conversation_list(database, signal_sender).await?; } // Fetch and sync messages for this conversation - let last_message_id = database.with_repository(|r| -> Option { - r.get_last_message_for_conversation(&conversation_id) - .unwrap_or(None) - .map(|m| m.id) - }).await; + let last_message_id = database + .with_repository(|r| -> Option { + r.get_last_message_for_conversation(&conversation_id) + .unwrap_or(None) + .map(|m| m.id) + }) + .await; log::debug!(target: target::SYNC, "Fetching messages for conversation {}", &conversation_id); log::debug!(target: target::SYNC, "Last message id: {:?}", last_message_id); - let messages = client.get_messages(&conversation_id, None, None, last_message_id).await?; - let db_messages: Vec = messages.into_iter() + let messages = client + .get_messages(&conversation_id, None, None, last_message_id) + .await?; + let db_messages: Vec = messages + .into_iter() .map(kordophone_db::models::Message::from) .collect(); // Insert each message let num_messages = db_messages.len(); log::debug!(target: target::SYNC, "Inserting {} messages for conversation {}", num_messages, &conversation_id); - database.with_repository(|r| r.insert_messages(&conversation_id, db_messages)).await?; + database + .with_repository(|r| r.insert_messages(&conversation_id, db_messages)) + .await?; // Send messages updated signal, if we actually inserted any messages. if num_messages > 0 { - signal_sender.send(Signal::MessagesUpdated(conversation_id.clone())).await?; + signal_sender + .send(Signal::MessagesUpdated(conversation_id.clone())) + .await?; } log::debug!(target: target::SYNC, "Synchronized {} messages for conversation {}", num_messages, &conversation_id); @@ -408,35 +490,45 @@ impl Daemon { Self::get_client_impl(&mut self.database).await } - async fn get_client_impl(database: &mut Arc>) -> Result> { + async fn get_client_impl( + database: &mut Arc>, + ) -> Result> { let settings = database.with_settings(Settings::from_db).await?; - let server_url = settings.server_url + let server_url = settings + .server_url .ok_or(DaemonError::ClientNotConfigured)?; let client = HTTPAPIClient::new( server_url.parse().unwrap(), - DatabaseAuthenticationStore::new(database.clone()) + DatabaseAuthenticationStore::new(database.clone()), ); Ok(client) } async fn delete_all_conversations(&mut self) -> Result<()> { - self.database.with_repository(|r| -> Result<()> { - r.delete_all_conversations()?; - r.delete_all_messages()?; - Ok(()) - }).await?; + self.database + .with_repository(|r| -> Result<()> { + r.delete_all_conversations()?; + r.delete_all_messages()?; + Ok(()) + }) + .await?; - self.signal_sender.send(Signal::ConversationsUpdated).await?; + self.signal_sender + .send(Signal::ConversationsUpdated) + .await?; Ok(()) } + fn get_data_dir() -> Option { + ProjectDirs::from("net", "buzzert", "kordophonecd").map(|p| PathBuf::from(p.data_dir())) + } + fn get_database_path() -> PathBuf { - if let Some(proj_dirs) = ProjectDirs::from("net", "buzzert", "kordophonecd") { - let data_dir = proj_dirs.data_dir(); + if let Some(data_dir) = Self::get_data_dir() { data_dir.join("database.db") } else { // Fallback to a local path if we can't get the system directories diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 93b3cb8..955f325 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -1,16 +1,17 @@ use dbus::arg; use dbus_tree::MethodErr; -use tokio::sync::mpsc; use std::future::Future; use std::thread; -use tokio::sync::oneshot; +use tokio::sync::mpsc; +use tokio::sync::oneshot; use crate::daemon::{ - DaemonResult, events::{Event, Reply}, settings::Settings, + Attachment, DaemonResult, }; +use crate::dbus::interface::NetBuzzertKordophoneAttachment as DbusAttachment; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; @@ -29,10 +30,11 @@ impl ServerImpl { make_event: impl FnOnce(Reply) -> Event, ) -> DaemonResult { let (reply_tx, reply_rx) = oneshot::channel(); - self.event_sink.send(make_event(reply_tx)) + self.event_sink + .send(make_event(reply_tx)) .await .map_err(|_| "Failed to send event")?; - + reply_rx.await.map_err(|_| "Failed to receive reply".into()) } @@ -49,21 +51,48 @@ impl ServerImpl { impl DbusRepository for ServerImpl { fn get_version(&mut self) -> Result { self.send_event_sync(Event::GetVersion) - } + } - fn get_conversations(&mut self, limit: i32, offset: i32) -> Result, dbus::MethodErr> { + fn get_conversations( + &mut self, + limit: i32, + offset: i32, + ) -> Result, dbus::MethodErr> { self.send_event_sync(|r| Event::GetAllConversations(limit, offset, r)) .map(|conversations| { - conversations.into_iter().map(|conv| { - let mut map = arg::PropMap::new(); - map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); - map.insert("display_name".into(), arg::Variant(Box::new(conv.display_name.unwrap_or_default()))); - map.insert("unread_count".into(), arg::Variant(Box::new(conv.unread_count as i32))); - map.insert("last_message_preview".into(), arg::Variant(Box::new(conv.last_message_preview.unwrap_or_default()))); - map.insert("participants".into(), arg::Variant(Box::new(conv.participants.into_iter().map(|p| p.display_name()).collect::>()))); - map.insert("date".into(), arg::Variant(Box::new(conv.date.and_utc().timestamp()))); - map - }).collect() + conversations + .into_iter() + .map(|conv| { + let mut map = arg::PropMap::new(); + map.insert("guid".into(), arg::Variant(Box::new(conv.guid))); + map.insert( + "display_name".into(), + arg::Variant(Box::new(conv.display_name.unwrap_or_default())), + ); + map.insert( + "unread_count".into(), + arg::Variant(Box::new(conv.unread_count as i32)), + ); + map.insert( + "last_message_preview".into(), + arg::Variant(Box::new(conv.last_message_preview.unwrap_or_default())), + ); + map.insert( + "participants".into(), + arg::Variant(Box::new( + conv.participants + .into_iter() + .map(|p| p.display_name()) + .collect::>(), + )), + ); + map.insert( + "date".into(), + arg::Variant(Box::new(conv.date.and_utc().timestamp())), + ); + map + }) + .collect() }) } @@ -79,7 +108,11 @@ impl DbusRepository for ServerImpl { self.send_event_sync(|r| Event::SyncConversation(conversation_id, r)) } - fn get_messages(&mut self, conversation_id: String, last_message_id: String) -> Result, dbus::MethodErr> { + fn get_messages( + &mut self, + conversation_id: String, + last_message_id: String, + ) -> Result, dbus::MethodErr> { let last_message_id_opt = if last_message_id.is_empty() { None } else { @@ -88,14 +121,23 @@ impl DbusRepository for ServerImpl { self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r)) .map(|messages| { - messages.into_iter().map(|msg| { - let mut map = arg::PropMap::new(); - map.insert("id".into(), arg::Variant(Box::new(msg.id))); - map.insert("text".into(), arg::Variant(Box::new(msg.text))); - map.insert("date".into(), arg::Variant(Box::new(msg.date.and_utc().timestamp()))); - map.insert("sender".into(), arg::Variant(Box::new(msg.sender.display_name()))); - map - }).collect() + messages + .into_iter() + .map(|msg| { + let mut map = arg::PropMap::new(); + map.insert("id".into(), arg::Variant(Box::new(msg.id))); + map.insert("text".into(), arg::Variant(Box::new(msg.text))); + map.insert( + "date".into(), + arg::Variant(Box::new(msg.date.and_utc().timestamp())), + ); + map.insert( + "sender".into(), + arg::Variant(Box::new(msg.sender.display_name())), + ); + map + }) + .collect() }) } @@ -103,21 +145,35 @@ impl DbusRepository for ServerImpl { self.send_event_sync(Event::DeleteAllConversations) } - fn send_message(&mut self, conversation_id: String, text: String) -> Result { + fn send_message( + &mut self, + conversation_id: String, + text: String, + ) -> Result { self.send_event_sync(|r| Event::SendMessage(conversation_id, text, r)) .map(|uuid| uuid.to_string()) } -} + + fn get_attachment( + &mut self, + attachment_id: String, + ) -> Result, dbus::MethodErr> { + todo!() + } +} impl DbusSettings for ServerImpl { fn set_server(&mut self, url: String, user: String) -> Result<(), dbus::MethodErr> { - self.send_event_sync(|r| - Event::UpdateSettings(Settings { - server_url: Some(url), - username: Some(user), - token: None, - }, r) - ) + self.send_event_sync(|r| { + Event::UpdateSettings( + Settings { + server_url: Some(url), + username: Some(user), + token: None, + }, + r, + ) + }) } fn server_url(&self) -> Result { @@ -126,13 +182,16 @@ impl DbusSettings for ServerImpl { } fn set_server_url(&self, value: String) -> Result<(), dbus::MethodErr> { - self.send_event_sync(|r| - Event::UpdateSettings(Settings { - server_url: Some(value), - username: None, - token: None, - }, r) - ) + self.send_event_sync(|r| { + Event::UpdateSettings( + Settings { + server_url: Some(value), + username: None, + token: None, + }, + r, + ) + }) } fn username(&self) -> Result { @@ -141,13 +200,32 @@ impl DbusSettings for ServerImpl { } fn set_username(&self, value: String) -> Result<(), dbus::MethodErr> { - self.send_event_sync(|r| - Event::UpdateSettings(Settings { - server_url: None, - username: Some(value), - token: None, - }, r) - ) + self.send_event_sync(|r| { + Event::UpdateSettings( + Settings { + server_url: None, + username: Some(value), + token: None, + }, + r, + ) + }) + } +} + +impl DbusAttachment for Attachment { + fn file_path(&self) -> Result { + Ok(self.path.as_os_str().to_os_string().into_string().unwrap()) + } + + fn downloaded(&self) -> Result { + Ok(self.downloaded) + } + + fn delete(&mut self) -> Result<(), dbus::MethodErr> { + // Mostly a placeholder method because dbuscodegen for some reason barfs on this + // if there are no methods defined. + todo!() } } @@ -172,4 +250,4 @@ where .join() }) .expect("Error joining runtime thread") -} \ No newline at end of file +}