mirror of
https://github.com/mikedilger/chorus.git
synced 2026-03-04 06:36:27 +00:00
FileStore (and HashOutput)
This commit is contained in:
parent
bcb7fc84b6
commit
853fefbdf6
61
src/filestore/hash_output.rs
Normal file
61
src/filestore/hash_output.rs
Normal file
@ -0,0 +1,61 @@
|
||||
use crate::{ChorusError, Error};
|
||||
use std::fmt;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// A simple type for a SHA-256 hash output of 32 bytes
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct HashOutput([u8; 32]);
|
||||
|
||||
impl HashOutput {
|
||||
pub fn from_engine(engine: bitcoin_hashes::sha256::HashEngine) -> HashOutput {
|
||||
use bitcoin_hashes::{sha256, Hash};
|
||||
let hashvalue = sha256::Hash::from_engine(engine);
|
||||
HashOutput(hashvalue.as_byte_array()[0..32].try_into().unwrap())
|
||||
}
|
||||
|
||||
pub fn from_hex(input: &str) -> Result<HashOutput, Error> {
|
||||
let bytes = hex::decode(input)?;
|
||||
if bytes.len() == 32 {
|
||||
Ok(HashOutput(bytes.try_into().unwrap()))
|
||||
} else {
|
||||
Err(
|
||||
ChorusError::General("HashOutput::from_hex() got wrong length string".to_string())
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_pathbuf<P: AsRef<Path>>(&self, base: P) -> PathBuf {
|
||||
let s = hex::encode(self.0);
|
||||
let mut output: PathBuf = PathBuf::new();
|
||||
output.push(base);
|
||||
output.push(&s[0..=1]);
|
||||
output.push(&s[2..=3]);
|
||||
output.push(&s[4..]);
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for HashOutput {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", hex::encode(self.0))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_hash_output_to_pathbuf() {
|
||||
let hash = HashOutput::from_hex(
|
||||
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
&format!("{}", hash.to_pathbuf("/tmp").display()),
|
||||
"/tmp/e3/b0/c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||
);
|
||||
}
|
||||
}
|
||||
137
src/filestore/mod.rs
Normal file
137
src/filestore/mod.rs
Normal file
@ -0,0 +1,137 @@
|
||||
use crate::error::Error;
|
||||
use futures::TryStreamExt;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyDataStream, BodyExt, StreamBody};
|
||||
use hyper::body::{Bytes, Frame};
|
||||
use std::fs::Metadata;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs;
|
||||
use tokio::fs::File;
|
||||
use tokio_util::io::{InspectReader, ReaderStream, StreamReader};
|
||||
|
||||
mod hash_output;
|
||||
pub use hash_output::HashOutput;
|
||||
|
||||
pub struct FileStore {
|
||||
pub base: PathBuf,
|
||||
pub temp: PathBuf,
|
||||
}
|
||||
|
||||
impl FileStore {
|
||||
pub async fn new<P: AsRef<Path>>(base: P) -> Result<FileStore, Error> {
|
||||
let base = base.as_ref().to_owned();
|
||||
|
||||
let temp = {
|
||||
let mut temp = base.clone();
|
||||
temp.push("temp");
|
||||
temp
|
||||
};
|
||||
|
||||
if !fs::try_exists(&temp).await? {
|
||||
fs::create_dir_all(&temp).await?;
|
||||
}
|
||||
|
||||
Ok(FileStore { base, temp })
|
||||
}
|
||||
|
||||
fn tmpfile(&self) -> PathBuf {
|
||||
let mut tf = self.temp.clone();
|
||||
let nonce = textnonce::TextNonce::new();
|
||||
tf.push(&nonce.0);
|
||||
tf
|
||||
}
|
||||
|
||||
/// Store a file in storage, streamed from a hyper BoxBody
|
||||
///
|
||||
/// Returns it's HashOutput by which it can be later retrieved or deleted.
|
||||
pub async fn store(&self, data: BoxBody<Bytes, Error>) -> Result<HashOutput, Error> {
|
||||
use bitcoin_hashes::sha256;
|
||||
use std::io::Write; // for hash_engine.write_all()
|
||||
|
||||
// We will download into a temporary file (as we don't know the hash yet)
|
||||
let temppathbuf = self.tmpfile();
|
||||
let mut tempfile = File::options()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(&temppathbuf)
|
||||
.await?;
|
||||
|
||||
// Convert the BoxBody into a Data Stream
|
||||
let body_stream = BodyDataStream::new(data);
|
||||
|
||||
// Convert the Data Stream into something that is AsyncRead (over bytes)
|
||||
let stream_reader = StreamReader::new(body_stream);
|
||||
|
||||
// Wrap this in something that lets us inspect the content so we can hash it
|
||||
// as it comes in
|
||||
let mut hash_engine = sha256::HashEngine::default();
|
||||
let mut inspect_reader = InspectReader::new(stream_reader, |bytes: &[u8]| {
|
||||
hash_engine.write_all(bytes).unwrap(); // I don't think hashing will fail
|
||||
});
|
||||
|
||||
// Copy the data into the tempfile (hashing as we go)
|
||||
tokio::io::copy(&mut inspect_reader, &mut tempfile).await?;
|
||||
|
||||
// Finish the hash
|
||||
let hash = HashOutput::from_engine(hash_engine);
|
||||
|
||||
// Compute the proper path
|
||||
let pathbuf = hash.to_pathbuf(&self.base);
|
||||
|
||||
// If it already exists
|
||||
if fs::try_exists(&pathbuf).await? {
|
||||
// Just clean up
|
||||
fs::remove_file(&temppathbuf).await?;
|
||||
|
||||
return Ok(hash);
|
||||
}
|
||||
|
||||
// Make the parent directory
|
||||
fs::create_dir_all(pathbuf.parent().unwrap()).await?;
|
||||
|
||||
// Move the file
|
||||
fs::rename(&temppathbuf, &pathbuf).await?;
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
/// Retrieve a file from storage by its HashOutput, streamed to a hyper BoxBoxy
|
||||
pub async fn retrieve(&self, hash: HashOutput) -> Result<BoxBody<Bytes, Error>, Error> {
|
||||
// Compute the path
|
||||
let pathbuf = hash.to_pathbuf(&self.base);
|
||||
|
||||
// Open the file
|
||||
let file = File::open(&pathbuf).await?;
|
||||
|
||||
// Convert the AsyncRead file into a Stream
|
||||
let reader_stream = ReaderStream::new(file);
|
||||
|
||||
// Convert the Stream into a Body
|
||||
let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data));
|
||||
|
||||
// Box the body, mapping the error
|
||||
let boxed_body = BodyExt::map_err(stream_body, |e| e.into()).boxed();
|
||||
|
||||
Ok(boxed_body)
|
||||
}
|
||||
|
||||
/// Check if a file exists and provide it's metadata (including .len())
|
||||
pub async fn metadata(&self, hash: HashOutput) -> Result<Metadata, Error> {
|
||||
// Compute the path
|
||||
let pathbuf = hash.to_pathbuf(&self.base);
|
||||
|
||||
Ok(tokio::fs::metadata(&pathbuf).await?)
|
||||
}
|
||||
|
||||
/// Delete a file from storage by its HashOutput
|
||||
pub async fn delete(&self, hash: HashOutput) -> Result<(), Error> {
|
||||
// Compute the path
|
||||
let pathbuf = hash.to_pathbuf(&self.base);
|
||||
|
||||
// Delete the file
|
||||
tokio::fs::remove_file(&pathbuf).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
pub mod config;
|
||||
pub mod counting_stream;
|
||||
pub mod error;
|
||||
pub mod filestore;
|
||||
pub mod globals;
|
||||
pub mod ip;
|
||||
pub mod nostr;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user