Skip to content

Commit 8bb9f71

Browse files
committed
feat(client): cache-mode get_status + single-flight mount refresh + async_cache fire-and-forget
- unified_filesystem: get_status branches by no-mount / FS mode / cache mode; in cache mode calls cv.get_status then check_cache_validity, falls back to UFS when invalid or on error; CacheValidity gains Invalid(Option<FileStatus>) to carry UFS status - unified_filesystem: add invalidate_cache, clear CV cache before open-for-write on mount path; async_cache becomes fire-and-forget (spawn load job in background, return immediately) - mount_cache: single-flight refresh via refresh_lock to avoid blocking all get_mount while holding write lock during RPC - master_handler: forbid delete of mount point root (error when path == mount.cv_path) - curvine_fuse: call invalidate_cache only when handle.writer.is_some() (invalidate cache only after flush/release when file was opened for write)
1 parent abfd560 commit 8bb9f71

File tree

5 files changed

+91
-33
lines changed

5 files changed

+91
-33
lines changed

curvine-client/src/unified/mount_cache.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ use orpc::common::{FastHashMap, LocalTime};
7171
use orpc::sync::AtomicCounter;
7272
use orpc::CommonResult;
7373
use std::sync::{Arc, RwLock};
74+
use tokio::sync::Mutex;
7475

7576
/// Represents a single mount point with its filesystem handler.
7677
/// Contains mount metadata, UFS handler, and path conversion utilities.
@@ -165,6 +166,8 @@ pub struct MountCache {
165166
mounts: RwLock<InnerMap>,
166167
update_interval: u64,
167168
last_update: AtomicCounter,
169+
/// Single-flight lock: only one task performs full refresh when TTL expires.
170+
refresh_lock: Mutex<()>,
168171
}
169172

170173
impl MountCache {
@@ -173,6 +176,7 @@ impl MountCache {
173176
mounts: RwLock::new(InnerMap::default()),
174177
update_interval,
175178
last_update: AtomicCounter::new(0),
179+
refresh_lock: Mutex::new(()),
176180
}
177181
}
178182

@@ -181,20 +185,25 @@ impl MountCache {
181185
}
182186

183187
pub async fn check_update(&self, fs: &UnifiedFileSystem, force: bool) -> FsResult<()> {
184-
if self.need_update() || force {
185-
let mounts = fs.get_mount_table().await?;
188+
if !self.need_update() && !force {
189+
return Ok(());
190+
}
186191

187-
let mut state = self.mounts.write().unwrap();
188-
state.clear();
192+
let _guard = self.refresh_lock.lock().await;
193+
if !self.need_update() && !force {
194+
return Ok(());
195+
}
189196

190-
for item in mounts {
191-
state.insert(item)?;
192-
}
197+
let mounts = fs.get_mount_table().await?;
198+
let mut state = self.mounts.write().unwrap();
193199

194-
debug!("update mounts {:?}", state.len());
195-
self.last_update.set(LocalTime::mills());
200+
state.clear();
201+
for item in mounts {
202+
state.insert(item)?;
196203
}
197204

205+
debug!("update mounts {:?}", state.len());
206+
self.last_update.set(LocalTime::mills());
198207
Ok(())
199208
}
200209

curvine-client/src/unified/unified_filesystem.rs

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@ use curvine_common::conf::ClusterConf;
2121
use curvine_common::error::FsError;
2222
use curvine_common::fs::{FileSystem, Path, Reader, Writer};
2323
use curvine_common::state::{
24-
CreateFileOpts, FileAllocOpts, FileLock, FileStatus, JobStatus, LoadJobCommand, LoadJobResult,
25-
MasterInfo, MkdirOpts, MkdirOptsBuilder, MountInfo, MountOptions, OpenFlags, SetAttrOpts,
24+
CreateFileOpts, FileAllocOpts, FileLock, FileStatus, JobStatus, LoadJobCommand, MasterInfo,
25+
MkdirOpts, MkdirOptsBuilder, MountInfo, MountOptions, OpenFlags, SetAttrOpts,
2626
};
2727
use curvine_common::utils::CommonUtils;
2828
use curvine_common::FsResult;
2929
use log::{error, info, warn};
3030
use orpc::common::TimeSpent;
31-
use orpc::runtime::Runtime;
31+
use orpc::runtime::{RpcRuntime, Runtime};
3232
use orpc::{err_box, err_ext};
3333
use std::sync::Arc;
3434

3535
#[allow(clippy::large_enum_variant)]
3636
#[derive(Clone)]
3737
enum CacheValidity {
3838
Valid,
39-
Invalid,
39+
Invalid(Option<FileStatus>),
4040
}
4141

4242
#[derive(Clone)]
@@ -210,11 +210,11 @@ impl UnifiedFileSystem {
210210
mount: &MountValue,
211211
) -> FsResult<CacheValidity> {
212212
if cv_status.is_expired() {
213-
return Ok(CacheValidity::Invalid);
213+
return Ok(CacheValidity::Invalid(None));
214214
}
215215

216216
if !cv_status.is_complete() {
217-
return Ok(CacheValidity::Invalid);
217+
return Ok(CacheValidity::Invalid(None));
218218
}
219219

220220
if !mount.info.read_verify_ufs {
@@ -228,7 +228,7 @@ impl UnifiedFileSystem {
228228
{
229229
Ok(CacheValidity::Valid)
230230
} else {
231-
Ok(CacheValidity::Invalid)
231+
Ok(CacheValidity::Invalid(Some(ufs_status)))
232232
}
233233
}
234234

@@ -274,16 +274,28 @@ impl UnifiedFileSystem {
274274
)?);
275275
Ok(cv_reader)
276276
}
277-
CacheValidity::Invalid => Ok(None),
277+
CacheValidity::Invalid(_) => Ok(None),
278278
}
279279
}
280280
}
281281

282-
pub async fn async_cache(&self, source_path: &Path) -> FsResult<LoadJobResult> {
282+
pub fn async_cache(&self, source_path: &Path) -> FsResult<()> {
283283
let client = JobMasterClient::new(self.fs_client());
284284
let source_path = source_path.clone_uri();
285-
let command = LoadJobCommand::builder(source_path.clone()).build();
286-
client.submit_load_job(command).await
285+
286+
self.fs_context().rt().spawn(async move {
287+
let command = LoadJobCommand::builder(source_path.clone()).build();
288+
let res = client.submit_load_job(command).await;
289+
match res {
290+
Err(e) => warn!("submit async cache error for {}: {}", source_path, e),
291+
Ok(res) => info!(
292+
"submit async cache successfully for {}, job id {}, target_path {}",
293+
source_path, res.job_id, res.target_path
294+
),
295+
}
296+
});
297+
298+
Ok(())
287299
}
288300

289301
pub async fn wait_job_complete(&self, path: &Path, fail_if_not_found: bool) -> FsResult<()> {
@@ -383,6 +395,12 @@ impl UnifiedFileSystem {
383395
}
384396

385397
Some((ufs_path, mount)) => {
398+
if let Err(e) = self.cv.delete(path, false).await {
399+
if !matches!(e, FsError::FileNotFound(_)) {
400+
warn!("failed to delete cache for {}: {}", path, e);
401+
}
402+
}
403+
386404
let writer = if flags.append() {
387405
mount.ufs.append(&ufs_path).await?
388406
} else {
@@ -517,13 +535,7 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
517535
.inc();
518536

519537
if mount.info.auto_cache() {
520-
match self.async_cache(&ufs_path).await {
521-
Err(e) => warn!("submit async cache error for {}: {}", ufs_path, e),
522-
Ok(res) => info!(
523-
"submit async cache successfully for {}, job id {}, target_path {}",
524-
path, res.job_id, res.target_path
525-
),
526-
}
538+
self.async_cache(&ufs_path)?;
527539
}
528540

529541
// Reading from ufs
@@ -569,7 +581,14 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
569581
match self.get_mount_checked(path).await? {
570582
None => self.cv.delete(path, recursive).await,
571583
Some((ufs_path, mount)) => {
572-
// delete from UFS
584+
if path.path() == mount.info.cv_path {
585+
return err_box!(
586+
"cannot delete mount point root: cv_path={}, ufs_path={}",
587+
mount.info.cv_path,
588+
mount.info.ufs_path
589+
);
590+
}
591+
573592
mount.ufs.delete(&ufs_path, recursive).await?;
574593

575594
// delete cache
@@ -590,9 +609,28 @@ impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
590609
vec!["get_status".to_string()],
591610
);
592611

593-
match self.get_mount_checked(path).await? {
612+
match self.get_mount(path).await? {
594613
None => self.cv.get_status(path).await,
595-
Some((ufs_path, mount)) => mount.ufs.get_status(&ufs_path).await,
614+
615+
Some((_, mnt)) if mnt.info.is_fs_mode() => self.cv.get_status(path).await,
616+
617+
Some((ufs_path, mnt)) => match self.cv.get_status(path).await {
618+
Ok(mut v) => match self.check_cache_validity(&v, &ufs_path, &mnt).await? {
619+
CacheValidity::Valid => {
620+
v.mtime = v.storage_policy.ufs_mtime;
621+
Ok(v)
622+
}
623+
CacheValidity::Invalid(Some(ufs_status)) => Ok(ufs_status),
624+
CacheValidity::Invalid(None) => mnt.ufs.get_status(&ufs_path).await,
625+
},
626+
627+
Err(e) => {
628+
if !matches!(e, FsError::FileNotFound(_) | FsError::Expired(_)) {
629+
warn!("failed to get status file {}: {}", path, e);
630+
};
631+
mnt.ufs.get_status(&ufs_path).await
632+
}
633+
},
596634
}
597635
}
598636

curvine-fuse/src/fs/curvine_file_system.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,9 @@ impl fs::FileSystem for CurvineFileSystem {
12141214
handle.flush(Some(reply)).await?;
12151215

12161216
let path = Path::from_str(&handle.status.path)?;
1217-
self.invalidate_cache(&path)?;
1217+
if handle.writer.is_some() {
1218+
self.invalidate_cache(&path)?;
1219+
}
12181220
Ok(())
12191221
}
12201222

@@ -1241,7 +1243,9 @@ impl fs::FileSystem for CurvineFileSystem {
12411243
}
12421244

12431245
let path = Path::from_str(&handle.status.path)?;
1244-
self.invalidate_cache(&path)?;
1246+
if handle.writer.is_some() {
1247+
self.invalidate_cache(&path)?;
1248+
}
12451249

12461250
complete_result
12471251
}

curvine-server/src/master/master_handler.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ impl MasterHandler {
198198
return Ok(true);
199199
}
200200

201+
let path = Path::from_str(&header.path)?;
202+
if let Some(info) = self.mount_manager.get_mount_info(&path)? {
203+
if path.path() == info.cv_path {
204+
return err_box!("cannot delete mount point root: {}", info.cv_path);
205+
}
206+
}
207+
201208
let res = self.fs.delete(&header.path, header.recursive);
202209
self.set_req_cache(req_id, res)
203210
}

curvine-tests/tests/write_cache_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ fn test_cache_mode() {
5959
let mtime_before = ufs_reader_before.status().mtime;
6060
drop(ufs_reader_before);
6161

62-
fs.async_cache(&path).await.unwrap();
62+
fs.async_cache(&path).unwrap();
6363
fs.wait_job_complete(&path, false).await.unwrap();
6464

6565
let ufs_reader_after = mnt.ufs.open(&ufs_path).await.unwrap();

0 commit comments

Comments
 (0)