aboutsummaryrefslogtreecommitdiffstats
path: root/src/handlers.rs
diff options
context:
space:
mode:
authorSven-Hendrik Haase <svenstaro@gmail.com>2025-03-11 10:25:38 +0000
committerSven-Hendrik Haase <svenstaro@gmail.com>2025-03-11 11:56:02 +0000
commit612bcc1b80d95bd677da8c6453d6e9415370acbe (patch)
tree715cbb11bb7ef3e51696c3770023e99c9438669b /src/handlers.rs
parentSSE-based directory size handling (diff)
downloadminiserve-612bcc1b80d95bd677da8c6453d6e9415370acbe.tar.gz
miniserve-612bcc1b80d95bd677da8c6453d6e9415370acbe.zip
Simplify SseManager a bitorigin/sse-based-dirsize
Diffstat (limited to 'src/handlers.rs')
-rw-r--r--src/handlers.rs208
1 files changed, 141 insertions, 67 deletions
diff --git a/src/handlers.rs b/src/handlers.rs
index 8b4d6e5..e749292 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -1,17 +1,22 @@
-use std::time::Duration;
+use std::{path::PathBuf, sync::Arc, time::Duration};
use actix_web::{HttpRequest, HttpResponse, Responder, http::header::ContentType, web};
-use actix_web_lab::sse;
+use actix_web_lab::{
+ sse::{self, Sse},
+ util::InfallibleStream,
+};
use bytesize::ByteSize;
use dav_server::{
DavConfig, DavHandler,
actix::{DavRequest, DavResponse},
};
-use log::{error, info, warn};
+use futures::future::join_all;
+use log::{error, info};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
-use tokio::sync::Mutex;
-use tokio::task::JoinSet;
+use tokio::{sync::Mutex, time::interval};
+use tokio::{sync::mpsc, task::JoinSet};
+use tokio_stream::wrappers::ReceiverStream;
use crate::{config::MiniserveConfig, errors::RuntimeError};
use crate::{file_op::recursive_dir_size, file_utils};
@@ -39,7 +44,71 @@ pub enum ApiCommand {
CalculateDirSizes(Vec<String>),
}
-pub type DirSizeJoinSet = JoinSet<Result<DirSize, RuntimeError>>;
+#[derive(Debug)]
+pub struct DirSizeTasks {
+ tasks: Arc<Mutex<JoinSet<Result<DirSize, RuntimeError>>>>,
+}
+
+impl DirSizeTasks {
+ pub fn new(show_exact_bytes: bool, sse_manager: web::Data<SseManager>) -> Self {
+ let tasks = Arc::new(Mutex::new(JoinSet::<Result<DirSize, RuntimeError>>::new()));
+
+ // Spawn a task that will periodically check for finished calculations.
+ let tasks_ = tasks.clone();
+ actix_web::rt::spawn(async move {
+ let mut interval = interval(Duration::from_millis(50));
+ loop {
+ // See whether there are any calculations finished and if so dispatch a message to
+ // the SSE channels.
+ match tasks_.lock().await.try_join_next() {
+ Some(Ok(Ok(finished_task))) => {
+ let dir_size = if show_exact_bytes {
+ format!("{} B", finished_task.size)
+ } else {
+ ByteSize::b(finished_task.size).to_string()
+ };
+
+ let dir_size_reply = DirSizeReply {
+ web_path: finished_task.web_path,
+ size: dir_size,
+ };
+
+ let msg = sse::Data::new_json(dir_size_reply)
+ .expect("Couldn't serialize as JSON")
+ .event("dir-size");
+ sse_manager.broadcast(msg).await
+ }
+ Some(Ok(Err(e))) => {
+ error!("Some error during dir size calculation: {e}");
+ break;
+ }
+ Some(Err(e)) => {
+ error!("Some error during dir size calculation joining: {e}");
+ break;
+ }
+ None => {
+ // If there's nothing we'll just chill a sec
+ interval.tick().await;
+ }
+ };
+ }
+ });
+
+ Self { tasks }
+ }
+
+ pub async fn calc_dir_size(&self, web_path: String, path: PathBuf) {
+ self.tasks.lock().await.spawn(async move {
+ recursive_dir_size(&path).await.map(|dir_size| {
+ info!("Finished dir size calculation for {path:?}");
+ DirSize {
+ web_path,
+ size: dir_size,
+ }
+ })
+ });
+ }
+}
// Holds the result of a calculated dir size
#[derive(Debug, Clone)]
@@ -61,67 +130,81 @@ pub struct DirSizeReply {
pub size: String,
}
-// Reply to check whether the client is still connected
-//
-// If the client has disconnected, we can cancel all the tasks and save some compute.
-#[derive(Debug, Clone, Serialize)]
-pub struct HeartbeatReply;
+#[derive(Debug, Clone, Default)]
+pub struct SseManager {
+ clients: Arc<Mutex<Vec<mpsc::Sender<sse::Event>>>>,
+}
-/// SSE API route that yields an event stream that clients can subscribe to
-pub async fn api_sse(
- config: web::Data<MiniserveConfig>,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
-) -> impl Responder {
- let (sender, receiver) = tokio::sync::mpsc::channel(2);
-
- actix_web::rt::spawn(async move {
- loop {
- let msg = match task_joinset.lock().await.try_join_next() {
- Some(Ok(Ok(finished_task))) => {
- let dir_size = if config.show_exact_bytes {
- format!("{} B", finished_task.size)
- } else {
- ByteSize::b(finished_task.size).to_string()
- };
+impl SseManager {
+ /// Constructs new broadcaster and spawns ping loop.
+ pub fn new() -> Self {
+ let clients = Arc::new(Mutex::new(Vec::<mpsc::Sender<sse::Event>>::new()));
- let dir_size_reply = DirSizeReply {
- web_path: finished_task.web_path,
- size: dir_size,
- };
+ // Spawn a task that will periodically check for stale clients.
+ let clients_ = clients.clone();
+ actix_web::rt::spawn(async move {
+ let mut interval = interval(Duration::from_secs(10));
- sse::Data::new_json(dir_size_reply)
- .expect("Couldn't serialize as JSON")
- .event("dir-size")
- }
- Some(Ok(Err(e))) => {
- error!("Some error during dir size calculation: {e}");
- break;
- }
- Some(Err(e)) => {
- error!("Some error during dir size calculation joining: {e}");
- break;
+ loop {
+ interval.tick().await;
+
+ // Clean up stale clients
+ let clients = clients_.lock().await.clone();
+ let mut ok_clients = Vec::new();
+ for client in clients {
+ if client
+ .send(sse::Event::Comment("ping".into()))
+ .await
+ .is_ok()
+ {
+ // Clients that are able to receive this are still connected and the rest
+ // will be dropped.
+ ok_clients.push(client.clone());
+ } else {
+ info!("Removing a stale client");
+ }
}
- None => sse::Data::new_json(HeartbeatReply)
- .expect("Couldn't serialize as JSON")
- .event("heartbeat"),
- };
-
- if sender.send(msg.into()).await.is_err() {
- warn!("Client disconnected; could not send SSE message");
- break;
+ *clients_.lock().await = ok_clients;
}
+ });
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
- });
+ Self { clients }
+ }
+
+ /// Registers client with broadcaster, returning an SSE response body.
+ pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
+ let (tx, rx) = mpsc::channel(10);
+
+ tx.send(sse::Data::new("Connected to SSE event stream").into())
+ .await
+ .unwrap();
+
+ self.clients.lock().await.push(tx);
+
+ Sse::from_infallible_receiver(rx)
+ }
- sse::Sse::from_infallible_receiver(receiver).with_keep_alive(Duration::from_secs(3))
+ /// Broadcasts `msg` to all clients.
+ pub async fn broadcast(&self, msg: sse::Data) {
+ let clients = self.clients.lock().await.clone();
+
+ let send_futures = clients.iter().map(|client| client.send(msg.clone().into()));
+
+ // Try to send to all clients, ignoring failures disconnected clients will get swept up by
+ // `remove_stale_clients`.
+ let _ = join_all(send_futures).await;
+ }
+}
+
+/// SSE API route that yields an event stream that clients can subscribe to
+pub async fn api_sse(sse_manager: web::Data<SseManager>) -> impl Responder {
+ sse_manager.new_client().await
}
async fn handle_dir_size_tasks(
dirs: Vec<String>,
config: &MiniserveConfig,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
+ dir_size_tasks: web::Data<DirSizeTasks>,
) -> Result<(), RuntimeError> {
for dir in dirs {
// The dir argument might be percent-encoded so let's decode it just in case.
@@ -140,16 +223,7 @@ async fn handle_dir_size_tasks(
.join(sanitized_path);
info!("Requested directory size for {full_path:?}");
- let mut joinset = task_joinset.lock().await;
- joinset.spawn(async move {
- recursive_dir_size(&full_path).await.map(|dir_size| {
- info!("Finished dir size calculation for {full_path:?}");
- DirSize {
- web_path: dir,
- size: dir_size,
- }
- })
- });
+ dir_size_tasks.calc_dir_size(dir, full_path).await;
}
Ok(())
}
@@ -159,11 +233,11 @@ async fn handle_dir_size_tasks(
pub async fn api_command(
command: web::Json<ApiCommand>,
config: web::Data<MiniserveConfig>,
- task_joinset: web::Data<Mutex<DirSizeJoinSet>>,
+ dir_size_tasks: web::Data<DirSizeTasks>,
) -> Result<impl Responder, RuntimeError> {
match command.into_inner() {
ApiCommand::CalculateDirSizes(dirs) => {
- handle_dir_size_tasks(dirs, &config, task_joinset).await?;
+ handle_dir_size_tasks(dirs, &config, dir_size_tasks).await?;
Ok("Directories are being calculated")
}
}