diff options
author | Sven-Hendrik Haase <svenstaro@gmail.com> | 2025-03-11 10:25:38 +0000 |
---|---|---|
committer | Sven-Hendrik Haase <svenstaro@gmail.com> | 2025-03-11 11:56:02 +0000 |
commit | 612bcc1b80d95bd677da8c6453d6e9415370acbe (patch) | |
tree | 715cbb11bb7ef3e51696c3770023e99c9438669b | |
parent | SSE-based directory size handling (diff) | |
download | miniserve-origin/sse-based-dirsize.tar.gz miniserve-origin/sse-based-dirsize.zip |
Simplify SseManager a bitorigin/sse-based-dirsize
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/handlers.rs | 208 | ||||
-rw-r--r-- | src/main.rs | 13 | ||||
-rw-r--r-- | tests/api.rs | 2 |
5 files changed, 153 insertions, 72 deletions
@@ -2319,6 +2319,7 @@ dependencies = [ "tempfile", "thiserror 2.0.11", "tokio", + "tokio-stream", "url", "zip", ] @@ -60,6 +60,7 @@ tar = "0.4" tempfile = "3.17.0" thiserror = "2" tokio = { version = "1.42.0", features = ["fs", "macros"] } +tokio-stream = "0.1.17" zip = { version = "2", default-features = false } [features] diff --git a/src/handlers.rs b/src/handlers.rs index 8b4d6e5..e749292 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,17 +1,22 @@ -use std::time::Duration; +use std::{path::PathBuf, sync::Arc, time::Duration}; use actix_web::{HttpRequest, HttpResponse, Responder, http::header::ContentType, web}; -use actix_web_lab::sse; +use actix_web_lab::{ + sse::{self, Sse}, + util::InfallibleStream, +}; use bytesize::ByteSize; use dav_server::{ DavConfig, DavHandler, actix::{DavRequest, DavResponse}, }; -use log::{error, info, warn}; +use futures::future::join_all; +use log::{error, info}; use percent_encoding::percent_decode_str; use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; -use tokio::task::JoinSet; +use tokio::{sync::Mutex, time::interval}; +use tokio::{sync::mpsc, task::JoinSet}; +use tokio_stream::wrappers::ReceiverStream; use crate::{config::MiniserveConfig, errors::RuntimeError}; use crate::{file_op::recursive_dir_size, file_utils}; @@ -39,7 +44,71 @@ pub enum ApiCommand { CalculateDirSizes(Vec<String>), } -pub type DirSizeJoinSet = JoinSet<Result<DirSize, RuntimeError>>; +#[derive(Debug)] +pub struct DirSizeTasks { + tasks: Arc<Mutex<JoinSet<Result<DirSize, RuntimeError>>>>, +} + +impl DirSizeTasks { + pub fn new(show_exact_bytes: bool, sse_manager: web::Data<SseManager>) -> Self { + let tasks = Arc::new(Mutex::new(JoinSet::<Result<DirSize, RuntimeError>>::new())); + + // Spawn a task that will periodically check for finished calculations. + let tasks_ = tasks.clone(); + actix_web::rt::spawn(async move { + let mut interval = interval(Duration::from_millis(50)); + loop { + // See whether there are any calculations finished and if so dispatch a message to + // the SSE channels. + match tasks_.lock().await.try_join_next() { + Some(Ok(Ok(finished_task))) => { + let dir_size = if show_exact_bytes { + format!("{} B", finished_task.size) + } else { + ByteSize::b(finished_task.size).to_string() + }; + + let dir_size_reply = DirSizeReply { + web_path: finished_task.web_path, + size: dir_size, + }; + + let msg = sse::Data::new_json(dir_size_reply) + .expect("Couldn't serialize as JSON") + .event("dir-size"); + sse_manager.broadcast(msg).await + } + Some(Ok(Err(e))) => { + error!("Some error during dir size calculation: {e}"); + break; + } + Some(Err(e)) => { + error!("Some error during dir size calculation joining: {e}"); + break; + } + None => { + // If there's nothing we'll just chill a sec + interval.tick().await; + } + }; + } + }); + + Self { tasks } + } + + pub async fn calc_dir_size(&self, web_path: String, path: PathBuf) { + self.tasks.lock().await.spawn(async move { + recursive_dir_size(&path).await.map(|dir_size| { + info!("Finished dir size calculation for {path:?}"); + DirSize { + web_path, + size: dir_size, + } + }) + }); + } +} // Holds the result of a calculated dir size #[derive(Debug, Clone)] @@ -61,67 +130,81 @@ pub struct DirSizeReply { pub size: String, } -// Reply to check whether the client is still connected -// -// If the client has disconnected, we can cancel all the tasks and save some compute. -#[derive(Debug, Clone, Serialize)] -pub struct HeartbeatReply; +#[derive(Debug, Clone, Default)] +pub struct SseManager { + clients: Arc<Mutex<Vec<mpsc::Sender<sse::Event>>>>, +} -/// SSE API route that yields an event stream that clients can subscribe to -pub async fn api_sse( - config: web::Data<MiniserveConfig>, - task_joinset: web::Data<Mutex<DirSizeJoinSet>>, -) -> impl Responder { - let (sender, receiver) = tokio::sync::mpsc::channel(2); - - actix_web::rt::spawn(async move { - loop { - let msg = match task_joinset.lock().await.try_join_next() { - Some(Ok(Ok(finished_task))) => { - let dir_size = if config.show_exact_bytes { - format!("{} B", finished_task.size) - } else { - ByteSize::b(finished_task.size).to_string() - }; +impl SseManager { + /// Constructs new broadcaster and spawns ping loop. + pub fn new() -> Self { + let clients = Arc::new(Mutex::new(Vec::<mpsc::Sender<sse::Event>>::new())); - let dir_size_reply = DirSizeReply { - web_path: finished_task.web_path, - size: dir_size, - }; + // Spawn a task that will periodically check for stale clients. + let clients_ = clients.clone(); + actix_web::rt::spawn(async move { + let mut interval = interval(Duration::from_secs(10)); - sse::Data::new_json(dir_size_reply) - .expect("Couldn't serialize as JSON") - .event("dir-size") - } - Some(Ok(Err(e))) => { - error!("Some error during dir size calculation: {e}"); - break; - } - Some(Err(e)) => { - error!("Some error during dir size calculation joining: {e}"); - break; + loop { + interval.tick().await; + + // Clean up stale clients + let clients = clients_.lock().await.clone(); + let mut ok_clients = Vec::new(); + for client in clients { + if client + .send(sse::Event::Comment("ping".into())) + .await + .is_ok() + { + // Clients that are able to receive this are still connected and the rest + // will be dropped. + ok_clients.push(client.clone()); + } else { + info!("Removing a stale client"); + } } - None => sse::Data::new_json(HeartbeatReply) - .expect("Couldn't serialize as JSON") - .event("heartbeat"), - }; - - if sender.send(msg.into()).await.is_err() { - warn!("Client disconnected; could not send SSE message"); - break; + *clients_.lock().await = ok_clients; } + }); - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); + Self { clients } + } + + /// Registers client with broadcaster, returning an SSE response body. + pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> { + let (tx, rx) = mpsc::channel(10); + + tx.send(sse::Data::new("Connected to SSE event stream").into()) + .await + .unwrap(); + + self.clients.lock().await.push(tx); + + Sse::from_infallible_receiver(rx) + } - sse::Sse::from_infallible_receiver(receiver).with_keep_alive(Duration::from_secs(3)) + /// Broadcasts `msg` to all clients. + pub async fn broadcast(&self, msg: sse::Data) { + let clients = self.clients.lock().await.clone(); + + let send_futures = clients.iter().map(|client| client.send(msg.clone().into())); + + // Try to send to all clients, ignoring failures disconnected clients will get swept up by + // `remove_stale_clients`. + let _ = join_all(send_futures).await; + } +} + +/// SSE API route that yields an event stream that clients can subscribe to +pub async fn api_sse(sse_manager: web::Data<SseManager>) -> impl Responder { + sse_manager.new_client().await } async fn handle_dir_size_tasks( dirs: Vec<String>, config: &MiniserveConfig, - task_joinset: web::Data<Mutex<DirSizeJoinSet>>, + dir_size_tasks: web::Data<DirSizeTasks>, ) -> Result<(), RuntimeError> { for dir in dirs { // The dir argument might be percent-encoded so let's decode it just in case. @@ -140,16 +223,7 @@ async fn handle_dir_size_tasks( .join(sanitized_path); info!("Requested directory size for {full_path:?}"); - let mut joinset = task_joinset.lock().await; - joinset.spawn(async move { - recursive_dir_size(&full_path).await.map(|dir_size| { - info!("Finished dir size calculation for {full_path:?}"); - DirSize { - web_path: dir, - size: dir_size, - } - }) - }); + dir_size_tasks.calc_dir_size(dir, full_path).await; } Ok(()) } @@ -159,11 +233,11 @@ async fn handle_dir_size_tasks( pub async fn api_command( command: web::Json<ApiCommand>, config: web::Data<MiniserveConfig>, - task_joinset: web::Data<Mutex<DirSizeJoinSet>>, + dir_size_tasks: web::Data<DirSizeTasks>, ) -> Result<impl Responder, RuntimeError> { match command.into_inner() { ApiCommand::CalculateDirSizes(dirs) => { - handle_dir_size_tasks(dirs, &config, task_joinset).await?; + handle_dir_size_tasks(dirs, &config, dir_size_tasks).await?; Ok("Directories are being calculated") } } diff --git a/src/main.rs b/src/main.rs index 3fe1410..28a666b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,6 @@ use colored::*; use dav_server::{DavHandler, DavMethodSet}; use fast_qr::QRBuilder; use log::{error, warn}; -use tokio::sync::Mutex; mod archive; mod args; @@ -38,7 +37,8 @@ mod webdav_fs; use crate::config::MiniserveConfig; use crate::errors::StartupError; use crate::handlers::{ - DirSizeJoinSet, api_command, api_sse, css, dav_handler, error_404, favicon, healthcheck, + DirSizeTasks, SseManager, api_command, api_sse, css, dav_handler, error_404, favicon, + healthcheck, }; use crate::webdav_fs::RestrictedFs; @@ -214,13 +214,18 @@ async fn run(miniserve_config: MiniserveConfig) -> Result<(), StartupError> { .join("\n"), ); - let dir_size_join_set = web::Data::new(Mutex::new(DirSizeJoinSet::new())); + let sse_manager = web::Data::new(SseManager::new()); + let dir_size_tasks = web::Data::new(DirSizeTasks::new( + miniserve_config.show_exact_bytes, + sse_manager.clone(), + )); let srv = actix_web::HttpServer::new(move || { App::new() .wrap(configure_header(&inside_config.clone())) .app_data(web::Data::new(inside_config.clone())) - .app_data(dir_size_join_set.clone()) + .app_data(dir_size_tasks.clone()) + .app_data(sse_manager.clone()) .app_data(stylesheet.clone()) .wrap(from_fn(errors::error_page_middleware)) .wrap(middleware::Logger::default()) diff --git a/tests/api.rs b/tests/api.rs index fe313b0..69fe5ee 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -18,7 +18,7 @@ use crate::fixtures::{DIRECTORIES, Error, TestServer, server}; #[case(utf8_percent_encode(DIRECTORIES[2], NON_ALPHANUMERIC).to_string())] fn api_dir_size(#[case] dir: String, server: TestServer) -> Result<(), Error> { let mut command = HashMap::new(); - command.insert("DirSize", dir); + command.insert("CalculateDirSizes", vec![dir]); let resp = Client::new() .post(server.url().join(&format!("__miniserve_internal/api"))?) |