From 853fefbdf6d995b39c52ae85c6b1bf52fd4d8ad5 Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Sat, 16 Nov 2024 20:15:43 +1300 Subject: [PATCH] FileStore (and HashOutput) --- src/filestore/hash_output.rs | 61 ++++++++++++++++ src/filestore/mod.rs | 137 +++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 199 insertions(+) create mode 100644 src/filestore/hash_output.rs create mode 100644 src/filestore/mod.rs diff --git a/src/filestore/hash_output.rs b/src/filestore/hash_output.rs new file mode 100644 index 0000000..dc59480 --- /dev/null +++ b/src/filestore/hash_output.rs @@ -0,0 +1,61 @@ +use crate::{ChorusError, Error}; +use std::fmt; +use std::path::{Path, PathBuf}; + +/// A simple type for a SHA-256 hash output of 32 bytes +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct HashOutput([u8; 32]); + +impl HashOutput { + pub fn from_engine(engine: bitcoin_hashes::sha256::HashEngine) -> HashOutput { + use bitcoin_hashes::{sha256, Hash}; + let hashvalue = sha256::Hash::from_engine(engine); + HashOutput(hashvalue.as_byte_array()[0..32].try_into().unwrap()) + } + + pub fn from_hex(input: &str) -> Result { + let bytes = hex::decode(input)?; + if bytes.len() == 32 { + Ok(HashOutput(bytes.try_into().unwrap())) + } else { + Err( + ChorusError::General("HashOutput::from_hex() got wrong length string".to_string()) + .into(), + ) + } + } + + pub fn to_pathbuf>(&self, base: P) -> PathBuf { + let s = hex::encode(self.0); + let mut output: PathBuf = PathBuf::new(); + output.push(base); + output.push(&s[0..=1]); + output.push(&s[2..=3]); + output.push(&s[4..]); + output + } +} + +impl fmt::Display for HashOutput { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_hash_output_to_pathbuf() { + let hash = HashOutput::from_hex( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + ) + .unwrap(); + + assert_eq!( + &format!("{}", hash.to_pathbuf("/tmp").display()), + "/tmp/e3/b0/c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ); + } +} diff --git a/src/filestore/mod.rs b/src/filestore/mod.rs new file mode 100644 index 0000000..7fe4a5d --- /dev/null +++ b/src/filestore/mod.rs @@ -0,0 +1,137 @@ +use crate::error::Error; +use futures::TryStreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyDataStream, BodyExt, StreamBody}; +use hyper::body::{Bytes, Frame}; +use std::fs::Metadata; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::fs::File; +use tokio_util::io::{InspectReader, ReaderStream, StreamReader}; + +mod hash_output; +pub use hash_output::HashOutput; + +pub struct FileStore { + pub base: PathBuf, + pub temp: PathBuf, +} + +impl FileStore { + pub async fn new>(base: P) -> Result { + let base = base.as_ref().to_owned(); + + let temp = { + let mut temp = base.clone(); + temp.push("temp"); + temp + }; + + if !fs::try_exists(&temp).await? { + fs::create_dir_all(&temp).await?; + } + + Ok(FileStore { base, temp }) + } + + fn tmpfile(&self) -> PathBuf { + let mut tf = self.temp.clone(); + let nonce = textnonce::TextNonce::new(); + tf.push(&nonce.0); + tf + } + + /// Store a file in storage, streamed from a hyper BoxBody + /// + /// Returns it's HashOutput by which it can be later retrieved or deleted. + pub async fn store(&self, data: BoxBody) -> Result { + use bitcoin_hashes::sha256; + use std::io::Write; // for hash_engine.write_all() + + // We will download into a temporary file (as we don't know the hash yet) + let temppathbuf = self.tmpfile(); + let mut tempfile = File::options() + .create(true) + .truncate(true) + .write(true) + .open(&temppathbuf) + .await?; + + // Convert the BoxBody into a Data Stream + let body_stream = BodyDataStream::new(data); + + // Convert the Data Stream into something that is AsyncRead (over bytes) + let stream_reader = StreamReader::new(body_stream); + + // Wrap this in something that lets us inspect the content so we can hash it + // as it comes in + let mut hash_engine = sha256::HashEngine::default(); + let mut inspect_reader = InspectReader::new(stream_reader, |bytes: &[u8]| { + hash_engine.write_all(bytes).unwrap(); // I don't think hashing will fail + }); + + // Copy the data into the tempfile (hashing as we go) + tokio::io::copy(&mut inspect_reader, &mut tempfile).await?; + + // Finish the hash + let hash = HashOutput::from_engine(hash_engine); + + // Compute the proper path + let pathbuf = hash.to_pathbuf(&self.base); + + // If it already exists + if fs::try_exists(&pathbuf).await? { + // Just clean up + fs::remove_file(&temppathbuf).await?; + + return Ok(hash); + } + + // Make the parent directory + fs::create_dir_all(pathbuf.parent().unwrap()).await?; + + // Move the file + fs::rename(&temppathbuf, &pathbuf).await?; + + Ok(hash) + } + + /// Retrieve a file from storage by its HashOutput, streamed to a hyper BoxBoxy + pub async fn retrieve(&self, hash: HashOutput) -> Result, Error> { + // Compute the path + let pathbuf = hash.to_pathbuf(&self.base); + + // Open the file + let file = File::open(&pathbuf).await?; + + // Convert the AsyncRead file into a Stream + let reader_stream = ReaderStream::new(file); + + // Convert the Stream into a Body + let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data)); + + // Box the body, mapping the error + let boxed_body = BodyExt::map_err(stream_body, |e| e.into()).boxed(); + + Ok(boxed_body) + } + + /// Check if a file exists and provide it's metadata (including .len()) + pub async fn metadata(&self, hash: HashOutput) -> Result { + // Compute the path + let pathbuf = hash.to_pathbuf(&self.base); + + Ok(tokio::fs::metadata(&pathbuf).await?) + } + + /// Delete a file from storage by its HashOutput + pub async fn delete(&self, hash: HashOutput) -> Result<(), Error> { + // Compute the path + let pathbuf = hash.to_pathbuf(&self.base); + + // Delete the file + tokio::fs::remove_file(&pathbuf).await?; + + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 859750b..e3bb337 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod config; pub mod counting_stream; pub mod error; +pub mod filestore; pub mod globals; pub mod ip; pub mod nostr;