Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions curvine-client/src/unified/mount_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use orpc::common::{FastHashMap, LocalTime};
use orpc::sync::AtomicCounter;
use orpc::CommonResult;
use std::sync::{Arc, RwLock};
use tokio::sync::Mutex;

/// Represents a single mount point with its filesystem handler.
/// Contains mount metadata, UFS handler, and path conversion utilities.
Expand Down Expand Up @@ -165,6 +166,8 @@ pub struct MountCache {
mounts: RwLock<InnerMap>,
update_interval: u64,
last_update: AtomicCounter,
/// Single-flight lock: only one task performs full refresh when TTL expires.
refresh_lock: Mutex<()>,
}

impl MountCache {
Expand All @@ -173,6 +176,7 @@ impl MountCache {
mounts: RwLock::new(InnerMap::default()),
update_interval,
last_update: AtomicCounter::new(0),
refresh_lock: Mutex::new(()),
}
}

Expand All @@ -181,20 +185,25 @@ impl MountCache {
}

pub async fn check_update(&self, fs: &UnifiedFileSystem, force: bool) -> FsResult<()> {
if self.need_update() || force {
let mounts = fs.get_mount_table().await?;
if !self.need_update() && !force {
return Ok(());
}

let mut state = self.mounts.write().unwrap();
state.clear();
let _guard = self.refresh_lock.lock().await;
if !self.need_update() && !force {
return Ok(());
}

for item in mounts {
state.insert(item)?;
}
let mounts = fs.get_mount_table().await?;
let mut state = self.mounts.write().unwrap();

debug!("update mounts {:?}", state.len());
self.last_update.set(LocalTime::mills());
state.clear();
for item in mounts {
state.insert(item)?;
}

debug!("update mounts {:?}", state.len());
self.last_update.set(LocalTime::mills());
Ok(())
}

Expand Down
82 changes: 61 additions & 21 deletions curvine-client/src/unified/unified_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ use curvine_common::conf::ClusterConf;
use curvine_common::error::FsError;
use curvine_common::fs::{FileSystem, Path, Reader, Writer};
use curvine_common::state::{
CreateFileOpts, FileAllocOpts, FileLock, FileStatus, JobStatus, LoadJobCommand, LoadJobResult,
MasterInfo, MkdirOpts, MkdirOptsBuilder, MountInfo, MountOptions, OpenFlags, SetAttrOpts,
CreateFileOpts, FileAllocOpts, FileLock, FileStatus, JobStatus, LoadJobCommand, MasterInfo,
MkdirOpts, MkdirOptsBuilder, MountInfo, MountOptions, OpenFlags, SetAttrOpts,
};
use curvine_common::utils::CommonUtils;
use curvine_common::FsResult;
use log::{error, info, warn};
use orpc::common::TimeSpent;
use orpc::runtime::Runtime;
use orpc::runtime::{RpcRuntime, Runtime};
use orpc::{err_box, err_ext};
use std::sync::Arc;

#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
enum CacheValidity {
Valid,
Invalid,
Invalid(Option<FileStatus>),
}

#[derive(Clone)]
Expand Down Expand Up @@ -210,11 +210,11 @@ impl UnifiedFileSystem {
mount: &MountValue,
) -> FsResult<CacheValidity> {
if cv_status.is_expired() {
return Ok(CacheValidity::Invalid);
return Ok(CacheValidity::Invalid(None));
}

if !cv_status.is_complete() {
return Ok(CacheValidity::Invalid);
return Ok(CacheValidity::Invalid(None));
}

if !mount.info.read_verify_ufs {
Expand All @@ -228,7 +228,7 @@ impl UnifiedFileSystem {
{
Ok(CacheValidity::Valid)
} else {
Ok(CacheValidity::Invalid)
Ok(CacheValidity::Invalid(Some(ufs_status)))
}
}

Expand Down Expand Up @@ -274,16 +274,28 @@ impl UnifiedFileSystem {
)?);
Ok(cv_reader)
}
CacheValidity::Invalid => Ok(None),
CacheValidity::Invalid(_) => Ok(None),
}
}
}

pub async fn async_cache(&self, source_path: &Path) -> FsResult<LoadJobResult> {
pub fn async_cache(&self, source_path: &Path) -> FsResult<()> {
let client = JobMasterClient::new(self.fs_client());
let source_path = source_path.clone_uri();
let command = LoadJobCommand::builder(source_path.clone()).build();
client.submit_load_job(command).await

self.fs_context().rt().spawn(async move {
let command = LoadJobCommand::builder(source_path.clone()).build();
let res = client.submit_load_job(command).await;
match res {
Err(e) => warn!("submit async cache error for {}: {}", source_path, e),
Ok(res) => info!(
"submit async cache successfully for {}, job id {}, target_path {}",
source_path, res.job_id, res.target_path
),
}
});

Ok(())
}

pub async fn wait_job_complete(&self, path: &Path, fail_if_not_found: bool) -> FsResult<()> {
Expand Down Expand Up @@ -383,6 +395,12 @@ impl UnifiedFileSystem {
}

Some((ufs_path, mount)) => {
if let Err(e) = self.cv.delete(path, false).await {
if !matches!(e, FsError::FileNotFound(_)) {
warn!("failed to delete cache for {}: {}", path, e);
}
}

let writer = if flags.append() {
mount.ufs.append(&ufs_path).await?
} else {
Expand Down Expand Up @@ -517,13 +535,7 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
.inc();

if mount.info.auto_cache() {
match self.async_cache(&ufs_path).await {
Err(e) => warn!("submit async cache error for {}: {}", ufs_path, e),
Ok(res) => info!(
"submit async cache successfully for {}, job id {}, target_path {}",
path, res.job_id, res.target_path
),
}
self.async_cache(&ufs_path)?;
}

// Reading from ufs
Expand Down Expand Up @@ -569,7 +581,14 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
match self.get_mount_checked(path).await? {
None => self.cv.delete(path, recursive).await,
Some((ufs_path, mount)) => {
// delete from UFS
if path.path() == mount.info.cv_path {
return err_box!(
"cannot delete mount point root: cv_path={}, ufs_path={}",
mount.info.cv_path,
mount.info.ufs_path
);
}

mount.ufs.delete(&ufs_path, recursive).await?;

// delete cache
Expand All @@ -590,9 +609,30 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
vec!["get_status".to_string()],
);

match self.get_mount_checked(path).await? {
match self.get_mount(path).await? {
None => self.cv.get_status(path).await,
Some((ufs_path, mount)) => mount.ufs.get_status(&ufs_path).await,

Some((_, mnt)) if mnt.info.is_fs_mode() => self.cv.get_status(path).await,

Some((ufs_path, mnt)) => match self.cv.get_status(path).await {
Ok(mut v) => match self.check_cache_validity(&v, &ufs_path, &mnt).await? {
CacheValidity::Valid => {
if v.ufs_exists() {
v.mtime = v.storage_policy.ufs_mtime;
}
Ok(v)
}
CacheValidity::Invalid(Some(ufs_status)) => Ok(ufs_status),
CacheValidity::Invalid(None) => mnt.ufs.get_status(&ufs_path).await,
},

Err(e) => {
if !matches!(e, FsError::FileNotFound(_) | FsError::Expired(_)) {
warn!("failed to get status file {}: {}", path, e);
};
mnt.ufs.get_status(&ufs_path).await
}
},
}
}

Expand Down
8 changes: 6 additions & 2 deletions curvine-fuse/src/fs/curvine_file_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,9 @@ impl fs::FileSystem for CurvineFileSystem {
handle.flush(Some(reply)).await?;

let path = Path::from_str(&handle.status.path)?;
self.invalidate_cache(&path)?;
if handle.writer.is_some() {
self.invalidate_cache(&path)?;
}
Ok(())
}

Expand All @@ -1241,7 +1243,9 @@ impl fs::FileSystem for CurvineFileSystem {
}

let path = Path::from_str(&handle.status.path)?;
self.invalidate_cache(&path)?;
if handle.writer.is_some() {
self.invalidate_cache(&path)?;
}

complete_result
}
Expand Down
7 changes: 7 additions & 0 deletions curvine-server/src/master/master_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ impl MasterHandler {
return Ok(true);
}

let path = Path::from_str(&header.path)?;
if let Some(info) = self.mount_manager.get_mount_info(&path)? {
if path.path() == info.cv_path {
return err_box!("cannot delete mount point root: {}", info.cv_path);
}
}

let res = self.fs.delete(&header.path, header.recursive);
self.set_req_cache(req_id, res)
}
Expand Down
2 changes: 1 addition & 1 deletion curvine-tests/tests/write_cache_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn test_cache_mode() {
let mtime_before = ufs_reader_before.status().mtime;
drop(ufs_reader_before);

fs.async_cache(&path).await.unwrap();
fs.async_cache(&path).unwrap();
fs.wait_job_complete(&path, false).await.unwrap();

let ufs_reader_after = mnt.ufs.open(&ufs_path).await.unwrap();
Expand Down