From 4e8b161d26b54245c8ecb1fa2238ee995c588778 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 10 Sep 2025 13:48:27 -0700 Subject: [PATCH 1/3] wip: attachment MIME --- core/Cargo.lock | 23 ++++ core/kordophone/src/api/http_client.rs | 21 ++++ core/kordophoned/Cargo.toml | 2 +- .../src/daemon/attachment_store.rs | 117 ++++++++++++++++-- .../src/daemon/models/attachment.rs | 62 +++++++++- 5 files changed, 208 insertions(+), 17 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 261b690..645ea55 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1201,6 +1201,7 @@ dependencies = [ "kordophone", "kordophone-db", "log", + "mime_guess", "once_cell", "serde", "serde_json", @@ -1345,6 +1346,22 @@ dependencies = [ "quote", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.2" @@ -2350,6 +2367,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-ident" version = "1.0.12" diff --git a/core/kordophone/src/api/http_client.rs b/core/kordophone/src/api/http_client.rs index c28c952..4e7a1e9 100644 --- a/core/kordophone/src/api/http_client.rs +++ b/core/kordophone/src/api/http_client.rs @@ -618,6 +618,27 @@ impl HTTPAPIClient { Ok(response) } + + // Fetch an attachment while preserving response 
headers (e.g., Content-Type). + // Returns the streaming body and the Content-Type header if present. + pub async fn fetch_attachment_with_metadata( + &mut self, + guid: &str, + preview: bool, + ) -> Result<(ResponseStream, Option<String>), Error> { + let endpoint = format!("attachment?guid={}&preview={}", guid, preview); + let response = self + .response_with_body_retry(&endpoint, Method::GET, Body::empty, true) + .await?; + + let content_type = response + .headers() + .get(hyper::header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + Ok((ResponseStream::from(response.into_body()), content_type)) + } } #[cfg(test)] diff --git a/core/kordophoned/Cargo.toml b/core/kordophoned/Cargo.toml index 227efa9..9a15f04 100644 --- a/core/kordophoned/Cargo.toml +++ b/core/kordophoned/Cargo.toml @@ -22,6 +22,7 @@ tokio = { version = "1", features = ["full"] } tokio-condvar = "0.3.0" uuid = "1.16.0" once_cell = "1.19.0" +mime_guess = "2.0" # D-Bus dependencies only on Linux [target.'cfg(target_os = "linux")'.dependencies] @@ -49,4 +50,3 @@ assets = [ { source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" }, { source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" }, ] - diff --git a/core/kordophoned/src/daemon/attachment_store.rs b/core/kordophoned/src/daemon/attachment_store.rs index c699503..61151a1 100644 --- a/core/kordophoned/src/daemon/attachment_store.rs +++ b/core/kordophoned/src/daemon/attachment_store.rs @@ -103,11 +103,49 @@ impl AttachmentStore { pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment { let base_path = store_path.join(guid); - Attachment { + let mut attachment = Attachment { guid: guid.to_owned(), base_path: base_path, metadata: None, + mime_type: None, + }; + + // Best-effort: if a file already exists, try to infer MIME type from extension + let kind = "full"; + let stem = attachment + .base_path + 
.file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_default(); + let legacy = attachment.base_path.with_extension(kind); + let existing_path = if legacy.exists() { + Some(legacy) + } else { + let prefix = format!("{}.{}.", stem, kind); + let parent = attachment + .base_path + .parent() + .unwrap_or_else(|| std::path::Path::new(".")); + let mut found: Option<PathBuf> = None; + if let Ok(entries) = std::fs::read_dir(parent) { + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with(&prefix) && !name.ends_with(".download") { + found = Some(entry.path()); + break; + } + } + } + found + }; + + if let Some(existing) = existing_path { + if let Some(m) = mime_guess::from_path(&existing).first_raw() { + attachment.mime_type = Some(m.to_string()); + } } + + attachment } async fn download_attachment_impl( @@ -124,22 +162,45 @@ impl AttachmentStore { return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into()); } - let temporary_path = attachment.get_path_for_preview_scratch(preview, true); - if std::fs::exists(&temporary_path).unwrap_or(false) { - log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display()); + // Check if any in-progress temporary file exists already for this attachment + if let Some(in_progress) = Self::find_in_progress_download(&attachment, preview) { + log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", in_progress.display()); return Err(AttachmentStoreError::DownloadAlreadyInProgress.into()); } log::debug!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid); - let file = std::fs::File::create(&temporary_path)?; - let mut writer = BufWriter::new(&file); let mut client = Daemon::get_client_impl(database).await?; - let mut stream = client - .fetch_attachment_data(&attachment.guid, preview) + let (mut stream, 
content_type) = client + .fetch_attachment_with_metadata(&attachment.guid, preview) .await .map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?; + let kind = if preview { "preview" } else { "full" }; + let normalized_mime = content_type + .as_deref() + .map(|s| s.split(';').next().unwrap_or(s).trim().to_string()); + + let guessed_ext = normalized_mime + .as_deref() + .and_then(|m| mime_guess::get_mime_extensions_str(m)) + .and_then(|list| list.first().copied()) + .unwrap_or("bin"); + + let final_path = attachment + .base_path + .with_extension(format!("{}.{}", kind, guessed_ext)); + let temporary_path = final_path.with_extension(format!( + "{}.download", + final_path + .extension() + .and_then(|e| e.to_str()) + .unwrap_or("") + )); + + let file = std::fs::File::create(&temporary_path)?; + let mut writer = BufWriter::new(&file); + log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path); while let Some(Ok(data)) = stream.next().await { writer.write(data.as_ref())?; @@ -150,10 +211,7 @@ impl AttachmentStore { file.sync_all()?; // Atomically move the temporary file to the final location - std::fs::rename( - &temporary_path, - &attachment.get_path_for_preview_scratch(preview, false), - )?; + std::fs::rename(&temporary_path, &final_path)?; log::debug!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid); @@ -164,6 +222,41 @@ impl AttachmentStore { Ok(()) } + fn find_in_progress_download(attachment: &Attachment, preview: bool) -> Option<PathBuf> { + let kind = if preview { "preview" } else { "full" }; + + // Legacy temp path: guid.<kind>.download + let legacy = attachment + .base_path + .with_extension(format!("{}.download", kind)); + if legacy.exists() { + return Some(legacy); + } + + // Scan for any guid.<kind>.<ext>.download + let parent = attachment + .base_path + .parent() + .unwrap_or_else(|| std::path::Path::new(".")); + let stem = attachment + .base_path + 
.file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_default(); + let prefix = format!("{}.{}.", stem, kind); + + if let Ok(entries) = std::fs::read_dir(parent) { + for entry in entries.flatten() { + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with(&prefix) && name.ends_with(".download") { + return Some(entry.path()); + } + } + } + + None + } + async fn upload_attachment_impl( store_path: &PathBuf, incoming_path: &PathBuf, diff --git a/core/kordophoned/src/daemon/models/attachment.rs b/core/kordophoned/src/daemon/models/attachment.rs index 777fff7..cd38601 100644 --- a/core/kordophoned/src/daemon/models/attachment.rs +++ b/core/kordophoned/src/daemon/models/attachment.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; #[derive(Debug, Clone)] pub struct AttachmentMetadata { @@ -16,6 +16,7 @@ pub struct Attachment { pub guid: String, pub base_path: PathBuf, pub metadata: Option<AttachmentMetadata>, + pub mime_type: Option<String>, } impl Attachment { @@ -28,12 +29,35 @@ impl Attachment { } pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf { - let extension = if preview { "preview" } else { "full" }; + // Determine whether this is a preview or full attachment. + let kind = if preview { "preview" } else { "full" }; + + // If not a scratch path, and a file already exists on disk with a concrete + // file extension (e.g., guid.full.jpg), return that existing path. + if !scratch { + if let Some(existing) = self.find_existing_path(preview) { + return existing; + } + } + + // Fall back to constructing a path using known info. If we know the MIME type, + // prefer an extension guessed from it; otherwise keep legacy naming. 
+ let ext_from_mime = self + .mime_type + .as_ref() + .and_then(|m| mime_guess::get_mime_extensions_str(m.split(';').next().unwrap_or(m))) + .and_then(|list| list.first().copied()); + + let base_ext = match ext_from_mime { + Some(ext) => format!("{}.{}", kind, ext), + None => kind.to_string(), + }; + if scratch { self.base_path - .with_extension(format!("{}.download", extension)) + .with_extension(format!("{}.download", base_ext)) } else { - self.base_path.with_extension(extension) + self.base_path.with_extension(base_ext) } } @@ -46,6 +70,36 @@ impl Attachment { .as_str(), ) } + + fn find_existing_path(&self, preview: bool) -> Option<PathBuf> { + let kind = if preview { "preview" } else { "full" }; + + // First, check legacy path without a concrete file extension. + let legacy = self.base_path.with_extension(kind); + if legacy.exists() { + return Some(legacy); + } + + // Next, search for a filename like: <guid>.<kind>.<ext> + let file_stem = self + .base_path + .file_name() + .map(|s| s.to_string_lossy().to_string())?; + let prefix = format!("{}.{}.", file_stem, kind); + + let parent = self.base_path.parent().unwrap_or_else(|| Path::new(".")); + if let Ok(dir) = std::fs::read_dir(parent) { + for entry in dir.flatten() { + let file_name = entry.file_name(); + let name = file_name.to_string_lossy(); + if name.starts_with(&prefix) && !name.ends_with(".download") { + return Some(entry.path()); + } + } + } + + None + } } impl From for AttachmentMetadata { From e8256a9e571aefa3f95afd5bd673f95cfe7b8f53 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 10 Sep 2025 14:06:54 -0700 Subject: [PATCH 2/3] core: attachment mime: prefer jpg instead of jfif --- .../src/daemon/attachment_store.rs | 22 ++++++++++++++++--- .../src/daemon/models/attachment.rs | 19 ++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/core/kordophoned/src/daemon/attachment_store.rs b/core/kordophoned/src/daemon/attachment_store.rs index 61151a1..42fced1 100644 --- 
a/core/kordophoned/src/daemon/attachment_store.rs +++ b/core/kordophoned/src/daemon/attachment_store.rs @@ -165,6 +165,7 @@ impl AttachmentStore { // Check if any in-progress temporary file exists already for this attachment if let Some(in_progress) = Self::find_in_progress_download(&attachment, preview) { log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", in_progress.display()); + // Treat as a non-fatal condition so we don't spam errors. return Err(AttachmentStoreError::DownloadAlreadyInProgress.into()); } @@ -183,8 +184,7 @@ impl AttachmentStore { let guessed_ext = normalized_mime .as_deref() - .and_then(|m| mime_guess::get_mime_extensions_str(m)) - .and_then(|list| list.first().copied()) + .and_then(|m| Attachment::preferred_extension_for_mime(m)) .unwrap_or("bin"); let final_path = attachment @@ -327,7 +327,23 @@ impl AttachmentStore { ).await; if let Err(e) = result { - log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e); + // Downgrade noise for expected cases + if let Some(kind) = e.downcast_ref::<AttachmentStoreError>() { + match kind { + AttachmentStoreError::AttachmentAlreadyDownloaded => { + log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid); + } + AttachmentStoreError::DownloadAlreadyInProgress => { + // Already logged a warning where detected + log::debug!(target: target::ATTACHMENTS, "Download already in progress: {}", &_guid); + } + _ => { + log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, kind); + } + } + } else { + log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e); + } } }); diff --git a/core/kordophoned/src/daemon/models/attachment.rs b/core/kordophoned/src/daemon/models/attachment.rs index cd38601..6adb261 100644 --- a/core/kordophoned/src/daemon/models/attachment.rs +++ b/core/kordophoned/src/daemon/models/attachment.rs @@ -20,6 +20,22 @@ pub struct 
Attachment { } impl Attachment { + pub(crate) fn preferred_extension_for_mime(mime: &str) -> Option<&'static str> { + let normalized = mime.split(';').next().unwrap_or(mime).trim(); + // Prefer common, user-friendly extensions over obscure ones + match normalized { + "image/jpeg" | "image/pjpeg" => Some("jpg"), + _ => mime_guess::get_mime_extensions_str(normalized) + .and_then(|list| { + // If jpg is one of the candidates, prefer it + if list.iter().any(|e| *e == "jpg") { + Some("jpg") + } else { + list.first().copied() + } + }), + } + } pub fn get_path(&self) -> PathBuf { self.get_path_for_preview_scratch(false, false) } @@ -45,8 +61,7 @@ impl Attachment { let ext_from_mime = self .mime_type .as_ref() - .and_then(|m| mime_guess::get_mime_extensions_str(m.split(';').next().unwrap_or(m))) - .and_then(|list| list.first().copied()); + .and_then(|m| Self::preferred_extension_for_mime(m)); let base_ext = match ext_from_mime { Some(ext) => format!("{}.{}", kind, ext), From 778d4b6650ad866443c00f9219ddce005c0bf4cc Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 10 Sep 2025 14:23:02 -0700 Subject: [PATCH 3/3] core: attachment store: limit concurrent downloads --- core/kordophoned/src/daemon/attachment_store.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/kordophoned/src/daemon/attachment_store.rs b/core/kordophoned/src/daemon/attachment_store.rs index 42fced1..177477a 100644 --- a/core/kordophoned/src/daemon/attachment_store.rs +++ b/core/kordophoned/src/daemon/attachment_store.rs @@ -17,7 +17,7 @@ use crate::daemon::Daemon; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, Semaphore}; use uuid::Uuid; @@ -63,6 +63,9 @@ pub struct AttachmentStore { event_source: Receiver, event_sink: Option>, + + // Limits concurrent downloads to avoid overloading server and local I/O + download_limit: Arc<Semaphore>, } impl AttachmentStore { @@ -84,12 +87,16 @@ impl AttachmentStore { let 
(event_sink, event_source) = tokio::sync::mpsc::channel(100); + // Limit to at most 5 concurrent downloads by default + let download_limit = Arc::new(Semaphore::new(5)); + AttachmentStore { store_path: store_path, database: database, daemon_event_sink: daemon_event_sink, event_source: event_source, event_sink: Some(event_sink), + download_limit, } } @@ -315,9 +322,12 @@ impl AttachmentStore { let mut database = self.database.clone(); let daemon_event_sink = self.daemon_event_sink.clone(); let _guid = guid.clone(); + let limiter = self.download_limit.clone(); // Spawn a new task here so we don't block incoming queue events. tokio::spawn(async move { + // Acquire a slot in the concurrent download limiter. + let _permit = limiter.acquire_owned().await.expect("Semaphore closed"); let result = Self::download_attachment_impl( &store_path, &mut database,