aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSven-Hendrik Haase <svenstaro@gmail.com>2025-03-11 10:25:38 +0000
committerSven-Hendrik Haase <svenstaro@gmail.com>2025-03-11 11:56:02 +0000
commit612bcc1b80d95bd677da8c6453d6e9415370acbe (patch)
tree715cbb11bb7ef3e51696c3770023e99c9438669b
parentSSE-based directory size handling (diff)
downloadminiserve-origin/sse-based-dirsize.tar.gz
miniserve-origin/sse-based-dirsize.zip
Simplify SseManager a bitorigin/sse-based-dirsize
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/handlers.rs208
-rw-r--r--src/main.rs13
-rw-r--r--tests/api.rs2
5 files changed, 153 insertions, 72 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9690274..efaf265 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2319,6 +2319,7 @@ dependencies = [
"tempfile",
"thiserror 2.0.11",
"tokio",
+ "tokio-stream",
"url",
"zip",
]
diff --git a/Cargo.toml b/Cargo.toml
index dc54c7e..e5033cf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -60,6 +60,7 @@ tar = "0.4"
tempfile = "3.17.0"
thiserror = "2"
tokio = { version = "1.42.0", features = ["fs", "macros"] }
+tokio-stream = "0.1.17"
zip = { version = "2", default-features = false }
[features]
diff --git a/src/handlers.rs b/src/handlers.rs
index 8b4d6e5..e749292 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -1,17 +1,22 @@
-use std::time::Duration;
+use std::{path::PathBuf, sync::Arc, time::Duration};
use actix_web::{HttpRequest, HttpResponse, Responder, http::header::ContentType, web};
-use actix_web_lab::sse;
+use actix_web_lab::{
+ sse::{self, Sse},
+ util::InfallibleStream,
+};
use bytesize::ByteSize;
use dav_server::{
DavConfig, DavHandler,
actix::{DavRequest, DavResponse},
};
-use log::{error, info, warn};
+use futures::future::join_all;
+use log::{error, info};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
-use tokio::sync::Mutex;
-use tokio::task::JoinSet;
+use tokio::{sync::Mutex, time::interval};
+use tokio::{sync::mpsc, task::JoinSet};
+use tokio_stream::wrappers::ReceiverStream;
use crate::{config::MiniserveConfig, errors::RuntimeError};
use crate::{file_op::recursive_dir_size, file_utils};
@@ -39,7 +44,71 @@ pub enum ApiCommand {
CalculateDirSizes(Vec<String>),
}
-pub type DirSizeJoinSet = JoinSet<Result<DirSize, RuntimeError>>;
+#[derive(Debug)]
+pub struct DirSizeTasks {
+ tasks: Arc<Mutex<JoinSet<Result<DirSize, RuntimeError>>>>,
+}
+
+impl DirSizeTasks {
+ pub fn new(show_exact_bytes: bool, sse_manager: web::Data<SseManager>) -> Self {
+ let tasks = Arc::new(Mutex::new(JoinSet::<Result<DirSize, RuntimeError>>::new()));
+
+ // Spawn a task that will periodically check for finished calculations.
+ let tasks_ = tasks.clone();
+ actix_web::rt::spawn(async move {
+ let mut interval = interval(Duration::from_millis(50));
+ loop {
+ // See whether there are any calculations finished and if so dispatch a message to
+ // the SSE channels.
+ match tasks_.lock().await.try_join_next() {
+ Some(Ok(Ok(finished_task))) => {
+ let dir_size = if show_exact_bytes {
+ format!("{} B", finished_task.size)
+ } else {
+ ByteSize::b(finished_task.size).to_string()
+ };
+
+ let dir_size_reply = DirSizeReply {
+ web_path: finished_task.web_path,
+ size: dir_size,
+ };
+
+ let msg = sse::Data::new_json(dir_size_reply)
+ .expect("Couldn't serialize as JSON")
+ .event("dir-size");
+ sse_manager.broadcast(msg).await
+ }
+ Some(Ok(Err(e))) => {
+ error!("Some error during dir size calculation: {e}");
+ break;
+ }
+ Some(Err(e)) => {
+ error!("Some error during dir size calculation joining: {e}");
+ break;
+ }
+ None => {
+ // If there's nothing we'll just chill a sec
+ interval.tick().await;
+ }
+ };
+ }
+ });
+
+ Self { tasks }
+ }
+
+ pub async fn calc_dir_size(&self, web_path: String, path: PathBuf) {
+ self.tasks.lock().await.spawn(async move {
+ recursive_dir_size(&path).await.map(|dir_size| {
+ info!("Finished dir size calculation for {path:?}");
+ DirSize {
+ web_path,
+ size: dir_size,
+ }
+ })
+ });
+ }
+}
// Holds the result of a calculated dir size
#[derive(Debug, Clone)]
@@ -61,67 +130,81 @@ pub struct DirSizeReply {
pub size: String,
}
-// Reply to check whether the client is still connected
-//
-// If the client has disconnected, we can cancel all the tasks and save some compute.
-#[derive(Debug, Clone, Serialize)]
-pub struct HeartbeatReply;
+#[derive(Debug, Clone, Default)]
+pub struct SseManager {
+ clients: Arc<Mutex<Vec<mpsc::Sender<sse::Event>>>>,
+}
-/// SSE API route that yields an event stream that clients can subscribe to
-pub async fn api_sse(
- config: web::Data<MiniserveConfig>,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
-) -> impl Responder {
- let (sender, receiver) = tokio::sync::mpsc::channel(2);
-
- actix_web::rt::spawn(async move {
- loop {
- let msg = match task_joinset.lock().await.try_join_next() {
- Some(Ok(Ok(finished_task))) => {
- let dir_size = if config.show_exact_bytes {
- format!("{} B", finished_task.size)
- } else {
- ByteSize::b(finished_task.size).to_string()
- };
+impl SseManager {
+ /// Constructs new broadcaster and spawns ping loop.
+ pub fn new() -> Self {
+ let clients = Arc::new(Mutex::new(Vec::<mpsc::Sender<sse::Event>>::new()));
- let dir_size_reply = DirSizeReply {
- web_path: finished_task.web_path,
- size: dir_size,
- };
+ // Spawn a task that will periodically check for stale clients.
+ let clients_ = clients.clone();
+ actix_web::rt::spawn(async move {
+ let mut interval = interval(Duration::from_secs(10));
- sse::Data::new_json(dir_size_reply)
- .expect("Couldn't serialize as JSON")
- .event("dir-size")
- }
- Some(Ok(Err(e))) => {
- error!("Some error during dir size calculation: {e}");
- break;
- }
- Some(Err(e)) => {
- error!("Some error during dir size calculation joining: {e}");
- break;
+ loop {
+ interval.tick().await;
+
+ // Clean up stale clients
+ let clients = clients_.lock().await.clone();
+ let mut ok_clients = Vec::new();
+ for client in clients {
+ if client
+ .send(sse::Event::Comment("ping".into()))
+ .await
+ .is_ok()
+ {
+ // Clients that are able to receive this are still connected and the rest
+ // will be dropped.
+ ok_clients.push(client.clone());
+ } else {
+ info!("Removing a stale client");
+ }
}
- None => sse::Data::new_json(HeartbeatReply)
- .expect("Couldn't serialize as JSON")
- .event("heartbeat"),
- };
-
- if sender.send(msg.into()).await.is_err() {
- warn!("Client disconnected; could not send SSE message");
- break;
+ *clients_.lock().await = ok_clients;
}
+ });
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
- });
+ Self { clients }
+ }
+
+ /// Registers client with broadcaster, returning an SSE response body.
+ pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
+ let (tx, rx) = mpsc::channel(10);
+
+ tx.send(sse::Data::new("Connected to SSE event stream").into())
+ .await
+ .unwrap();
+
+ self.clients.lock().await.push(tx);
+
+ Sse::from_infallible_receiver(rx)
+ }
- sse::Sse::from_infallible_receiver(receiver).with_keep_alive(Duration::from_secs(3))
+ /// Broadcasts `msg` to all clients.
+ pub async fn broadcast(&self, msg: sse::Data) {
+ let clients = self.clients.lock().await.clone();
+
+ let send_futures = clients.iter().map(|client| client.send(msg.clone().into()));
+
+ // Try to send to all clients, ignoring failures disconnected clients will get swept up by
+ // `remove_stale_clients`.
+ let _ = join_all(send_futures).await;
+ }
+}
+
+/// SSE API route that yields an event stream that clients can subscribe to
+pub async fn api_sse(sse_manager: web::Data<SseManager>) -> impl Responder {
+ sse_manager.new_client().await
}
async fn handle_dir_size_tasks(
dirs: Vec<String>,
config: &MiniserveConfig,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
+ dir_size_tasks: web::Data<DirSizeTasks>,
) -> Result<(), RuntimeError> {
for dir in dirs {
// The dir argument might be percent-encoded so let's decode it just in case.
@@ -140,16 +223,7 @@ async fn handle_dir_size_tasks(
.join(sanitized_path);
info!("Requested directory size for {full_path:?}");
- let mut joinset = task_joinset.lock().await;
- joinset.spawn(async move {
- recursive_dir_size(&full_path).await.map(|dir_size| {
- info!("Finished dir size calculation for {full_path:?}");
- DirSize {
- web_path: dir,
- size: dir_size,
- }
- })
- });
+ dir_size_tasks.calc_dir_size(dir, full_path).await;
}
Ok(())
}
@@ -159,11 +233,11 @@ async fn handle_dir_size_tasks(
pub async fn api_command(
command: web::Json<ApiCommand>,
config: web::Data<MiniserveConfig>,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
+ dir_size_tasks: web::Data<DirSizeTasks>,
) -> Result<impl Responder, RuntimeError> {
match command.into_inner() {
ApiCommand::CalculateDirSizes(dirs) => {
- handle_dir_size_tasks(dirs, &config, task_joinset).await?;
+ handle_dir_size_tasks(dirs, &config, dir_size_tasks).await?;
Ok("Directories are being calculated")
}
}
diff --git a/src/main.rs b/src/main.rs
index 3fe1410..28a666b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,7 +19,6 @@ use colored::*;
use dav_server::{DavHandler, DavMethodSet};
use fast_qr::QRBuilder;
use log::{error, warn};
-use tokio::sync::Mutex;
mod archive;
mod args;
@@ -38,7 +37,8 @@ mod webdav_fs;
use crate::config::MiniserveConfig;
use crate::errors::StartupError;
use crate::handlers::{
- DirSizeJoinSet, api_command, api_sse, css, dav_handler, error_404, favicon, healthcheck,
+ DirSizeTasks, SseManager, api_command, api_sse, css, dav_handler, error_404, favicon,
+ healthcheck,
};
use crate::webdav_fs::RestrictedFs;
@@ -214,13 +214,18 @@ async fn run(miniserve_config: MiniserveConfig) -> Result<(), StartupError> {
.join("\n"),
);
- let dir_size_join_set = web::Data::new(Mutex::new(DirSizeJoinSet::new()));
+ let sse_manager = web::Data::new(SseManager::new());
+ let dir_size_tasks = web::Data::new(DirSizeTasks::new(
+ miniserve_config.show_exact_bytes,
+ sse_manager.clone(),
+ ));
let srv = actix_web::HttpServer::new(move || {
App::new()
.wrap(configure_header(&inside_config.clone()))
.app_data(web::Data::new(inside_config.clone()))
- .app_data(dir_size_join_set.clone())
+ .app_data(dir_size_tasks.clone())
+ .app_data(sse_manager.clone())
.app_data(stylesheet.clone())
.wrap(from_fn(errors::error_page_middleware))
.wrap(middleware::Logger::default())
diff --git a/tests/api.rs b/tests/api.rs
index fe313b0..69fe5ee 100644
--- a/tests/api.rs
+++ b/tests/api.rs
@@ -18,7 +18,7 @@ use crate::fixtures::{DIRECTORIES, Error, TestServer, server};
#[case(utf8_percent_encode(DIRECTORIES[2], NON_ALPHANUMERIC).to_string())]
fn api_dir_size(#[case] dir: String, server: TestServer) -> Result<(), Error> {
let mut command = HashMap::new();
- command.insert("DirSize", dir);
+ command.insert("CalculateDirSizes", vec![dir]);
let resp = Client::new()
.post(server.url().join(&format!("__miniserve_internal/api"))?)