Skip to content

Commit f1f91fb

Browse files
author
Anirudh Vyas
committed
p2p skeleton and starting to put together stuff for
1 parent 6872c18 commit f1f91fb

File tree

11 files changed

+232
-34
lines changed

11 files changed

+232
-34
lines changed

src/core/meta.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct SegmentMeta {
3333
unsafe impl Zeroable for SegmentMeta {}
3434
unsafe impl Pod for SegmentMeta {}
3535

36-
/// Cursor tracking a subscribers read position in the segment log.
36+
/// Cursor tracking a subscriber's read position in the segment log.
3737
///
3838
/// Fields:
3939
/// - `subscriber_name`: fixed-size identifier of the subscriber.

src/core/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub trait XaeroData: Any + Send + Sync + Clone + Debug {}
4343
impl<T> XaeroData for T where T: Any + Send + Sync + Clone + Debug {}
4444

4545
/// Global, singleton configuration instance.
46-
///
46+
///
4747
/// Initialized by `load_config` and reused thereafter.
4848
pub static CONF: OnceLock<config::Config> = OnceLock::new();
4949

@@ -84,7 +84,7 @@ pub fn init_global_io_pool() {
8484
/// - Loads and validates configuration (`xaeroflux.toml`).
8585
/// - Initializes dispatcher and I/O thread pools.
8686
/// - Sets up logging and displays startup banner.
87-
///
87+
///
8888
/// # Panics
8989
/// Will panic if the configuration name is not "xaeroflux".
9090
pub fn initialize() {

src/indexing/storage/actors/mmr_actor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::{
1919
hash::sha_256,
2020
storage::{format::archive, mmr::XaeroMmrOps},
2121
},
22+
system::CONTROL_BUS,
2223
};
2324

2425
/// Actor responsible for indexing events into a Merkle Mountain Range (MMR).
@@ -38,7 +39,7 @@ pub struct MmrIndexingActor {
3839
impl MmrIndexingActor {
3940
/// Create a new `MmrIndexingActor` with optional store and listener.
4041
///
41-
/// If `store` is `None`, a default `SegmentWriterActor` with prefix "mmr" is used.
42+
/// If `store` is `None`, a default `SegmentWriterActor` with prefix "xaeroflux-mmr" is used.
4243
/// If `listener` is `None`, an `EventListener` is created that:
4344
/// 1. Archives each event into bytes.
4445
/// 2. Persists the event into LMDB.
@@ -54,7 +55,7 @@ impl MmrIndexingActor {
5455
let _mmr = Arc::new(Mutex::new(crate::indexing::storage::mmr::XaeroMmr::new()));
5556
let _store = Arc::new(store.unwrap_or_else(|| {
5657
SegmentWriterActor::new_with_config(super::segment_writer_actor::SegmentConfig {
57-
prefix: "mmr".to_string(),
58+
prefix: "xaeroflux-mmr".to_string(),
5859
..Default::default()
5960
})
6061
}));

src/indexing/storage/actors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pub mod indexing_actor;
22
pub mod mmr_actor;
33
pub mod segment_reader_actor;
44
pub mod segment_writer_actor;
5+
pub mod secondary_index_actor;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::{
2+
collections::HashMap,
3+
sync::{Arc, Mutex},
4+
time::{Duration, Instant},
5+
};
6+
7+
use crate::{
8+
core::{aof::LmdbEnv, event::Event, listeners::EventListener, meta::SegmentMeta, IO_POOL},
9+
system::control_bus::SystemPayload,
10+
};
11+
12+
pub struct SecondaryIndexActor {
13+
/// cache for the secondary index.
14+
pub(crate) cache: HashMap<[u8; 32], (Option<SegmentMeta>, bool, Instant)>,
15+
pub(crate) lmdb_env: Arc<Mutex<LmdbEnv>>,
16+
// The TTL for the garbage collection of the secondary index.
17+
pub(crate) gc_ttl: Duration,
18+
pub listener: EventListener<SystemPayload>,
19+
}
20+
21+
impl SecondaryIndexActor {
22+
pub fn new(lmdb_env: Arc<Mutex<LmdbEnv>>, gc_ttl: Duration) -> Self {
23+
let (tx, rx) = crossbeam::channel::unbounded::<Event<SystemPayload>>();
24+
let txc = tx.clone();
25+
let listener = EventListener::new(
26+
"secondary_index_actor",
27+
Arc::new(move |e: Event<SystemPayload>| {
28+
txc.send(e).expect("failed to send event");
29+
}),
30+
None,
31+
Some(1), // single-threaded handler
32+
);
33+
34+
IO_POOL.get()
35+
.expect("IO_POOL not initialized")
36+
.execute(move || {
37+
let env = lmdb_env.lock().expect("Failed to lock LMDB environment");
38+
env.dbis[1]
39+
});
40+
41+
Self {
42+
cache: HashMap::new(),
43+
lmdb_env,
44+
gc_ttl,
45+
listener,
46+
}
47+
}
48+
49+
/// Checks if the secondary index is empty.
50+
pub fn is_empty(&self) -> bool {
51+
self.cache.is_empty()
52+
}
53+
}

src/indexing/storage/actors/segment_writer_actor.rs

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//!
33
//! This module provides:
44
//! - `SegmentConfig`: configuration parameters for paging and segmentation.
5-
//! - `SegmentWriterActor`: an actor that listens for archived event blobs
6-
//! and writes them into fixed-size pages within segment files.
7-
//! - `run_writer_loop`: core loop logic for handling page boundaries,
8-
//! file rollover, and metadata events.
5+
//! - `SegmentWriterActor`: an actor that listens for archived event blobs and writes them into
6+
//! fixed-size pages within segment files.
7+
//! - `run_writer_loop`: core loop logic for handling page boundaries, file rollover, and metadata
8+
//! events.
99
//! - Unit tests verifying segment math, flush behavior, and rollover/resume logic.
1010
1111
use std::{
@@ -28,6 +28,7 @@ use crate::{
2828
size::PAGE_SIZE,
2929
},
3030
indexing::storage::format::archive,
31+
system::{CONTROL_BUS, control_bus::SystemPayload},
3132
};
3233

3334
/// Configuration for paged segment storage.
@@ -64,8 +65,8 @@ impl Default for SegmentConfig {
6465
/// Actor responsible for writing serialized event frames into segment files.
6566
///
6667
/// This actor sets up:
67-
/// - An `EventListener<Vec<u8>>` that archives incoming `Event<Vec<u8>>` values
68-
/// and sends their byte frames to `inbox`.
68+
/// - An `EventListener<Vec<u8>>` that archives incoming `Event<Vec<u8>>` values and sends their
69+
/// byte frames to `inbox`.
6970
/// - A background thread (`_handle`) running `run_writer_loop` to consume from `inbox`.
7071
/// - An LMDB environment (`meta_db`) to record segment rollover metadata.
7172
/// - The active `SegmentConfig` used for page size and directory settings.
@@ -153,9 +154,9 @@ impl SegmentWriterActor {
153154
/// 2. Iterates existing `SegmentMeta` entries to resume at the latest segment/page.
154155
/// 3. Opens (or creates) the current segment file, memory-maps it.
155156
/// 4. On each `rx.recv()`:
156-
/// - If the incoming frame does not fit in the current page, flushes the page,
157-
/// advances `page_index`, and handles segment rollover (including
158-
/// emitting a `MetaEvent` into LMDB and remapping a new file).
157+
/// - If the incoming frame does not fit in the current page, flushes the page, advances
158+
/// `page_index`, and handles segment rollover (including emitting a `MetaEvent` into LMDB and
159+
/// remapping a new file).
159160
/// - Copies the frame bytes into the current page at `byte_offset + write_pos`.
160161
/// - Updates `write_pos`.
161162
/// 5. On channel close, performs a final flush of any unwritten bytes.
@@ -267,8 +268,37 @@ fn run_writer_loop(meta_db: &Arc<Mutex<LmdbEnv>>, rx: Receiver<Vec<u8>>, config:
267268
let end = start + write_len;
268269
mm[start..end].copy_from_slice(data.as_slice());
269270
write_pos += write_len;
271+
tracing::debug!(
272+
"Wrote {} bytes to segment {} page {} at offset {}",
273+
write_len,
274+
seg_id,
275+
local_page_idx,
276+
start
277+
);
278+
tracing::debug!("sending message to control bus");
279+
let payload_written_msg_sent_ack = CONTROL_BUS
280+
.get()
281+
.expect("control_bus_not_initialized")
282+
.sender()
283+
.send(SystemPayload::PayloadWritten {
284+
meta: SegmentMeta {
285+
page_index,
286+
segment_index: seg_id,
287+
write_pos,
288+
byte_offset,
289+
latest_segment_id: seg_id,
290+
ts_start,
291+
ts_end: emit_secs(),
292+
},
293+
});
294+
match payload_written_msg_sent_ack {
295+
Ok(_) => tracing::debug!("Payload written message sent successfully"),
296+
Err(e) => {
297+
// FIXME: SHOULD WE PANIC HERE??
298+
tracing::error!("Failed to send PayloadWritten message: {}", e)
299+
}
300+
}
270301
}
271-
272302
// final flush if any data
273303
if write_pos > 0 {
274304
mm.flush_range(byte_offset, page_size)

src/indexing/storage/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ pub fn unarchive<'a>(bytes: &'a [u8]) -> (&'a XaeroOnDiskEventHeader, &'a Archiv
162162
/// For each leaf (event) we store exactly where its bytes live on disk
163163
/// and what its timestamp was.
164164
#[repr(C)]
165-
#[derive(Clone, Copy)]
165+
#[derive(Debug, Clone, Copy)]
166166
pub struct LeafLocation {
167167
/// Which payload segment file ('.seg') holds the event
168168
pub segment_index: u32, // 4 bytes

src/indexing/storage/mmr.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use rkyv::Archive;
22

3-
use crate::indexing::{
4-
hash::sha_256_concat_hash,
5-
merkle_tree::{XaeroMerkleProof, XaeroMerkleTree, XaeroMerkleTreeOps},
3+
use crate::{
4+
indexing::{
5+
hash::sha_256_concat_hash,
6+
merkle_tree::{XaeroMerkleProof, XaeroMerkleTree, XaeroMerkleTreeOps},
7+
},
8+
system::{CONTROL_BUS, control_bus::SystemPayload},
69
};
710

811
/// Peak represents a peak in the Merkle Mountain Range (MMR).
@@ -64,6 +67,9 @@ pub trait XaeroMmrOps {
6467
proof: &XaeroMerkleProof,
6568
expected_root: [u8; 32],
6669
) -> bool;
70+
71+
/// Convenience: gets the leaf hash by its index.
72+
fn get_leaf_hash_by_index(&self, index: usize) -> Option<&[u8; 32]>;
6773
}
6874

6975
impl Default for XaeroMmr {
@@ -119,6 +125,22 @@ impl XaeroMmrOps for XaeroMmr {
119125
// bag peaks and recalculate root
120126
let t = XaeroMerkleTree::neo(self.peaks.iter().map(|p| p.root).collect());
121127
self.root = t.root();
128+
let res = self.leaf_hashes.iter().enumerate().map(|(i, h)| {
129+
tracing::debug!("leaf_hashes[{}]: {:?}", i, h);
130+
CONTROL_BUS
131+
.get()
132+
.expect("CONTROL_BUS must be initialized")
133+
.sender()
134+
.send(SystemPayload::MmrAppended {
135+
leaf_index: i as u64,
136+
leaf_hash: *h,
137+
})
138+
});
139+
res.for_each(|r| {
140+
if let Err(e) = r {
141+
tracing::error!("Failed to send MmrAppended event: {:?}", e);
142+
}
143+
});
122144
changed
123145
}
124146

@@ -203,6 +225,10 @@ impl XaeroMmrOps for XaeroMmr {
203225
}
204226
running == expected_root
205227
}
228+
229+
fn get_leaf_hash_by_index(&self, index: usize) -> Option<&[u8; 32]> {
230+
self.leaf_hashes.get(index)
231+
}
206232
}
207233

208234
#[cfg(test)]

src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub mod indexing;
33
pub mod logs;
44
pub mod networking;
55
pub mod sys;
6-
6+
pub mod system;
77
use core::{DISPATCHER_POOL, aof::AOFActor, event::Event};
88
use std::{
99
sync::{
@@ -23,6 +23,7 @@ use indexing::storage::{
2323
},
2424
format::archive,
2525
};
26+
use system::{CONTROL_BUS, init_control_bus};
2627
use threadpool::ThreadPool;
2728

2829
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
@@ -149,6 +150,10 @@ impl Subject {
149150
///
150151
/// Returns an `XFluxHandle` that keeps everything alive until dropped.
151152
pub fn unsafe_run(self: Arc<Self>) -> XFluxHandle {
153+
tracing::debug!("unsafe_run called for Subject: {}", self.name);
154+
tracing::debug!("initializing control bus");
155+
init_control_bus();
156+
tracing::debug!("control bus initialized");
152157
self.unsafe_run_called.store(true, Ordering::SeqCst);
153158
// Instantiate system actors
154159
let aof = Arc::new(AOFActor::new());

src/system/control_bus.rs

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,91 @@
1+
use std::sync::Arc;
2+
13
use bytemuck::{Pod, Zeroable};
2-
use crate::core::event::{LeafLocation, SystemErrorCode};
4+
use crossbeam::channel::{Receiver, Sender, unbounded};
5+
6+
use crate::core::{DISPATCHER_POOL, meta::SegmentMeta};
37

48
/// The data carried by each system event.
59
/// All fields are fixed-size so this enum is Pod-friendly.
610
#[repr(C)]
7-
#[derive(Debug, Clone, Copy, Archive, Serialize, Deserialize, Pod, Zeroable)]
8-
#[archive_bounds(::core::fmt::Debug)]
11+
#[derive(Debug, Clone, Copy)]
912
pub enum SystemPayload {
10-
PayloadWritten { leaf_index: u64, loc: LeafLocation },
11-
SegmentRolledOver { ts_start: u64, seg_idx: u32 },
12-
SegmentRollOverFailed { ts_start: u64, seg_idx: u32, error_code: u16 },
13-
PageFlushed { page_index: u32 },
14-
PageFlushFailed { page_index: u32, error_code: u16 },
15-
MmrAppended { leaf_index: u64, leaf_hash: [u8;32] },
16-
MmrAppendFailed { leaf_index: u64, error_code: u16 },
17-
SecondaryIndexWritten { leaf_hash: [u8;32] },
18-
SecondaryIndexFailed { leaf_hash: [u8;32], error_code: u16 },
19-
}
13+
/// Indicates an event payload was written to a segment page.
14+
PayloadWritten {
15+
meta: SegmentMeta,
16+
},
17+
SegmentRolledOver {
18+
meta: SegmentMeta,
19+
},
20+
SegmentRollOverFailed {
21+
meta: SegmentMeta,
22+
error_code: u16,
23+
},
24+
PageFlushed {
25+
meta: SegmentMeta,
26+
},
27+
PageFlushFailed {
28+
meta: SegmentMeta,
29+
error_code: u16,
30+
},
31+
MmrAppended {
32+
leaf_index: u64,
33+
leaf_hash: [u8; 32],
34+
},
35+
MmrAppendFailed {
36+
leaf_index: u64,
37+
error_code: u16,
38+
},
39+
SecondaryIndexWritten {
40+
leaf_hash: [u8; 32],
41+
},
42+
SecondaryIndexFailed {
43+
leaf_hash: [u8; 32],
44+
error_code: u16,
45+
},
46+
}
47+
48+
unsafe impl Zeroable for SystemPayload {}
49+
unsafe impl Pod for SystemPayload {}
50+
51+
pub struct ControlBus {
52+
pub(crate) tx: Sender<SystemPayload>,
53+
pub(crate) rx: Receiver<SystemPayload>,
54+
}
55+
56+
impl ControlBus {
57+
/// Create a new ControlBus with an unbounded channel.
58+
pub fn new() -> Self {
59+
let (tx, rx) = unbounded();
60+
Self { tx, rx }
61+
}
62+
63+
/// Get a sender handle for publishing system payloads.
64+
pub fn sender(&self) -> Sender<SystemPayload> {
65+
self.tx.clone()
66+
}
67+
68+
/// Subscribe to receive all system payloads.
69+
pub fn subscribe(&self) -> Receiver<SystemPayload> {
70+
self.rx.clone()
71+
}
72+
73+
/// Register a handler that will be called for every system payload.
74+
pub fn register_handler<H: ControlHandler + Send + Sync + 'static>(&self, handler: Arc<H>) {
75+
let rx = self.rx.clone();
76+
DISPATCHER_POOL
77+
.get()
78+
.expect("DISPATCH POOL not initialized")
79+
.execute(move || {
80+
while let Ok(payload) = rx.recv() {
81+
handler.handle(payload);
82+
}
83+
});
84+
}
85+
}
86+
87+
/// Trait that actors implement to handle incoming system payloads.
88+
pub trait ControlHandler {
89+
/// Called for each system payload received on the control bus.
90+
fn handle(&self, payload: SystemPayload);
91+
}

0 commit comments

Comments
 (0)