use std::{ io::{BufWriter, Write}, path::PathBuf, }; use anyhow::Result; use futures_util::StreamExt; use kordophone::APIInterface; use thiserror::Error; use kordophone_db::database::Database; use crate::daemon::events::Event as DaemonEvent; use crate::daemon::events::Reply; use crate::daemon::models::Attachment; use crate::daemon::Daemon; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::pin; mod target { pub static ATTACHMENTS: &str = "attachments"; } #[derive(Debug)] pub enum AttachmentStoreEvent { // Get the attachment info for a given attachment guid. // Args: attachment guid, reply channel. GetAttachmentInfo(String, Reply), // Queue a download for a given attachment guid. // Args: // - attachment guid // - preview: whether to download the preview (true) or full attachment (false) QueueDownloadAttachment(String, bool), } #[derive(Debug, Error)] enum AttachmentStoreError { #[error("attachment has already been downloaded")] AttachmentAlreadyDownloaded, #[error("temporary file already exists, assuming download is in progress")] DownloadAlreadyInProgress, #[error("Client error: {0}")] APIClientError(String), } #[derive(Debug, Clone)] struct DownloadRequest { guid: String, preview: bool, } pub struct AttachmentStore { store_path: PathBuf, database: Arc>, daemon_event_sink: Sender, event_source: Receiver, event_sink: Option>, } impl AttachmentStore { pub fn get_default_store_path() -> PathBuf { let data_dir = Daemon::get_data_dir().expect("Unable to get data path"); data_dir.join("attachments") } pub fn new( database: Arc>, daemon_event_sink: Sender, ) -> AttachmentStore { let store_path = Self::get_default_store_path(); log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); // Create the attachment store if it doesn't exist std::fs::create_dir_all(&store_path) .expect("Wasn't able to create the attachment store path"); let (event_sink, event_source) = tokio::sync::mpsc::channel(100); AttachmentStore { store_path: store_path, database: database, daemon_event_sink: daemon_event_sink, event_source: event_source, event_sink: Some(event_sink), } } pub fn get_event_sink(&mut self) -> Sender { self.event_sink.take().unwrap() } fn get_attachment(&self, guid: &String) -> Attachment { Self::get_attachment_impl(&self.store_path, guid) } pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment { let base_path = store_path.join(guid); Attachment { guid: guid.to_owned(), base_path: base_path, metadata: None, } } async fn download_attachment_impl( store_path: &PathBuf, database: &mut Arc>, daemon_event_sink: &Sender, guid: &String, preview: bool ) -> Result<()> { let attachment = Self::get_attachment_impl(store_path, guid); if attachment.is_downloaded(preview) { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid); return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } let temporary_path = attachment.get_path_for_preview_scratch(preview, true); if std::fs::exists(&temporary_path).unwrap_or(false) { log::info!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display()); return Err(AttachmentStoreError::DownloadAlreadyInProgress.into()); } log::info!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); let file = std::fs::File::create(&temporary_path)?; let mut writer = BufWriter::new(&file); let mut client = Daemon::get_client_impl(database).await?; let mut stream = client .fetch_attachment_data(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path); while let Some(Ok(data)) = stream.next().await { writer.write(data.as_ref())?; } // Flush and sync the temporary file before moving writer.flush()?; file.sync_all()?; // Atomically move the temporary file to the final location std::fs::rename(&temporary_path, &attachment.get_path_for_preview_scratch(preview, false))?; log::info!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid); // Send a signal to the daemon that the attachment has been downloaded. let event = DaemonEvent::AttachmentDownloaded(attachment.guid.clone()); daemon_event_sink.send(event).await.unwrap(); Ok(()) } pub async fn run(&mut self) { loop { tokio::select! { Some(event) = self.event_source.recv() => { log::debug!(target: target::ATTACHMENTS, "Received attachment store event: {:?}", event); match event { AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => { let attachment = self.get_attachment(&guid); if !attachment.is_downloaded(preview) { let store_path = self.store_path.clone(); let mut database = self.database.clone(); let daemon_event_sink = self.daemon_event_sink.clone(); let _guid = guid.clone(); // Spawn a new task here so we don't block incoming queue events. tokio::spawn(async move { let result = Self::download_attachment_impl( &store_path, &mut database, &daemon_event_sink, &_guid, preview, ).await; if let Err(e) = result { log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e); } }); log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid); } else { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); } } AttachmentStoreEvent::GetAttachmentInfo(guid, reply) => { let attachment = self.get_attachment(&guid); reply.send(attachment).unwrap(); } } } } } } }