use std::{ io::{BufWriter, Write}, path::PathBuf, }; use anyhow::Result; use futures_util::StreamExt; use kordophone::APIInterface; use thiserror::Error; use kordophone_db::database::Database; use crate::daemon::events::Event as DaemonEvent; use crate::daemon::events::Reply; use crate::daemon::models::Attachment; use crate::daemon::Daemon; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use uuid::Uuid; mod target { pub static ATTACHMENTS: &str = "attachments"; } #[derive(Debug)] pub enum AttachmentStoreEvent { // Get the attachment info for a given attachment guid. // Args: attachment guid, reply channel. GetAttachmentInfo(String, Reply), // Queue a download for a given attachment guid. // Args: // - attachment guid // - preview: whether to download the preview (true) or full attachment (false) QueueDownloadAttachment(String, bool), // Queue an upload for a given attachment file. // Args: // - path: the path to the attachment file // - reply: a reply channel to send the pending upload guid to QueueUploadAttachment(PathBuf, Reply), } #[derive(Debug, Error)] enum AttachmentStoreError { #[error("attachment has already been downloaded")] AttachmentAlreadyDownloaded, #[error("temporary file already exists, assuming download is in progress")] DownloadAlreadyInProgress, #[error("Client error: {0}")] APIClientError(String), } pub struct AttachmentStore { store_path: PathBuf, database: Arc>, daemon_event_sink: Sender, event_source: Receiver, event_sink: Option>, } impl AttachmentStore { pub fn get_default_store_path() -> PathBuf { let data_dir = Daemon::get_data_dir().expect("Unable to get data path"); data_dir.join("attachments") } pub fn new( database: Arc>, daemon_event_sink: Sender, ) -> AttachmentStore { let store_path = Self::get_default_store_path(); log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); // Create the attachment store if it doesn't exist std::fs::create_dir_all(&store_path) .expect("Wasn't able to create the attachment store path"); let (event_sink, event_source) = tokio::sync::mpsc::channel(100); AttachmentStore { store_path: store_path, database: database, daemon_event_sink: daemon_event_sink, event_source: event_source, event_sink: Some(event_sink), } } pub fn get_event_sink(&mut self) -> Sender { self.event_sink.take().unwrap() } fn get_attachment(&self, guid: &String) -> Attachment { Self::get_attachment_impl(&self.store_path, guid) } pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment { let base_path = store_path.join(guid); Attachment { guid: guid.to_owned(), base_path: base_path, metadata: None, } } async fn download_attachment_impl( store_path: &PathBuf, database: &mut Arc>, daemon_event_sink: &Sender, guid: &String, preview: bool ) -> Result<()> { let attachment = Self::get_attachment_impl(store_path, guid); if attachment.is_downloaded(preview) { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid); return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } let temporary_path = attachment.get_path_for_preview_scratch(preview, true); if std::fs::exists(&temporary_path).unwrap_or(false) { log::info!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display()); return Err(AttachmentStoreError::DownloadAlreadyInProgress.into()); } log::info!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); let file = std::fs::File::create(&temporary_path)?; let mut writer = BufWriter::new(&file); let mut client = Daemon::get_client_impl(database).await?; let mut stream = client .fetch_attachment_data(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path); while let Some(Ok(data)) = stream.next().await { writer.write(data.as_ref())?; } // Flush and sync the temporary file before moving writer.flush()?; file.sync_all()?; // Atomically move the temporary file to the final location std::fs::rename(&temporary_path, &attachment.get_path_for_preview_scratch(preview, false))?; log::info!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid); // Send a signal to the daemon that the attachment has been downloaded. let event = DaemonEvent::AttachmentDownloaded(attachment.guid.clone()); daemon_event_sink.send(event).await.unwrap(); Ok(()) } async fn upload_attachment_impl( store_path: &PathBuf, incoming_path: &PathBuf, upload_guid: &String, database: &mut Arc>, daemon_event_sink: &Sender, ) -> Result { use tokio::fs::File; use tokio::io::BufReader; // Create uploads directory if it doesn't exist. let uploads_path = store_path.join("uploads"); std::fs::create_dir_all(&uploads_path).unwrap(); // First, copy the file to the store path, under /uploads/. log::trace!(target: target::ATTACHMENTS, "Copying attachment to uploads directory: {}", uploads_path.display()); let temporary_path = uploads_path.join(incoming_path.file_name().unwrap()); std::fs::copy(incoming_path, &temporary_path).unwrap(); // Open file handle to the temporary file, log::trace!(target: target::ATTACHMENTS, "Opening stream to temporary file: {}", temporary_path.display()); let file = File::open(&temporary_path).await?; let reader: BufReader = BufReader::new(file); // Upload the file to the server. let filename = incoming_path.file_name().unwrap().to_str().unwrap(); log::trace!(target: target::ATTACHMENTS, "Uploading attachment to server: {}", &filename); let mut client = Daemon::get_client_impl(database).await?; let metadata = std::fs::metadata(&temporary_path)?; let size = metadata.len(); let guid = client.upload_attachment(reader, filename, size).await?; // Delete the temporary file. log::debug!(target: target::ATTACHMENTS, "Upload completed with guid {}, deleting temporary file: {}", guid, temporary_path.display()); std::fs::remove_file(&temporary_path).unwrap(); // Send a signal to the daemon that the attachment has been uploaded. let event = DaemonEvent::AttachmentUploaded(upload_guid.clone(), guid.clone()); daemon_event_sink.send(event).await.unwrap(); Ok(guid) } pub async fn run(&mut self) { loop { tokio::select! { Some(event) = self.event_source.recv() => { log::debug!(target: target::ATTACHMENTS, "Received attachment store event: {:?}", event); match event { AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => { let attachment = self.get_attachment(&guid); if !attachment.is_downloaded(preview) { let store_path = self.store_path.clone(); let mut database = self.database.clone(); let daemon_event_sink = self.daemon_event_sink.clone(); let _guid = guid.clone(); // Spawn a new task here so we don't block incoming queue events. tokio::spawn(async move { let result = Self::download_attachment_impl( &store_path, &mut database, &daemon_event_sink, &_guid, preview, ).await; if let Err(e) = result { log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e); } }); log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid); } else { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); } } AttachmentStoreEvent::GetAttachmentInfo(guid, reply) => { let attachment = self.get_attachment(&guid); reply.send(attachment).unwrap(); } AttachmentStoreEvent::QueueUploadAttachment(path, reply) => { let upload_guid = Uuid::new_v4().to_string(); let store_path = self.store_path.clone(); let mut database = self.database.clone(); let daemon_event_sink = self.daemon_event_sink.clone(); let _upload_guid = upload_guid.clone(); tokio::spawn(async move { let result = Self::upload_attachment_impl( &store_path, &path, &_upload_guid, &mut database, &daemon_event_sink, ).await; if let Err(e) = result { log::error!(target: target::ATTACHMENTS, "Error uploading attachment {}: {}", &_upload_guid, e); } }); reply.send(upload_guid).unwrap(); } } } } } } }