Skip to content

Commit 9a6df64

Browse files
committed
fix: improve snapshot restore safety, Raft hard-state durability, and inode nlink atomicity
- Use a hard-link copy (RocksUtils::link_dir) instead of rename in DBEngine::restore so the source checkpoint is preserved after restore.
- Explicitly drop the old DB handle by pointing it at a temporary path before any filesystem operations, so the RocksDB LOCK is released reliably.
- Flush the memtable and WAL before creating a checkpoint to guarantee all in-memory data is captured in SST files.
- Fix set_hard_state_commit to actually persist the commit index to RocksDB (previously it only updated the in-memory field), and use commit_sync so the write is fsync-ed before returning.
- Fix purge_checkpoint to never delete the currently active checkpoint: only consider directories whose mtime is strictly older than the current checkpoint's.
- Fix increment_inode_nlink / decrement_inode_nlink to accept the caller's WriteBatch so that nlink changes and edge mutations are committed atomically.
- Fix unlink operations to correctly propagate DeleteResult (the block list) when nlink reaches zero.
1 parent 9fbe5e2 commit 9a6df64

10 files changed

Lines changed: 159 additions & 64 deletions

File tree

curvine-common/src/raft/raft_node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ where
183183
app_store: &B,
184184
voters: Vec<u64>,
185185
) -> RaftResult<u64> {
186+
info!("init raft state: {:?}", log_store.initial_state()?);
187+
186188
let spend = TimeSpent::new();
187189

188190
match log_store.latest_snapshot()? {

curvine-common/src/raft/storage/rocks_storage_core.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,21 @@ impl RocksStorageCore {
105105

106106
/// Replaces the in-memory hard state with `hs` and writes its encoded form
/// to the `CF_META` column family under `STATE_KEY`.
///
/// Returns an error if the database write fails.
///
/// NOTE(review): the in-memory field is assigned before the write is
/// attempted, so a failed write leaves memory and disk out of step — confirm
/// callers treat an error here as fatal.
pub fn set_hard_state(&mut self, hs: HardState) -> RaftResult<()> {
107107
self.raft_state.hard_state = hs.clone();
108-
self.db
109-
.put_cf(Self::CF_META, Self::STATE_KEY, hs.encode_to_vec())?;
108+
109+
// Stage the write in a StoreWriteBatch; the added version of its commit()
// calls commit_sync() on the wrapped batch.
let mut batch = StoreWriteBatch::new(&self.db);
110+
batch.set_state(&hs)?;
111+
batch.commit()?;
112+
110113
Ok(())
111114
}
112115

113116
/// Sets the commit index of the in-memory hard state to `commit` and then
/// writes the whole hard state to the database.
///
/// Returns an error if the database write fails.
///
/// NOTE(review): the in-memory commit index is changed before the write, so
/// on a write error the in-memory value is ahead of what is stored — confirm
/// this is acceptable to callers.
pub fn set_hard_state_commit(&mut self, commit: u64) -> RaftResult<()> {
114117
self.mut_hard_state().set_commit(commit);
118+
119+
// Persist the updated hard state through the same batch path that
// set_hard_state uses.
let mut batch = StoreWriteBatch::new(&self.db);
120+
batch.set_state(&self.raft_state.hard_state)?;
121+
batch.commit()?;
122+
115123
Ok(())
116124
}
117125

@@ -414,7 +422,16 @@ impl<'a> StoreWriteBatch<'a> {
414422
Ok(())
415423
}
416424

425+
/// Queues a put of the encoded `state` into this batch, keyed by
/// `RocksStorageCore::STATE_KEY` in the `RocksStorageCore::CF_META` column
/// family. Nothing is written to the database until the batch is committed.
fn set_state(&mut self, state: &HardState) -> CommonResult<()> {
426+
self.0.put_cf(
427+
RocksStorageCore::CF_META,
428+
RocksStorageCore::STATE_KEY,
429+
state.encode_to_vec(),
430+
)?;
431+
Ok(())
432+
}
433+
417434
/// Consumes the batch and writes it to the database through the wrapped
/// write batch.
fn commit(self) -> CommonResult<()> {
418-
self.0.commit()
435+
// commit_sync() is commit_flush(true): write the batch, then flush with
// sync = true.
self.0.commit_sync()
419436
}
420437
}

curvine-common/src/rocksdb/db_engine.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,19 @@ impl DBEngine {
5555
}
5656

5757
/// Replaces the contents of the configured data directory with the
/// checkpoint at `checkpoint` and reopens the database on it.
///
/// Steps, in order: reopen `self.db` on a temporary path, delete that
/// temporary path (best effort), delete the data directory, populate the data
/// directory from the checkpoint, and reopen `self.db` on the data directory
/// with the configured column families.
///
/// # Errors
/// Returns an error if opening either database, deleting the data directory,
/// or linking the checkpoint fails.
///
/// NOTE(review): the data directory is deleted before the checkpoint is
/// linked in. If `link_dir` or the final open fails, the data directory is
/// gone and `self.db` still refers to the already-deleted temporary
/// directory — confirm callers handle this.
pub fn restore<T: AsRef<str>>(&mut self, checkpoint: T) -> CommonResult<()> {
58-
let db_path = &self.conf.data_dir;
58+
let db_path = self.conf.data_dir.clone();
5959
let db_opt = self.conf.create_db_opt();
6060
let cfs = self.conf.create_cf_opt();
61+
let checkpoint = checkpoint.as_ref();
6162

6263
// Point the database at a temporary directory; assigning to self.db drops
// the previous handle on the data directory.
6364
let tmp_path = Utils::temp_file();
6465
self.db = try_err!(DB::open(&db_opt, &tmp_path));
66+
// Best-effort cleanup of the temporary directory; a failure is ignored.
let _ = FileUtils::delete_path(&tmp_path, true);
67+
FileUtils::delete_path(&db_path, true)?;
6568

66-
// Delete the original file and move the checkpoint to the data directory.
67-
FileUtils::delete_path(db_path, true)?;
68-
FileUtils::rename(checkpoint.as_ref(), db_path)?;
69-
70-
// Retry instantiating db and delete the temporary directory.
71-
self.db = try_err!(DB::open_cf_with_opts(&db_opt, db_path, cfs));
72-
let _ = FileUtils::delete_path(tmp_path, true);
69+
// Populate the data directory from the checkpoint without moving the
// checkpoint itself, then reopen the database on it.
try_err!(RocksUtils::link_dir(checkpoint, &db_path));
70+
self.db = try_err!(DB::open_cf_with_opts(&db_opt, &db_path, cfs));
7371

7472
Ok(())
7573
}
@@ -204,6 +202,8 @@ impl DBEngine {
204202

205203
// Create a checkpoint.
206204
pub fn create_checkpoint(&self, id: u64) -> CommonResult<String> {
205+
self.flush(true)?;
206+
207207
let checkpoint_path = self.get_checkpoint_path(id);
208208
let existed = FileUtils::exists(&checkpoint_path);
209209

@@ -246,8 +246,10 @@ impl DBEngine {
246246
&self.db
247247
}
248248

249-
pub fn flush(&self) -> CommonResult<()> {
250-
self.db.flush()?;
249+
/// Flushes the database using `FlushOptions` whose `wait` flag is set to
/// `sync`.
///
/// NOTE(review): presumably `sync = true` blocks until the flush has
/// finished and `false` only schedules it — confirm against the rocksdb
/// crate's `FlushOptions::set_wait`.
pub fn flush_mem(&self, sync: bool) -> CommonResult<()> {
250+
let mut opts = FlushOptions::default();
251+
opts.set_wait(sync);
252+
self.db.flush_opt(&opts)?;
251253
Ok(())
252254
}
253255

@@ -256,6 +258,14 @@ impl DBEngine {
256258
Ok(())
257259
}
258260

261+
/// Calls `flush_mem(sync)` and then, unless the WAL is disabled in the
/// configuration, `flush_wal(sync)`.
///
/// Returns the first error from either step; the WAL step is not attempted
/// if the first flush fails.
pub fn flush(&self, sync: bool) -> CommonResult<()> {
262+
self.flush_mem(sync)?;
263+
// Skip the WAL flush when the configuration disables the WAL.
if !self.conf.disable_wal {
264+
self.flush_wal(sync)?;
265+
}
266+
Ok(())
267+
}
268+
259269
pub fn write_batch(&self, batch: WriteBatchWithTransaction<false>) -> CommonResult<()> {
260270
try_err!(self.db.write(batch));
261271
Ok(())

curvine-common/src/rocksdb/rocks_utils.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::fs;
1516
use byteorder::{BigEndian, ByteOrder};
1617
use orpc::{err_box, CommonResult};
1718
use prost::bytes::BufMut;
19+
use orpc::io::IOResult;
1820

1921
// A utility class that converts some types to bytes.
2022
pub struct RocksUtils;
@@ -180,6 +182,35 @@ impl RocksUtils {
180182

181183
end
182184
}
185+
186+
/// Recursively mirrors the directory `src` into `dst`.
///
/// `dst` is created (including parents) when it does not exist. For every
/// entry of `src`: directories are recursed into, files whose extension is
/// `sst` or `ldb` are hard-linked, and all other files are copied.
///
/// # Errors
/// Propagates the first I/O error from `create_dir_all`, `read_dir`,
/// `hard_link` or `copy`; entries already processed are left in place.
///
/// NOTE(review): `src` and `dst` share the single type parameter `P`, so both
/// arguments must resolve to the same type.
/// NOTE(review): `fs::hard_link` generally requires both paths to be on the
/// same filesystem and typically fails if `dst_path` already exists —
/// confirm callers always pass an absent or empty `dst` on the same volume.
pub fn link_dir<P: AsRef<std::path::Path>>(src: P, dst: P) -> IOResult<()> {
187+
let src = src.as_ref();
188+
let dst = dst.as_ref();
189+
if !dst.exists() {
190+
fs::create_dir_all(dst)?;
191+
}
192+
193+
for entry in fs::read_dir(src)? {
194+
let entry = entry?;
195+
let src_path = entry.path();
196+
let dst_path = dst.join(entry.file_name());
197+
if src_path.is_dir() {
198+
Self::link_dir(&src_path, &dst_path)?;
199+
} else if Self::is_sst_file(&src_path) {
200+
// Table files are linked rather than copied — presumably because they
// are immutable once written; confirm.
fs::hard_link(&src_path, &dst_path)?;
201+
} else {
202+
fs::copy(&src_path, &dst_path)?;
203+
}
204+
}
205+
Ok(())
206+
}
207+
208+
/// Returns `true` when the extension of `path` is exactly `sst` or `ldb`.
///
/// The comparison is case-sensitive. Paths without an extension, or whose
/// extension is not valid UTF-8, yield `false`.
fn is_sst_file(path: &std::path::Path) -> bool {
209+
matches!(
210+
path.extension().and_then(|e: &std::ffi::OsStr| e.to_str()),
211+
Some("sst") | Some("ldb")
212+
)
213+
}
183214
}
184215

185216
#[cfg(test)]

curvine-common/src/rocksdb/write_batch.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@ impl<'a> WriteBatch<'a> {
5252
self.db.write_batch(self.batch)
5353
}
5454

55-
pub fn commit_wal(self, sync: bool) -> CommonResult<()> {
55+
/// Consumes the batch, writes it to the database, and then flushes the
/// database, passing `sync` through to the flush call.
///
/// NOTE(review): the added line calls `flush(sync)` where the removed line
/// called `flush_wal(sync)`. If `self.db` is the `DBEngine` from
/// db_engine.rs, `flush` also flushes the memtable on every call — confirm
/// that cost is intended for each commit.
pub fn commit_flush(self, sync: bool) -> CommonResult<()> {
5656
self.db.write_batch(self.batch)?;
57-
self.db.flush_wal(sync)?;
57+
self.db.flush(sync)?;
5858
Ok(())
5959
}
60+
61+
/// Consumes the batch and commits it via `commit_flush` with `sync = true`.
pub fn commit_sync(self) -> CommonResult<()> {
62+
self.commit_flush(true)
63+
}
6064
}

curvine-server/src/master/journal/journal_loader.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -646,26 +646,37 @@ impl JournalLoader {
646646

647647
// Clean up expired checkpoints.
648648
pub fn purge_checkpoint(&self, current_ck: impl AsRef<str>) -> CommonResult<()> {
649-
let ck_dir = match Path::new(current_ck.as_ref()).parent() {
649+
let current_ck = current_ck.as_ref();
650+
let ck_dir = match Path::new(current_ck).parent() {
650651
None => return Ok(()),
651652
Some(v) => v,
652653
};
653654

655+
let current_mtime = match Path::new(current_ck).metadata() {
656+
Ok(meta) => FileUtils::mtime(&meta)?,
657+
Err(_) => return Ok(()),
658+
};
659+
654660
let mut vec = vec![];
655661
for entry in fs::read_dir(ck_dir)? {
656662
let entry = entry?;
657663
let meta = entry.metadata()?;
658-
vec.push((FileUtils::mtime(&meta)?, entry.path()));
664+
let mtime = FileUtils::mtime(&meta)?;
665+
if mtime < current_mtime {
666+
vec.push((mtime, entry.path()));
667+
}
659668
}
660669

661-
// Sort by modification time
670+
// Sort oldest-first and keep at most (retain_checkpoint_num - 1) older
671+
// checkpoints so that together with current_ck the total is retain_checkpoint_num.
662672
vec.sort_by_key(|x| x.0);
663-
let del_num = vec.len().saturating_sub(self.retain_checkpoint_num);
673+
let keep = self.retain_checkpoint_num.saturating_sub(1);
674+
let del_num = vec.len().saturating_sub(keep);
664675

665676
for i in 0..del_num {
666677
let path = vec[i].1.as_path();
667678
FileUtils::delete_path(path, true)?;
668-
info!("delete expired checkpoint, dir: {}", path.to_string_lossy())
679+
info!("delete expired checkpoint: {}", path.to_string_lossy());
669680
}
670681

671682
Ok(())

curvine-server/src/master/meta/fs_dir.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use curvine_common::state::{
2727
MkdirOpts, MountInfo, RenameFlags, SetAttrOpts, WorkerAddress,
2828
};
2929
use curvine_common::FsResult;
30-
use log::{info, warn};
30+
use log::{debug, info, warn};
3131
use orpc::common::{LocalTime, TimeSpent};
3232
use orpc::sync::AtomicCounter;
3333
use orpc::{err_box, err_ext, try_option, CommonResult};
@@ -94,7 +94,9 @@ impl FsDir {
9494
}
9595

9696
/// Raises the stored op id to `op_id`; values that are not strictly greater
/// than the current one are ignored, so the stored id never moves backwards
/// through this method.
///
/// NOTE(review): the comparison and the store are two separate calls on
/// `self.op_id`; whether concurrent callers can interleave between them
/// depends on the counter type and on external locking — confirm.
pub fn update_op_id(&self, op_id: u64) {
97-
self.op_id.set(op_id);
97+
if op_id > self.op_id.get() {
98+
self.op_id.set(op_id);
99+
}
98100
}
99101

100102
pub fn get_ttl_bucket_list(&self) -> Arc<TtlBucketList> {
@@ -362,7 +364,7 @@ impl FsDir {
362364
let name = inp.name().to_string();
363365

364366
// Create an inode file node.
365-
let file = InodeFile::with_opts(self.inode_id.next()?, LocalTime::mills() as i64, opts);
367+
let file = InodeFile::with_opts(self.next_inode_id()?, LocalTime::mills() as i64, opts);
366368
inp = self.add_last_inode(inp, File(name, file))?;
367369
self.journal_writer.log_create_file(self, &inp)?;
368370

@@ -582,9 +584,9 @@ impl FsDir {
582584
Ok(true)
583585
} else {
584586
err_box!(
585-
"block_id {} resolves to inode_id {} which is not a file",
587+
"block_id {} resolves to inode {:?} which is not a file",
586588
block_id,
587-
file_id
589+
v
588590
)
589591
}
590592
}
@@ -676,7 +678,7 @@ impl FsDir {
676678
let mut spend = TimeSpent::new();
677679
let path = path.as_ref();
678680

679-
// Set to other values first to facilitate memory recycling.
681+
// Set to another value first to facilitate memory recycling.
680682
self.root_dir = Self::create_root();
681683

682684
// Reset rocksdb
@@ -691,10 +693,10 @@ impl FsDir {
691693
let time2 = spend.used_ms();
692694

693695
info!(
694-
"Restore from {}, restore rocksdb used {} ms, \
696+
"restore from {}, restore rocksdb used {} ms, \
695697
build in-memory directory tree used {} ms, \
696-
statistics updated during tree reconstruction",
697-
path, time1, time2
698+
statistics updated during tree reconstruction, last_inode_id {}",
699+
path, time1, time2, last_inode_id
698700
);
699701
Ok(())
700702
}
@@ -985,7 +987,7 @@ impl FsDir {
985987
return Ok(DeleteResult::new());
986988
}
987989
let del_blocks = file.resize(opts.clone())?;
988-
info!("resize file {} success, opts: {:?}", inp.path(), opts);
990+
debug!("resize file {} success, opts: {:?}", inp.path(), opts);
989991

990992
file.complete(file.len, &[], "", true)?;
991993
let mut del_res = DeleteResult::new();

curvine-server/src/master/meta/inode/inodes_children.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ impl InodeChildren {
153153
// But return owned pointer to complete object
154154
Ok(InodePtr::from_owned(*inode))
155155
} else {
156-
// Directory directly stores complete object and returns its reference
157-
let inserted = v.insert(inode.clone());
156+
// Directory: move ownership into the map and return a reference.
157+
let inserted = v.insert(inode);
158158
Ok(InodePtr::from_ref(inserted.as_ref()))
159159
}
160160
}

0 commit comments

Comments
 (0)