use std::{ io::{BufWriter, Write}, path::PathBuf, }; use anyhow::Result; use futures_util::StreamExt; use kordophone::APIInterface; use thiserror::Error; use kordophone_db::database::Database; use kordophone_db::database::DatabaseAccess; use crate::daemon::events::Event; use crate::daemon::events::Reply; use crate::daemon::models::Attachment; use crate::daemon::Daemon; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::pin; use tokio::time::Duration; mod target { pub static ATTACHMENTS: &str = "attachments"; } #[derive(Debug)] pub enum AttachmentStoreEvent { // Get the attachment info for a given attachment guid. // Args: attachment guid, reply channel. GetAttachmentInfo(String, Reply), // Queue a download for a given attachment guid. // Args: // - attachment guid // - preview: whether to download the preview (true) or full attachment (false) QueueDownloadAttachment(String, bool), } #[derive(Debug, Error)] enum AttachmentStoreError { #[error("attachment has already been downloaded")] AttachmentAlreadyDownloaded, #[error("Client error: {0}")] APIClientError(String), } pub struct AttachmentStore { store_path: PathBuf, database: Arc>, daemon_event_sink: Sender, event_source: Receiver, event_sink: Option>, } impl AttachmentStore { pub fn get_default_store_path() -> PathBuf { let data_dir = Daemon::get_data_dir().expect("Unable to get data path"); data_dir.join("attachments") } pub fn new(database: Arc>, daemon_event_sink: Sender) -> AttachmentStore { let store_path = Self::get_default_store_path(); log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); // Create the attachment store if it doesn't exist std::fs::create_dir_all(&store_path) .expect("Wasn't able to create the attachment store path"); let (event_sink, event_source) = tokio::sync::mpsc::channel(100); AttachmentStore { store_path: store_path, database: database, daemon_event_sink: daemon_event_sink, event_source: event_source, event_sink: Some(event_sink), } } pub fn get_event_sink(&mut self) -> Sender { self.event_sink.take().unwrap() } fn get_attachment(&self, guid: &String) -> Attachment { Self::get_attachment_impl(&self.store_path, guid) } pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment { let base_path = store_path.join(guid); Attachment { guid: guid.to_owned(), base_path: base_path, metadata: None, } } async fn download_attachment(&mut self, attachment: &Attachment, preview: bool) -> Result<()> { if attachment.is_downloaded(preview) { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid); return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } log::info!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); // Create temporary file first, we'll atomically swap later. assert!(!std::fs::exists(&attachment.get_path(preview)).unwrap()); let file = std::fs::File::create(&attachment.get_path(preview))?; let mut writer = BufWriter::new(&file); log::trace!(target: target::ATTACHMENTS, "Created attachment file at {}", &attachment.get_path(preview).display()); let mut client = Daemon::get_client_impl(&mut self.database).await?; let stream = client .fetch_attachment_data(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; // Since we're async, we need to pin this. pin!(stream); log::trace!(target: target::ATTACHMENTS, "Writing attachment data to disk"); while let Some(Ok(data)) = stream.next().await { writer.write(data.as_ref())?; } log::info!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid); Ok(()) } pub async fn run(&mut self) { loop { tokio::select! { Some(event) = self.event_source.recv() => { log::debug!(target: target::ATTACHMENTS, "Received attachment store event: {:?}", event); match event { AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => { let attachment = self.get_attachment(&guid); if !attachment.is_downloaded(preview) { self.download_attachment(&attachment, preview).await.unwrap_or_else(|e| { log::error!(target: target::ATTACHMENTS, "Error downloading attachment: {}", e); }); } else { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); } } AttachmentStoreEvent::GetAttachmentInfo(guid, reply) => { let attachment = self.get_attachment(&guid); reply.send(attachment).unwrap(); } } } } } } }