Skip to content

Commit ab6b83e

Browse files
Anirudh VyasAnirudhVyas
Anirudh Vyas
authored andcommitted
sdk_api_read ready - the replay / scan is wired using segment reader actor as a 'first' operation that is is run on materializer materialize so that events are sinked before any events start flowing
1 parent bdda1b4 commit ab6b83e

File tree

10 files changed

+944
-141
lines changed

10 files changed

+944
-141
lines changed

src/core/aof.rs

Lines changed: 387 additions & 78 deletions
Large diffs are not rendered by default.

src/core/date_time.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,34 @@ pub const MS_PER_HOUR: u64 = 3_600_000; // 3 600 s * 1 000 ms
33
pub const MS_PER_MINUTE: u64 = 60_000; // 60 s * 1 000 ms
44
pub const MS_PER_SECOND: u64 = 1_000; // 1 s * 1 000 ms
55

6+
/// Shortcut for epoch ms.
7+
pub fn emit_secs() -> u64 {
8+
std::time::SystemTime::now()
9+
.duration_since(std::time::UNIX_EPOCH)
10+
.expect("failed to get timestamp")
11+
.as_secs()
12+
}
13+
14+
pub fn emit_millis() -> u128 {
15+
std::time::SystemTime::now()
16+
.duration_since(std::time::UNIX_EPOCH)
17+
.expect("failed to get timestamp")
18+
.as_millis()
19+
}
20+
21+
pub fn emit_micros() -> u128 {
22+
std::time::SystemTime::now()
23+
.duration_since(std::time::UNIX_EPOCH)
24+
.expect("failed to get timestamp")
25+
.as_micros()
26+
}
27+
28+
pub fn emit_nanos() -> u128 {
29+
std::time::SystemTime::now()
30+
.duration_since(std::time::UNIX_EPOCH)
31+
.expect("failed to get timestamp")
32+
.as_nanos()
33+
}
634
/// Given any epoch‐ms, return (start_of_utc_day, start_of_next_utc_day).
735
/// All in UTC. If you need local‐midnight instead, just add/subtract your fixed offset.
836
pub fn day_bounds_from_epoch_ms(ms: u64) -> (u64, u64) {

src/core/event.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub enum SystemEventKind {
3232
Resume,
3333
Shutdown,
3434
Restart,
35+
Replay, // Replay is a special event that is used to replay the event log
3536
}
3637

3738
impl EventType {
@@ -48,8 +49,9 @@ impl EventType {
4849
4 => EventType::SystemEvent(SystemEventKind::Resume),
4950
5 => EventType::SystemEvent(SystemEventKind::Shutdown),
5051
6 => EventType::SystemEvent(SystemEventKind::Restart),
51-
7 => EventType::NetworkEvent(value),
52-
8 => EventType::StorageEvent(value),
52+
7 => EventType::SystemEvent(SystemEventKind::Replay),
53+
8 => EventType::NetworkEvent(value),
54+
9 => EventType::StorageEvent(value),
5355
_ => panic!("Invalid event type"),
5456
}
5557
}
@@ -63,6 +65,7 @@ impl EventType {
6365
EventType::SystemEvent(SystemEventKind::Resume) => 4,
6466
EventType::SystemEvent(SystemEventKind::Shutdown) => 5,
6567
EventType::SystemEvent(SystemEventKind::Restart) => 6,
68+
EventType::SystemEvent(SystemEventKind::Replay) => 7,
6669
EventType::NetworkEvent(v) => *v,
6770
EventType::StorageEvent(v) => *v,
6871
EventType::MetaEvent(v) => META_BASE + *v,

src/core/meta.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use bytemuck::{Pod, Zeroable};
22
use rkyv::{Archive, Deserialize, Serialize};
33

4-
#[repr(C)]
5-
#[derive(Debug, Clone, Archive, Serialize, Deserialize, Default)]
6-
#[rkyv(derive(Debug))]
7-
#[derive(Copy)]
4+
#[repr(C, packed)]
5+
#[derive(Debug, Clone, Default, Copy)]
86
pub struct SegmentMeta {
97
pub page_index: usize,
108
pub segment_index: usize,
119
pub write_pos: usize,
1210
pub byte_offset: usize,
1311
pub latest_segment_id: usize,
12+
pub ts_start: u64, // Timestamp of the first event in this segment
13+
pub ts_end: u64, // Timestamp of the last event in this segment
1414
}
1515

1616
unsafe impl Zeroable for SegmentMeta {}
@@ -35,10 +35,8 @@ pub struct ReaderCursor {
3535
unsafe impl Zeroable for ReaderCursor {}
3636
unsafe impl Pod for ReaderCursor {}
3737

38-
#[repr(C)]
39-
#[derive(Debug, Clone, Archive, Serialize, Deserialize, Default)]
40-
#[rkyv(derive(Debug))]
41-
#[derive(Copy)]
38+
#[repr(C, packed)]
39+
#[derive(Debug, Clone, Default, Copy)]
4240
pub struct MMRMeta {
4341
pub root_hash: [u8; 32], // Replace String with a fixed-size array to make it Copy
4442
pub peaks_count: usize,

src/indexing/hash.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub fn sha_256_concat_hash(left: &[u8; 32], right: &[u8; 32]) -> [u8; 32] {
5656
hash.as_slice().try_into().unwrap_or_default()
5757
}
5858

59+
pub fn sha_256_hash(n: Vec<u8>) -> [u8; 32] {
60+
let mut sha256 = sha2::Sha256::new();
61+
sha256.update(n);
62+
let hash = sha256.finalize();
63+
hash.as_slice().try_into().unwrap_or_default()
64+
}
65+
5966
#[cfg(test)]
6067
mod tests {
6168

src/indexing/storage/actors/mmr_actor.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,25 +112,23 @@ mod actor_tests {
112112
}
113113

114114
fn make_test_store(prefix: String) -> SegmentWriterActor {
115+
let tmp = tempdir().expect("failed to create tempdir");
115116
// small page size so tests finish quickly, 1 page per segment to avoid rollover
116117
let cfg = SegmentConfig {
117118
page_size: 32,
118119
pages_per_segment: 1,
119120
prefix,
120-
lmdb_env_path: "xaeroflux".to_string(),
121+
segment_dir: tmp.path().to_string_lossy().into(),
122+
lmdb_env_path: tmp.path().to_string_lossy().into(),
121123
};
122124
SegmentWriterActor::new_with_config(cfg)
123125
}
124126

125127
#[test]
126128
fn actor_appends_to_in_memory_mmr() {
127129
initialize();
128-
let path = "xaeroflux";
129-
std::fs::create_dir_all(path).expect("couldn't create LMDB dir");
130-
131130
// store config uses small pages; we only care about MMR here
132-
let dir = tempdir().expect("failed to create tempdir");
133-
let store = make_test_store(dir.path().join("mmr").display().to_string());
131+
let store = make_test_store("mmr".to_string());
134132
let actor = MmrIndexingActor::new(Some(store), None);
135133

136134
// Fire one event

src/indexing/storage/actors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod indexing_actor;
22
pub mod mmr_actor;
3+
pub mod segment_reader_actor;
34
pub mod segment_writer_actor;

0 commit comments

Comments
 (0)