Skip to content

Commit b08974a

Browse files
committed
Collect telemetry for upload path
Get the telemetry data to the Python layer. This will be sent over the telemetry endpoint later. Related to STO-7
1 parent 92abd75 commit b08974a

File tree

7 files changed

+96
-17
lines changed

7 files changed

+96
-17
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ debug/
1717
.vscode
1818
venv
1919
**/*.env
20+
**/uv.lock

data/src/clean.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::mem::take;
2121
use std::ops::DerefMut;
2222
use std::path::{Path, PathBuf};
2323
use std::sync::Arc;
24+
use std::time::Instant;
2425
use tokio::sync::mpsc::error::TryRecvError;
2526
use tokio::sync::mpsc::{channel, Receiver, Sender};
2627
use tokio::sync::Mutex;
@@ -76,6 +77,9 @@ pub struct Cleaner {
7677

7778
// Auxiliary info
7879
file_name: Option<PathBuf>,
80+
81+
// Telemetry
82+
start: Instant,
7983
}
8084

8185
impl Cleaner {
@@ -113,6 +117,7 @@ impl Cleaner {
113117
tracking_info: Mutex::new(Default::default()),
114118
small_file_buffer: Mutex::new(Some(Vec::with_capacity(small_file_threshold))),
115119
file_name: file_name.map(|f| f.to_owned()),
120+
start: Instant::now(),
116121
});
117122

118123
Self::run(cleaner.clone(), chunk_c).await;
@@ -239,8 +244,9 @@ impl Cleaner {
239244
Ok(false)
240245
}
241246

242-
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<()> {
247+
async fn dedup(&self, chunks: &[ChunkYieldType]) -> Result<u64> {
243248
info!("Dedup {} chunks", chunks.len());
249+
let mut total_compressed_bytes = 0;
244250
let mut tracking_info = self.tracking_info.lock().await;
245251

246252
let enable_global_dedup = self.enable_global_dedup_queries;
@@ -463,13 +469,14 @@ impl Cleaner {
463469
tracking_info.cas_data.data.extend(bytes);
464470

465471
if tracking_info.cas_data.data.len() > TARGET_CAS_BLOCK_SIZE {
466-
let cas_hash = register_new_cas_block(
472+
let (cas_hash, compressed_bytes) = register_new_cas_block(
467473
&mut tracking_info.cas_data,
468474
&self.shard_manager,
469475
&self.cas,
470476
&self.cas_prefix,
471477
)
472478
.await?;
479+
total_compressed_bytes += compressed_bytes;
473480

474481
for i in take(&mut tracking_info.current_cas_file_info_indices) {
475482
tracking_info.file_info[i].cas_hash = cas_hash;
@@ -483,7 +490,7 @@ impl Cleaner {
483490
}
484491
}
485492

486-
Ok(())
493+
Ok(total_compressed_bytes)
487494
}
488495

489496
async fn finish(&self) -> Result<()> {
@@ -516,7 +523,8 @@ impl Cleaner {
516523
Ok(())
517524
}
518525

519-
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64)> {
526+
async fn summarize_dedup_info(&self) -> Result<(MerkleHash, u64, u64)> {
527+
let mut total_compressed_bytes = 0;
520528
let mut tracking_info = self.tracking_info.lock().await;
521529

522530
let file_hash = file_node_hash(
@@ -577,13 +585,14 @@ impl Cleaner {
577585
if cas_data_accumulator.data.len() >= TARGET_CAS_BLOCK_SIZE {
578586
let mut new_cas_data = take(cas_data_accumulator.deref_mut());
579587
drop(cas_data_accumulator); // Release the lock.
580-
register_new_cas_block(
588+
let (_cas_hash, compressed_bytes) = register_new_cas_block(
581589
&mut new_cas_data,
582590
&self.shard_manager,
583591
&self.cas,
584592
&self.cas_prefix,
585593
)
586594
.await?;
595+
total_compressed_bytes += compressed_bytes;
587596
} else {
588597
drop(cas_data_accumulator);
589598
}
@@ -593,11 +602,11 @@ impl Cleaner {
593602

594603
*tracking_info = Default::default();
595604

596-
Ok((file_hash, file_size))
605+
Ok((file_hash, file_size, total_compressed_bytes))
597606
}
598607

599608
async fn to_pointer_file(&self) -> Result<String> {
600-
let (hash, filesize) = self.summarize_dedup_info().await?;
609+
let (hash, filesize, compressed_size) = self.summarize_dedup_info().await?;
601610
let pointer_file = PointerFile::init_from_info(
602611
&self
603612
.file_name
@@ -606,6 +615,8 @@ impl Cleaner {
606615
.unwrap_or_default(),
607616
&hash.hex(),
608617
filesize,
618+
compressed_size,
619+
self.start.elapsed(),
609620
);
610621
Ok(pointer_file.to_string())
611622
}

data/src/data_processing.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::mem::take;
1717
use std::ops::DerefMut;
1818
use std::path::Path;
1919
use std::sync::Arc;
20+
use std::time::Instant;
2021
use tokio::sync::Mutex;
2122

2223
#[derive(Default, Debug)]
@@ -59,6 +60,9 @@ pub struct PointerFileTranslator {
5960

6061
/* ----- Deduped data shared across files ----- */
6162
global_cas_data: Arc<Mutex<CASDataAggregator>>,
63+
// Telemetry
64+
/* ----- Telemetry ----- */
65+
pub start: Instant,
6266
}
6367

6468
// Constructors
@@ -97,6 +101,7 @@ impl PointerFileTranslator {
97101
remote_shards,
98102
cas: cas_client,
99103
global_cas_data: Default::default(),
104+
start: Instant::now(),
100105
})
101106
}
102107
}
@@ -208,7 +213,7 @@ pub(crate) async fn register_new_cas_block(
208213
shard_manager: &Arc<ShardFileManager>,
209214
cas: &Arc<dyn Client + Send + Sync>,
210215
cas_prefix: &str,
211-
) -> Result<MerkleHash> {
216+
) -> Result<(MerkleHash, u64)> {
212217
let cas_hash = cas_node_hash(&cas_data.chunks[..]);
213218

214219
let raw_bytes_len = cas_data.data.len();
@@ -283,7 +288,7 @@ pub(crate) async fn register_new_cas_block(
283288
cas_data.chunks.clear();
284289
cas_data.pending_file_info.clear();
285290

286-
Ok(cas_hash)
291+
Ok((cas_hash, compressed_bytes_len as u64))
287292
}
288293

289294
/// Smudge operations

data/src/pointer_file.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::constants::POINTER_FILE_LIMIT;
33
use merklehash::{DataHashHexParseError, MerkleHash};
44
use static_assertions::const_assert;
55
use std::{collections::BTreeMap, fs, path::Path};
6+
use std::time::Duration;
67
use toml::Value;
78
use tracing::{debug, error, warn};
89

@@ -33,6 +34,12 @@ pub struct PointerFile {
3334

3435
/// The size of the file pointed to by this pointer file
3536
filesize: u64,
37+
38+
/// The addition to CAS bytes due to this file
39+
compressed_size: Option<u64>,
40+
41+
/// The duration for cleaning or smudging this file
42+
pub latency: Option<Duration>,
3643
}
3744

3845
impl PointerFile {
@@ -59,6 +66,8 @@ impl PointerFile {
5966
is_valid,
6067
hash,
6168
filesize,
69+
compressed_size: None,
70+
latency: None,
6271
};
6372
}
6473

@@ -73,6 +82,8 @@ impl PointerFile {
7382
is_valid,
7483
hash,
7584
filesize,
85+
compressed_size: None,
86+
latency: None,
7687
};
7788
}
7889

@@ -117,6 +128,8 @@ impl PointerFile {
117128
is_valid,
118129
hash,
119130
filesize,
131+
compressed_size: None,
132+
latency: None,
120133
}
121134
}
122135

@@ -134,6 +147,8 @@ impl PointerFile {
134147
is_valid: false,
135148
hash: empty_string,
136149
filesize: 0,
150+
compressed_size: None,
151+
latency: None,
137152
};
138153

139154
let Ok(file_meta) = fs::metadata(path).map_err(|e| {
@@ -156,13 +171,15 @@ impl PointerFile {
156171
PointerFile::init_from_string(&contents, path)
157172
}
158173

159-
pub fn init_from_info(path: &str, hash: &str, filesize: u64) -> Self {
174+
pub fn init_from_info(path: &str, hash: &str, filesize: u64, compressed_size: u64, latency: Duration) -> Self {
160175
Self {
161176
version_string: CURRENT_VERSION.to_string(),
162177
path: path.to_string(),
163178
is_valid: true,
164179
hash: hash.to_string(),
165180
filesize,
181+
compressed_size: Some(compressed_size),
182+
latency: Some(latency),
166183
}
167184
}
168185

@@ -194,6 +211,10 @@ impl PointerFile {
194211
pub fn filesize(&self) -> u64 {
195212
self.filesize
196213
}
214+
215+
pub fn compressed_size(&self) -> u64 {self.compressed_size.unwrap_or(0)}
216+
217+
pub fn latency(&self) -> f64 {self.latency.map(|dur| dur.as_secs_f64()).unwrap_or(0f64)}
197218
}
198219

199220
pub fn is_xet_pointer_file(data: &[u8]) -> bool {
@@ -322,4 +343,6 @@ mod tests {
322343
let test = PointerFile::init_from_string(&test_contents, &empty_string);
323344
assert!(!test.is_valid()); // new version is not valid
324345
}
346+
347+
// TODO: add a unit test covering init_from_info (including compressed_size and latency fields)
325348
}

hf_xet/pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ classifiers = [
1111
"Programming Language :: Python :: Implementation :: PyPy",
1212
]
1313
dynamic = ["version"]
14+
dependencies = [
15+
"maturin>=1.7.4",
16+
]
1417
[project.optional-dependencies]
1518
tests = [
1619
"pytest",

hf_xet/src/data_client.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ pub async fn download_async(
5656
endpoint: Option<String>,
5757
token_info: Option<(String, u64)>,
5858
token_refresher: Option<Arc<dyn TokenRefresher>>,
59-
) -> errors::Result<Vec<String>> {
59+
) -> errors::Result<Vec<PointerFile>> {
6060
let config = default_config(
6161
endpoint.unwrap_or(DEFAULT_CAS_ENDPOINT.to_string()),
6262
token_info,
6363
token_refresher,
6464
)?;
6565
let processor = Arc::new(PointerFileTranslator::new(config).await?);
6666
let processor = &processor;
67-
let paths = tokio_par_for_each(
67+
let pfs = tokio_par_for_each(
6868
pointer_files,
6969
MAX_CONCURRENT_DOWNLOADS,
7070
|pointer_file, _| async move {
@@ -78,7 +78,7 @@ pub async fn download_async(
7878
ParallelError::TaskError(e) => e,
7979
})?;
8080

81-
Ok(paths)
81+
Ok(pfs)
8282
}
8383

8484
async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Result<PointerFile> {
@@ -96,7 +96,6 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
9696

9797
handle.add_bytes(read_buf[0..bytes].to_vec()).await?;
9898
}
99-
10099
let pf_str = handle.result().await?;
101100
let pf = PointerFile::init_from_string(&pf_str, path.to_str().unwrap());
102101
Ok(pf)
@@ -105,15 +104,17 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
105104
async fn smudge_file(
106105
proc: &PointerFileTranslator,
107106
pointer_file: &PointerFile,
108-
) -> errors::Result<String> {
107+
) -> errors::Result<PointerFile> {
109108
let path = PathBuf::from(pointer_file.path());
110109
if let Some(parent_dir) = path.parent() {
111110
fs::create_dir_all(parent_dir)?;
112111
}
113112
let mut f: Box<dyn Write + Send> = Box::new(File::create(&path)?);
114113
proc.smudge_file_from_pointer(pointer_file, &mut f, None)
115114
.await?;
116-
Ok(pointer_file.path().to_string())
115+
let mut pointer_file_clone = pointer_file.clone();
116+
pointer_file_clone.latency = Some(proc.start.elapsed());
117+
Ok(pointer_file_clone)
117118
}
118119

119120
#[cfg(test)]

hf_xet/src/lib.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,32 @@ use pyo3::prelude::*;
1010
use pyo3::pyfunction;
1111
use std::fmt::Debug;
1212
use std::sync::Arc;
13+
use std::time::Duration;
1314
use token_refresh::WrappedTokenRefresher;
1415

16+
// This will be the information that will finally be sent to the telemetry endpoint
17+
//
18+
// #[pyclass]
19+
// #[derive(Clone, Debug)]
20+
// pub struct UploadTelemetry {
21+
// #[pyo3(get)]
22+
// total_time_ms: u64,
23+
// #[pyo3(get)]
24+
// uploaded_bytes: u64,
25+
// }
26+
//
27+
// #[pyclass]
28+
// #[derive(Clone, Debug)]
29+
// pub struct DownloadTelemetry {
30+
// #[pyo3(get)]
31+
// total_time_ms: u64,
32+
// #[pyo3(get)]
33+
// downloaded_bytes: u64,
34+
// #[pyo3(get)]
35+
// cached_bytes: u32,
36+
// }
37+
38+
1539
#[pyfunction]
1640
#[pyo3(signature = (file_paths, endpoint, token_info, token_refresher), text_signature = "(file_paths: List[str], endpoint: Optional[str], token_info: Optional[(str, int)], token_refresher: Optional[Callable[[], (str, int)]]) -> List[PyPointerFile]")]
1741
pub fn upload_files(
@@ -63,8 +87,10 @@ pub fn download_files(
6387
.block_on(async move {
6488
data_client::download_async(pfs, endpoint, token_info, refresher).await
6589
})
90+
.map(|pfs| pfs.into_iter().map(|pointer_file| pointer_file.path().to_string()).collect())
6691
.map_err(|e| PyException::new_err(format!("{e:?}")))
6792
})
93+
6894
}
6995

7096
// helper to convert the implemented WrappedTokenRefresher into an Arc<dyn TokenRefresher>
@@ -82,21 +108,28 @@ pub struct PyPointerFile {
82108
hash: String,
83109
#[pyo3(get)]
84110
filesize: u64,
111+
#[pyo3(get)]
112+
compressed_size: u64,
113+
#[pyo3(get)]
114+
latency: f64,
85115
}
86116

117+
87118
impl From<PointerFile> for PyPointerFile {
88119
fn from(pf: PointerFile) -> Self {
89120
Self {
90121
path: pf.path().to_string(),
91122
hash: pf.hash_string().to_string(),
92123
filesize: pf.filesize(),
124+
compressed_size: pf.compressed_size(),
125+
latency: pf.latency(),
93126
}
94127
}
95128
}
96129

97130
impl From<PyPointerFile> for PointerFile {
98131
fn from(pf: PyPointerFile) -> Self {
99-
PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize)
132+
PointerFile::init_from_info(&pf.path, &pf.hash, pf.filesize, pf.compressed_size, Duration::from_secs_f64(pf.latency))
100133
}
101134
}
102135

@@ -108,6 +141,8 @@ impl PyPointerFile {
108141
path,
109142
hash,
110143
filesize,
144+
compressed_size: 0,
145+
latency: 0.0,
111146
}
112147
}
113148

0 commit comments

Comments
 (0)