diff --git a/Cargo.lock b/Cargo.lock index 01619c7..12d628e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1019,6 +1019,7 @@ dependencies = [ "time", "tokio", "tokio-tungstenite", + "tokio-util", "tungstenite", "uuid", ] @@ -1960,16 +1961,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", - "tracing", ] [[package]] diff --git a/kordophone/Cargo.toml b/kordophone/Cargo.toml index 131ca83..24e074c 100644 --- a/kordophone/Cargo.toml +++ b/kordophone/Cargo.toml @@ -22,5 +22,6 @@ serde_plain = "1.0.2" time = { version = "0.3.17", features = ["parsing", "serde"] } tokio = { version = "1.37.0", features = ["full"] } tokio-tungstenite = "0.26.2" +tokio-util = { version = "0.7.15", features = ["futures-util"] } tungstenite = "0.26.2" uuid = { version = "1.6.1", features = ["v4", "fast-rng", "macro-diagnostics"] } diff --git a/kordophone/src/api/http_client.rs b/kordophone/src/api/http_client.rs index 360d92b..ddd6d65 100644 --- a/kordophone/src/api/http_client.rs +++ b/kordophone/src/api/http_client.rs @@ -13,6 +13,7 @@ use async_trait::async_trait; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::net::TcpStream; +use tokio_util::io::ReaderStream; use futures_util::stream::{BoxStream, Stream}; use futures_util::task::Context; @@ -289,6 +290,40 @@ impl APIInterface for HTTPAPIClient { .map(ResponseStream::from) } + async fn upload_attachment( + &mut self, + data: tokio::io::BufReader, + filename: &str, + ) -> Result + where + R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static, + { + #[derive(Deserialize, Debug)] + struct UploadAttachmentResponse { + #[serde(rename = "fileTransferGUID")] + guid: String, + } + + let endpoint = format!("uploadAttachment?filename={}", filename); + let mut data_opt = Some(data); + + let response: UploadAttachmentResponse = self + .deserialized_response_with_body_retry( + &endpoint, + Method::POST, + move || { + let stream = ReaderStream::new( + data_opt.take().expect("Stream already consumed during retry"), + ); + Body::wrap_stream(stream) + }, + false, // don't retry auth for streaming body + ) + .await?; + + Ok(response.guid) + } + async fn open_event_socket( &mut self, update_seq: Option, @@ -406,7 +441,7 @@ impl HTTPAPIClient { &mut self, endpoint: &str, method: Method, - body_fn: impl Fn() -> Body, + body_fn: impl FnMut() -> Body, ) -> Result where T: DeserializeOwned, @@ -419,7 +454,7 @@ impl HTTPAPIClient { &mut self, endpoint: &str, method: Method, - body_fn: impl Fn() -> Body, + body_fn: impl FnMut() -> Body, retry_auth: bool, ) -> Result where @@ -451,7 +486,7 @@ impl HTTPAPIClient { &mut self, endpoint: &str, method: Method, - body_fn: impl Fn() -> Body, + mut body_fn: impl FnMut() -> Body, retry_auth: bool, ) -> Result, Error> { use hyper::StatusCode; @@ -459,7 +494,7 @@ impl HTTPAPIClient { let uri = self.uri_for_endpoint(endpoint, None); log::debug!("Requesting {:?} {:?}", method, uri); - let build_request = move |auth: &Option| { + let mut build_request = |auth: &Option| { let body = body_fn(); Request::builder() .method(&method) diff --git a/kordophone/src/api/mod.rs b/kordophone/src/api/mod.rs index 0f63308..c0e67c0 100644 --- a/kordophone/src/api/mod.rs +++ b/kordophone/src/api/mod.rs @@ -51,6 +51,15 @@ pub trait APIInterface { preview: bool, ) -> Result; + // (POST) /uploadAttachment + async fn upload_attachment( + &mut self, + data: tokio::io::BufReader, + filename: &str, + ) -> Result + where + R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static; + // (POST) /authenticate async fn authenticate(&mut self, credentials: Credentials) -> Result; diff --git a/kordophone/src/tests/test_client.rs b/kordophone/src/tests/test_client.rs index c051e1f..ffb9874 100644 --- a/kordophone/src/tests/test_client.rs +++ b/kordophone/src/tests/test_client.rs @@ -127,4 +127,15 @@ impl APIInterface for TestClient { ) -> Result { Ok(futures_util::stream::iter(vec![Ok(Bytes::from_static(b"test"))]).boxed()) } + + async fn upload_attachment( + &mut self, + data: tokio::io::BufReader, + filename: &str, + ) -> Result + where + R: tokio::io::AsyncRead + Unpin + Send + Sync + 'static, + { + Ok(String::from("test")) + } } diff --git a/kordophoned/include/net.buzzert.kordophonecd.Server.xml b/kordophoned/include/net.buzzert.kordophonecd.Server.xml index 2cf0e9b..bdc0fb2 100644 --- a/kordophoned/include/net.buzzert.kordophonecd.Server.xml +++ b/kordophoned/include/net.buzzert.kordophonecd.Server.xml @@ -120,6 +120,11 @@ "/> + + + + + @@ -133,6 +138,18 @@ + + + + + + + diff --git a/kordophoned/src/daemon/attachment_store.rs b/kordophoned/src/daemon/attachment_store.rs index 711fb95..b77294f 100644 --- a/kordophoned/src/daemon/attachment_store.rs +++ b/kordophoned/src/daemon/attachment_store.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; -use tokio::pin; +use uuid::Uuid; mod target { pub static ATTACHMENTS: &str = "attachments"; @@ -36,6 +36,12 @@ pub enum AttachmentStoreEvent { // - attachment guid // - preview: whether to download the preview (true) or full attachment (false) QueueDownloadAttachment(String, bool), + + // Queue an upload for a given attachment file. + // Args: + // - path: the path to the attachment file + // - reply: a reply channel to send the pending upload guid to + QueueUploadAttachment(PathBuf, Reply), } #[derive(Debug, Error)] @@ -161,6 +167,47 @@ impl AttachmentStore { Ok(()) } + async fn upload_attachment_impl( + store_path: &PathBuf, + incoming_path: &PathBuf, + upload_guid: &String, + database: &mut Arc>, + daemon_event_sink: &Sender, + ) -> Result { + use tokio::fs::File; + use tokio::io::BufReader; + + // Create uploads directory if it doesn't exist. + let uploads_path = store_path.join("uploads"); + std::fs::create_dir_all(&uploads_path).unwrap(); + + // First, copy the file to the store path, under /uploads/. + log::trace!(target: target::ATTACHMENTS, "Copying attachment to uploads directory: {}", uploads_path.display()); + let temporary_path = uploads_path.join(incoming_path.file_name().unwrap()); + std::fs::copy(incoming_path, &temporary_path).unwrap(); + + // Open file handle to the temporary file, + log::trace!(target: target::ATTACHMENTS, "Opening stream to temporary file: {}", temporary_path.display()); + let file = File::open(&temporary_path).await?; + let reader: BufReader = BufReader::new(file); + + // Upload the file to the server. + let filename = incoming_path.file_name().unwrap().to_str().unwrap(); + log::trace!(target: target::ATTACHMENTS, "Uploading attachment to server: {}", &filename); + let mut client = Daemon::get_client_impl(database).await?; + let guid = client.upload_attachment(reader, filename).await?; + + // Delete the temporary file. + log::debug!(target: target::ATTACHMENTS, "Upload completed with guid {}, deleting temporary file: {}", guid, temporary_path.display()); + std::fs::remove_file(&temporary_path).unwrap(); + + // Send a signal to the daemon that the attachment has been uploaded. + let event = DaemonEvent::AttachmentUploaded(upload_guid.clone(), guid.clone()); + daemon_event_sink.send(event).await.unwrap(); + + Ok(guid) + } + pub async fn run(&mut self) { loop { tokio::select! { @@ -201,6 +248,30 @@ impl AttachmentStore { let attachment = self.get_attachment(&guid); reply.send(attachment).unwrap(); } + + AttachmentStoreEvent::QueueUploadAttachment(path, reply) => { + let upload_guid = Uuid::new_v4().to_string(); + let store_path = self.store_path.clone(); + let mut database = self.database.clone(); + let daemon_event_sink = self.daemon_event_sink.clone(); + + let _upload_guid = upload_guid.clone(); + tokio::spawn(async move { + let result = Self::upload_attachment_impl( + &store_path, + &path, + &_upload_guid, + &mut database, + &daemon_event_sink, + ).await; + + if let Err(e) = result { + log::error!(target: target::ATTACHMENTS, "Error uploading attachment {}: {}", &_upload_guid, e); + } + }); + + reply.send(upload_guid).unwrap(); + } } } } diff --git a/kordophoned/src/daemon/events.rs b/kordophoned/src/daemon/events.rs index 19c89a6..43062fb 100644 --- a/kordophoned/src/daemon/events.rs +++ b/kordophoned/src/daemon/events.rs @@ -10,6 +10,8 @@ use crate::daemon::{Attachment, Message}; pub type Reply = oneshot::Sender; +use std::path::PathBuf; + #[derive(Debug)] pub enum Event { /// Get the version of the daemon. @@ -76,4 +78,16 @@ pub enum Event { /// Parameters: /// - attachment_id: The attachment ID that was downloaded. AttachmentDownloaded(String), + + /// Upload an attachment to the server. + /// Parameters: + /// - path: The path to the attachment file + /// - reply: Reply indicating the upload GUID + UploadAttachment(PathBuf, Reply), + + /// Notifies the daemon that an attachment has been uploaded. + /// Parameters: + /// - upload_id: The upload ID that was uploaded. + /// - attachment_id: The attachment ID that was uploaded. + AttachmentUploaded(String, String), } diff --git a/kordophoned/src/daemon/mod.rs b/kordophoned/src/daemon/mod.rs index 3266eda..e32e6ea 100644 --- a/kordophoned/src/daemon/mod.rs +++ b/kordophoned/src/daemon/mod.rs @@ -324,6 +324,24 @@ impl Daemon { .await .unwrap(); } + + Event::UploadAttachment(path, reply) => { + self.attachment_store_sink + .as_ref() + .unwrap() + .send(AttachmentStoreEvent::QueueUploadAttachment(path, reply)) + .await + .unwrap(); + } + + Event::AttachmentUploaded(upload_guid, attachment_guid) => { + log::info!(target: target::ATTACHMENTS, "Daemon: attachment uploaded: {}, {}", upload_guid, attachment_guid); + + self.signal_sender + .send(Signal::AttachmentUploaded(upload_guid, attachment_guid)) + .await + .unwrap(); + } } } diff --git a/kordophoned/src/daemon/signals.rs b/kordophoned/src/daemon/signals.rs index 7509740..6fc5cd2 100644 --- a/kordophoned/src/daemon/signals.rs +++ b/kordophoned/src/daemon/signals.rs @@ -12,4 +12,10 @@ pub enum Signal { /// Parameters: /// - attachment_id: The ID of the attachment that was downloaded. AttachmentDownloaded(String), + + /// Emitted when an attachment has been uploaded. + /// Parameters: + /// - upload_guid: The GUID of the upload. + /// - attachment_guid: The GUID of the attachment on the server. + AttachmentUploaded(String, String), } diff --git a/kordophoned/src/dbus/mod.rs b/kordophoned/src/dbus/mod.rs index 1ad4a0c..2cf6189 100644 --- a/kordophoned/src/dbus/mod.rs +++ b/kordophoned/src/dbus/mod.rs @@ -13,5 +13,6 @@ pub mod interface { pub use crate::interface::NetBuzzertKordophoneRepositoryConversationsUpdated as ConversationsUpdated; pub use crate::interface::NetBuzzertKordophoneRepositoryMessagesUpdated as MessagesUpdated; pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted as AttachmentDownloadCompleted; + pub use crate::interface::NetBuzzertKordophoneRepositoryAttachmentUploadCompleted as AttachmentUploadCompleted; } } diff --git a/kordophoned/src/dbus/server_impl.rs b/kordophoned/src/dbus/server_impl.rs index 0964f4a..7926f99 100644 --- a/kordophoned/src/dbus/server_impl.rs +++ b/kordophoned/src/dbus/server_impl.rs @@ -264,6 +264,16 @@ impl DbusRepository for ServerImpl { // For now, just trigger the download event - we'll implement the actual download logic later self.send_event_sync(|r| Event::DownloadAttachment(attachment_id, preview, r)) } + + fn upload_attachment( + &mut self, + path: String, + ) -> Result { + use std::path::PathBuf; + + let path = PathBuf::from(path); + self.send_event_sync(|r| Event::UploadAttachment(path, r)) + } } impl DbusSettings for ServerImpl { diff --git a/kordophoned/src/main.rs b/kordophoned/src/main.rs index 03bca17..128333a 100644 --- a/kordophoned/src/main.rs +++ b/kordophoned/src/main.rs @@ -108,6 +108,16 @@ async fn main() { 0 }); } + + Signal::AttachmentUploaded(upload_guid, attachment_guid) => { + log::debug!("Sending signal: AttachmentUploaded for upload {}, attachment {}", upload_guid, attachment_guid); + dbus_registry + .send_signal(interface::OBJECT_PATH, DbusSignals::AttachmentUploadCompleted { upload_guid, attachment_guid }) + .unwrap_or_else(|_| { + log::error!("Failed to send signal"); + 0 + }); + } } } }); diff --git a/kpcli/src/daemon/mod.rs b/kpcli/src/daemon/mod.rs index 5bd0f8f..cb4d5ff 100644 --- a/kpcli/src/daemon/mod.rs +++ b/kpcli/src/daemon/mod.rs @@ -52,6 +52,16 @@ pub enum Commands { conversation_id: String, text: String, }, + + /// Downloads an attachment from the server to the attachment store. Returns the path to the attachment. + DownloadAttachment { + attachment_id: String, + }, + + /// Uploads an attachment to the server, returns upload guid. + UploadAttachment { + path: String, + }, } #[derive(Subcommand)] @@ -89,6 +99,8 @@ impl Commands { conversation_id, text, } => client.enqueue_outgoing_message(conversation_id, text).await, + Commands::UploadAttachment { path } => client.upload_attachment(path).await, + Commands::DownloadAttachment { attachment_id } => client.download_attachment(attachment_id).await, } } } @@ -225,4 +237,48 @@ impl DaemonCli { KordophoneRepository::delete_all_conversations(&self.proxy()) .map_err(|e| anyhow::anyhow!("Failed to delete all conversations: {}", e)) } + + pub async fn download_attachment(&mut self, attachment_id: String) -> Result<()> { + // Trigger download. + KordophoneRepository::download_attachment(&self.proxy(), &attachment_id, false)?; + + // Get attachment info. + let attachment_info = KordophoneRepository::get_attachment_info(&self.proxy(), &attachment_id)?; + let (path, preview_path, downloaded, preview_downloaded) = attachment_info; + + if downloaded { + println!("Attachment already downloaded: {}", path); + return Ok(()); + } + + println!("Downloading attachment: {}", attachment_id); + + // Attach to the signal that the attachment has been downloaded. + let _id = self.proxy().match_signal( + move |h: dbus_interface::NetBuzzertKordophoneRepositoryAttachmentDownloadCompleted, _: &Connection, _: &dbus::message::Message| { + println!("Signal: Attachment downloaded: {}", path); + std::process::exit(0); + }, + ); + + let _id = self.proxy().match_signal( + |h: dbus_interface::NetBuzzertKordophoneRepositoryAttachmentDownloadFailed, _: &Connection, _: &dbus::message::Message| { + println!("Signal: Attachment download failed: {}", h.attachment_id); + std::process::exit(1); + }, + ); + + // Wait for the signal. + loop { + self.conn.process(std::time::Duration::from_millis(1000))?; + } + + Ok(()) + } + + pub async fn upload_attachment(&mut self, path: String) -> Result<()> { + let upload_guid = KordophoneRepository::upload_attachment(&self.proxy(), &path)?; + println!("Upload GUID: {}", upload_guid); + Ok(()) + } }