diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index d69318a..8d58724 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -280,8 +280,9 @@ impl APIInterface for HTTPAPIClient { async fn fetch_attachment_data( &mut self, guid: &String, + preview: bool, ) -> Result { - let endpoint = format!("attachment?guid={}", guid); + let endpoint = format!("attachment?guid={}&preview={}", guid, preview); self.response_with_body_retry(&endpoint, Method::GET, Body::empty, true) .await .map(hyper::Response::into_body) diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index c25f124..b0326db 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -49,6 +49,7 @@ pub trait APIInterface { async fn fetch_attachment_data( &mut self, guid: &String, + preview: bool, ) -> Result; // (POST) /authenticate diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index da4e231..34b99a8 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -89,15 +89,27 @@ - + + + value="Returns attachment info: + - path: string + - preview_path: string + - downloaded: boolean + - preview_downloaded: boolean + "/> + + + value="Initiates download of the specified attachment if not already downloaded. + Arguments: + attachment_id: the attachment GUID + preview: whether to download the preview (true) or full attachment (false) + "/> diff --git a/kordophoned/src/daemon/attachment_store.rs b/kordophoned/src/daemon/attachment_store.rs index 33e8510..429c19c 100644 --- a/kordophoned/src/daemon/attachment_store.rs +++ b/kordophoned/src/daemon/attachment_store.rs @@ -7,7 +7,20 @@ use anyhow::Result; use futures_util::StreamExt; use kordophone::APIInterface; use thiserror::Error; + +use kordophone_db::database::Database; +use kordophone_db::database::DatabaseAccess; + +use crate::daemon::events::Event; +use crate::daemon::events::Reply; +use crate::daemon::Daemon; + +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::sync::mpsc::{Receiver, Sender}; + use tokio::pin; +use tokio::time::Duration; mod target { pub static ATTACHMENTS: &str = "attachments"; @@ -16,8 +29,31 @@ mod target { #[derive(Debug, Clone)] pub struct Attachment { pub guid: String, - pub path: PathBuf, - pub downloaded: bool, + pub base_path: PathBuf, +} + +impl Attachment { + pub fn get_path(&self, preview: bool) -> PathBuf { + self.base_path.with_extension(if preview { "preview" } else { "full" }) + } + + pub fn is_downloaded(&self, preview: bool) -> bool { + std::fs::exists(&self.get_path(preview)) + .expect(format!("Wasn't able to check for the existence of an attachment file path at {}", &self.get_path(preview).display()).as_str()) + } +} + +#[derive(Debug)] +pub enum AttachmentStoreEvent { + // Get the attachment info for a given attachment guid. + // Args: attachment guid, reply channel. + GetAttachmentInfo(String, Reply), + + // Queue a download for a given attachment guid. + // Args: + // - attachment guid + // - preview: whether to download the preview (true) or full attachment (false) + QueueDownloadAttachment(String, bool), } #[derive(Debug, Error)] @@ -31,10 +67,15 @@ enum AttachmentStoreError { pub struct AttachmentStore { store_path: PathBuf, + database: Arc>, + daemon_event_sink: Sender, + + event_source: Receiver, + event_sink: Option>, } impl AttachmentStore { - pub fn new(data_dir: &PathBuf) -> AttachmentStore { + pub fn new(data_dir: &PathBuf, database: Arc>, daemon_event_sink: Sender) -> AttachmentStore { let store_path = data_dir.join("attachments"); log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); @@ -42,39 +83,31 @@ impl AttachmentStore { std::fs::create_dir_all(&store_path) .expect("Wasn't able to create the attachment store path"); + let (event_sink, event_source) = tokio::sync::mpsc::channel(100); + AttachmentStore { store_path: store_path, + database: database, + daemon_event_sink: daemon_event_sink, + event_source: event_source, + event_sink: Some(event_sink), } } - pub fn get_attachment(&self, guid: &String) -> Attachment { - let path = self.store_path.join(guid); - let path_exists = std::fs::exists(&path).expect( - format!( - "Wasn't able to check for the existence of an attachment file path at {}", - &path.display() - ) - .as_str(), - ); + pub fn get_event_sink(&mut self) -> Sender { + self.event_sink.take().unwrap() + } + fn get_attachment(&self, guid: &String, preview: bool) -> Attachment { + let base_path = self.store_path.join(guid); Attachment { guid: guid.to_owned(), - path: path, - downloaded: path_exists, + base_path: base_path, } } - - pub async fn download_attachment( - &mut self, - attachment: &Attachment, - mut client_factory: F, - ) -> Result<()> - where - C: APIInterface, - F: FnMut() -> Fut, - Fut: std::future::Future>, - { - if attachment.downloaded { + + async fn download_attachment(&mut self, attachment: &Attachment, preview: bool) -> Result<()> { + if attachment.is_downloaded(preview) { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid); return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } @@ -82,15 +115,15 @@ impl AttachmentStore { log::info!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); // Create temporary file first, we'll atomically swap later. - assert!(!std::fs::exists(&attachment.path).unwrap()); - let file = std::fs::File::create(&attachment.path)?; + assert!(!std::fs::exists(&attachment.get_path(preview)).unwrap()); + let file = std::fs::File::create(&attachment.get_path(preview))?; let mut writer = BufWriter::new(&file); - log::trace!(target: target::ATTACHMENTS, "Created attachment file at {}", &attachment.path.display()); + log::trace!(target: target::ATTACHMENTS, "Created attachment file at {}", &attachment.get_path(preview).display()); - let mut client = client_factory().await?; + let mut client = Daemon::get_client_impl(&mut self.database).await?; let stream = client - .fetch_attachment_data(&attachment.guid) + .fetch_attachment_data(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; @@ -106,9 +139,31 @@ impl AttachmentStore { Ok(()) } - /// Check if an attachment should be downloaded - pub fn should_download(&self, attachment_id: &str) -> bool { - let attachment = self.get_attachment(&attachment_id.to_string()); - !attachment.downloaded + pub async fn run(&mut self) { + loop { + tokio::select! { + Some(event) = self.event_source.recv() => { + log::debug!(target: target::ATTACHMENTS, "Received attachment store event: {:?}", event); + + match event { + AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => { + let attachment = self.get_attachment(&guid, preview); + if !attachment.is_downloaded(preview) { + self.download_attachment(&attachment, preview).await.unwrap_or_else(|e| { + log::error!(target: target::ATTACHMENTS, "Error downloading attachment: {}", e); + }); + } else { + log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); + } + } + + AttachmentStoreEvent::GetAttachmentInfo(guid, reply) => { + let attachment = self.get_attachment(&guid, false); + reply.send(attachment).unwrap(); + } + } + } + } + } } } diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 633d119..2e11ff2 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -65,8 +65,9 @@ pub enum Event { /// Downloads an attachment from the server. /// Parameters: /// - attachment_id: The attachment ID to download + /// - preview: Whether to download the preview (true) or full attachment (false) /// - reply: Reply indicating success or failure - DownloadAttachment(String, Reply<()>), + DownloadAttachment(String, bool, Reply<()>), /// Delete all conversations from the database. DeleteAllConversations(Reply<()>), diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 7012bf0..4c444e4 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -43,7 +43,8 @@ use post_office::PostOffice; mod attachment_store; pub use attachment_store::Attachment; -use attachment_store::AttachmentStore; +pub use attachment_store::AttachmentStore; +pub use attachment_store::AttachmentStoreEvent; #[derive(Debug, Error)] pub enum DaemonError { @@ -75,7 +76,7 @@ pub struct Daemon { outgoing_messages: HashMap>, - attachment_store: AttachmentStore, + attachment_store_sink: Option>, version: String, database: Arc>, @@ -105,9 +106,6 @@ impl Daemon { let database_impl = Database::new(&database_path.to_string_lossy())?; let database = Arc::new(Mutex::new(database_impl)); - let data_path = Self::get_data_dir().expect("Unable to get data path"); - let attachment_store = AttachmentStore::new(&data_path); - Ok(Self { version: "0.1.0".to_string(), database, @@ -118,7 +116,7 @@ impl Daemon { post_office_sink, post_office_source: Some(post_office_source), outgoing_messages: HashMap::new(), - attachment_store: attachment_store, + attachment_store_sink: None, runtime, }) } @@ -148,6 +146,14 @@ impl Daemon { }); } + // Attachment store + let data_path = Self::get_data_dir().expect("Unable to get data path"); + let mut attachment_store = AttachmentStore::new(&data_path, self.database.clone(), self.event_sender.clone()); + self.attachment_store_sink = Some(attachment_store.get_event_sink()); + tokio::spawn(async move { + attachment_store.run().await; + }); + while let Some(event) = self.event_receiver.recv().await { log::debug!(target: target::EVENT, "Received event: {:?}", event); self.handle_event(event).await; @@ -282,13 +288,24 @@ impl Daemon { } Event::GetAttachment(guid, reply) => { - let attachment = self.attachment_store.get_attachment(&guid); - reply.send(attachment).unwrap(); + self.attachment_store_sink + .as_ref() + .unwrap() + .send(AttachmentStoreEvent::GetAttachmentInfo(guid, reply)) + .await + .unwrap(); } - Event::DownloadAttachment(attachment_id, reply) => { - // For now, just return success - we'll implement the actual download logic later - log::info!(target: target::ATTACHMENTS, "Download requested for attachment: {}", attachment_id); + Event::DownloadAttachment(attachment_id, preview, reply) => { + log::info!(target: target::ATTACHMENTS, "Download requested for attachment: {}, preview: {}", &attachment_id, preview); + + self.attachment_store_sink + .as_ref() + .unwrap() + .send(AttachmentStoreEvent::QueueDownloadAttachment(attachment_id, preview)) + .await + .unwrap(); + reply.send(()).unwrap(); } } diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index a953573..2e7155f 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -11,21 +11,18 @@ use crate::daemon::{ Attachment, DaemonResult, }; -use crate::dbus::endpoint::DbusRegistry; use crate::dbus::interface::NetBuzzertKordophoneRepository as DbusRepository; use crate::dbus::interface::NetBuzzertKordophoneSettings as DbusSettings; #[derive(Clone)] pub struct ServerImpl { event_sink: mpsc::Sender, - dbus_registry: DbusRegistry, } impl ServerImpl { - pub fn new(event_sink: mpsc::Sender, dbus_registry: DbusRegistry) -> Self { + pub fn new(event_sink: mpsc::Sender) -> Self { Self { event_sink: event_sink, - dbus_registry: dbus_registry, } } @@ -187,28 +184,34 @@ impl DbusRepository for ServerImpl { fn get_attachment_info( &mut self, attachment_id: String, - ) -> Result<(String, bool, u32), dbus::MethodErr> { + ) -> Result<(String, String, bool, bool), dbus::MethodErr> { self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)) .map(|attachment| { - let file_size = if attachment.downloaded { - std::fs::metadata(&attachment.path) - .map(|m| m.len() as u32) - .unwrap_or(0) - } else { - 0 - }; - + let path = attachment.get_path(false); + let downloaded = attachment.is_downloaded(false); + + let preview_path = attachment.get_path(true); + let preview_downloaded = attachment.is_downloaded(true); + ( - attachment.path.to_string_lossy().to_string(), - attachment.downloaded, - file_size, + // - path: string + path.to_string_lossy().to_string(), + + // - preview_path: string + preview_path.to_string_lossy().to_string(), + + // - downloaded: boolean + downloaded, + + // - preview_downloaded: boolean + preview_downloaded, ) }) } - fn download_attachment(&mut self, attachment_id: String) -> Result<(), dbus::MethodErr> { + fn download_attachment(&mut self, attachment_id: String, preview: bool) -> Result<(), dbus::MethodErr> { // For now, just trigger the download event - we'll implement the actual download logic later - self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, r)) + self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r)) } } diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 4cd6727..c3454f4 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -58,7 +58,7 @@ async fn main() { let dbus_registry = DbusRegistry::new(connection.clone()); // Create and register server implementation - let server = ServerImpl::new(daemon.event_sender.clone(), dbus_registry.clone()); + let server = ServerImpl::new(daemon.event_sender.clone()); dbus_registry.register_object( interface::OBJECT_PATH,