diff --git a/.gitignore b/.gitignore
index 14948125..67d6c44e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ debug/
 .vscode
 venv
 **/*.env
+**/uv.lock
diff --git a/cas_client/src/caching_client.rs b/cas_client/src/caching_client.rs
index 1b84c667..9f43bda4 100644
--- a/cas_client/src/caching_client.rs
+++ b/cas_client/src/caching_client.rs
@@ -36,7 +36,7 @@ impl UploadClient for CachingClient {
 
 #[async_trait]
 impl ReconstructionClient for CachingClient {
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
         /*
         let file_info = self.reconstruct(hash, None).await?;
@@ -60,7 +60,7 @@ impl ReconstructionClient for CachingClient {
         offset: u64,
         length: u64,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()> {
+    ) -> Result<u64> {
         todo!()
     }
 }
diff --git a/cas_client/src/interface.rs b/cas_client/src/interface.rs
index 690e9acf..d61f99d3 100644
--- a/cas_client/src/interface.rs
+++ b/cas_client/src/interface.rs
@@ -42,7 +42,7 @@ pub trait UploadClient {
 #[async_trait]
 pub trait ReconstructionClient {
     /// Get a entire file by file hash.
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()>;
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64>;
 
     /// Get a entire file by file hash at a specific bytes range.
     async fn get_file_byte_range(
@@ -51,7 +51,7 @@ pub trait ReconstructionClient {
         offset: u64,
         length: u64,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()>;
+    ) -> Result<u64>;
 }
 
 pub trait Client: UploadClient + ReconstructionClient {}
diff --git a/cas_client/src/remote_client.rs b/cas_client/src/remote_client.rs
index c5698d27..a9a02950 100644
--- a/cas_client/src/remote_client.rs
+++ b/cas_client/src/remote_client.rs
@@ -102,13 +102,13 @@ impl UploadClient for RemoteClient {
 
 #[async_trait]
 impl ReconstructionClient for RemoteClient {
-    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<()> {
+    async fn get_file(&self, hash: &MerkleHash, writer: &mut Box<dyn Write + Send>) -> Result<u64> {
         // get manifest of xorbs to download
         let manifest = self.reconstruct(hash, None).await?;
 
-        self.get_ranges(manifest, None, writer).await?;
+        let bytes_downloaded = self.get_ranges(manifest, None, writer).await?;
 
-        Ok(())
+        Ok(bytes_downloaded)
     }
 
     #[allow(unused_variables)]
@@ -118,7 +118,7 @@ impl ReconstructionClient for RemoteClient {
         offset: u64,
         length: u64,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<()> {
+    ) -> Result<u64> {
         todo!()
     }
 }
@@ -203,7 +203,7 @@ impl RemoteClient {
         reconstruction_response: QueryReconstructionResponse,
         _byte_range: Option<(u64, u64)>,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<usize> {
+    ) -> Result<u64> {
         let info = reconstruction_response.reconstruction;
         let total_len = info.iter().fold(0, |acc, x| acc + x.unpacked_length);
         let futs = info
@@ -215,7 +215,8 @@ impl RemoteClient {
             .map_err(|e| CasClientError::InternalError(anyhow!("join error {e}")))??;
             writer.write_all(&piece)?;
         }
-        Ok(total_len as usize)
+        // Todo: return the bytes which were read from the cache for telemetry
+        Ok(total_len as u64)
     }
 }
diff --git a/data/src/bin/example.rs b/data/src/bin/example.rs
index f3ae748e..1dc84bdd 100644
--- a/data/src/bin/example.rs
+++ b/data/src/bin/example.rs
@@ -213,7 +213,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Result<()> {
     let mut input = String::new();
     reader.read_to_string(&mut input)?;
 
-    let pointer_file = PointerFile::init_from_string(&input, "");
+    let mut pointer_file = PointerFile::init_from_string(&input, "");
 
     // not a pointer file, leave it as it is.
     if !pointer_file.is_valid() {
@@ -223,7 +223,7 @@ async fn smudge(mut reader: impl Read, writer: &mut Box<dyn Write + Send>) -> Result<()> {
     let translator = PointerFileTranslator::new(default_smudge_config()?).await?;
 
     translator
-        .smudge_file_from_pointer(&pointer_file, writer, None)
+        .smudge_file_from_pointer(&mut pointer_file, writer, None)
         .await?;
 
     Ok(())
diff --git a/data/src/clean.rs b/data/src/clean.rs
index ab9242d1..ed1b275e 100644
--- a/data/src/clean.rs
+++ b/data/src/clean.rs
@@ -21,6 +21,7 @@ use std::mem::take;
 use std::ops::DerefMut;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::sync::mpsc::error::TryRecvError;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::Mutex;
@@ -76,6 +77,9 @@ pub struct Cleaner {
 
     // Auxiliary info
     file_name: Option<PathBuf>,
+
+    // Telemetry
+    start: Instant,
 }
 
 impl Cleaner {
@@ -113,6 +117,7 @@ impl Cleaner {
             tracking_info: Mutex::new(Default::default()),
             small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
             file_name: file_name.map(|f| f.to_owned()),
+            start: Instant::now(),
         });
 
         Self::run(cleaner.clone(), chunk_c).await;
@@ -239,8 +244,9 @@ impl Cleaner {
         Ok(false)
     }
 
-    async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
+    async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<u64> {
         info!("Dedup {} chunks", chunks.len());
+        let mut total_compressed_bytes = 0;
 
         let mut tracking_info = self.tracking_info.lock().await;
 
         let enable_global_dedup = self.enable_global_dedup_queries;
@@ -463,13 +469,14 @@ impl Cleaner {
                 tracking_info.cas_data.data.extend(bytes);
 
                 if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
-                    let cas_hash = register_new_cas_block(
+                    let (cas_hash, compressed_bytes) = register_new_cas_block(
                         &mut tracking_info.cas_data,
                         &self.shard_manager,
                         &self.cas,
                         &self.cas_prefix,
                     )
                     .await?;
+                    total_compressed_bytes += compressed_bytes;
 
                     for i in take(&mut tracking_info.current_cas_file_info_indices) {
                         tracking_info.file_info[i].cas_hash = cas_hash;
@@ -483,7 +490,7 @@ impl Cleaner {
             }
         }
 
-        Ok(())
+        Ok(total_compressed_bytes)
     }
 
     async fn finish(&self) -> Result<()> {
@@ -516,7 +523,8 @@ impl Cleaner {
         Ok(())
     }
 
-    async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64)> {
+    async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64, u64)> {
+        let mut total_compressed_bytes = 0;
         let mut tracking_info = self.tracking_info.lock().await;
 
         let file_hash = file_node_hash(
@@ -577,13 +585,14 @@ impl Cleaner {
             if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
                 let mut new_cas_data = take(cas_data_accumulator.deref_mut());
                 drop(cas_data_accumulator); // Release the lock.
-                register_new_cas_block(
+                let (_cas_hash, compressed_bytes) = register_new_cas_block(
                     &mut new_cas_data,
                     &self.shard_manager,
                     &self.cas,
                     &self.cas_prefix,
                 )
                 .await?;
+                total_compressed_bytes += compressed_bytes;
             } else {
                 drop(cas_data_accumulator);
             }
@@ -593,11 +602,11 @@ impl Cleaner {
 
         *tracking_info = Default::default();
 
-        Ok((file_hash, file_size))
+        Ok((file_hash, file_size, total_compressed_bytes))
     }
 
     async fn to_pointer_file(&self) -> Result<String> {
-        let (hash, filesize) = self.summarize_dedup_info().await?;
+        let (hash, filesize, compressed_size) = self.summarize_dedup_info().await?;
         let pointer_file = PointerFile::init_from_info(
             &self
                 .file_name
@@ -606,6 +615,8 @@ impl Cleaner {
                 .unwrap_or_default(),
             &hash.hex(),
             filesize,
+            compressed_size,
+            self.start.elapsed(),
         );
         Ok(pointer_file.to_string())
     }
diff --git a/data/src/data_processing.rs b/data/src/data_processing.rs
index 69017db1..b59095db 100644
--- a/data/src/data_processing.rs
+++ b/data/src/data_processing.rs
@@ -5,7 +5,7 @@ use crate::errors::*;
 use crate::metrics::FILTER_CAS_BYTES_PRODUCED;
 use crate::remote_shard_interface::RemoteShardInterface;
 use crate::shard_interface::create_shard_manager;
-use crate::PointerFile;
+use crate::{PointerFile, PointerFileTelemetry};
 use cas_client::Client;
 use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
 use mdb_shard::file_structs::MDBFileInfo;
@@ -17,6 +17,7 @@ use std::mem::take;
 use std::ops::DerefMut;
 use std::path::Path;
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::sync::Mutex;
 
 #[derive(Default, Debug)]
@@ -59,6 +60,9 @@ pub struct PointerFileTranslator {
 
     /* ----- Deduped data shared across files ----- */
     global_cas_data: Arc<Mutex<CASDataAggregator>>,
+
+    /* ----- Telemetry ----- */
+    pub start: Instant,
 }
 
 // Constructors
@@ -97,6 +101,7 @@ impl PointerFileTranslator {
             remote_shards,
             cas: cas_client,
             global_cas_data: Default::default(),
+            start: Instant::now(),
         })
     }
 }
@@ -208,7 +213,7 @@ pub(crate) async fn register_new_cas_block(
     shard_manager: &Arc<ShardFileManager>,
     cas: &Arc<dyn Client + Send + Sync>,
     cas_prefix: &str,
-) -> Result<MerkleHash> {
+) -> Result<(MerkleHash, u64)> {
     let cas_hash = cas_node_hash(&cas_data.chunks[..]);
 
     let raw_bytes_len = cas_data.data.len();
@@ -283,19 +288,28 @@ pub(crate) async fn register_new_cas_block(
     cas_data.chunks.clear();
     cas_data.pending_file_info.clear();
 
-    Ok(cas_hash)
+    Ok((cas_hash, compressed_bytes_len as u64))
 }
 
 /// Smudge operations
 impl PointerFileTranslator {
     pub async fn smudge_file_from_pointer(
         &self,
-        pointer: &PointerFile,
+        pointer_file: &mut PointerFile,
         writer: &mut Box<dyn Write + Send>,
         range: Option<(usize, usize)>,
     ) -> Result<()> {
-        self.smudge_file_from_hash(&pointer.hash()?, writer, range)
-            .await
+        let start = Instant::now();
+        let bytes_downloaded = self
+            .smudge_file_from_hash(&pointer_file.hash()?, writer, range)
+            .await?;
+
+        pointer_file.telemetry = Some(PointerFileTelemetry {
+            latency: Some(start.elapsed()),
+            network_bytes: Some(bytes_downloaded),
+        });
+
+        Ok(())
     }
 
     pub async fn smudge_file_from_hash(
@@ -303,9 +317,8 @@ impl PointerFileTranslator {
         file_id: &MerkleHash,
         writer: &mut Box<dyn Write + Send>,
         _range: Option<(usize, usize)>,
-    ) -> Result<()> {
-        self.cas.get_file(file_id, writer).await?;
-
-        Ok(())
+    ) -> Result<u64> {
+        let bytes_downloaded = self.cas.get_file(file_id, writer).await?;
+        Ok(bytes_downloaded)
     }
 }
diff --git a/data/src/lib.rs b/data/src/lib.rs
index 0919c3c7..9ddd9438 100644
--- a/data/src/lib.rs
+++ b/data/src/lib.rs
@@ -17,4 +17,4 @@ mod test_utils;
 
 pub use constants::SMALL_FILE_THRESHOLD;
 pub use data_processing::PointerFileTranslator;
-pub use pointer_file::PointerFile;
+pub use pointer_file::{PointerFile, PointerFileTelemetry};
diff --git a/data/src/pointer_file.rs b/data/src/pointer_file.rs
index bdf4abea..784fd011 100644
--- a/data/src/pointer_file.rs
+++ b/data/src/pointer_file.rs
@@ -2,6 +2,7 @@
 use crate::constants::POINTER_FILE_LIMIT;
 use merklehash::{DataHashHexParseError, MerkleHash};
 use static_assertions::const_assert;
+use std::time::Duration;
 use std::{collections::BTreeMap, fs, path::Path};
 use toml::Value;
 use tracing::{debug, error, warn};
@@ -9,6 +10,14 @@ use tracing::{debug, error, warn};
 const HEADER_PREFIX: &str = "# xet version ";
 const CURRENT_VERSION: &str = "0";
 
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct PointerFileTelemetry {
+    /// The transfer of CAS bytes due to this file
+    pub network_bytes: Option<u64>,
+
+    /// The duration for cleaning or smudging this file
+    pub latency: Option<Duration>,
+}
 /// A struct that wraps a Xet pointer file.
 /// Xet pointer file format is a TOML file,
 /// and the first line must be of the form "# xet version "
@@ -33,6 +42,9 @@ pub struct PointerFile {
 
     /// The size of the file pointed to by this pointer file
     filesize: u64,
+
+    /// Optional telemetry information that is never written to the file
+    pub telemetry: Option<PointerFileTelemetry>,
 }
 
 impl PointerFile {
@@ -59,6 +71,7 @@ impl PointerFile {
             is_valid,
             hash,
             filesize,
+            telemetry: None,
         };
     }
 
@@ -73,6 +86,7 @@ impl PointerFile {
             is_valid,
             hash,
             filesize,
+            telemetry: None,
         };
     }
 
@@ -117,6 +131,7 @@ impl PointerFile {
             is_valid,
             hash,
             filesize,
+            telemetry: None,
         }
     }
 
@@ -134,6 +149,7 @@ impl PointerFile {
             is_valid: false,
             hash: empty_string,
             filesize: 0,
+            telemetry: None,
         };
 
         let Ok(file_meta) = fs::metadata(path).map_err(|e| {
@@ -156,13 +172,23 @@ impl PointerFile {
         PointerFile::init_from_string(&contents, path)
     }
 
-    pub fn init_from_info(path: &str, hash: &str, filesize: u64) -> Self {
+    pub fn init_from_info(
+        path: &str,
+        hash: &str,
+        filesize: u64,
+        compressed_size: u64,
+        latency: Duration,
+    ) -> Self {
         Self {
             version_string: CURRENT_VERSION.to_string(),
             path: path.to_string(),
             is_valid: true,
             hash: hash.to_string(),
             filesize,
+            telemetry: Some(PointerFileTelemetry {
+                network_bytes: Some(compressed_size),
+                latency: Some(latency),
+            }),
         }
     }
 
@@ -194,6 +220,20 @@ impl PointerFile {
     pub fn filesize(&self) -> u64 {
         self.filesize
     }
+
+    pub fn compressed_size(&self) -> u64 {
+        self.telemetry
+            .as_ref()
+            .and_then(|t| t.network_bytes)
+            .unwrap_or(0)
+    }
+
+    pub fn latency(&self) -> f64 {
+        self.telemetry
+            .as_ref()
+            .and_then(|t| t.latency.map(|dur| dur.as_secs_f64()))
+            .unwrap_or(0f64)
+    }
 }
 
 pub fn is_xet_pointer_file(data: &[u8]) -> bool {
@@ -322,4 +362,6 @@ mod tests {
         let test = PointerFile::init_from_string(&test_contents, &empty_string);
         assert!(!test.is_valid()); // new version is not valid
     }
+
+    // todo add init_from_info test
 }
diff --git a/data/src/test_utils/local_test_client.rs b/data/src/test_utils/local_test_client.rs
index a1991393..3e739ab0 100644
--- a/data/src/test_utils/local_test_client.rs
+++ b/data/src/test_utils/local_test_client.rs
@@ -53,7 +53,8 @@ impl ReconstructionClient for LocalTestClient {
         &self,
         hash: &MerkleHash,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<(), CasClientError> {
+    ) -> Result<u64, CasClientError> {
+        let mut bytes_downloaded = 0u64;
         let Some((file_info, _)) = self
             .shard_manager
             .get_file_reconstruction_info(hash)
@@ -76,10 +77,11 @@ impl ReconstructionClient for LocalTestClient {
                 return Err(CasClientError::InvalidRange);
             };
 
+            bytes_downloaded += one_range.len() as u64;
             writer.write_all(&one_range)?;
         }
 
-        Ok(())
+        Ok(bytes_downloaded)
     }
 
     #[allow(unused_variables)]
@@ -89,7 +91,7 @@ impl ReconstructionClient for LocalTestClient {
         offset: u64,
         length: u64,
         writer: &mut Box<dyn Write + Send>,
-    ) -> Result<(), CasClientError> {
+    ) -> Result<u64, CasClientError> {
         todo!()
     }
 }
diff --git a/hf_xet/Cargo.lock b/hf_xet/Cargo.lock
index 260ccaee..6fed173d 100644
--- a/hf_xet/Cargo.lock
+++ b/hf_xet/Cargo.lock
@@ -442,6 +442,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "bincode",
+ "blake3",
  "bytes",
  "cas_types",
  "http 1.1.0",
diff --git a/hf_xet/pyproject.toml b/hf_xet/pyproject.toml
index c42a592a..a4b3bf62 100644
--- a/hf_xet/pyproject.toml
+++ b/hf_xet/pyproject.toml
@@ -11,6 +11,9 @@ classifiers = [
     "Programming Language :: Python :: Implementation :: PyPy",
 ]
 dynamic = ["version"]
+dependencies = [
+    "maturin>=1.7.4",
+]
 
 [project.optional-dependencies]
 tests = [
     "pytest",
diff --git a/hf_xet/src/data_client.rs b/hf_xet/src/data_client.rs
index 77234c11..b86089d1 100644
--- a/hf_xet/src/data_client.rs
+++ b/hf_xet/src/data_client.rs
@@ -1,7 +1,7 @@
 use crate::config::default_config;
 use utils::auth::TokenRefresher;
 use data::errors::DataProcessingError;
-use data::{errors, PointerFile, PointerFileTranslator};
+use data::{errors, PointerFile, PointerFileTelemetry, PointerFileTranslator};
 use parutils::{tokio_par_for_each, ParallelError};
 use std::fs;
 use std::fs::File;
@@ -56,7 +56,7 @@ pub async fn download_async(
     endpoint: Option<String>,
     token_info: Option<(String, u64)>,
     token_refresher: Option<Arc<dyn TokenRefresher>>,
-) -> errors::Result<Vec<String>> {
+) -> errors::Result<Vec<PointerFile>> {
     let config = default_config(
         endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
         token_info,
@@ -64,7 +64,7 @@ pub async fn download_async(
     )?;
     let processor = Arc::new(PointerFileTranslator::new(config).await?);
     let processor = &processor;
-    let paths = tokio_par_for_each(
+    let pfs = tokio_par_for_each(
         pointer_files,
         MAX_CONCURRENT_DOWNLOADS,
         |pointer_file, _| async move {
@@ -78,7 +78,7 @@ pub async fn download_async(
             ParallelError::TaskError(e) => e,
         })?;
 
-    Ok(paths)
+    Ok(pfs)
 }
 
 async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Result<PointerFile> {
@@ -96,7 +96,6 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Result<PointerFile> {
         handle.add_bytes(read_buf[0..bytes].to_vec()).await?;
     }
 
-
     let pf_str = handle.result().await?;
     let pf = PointerFile::init_from_string(&pf_str, path.to_str().unwrap());
     Ok(pf)
@@ -105,7 +104,7 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Result<PointerFile> {
 async fn smudge_file(
     proc: &PointerFileTranslator,
     pointer_file: &PointerFile,
-) -> errors::Result<String> {
+) -> errors::Result<PointerFile> {
     let path = PathBuf::from(pointer_file.path());
     if let Some(parent_dir) = path.parent() {
         fs::create_dir_all(parent_dir)?;
@@ -113,7 +112,17 @@ async fn smudge_file(
     let mut f: Box<dyn Write + Send> = Box::new(File::create(&path)?);
-    proc.smudge_file_from_pointer(pointer_file, &mut f, None)
+    let mut pointer_file_clone = pointer_file.clone();
+    proc.smudge_file_from_pointer(&mut pointer_file_clone, &mut f, None)
         .await?;
-    Ok(pointer_file.path().to_string())
+    match pointer_file_clone.telemetry {
+        None => {
+            pointer_file_clone.telemetry = Some(PointerFileTelemetry {
+                latency: Some(proc.start.elapsed()),
+                network_bytes: None,
+            })
+        }
+        Some(ref mut telemetry) => telemetry.latency = Some(proc.start.elapsed()),
+    }
+
+    Ok(pointer_file_clone)
 }
 
 #[cfg(test)]
diff --git a/hf_xet/src/lib.rs b/hf_xet/src/lib.rs
index b6c6ef93..e4a000b9 100644
--- a/hf_xet/src/lib.rs
+++ b/hf_xet/src/lib.rs
@@ -10,8 +10,32 @@ use pyo3::prelude::*;
 use pyo3::pyfunction;
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::time::Duration;
 use token_refresh::WrappedTokenRefresher;
 
+// This will be the information that will finally be sent to the telemetry endpoint
+//
+// #[pyclass]
+// #[derive(Clone, Debug)]
+// pub struct UploadTelemetry {
+//     #[pyo3(get)]
+//     total_time_ms: u64,
+//     #[pyo3(get)]
+//     uploaded_bytes: u64,
+// }
+//
+// #[pyclass]
+// #[derive(Clone, Debug)]
+// pub struct DownloadTelemetry {
+//     #[pyo3(get)]
+//     total_time_ms: u64,
+//     #[pyo3(get)]
+//     downloaded_bytes: u64,
+//     #[pyo3(get)]
+//     cached_bytes: u32,
+// }
+
+
 #[pyfunction]
 #[pyo3(signature = (file_paths, endpoint, token_info, token_refresher), text_signature = "(file_paths: List[str], endpoint: Optional[str], token_info: Optional[(str, int)], token_refresher: Optional[Callable[[], (str, int)]]) -> List[PyPointerFile]")]
 pub fn upload_files(
@@ -63,8 +87,10 @@ pub fn download_files(
         .block_on(async move {
             data_client::download_async(pfs, endpoint, token_info, refresher).await
         })
+        .map(|pfs| pfs.into_iter().map(|pointer_file| pointer_file.path().to_string()).collect())
         .map_err(|e| PyException::new_err(format!("{e:?}")))
     })
+
 }
 
 // helper to convert the implemented WrappedTokenRefresher into an Arc<dyn TokenRefresher>
@@ -82,21 +108,28 @@ pub struct PyPointerFile {
     hash: String,
     #[pyo3(get)]
     filesize: u64,
+    #[pyo3(get)]
+    compressed_size: u64,
+    #[pyo3(get)]
+    latency: f64,
 }
 
+
 impl From<PointerFile> for PyPointerFile {
     fn from(pf: PointerFile) -> Self {
         Self {
             path: pf.path().to_string(),
             hash: pf.hash_string().to_string(),
             filesize: pf.filesize(),
+            compressed_size: pf.compressed_size(),
+            latency: pf.latency(),
         }
     }
 }
 
 impl From<PyPointerFile> for PointerFile {
     fn from(pf: PyPointerFile) -> Self {
-        PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize)
+        PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize, pf.compressed_size, Duration::from_secs_f64(pf.latency))
     }
 }
 
@@ -108,6 +141,8 @@ impl PyPointerFile {
             path,
             hash,
             filesize,
+            compressed_size: 0,
+            latency: 0.0,
         }
     }
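
For context, here is a minimal sketch (not part of the diff) of how a caller consumes the new telemetry; it mirrors the post-patch `smudge` flow in `data/src/bin/example.rs`. `default_smudge_config` and the unqualified `Result` alias come from that example binary and are assumptions here, not part of this patch.

```rust
use std::io::Write;

use data::{PointerFile, PointerFileTranslator};

// Smudge a pointer file, then read back the telemetry that
// `smudge_file_from_pointer` now records on the `&mut PointerFile`.
async fn smudge_with_telemetry(input: &str, writer: &mut Box<dyn Write + Send>) -> Result<()> {
    let mut pointer_file = PointerFile::init_from_string(input, "");

    // Not a pointer file: pass the bytes through untouched.
    if !pointer_file.is_valid() {
        writer.write_all(input.as_bytes())?;
        return Ok(());
    }

    let translator = PointerFileTranslator::new(default_smudge_config()?).await?;
    translator
        .smudge_file_from_pointer(&mut pointer_file, writer, None)
        .await?;

    // Both getters fall back to 0 when telemetry was never populated.
    println!(
        "network bytes: {}, latency: {:.3}s",
        pointer_file.compressed_size(),
        pointer_file.latency()
    );
    Ok(())
}
```

On the Python side the same numbers surface as the read-only `compressed_size` and `latency` attributes of `PyPointerFile`.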