Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 32 additions & 159 deletions src/limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@
//! flashblock partitioning logic.

use {
crate::{
Flashblocks,
state::{FlashblockNumber, TargetFlashblocks},
},
crate::Flashblocks,
core::time::Duration,
rblib::{alloy::consensus::BlockHeader, prelude::*},
std::sync::{Arc, Mutex},
tracing::debug,
};

/// Specifies the limits for individual flashblocks.
Expand All @@ -26,173 +21,51 @@ use {
/// deadline by the flashblock interval.
#[derive(Debug, Clone, Default)]
pub struct FlashblockLimits {
state: Arc<Mutex<FlashblockState>>,
/// The time interval between flashblocks within one payload job.
interval: Duration,
}

#[derive(Debug, Clone, Default)]
pub struct FlashblockState {
/// Current block number being built, or `None` if uninitialized.
current_block: Option<u64>,
/// Current flashblock number. Used to check if we're on the first
/// flashblock or to adjust the target number of flashblocks for a block.
target_flashblocks: Arc<TargetFlashblocks>,
/// Duration for the first flashblock, which may be shortened to absorb
/// timing variance.
first_flashblock_interval: Duration,
/// Gas allocated per flashblock (total gas limit divided by flashblock
/// count).
gas_per_flashblock: u64,
}

impl FlashblockState {
fn current_gas_limit(&self, flashblock_number: &FlashblockNumber) -> u64 {
self
.gas_per_flashblock
.saturating_mul(flashblock_number.current())
}
}

impl FlashblockLimits {
pub fn new(
interval: Duration,
target_flashblocks: Arc<TargetFlashblocks>,
) -> Self {
let state = FlashblockState {
target_flashblocks,
..Default::default()
};
FlashblockLimits {
interval,
state: Arc::new(Mutex::new(state)),
}
pub fn new(interval: Duration) -> Self {
FlashblockLimits { interval }
}
}

/// Resets state when starting a new block, calculating target flashblock
/// count.
///
/// If a new block is detected (different block number than current state),
/// initializes the flashblock partition for this block by:
/// - Calculating available time and dividing it into flashblock intervals
/// - Computing gas per flashblock from the total gas limit
/// - Resetting the current flashblock counter to 0
/// - Adjusting the target number of flashblocks
pub fn update_state(
impl ScopedLimits<Flashblocks> for FlashblockLimits {
/// Creates the payload limits for the next flashblock in a new payload job.
fn create(
&self,
payload: &Checkpoint<Flashblocks>,
enclosing: &Limits<Flashblocks>,
) {
let mut state = self.state.lock().expect("mutex is not poisoned");

if state.current_block != Some(payload.block().number()) {
let payload_deadline = enclosing.deadline.expect(
"Flashblock limit require its enclosing scope to have a deadline",
);
let elapsed = payload.building_since().elapsed();
let remaining_time = payload_deadline.saturating_sub(elapsed);

let (target_flashblocks, first_flashblock_interval) =
self.calculate_flashblocks(payload, remaining_time);

state.gas_per_flashblock = enclosing
.gas_limit
.checked_div(target_flashblocks)
.unwrap_or(enclosing.gas_limit);
state.current_block = Some(payload.block().number());
state.first_flashblock_interval = first_flashblock_interval;
state.target_flashblocks.set(target_flashblocks);

debug!(
target_flashblocks = target_flashblocks,
first_flashblock_interval = ?first_flashblock_interval,
"Set flashblock timing for this block"
);
}
}

/// Returns limits for the current flashblock.
///
/// If all flashblocks have been produced, returns a deadline of 1ms to stop
/// production.
pub fn get_limits(
&self,
enclosing: &Limits<Flashblocks>,
flashblock_number: &FlashblockNumber,
) -> Limits<Flashblocks> {
let state = self.state.lock().expect("mutex is not poisoned");
// If flashblock number == 1, we're building the first flashblock
let deadline = if flashblock_number.current() == 1 {
state.first_flashblock_interval
} else {
self.interval
};

enclosing
.with_deadline(deadline)
.with_gas_limit(state.current_gas_limit(flashblock_number))
}
let flashblock_number = payload.context();

let payload_deadline = enclosing.deadline.expect(
"FlashblockLimits requires its enclosing scope to have a deadline",
);
let elapsed = payload.building_since().elapsed();
let remaining_time = payload_deadline.saturating_sub(elapsed);

/// Calculates the number of flashblocks and first flashblock interval for
/// this block.
///
/// Extracts block time from block timestamps, then partitions the remaining
/// time into flashblock intervals.
pub fn calculate_flashblocks(
&self,
payload: &Checkpoint<Flashblocks>,
remaining_time: Duration,
) -> (u64, Duration) {
let block_time = Duration::from_secs(
payload
.block()
.timestamp()
.saturating_sub(payload.block().parent().header().timestamp()),
);

partition_time_into_flashblocks(block_time, remaining_time, self.interval)
}
}

impl ScopedLimits<Flashblocks> for FlashblockLimits {
/// Creates the payload limits for the next flashblock in a new payload job.
fn create(
&self,
payload: &Checkpoint<Flashblocks>,
enclosing: &Limits<Flashblocks>,
) -> Limits<Flashblocks> {
let flashblock_number = payload.context();
// Check the state and reset if we started building next block
self.update_state(payload, enclosing);

let limits = self.get_limits(enclosing, flashblock_number);
let (target_flashblocks, deadline) = partition_time_into_flashblocks(
block_time,
remaining_time,
self.interval,
);

let state = self.state.lock().expect("mutex is not poisoned");
let flashblock_number = payload.context();
if flashblock_number.current() <= state.target_flashblocks.get() {
let gas_used = payload.cumulative_gas_used();
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
tracing::info!(
"Creating flashblocks limits: {}, payload txs: {}, gas used: {} \
({}%), gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas \
per block: {} ({}%), remaining_time: {}ms, gas_limit: {}",
flashblock_number,
payload.history().transactions().count(),
gas_used,
(gas_used * 100 / enclosing.gas_limit),
remaining_gas,
(remaining_gas * 100 / enclosing.gas_limit),
state.current_gas_limit(flashblock_number),
(state.current_gas_limit(flashblock_number) * 100
/ enclosing.gas_limit),
state.gas_per_flashblock,
(state.gas_per_flashblock * 100 / enclosing.gas_limit),
limits.deadline.expect("deadline is set").as_millis(),
limits.gas_limit
);
}
let gas_limit = enclosing
.gas_limit
.checked_div(target_flashblocks)
.unwrap_or(enclosing.gas_limit)
.saturating_mul(flashblock_number.current());

limits
enclosing.with_deadline(deadline).with_gas_limit(gas_limit)
}
}

Expand Down Expand Up @@ -221,21 +94,21 @@ fn partition_time_into_flashblocks(
) -> (u64, Duration) {
let remaining_time = remaining_time.min(block_time);

let remaining_millis = u64::try_from(remaining_time.as_millis())
let remaining_ms = u64::try_from(remaining_time.as_millis())
.expect("remaining_time should never exceed u64::MAX milliseconds");
let interval_millis = u64::try_from(flashblock_interval.as_millis())
let flashblock_interval_ms = u64::try_from(flashblock_interval.as_millis())
.expect("flashblock_interval should never exceed u64::MAX milliseconds");

let first_offset_millis = remaining_millis % interval_millis;
let first_offset_ms = remaining_ms % flashblock_interval_ms;

if first_offset_millis == 0 {
if first_offset_ms == 0 {
// Perfect division: remaining time is exact multiple of interval
(remaining_millis / interval_millis, flashblock_interval)
(remaining_ms / flashblock_interval_ms, flashblock_interval)
} else {
// Non-perfect division: add extra flashblock with shortened first interval
(
remaining_millis / interval_millis + 1,
Duration::from_millis(first_offset_millis),
remaining_ms / flashblock_interval_ms + 1,
Duration::from_millis(first_offset_ms),
)
}
}
Expand Down
70 changes: 23 additions & 47 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
publish::{PublishFlashblock, WebSocketSink},
rpc::TransactionStatusRpc,
signer::BuilderSigner,
state::TargetFlashblocks,
stop::BreakAfterMaxFlashblocks,
},
platform::Flashblocks,
Expand All @@ -20,7 +19,6 @@ use {
steps::*,
},
std::sync::Arc,
tracing::info,
};

mod args;
Expand Down Expand Up @@ -84,8 +82,7 @@ fn build_pipeline(
cli_args: &BuilderArgs,
pool: &OrderPool<Flashblocks>,
) -> eyre::Result<Pipeline<Flashblocks>> {
// how often a flashblock is published
let interval = cli_args.flashblocks_args.interval;
let flashblock_interval = cli_args.flashblocks_args.interval;

// time by which flashblocks will be delivered earlier to account for latency
let leeway_time = cli_args.flashblocks_args.leeway_time;
Expand All @@ -104,14 +101,7 @@ fn build_pipeline(
.clone()
.unwrap_or(BuilderSigner::random());

let target_flashblocks = Arc::new(TargetFlashblocks::new());

info!(
"cli_args.builder_signer.is_some() = {}",
cli_args.builder_signer.is_some()
);

let pipeline = Pipeline::<Flashblocks>::named("top")
let pipeline = Pipeline::<Flashblocks>::named("block")
.with_step(OptimismPrologue)
.with_step_if(
cli_args.flashtestations.flashtestations_enabled
Expand All @@ -123,45 +113,31 @@ fn build_pipeline(
)
.with_pipeline(
Loop,
Pipeline::named("n_flashblocks")
Pipeline::named("flashblocks")
.with_pipeline(
Once,
Loop,
Pipeline::named("single_flashblock")
.with_pipeline(
Once,
Pipeline::named("flashblock_steps")
.with_pipeline(
Loop,
Pipeline::named("inner_flashblock_steps")
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
.with_step(OrderByPriorityFee::default())
.with_step_if(
cli_args.revert_protection,
RemoveRevertedTransactions::default(),
)
.with_step(BreakAfterDeadline),
)
.with_step_if(
cli_args.builder_signer.is_some(),
BuilderEpilogue::with_signer(builder_signer.clone().into())
.with_message(|block| {
format!("Block Number: {}", block.number())
}),
)
.with_step(PublishFlashblock::new(
ws.clone(),
cli_args.flashblocks_args.calculate_state_root,
))
.with_limits(FlashblockLimits::new(
interval,
target_flashblocks.clone(),
)),
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
.with_step(OrderByPriorityFee::default())
.with_step_if(
cli_args.revert_protection,
RemoveRevertedTransactions::default(),
)
.with_step(BreakAfterDeadline),
.with_step(BreakAfterDeadline)
.with_limits(FlashblockLimits::new(flashblock_interval)),
)
.with_step(BreakAfterMaxFlashblocks::new(target_flashblocks)),
)
.with_limits(Scaled::default().deadline(total_building_time));
.with_step_if(
cli_args.builder_signer.is_some(),
BuilderEpilogue::with_signer(builder_signer.clone().into())
.with_message(|block| format!("Block Number: {}", block.number())),
)
.with_step(PublishFlashblock::new(
ws.clone(),
cli_args.flashblocks_args.calculate_state_root,
))
.with_step(BreakAfterMaxFlashblocks::new(flashblock_interval))
.with_limits(Scaled::default().deadline(total_building_time)),
);

ws.watch_shutdown(&pipeline);
pool.attach_pipeline(&pipeline);
Expand Down
2 changes: 1 addition & 1 deletion src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Step<Flashblocks> for PublishFlashblock {
self.capture_payload_metrics(&this_block_span);

// Increment flashblock number since we've built the flashblock
let next_flashblock_number = flashblock_number.advance();
let next_flashblock_number = flashblock_number.next();

// Place a barrier after each published flashblock to freeze the contents
// of the payload up to this point, since this becomes a publicly committed
Expand Down
Loading