diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index e0f4249..360d92b 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -469,10 +469,14 @@ impl HTTPAPIClient { .expect("Unable to build request") }; + log::trace!("Obtaining token from auth store"); let token = self.auth_store.get_token().await; - let request = build_request(&token); - let mut response = self.client.request(request).await?; + log::trace!("Token: {:?}", token); + let request = build_request(&token); + log::trace!("Request: {:?}. Sending request...", request); + + let mut response = self.client.request(request).await?; log::debug!("-> Response: {:}", response.status()); match response.status() { @@ -502,7 +506,9 @@ impl HTTPAPIClient { // Other errors: bubble up. _ => { - let message = format!("Request failed ({:})", response.status()); + let status = response.status(); + let body_str = hyper::body::to_bytes(response.into_body()).await?; + let message = format!("Request failed ({:}). Response body: {:?}", status, String::from_utf8_lossy(&body_str)); return Err(Error::ClientError(message)); } } diff --git a/kordophone/src/model/message.rs b/kordophone/src/model/message.rs index cca26f5..2b53bdf 100644 --- a/kordophone/src/model/message.rs +++ b/kordophone/src/model/message.rs @@ -10,11 +10,11 @@ pub type MessageID = ::ID; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AttributionInfo { /// Picture width - #[serde(rename = "pgensh")] + #[serde(rename = "pgensw")] pub width: Option, /// Picture height - #[serde(rename = "pgensw")] + #[serde(rename = "pgensh")] pub height: Option, } diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index d771c05..2cf0e9b 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -122,8 +122,6 @@ - - diff --git a/kordophoned/src/daemon/attachment_store.rs b/kordophoned/src/daemon/attachment_store.rs index 7b35cb9..711fb95 100644 --- a/kordophoned/src/daemon/attachment_store.rs +++ b/kordophoned/src/daemon/attachment_store.rs @@ -10,7 +10,7 @@ use thiserror::Error; use kordophone_db::database::Database; -use crate::daemon::events::Event; +use crate::daemon::events::Event as DaemonEvent; use crate::daemon::events::Reply; use crate::daemon::models::Attachment; use crate::daemon::Daemon; @@ -43,14 +43,23 @@ enum AttachmentStoreError { #[error("attachment has already been downloaded")] AttachmentAlreadyDownloaded, + #[error("temporary file already exists, assuming download is in progress")] + DownloadAlreadyInProgress, + #[error("Client error: {0}")] APIClientError(String), } +#[derive(Debug, Clone)] +struct DownloadRequest { + guid: String, + preview: bool, +} + pub struct AttachmentStore { store_path: PathBuf, database: Arc>, - daemon_event_sink: Sender, + daemon_event_sink: Sender, event_source: Receiver, event_sink: Option>, @@ -64,7 +73,7 @@ impl AttachmentStore { pub fn new( database: Arc>, - daemon_event_sink: Sender, + daemon_event_sink: Sender, ) -> AttachmentStore { let store_path = Self::get_default_store_path(); log::info!(target: target::ATTACHMENTS, "Attachment store path: {}", store_path.display()); @@ -101,36 +110,54 @@ impl AttachmentStore { } } - async fn download_attachment(&mut self, attachment: &Attachment, preview: bool) -> Result<()> { + async fn download_attachment_impl( + store_path: &PathBuf, + database: &mut Arc>, + daemon_event_sink: &Sender, + guid: &String, + preview: bool + ) -> Result<()> { + let attachment = Self::get_attachment_impl(store_path, guid); + if attachment.is_downloaded(preview) { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", attachment.guid); return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } + let temporary_path = attachment.get_path_for_preview_scratch(preview, true); + if std::fs::exists(&temporary_path).unwrap_or(false) { + log::info!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display()); + return Err(AttachmentStoreError::DownloadAlreadyInProgress.into()); + } + log::info!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); - // Create temporary file first, we'll atomically swap later. - assert!(!std::fs::exists(&attachment.get_path(preview)).unwrap()); - let file = std::fs::File::create(&attachment.get_path(preview))?; + let file = std::fs::File::create(&temporary_path)?; let mut writer = BufWriter::new(&file); - - log::trace!(target: target::ATTACHMENTS, "Created attachment file at {}", &attachment.get_path(preview).display()); - - let mut client = Daemon::get_client_impl(&mut self.database).await?; - let stream = client + let mut client = Daemon::get_client_impl(database).await?; + let mut stream = client .fetch_attachment_data(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; - // Since we're async, we need to pin this. - pin!(stream); - - log::trace!(target: target::ATTACHMENTS, "Writing attachment data to disk"); + log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path); while let Some(Ok(data)) = stream.next().await { writer.write(data.as_ref())?; } + + // Flush and sync the temporary file before moving + writer.flush()?; + file.sync_all()?; + + // Atomically move the temporary file to the final location + std::fs::rename(&temporary_path, &attachment.get_path_for_preview_scratch(preview, false))?; log::info!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid); + + // Send a signal to the daemon that the attachment has been downloaded. + let event = DaemonEvent::AttachmentDownloaded(attachment.guid.clone()); + daemon_event_sink.send(event).await.unwrap(); + Ok(()) } @@ -144,9 +171,27 @@ impl AttachmentStore { AttachmentStoreEvent::QueueDownloadAttachment(guid, preview) => { let attachment = self.get_attachment(&guid); if !attachment.is_downloaded(preview) { - self.download_attachment(&attachment, preview).await.unwrap_or_else(|e| { - log::error!(target: target::ATTACHMENTS, "Error downloading attachment: {}", e); + let store_path = self.store_path.clone(); + let mut database = self.database.clone(); + let daemon_event_sink = self.daemon_event_sink.clone(); + let _guid = guid.clone(); + + // Spawn a new task here so we don't block incoming queue events. + tokio::spawn(async move { + let result = Self::download_attachment_impl( + &store_path, + &mut database, + &daemon_event_sink, + &_guid, + preview, + ).await; + + if let Err(e) = result { + log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e); + } }); + + log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid); } else { log::info!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid); } diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 627461e..19c89a6 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -71,4 +71,9 @@ pub enum Event { /// Delete all conversations from the database. DeleteAllConversations(Reply<()>), + + /// Notifies the daemon that an attachment has been downloaded. + /// Parameters: + /// - attachment_id: The attachment ID that was downloaded. + AttachmentDownloaded(String), } diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index d62b67c..3266eda 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -314,6 +314,16 @@ impl Daemon { reply.send(()).unwrap(); } + + Event::AttachmentDownloaded(attachment_id) => { + log::info!(target: target::ATTACHMENTS, "Daemon: attachment downloaded: {}, sending signal", attachment_id); + + // Send signal to the client that the attachment has been downloaded. + self.signal_sender + .send(Signal::AttachmentDownloaded(attachment_id)) + .await + .unwrap(); + } } } diff --git a/kordophoned/src/daemon/models/attachment.rs b/kordophoned/src/daemon/models/attachment.rs index 0251451..6e0b95a 100644 --- a/kordophoned/src/daemon/models/attachment.rs +++ b/kordophoned/src/daemon/models/attachment.rs @@ -19,16 +19,28 @@ pub struct Attachment { } impl Attachment { - pub fn get_path(&self, preview: bool) -> PathBuf { - self.base_path - .with_extension(if preview { "preview" } else { "full" }) + pub fn get_path(&self) -> PathBuf { + self.get_path_for_preview_scratch(false, false) + } + + pub fn get_path_for_preview(&self, preview: bool) -> PathBuf { + self.get_path_for_preview_scratch(preview, false) + } + + pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf { + let extension = if preview { "preview" } else { "full" }; + if scratch { + self.base_path.with_extension(format!("{}.download", extension)) + } else { + self.base_path.with_extension(extension) + } } pub fn is_downloaded(&self, preview: bool) -> bool { - std::fs::exists(&self.get_path(preview)).expect( + std::fs::exists(&self.get_path_for_preview(preview)).expect( format!( "Wasn't able to check for the existence of an attachment file path at {}", - &self.get_path(preview).display() + &self.get_path_for_preview(preview).display() ) .as_str(), ) diff --git a/kordophoned/src/daemon/signals.rs b/kordophoned/src/daemon/signals.rs index c4fb715..7509740 100644 --- a/kordophoned/src/daemon/signals.rs +++ b/kordophoned/src/daemon/signals.rs @@ -7,4 +7,9 @@ pub enum Signal { /// Parameters: /// - conversation_id: The ID of the conversation that was updated. MessagesUpdated(String), + + /// Emitted when an attachment has been downloaded. + /// Parameters: + /// - attachment_id: The ID of the attachment that was downloaded. + AttachmentDownloaded(String), } diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index 17fbf1e..1ad4a0c 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -12,5 +12,6 @@ pub mod interface { pub mod signals { pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; + pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; } } diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index ba5773e..0964f4a 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -125,7 +125,6 @@ impl DbusRepository for ServerImpl { messages .into_iter() .map(|msg| { - let msg_id = msg.id.clone(); // Store ID for potential error logging let mut map = arg::PropMap::new(); map.insert("id".into(), arg::Variant(Box::new(msg.id))); map.insert("text".into(), arg::Variant(Box::new(msg.text))); @@ -150,8 +149,8 @@ impl DbusRepository for ServerImpl { ); // Get attachment paths and download status - let path = attachment.get_path(false); - let preview_path = attachment.get_path(true); + let path = attachment.get_path_for_preview(false); + let preview_path = attachment.get_path_for_preview(true); let downloaded = attachment.is_downloaded(false); let preview_downloaded = attachment.is_downloaded(true); @@ -238,10 +237,10 @@ impl DbusRepository for ServerImpl { ) -> Result<(String, String, bool, bool), dbus::MethodErr> { self.send_event_sync(|r| Event::GetAttachment(attachment_id, r)) .map(|attachment| { - let path = attachment.get_path(false); + let path = attachment.get_path_for_preview(false); let downloaded = attachment.is_downloaded(false); - let preview_path = attachment.get_path(true); + let preview_path = attachment.get_path_for_preview(true); let preview_downloaded = attachment.is_downloaded(true); ( diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 629b26a..03bca17 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -98,6 +98,16 @@ async fn main() { 0 }); } + + Signal::AttachmentDownloaded(attachment_id) => { + log::debug!("Sending signal: AttachmentDownloaded for attachment {}", attachment_id); + dbus_registry + .send_signal(interface::OBJECT_PATH, DbusSignals::AttachmentDownloadCompleted { attachment_id }) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } } } });