Private
Public Access
1
0

Merge branch 'wip/attachment_mime'

* wip/attachment_mime:
  core: attachment store: limit concurrent downloads
  core: attachment mime: prefer jpg instead of jfif
  wip: attachment MIME
This commit is contained in:
2025-09-10 14:41:36 -07:00
5 changed files with 251 additions and 19 deletions

23
core/Cargo.lock generated
View File

@@ -1201,6 +1201,7 @@ dependencies = [
"kordophone",
"kordophone-db",
"log",
"mime_guess",
"once_cell",
"serde",
"serde_json",
@@ -1345,6 +1346,22 @@ dependencies = [
"quote",
]
[[package]]
name = "mime"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.7.2"
@@ -2350,6 +2367,12 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-ident"
version = "1.0.12"

View File

@@ -625,6 +625,27 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
Ok(response)
}
/// Fetches an attachment and also reports the response's `Content-Type`.
///
/// Issues a `GET` against `attachment?guid=<guid>&preview=<preview>`.
///
/// # Returns
/// A tuple of the streaming response body and the `Content-Type` header value.
/// The header is `None` when it is absent or cannot be read as a string.
///
/// # Errors
/// Propagates whatever error the underlying request returns.
pub async fn fetch_attachment_with_metadata(
    &mut self,
    guid: &str,
    preview: bool,
) -> Result<(ResponseStream, Option<String>), Error> {
    let endpoint = format!("attachment?guid={}&preview={}", guid, preview);
    let response = self
        .response_with_body_retry(&endpoint, Method::GET, Body::empty, true)
        .await?;

    // Read the header first: converting the response into its body consumes it.
    let content_type = match response.headers().get(hyper::header::CONTENT_TYPE) {
        Some(value) => value.to_str().ok().map(String::from),
        None => None,
    };

    let body = ResponseStream::from(response.into_body());
    Ok((body, content_type))
}
}
#[cfg(test)]

View File

@@ -22,6 +22,7 @@ tokio = { version = "1", features = ["full"] }
tokio-condvar = "0.3.0"
uuid = "1.16.0"
once_cell = "1.19.0"
mime_guess = "2.0"
# D-Bus dependencies only on Linux
[target.'cfg(target_os = "linux")'.dependencies]
@@ -49,4 +50,3 @@ assets = [
{ source = "../target/release/kpcli", dest = "/usr/bin/kpcli", mode = "755" },
{ source = "include/net.buzzert.kordophonecd.service", dest = "/usr/share/dbus-1/services/net.buzzert.kordophonecd.service", mode = "644" },
]

View File

@@ -17,7 +17,7 @@ use crate::daemon::Daemon;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Semaphore};
use uuid::Uuid;
@@ -63,6 +63,9 @@ pub struct AttachmentStore {
event_source: Receiver<AttachmentStoreEvent>,
event_sink: Option<Sender<AttachmentStoreEvent>>,
// Limits concurrent downloads to avoid overloading server and local I/O
download_limit: Arc<Semaphore>,
}
impl AttachmentStore {
@@ -84,12 +87,16 @@ impl AttachmentStore {
let (event_sink, event_source) = tokio::sync::mpsc::channel(100);
// Limit to at most 5 concurrent downloads by default
let download_limit = Arc::new(Semaphore::new(5));
AttachmentStore {
store_path: store_path,
database: database,
daemon_event_sink: daemon_event_sink,
event_source: event_source,
event_sink: Some(event_sink),
download_limit,
}
}
@@ -103,12 +110,50 @@ impl AttachmentStore {
pub fn get_attachment_impl(store_path: &PathBuf, guid: &String) -> Attachment {
let base_path = store_path.join(guid);
Attachment {
let mut attachment = Attachment {
guid: guid.to_owned(),
base_path: base_path,
metadata: None,
mime_type: None,
};
// Best-effort: if a file already exists, try to infer MIME type from extension
let kind = "full";
let stem = attachment
.base_path
.file_name()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_default();
let legacy = attachment.base_path.with_extension(kind);
let existing_path = if legacy.exists() {
Some(legacy)
} else {
let prefix = format!("{}.{}.", stem, kind);
let parent = attachment
.base_path
.parent()
.unwrap_or_else(|| std::path::Path::new("."));
let mut found: Option<PathBuf> = None;
if let Ok(entries) = std::fs::read_dir(parent) {
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with(&prefix) && !name.ends_with(".download") {
found = Some(entry.path());
break;
}
}
}
found
};
if let Some(existing) = existing_path {
if let Some(m) = mime_guess::from_path(&existing).first_raw() {
attachment.mime_type = Some(m.to_string());
}
}
attachment
}
async fn download_attachment_impl(
store_path: &PathBuf,
@@ -124,22 +169,45 @@ impl AttachmentStore {
return Err(AttachmentStoreError::AttachmentAlreadyDownloaded.into());
}
let temporary_path = attachment.get_path_for_preview_scratch(preview, true);
if std::fs::exists(&temporary_path).unwrap_or(false) {
log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", temporary_path.display());
// Check if any in-progress temporary file exists already for this attachment
if let Some(in_progress) = Self::find_in_progress_download(&attachment, preview) {
log::warn!(target: target::ATTACHMENTS, "Temporary file already exists: {}, assuming download is in progress", in_progress.display());
// Treat as a non-fatal condition so we don't spam errors.
return Err(AttachmentStoreError::DownloadAlreadyInProgress.into());
}
log::debug!(target: target::ATTACHMENTS, "Starting download for attachment: {}", attachment.guid);
let file = std::fs::File::create(&temporary_path)?;
let mut writer = BufWriter::new(&file);
let mut client = Daemon::get_client_impl(database).await?;
let mut stream = client
.fetch_attachment_data(&attachment.guid, preview)
let (mut stream, content_type) = client
.fetch_attachment_with_metadata(&attachment.guid, preview)
.await
.map_err(|e| AttachmentStoreError::APIClientError(format!("{:?}", e)))?;
let kind = if preview { "preview" } else { "full" };
let normalized_mime = content_type
.as_deref()
.map(|s| s.split(';').next().unwrap_or(s).trim().to_string());
let guessed_ext = normalized_mime
.as_deref()
.and_then(|m| Attachment::preferred_extension_for_mime(m))
.unwrap_or("bin");
let final_path = attachment
.base_path
.with_extension(format!("{}.{}", kind, guessed_ext));
let temporary_path = final_path.with_extension(format!(
"{}.download",
final_path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("")
));
let file = std::fs::File::create(&temporary_path)?;
let mut writer = BufWriter::new(&file);
log::trace!(target: target::ATTACHMENTS, "Writing attachment {:?} data to temporary file {:?}", &attachment.guid, &temporary_path);
while let Some(Ok(data)) = stream.next().await {
writer.write(data.as_ref())?;
@@ -150,10 +218,7 @@ impl AttachmentStore {
file.sync_all()?;
// Atomically move the temporary file to the final location
std::fs::rename(
&temporary_path,
&attachment.get_path_for_preview_scratch(preview, false),
)?;
std::fs::rename(&temporary_path, &final_path)?;
log::debug!(target: target::ATTACHMENTS, "Completed download for attachment: {}", attachment.guid);
@@ -164,6 +229,41 @@ impl AttachmentStore {
Ok(())
}
/// Looks for a temporary `.download` file belonging to `attachment`.
///
/// Checks the legacy name `<guid>.<kind>.download` first, then scans the parent
/// directory for any `<guid>.<kind>.<ext>.download`, where `<kind>` is `preview`
/// or `full` depending on `preview`. Returns the first match, or `None` when
/// nothing matches or the directory cannot be read.
fn find_in_progress_download(attachment: &Attachment, preview: bool) -> Option<PathBuf> {
    let kind = if preview { "preview" } else { "full" };

    // Legacy temp path: guid.<kind>.download
    let legacy_temp = attachment
        .base_path
        .with_extension(format!("{}.download", kind));
    if legacy_temp.exists() {
        return Some(legacy_temp);
    }

    // Scan for any guid.<kind>.<ext>.download
    let dir = attachment
        .base_path
        .parent()
        .unwrap_or_else(|| std::path::Path::new("."));
    let file_name = attachment
        .base_path
        .file_name()
        .map(|s| s.to_string_lossy().to_string())
        .unwrap_or_default();
    let wanted_prefix = format!("{}.{}.", file_name, kind);

    std::fs::read_dir(dir)
        .ok()?
        .flatten()
        .find(|entry| {
            let entry_name = entry.file_name();
            let entry_name = entry_name.to_string_lossy();
            entry_name.starts_with(&wanted_prefix) && entry_name.ends_with(".download")
        })
        .map(|entry| entry.path())
}
async fn upload_attachment_impl(
store_path: &PathBuf,
incoming_path: &PathBuf,
@@ -222,9 +322,12 @@ impl AttachmentStore {
let mut database = self.database.clone();
let daemon_event_sink = self.daemon_event_sink.clone();
let _guid = guid.clone();
let limiter = self.download_limit.clone();
// Spawn a new task here so we don't block incoming queue events.
tokio::spawn(async move {
// Acquire a slot in the concurrent download limiter.
let _permit = limiter.acquire_owned().await.expect("Semaphore closed");
let result = Self::download_attachment_impl(
&store_path,
&mut database,
@@ -234,8 +337,24 @@ impl AttachmentStore {
).await;
if let Err(e) = result {
// Downgrade noise for expected cases
if let Some(kind) = e.downcast_ref::<AttachmentStoreError>() {
match kind {
AttachmentStoreError::AttachmentAlreadyDownloaded => {
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid);
}
AttachmentStoreError::DownloadAlreadyInProgress => {
// Already logged a warning where detected
log::debug!(target: target::ATTACHMENTS, "Download already in progress: {}", &_guid);
}
_ => {
log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, kind);
}
}
} else {
log::error!(target: target::ATTACHMENTS, "Error downloading attachment {}: {}", &_guid, e);
}
}
});
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
#[derive(Debug, Clone)]
pub struct AttachmentMetadata {
@@ -16,9 +16,26 @@ pub struct Attachment {
pub guid: String,
pub base_path: PathBuf,
pub metadata: Option<AttachmentMetadata>,
pub mime_type: Option<String>,
}
impl Attachment {
/// Picks a file extension (without the leading dot) for a MIME type string.
///
/// Anything after the first `;` (parameters such as `charset`) and surrounding
/// whitespace is ignored. JPEG types always map to `jpg`; for every other type
/// the candidates come from `mime_guess`, with `jpg` preferred when it is among
/// them and the first candidate used otherwise. Returns `None` when `mime_guess`
/// has no candidates for the type.
pub(crate) fn preferred_extension_for_mime(mime: &str) -> Option<&'static str> {
    let essence = mime.split(';').next().unwrap_or(mime).trim();

    // Prefer common, user-friendly extensions over obscure ones
    if matches!(essence, "image/jpeg" | "image/pjpeg") {
        return Some("jpg");
    }

    let candidates = mime_guess::get_mime_extensions_str(essence)?;
    if candidates.contains(&"jpg") {
        // If jpg is one of the candidates, prefer it
        Some("jpg")
    } else {
        candidates.first().copied()
    }
}
/// Returns the path of the full-size, non-scratch file for this attachment.
pub fn get_path(&self) -> PathBuf {
    let (preview, scratch) = (false, false);
    self.get_path_for_preview_scratch(preview, scratch)
}
@@ -28,12 +45,34 @@ impl Attachment {
}
/// Resolves the on-disk path for this attachment's preview or full-size file.
///
/// * `preview` selects the `preview` variant instead of the `full` one.
/// * `scratch` requests the temporary `.download` path; no lookup for an
///   existing file is performed in that case.
///
/// For non-scratch requests, a file already found on disk by
/// `find_existing_path` takes precedence. Otherwise the path is built from
/// `base_path`: `<base>.<kind>.<ext>` when `mime_type` maps to an extension,
/// or the legacy `<base>.<kind>` when it does not, with `.download` appended
/// for scratch paths.
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
    // Determine whether this is a preview or full attachment.
    let kind = if preview { "preview" } else { "full" };

    // If not a scratch path, and a file already exists on disk with a concrete
    // file extension (e.g., guid.full.jpg), return that existing path.
    if !scratch {
        if let Some(existing) = self.find_existing_path(preview) {
            return existing;
        }
    }

    // Fall back to constructing a path using known info. If we know the MIME type,
    // prefer an extension guessed from it; otherwise keep legacy naming.
    let ext_from_mime = self
        .mime_type
        .as_deref()
        .and_then(Self::preferred_extension_for_mime);
    let base_ext = match ext_from_mime {
        Some(ext) => format!("{}.{}", kind, ext),
        None => kind.to_string(),
    };

    if scratch {
        self.base_path
            .with_extension(format!("{}.download", base_ext))
    } else {
        self.base_path.with_extension(base_ext)
    }
}
@@ -46,6 +85,36 @@ impl Attachment {
.as_str(),
)
}
/// Locates a completed file for this attachment that is already on disk.
///
/// The legacy name `<guid>.<kind>` is tried first. Failing that, the parent
/// directory is scanned for `<guid>.<kind>.<ext>`, ignoring anything that still
/// ends in `.download`. Returns `None` when no file matches, when `base_path`
/// has no final component, or when the directory cannot be read.
fn find_existing_path(&self, preview: bool) -> Option<PathBuf> {
    let kind = if preview { "preview" } else { "full" };

    // First, check legacy path without a concrete file extension.
    let legacy_path = self.base_path.with_extension(kind);
    if legacy_path.exists() {
        return Some(legacy_path);
    }

    // Next, search for a filename like: <guid>.<kind>.<ext>
    let guid_component = self.base_path.file_name()?.to_string_lossy().to_string();
    let wanted_prefix = format!("{}.{}.", guid_component, kind);
    let search_dir = self.base_path.parent().unwrap_or_else(|| Path::new("."));

    std::fs::read_dir(search_dir)
        .ok()?
        .flatten()
        .find(|entry| {
            let entry_name = entry.file_name();
            let entry_name = entry_name.to_string_lossy();
            entry_name.starts_with(&wanted_prefix) && !entry_name.ends_with(".download")
        })
        .map(|entry| entry.path())
}
}
impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {