Skip to content

wasm poc interface #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: wasm-poc
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cas_client/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl RetryConfig<No429RetryStratey> {

/// Builds authenticated HTTP Client to talk to CAS.
/// Includes retry middleware with exponential backoff.
#[allow(unused_variables)]
pub fn build_auth_http_client<R: RetryableStrategy + Send + Sync + 'static>(
auth_config: &Option<AuthConfig>,
retry_config: RetryConfig<R>,
Expand Down Expand Up @@ -106,6 +107,7 @@ pub fn build_auth_http_client<R: RetryableStrategy + Send + Sync + 'static>(

/// Builds HTTP Client to talk to CAS.
/// Includes retry middleware with exponential backoff.
#[allow(unused_variables)]
pub fn build_http_client<R: RetryableStrategy + Send + Sync + 'static>(
retry_config: RetryConfig<R>,
) -> std::result::Result<ClientWithMiddleware, CasClientError> {
Expand Down
5 changes: 3 additions & 2 deletions cas_client/src/interface.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use mdb_shard::shard_file_reconstructor::FileReconstructor;

#[cfg(not(target_family = "wasm"))]
use crate::CasClientError;
#[cfg(not(target_family = "wasm"))]
use mdb_shard::shard_file_reconstructor::FileReconstructor;

/// A Client to the Shard service. The shard service
/// provides for
Expand Down
3 changes: 2 additions & 1 deletion data/src/migration_tool/hub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub struct HubClientTokenRefresher {
pub client: Arc<HubClient>,
}

#[async_trait]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
impl TokenRefresher for HubClientTokenRefresher {
async fn refresh(&self) -> std::result::Result<TokenInfo, AuthError> {
let client = self.client.clone();
Expand Down
3 changes: 2 additions & 1 deletion hf_xet/src/token_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl WrappedTokenRefresher {
}
}

#[async_trait]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
impl TokenRefresher for WrappedTokenRefresher {
async fn refresh(&self) -> Result<TokenInfo, AuthError> {
Python::with_gil(|py| {
Expand Down
2 changes: 2 additions & 0 deletions hf_xet_wasm/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion hf_xet_wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
crate-type = ["cdylib", "rlib"]

[dependencies]
wasm-bindgen = "0.2.95"
wasm-bindgen = "0.2.100"
wasm-bindgen-futures = "0.4.50"
tokio = { version = "1.44", features = ["sync", "rt"] }
tokio_with_wasm = { version = "0.8.2", features = ["rt"] }
Expand Down Expand Up @@ -57,7 +57,9 @@ mdb_shard = { path = "../mdb_shard" }
merkledb = { path = "../merkledb" }
cas_object = { path = "../cas_object" }
cas_client = { path = "../cas_client" }
cas_types = { path = "../cas_types" }
utils = { path = "../utils" }
serde = { version = "1.0.217", features = ["derive"] }


[package.metadata.docs.rs]
Expand Down
1 change: 1 addition & 0 deletions hf_xet_wasm/build_wasm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ RUSTFLAGS='-C target-feature=+atomics,+bulk-memory,+mutable-globals --cfg getran
RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-bindgen \
target/wasm32-unknown-unknown/release/examples/simple.wasm \
--out-dir ./examples/target/ \
--typescript \
--target web
9 changes: 5 additions & 4 deletions hf_xet_wasm/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use cas_object::test_utils::build_cas_object;
use cas_object::CompressionScheme;
use futures::AsyncReadExt;
use hf_xet_wasm::blob_reader::BlobReader;
use hf_xet_wasm::configurations::{DataConfig, RepoSalt, ShardConfig, TranslatorConfig};
use hf_xet_wasm::configurations::{
DataConfig, RepoSalt, RepoSalt, ShardConfig, ShardConfig, TranslatorConfig, TranslatorConfig,
};
use hf_xet_wasm::wasm_file_upload_session::FileUploadSession;
use merklehash::MerkleHash;
use reqwest::header;
Expand Down Expand Up @@ -112,7 +114,7 @@ pub async fn test_async_blob_reader(file: web_sys::File) -> String {
let s: u32 = data_local.iter().map(|&x| x as u32).sum();
sum += s;
}
o_tx.send(sum).await;
let _ = o_tx.send(sum).await;
})
}));
let Ok(()) = tx.send(data_local).await else {
Expand All @@ -123,7 +125,6 @@ pub async fn test_async_blob_reader(file: web_sys::File) -> String {
log::info!("data sent");
}

let mut id = inputs.len() - 1;
for (id, input) in inputs.into_iter().enumerate() {
log::info!("closing input {id}");
drop(input);
Expand Down Expand Up @@ -176,7 +177,7 @@ pub async fn clean_file(file: web_sys::File, endpoint: String, jwt_token: String

let upload_session = Arc::new(FileUploadSession::new(Arc::new(config)));

let mut handle = upload_session.start_clean();
let mut handle = upload_session.start_clean("".to_string());

const READ_BUF_SIZE: usize = 8 * 1024 * 1024;
let mut buf = vec![0u8; READ_BUF_SIZE];
Expand Down
140 changes: 70 additions & 70 deletions hf_xet_wasm/examples/xet_meta.js
Original file line number Diff line number Diff line change
@@ -1,90 +1,90 @@
function xetMetadataOrNone(jsonData) {
/**
* Extract XET metadata from the HTTP body or return null if not found.
*
* @param {jsonData} - HTTP body in JSON to extract the XET metadata from.
* @returns {XetMetadata|null} The extracted metadata or null if missing.
*/
/**
* Extract XET metadata from the HTTP body or return null if not found.
*
* @param {jsonData} - HTTP body in JSON to extract the XET metadata from.
* @returns {XetMetadata|null} The extracted metadata or null if missing.
*/

const xetEndpoint = jsonData.casUrl;
const accessToken = jsonData.accessToken;
const expiration = jsonData.exp;
const xetEndpoint = jsonData.casUrl;
const accessToken = jsonData.accessToken;
const expiration = jsonData.exp;

if (xetEndpoint == undefined || accessToken == undefined || expiration == undefined) {
return null;
}
if (xetEndpoint == undefined || accessToken == undefined || expiration == undefined) {
return null;
}

const expirationUnixEpoch = parseInt(expiration, 10);
if (isNaN(expirationUnixEpoch)) {
return null;
}
const expirationUnixEpoch = parseInt(expiration, 10);
if (isNaN(expirationUnixEpoch)) {
return null;
}

return {
endpoint: xetEndpoint,
accessToken: accessToken,
expirationUnixEpoch: expirationUnixEpoch,
};
return {
endpoint: xetEndpoint,
accessToken: accessToken,
expirationUnixEpoch: expirationUnixEpoch,
};
}

async function fetchXetMetadataFromRepoInfo({
tokenType,
repoId,
repoType,
headers,
params = null
}) {
/**
* Uses the repo info to request a XET access token from Hub.
*
* @param {string} tokenType - Type of the token to request: "read" or "write".
* @param {string} repoId - A namespace (user or an organization) and a repo name separated by a `/`.
* @param {string} repoType - Type of the repo to upload to: "model", "dataset", or "space".
* @param {Object} headers - Headers to use for the request, including authorization headers and user agent.
* @param {Object|null} params - Additional parameters to pass with the request.
* @returns {Promise<Object|null>} The metadata needed to make the request to the XET storage service.
* @throws {Error} If the Hub API returned an error or the response is improperly formatted.
*/
tokenType,
repoId,
repoType,
headers,
params = null,
}) {
/**
* Uses the repo info to request a XET access token from Hub.
*
* @param {string} tokenType - Type of the token to request: "read" or "write".
* @param {string} repoId - A namespace (user or an organization) and a repo name separated by a `/`.
* @param {string} repoType - Type of the repo to upload to: "model", "dataset", or "space".
* @param {Object} headers - Headers to use for the request, including authorization headers and user agent.
* @param {Object|null} params - Additional parameters to pass with the request.
* @returns {Promise<Object|null>} The metadata needed to make the request to the XET storage service.
* @throws {Error} If the Hub API returned an error or the response is improperly formatted.
*/

const url = `https://huggingface.co/api/${repoType}s/${repoId}/xet-${tokenType}-token/main`;
console.log(`${url}`);
const url = `http://localhost:5564/api/${repoType}s/${repoId}/xet-${tokenType}-token/main`;
console.log(`${url}`);

return fetchXetMetadataWithUrl(url, headers, params);
return fetchXetMetadataWithUrl(url, headers, params);
}

async function fetchXetMetadataWithUrl(url, headers, params = null) {
/**
* Requests the XET access token from the supplied URL.
*
* @param {string} url - The access token endpoint URL.
* @param {Object} headers - Headers to use for the request, including authorization headers and user agent.
* @param {Object|null} params - Additional parameters to pass with the request.
* @returns {Promise<Object|null>} The metadata needed to make the request to the XET storage service.
* @throws {Error} If the Hub API returned an error or the response is improperly formatted.
*/
/**
* Requests the XET access token from the supplied URL.
*
* @param {string} url - The access token endpoint URL.
* @param {Object} headers - Headers to use for the request, including authorization headers and user agent.
* @param {Object|null} params - Additional parameters to pass with the request.
* @returns {Promise<Object|null>} The metadata needed to make the request to the XET storage service.
* @throws {Error} If the Hub API returned an error or the response is improperly formatted.
*/

const response = await fetch(url, {
method: "GET",
headers: headers
});
const response = await fetch(url, {
method: "GET",
headers: headers,
});

// Get headers
// const hedrs = response.headers;
// hedrs.forEach((value, key) => {
// console.log(`${key}: ${value}`);
// });
// Get headers
// const hedrs = response.headers;
// hedrs.forEach((value, key) => {
// console.log(`${key}: ${value}`);
// });

const jsonData = await response.json();
// console.log("Response Body :", jsonData);
const jsonData = await response.json();
// console.log("Response Body :", jsonData);

if (!response.ok) {
console.log("response not ok");
throw new Error(`HTTP error! Status: ${response.status}`);
}
if (!response.ok) {
console.log("response not ok");
throw new Error(`HTTP error! Status: ${response.status}`);
}

const metadata = xetMetadataOrNone(jsonData);
if (!metadata) {
throw new Error("XET headers have not been correctly set by the server.");
}
const metadata = xetMetadataOrNone(jsonData);
if (!metadata) {
throw new Error("XET headers have not been correctly set by the server.");
}

return metadata;
return metadata;
}
61 changes: 61 additions & 0 deletions hf_xet_wasm/src/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use tokio::sync::Mutex;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsValue;

// Bindings to objects supplied by the JavaScript caller. Nothing here is
// implemented in Rust; each item describes a property or method that the
// JS-side object is expected to provide.
#[wasm_bindgen]
extern "C" {
/// JavaScript object carrying an access token and its expiration.
pub type TokenInfo;
/// Reads the `token` property of the JS object.
#[wasm_bindgen(method, getter)]
pub fn token(this: &TokenInfo) -> String;
/// Reads the `expiration` property of the JS object.
///
/// NOTE(review): wasm-bindgen maps `u64` to a JS `BigInt`; confirm the JS
/// side supplies a BigInt rather than a plain number. The unit is presumably
/// a Unix-epoch timestamp — TODO confirm.
#[wasm_bindgen(method, getter)]
pub fn expiration(this: &TokenInfo) -> u64;

/// JavaScript object able to produce a fresh [`TokenInfo`] on demand.
pub type TokenRefresher;
/// Invokes the JS method `refreshToken` on the object and awaits its result.
///
/// Because of `catch`, a JS exception or rejected promise is returned as
/// `Err(JsValue)` instead of aborting.
#[wasm_bindgen(method, catch, js_name = "refreshToken")]
pub async fn refresh_token(this: &TokenRefresher) -> Result<TokenInfo, JsValue>;
}

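// Shape expected of the JavaScript object bound as `TokenRefresher`
// (the method name matches `js_name = "refreshToken"` above):
//
// interface TokenRefresher {
// refreshToken(): Promise<TokenInfo>;
// }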

/// Converts the JS-backed [`TokenInfo`] into the crate-agnostic
/// `utils::auth::TokenInfo` pair by reading both getters once.
impl From<TokenInfo> for utils::auth::TokenInfo {
    fn from(value: TokenInfo) -> Self {
        // Read the token first, then the expiration, mirroring field order.
        let token = value.token();
        let expiration = value.expiration();
        (token, expiration)
    }
}

/// Opaque `Debug` output: the JS object has no inspectable Rust fields, so
/// only the type name is printed.
impl Debug for TokenRefresher {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("TokenRefresher")
    }
}

/// Cloneable handle to a JS-provided [`TokenRefresher`], stored behind an
/// async `Mutex` inside an `Arc` so that clones share the same refresher.
#[derive(Debug, Clone)]
pub(crate) struct WrappedTokenRefresher(Arc<Mutex<TokenRefresher>>);

// NOTE(review): no safety argument is recorded for these impls. The wrapped
// value is a handle to a JS object and is not `Send`/`Sync` on its own.
// Presumably these are justified by the refresher only ever being used on the
// thread that created it — confirm, particularly if the wasm build enables
// threads (`+atomics`), and replace this note with a proper `SAFETY:` comment.
unsafe impl Send for WrappedTokenRefresher {}
unsafe impl Sync for WrappedTokenRefresher {}

impl From<TokenRefresher> for WrappedTokenRefresher {
fn from(value: TokenRefresher) -> Self {
#[allow(clippy::arc_with_non_send_sync)]
WrappedTokenRefresher(Arc::new(Mutex::new(value)))
}
}

/// Adapts the JS refresher to the `utils::auth::TokenRefresher` trait.
#[async_trait::async_trait(?Send)]
impl utils::auth::TokenRefresher for WrappedTokenRefresher {
    /// Locks the wrapped refresher, calls its JS `refreshToken` method and
    /// converts the outcome.
    ///
    /// # Errors
    /// Returns `AuthError::token_refresh_failure` carrying the `Debug`
    /// rendering of the JS error value when the JS call fails.
    async fn refresh(&self) -> Result<utils::auth::TokenInfo, utils::errors::AuthError> {
        // The guard is held for the whole JS call, so refreshes made through
        // clones of this handle run one at a time.
        let refresher = self.0.lock().await;
        match refresher.refresh_token().await {
            Ok(js_info) => Ok(utils::auth::TokenInfo::from(js_info)),
            Err(e) => Err(utils::errors::AuthError::token_refresh_failure(format!("{e:?}"))),
        }
    }
}
7 changes: 7 additions & 0 deletions hf_xet_wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
#[cfg(not(target_family = "wasm"))]
compile_error!("This crate is only meant to be used on the WebAssembly target");

mod auth;
pub mod blob_reader;
pub mod configurations;
mod errors;
mod session;
mod sha256;
mod wasm_deduplication_interface;
mod wasm_file_cleaner;
pub mod wasm_file_upload_session;

mod xorb_uploader;

pub use session::XetSession;

// sample test
#[cfg(test)]
mod tests {
Expand Down
Loading
Loading