Private
Public Access
1
0

Implements attachment uploading

This commit is contained in:
2025-06-12 17:58:03 -07:00
parent 4ddc0dca39
commit 2f4e9b7c07
14 changed files with 268 additions and 8 deletions

7
Cargo.lock generated
View File

@@ -1019,6 +1019,7 @@ dependencies = [
"time",
"tokio",
"tokio-tungstenite",
"tokio-util",
"tungstenite",
"uuid",
]
@@ -1960,16 +1961,16 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.10"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"futures-util",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]

View File

@@ -22,5 +22,6 @@ serde_plain = "1.0.2"
time = { version = "0.3.17", features = ["parsing", "serde"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio-tungstenite = "0.26.2"
tokio-util = { version = "0.7.15", features = ["futures-util"] }
tungstenite = "0.26.2"
uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] }

View File

@@ -13,6 +13,7 @@ use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_util::io::ReaderStream;
use futures_util::stream::{BoxStream, Stream};
use futures_util::task::Context;
@@ -289,6 +290,40 @@ impl<K: AuthenticationStore + Send + Sync> APIInterface for HTTPAPIClient<K> {
.map(ResponseStream::from)
}
async fn upload_attachment<R>(
&mut self,
data: tokio::io::BufReader<R>,
filename: &str,
) -> Result<String, Self::Error>
where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
{
#[derive(Deserialize, Debug)]
struct UploadAttachmentResponse {
#[serde(rename = "fileTransferGUID")]
guid: String,
}
let endpoint = format!("uploadAttachment?filename={}", filename);
let mut data_opt = Some(data);
let response: UploadAttachmentResponse = self
.deserialized_response_with_body_retry(
&endpoint,
Method::POST,
move || {
let stream = ReaderStream::new(
data_opt.take().expect("Stream already consumed during retry"),
);
Body::wrap_stream(stream)
},
false, // don't retry auth for streaming body
)
.await?;
Ok(response.guid)
}
async fn open_event_socket(
&mut self,
update_seq: Option<u64>,
@@ -406,7 +441,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
&mut self,
endpoint: &str,
method: Method,
body_fn: impl Fn() -> Body,
body_fn: impl FnMut() -> Body,
) -> Result<T, Error>
where
T: DeserializeOwned,
@@ -419,7 +454,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
&mut self,
endpoint: &str,
method: Method,
body_fn: impl Fn() -> Body,
body_fn: impl FnMut() -> Body,
retry_auth: bool,
) -> Result<T, Error>
where
@@ -451,7 +486,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
&mut self,
endpoint: &str,
method: Method,
body_fn: impl Fn() -> Body,
mut body_fn: impl FnMut() -> Body,
retry_auth: bool,
) -> Result<hyper::Response<Body>, Error> {
use hyper::StatusCode;
@@ -459,7 +494,7 @@ impl<K: AuthenticationStore + Send + Sync> HTTPAPIClient<K> {
let uri = self.uri_for_endpoint(endpoint, None);
log::debug!("Requesting {:?} {:?}", method, uri);
let build_request = move |auth: &Option<String>| {
let mut build_request = |auth: &Option<String>| {
let body = body_fn();
Request::builder()
.method(&method)

View File

@@ -51,6 +51,15 @@ pub trait APIInterface {
preview: bool,
) -> Result<Self::ResponseStream, Self::Error>;
// (POST) /uploadAttachment
async fn upload_attachment<R>(
&mut self,
data: tokio::io::BufReader<R>,
filename: &str,
) -> Result<String, Self::Error>
where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static;
// (POST) /authenticate
async fn authenticate(&mut self, credentials: Credentials) -> Result<JwtToken, Self::Error>;

View File

@@ -127,4 +127,15 @@ impl APIInterface for TestClient {
) -> Result<Self::ResponseStream, Self::Error> {
Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed())
}
async fn upload_attachment<R>(
&mut self,
data: tokio::io::BufReader<R>,
filename: &str,
) -> Result<String, Self::Error>
where
R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
{
Ok(String::from("test"))
}
}

View File

@@ -120,6 +120,11 @@
"/>
</method>
<method name="UploadAttachment">
<arg type="s" name="path" direction="in"/>
<arg type="s" name="upload_guid" direction="out"/>
</method>
<signal name="AttachmentDownloadCompleted">
<arg type="s" name="attachment_id"/>
@@ -133,6 +138,18 @@
<annotation name="org.freedesktop.DBus.DocString"
value="Emitted when an attachment download fails."/>
</signal>
<signal name="AttachmentUploadCompleted">
<arg type="s" name="upload_guid"/>
<arg type="s" name="attachment_guid"/>
<annotation name="org.freedesktop.DBus.DocString"
value="Emitted when an attachment upload completes successfully.
Returns:
- upload_guid: The GUID of the upload.
- attachment_guid: The GUID of the attachment on the server.
"/>
</signal>
</interface>
<interface name="net.buzzert.kordophone.Settings">

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::pin;
use uuid::Uuid;
mod target {
pub static ATTACHMENTS: &str = "attachments";
@@ -36,6 +36,12 @@ pub enum AttachmentStoreEvent {
// - attachment guid
// - preview: whether to download the preview (true) or full attachment (false)
QueueDownloadAttachment(String, bool),
// Queue an upload for a given attachment file.
// Args:
// - path: the path to the attachment file
// - reply: a reply channel to send the pending upload guid to
QueueUploadAttachment(PathBuf, Reply<String>),
}
#[derive(Debug, Error)]
@@ -161,6 +167,47 @@ impl AttachmentStore {
Ok(())
}
async fn upload_attachment_impl(
store_path: &PathBuf,
incoming_path: &PathBuf,
upload_guid: &String,
database: &mut Arc<Mutex<Database>>,
daemon_event_sink: &Sender<DaemonEvent>,
) -> Result<String> {
use tokio::fs::File;
use tokio::io::BufReader;
// Create uploads directory if it doesn't exist.
let uploads_path = store_path.join("uploads");
std::fs::create_dir_all(&uploads_path).unwrap();
// First, copy the file to the store path, under /uploads/.
log::trace!(target: target::ATTACHMENTS, "Copying attachment to uploads directory: {}", uploads_path.display());
let temporary_path = uploads_path.join(incoming_path.file_name().unwrap());
std::fs::copy(incoming_path, &temporary_path).unwrap();
// Open file handle to the temporary file,
log::trace!(target: target::ATTACHMENTS, "Opening stream to temporary file: {}", temporary_path.display());
let file = File::open(&temporary_path).await?;
let reader: BufReader<File> = BufReader::new(file);
// Upload the file to the server.
let filename = incoming_path.file_name().unwrap().to_str().unwrap();
log::trace!(target: target::ATTACHMENTS, "Uploading attachment to server: {}", &filename);
let mut client = Daemon::get_client_impl(database).await?;
let guid = client.upload_attachment(reader, filename).await?;
// Delete the temporary file.
log::debug!(target: target::ATTACHMENTS, "Upload completed with guid {}, deleting temporary file: {}", guid, temporary_path.display());
std::fs::remove_file(&temporary_path).unwrap();
// Send a signal to the daemon that the attachment has been uploaded.
let event = DaemonEvent::AttachmentUploaded(upload_guid.clone(), guid.clone());
daemon_event_sink.send(event).await.unwrap();
Ok(guid)
}
pub async fn run(&mut self) {
loop {
tokio::select! {
@@ -201,6 +248,30 @@ impl AttachmentStore {
let attachment = self.get_attachment(&guid);
reply.send(attachment).unwrap();
}
AttachmentStoreEvent::QueueUploadAttachment(path, reply) => {
let upload_guid = Uuid::new_v4().to_string();
let store_path = self.store_path.clone();
let mut database = self.database.clone();
let daemon_event_sink = self.daemon_event_sink.clone();
let _upload_guid = upload_guid.clone();
tokio::spawn(async move {
let result = Self::upload_attachment_impl(
&store_path,
&path,
&_upload_guid,
&mut database,
&daemon_event_sink,
).await;
if let Err(e) = result {
log::error!(target: target::ATTACHMENTS, "Error uploading attachment {}: {}", &_upload_guid, e);
}
});
reply.send(upload_guid).unwrap();
}
}
}
}

View File

@@ -10,6 +10,8 @@ use crate::daemon::{Attachment, Message};
pub type Reply<T> = oneshot::Sender<T>;
use std::path::PathBuf;
#[derive(Debug)]
pub enum Event {
/// Get the version of the daemon.
@@ -76,4 +78,16 @@ pub enum Event {
/// Parameters:
/// - attachment_id: The attachment ID that was downloaded.
AttachmentDownloaded(String),
/// Upload an attachment to the server.
/// Parameters:
/// - path: The path to the attachment file
/// - reply: Reply indicating the upload GUID
UploadAttachment(PathBuf, Reply<String>),
/// Notifies the daemon that an attachment has been uploaded.
/// Parameters:
/// - upload_id: The upload ID that was uploaded.
/// - attachment_id: The attachment ID that was uploaded.
AttachmentUploaded(String, String),
}

View File

@@ -324,6 +324,24 @@ impl Daemon {
.await
.unwrap();
}
Event::UploadAttachment(path, reply) => {
self.attachment_store_sink
.as_ref()
.unwrap()
.send(AttachmentStoreEvent::QueueUploadAttachment(path, reply))
.await
.unwrap();
}
Event::AttachmentUploaded(upload_guid, attachment_guid) => {
log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid);
self.signal_sender
.send(Signal::AttachmentUploaded(upload_guid, attachment_guid))
.await
.unwrap();
}
}
}

View File

@@ -12,4 +12,10 @@ pub enum Signal {
/// Parameters:
/// - attachment_id: The ID of the attachment that was downloaded.
AttachmentDownloaded(String),
/// Emitted when an attachment has been uploaded.
/// Parameters:
/// - upload_guid: The GUID of the upload.
/// - attachment_guid: The GUID of the attachment on the server.
AttachmentUploaded(String, String),
}

View File

@@ -13,5 +13,6 @@ pub mod interface {
pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated;
pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated;
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted;
pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted;
}
}

View File

@@ -264,6 +264,16 @@ impl DbusRepository for ServerImpl {
// For now, just trigger the download event - we'll implement the actual download logic later
self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r))
}
fn upload_attachment(
&mut self,
path: String,
) -> Result<String, dbus::MethodErr> {
use std::path::PathBuf;
let path = PathBuf::from(path);
self.send_event_sync(|r| Event::UploadAttachment(path, r))
}
}
impl DbusSettings for ServerImpl {

View File

@@ -108,6 +108,16 @@ async fn main() {
0
});
}
Signal::AttachmentUploaded(upload_guid, attachment_guid) => {
log::debug!("Sending signal: AttachmentUploaded for upload {}, attachment {}", upload_guid, attachment_guid);
dbus_registry
.send_signal(interface::OBJECT_PATH, DbusSignals::AttachmentUploadCompleted { upload_guid, attachment_guid })
.unwrap_or_else(|_| {
log::error!("Failed to send signal");
0
});
}
}
}
});

View File

@@ -52,6 +52,16 @@ pub enum Commands {
conversation_id: String,
text: String,
},
/// Downloads an attachment from the server to the attachment store. Returns the path to the attachment.
DownloadAttachment {
attachment_id: String,
},
/// Uploads an attachment to the server, returns upload guid.
UploadAttachment {
path: String,
},
}
#[derive(Subcommand)]
@@ -89,6 +99,8 @@ impl Commands {
conversation_id,
text,
} => client.enqueue_outgoing_message(conversation_id, text).await,
Commands::UploadAttachment { path } => client.upload_attachment(path).await,
Commands::DownloadAttachment { attachment_id } => client.download_attachment(attachment_id).await,
}
}
}
@@ -225,4 +237,48 @@ impl DaemonCli {
KordophoneRepository::delete_all_conversations(&self.proxy())
.map_err(|e| anyhow::anyhow!("Failed to delete all conversations: {}", e))
}
pub async fn download_attachment(&mut self, attachment_id: String) -> Result<()> {
// Trigger download.
KordophoneRepository::download_attachment(&self.proxy(), &attachment_id, false)?;
// Get attachment info.
let attachment_info = KordophoneRepository::get_attachment_info(&self.proxy(), &attachment_id)?;
let (path, preview_path, downloaded, preview_downloaded) = attachment_info;
if downloaded {
println!("Attachment already downloaded: {}", path);
return Ok(());
}
println!("Downloading attachment: {}", attachment_id);
// Attach to the signal that the attachment has been downloaded.
let _id = self.proxy().match_signal(
move |h: dbus_interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted, _: &Connection, _: &dbus::message::Message| {
println!("Signal: Attachment downloaded: {}", path);
std::process::exit(0);
},
);
let _id = self.proxy().match_signal(
|h: dbus_interface::NetBuzzertKordophoneRepositoryAttachmentDownloadFailed, _: &Connection, _: &dbus::message::Message| {
println!("Signal: Attachment download failed: {}", h.attachment_id);
std::process::exit(1);
},
);
// Wait for the signal.
loop {
self.conn.process(std::time::Duration::from_millis(1000))?;
}
Ok(())
}
pub async fn upload_attachment(&mut self, path: String) -> Result<()> {
let upload_guid = KordophoneRepository::upload_attachment(&self.proxy(), &path)?;
println!("Upload GUID: {}", upload_guid);
Ok(())
}
}