core: implement get_attachment_fd event for dbus, message limit for get_messages
This commit is contained in:
@@ -128,6 +128,20 @@
|
|||||||
"/>
|
"/>
|
||||||
</method>
|
</method>
|
||||||
|
|
||||||
|
<method name="OpenAttachmentFd">
|
||||||
|
<arg type="s" name="attachment_id" direction="in"/>
|
||||||
|
<arg type="b" name="preview" direction="in"/>
|
||||||
|
<arg type="h" name="fd" direction="out"/>
|
||||||
|
|
||||||
|
<annotation name="org.freedesktop.DBus.DocString"
|
||||||
|
value="Opens a read-only file descriptor for an attachment path.
|
||||||
|
Arguments:
|
||||||
|
attachment_id: the attachment GUID
|
||||||
|
preview: whether to open the preview path (true) or full path (false)
|
||||||
|
Returns:
|
||||||
|
fd: a Unix file descriptor to read attachment bytes"/>
|
||||||
|
</method>
|
||||||
|
|
||||||
<method name="DownloadAttachment">
|
<method name="DownloadAttachment">
|
||||||
<arg type="s" name="attachment_id" direction="in"/>
|
<arg type="s" name="attachment_id" direction="in"/>
|
||||||
<arg type="b" name="preview" direction="in"/>
|
<arg type="b" name="preview" direction="in"/>
|
||||||
|
|||||||
@@ -115,39 +115,57 @@ impl AttachmentStore {
|
|||||||
base_path: base_path,
|
base_path: base_path,
|
||||||
metadata: None,
|
metadata: None,
|
||||||
mime_type: None,
|
mime_type: None,
|
||||||
|
cached_full_path: None,
|
||||||
|
cached_preview_path: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Best-effort: if a file already exists, try to infer MIME type from extension
|
// Best-effort: if files already exist, cache their exact paths and infer MIME type.
|
||||||
let kind = "full";
|
|
||||||
let stem = attachment
|
let stem = attachment
|
||||||
.base_path
|
.base_path
|
||||||
.file_name()
|
.file_name()
|
||||||
.map(|s| s.to_string_lossy().to_string())
|
.map(|s| s.to_string_lossy().to_string())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
let legacy = attachment.base_path.with_extension(kind);
|
|
||||||
let existing_path = if legacy.exists() {
|
let legacy_full = attachment.base_path.with_extension("full");
|
||||||
Some(legacy)
|
if legacy_full.exists() {
|
||||||
} else {
|
attachment.cached_full_path = Some(legacy_full);
|
||||||
let prefix = format!("{}.{}.", stem, kind);
|
}
|
||||||
|
|
||||||
|
let legacy_preview = attachment.base_path.with_extension("preview");
|
||||||
|
if legacy_preview.exists() {
|
||||||
|
attachment.cached_preview_path = Some(legacy_preview);
|
||||||
|
}
|
||||||
|
|
||||||
|
if attachment.cached_full_path.is_none() || attachment.cached_preview_path.is_none() {
|
||||||
|
let full_prefix = format!("{}.full.", stem);
|
||||||
|
let preview_prefix = format!("{}.preview.", stem);
|
||||||
let parent = attachment
|
let parent = attachment
|
||||||
.base_path
|
.base_path
|
||||||
.parent()
|
.parent()
|
||||||
.unwrap_or_else(|| std::path::Path::new("."));
|
.unwrap_or_else(|| std::path::Path::new("."));
|
||||||
let mut found: Option<PathBuf> = None;
|
|
||||||
if let Ok(entries) = std::fs::read_dir(parent) {
|
if let Ok(entries) = std::fs::read_dir(parent) {
|
||||||
for entry in entries.flatten() {
|
for entry in entries.flatten() {
|
||||||
let name = entry.file_name().to_string_lossy().to_string();
|
let name = entry.file_name().to_string_lossy().to_string();
|
||||||
if name.starts_with(&prefix) && !name.ends_with(".download") {
|
|
||||||
found = Some(entry.path());
|
if !name.ends_with(".download") {
|
||||||
break;
|
if attachment.cached_full_path.is_none() && name.starts_with(&full_prefix) {
|
||||||
|
attachment.cached_full_path = Some(entry.path());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if attachment.cached_preview_path.is_none()
|
||||||
|
&& name.starts_with(&preview_prefix)
|
||||||
|
{
|
||||||
|
attachment.cached_preview_path = Some(entry.path());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
found
|
}
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(existing) = existing_path {
|
if let Some(existing_full) = &attachment.cached_full_path {
|
||||||
if let Some(m) = mime_guess::from_path(&existing).first_raw() {
|
if let Some(m) = mime_guess::from_path(existing_full).first_raw() {
|
||||||
attachment.mime_type = Some(m.to_string());
|
attachment.mime_type = Some(m.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -342,6 +360,9 @@ impl AttachmentStore {
|
|||||||
match kind {
|
match kind {
|
||||||
AttachmentStoreError::AttachmentAlreadyDownloaded => {
|
AttachmentStoreError::AttachmentAlreadyDownloaded => {
|
||||||
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid);
|
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", &_guid);
|
||||||
|
let _ = daemon_event_sink
|
||||||
|
.send(DaemonEvent::AttachmentDownloaded(_guid.clone()))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
AttachmentStoreError::DownloadAlreadyInProgress => {
|
AttachmentStoreError::DownloadAlreadyInProgress => {
|
||||||
// Already logged a warning where detected
|
// Already logged a warning where detected
|
||||||
@@ -360,6 +381,10 @@ impl AttachmentStore {
|
|||||||
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);
|
log::debug!(target: target::ATTACHMENTS, "Queued download for attachment: {}", &guid);
|
||||||
} else {
|
} else {
|
||||||
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid);
|
log::debug!(target: target::ATTACHMENTS, "Attachment already downloaded: {}", guid);
|
||||||
|
let _ = self
|
||||||
|
.daemon_event_sink
|
||||||
|
.send(DaemonEvent::AttachmentDownloaded(guid))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ use std::collections::HashMap;
|
|||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
@@ -72,6 +73,8 @@ pub mod target {
|
|||||||
pub static DAEMON: &str = "daemon";
|
pub static DAEMON: &str = "daemon";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const GET_MESSAGES_INITIAL_WINDOW: usize = 300;
|
||||||
|
|
||||||
pub struct Daemon {
|
pub struct Daemon {
|
||||||
pub event_sender: Sender<Event>,
|
pub event_sender: Sender<Event>,
|
||||||
event_receiver: Receiver<Event>,
|
event_receiver: Receiver<Event>,
|
||||||
@@ -314,7 +317,9 @@ impl Daemon {
|
|||||||
|
|
||||||
Event::GetMessages(conversation_id, last_message_id, reply) => {
|
Event::GetMessages(conversation_id, last_message_id, reply) => {
|
||||||
let messages = self.get_messages(conversation_id, last_message_id).await;
|
let messages = self.get_messages(conversation_id, last_message_id).await;
|
||||||
let _ = reply.send(messages);
|
if reply.send(messages).is_err() {
|
||||||
|
log::warn!(target: target::EVENT, "GetMessages reply receiver dropped before send");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Event::DeleteAllConversations(reply) => {
|
Event::DeleteAllConversations(reply) => {
|
||||||
@@ -448,8 +453,9 @@ impl Daemon {
|
|||||||
async fn get_messages(
|
async fn get_messages(
|
||||||
&mut self,
|
&mut self,
|
||||||
conversation_id: String,
|
conversation_id: String,
|
||||||
_last_message_id: Option<MessageID>,
|
last_message_id: Option<MessageID>,
|
||||||
) -> Vec<Message> {
|
) -> Vec<Message> {
|
||||||
|
let started = Instant::now();
|
||||||
// Get outgoing messages for this conversation.
|
// Get outgoing messages for this conversation.
|
||||||
let empty_vec: Vec<OutgoingMessage> = vec![];
|
let empty_vec: Vec<OutgoingMessage> = vec![];
|
||||||
let outgoing_messages: &Vec<OutgoingMessage> = self
|
let outgoing_messages: &Vec<OutgoingMessage> = self
|
||||||
@@ -488,6 +494,27 @@ impl Daemon {
|
|||||||
result.push(om.into());
|
result.push(om.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(last_id) = last_message_id {
|
||||||
|
if let Some(last_index) = result.iter().position(|message| message.id == last_id) {
|
||||||
|
result = result.split_off(last_index + 1);
|
||||||
|
}
|
||||||
|
} else if result.len() > GET_MESSAGES_INITIAL_WINDOW {
|
||||||
|
let dropped = result.len() - GET_MESSAGES_INITIAL_WINDOW;
|
||||||
|
result = result.split_off(dropped);
|
||||||
|
log::debug!(
|
||||||
|
target: target::EVENT,
|
||||||
|
"GetMessages initial window applied: dropped {} older messages",
|
||||||
|
dropped
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
target: target::EVENT,
|
||||||
|
"GetMessages completed in {}ms: {} messages",
|
||||||
|
started.elapsed().as_millis(),
|
||||||
|
result.len()
|
||||||
|
);
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::PathBuf;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct AttachmentMetadata {
|
pub struct AttachmentMetadata {
|
||||||
@@ -17,6 +17,8 @@ pub struct Attachment {
|
|||||||
pub base_path: PathBuf,
|
pub base_path: PathBuf,
|
||||||
pub metadata: Option<AttachmentMetadata>,
|
pub metadata: Option<AttachmentMetadata>,
|
||||||
pub mime_type: Option<String>,
|
pub mime_type: Option<String>,
|
||||||
|
pub cached_full_path: Option<PathBuf>,
|
||||||
|
pub cached_preview_path: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Attachment {
|
impl Attachment {
|
||||||
@@ -25,15 +27,14 @@ impl Attachment {
|
|||||||
// Prefer common, user-friendly extensions over obscure ones
|
// Prefer common, user-friendly extensions over obscure ones
|
||||||
match normalized {
|
match normalized {
|
||||||
"image/jpeg" | "image/pjpeg" => Some("jpg"),
|
"image/jpeg" | "image/pjpeg" => Some("jpg"),
|
||||||
_ => mime_guess::get_mime_extensions_str(normalized)
|
_ => mime_guess::get_mime_extensions_str(normalized).and_then(|list| {
|
||||||
.and_then(|list| {
|
// If jpg is one of the candidates, prefer it
|
||||||
// If jpg is one of the candidates, prefer it
|
if list.iter().any(|e| *e == "jpg") {
|
||||||
if list.iter().any(|e| *e == "jpg") {
|
Some("jpg")
|
||||||
Some("jpg")
|
} else {
|
||||||
} else {
|
list.first().copied()
|
||||||
list.first().copied()
|
}
|
||||||
}
|
}),
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn get_path(&self) -> PathBuf {
|
pub fn get_path(&self) -> PathBuf {
|
||||||
@@ -45,17 +46,21 @@ impl Attachment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
|
pub fn get_path_for_preview_scratch(&self, preview: bool, scratch: bool) -> PathBuf {
|
||||||
// Determine whether this is a preview or full attachment.
|
|
||||||
let kind = if preview { "preview" } else { "full" };
|
|
||||||
|
|
||||||
// If not a scratch path, and a file already exists on disk with a concrete
|
|
||||||
// file extension (e.g., guid.full.jpg), return that existing path.
|
|
||||||
if !scratch {
|
if !scratch {
|
||||||
if let Some(existing) = self.find_existing_path(preview) {
|
let cached = if preview {
|
||||||
return existing;
|
self.cached_preview_path.as_ref()
|
||||||
|
} else {
|
||||||
|
self.cached_full_path.as_ref()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(path) = cached {
|
||||||
|
return path.clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Determine whether this is a preview or full attachment.
|
||||||
|
let kind = if preview { "preview" } else { "full" };
|
||||||
|
|
||||||
// Fall back to constructing a path using known info. If we know the MIME type,
|
// Fall back to constructing a path using known info. If we know the MIME type,
|
||||||
// prefer an extension guessed from it; otherwise keep legacy naming.
|
// prefer an extension guessed from it; otherwise keep legacy naming.
|
||||||
let ext_from_mime = self
|
let ext_from_mime = self
|
||||||
@@ -77,44 +82,15 @@ impl Attachment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_downloaded(&self, preview: bool) -> bool {
|
pub fn is_downloaded(&self, preview: bool) -> bool {
|
||||||
std::fs::exists(&self.get_path_for_preview(preview)).expect(
|
let path = self.get_path_for_preview(preview);
|
||||||
|
std::fs::exists(&path).expect(
|
||||||
format!(
|
format!(
|
||||||
"Wasn't able to check for the existence of an attachment file path at {}",
|
"Wasn't able to check for the existence of an attachment file path at {}",
|
||||||
&self.get_path_for_preview(preview).display()
|
path.display()
|
||||||
)
|
)
|
||||||
.as_str(),
|
.as_str(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_existing_path(&self, preview: bool) -> Option<PathBuf> {
|
|
||||||
let kind = if preview { "preview" } else { "full" };
|
|
||||||
|
|
||||||
// First, check legacy path without a concrete file extension.
|
|
||||||
let legacy = self.base_path.with_extension(kind);
|
|
||||||
if legacy.exists() {
|
|
||||||
return Some(legacy);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next, search for a filename like: <guid>.<kind>.<ext>
|
|
||||||
let file_stem = self
|
|
||||||
.base_path
|
|
||||||
.file_name()
|
|
||||||
.map(|s| s.to_string_lossy().to_string())?;
|
|
||||||
let prefix = format!("{}.{}.", file_stem, kind);
|
|
||||||
|
|
||||||
let parent = self.base_path.parent().unwrap_or_else(|| Path::new("."));
|
|
||||||
if let Ok(dir) = std::fs::read_dir(parent) {
|
|
||||||
for entry in dir.flatten() {
|
|
||||||
let file_name = entry.file_name();
|
|
||||||
let name = file_name.to_string_lossy();
|
|
||||||
if name.starts_with(&prefix) && !name.ends_with(".download") {
|
|
||||||
return Some(entry.path());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {
|
impl From<kordophone::model::message::AttachmentMetadata> for AttachmentMetadata {
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
use dbus::arg;
|
use dbus::arg;
|
||||||
use dbus_tree::MethodErr;
|
use dbus_tree::MethodErr;
|
||||||
|
use std::fs::OpenOptions;
|
||||||
|
use std::os::fd::{FromRawFd, IntoRawFd};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
use std::{future::Future, thread};
|
use std::{future::Future, thread};
|
||||||
use tokio::sync::{mpsc, oneshot, Mutex};
|
use tokio::sync::{mpsc, oneshot, Mutex};
|
||||||
|
|
||||||
@@ -277,6 +280,7 @@ impl DbusRepository for DBusAgent {
|
|||||||
conversation_id: String,
|
conversation_id: String,
|
||||||
last_message_id: String,
|
last_message_id: String,
|
||||||
) -> Result<Vec<arg::PropMap>, MethodErr> {
|
) -> Result<Vec<arg::PropMap>, MethodErr> {
|
||||||
|
let started = Instant::now();
|
||||||
let last_message_id_opt = if last_message_id.is_empty() {
|
let last_message_id_opt = if last_message_id.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@@ -286,6 +290,9 @@ impl DbusRepository for DBusAgent {
|
|||||||
let messages =
|
let messages =
|
||||||
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))?;
|
self.send_event_sync(|r| Event::GetMessages(conversation_id, last_message_id_opt, r))?;
|
||||||
|
|
||||||
|
let mut attachment_count: usize = 0;
|
||||||
|
let mut text_bytes: usize = 0;
|
||||||
|
|
||||||
let mapped: Vec<arg::PropMap> = messages
|
let mapped: Vec<arg::PropMap> = messages
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|msg| {
|
.map(|msg| {
|
||||||
@@ -294,6 +301,7 @@ impl DbusRepository for DBusAgent {
|
|||||||
|
|
||||||
// Remove the attachment placeholder here.
|
// Remove the attachment placeholder here.
|
||||||
let text = msg.text.replace("\u{FFFC}", "");
|
let text = msg.text.replace("\u{FFFC}", "");
|
||||||
|
text_bytes += text.len();
|
||||||
|
|
||||||
map.insert("text".into(), arg::Variant(Box::new(text)));
|
map.insert("text".into(), arg::Variant(Box::new(text)));
|
||||||
map.insert(
|
map.insert(
|
||||||
@@ -305,10 +313,12 @@ impl DbusRepository for DBusAgent {
|
|||||||
arg::Variant(Box::new(msg.sender.display_name())),
|
arg::Variant(Box::new(msg.sender.display_name())),
|
||||||
);
|
);
|
||||||
|
|
||||||
let attachments: Vec<arg::PropMap> = msg
|
if !msg.attachments.is_empty() {
|
||||||
.attachments
|
let attachments: Vec<arg::PropMap> = msg
|
||||||
.into_iter()
|
.attachments
|
||||||
.map(|attachment| {
|
.into_iter()
|
||||||
|
.map(|attachment| {
|
||||||
|
attachment_count += 1;
|
||||||
let mut attachment_map = arg::PropMap::new();
|
let mut attachment_map = arg::PropMap::new();
|
||||||
attachment_map.insert(
|
attachment_map.insert(
|
||||||
"guid".into(),
|
"guid".into(),
|
||||||
@@ -351,17 +361,26 @@ impl DbusRepository for DBusAgent {
|
|||||||
arg::Variant(Box::new(metadata_map)),
|
arg::Variant(Box::new(metadata_map)),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
attachment_map
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
attachment_map
|
map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
|
||||||
})
|
}
|
||||||
.collect();
|
|
||||||
|
|
||||||
map.insert("attachments".into(), arg::Variant(Box::new(attachments)));
|
|
||||||
|
|
||||||
map
|
map
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
target: "dbus",
|
||||||
|
"GetMessages mapped in {}ms: {} messages, {} attachments, {} text-bytes",
|
||||||
|
started.elapsed().as_millis(),
|
||||||
|
mapped.len(),
|
||||||
|
attachment_count,
|
||||||
|
text_bytes
|
||||||
|
);
|
||||||
|
|
||||||
Ok(mapped)
|
Ok(mapped)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -406,6 +425,23 @@ impl DbusRepository for DBusAgent {
|
|||||||
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
|
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn open_attachment_fd(
|
||||||
|
&mut self,
|
||||||
|
attachment_id: String,
|
||||||
|
preview: bool,
|
||||||
|
) -> Result<arg::OwnedFd, MethodErr> {
|
||||||
|
let attachment = self.send_event_sync(|r| Event::GetAttachment(attachment_id, r))?;
|
||||||
|
let path = attachment.get_path_for_preview(preview);
|
||||||
|
|
||||||
|
let file = OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(&path)
|
||||||
|
.map_err(|e| MethodErr::failed(&format!("Failed to open attachment: {}", e)))?;
|
||||||
|
|
||||||
|
let fd = file.into_raw_fd();
|
||||||
|
Ok(unsafe { arg::OwnedFd::from_raw_fd(fd) })
|
||||||
|
}
|
||||||
|
|
||||||
fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> {
|
fn upload_attachment(&mut self, path: String) -> Result<String, MethodErr> {
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
let path = PathBuf::from(path);
|
let path = PathBuf::from(path);
|
||||||
|
|||||||
Reference in New Issue
Block a user