aboutsummaryrefslogtreecommitdiffstats
path: root/src/file_op.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/file_op.rs')
-rw-r--r--src/file_op.rs222
1 files changed, 195 insertions, 27 deletions
diff --git a/src/file_op.rs b/src/file_op.rs
index 18bdcbe..afd6449 100644
--- a/src/file_op.rs
+++ b/src/file_op.rs
@@ -4,10 +4,12 @@ use std::io::ErrorKind;
use std::path::{Component, Path, PathBuf};
use actix_web::{http::header, web, HttpRequest, HttpResponse};
-use futures::TryFutureExt;
-use futures::TryStreamExt;
+use futures::{StreamExt, TryStreamExt};
+use log::{info, warn};
use serde::Deserialize;
-use tokio::fs::File;
+use sha2::digest::DynDigest;
+use sha2::{Digest, Sha256, Sha512};
+use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt;
use crate::{
@@ -15,52 +17,183 @@ use crate::{
file_utils::sanitize_path,
};
-/// Saves file data from a multipart form field (`field`) to `file_path`, optionally overwriting
-/// existing file.
+enum FileHash {
+ SHA256(String),
+ SHA512(String),
+}
+
+impl FileHash {
+ pub fn get_hasher(&self) -> Box<dyn DynDigest> {
+ match self {
+ Self::SHA256(_) => Box::new(Sha256::new()),
+ Self::SHA512(_) => Box::new(Sha512::new()),
+ }
+ }
+
+ pub fn get_hash(&self) -> &str {
+ match self {
+ Self::SHA256(string) => string,
+ Self::SHA512(string) => string,
+ }
+ }
+}
+
+/// Saves file data from a multipart form field (`field`) to `file_path`. Optionally overwriting
+/// existing file and comparing the uploaded file checksum to the user provided `file_hash`.
///
/// Returns total bytes written to file.
async fn save_file(
- field: actix_multipart::Field,
+ field: &mut actix_multipart::Field,
file_path: PathBuf,
overwrite_files: bool,
+ file_checksum: Option<&FileHash>,
+ temporary_upload_directory: Option<&PathBuf>,
) -> Result<u64, RuntimeError> {
if !overwrite_files && file_path.exists() {
return Err(RuntimeError::DuplicateFileError);
}
- let file = match File::create(&file_path).await {
+ let temp_upload_directory = temporary_upload_directory.cloned();
+ // Tempfile doesn't support async operations, so we'll do it on a background thread.
+ let temp_upload_directory_task = tokio::task::spawn_blocking(move || {
+ // If the user provided a temporary directory path, then use it.
+ if let Some(temp_directory) = temp_upload_directory {
+ NamedTempFile::new_in(temp_directory)
+ } else {
+ NamedTempFile::new()
+ }
+ });
+
+ // Validate that the temporary task completed successfully.
+ let named_temp_file_task = match temp_upload_directory_task.await {
+ Ok(named_temp_file) => Ok(named_temp_file),
+ Err(err) => Err(RuntimeError::MultipartError(format!(
+ "Failed to complete spawned task to create named temp file. {err}",
+ ))),
+ }?;
+
+ // Validate the the temporary file was created successfully.
+ let named_temp_file = match named_temp_file_task {
Err(err) if err.kind() == ErrorKind::PermissionDenied => Err(
RuntimeError::InsufficientPermissionsError(file_path.display().to_string()),
),
Err(err) => Err(RuntimeError::IoError(
- format!("Failed to create {}", file_path.display()),
+ format!("Failed to create temporary file {}", file_path.display()),
err,
)),
- Ok(v) => Ok(v),
+ Ok(file) => Ok(file),
}?;
- let (_, written_len) = field
- .map_err(|x| RuntimeError::MultipartError(x.to_string()))
- .try_fold((file, 0u64), |(mut file, written_len), bytes| async move {
- file.write_all(bytes.as_ref())
- .map_err(|e| RuntimeError::IoError("Failed to write to file".to_string(), e))
- .await?;
- Ok((file, written_len + bytes.len() as u64))
- })
- .await?;
+ // Convert the temporary file into a non-temporary file. This allows us
+ // to control the lifecycle of the file. This is useful for us because
+ // we need to convert the temporary file into an async enabled file and
+ // on successful upload, we want to move it to the target directory.
+ let (file, temp_path) = named_temp_file
+ .keep()
+ .map_err(|err| RuntimeError::IoError("Failed to keep temporary file".into(), err.error))?;
+ let mut temp_file = tokio::fs::File::from_std(file);
+
+ let mut written_len = 0;
+ let mut hasher = file_checksum.as_ref().map(|h| h.get_hasher());
+ let mut save_upload_file_error: Option<RuntimeError> = None;
+
+ // This while loop take a stream (in this case `field`) and awaits
+ // new chunks from the websocket connection. The while loop reads
+ // the file from the HTTP connection and writes it to disk or until
+ // the stream from the multipart request is aborted.
+ while let Some(Ok(bytes)) = field.next().await {
+ // If the hasher exists (if the user has also sent a chunksum with the request)
+ // then we want to update the hasher with the new bytes uploaded.
+ if let Some(hasher) = hasher.as_mut() {
+ hasher.update(&bytes)
+ }
+ // Write the bytes from the stream into our temporary file.
+ if let Err(e) = temp_file.write_all(&bytes).await {
+ // Failed to write to file. Drop it and return the error
+ save_upload_file_error =
+ Some(RuntimeError::IoError("Failed to write to file".into(), e));
+ break;
+ }
+ // record the bytes written to the file.
+ written_len += bytes.len() as u64;
+ }
+
+ if save_upload_file_error.is_none() {
+ // Flush the changes to disk so that we are sure they are there.
+ if let Err(e) = temp_file.flush().await {
+ save_upload_file_error = Some(RuntimeError::IoError(
+ "Failed to flush all the file writes to disk".into(),
+ e,
+ ));
+ }
+ }
+
+ // Drop the file expcitly here because IF there is an error when writing to the
+ // temp file, we won't be able to remove as per the comment in `tokio::fs::remove_file`
+ // > Note that there is no guarantee that the file is immediately deleted
+ // > (e.g. depending on platform, other open file descriptors may prevent immediate removal).
+ drop(temp_file);
+
+ // If there was an error during uploading.
+ if let Some(e) = save_upload_file_error {
+ // If there was an error when writing the file to disk, remove it and return
+ // the error that was encountered.
+ let _ = tokio::fs::remove_file(temp_path).await;
+ return Err(e);
+ }
+
+ // There isn't a way to get notified when a request is cancelled
+ // by the user in actix it seems. References:
+ // - https://github.com/actix/actix-web/issues/1313
+ // - https://github.com/actix/actix-web/discussions/3011
+ // Therefore, we are relying on the fact that the web UI uploads a
+ // hash of the file to determine if it was completed uploaded or not.
+ if let Some(hasher) = hasher {
+ if let Some(expected_hash) = file_checksum.as_ref().map(|f| f.get_hash()) {
+ let actual_hash = hex::encode(hasher.finalize());
+ if actual_hash != expected_hash {
+ warn!("The expected file hash {expected_hash} did not match the calculated hash of {actual_hash}. This can be caused if a file upload was aborted.");
+ let _ = tokio::fs::remove_file(&temp_path).await;
+ return Err(RuntimeError::UploadHashMismatchError);
+ }
+ }
+ }
+
+ info!("File upload successful to {temp_path:?}. Moving to {file_path:?}",);
+ if let Err(e) = tokio::fs::rename(&temp_path, &file_path).await {
+ let _ = tokio::fs::remove_file(&temp_path).await;
+ return Err(RuntimeError::IoError(
+ format!("Failed to move temporary file {temp_path:?} to {file_path:?}",),
+ e,
+ ));
+ }
Ok(written_len)
}
-/// Handles a single field in a multipart form
-async fn handle_multipart(
- mut field: actix_multipart::Field,
- path: PathBuf,
+struct HandleMultipartOpts<'a> {
overwrite_files: bool,
allow_mkdir: bool,
allow_hidden_paths: bool,
allow_symlinks: bool,
+ file_hash: Option<&'a FileHash>,
+ upload_directory: Option<&'a PathBuf>,
+}
+
+/// Handles a single field in a multipart form
+async fn handle_multipart(
+ mut field: actix_multipart::Field,
+ path: PathBuf,
+ opts: HandleMultipartOpts<'_>,
) -> Result<u64, RuntimeError> {
+ let HandleMultipartOpts {
+ overwrite_files,
+ allow_mkdir,
+ allow_hidden_paths,
+ allow_symlinks,
+ file_hash,
+ upload_directory,
+ } = opts;
let field_name = field.name().expect("No name field found").to_string();
match tokio::fs::metadata(&path).await {
@@ -168,7 +301,14 @@ async fn handle_multipart(
}
}
- save_file(field, path.join(filename_path), overwrite_files).await
+ save_file(
+ &mut field,
+ path.join(filename_path),
+ overwrite_files,
+ file_hash,
+ upload_directory,
+ )
+ .await
}
/// Query parameters used by upload and rm APIs
@@ -218,16 +358,44 @@ pub async fn upload_file(
)),
}?;
+ let upload_directory = conf.temp_upload_directory.as_ref();
+
+ let file_hash = if let (Some(hash), Some(hash_function)) = (
+ req.headers()
+ .get("X-File-Hash")
+ .and_then(|h| h.to_str().ok()),
+ req.headers()
+ .get("X-File-Hash-Function")
+ .and_then(|h| h.to_str().ok()),
+ ) {
+ match hash_function.to_ascii_uppercase().as_str() {
+ "SHA256" => Some(FileHash::SHA256(hash.to_string())),
+ "SHA512" => Some(FileHash::SHA512(hash.to_string())),
+ sha => {
+ return Err(RuntimeError::InvalidHttpRequestError(format!(
+ "Invalid header value found for 'X-File-Hash-Function'. Supported values are SHA256 or SHA512. Found {sha}.",
+ )))
+ }
+ }
+ } else {
+ None
+ };
+
+ let hash_ref = file_hash.as_ref();
actix_multipart::Multipart::new(req.headers(), payload)
.map_err(|x| RuntimeError::MultipartError(x.to_string()))
.and_then(|field| {
handle_multipart(
field,
non_canonicalized_target_dir.clone(),
- conf.overwrite_files,
- conf.mkdir_enabled,
- conf.show_hidden,
- !conf.no_symlinks,
+ HandleMultipartOpts {
+ overwrite_files: conf.overwrite_files,
+ allow_mkdir: conf.mkdir_enabled,
+ allow_hidden_paths: conf.show_hidden,
+ allow_symlinks: !conf.no_symlinks,
+ file_hash: hash_ref,
+ upload_directory,
+ },
)
})
.try_collect::<Vec<u64>>()