diff options
-rw-r--r-- | src/archive.rs | 177 | ||||
-rw-r--r-- | src/listing.rs | 85 | ||||
-rw-r--r-- | src/main.rs | 21 | ||||
-rw-r--r-- | src/pipe.rs | 54 |
4 files changed, 217 insertions, 120 deletions
diff --git a/src/archive.rs b/src/archive.rs index b62c3bd..268bb47 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -1,8 +1,6 @@ use actix_web::http::ContentEncoding; -use bytes::Bytes; use libflate::gzip::Encoder; use serde::Deserialize; -use std::io; use std::path::Path; use strum_macros::{Display, EnumIter, EnumString}; use tar::Builder; @@ -10,123 +8,154 @@ use tar::Builder; use crate::errors::ContextualError; /// Available compression methods -#[derive(Deserialize, Clone, EnumIter, EnumString, Display)] +#[derive(Deserialize, Clone, Copy, EnumIter, EnumString, Display)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum CompressionMethod { - /// TAR GZ + /// Gzipped tarball TarGz, + + /// Regular tarball + Tar, } impl CompressionMethod { - pub fn extension(&self) -> String { - match &self { + pub fn extension(self) -> String { + match self { CompressionMethod::TarGz => "tar.gz", + CompressionMethod::Tar => "tar", } .to_string() } - pub fn content_type(&self) -> String { - match &self { + pub fn content_type(self) -> String { + match self { CompressionMethod::TarGz => "application/gzip", + CompressionMethod::Tar => "application/tar", } .to_string() } - pub fn content_encoding(&self) -> ContentEncoding { - match &self { + pub fn content_encoding(self) -> ContentEncoding { + match self { CompressionMethod::TarGz => ContentEncoding::Gzip, + CompressionMethod::Tar => ContentEncoding::Identity, } } - /// Creates an archive of a folder, using the algorithm the user chose from the web interface - /// This method returns the archive as a stream of bytes - pub fn create_archive<T: AsRef<Path>>( - &self, + + /// Make an archive out of the given directory, and write the output to the given writer. + /// + /// Recursively includes all files and subdirectories. + /// + /// If `skip_symlinks` is `true`, symlinks fill not be followed and will just be ignored. + pub fn create_archive<T, W>( + self, dir: T, skip_symlinks: bool, - ) -> Result<(String, Bytes), ContextualError> { + out: W, + ) -> Result<(), ContextualError> + where + T: AsRef<Path>, + W: std::io::Write, + { + let dir = dir.as_ref(); match self { - CompressionMethod::TarGz => tgz_compress(dir, skip_symlinks), + CompressionMethod::TarGz => tar_gz(dir, skip_symlinks, out), + CompressionMethod::Tar => tar_dir(dir, skip_symlinks, out), } } } -/// Compresses a given folder in .tar.gz format, and returns the result as a stream of bytes -fn tgz_compress<T: AsRef<Path>>( - dir: T, - skip_symlinks: bool, -) -> Result<(String, Bytes), ContextualError> { - if let Some(inner_folder) = dir.as_ref().file_name() { - if let Some(directory) = inner_folder.to_str() { - let dst_filename = format!("{}.tar", directory); - let dst_tgz_filename = format!("{}.gz", dst_filename); - let mut tgz_data = Bytes::new(); - - let tar_data = - tar(dir.as_ref(), directory.to_string(), skip_symlinks).map_err(|e| { - ContextualError::ArchiveCreationError("tarball".to_string(), Box::new(e)) - })?; - - let gz_data = gzip(&tar_data).map_err(|e| { - ContextualError::ArchiveCreationError("GZIP archive".to_string(), Box::new(e)) - })?; - - tgz_data.extend_from_slice(&gz_data); - - Ok((dst_tgz_filename, tgz_data)) - } else { - // https://doc.rust-lang.org/std/ffi/struct.OsStr.html#method.to_str - Err(ContextualError::InvalidPathError( - "Directory name contains invalid UTF-8 characters".to_string(), - )) - } - } else { - // https://doc.rust-lang.org/std/path/struct.Path.html#method.file_name - Err(ContextualError::InvalidPathError( - "Directory name terminates in \"..\"".to_string(), - )) - } +/// Write a gzipped tarball of `dir` in `out`. +fn tar_gz<W>(dir: &Path, skip_symlinks: bool, out: W) -> Result<(), ContextualError> +where + W: std::io::Write, +{ + let mut out = Encoder::new(out).map_err(|e| ContextualError::IOError("GZIP".to_string(), e))?; + + tar_dir(dir, skip_symlinks, &mut out)?; + + out.finish() + .into_result() + .map_err(|e| ContextualError::IOError("GZIP finish".to_string(), e))?; + + Ok(()) } -/// Creates a TAR archive of a folder, and returns it as a stream of bytes -fn tar<T: AsRef<Path>>( - src_dir: T, +/// Write a tarball of `dir` in `out`. +/// +/// The target directory will be saved as a top-level directory in the archive. +/// +/// For example, consider this directory structure: +/// +/// ``` +/// a +/// └── b +/// └── c +/// ├── e +/// ├── f +/// └── g +/// ``` +/// +/// Making a tarball out of `"a/b/c"` will result in this archive content: +/// +/// ``` +/// c +/// ├── e +/// ├── f +/// └── g +/// ``` +fn tar_dir<W>(dir: &Path, skip_symlinks: bool, out: W) -> Result<(), ContextualError> +where + W: std::io::Write, +{ + let inner_folder = dir.file_name().ok_or_else(|| { + ContextualError::InvalidPathError("Directory name terminates in \"..\"".to_string()) + })?; + + let directory = inner_folder.to_str().ok_or_else(|| { + ContextualError::InvalidPathError( + "Directory name contains invalid UTF-8 characters".to_string(), + ) + })?; + + tar(dir, directory.to_string(), skip_symlinks, out) + .map_err(|e| ContextualError::ArchiveCreationError("tarball".to_string(), Box::new(e))) +} + +/// Writes a tarball of `dir` in `out`. +/// +/// The content of `src_dir` will be saved in the archive as a folder named `inner_folder`. +fn tar<W>( + src_dir: &Path, inner_folder: String, skip_symlinks: bool, -) -> Result<Vec<u8>, ContextualError> { - let mut tar_builder = Builder::new(Vec::new()); + out: W, +) -> Result<(), ContextualError> +where + W: std::io::Write, +{ + let mut tar_builder = Builder::new(out); tar_builder.follow_symlinks(!skip_symlinks); + // Recursively adds the content of src_dir into the archive stream tar_builder - .append_dir_all(inner_folder, src_dir.as_ref()) + .append_dir_all(inner_folder, src_dir) .map_err(|e| { ContextualError::IOError( format!( "Failed to append the content of {} to the TAR archive", - src_dir.as_ref().to_str().unwrap_or("file") + src_dir.to_str().unwrap_or("file") ), e, ) })?; - let tar_content = tar_builder.into_inner().map_err(|e| { + // Finish the archive + tar_builder.into_inner().map_err(|e| { ContextualError::IOError("Failed to finish writing the TAR archive".to_string(), e) })?; - Ok(tar_content) -} - -/// Compresses a stream of bytes using the GZIP algorithm, and returns the resulting stream -fn gzip(mut data: &[u8]) -> Result<Vec<u8>, ContextualError> { - let mut encoder = Encoder::new(Vec::new()) - .map_err(|e| ContextualError::IOError("Failed to create GZIP encoder".to_string(), e))?; - io::copy(&mut data, &mut encoder) - .map_err(|e| ContextualError::IOError("Failed to write GZIP data".to_string(), e))?; - let data = encoder - .finish() - .into_result() - .map_err(|e| ContextualError::IOError("Failed to write GZIP trailer".to_string(), e))?; - - Ok(data) + Ok(()) } diff --git a/src/listing.rs b/src/listing.rs index 2ffcf2f..ee9c581 100644 --- a/src/listing.rs +++ b/src/listing.rs @@ -1,7 +1,6 @@ -use actix_web::http::StatusCode; -use actix_web::{fs, http, Body, FromRequest, HttpRequest, HttpResponse, Query, Result}; +use actix_web::{fs, Body, FromRequest, HttpRequest, HttpResponse, Query, Result}; use bytesize::ByteSize; -use futures::stream::once; +use futures::Stream; use htmlescape::encode_minimal as escape_html_entity; use percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET}; use serde::Deserialize; @@ -235,46 +234,56 @@ pub fn directory_listing<S>( let color_scheme = query_params.theme.unwrap_or(default_color_scheme); - if let Some(compression_method) = &query_params.download { + if let Some(compression_method) = query_params.download { log::info!( "Creating an archive ({extension}) of {path}...", extension = compression_method.extension(), path = &dir.path.display().to_string() ); - match compression_method.create_archive(&dir.path, skip_symlinks) { - Ok((filename, content)) => { - log::info!("{file} successfully created !", file = &filename); - Ok(HttpResponse::Ok() - .content_type(compression_method.content_type()) - .content_encoding(compression_method.content_encoding()) - .header("Content-Transfer-Encoding", "binary") - .header( - "Content-Disposition", - format!("attachment; filename={:?}", filename), - ) - .chunked() - .body(Body::Streaming(Box::new(once(Ok(content)))))) - } - Err(err) => { - errors::log_error_chain(err.to_string()); - Ok(HttpResponse::Ok() - .status(http::StatusCode::INTERNAL_SERVER_ERROR) - .body( - renderer::render_error( - &err.to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - serve_path, - query_params.sort, - query_params.order, - color_scheme, - default_color_scheme, - false, - true, - ) - .into_string(), - )) + + let filename = format!( + "{}.{}", + dir.path.file_name().unwrap().to_str().unwrap(), + compression_method.extension() + ); + + // We will create the archive in a separate thread, and stream the content using a pipe. + // The pipe is made of a futures channel, and an adapter to implement the `Write` trait. + // Include 10 messages of buffer for erratic connection speeds. + let (tx, rx) = futures::sync::mpsc::channel(10); + let pipe = crate::pipe::Pipe::new(tx); + + // Start the actual archive creation in a separate thread. + let dir = dir.path.to_path_buf(); + std::thread::spawn(move || { + if let Err(err) = compression_method.create_archive(dir, skip_symlinks, pipe) { + log::error!("Error during archive creation: {:?}", err); } - } + }); + + // `rx` is a receiver of bytes - it can act like a `Stream` of bytes, and that's exactly + // what actix-web wants to stream the response. + // + // But right now the error types do not match: + // `<rx as Stream>::Error == ()`, but we want `actix_web::error::Error` + // + // That being said, `rx` will never fail because the `Stream` implementation for `Receiver` + // never returns an error - it simply cannot fail. + let rx = rx.map_err(|_| unreachable!("pipes never fail")); + + // At this point, `rx` implements everything actix want for a streamed response, + // so we can just give a `Box::new(rx)` as streaming body. + + Ok(HttpResponse::Ok() + .content_type(compression_method.content_type()) + .content_encoding(compression_method.content_encoding()) + .header("Content-Transfer-Encoding", "binary") + .header( + "Content-Disposition", + format!("attachment; filename={:?}", filename), + ) + .chunked() + .body(Body::Streaming(Box::new(rx)))) } else { Ok(HttpResponse::Ok() .content_type("text/html; charset=utf-8") @@ -302,7 +311,7 @@ pub fn extract_query_parameters<S>(req: &HttpRequest<S>) -> QueryParameters { Ok(query) => QueryParameters { sort: query.sort, order: query.order, - download: query.download.clone(), + download: query.download, theme: query.theme, path: query.path.clone(), }, diff --git a/src/main.rs b/src/main.rs index f26369a..ddf25e1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,7 @@ mod auth; mod errors; mod file_upload; mod listing; +mod pipe; mod renderer; mod themes; @@ -79,17 +80,21 @@ fn run() -> Result<(), ContextualError> { TermLogger::init(LevelFilter::Error, Config::default()) }; - if miniserve_config.no_symlinks - && miniserve_config + if miniserve_config.no_symlinks { + let is_symlink = miniserve_config .path .symlink_metadata() - .map_err(|e| ContextualError::IOError("Failed to retrieve symlink's metadata".to_string(), e))? + .map_err(|e| { + ContextualError::IOError("Failed to retrieve symlink's metadata".to_string(), e) + })? .file_type() - .is_symlink() - { - return Err(ContextualError::from( - "The no-symlinks option cannot be used with a symlink path".to_string(), - )); + .is_symlink(); + + if is_symlink { + return Err(ContextualError::from( + "The no-symlinks option cannot be used with a symlink path".to_string(), + )); + } } let inside_config = miniserve_config.clone(); diff --git a/src/pipe.rs b/src/pipe.rs new file mode 100644 index 0000000..09ca580 --- /dev/null +++ b/src/pipe.rs @@ -0,0 +1,54 @@ +//! Define an adapter to implement `std::io::Write` on `Sender<Bytes>`. +use bytes::{Bytes, BytesMut}; +use futures::sink::{Sink, Wait}; +use futures::sync::mpsc::Sender; +use std::io::{Error, ErrorKind, Result, Write}; + +/// Adapter to implement the `std::io::Write` trait on a `Sender<Bytes>` from a futures channel. +/// +/// It uses an intermediate buffer to transfer packets. +pub struct Pipe { + // Wrapping the sender in `Wait` makes it blocking, so we can implement blocking-style + // io::Write over the async-style Sender. + dest: Wait<Sender<Bytes>>, + bytes: BytesMut, +} + +impl Pipe { + /// Wrap the given sender in a `Pipe`. + pub fn new(destination: Sender<Bytes>) -> Self { + Pipe { + dest: destination.wait(), + bytes: BytesMut::new(), + } + } +} + +impl Drop for Pipe { + fn drop(&mut self) { + // This is the correct thing to do, but is not super important since the `Sink` + // implementation of `Sender` just returns `Ok` without doing anything else. + let _ = self.dest.close(); + } +} + +impl Write for Pipe { + fn write(&mut self, buf: &[u8]) -> Result<usize> { + // We are given a slice of bytes we do not own, so we must start by copying it. + self.bytes.extend_from_slice(buf); + + // Then, take the buffer and send it in the channel. + self.dest + .send(self.bytes.take().into()) + .map_err(|e| Error::new(ErrorKind::UnexpectedEof, e))?; + + // Return how much we sent - all of it. + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<()> { + self.dest + .flush() + .map_err(|e| Error::new(ErrorKind::UnexpectedEof, e)) + } +} |