From 778d4b6650ad866443c00f9219ddce005c0bf4cc Mon Sep 17 00:00:00 2001 From: James Magahern Date: Wed, 10 Sep 2025 14:23:02 -0700 Subject: [PATCH] core: attachment store: limit concurrent downloads --- core/kordophoned/src/daemon/attachment_store.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/kordophoned/src/daemon/attachment_store.rs b/core/kordophoned/src/daemon/attachment_store.rs index 42fced1..177477a 100644 --- a/core/kordophoned/src/daemon/attachment_store.rs +++ b/core/kordophoned/src/daemon/attachment_store.rs @@ -17,7 +17,7 @@ use crate::daemon::Daemon; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, Semaphore}; use uuid::Uuid; @@ -63,6 +63,9 @@ pub struct AttachmentStore { event_source: Receiver, event_sink: Option>, + + // Limits concurrent downloads to avoid overloading server and local I/O + download_limit: Arc, } impl AttachmentStore { @@ -84,12 +87,16 @@ impl AttachmentStore { let (event_sink, event_source) = tokio::sync::mpsc::channel(100); + // Limit to at most 5 concurrent downloads by default + let download_limit = Arc::new(Semaphore::new(5)); + AttachmentStore { store_path: store_path, database: database, daemon_event_sink: daemon_event_sink, event_source: event_source, event_sink: Some(event_sink), + download_limit, } } @@ -315,9 +322,12 @@ impl AttachmentStore { let mut database = self.database.clone(); let daemon_event_sink = self.daemon_event_sink.clone(); let _guid = guid.clone(); + let limiter = self.download_limit.clone(); // Spawn a new task here so we don't block incoming queue events. tokio::spawn(async move { + // Acquire a slot in the concurrent download limiter. + let _permit = limiter.acquire_owned().await.expect("Semaphore closed"); let result = Self::download_attachment_impl( &store_path, &mut database,