Skip to content

Commit d9c72f2

Browse files
committed
use new rblib checkpoint context to track flashblocks
1 parent c104a73 commit d9c72f2

File tree

10 files changed

+91
-113
lines changed

10 files changed

+91
-113
lines changed

src/bundle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! - The `eth_sendBundle` input parameters and their validation.
1010
1111
use {
12-
crate::platform::Flashblocks,
12+
crate::{platform::Flashblocks, state::FlashblockNumber},
1313
core::convert::Infallible,
1414
rblib::{
1515
alloy::{
@@ -126,7 +126,7 @@ impl Bundle<Flashblocks> for FlashblocksBundle {
126126
fn is_eligible(
127127
&self,
128128
block: &BlockContext<Flashblocks>,
129-
_ctx: &(),
129+
_ctx: &FlashblockNumber,
130130
) -> Eligibility {
131131
if self.txs.is_empty() {
132132
// empty bundles are never eligible

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod bundle;
22
mod platform;
33
mod primitives;
4+
mod state;
45

5-
pub use {bundle::*, platform::*, primitives::*};
6+
pub use {bundle::*, platform::*, primitives::*, state::*};

src/limits.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
//! flashblock partitioning logic.
66
77
use {
8-
crate::{Flashblocks, state::FlashblockNumber},
8+
crate::{
9+
Flashblocks,
10+
state::{FlashblockNumber, TargetFlashblocks},
11+
},
912
core::time::Duration,
1013
rblib::{alloy::consensus::BlockHeader, prelude::*},
1114
std::sync::{Arc, Mutex},
@@ -33,7 +36,7 @@ pub struct FlashblockState {
3336
current_block: Option<u64>,
3437
/// Current flashblock number. Used to check if we're on the first
3538
/// flashblock or to adjust the target number of flashblocks for a block.
36-
flashblock_number: Arc<FlashblockNumber>,
39+
target_flashblocks: Arc<TargetFlashblocks>,
3740
/// Duration for the first flashblock, which may be shortened to absorb
3841
/// timing variance.
3942
first_flashblock_interval: Duration,
@@ -43,20 +46,16 @@ pub struct FlashblockState {
4346
}
4447

4548
impl FlashblockState {
46-
fn current_gas_limit(&self) -> u64 {
49+
fn current_gas_limit(&self, flashblock_number: &FlashblockNumber) -> u64 {
4750
self
4851
.gas_per_flashblock
49-
.saturating_mul(self.flashblock_number.current())
52+
.saturating_mul(flashblock_number.current())
5053
}
5154
}
5255

5356
impl FlashblockLimits {
54-
pub fn new(
55-
interval: Duration,
56-
flashblock_number: Arc<FlashblockNumber>,
57-
) -> Self {
57+
pub fn new(interval: Duration) -> Self {
5858
let state = FlashblockState {
59-
flashblock_number,
6059
..Default::default()
6160
};
6261
FlashblockLimits {
@@ -97,10 +96,7 @@ impl FlashblockLimits {
9796
.unwrap_or(enclosing.gas_limit);
9897
state.current_block = Some(payload.block().number());
9998
state.first_flashblock_interval = first_flashblock_interval;
100-
state.flashblock_number.reset_current_flashblock();
101-
state
102-
.flashblock_number
103-
.set_target_flashblocks(target_flashblock);
99+
state.target_flashblocks.set(target_flashblock);
104100
}
105101
}
106102

@@ -111,18 +107,19 @@ impl FlashblockLimits {
111107
pub fn get_limits(
112108
&self,
113109
enclosing: &Limits<Flashblocks>,
110+
flashblock_number: &FlashblockNumber,
114111
) -> Limits<Flashblocks> {
115112
let state = self.state.lock().expect("mutex is not poisoned");
116113
// If flashblock number == 1, we're building the first flashblock
117-
let deadline = if state.flashblock_number.current() == 1 {
114+
let deadline = if flashblock_number.current() == 1 {
118115
state.first_flashblock_interval
119116
} else {
120117
self.interval
121118
};
122119

123120
enclosing
124121
.with_deadline(deadline)
125-
.with_gas_limit(state.current_gas_limit())
122+
.with_gas_limit(state.current_gas_limit(flashblock_number))
126123
}
127124

128125
/// Calculates the number of flashblocks and first flashblock interval for
@@ -153,27 +150,30 @@ impl ScopedLimits<Flashblocks> for FlashblockLimits {
153150
payload: &Checkpoint<Flashblocks>,
154151
enclosing: &Limits<Flashblocks>,
155152
) -> Limits<Flashblocks> {
153+
let flashblock_number = payload.context();
156154
// Check the state and reset if we started building next block
157155
self.update_state(payload, enclosing);
158156

159-
let limits = self.get_limits(enclosing);
157+
let limits = self.get_limits(enclosing, flashblock_number);
160158

161159
let state = self.state.lock().expect("mutex is not poisoned");
162-
if state.flashblock_number.in_bounds() {
160+
let flashblock_number = payload.context();
161+
if flashblock_number.current() <= state.target_flashblocks.get() {
163162
let gas_used = payload.cumulative_gas_used();
164163
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
165164
tracing::info!(
166165
"Creating flashblocks limits: {}, payload txs: {}, gas used: {} \
167166
({}%), gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas \
168167
per block: {} ({}%), remaining_time: {}ms, gas_limit: {}",
169-
state.flashblock_number,
168+
flashblock_number,
170169
payload.history().transactions().count(),
171170
gas_used,
172171
(gas_used * 100 / enclosing.gas_limit),
173172
remaining_gas,
174173
(remaining_gas * 100 / enclosing.gas_limit),
175-
state.current_gas_limit(),
176-
(state.current_gas_limit() * 100 / enclosing.gas_limit),
174+
state.current_gas_limit(flashblock_number),
175+
(state.current_gas_limit(flashblock_number) * 100
176+
/ enclosing.gas_limit),
177177
state.gas_per_flashblock,
178178
(state.gas_per_flashblock * 100 / enclosing.gas_limit),
179179
limits.deadline.expect("deadline is set").as_millis(),

src/main.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
publish::{PublishFlashblock, WebSocketSink},
77
rpc::TransactionStatusRpc,
88
signer::BuilderSigner,
9-
state::FlashblockNumber,
9+
state::TargetFlashblocks,
1010
stop::BreakAfterMaxFlashblocks,
1111
},
1212
platform::Flashblocks,
@@ -104,9 +104,7 @@ fn build_pipeline(
104104
.clone()
105105
.unwrap_or(BuilderSigner::random());
106106

107-
// Multiple steps need to access flashblock number state, so we need to
108-
// initialize it outside
109-
let flashblock_number = Arc::new(FlashblockNumber::new());
107+
let target_flashblocks = Arc::new(TargetFlashblocks::new());
110108

111109
let pipeline = Pipeline::<Flashblocks>::named("top")
112110
.with_step(OptimismPrologue)
@@ -143,17 +141,13 @@ fn build_pipeline(
143141
)
144142
.with_epilogue(PublishFlashblock::new(
145143
ws.clone(),
146-
flashblock_number.clone(),
147144
cli_args.flashblocks_args.calculate_state_root,
148145
))
149-
.with_limits(FlashblockLimits::new(
150-
interval,
151-
flashblock_number.clone(),
152-
)),
146+
.with_limits(FlashblockLimits::new(interval)),
153147
)
154148
.with_step(BreakAfterDeadline),
155149
)
156-
.with_step(BreakAfterMaxFlashblocks::new(flashblock_number)),
150+
.with_step(BreakAfterMaxFlashblocks::new(target_flashblocks)),
157151
)
158152
.with_limits(Scaled::default().deadline(total_building_time));
159153

src/platform.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
crate::bundle::FlashblocksBundle,
2+
crate::{bundle::FlashblocksBundle, state::FlashblockNumber},
33
rblib::{prelude::*, reth::providers::StateProvider},
44
serde::{Deserialize, Serialize},
55
std::sync::Arc,
@@ -19,7 +19,7 @@ pub struct Flashblocks;
1919

2020
impl Platform for Flashblocks {
2121
type Bundle = FlashblocksBundle;
22-
type CheckpointContext = ();
22+
type CheckpointContext = FlashblockNumber;
2323
type DefaultLimits = types::DefaultLimits<Optimism>;
2424
type EvmConfig = types::EvmConfig<Optimism>;
2525
type ExtraLimits = types::ExtraLimits<Optimism>;

src/publish.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//! building jobs.
1111
1212
use {
13-
crate::{Flashblocks, primitives::*, state::FlashblockNumber},
13+
crate::{Flashblocks, primitives::*},
1414
atomic_time::AtomicOptionInstant,
1515
core::{net::SocketAddr, sync::atomic::Ordering},
1616
futures::{SinkExt, StreamExt},
@@ -50,10 +50,6 @@ pub struct PublishFlashblock {
5050
/// subscribers.
5151
sink: Arc<WebSocketSink>,
5252

53-
/// Reference to the current flashblock number within the payload job.
54-
/// Once we build a flashblock we increment this.
55-
flashblock_number: Arc<FlashblockNumber>,
56-
5753
/// Set once at the begining of the payload job, captures immutable
5854
/// information about the payload that is being built. This info is derived
5955
/// from the payload attributes parameter on the FCU from the EL node.
@@ -70,13 +66,11 @@ pub struct PublishFlashblock {
7066
impl PublishFlashblock {
7167
pub fn new(
7268
sink: Arc<WebSocketSink>,
73-
flashblock_number: Arc<FlashblockNumber>,
7469
// TODO: Will be implemented later
7570
_calculate_state_root: bool,
7671
) -> Self {
7772
Self {
7873
sink,
79-
flashblock_number,
8074
block_base: RwLock::new(None),
8175
metrics: Metrics::default(),
8276
times: Times::default(),
@@ -120,10 +114,10 @@ impl Step<Flashblocks> for PublishFlashblock {
120114
.expect("withdrawals_root is present"),
121115
};
122116

117+
let flashblock_number = payload.context();
118+
123119
// Get 0-index to use in flashblock
124-
let index = self.flashblock_number.index();
125-
// Increment flashblock number since we've built the flashblock
126-
self.flashblock_number.advance();
120+
let index = flashblock_number.index();
127121

128122
// Push the contents of the payload
129123
if let Err(e) = self.sink.publish(&FlashblocksPayloadV1 {
@@ -152,10 +146,13 @@ impl Step<Flashblocks> for PublishFlashblock {
152146
self.times.on_published_block(&self.metrics);
153147
self.capture_payload_metrics(&this_block_span);
154148

149+
// Increment flashblock number since we've built the flashblock
150+
let next_flashblock_number = flashblock_number.advance();
151+
155152
// Place a barrier after each published flashblock to freeze the contents
156153
// of the payload up to this point, since this becomes a publicly committed
157154
// state.
158-
ControlFlow::Ok(payload.barrier())
155+
ControlFlow::Ok(payload.barrier_with_context(next_flashblock_number))
159156
}
160157

161158
/// Before the payload job starts prepare the contents of the
@@ -200,9 +197,6 @@ impl Step<Flashblocks> for PublishFlashblock {
200197
) -> Result<(), PayloadBuilderError> {
201198
self.times.on_job_ended(&self.metrics);
202199

203-
// Reset current flashblock number since we're done with the whole block
204-
let count = self.flashblock_number.reset_current_flashblock();
205-
self.metrics.blocks_per_payload_job.record(count as f64);
206200
*self.block_base.write() = None;
207201

208202
Ok(())
@@ -232,7 +226,10 @@ impl PublishFlashblock {
232226
payload: &Checkpoint<Flashblocks>,
233227
) -> Span<Flashblocks> {
234228
// If we haven't published flashblock return whole history
235-
payload.history()
229+
let previous_flashblock_number = payload.context().clone();
230+
payload
231+
.history_since_last_context(&previous_flashblock_number)
232+
.unwrap_or(payload.history())
236233
}
237234

238235
/// Called for each flashblock to capture metrics about the produced
@@ -273,9 +270,6 @@ struct Metrics {
273270
/// Histogram of the number of bundles per flashblock.
274271
pub bundles_per_block: Histogram,
275272

276-
/// Histogram of flashblocks per job.
277-
pub blocks_per_payload_job: Histogram,
278-
279273
/// The time interval flashblocks within one block.
280274
pub intra_block_interval: Histogram,
281275

src/state.rs

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,56 @@
1-
use std::{
2-
fmt::Display,
3-
sync::atomic::{AtomicU64, Ordering},
1+
use {
2+
crate::Flashblocks,
3+
rblib::prelude::CheckpointContext,
4+
std::{
5+
fmt::Display,
6+
sync::atomic::{AtomicU64, Ordering},
7+
},
48
};
59

6-
#[derive(Debug)]
7-
pub struct FlashblockNumber {
8-
/// Current flashblock number (1-indexed).
9-
current_flashblock: AtomicU64,
10-
/// Number of flashblocks we're targeting to build for this block.
11-
target_flashblocks: AtomicU64,
12-
}
13-
14-
impl FlashblockNumber {
15-
pub fn new() -> Self {
16-
Self {
17-
current_flashblock: AtomicU64::new(1),
18-
target_flashblocks: AtomicU64::new(0),
19-
}
20-
}
10+
/// Current flashblock number (1-indexed).
11+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
12+
pub struct FlashblockNumber(u64);
2113

22-
pub fn current(&self) -> u64 {
23-
self.current_flashblock.load(Ordering::Relaxed)
24-
}
14+
/// Number of flashblocks we're targeting to build for this block.
15+
#[derive(Debug, Default)]
16+
pub struct TargetFlashblocks(AtomicU64);
2517

26-
pub fn max(&self) -> u64 {
27-
self.target_flashblocks.load(Ordering::Relaxed)
28-
}
18+
// TODO fix the default impl
2919

20+
impl FlashblockNumber {
3021
/// Returns current flashblock in 0-index format
3122
pub fn index(&self) -> u64 {
32-
self
33-
.current_flashblock
34-
.load(Ordering::Relaxed)
35-
.saturating_sub(1)
23+
self.0 - 1
3624
}
3725

38-
pub fn advance(&self) -> u64 {
39-
self.current_flashblock.fetch_add(1, Ordering::Relaxed)
26+
pub fn current(&self) -> u64 {
27+
self.0
4028
}
4129

42-
pub fn in_bounds(&self) -> bool {
43-
self.current_flashblock.load(Ordering::Relaxed)
44-
<= self.target_flashblocks.load(Ordering::Relaxed)
30+
#[must_use]
31+
pub fn advance(&self) -> Self {
32+
Self(self.0 + 1)
4533
}
34+
}
4635

47-
pub fn set_target_flashblocks(&self, target_flashblock: u64) {
48-
self
49-
.target_flashblocks
50-
.store(target_flashblock, Ordering::Relaxed);
36+
impl Display for FlashblockNumber {
37+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38+
write!(f, "{}", self.current())
5139
}
40+
}
41+
42+
impl CheckpointContext<Flashblocks> for FlashblockNumber {}
5243

53-
pub fn reset_current_flashblock(&self) -> u64 {
54-
self.current_flashblock.swap(1, Ordering::Relaxed)
44+
impl TargetFlashblocks {
45+
pub fn new() -> Self {
46+
Self(AtomicU64::default())
5547
}
56-
}
5748

58-
impl Default for FlashblockNumber {
59-
fn default() -> Self {
60-
Self::new()
49+
pub fn get(&self) -> u64 {
50+
self.0.load(Ordering::Relaxed)
6151
}
62-
}
6352

64-
impl Display for FlashblockNumber {
65-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66-
write!(f, "{}/{}", self.current(), self.max())
53+
pub fn set(&self, val: u64) {
54+
self.0.store(val, Ordering::Relaxed);
6755
}
6856
}

0 commit comments

Comments
 (0)