Skip to content

Commit f96e198

Browse files
authored
Flashblock limits + reth bump (#17)
* Different approach to first flashblock limit * Different approach to first flashblock limit * Different approach to first flashblock limit
1 parent bbf16dd commit f96e198

File tree

6 files changed

+169
-108
lines changed

6 files changed

+169
-108
lines changed

Cargo.toml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jemalloc = ["rblib/jemalloc"]
3434
debug = ["tokio/full", "tokio/tracing", "dep:console-subscriber"]
3535

3636
[dependencies]
37-
rblib = { git = "https://github.com/flashbots/rblib", rev = "9e9b93a601a4cb1a1dd5ddd3952b09ab145d3e93" }
37+
rblib = { git = "https://github.com/flashbots/rblib", rev = "5dea87ce8e9ea61692d5d3e05545b62d01671e1e" }
3838

3939
futures = "0.3"
4040
tokio = "1.46"
@@ -56,19 +56,19 @@ humantime = "2.2"
5656
atomic-time = "0.1"
5757

5858
secp256k1 = "0.30"
59-
jsonrpsee = "0.25.1"
59+
jsonrpsee = "0.26.0"
6060

61-
alloy-json-rpc = "1.0.18"
62-
alloy-serde = "1.0.27"
61+
alloy-json-rpc = "1.0.37"
62+
alloy-serde = "1.0.37"
6363

6464

6565
# reth dependencies
66-
reth-network-peers = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
67-
reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
68-
reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
69-
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
70-
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
71-
reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.6.0" }
66+
reth-network-peers = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
67+
reth-db-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
68+
reth-node-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
69+
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
70+
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
71+
reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.2" }
7272

7373
# debug flag
7474
console-subscriber = { version = "0.4", optional = true }

src/args.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub struct FlashblocksArgs {
9090
/// Should we calculate the state root for each flashblock
9191
#[arg(
9292
long = "flashblocks.calculate-state-root",
93-
default_value = "true",
93+
default_value = "false",
9494
env = "FLASHBLOCKS_CALCULATE_STATE_ROOT"
9595
)]
9696
pub calculate_state_root: bool,

src/limits.rs

Lines changed: 124 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
//! individual flashblocks. This is essentially where we define the block /
55
//! flashblock partitioning logic.
66
7+
use std::sync::{Arc, Mutex};
8+
use std::sync::atomic::{AtomicU64, Ordering};
79
use {
810
crate::Flashblocks,
911
core::time::Duration,
1012
rblib::{alloy::consensus::BlockHeader, prelude::*},
1113
std::ops::{Div, Rem},
12-
tracing::debug,
1314
};
1415

1516
/// Specifies the limits for individual flashblocks.
@@ -21,90 +22,96 @@ use {
2122
/// At the beginning of a payload job this instance will calculate the number of
2223
/// target flashblocks for the given job by dividing the total payload job
2324
/// deadline by the flashblock interval.
25+
#[derive(Debug, Clone, Default)]
2426
pub struct FlashblockLimits {
27+
state: Arc<Mutex<FlashblockState>>,
2528
/// The time interval between flashblocks within one payload job.
2629
interval: Duration,
27-
/// Time by which flashblocks will be delivered earlier to account for
28-
/// latency. This time is absorbed by the first flashblock.
29-
leeway_time: Duration,
3030
}
3131

32-
impl ScopedLimits<Flashblocks> for FlashblockLimits {
33-
/// Creates the payload limits for the next flashblock in a new payload job.
34-
fn create(
35-
&self,
36-
payload: &Checkpoint<Flashblocks>,
37-
enclosing: &Limits,
38-
) -> Limits {
39-
let payload_deadline = enclosing.deadline.expect(
40-
"Flashblock limit require its enclosing scope to have a deadline",
41-
);
42-
let remaining_time =
43-
payload_deadline.saturating_sub(payload.building_since().elapsed());
44-
45-
let is_first_block = Self::is_first_block(payload);
32+
#[derive(Debug, Clone, Default)]
33+
pub struct FlashblockState {
34+
/// Current block being built
35+
/// None - uninitialized state, happens only on first block building job
36+
current_block: Option<u64>,
37+
/// Current flashblock being built, number based
38+
/// 0 - uninitialized state, use progress_state to initialize it
39+
current_flashblock: u64,
40+
/// Interval of the first flashblock, it absorbs leeway time and network lag
41+
first_flashblock_interval: Duration,
42+
/// Gas for flashblock used for current block
43+
gas_per_flashblock: u64,
44+
/// Used to communicate maximum number of flashblocks on every blocks for other steps
45+
// TODO: once we remove max_flashblocks from publish step we could change it to u64
46+
max_flashblocks: Arc<AtomicU64>,
47+
}
4648

47-
// Calculate the number of remaining flashblocks, and the interval for the
48-
// current flashblock.
49-
let (remaining_blocks, current_flashblock_interval) = if is_first_block {
50-
// First block absorbs the leeway time by having a shorter deadline.
51-
self.calculate_flashblocks(payload, remaining_time)
52-
} else {
53-
// Subsequent blocks get the normal interval.
54-
#[allow(clippy::cast_possible_truncation)]
55-
let remaining_blocks =
56-
(remaining_time.as_millis() / self.interval.as_millis()) as u64;
57-
(remaining_blocks, self.interval)
49+
impl FlashblockState {
50+
fn current_gas_limit(&self) -> u64 {
51+
self.gas_per_flashblock.saturating_mul(self.current_flashblock)
52+
}
53+
}
54+
impl FlashblockLimits {
55+
pub fn new(interval: Duration, max_flashblocks: Arc<AtomicU64>) -> Self {
56+
let state = FlashblockState{
57+
max_flashblocks,
58+
..Default::default()
5859
};
59-
60-
if remaining_blocks <= 1 {
61-
// we don't have enough time for more than one block
62-
// saturate the payload gas within the remaining time
63-
return enclosing.with_deadline(remaining_time);
60+
FlashblockLimits {
61+
interval,
62+
state: Arc::new(Mutex::new(state)),
6463
}
64+
}
6565

66-
let gas_used = payload.cumulative_gas_used();
67-
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
66+
/// Checks if we have started building new block, if so we need to reset the state
67+
/// This will produce empty state, progress state before using it
68+
pub fn update_state(&self,
69+
payload: &Checkpoint<Flashblocks>,
70+
enclosing: &Limits) {
71+
let mut state = self.state.lock().expect("mutex is not poisoned");
6872

69-
if remaining_gas == 0 {
70-
debug!("No remaining gas for flashblocks, but still have time left");
71-
return enclosing.with_deadline(remaining_time);
72-
}
73+
if state.current_block != Some(payload.block().number()) {
74+
let payload_deadline = enclosing.deadline.expect(
75+
"Flashblock limit require its enclosing scope to have a deadline",
76+
);
77+
let remaining_time =
78+
payload_deadline.saturating_sub(payload.building_since().elapsed());
7379

74-
let gas_per_block = remaining_gas / remaining_blocks;
75-
let next_block_gas_limit = gas_used.saturating_add(gas_per_block);
76-
77-
tracing::info!(
78-
">--> payload txs: {}, gas used: {} ({}%), gas_remaining: {} ({}%), \
79-
next_block_gas_limit: {} ({}%), gas per block: {} ({}%), remaining \
80-
blocks: {}, remaining time: {:?}, leeway: {:?}, \
81-
current_flashblock_interval: {:?}",
82-
payload.history().transactions().count(),
83-
gas_used,
84-
(gas_used * 100 / enclosing.gas_limit),
85-
remaining_gas,
86-
(remaining_gas * 100 / enclosing.gas_limit),
87-
next_block_gas_limit,
88-
(next_block_gas_limit * 100 / enclosing.gas_limit),
89-
gas_per_block,
90-
(gas_per_block * 100 / enclosing.gas_limit),
91-
remaining_blocks,
92-
remaining_time,
93-
self.leeway_time,
94-
current_flashblock_interval
95-
);
80+
let (target_flashblock, first_flashblock_interval) = self.calculate_flashblocks(payload, remaining_time);
81+
state.gas_per_flashblock = enclosing.gas_limit / target_flashblock;
82+
state.current_block = Some(payload.block().number());
83+
state.current_flashblock = 0;
84+
state.first_flashblock_interval = first_flashblock_interval;
85+
state.max_flashblocks.store(target_flashblock, Ordering::Relaxed);
86+
}
87+
}
9688

97-
enclosing
98-
.with_deadline(current_flashblock_interval)
99-
.with_gas_limit(next_block_gas_limit)
89+
/// Progresses the state to the next flashblock
90+
pub fn progress_state(&self) {
91+
let mut state = self.state.lock().expect("mutex is not poisoned");
92+
state.current_flashblock += 1;
10093
}
101-
}
10294

103-
impl FlashblockLimits {
104-
pub fn new(interval: Duration, leeway_time: Duration) -> Self {
105-
FlashblockLimits {
106-
interval,
107-
leeway_time,
95+
/// Return limits for the current flashblock
96+
pub fn get_limits(&self, enclosing: &Limits) -> Limits {
97+
let state = self.state.lock().expect("mutex is not poisoned");
98+
// Check that state was progressed at least once
99+
assert_ne!(state.current_flashblock, 0, "Get limits on uninitialized state");
100+
// If we don't need to create new flashblocks - exit with immediate deadline
101+
if state.current_flashblock > state.max_flashblocks.load(Ordering::Relaxed) {
102+
enclosing.with_deadline(Duration::from_millis(1))
103+
} else {
104+
// If self.current_flashblock == 1, we are building first flashblock
105+
let enclosing = if state.current_flashblock == 1 {
106+
enclosing
107+
.with_deadline(state.first_flashblock_interval)
108+
109+
} else {
110+
enclosing
111+
.with_deadline(self.interval)
112+
113+
};
114+
enclosing.with_gas_limit(state.current_gas_limit())
108115
}
109116
}
110117

@@ -123,9 +130,10 @@ impl FlashblockLimits {
123130
.timestamp()
124131
.saturating_sub(payload.block().parent().header().timestamp()),
125132
);
126-
let remaining_time = remaining_time.min(block_time);
127-
let interval_millis = u64::try_from(self.interval.as_millis()).unwrap();
128-
let remaining_time_millis = u64::try_from(remaining_time.as_millis()).unwrap();
133+
let remaining_time = remaining_time
134+
.min(block_time);
135+
let interval_millis = self.interval.as_millis() as u64;
136+
let remaining_time_millis = remaining_time.as_millis() as u64;
129137
let first_flashblock_offset = remaining_time_millis.rem(interval_millis);
130138

131139
if first_flashblock_offset == 0 {
@@ -139,16 +147,48 @@ impl FlashblockLimits {
139147
)
140148
}
141149
}
150+
}
151+
152+
impl ScopedLimits<Flashblocks> for FlashblockLimits {
153+
/// Creates the payload limits for the next flashblock in a new payload job.
154+
fn create(
155+
&self,
156+
payload: &Checkpoint<Flashblocks>,
157+
enclosing: &Limits,
158+
) -> Limits {
159+
// Check the state and reset if we started building next block
160+
self.update_state(payload, enclosing);
161+
162+
// Update flashblock state
163+
self.progress_state();
164+
165+
let limits = self.get_limits(enclosing);
166+
167+
let state = self.state.lock().expect("mutex is not poisoned");
168+
if state.current_flashblock <= state.max_flashblocks.load(Ordering::Relaxed) {
169+
let gas_used = payload.cumulative_gas_used();
170+
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
171+
tracing::warn!(
172+
">---> flashblocks: {}/{}, payload txs: {}, gas used: {} ({}%), \
173+
gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas per block: {} ({}%), \
174+
remaining_time: {}ms, gas_limit: {}",
175+
state.current_flashblock,
176+
state.max_flashblocks.load(Ordering::Relaxed),
177+
payload.history().transactions().count(),
178+
gas_used,
179+
(gas_used * 100 / enclosing.gas_limit),
180+
remaining_gas,
181+
(remaining_gas * 100 / enclosing.gas_limit),
182+
state.current_gas_limit(),
183+
(state.current_gas_limit() * 100 / enclosing.gas_limit),
184+
state.gas_per_flashblock,
185+
(state.gas_per_flashblock * 100 / enclosing.gas_limit),
186+
limits.deadline.expect("deadline is set").as_millis(),
187+
limits.gas_limit,
188+
);
189+
}
190+
limits
142191

143-
/// Determines if this is the first block in a payload job, by checking if
144-
/// there are any flashblock barriers. If no flashblock barriers exist, this
145-
/// is considered the first block.
146-
pub fn is_first_block(payload: &Checkpoint<Flashblocks>) -> bool {
147-
payload
148-
.history()
149-
.iter()
150-
.filter(|c| c.is_named_barrier("flashblock"))
151-
.count()
152-
== 0
153192
}
154193
}
194+

src/main.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use std::sync::atomic::AtomicU64;
2+
use reth_optimism_node::OpAddOns;
3+
use reth_optimism_rpc::OpEthApiBuilder;
14
use {
25
crate::{
36
args::{BuilderArgs, Cli, CliExt},
@@ -34,6 +37,15 @@ fn main() {
3437
let opnode = OpNode::new(cli_args.rollup_args.clone());
3538
let tx_status_rpc = TransactionStatusRpc::new(&pipeline);
3639

40+
let addons: OpAddOns<
41+
_,
42+
OpEthApiBuilder,
43+
OpEngineValidatorBuilder,
44+
OpEngineApiBuilder<OpEngineValidatorBuilder>,
45+
> = opnode
46+
.add_ons_builder::<types::RpcTypes<Flashblocks>>()
47+
.build();
48+
3749
#[expect(clippy::large_futures)]
3850
let handle = builder
3951
.with_types::<OpNode>()
@@ -43,11 +55,7 @@ fn main() {
4355
.attach_pool(&pool)
4456
.payload(pipeline.into_service()),
4557
)
46-
.with_add_ons(
47-
opnode
48-
.add_ons_builder::<types::RpcTypes<Flashblocks>>()
49-
.build::<_, OpEngineValidatorBuilder, OpEngineApiBuilder<OpEngineValidatorBuilder>>(),
50-
)
58+
.with_add_ons(addons)
5159
.extend_rpc_modules(move |mut rpc_ctx| {
5260
pool.attach_rpc(&mut rpc_ctx)?;
5361
tx_status_rpc.attach_rpc(&mut rpc_ctx)?;
@@ -133,6 +141,9 @@ fn build_flashblocks_pipeline(
133141

134142
let ws = Arc::new(WebSocketSink::new(socket_address)?);
135143

144+
// TODO: this is super crutch until we have a way to break from outer payload in limits
145+
let max_flashblocks = Arc::new(AtomicU64::new(0));
146+
136147
let pipeline = Pipeline::<Flashblocks>::named("flashblocks")
137148
.with_prologue(OptimismPrologue)
138149
.with_pipeline(
@@ -149,13 +160,13 @@ fn build_flashblocks_pipeline(
149160
.with_epilogue(PublishFlashblock::new(
150161
&ws,
151162
cli_args.flashblocks_args.calculate_state_root,
163+
max_flashblocks.clone(),
152164
))
153-
.with_limits(FlashblockLimits::new(interval, leeway_time)),
165+
.with_limits(FlashblockLimits::new(interval, max_flashblocks)),
154166
)
155167
.with_step(BreakAfterDeadline),
156168
)
157169
.with_limits(Scaled::default().deadline(total_building_time));
158-
159170
ws.watch_shutdown(&pipeline);
160171

161172
Ok(pipeline)

0 commit comments

Comments
 (0)