Skip to content

Commit 209c0f2

Browse files
authored
import flashblocks types through rblib (#48)
1 parent fc529e6 commit 209c0f2

File tree

5 files changed

+37
-23
lines changed

5 files changed

+37
-23
lines changed

Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ authors = ["Flashbots <[email protected]>"]
1717
exclude = [".github/"]
1818

1919
[workspace.dependencies]
20-
rblib = { git = "https://github.com/flashbots/rblib", rev = "9eccd22047c3f8978ae022d575dca749a81684a8" }
21-
rollup-boost-types = { git = "https://github.com/flashbots/rollup-boost", rev = "b282527e5f46d0915d1e5bdf44338593bc8beb2d" }
20+
rblib = { git = "https://github.com/flashbots/rblib", rev = "e9f6a539a5d4bb8b3a2cae5f02d8adc05d126c37" }
2221

2322
futures = "0.3"
2423
tokio = "1.46"
@@ -67,7 +66,6 @@ debug = ["tokio/full", "tokio/tracing", "dep:console-subscriber"]
6766

6867
[dependencies]
6968
rblib = { workspace = true }
70-
rollup-boost-types = { workspace = true }
7169

7270
futures = { workspace = true }
7371
tokio = { workspace = true }

src/publish.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,27 @@ use {
1616
futures::{SinkExt, StreamExt},
1717
parking_lot::RwLock,
1818
rblib::{
19-
alloy::{consensus::BlockHeader, eips::Encodable2718, primitives::U256},
19+
alloy::{
20+
consensus::BlockHeader,
21+
eips::Encodable2718,
22+
optimism::rpc_types_engine::{
23+
OpFlashblockPayload,
24+
OpFlashblockPayloadBase,
25+
OpFlashblockPayloadDelta,
26+
OpFlashblockPayloadMetadata,
27+
},
28+
primitives::U256,
29+
},
2030
prelude::{ext::CheckpointOpExt, *},
2131
reth::node::builder::PayloadBuilderAttributes,
2232
},
23-
rollup_boost_types::flashblocks::{
24-
ExecutionPayloadBaseV1,
25-
ExecutionPayloadFlashblockDeltaV1,
26-
FlashblocksPayloadV1,
33+
std::{
34+
collections::BTreeMap,
35+
io,
36+
net::TcpListener,
37+
sync::Arc,
38+
time::Instant,
2739
},
28-
std::{io, net::TcpListener, sync::Arc, time::Instant},
2940
tokio::{
3041
net::TcpStream,
3142
sync::{
@@ -58,7 +69,7 @@ pub struct PublishFlashblock {
5869
/// Set once at the begining of the payload job, captures immutable
5970
/// information about the payload that is being built. This info is derived
6071
/// from the payload attributes parameter on the FCU from the EL node.
61-
block_base: RwLock<Option<ExecutionPayloadBaseV1>>,
72+
block_base: RwLock<Option<OpFlashblockPayloadBase>>,
6273

6374
/// Metrics for monitoring flashblock publishing.
6475
metrics: Metrics,
@@ -107,7 +118,7 @@ impl Step<Flashblocks> for PublishFlashblock {
107118
// TODO: Consider moving this into its own step
108119
let base = self.block_base.write().take();
109120
let (_excess_blob_gas, blob_gas_used) = payload.blob_fields();
110-
let diff = ExecutionPayloadFlashblockDeltaV1 {
121+
let diff = OpFlashblockPayloadDelta {
111122
state_root: sealed_block.block().state_root,
112123
receipts_root: sealed_block.block().receipts_root,
113124
logs_bloom: sealed_block.block().logs_bloom,
@@ -128,12 +139,17 @@ impl Step<Flashblocks> for PublishFlashblock {
128139
let index = flashblock_number.index();
129140

130141
// Push the contents of the payload
131-
if let Err(e) = self.sink.publish(&FlashblocksPayloadV1 {
142+
if let Err(e) = self.sink.publish(&OpFlashblockPayload {
132143
base,
133144
diff,
134145
payload_id: ctx.block().payload_id(),
135146
index,
136-
metadata: serde_json::Value::Null,
147+
metadata: OpFlashblockPayloadMetadata {
148+
block_number: ctx.block().number(),
149+
// TODO: Fill in the following fields
150+
new_account_balances: BTreeMap::new(),
151+
receipts: BTreeMap::new(),
152+
},
137153
}) {
138154
self.metrics.websocket_publish_errors_total.increment(1);
139155
tracing::error!("Failed to publish flashblock to websocket: {e}");
@@ -174,7 +190,7 @@ impl Step<Flashblocks> for PublishFlashblock {
174190
self.times.on_job_started(&self.metrics);
175191

176192
// this remains constant for the entire payload job.
177-
self.block_base.write().replace(ExecutionPayloadBaseV1 {
193+
self.block_base.write().replace(OpFlashblockPayloadBase {
178194
parent_beacon_block_root: ctx
179195
.block()
180196
.attributes()
@@ -406,7 +422,7 @@ impl WebSocketSink {
406422
/// Called once by the `PublishFlashblock` pipeline step every time there is a
407423
/// non-empty flashblock that needs to be broadcasted to all external
408424
/// subscribers.
409-
pub fn publish(&self, payload: &FlashblocksPayloadV1) -> io::Result<usize> {
425+
pub fn publish(&self, payload: &OpFlashblockPayload) -> io::Result<usize> {
410426
// Serialize the payload to a UTF-8 string
411427
// serialize only once, then just copy around only a pointer
412428
// to the serialized data for each subscription.

src/tests/utils/ws.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use {
77
futures::{FutureExt, StreamExt},
88
itertools::Itertools,
99
orx_concurrent_vec::ConcurrentVec,
10-
rollup_boost_types::flashblocks::FlashblocksPayloadV1,
10+
rblib::alloy::optimism::rpc_types_engine::OpFlashblockPayload,
1111
std::{sync::Arc, time::Instant},
1212
tokio::{net::TcpStream, sync::oneshot, task::JoinHandle},
1313
tokio_tungstenite::{
@@ -88,7 +88,7 @@ impl Deref for WebSocketObserver {
8888
#[derive(Clone, Debug, Deref)]
8989
pub struct ObservedFlashblock {
9090
#[deref]
91-
pub block: FlashblocksPayloadV1,
91+
pub block: OpFlashblockPayload,
9292
pub at: Instant,
9393
}
9494

@@ -152,7 +152,7 @@ impl Observations {
152152

153153
fn record_message(&self, msg: Message) {
154154
if let Message::Text(string) = &msg {
155-
match serde_json::from_str::<FlashblocksPayloadV1>(string) {
155+
match serde_json::from_str::<OpFlashblockPayload>(string) {
156156
Ok(fb_payload) => self.record_flashblock(fb_payload),
157157
Err(error) => {
158158
self.errors.push(error.into());
@@ -163,7 +163,7 @@ impl Observations {
163163
self.messages.push(msg);
164164
}
165165

166-
fn record_flashblock(&self, payload: FlashblocksPayloadV1) {
166+
fn record_flashblock(&self, payload: OpFlashblockPayload) {
167167
debug!(
168168
index = payload.index,
169169
payload_id = ?payload.payload_id,

tools/fbutil/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ authors.workspace = true
1111
exclude.workspace = true
1212

1313
[dependencies]
14-
rollup-boost-types = { workspace = true }
14+
rblib = { workspace = true }
1515

1616
futures = { workspace = true }
1717
tokio = { workspace = true }

tools/fbutil/src/watch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use {
55
colored::Colorize,
66
futures::StreamExt,
77
human_format::Formatter,
8-
rollup_boost_types::flashblocks::FlashblocksPayloadV1,
8+
rblib::alloy::optimism::rpc_types_engine::OpFlashblockPayload,
99
std::time::Instant,
1010
tokio_tungstenite::{connect_async, tungstenite::Message},
1111
};
@@ -18,7 +18,7 @@ pub async fn run(cli: &Cli, _: &WatchArgs) -> eyre::Result<()> {
1818
println!("Connected to {}", cli.ws);
1919

2020
let mut prev_at: Option<Instant> = None;
21-
let mut prev: Option<FlashblocksPayloadV1> = None;
21+
let mut prev: Option<OpFlashblockPayload> = None;
2222

2323
while let Some(msg) = stream.next().await {
2424
let now = Instant::now();
@@ -31,7 +31,7 @@ pub async fn run(cli: &Cli, _: &WatchArgs) -> eyre::Result<()> {
3131
eyre::bail!("Received non-text message: {msg:?}");
3232
};
3333

34-
let block: FlashblocksPayloadV1 = serde_json::from_str(&msg)?;
34+
let block: OpFlashblockPayload = serde_json::from_str(&msg)?;
3535

3636
if block.index == 0 {
3737
println!(); // start of new block

0 commit comments

Comments
 (0)