Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion curvine-client/src/unified/unified_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl UnifiedFileSystem {
ufs_path: &Path,
mount: &MountValue,
) -> FsResult<Option<FsReader>> {
let blocks = match self.cv.get_block_locations(cv_path).await {
let mut blocks = match self.cv.get_block_locations(cv_path).await {
Ok(blocks) => blocks,
Err(e) => {
if !matches!(e, FsError::FileNotFound(_) | FsError::Expired(_)) {
Expand Down Expand Up @@ -267,6 +267,7 @@ impl UnifiedFileSystem {
.await?
{
CacheValidity::Valid => {
blocks.status.mtime = blocks.status.storage_policy.ufs_mtime;
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cache-mode, check_cache_validity() returns Valid immediately when mount.info.read_verify_ufs is false, without requiring storage_policy.ufs_mtime to be set. Unconditionally overwriting blocks.status.mtime with storage_policy.ufs_mtime here can therefore set mtime to 0, which will diverge from get_status()/list_status() (they read directly from UFS) and can reintroduce the inconsistency this PR is trying to fix. Consider only overriding mtime when ufs_mtime > 0 (or when read_verify_ufs is enabled / the value has been verified), otherwise keep the existing blocks.status.mtime.

Suggested change
blocks.status.mtime = blocks.status.storage_policy.ufs_mtime;
let ufs_mtime = blocks.status.storage_policy.ufs_mtime;
if ufs_mtime > 0 {
blocks.status.mtime = ufs_mtime;
}

Copilot uses AI. Check for mistakes.
let cv_reader = Some(FsReader::new(
cv_path.clone(),
self.cv.fs_context(),
Expand Down
74 changes: 59 additions & 15 deletions curvine-server/src/master/journal/journal_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::master::meta::inode::InodePath;
use crate::master::meta::inode::InodeView::{Dir, File};
use crate::master::{JobManager, MountManager, SyncFsDir};
use curvine_common::conf::JournalConf;
use curvine_common::error::FsError;
use curvine_common::proto::raft::SnapshotData;
use curvine_common::raft::storage::AppStorage;
use curvine_common::raft::{RaftResult, RaftUtils};
Expand All @@ -31,7 +32,6 @@ use orpc::{err_box, try_option, try_option_ref, CommonResult};
use std::path::Path;
use std::sync::Arc;
use std::{fs, mem};

// Replay the master metadata operation log.
#[derive(Clone)]
pub struct JournalLoader {
Expand Down Expand Up @@ -170,11 +170,15 @@ impl JournalLoader {

Ok(())
}

pub fn rename(&self, entry: RenameEntry) -> CommonResult<()> {
let mut fs_dir = self.fs_dir.write();
let src_inp = InodePath::resolve(fs_dir.root_ptr(), entry.src, &fs_dir.store)?;
let entry_src = entry.src;
let src_inp = InodePath::resolve(fs_dir.root_ptr(), &entry_src, &fs_dir.store)?;
let dst_inp = InodePath::resolve(fs_dir.root_ptr(), entry.dst, &fs_dir.store)?;
if src_inp.get_last_inode().is_none() {
warn!("Rename: source path not found: {}", entry_src);
return Ok(());
}
fs_dir.unprotected_rename(
&src_inp,
&dst_inp,
Expand All @@ -187,14 +191,24 @@ impl JournalLoader {

pub fn delete(&self, entry: DeleteEntry) -> CommonResult<()> {
let mut fs_dir = self.fs_dir.write();
let inp = InodePath::resolve(fs_dir.root_ptr(), entry.path, &fs_dir.store)?;
let entry_path = entry.path;
let inp = InodePath::resolve(fs_dir.root_ptr(), &entry_path, &fs_dir.store)?;
if inp.get_last_inode().is_none() {
warn!("Delete: path not found: {}", entry_path);
return Ok(());
}
fs_dir.unprotected_delete(&inp, entry.mtime)?;
Ok(())
}

pub fn free(&self, entry: FreeEntry) -> CommonResult<()> {
let mut fs_dir = self.fs_dir.write();
let inp = InodePath::resolve(fs_dir.root_ptr(), entry.path, &fs_dir.store)?;
let entry_path = entry.path;
let inp = InodePath::resolve(fs_dir.root_ptr(), &entry_path, &fs_dir.store)?;
if inp.get_last_inode().is_none() {
warn!("Free: path not found: {}", entry_path);
return Ok(());
}
fs_dir.unprotected_free(&inp, entry.mtime)?;
Ok(())
}
Expand All @@ -208,6 +222,10 @@ impl JournalLoader {
}

pub fn unmount(&self, entry: UnMountEntry) -> CommonResult<()> {
if !self.mnt_mgr.has_mounted(entry.id) {
warn!("Unmount: id already unmounted: {}", entry.id);
return Ok(());
}
self.mnt_mgr.unprotected_umount_by_id(entry.id)?;
let mut fs_dir = self.fs_dir.write();
fs_dir.unprotected_unmount(entry.id)?;
Expand All @@ -216,32 +234,58 @@ impl JournalLoader {

pub fn set_attr(&self, entry: SetAttrEntry) -> CommonResult<()> {
let mut fs_dir = self.fs_dir.write();
let inp = InodePath::resolve(fs_dir.root_ptr(), entry.path, &fs_dir.store)?;
let last_inode = try_option!(inp.get_last_inode());
let entry_path = entry.path;
let inp = InodePath::resolve(fs_dir.root_ptr(), &entry_path, &fs_dir.store)?;
let last_inode = match inp.get_last_inode() {
Some(v) => v,
None => {
warn!("SetAttr: path not found: {}", entry_path);
return Ok(());
}
};

fs_dir.unprotected_set_attr(last_inode, entry.opts)?;
Ok(())
}

pub fn symlink(&self, entry: SymlinkEntry) -> CommonResult<()> {
let link_path = entry.link;
let mut fs_dir = self.fs_dir.write();
let inp = InodePath::resolve(fs_dir.root_ptr(), entry.link, &fs_dir.store)?;
fs_dir.unprotected_symlink(inp, entry.new_inode, entry.force)?;
Ok(())
let inp = InodePath::resolve(fs_dir.root_ptr(), &link_path, &fs_dir.store)?;
match fs_dir.unprotected_symlink(inp, entry.new_inode, entry.force) {
Ok(_) => Ok(()),
Err(FsError::FileAlreadyExists(_)) => {
warn!("Symlink: file already exists: {}", link_path);
Ok(())
}
Err(e) => Err(e.into()),
}
}

pub fn link(&self, entry: LinkEntry) -> CommonResult<()> {
let src_path = entry.src_path;
let dst_path = entry.dst_path;
let mut fs_dir = self.fs_dir.write();
let old_path = InodePath::resolve(fs_dir.root_ptr(), entry.src_path, &fs_dir.store)?;
let new_path = InodePath::resolve(fs_dir.root_ptr(), entry.dst_path, &fs_dir.store)?;
let old_path = InodePath::resolve(fs_dir.root_ptr(), &src_path, &fs_dir.store)?;
let new_path = InodePath::resolve(fs_dir.root_ptr(), &dst_path, &fs_dir.store)?;

// Get the original inode ID
let original_inode_id = match old_path.get_last_inode() {
Some(inode) => inode.id(),
None => return err_box!("Original file not found during link recovery"),
None => {
warn!("Link: source path not found: {}", src_path);
return Ok(());
}
};

fs_dir.unprotected_link(new_path, original_inode_id, entry.op_ms)?;
Ok(())
match fs_dir.unprotected_link(new_path, original_inode_id, entry.op_ms) {
Ok(_) => Ok(()),
Err(FsError::FileAlreadyExists(_)) => {
warn!("Link: dst_path already exists: {}", dst_path);
Ok(())
}
Err(e) => Err(e.into()),
}
}

pub fn set_locks(&self, entry: SetLocksEntry) -> CommonResult<()> {
Expand Down
4 changes: 4 additions & 0 deletions curvine-server/src/master/mount/mount_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl MountManager {
self.mount_table.unprotected_umount_by_id(id)
}

pub fn has_mounted(&self, id: u32) -> bool {
self.mount_table.has_mounted(id)
}

/**
* use ufs_uri to find mount entry
*/
Expand Down
2 changes: 1 addition & 1 deletion curvine-server/src/master/mount/mount_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl MountTable {
}

// mountid maybe occupied
fn has_mounted(&self, mount_id: u32) -> bool {
pub fn has_mounted(&self, mount_id: u32) -> bool {
let inner = self.inner.read().unwrap();
inner.mountid2entry.contains_key(&mount_id)
}
Expand Down
194 changes: 194 additions & 0 deletions curvine-server/tests/master_fs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
// limitations under the License.

use curvine_common::conf::{ClusterConf, JournalConf, MasterConf};
use curvine_common::fs::CurvineURI;
use curvine_common::fs::RpcCode;
use curvine_common::proto::{
CreateFileRequest, DeleteRequest, MkdirOptsProto, MkdirRequest, RenameRequest,
};
use curvine_common::state::MountOptions;
use curvine_common::state::{
BlockLocation, ClientAddress, CommitBlock, CreateFileOpts, WorkerInfo,
};

use curvine_common::state::{OpenFlags, RenameFlags, SetAttrOptsBuilder};
use curvine_server::master::fs::{FsRetryCache, MasterFilesystem, OperationStatus};
use curvine_server::master::journal::JournalLoader;
use curvine_server::master::journal::JournalSystem;
use curvine_server::master::replication::master_replication_manager::MasterReplicationManager;
use curvine_server::master::{JobHandler, JobManager, Master, MasterHandler, RpcContext};
Expand All @@ -31,6 +35,7 @@ use orpc::message::Builder;
use orpc::runtime::AsyncRuntime;
use orpc::CommonResult;
use std::sync::Arc;

// Test the master filesystem function separately.
// This test does not require a cluster startup.
// Returns (MasterFilesystem, JournalSystem) to ensure proper resource cleanup.
Expand Down Expand Up @@ -690,3 +695,192 @@ fn rename_retry(handler: &mut MasterHandler) -> CommonResult<()> {

Ok(())
}

// Helper: creates a leader + follower pair, returns (leader_fs, leader_js, loader, follower_js)
fn setup_pair(
name: &str,
) -> (
MasterFilesystem,
JournalSystem,
JournalLoader,
JournalSystem,
MasterFilesystem,
) {
Master::init_test_metrics();
let mut conf = ClusterConf {
testing: true,
..Default::default()
};
let worker = WorkerInfo::default();

conf.change_test_meta_dir(format!("idem-{}-leader", name));
let js1 = JournalSystem::from_conf(&conf).unwrap();
let fs1 = MasterFilesystem::with_js(&conf, &js1);
fs1.add_test_worker(worker.clone());

conf.change_test_meta_dir(format!("idem-{}-follower", name));
let js2 = JournalSystem::from_conf(&conf).unwrap();
let fs2 = MasterFilesystem::with_js(&conf, &js2);
fs2.add_test_worker(worker);
let loader = JournalLoader::new(
fs2.fs_dir(),
js2.mount_manager(),
&conf.journal,
js2.job_manager(),
);

(fs1, js1, loader, js2, fs2)
}

/// Simulate the entry replaying at the follower
fn replay_all_then_duplicate_last(js: &JournalSystem, loader: &JournalLoader) {
let entries = js.fs().fs_dir.read().take_entries();
assert!(!entries.is_empty());

// First: replay all entries
for e in entries.iter() {
loader.apply_entry(e.clone()).unwrap();
}

// Second: replay last entry again
let dup_start = entries.len() - 1;
for e in &entries[dup_start..] {
let result = loader.apply_entry(e.clone());
assert!(
result.is_ok(),
"duplicate entry should be idempotent: {:?}",
result
);
}
}

#[test]
fn test_idempotent_mkdir() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("mkdir");
fs.mkdir("/data", false)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_create_file() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("create-file");
fs.create("/file.log", true)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_delete() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("delete");
fs.mkdir("/data", false)?;
fs.delete("/data", true)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_rename() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("rename");
fs.mkdir("/src", false)?;
fs.rename("/src", "/dst", RenameFlags::empty())?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_free() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("free");
fs.create("/file.log", true)?;
// Set ufs_mtime > 0 so the free function passes the ufs_exists() check
let set_opts = SetAttrOptsBuilder::new().ufs_mtime(1).build();
fs.set_attr("/file.log", set_opts)?;
fs.free("/file.log")?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_set_attr() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("set-attr");
fs.mkdir("/data", false)?;
let opts = SetAttrOptsBuilder::new().owner("test_owner").build();
fs.set_attr("/data", opts)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_unmount() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("unmount");
let mnt_mgr = js.mount_manager();
let mnt_opt = MountOptions::builder().build();
mnt_mgr.mount(None, "/mnt/test", "oss://bucket/", &mnt_opt)?;
mnt_mgr.umount("/mnt/test")?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_symlink() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("symlink");
fs.mkdir("/dir", false)?;
fs.symlink("/target", "/dir/link", false, 0o777)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_link() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("link");
fs.create("/original.txt", true)?;
fs.link("/original.txt", "/hardlink.txt")?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_mount() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("mount");
let mnt_mgr = js.mount_manager();
let mount_uri = CurvineURI::new("/mnt/test")?;
let ufs_uri = CurvineURI::new("oss://bucket1/")?;
let mnt_opt = MountOptions::builder().build();
mnt_mgr.mount(
None,
mount_uri.path(),
ufs_uri.encode_uri().as_ref(),
&mnt_opt,
)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}

#[test]
fn test_idempotent_set_locks() -> CommonResult<()> {
let (fs, js, loader, _js2, fs2) = setup_pair("set-locks");
fs.create("/lockfile.log", true)?;
let lock = curvine_common::state::FileLock {
client_id: "client1".to_string(),
owner_id: 1,
lock_type: curvine_common::state::LockType::WriteLock,
lock_flags: curvine_common::state::LockFlags::Plock,
start: 0,
end: 100,
..Default::default()
};
fs.set_lock("/lockfile.log", lock)?;
replay_all_then_duplicate_last(&js, &loader);
assert_eq!(fs.sum_hash(), fs2.sum_hash());
Ok(())
}