core: attachment store: limit concurrent downloads
This commit is contained in:
@@ -17,7 +17,7 @@ use crate::daemon::Daemon;
|
|||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::{Mutex, Semaphore};
|
||||||
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@@ -63,6 +63,9 @@ pub struct AttachmentStore {
|
|||||||
|
|
||||||
event_source: Receiver<AttachmentStoreEvent>,
|
event_source: Receiver<AttachmentStoreEvent>,
|
||||||
event_sink: Option<Sender<AttachmentStoreEvent>>,
|
event_sink: Option<Sender<AttachmentStoreEvent>>,
|
||||||
|
|
||||||
|
// Limits concurrent downloads to avoid overloading server and local I/O
|
||||||
|
download_limit: Arc<Semaphore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AttachmentStore {
|
impl AttachmentStore {
|
||||||
@@ -84,12 +87,16 @@ impl AttachmentStore {
|
|||||||
|
|
||||||
let (event_sink, event_source) = tokio::sync::mpsc::channel(100);
|
let (event_sink, event_source) = tokio::sync::mpsc::channel(100);
|
||||||
|
|
||||||
|
// Limit to at most 5 concurrent downloads by default
|
||||||
|
let download_limit = Arc::new(Semaphore::new(5));
|
||||||
|
|
||||||
AttachmentStore {
|
AttachmentStore {
|
||||||
store_path: store_path,
|
store_path: store_path,
|
||||||
database: database,
|
database: database,
|
||||||
daemon_event_sink: daemon_event_sink,
|
daemon_event_sink: daemon_event_sink,
|
||||||
event_source: event_source,
|
event_source: event_source,
|
||||||
event_sink: Some(event_sink),
|
event_sink: Some(event_sink),
|
||||||
|
download_limit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,9 +322,12 @@ impl AttachmentStore {
|
|||||||
let mut database = self.database.clone();
|
let mut database = self.database.clone();
|
||||||
let daemon_event_sink = self.daemon_event_sink.clone();
|
let daemon_event_sink = self.daemon_event_sink.clone();
|
||||||
let _guid = guid.clone();
|
let _guid = guid.clone();
|
||||||
|
let limiter = self.download_limit.clone();
|
||||||
|
|
||||||
// Spawn a new task here so we don't block incoming queue events.
|
// Spawn a new task here so we don't block incoming queue events.
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
// Acquire a slot in the concurrent download limiter.
|
||||||
|
let _permit = limiter.acquire_owned().await.expect("Semaphore closed");
|
||||||
let result = Self::download_attachment_impl(
|
let result = Self::download_attachment_impl(
|
||||||
&store_path,
|
&store_path,
|
||||||
&mut database,
|
&mut database,
|
||||||
|
|||||||
Reference in New Issue
Block a user