Skip to content

Handle Inconsistent Response in Miner Api #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6ea2e32
Handle Inconsistent Response in Miner Api
0xForerunner May 15, 2025
06ade97
Fix log message
0xForerunner May 15, 2025
d40f45b
wip
0xForerunner May 16, 2025
549ff16
wip
0xForerunner May 17, 2025
a8c50cb
retry layer
0xForerunner May 17, 2025
bd28d7b
just handle miner_setMaxDASize
0xForerunner May 17, 2025
718a46a
rename attempts -> retries
0xForerunner May 17, 2025
ec50813
wip
0xForerunner May 20, 2025
eb799a7
wip
0xForerunner May 20, 2025
fb1a531
wip
0xForerunner May 20, 2025
f07a8a3
wip
0xForerunner May 21, 2025
d506285
wip
0xForerunner May 21, 2025
2e9e185
cleanup
0xForerunner May 21, 2025
d689308
remove http retry layer
0xForerunner May 21, 2025
4a2c45b
fix tests
0xForerunner May 21, 2025
11aec74
don't update on retry
0xForerunner May 21, 2025
e2fcd4a
switch to join handle abort
0xForerunner May 21, 2025
b0b5c5e
typo
0xForerunner May 21, 2025
28d4616
cleanup match in send_to_l2
0xForerunner May 22, 2025
220f259
wip
0xForerunner May 27, 2025
ee2d4d9
wip
0xForerunner May 27, 2025
19260a8
wip
0xForerunner May 27, 2025
c52f9bd
wip
0xForerunner May 27, 2025
5dadef9
wip
0xForerunner May 27, 2025
35290d2
wip
0xForerunner May 27, 2025
52a4ea1
wip
0xForerunner May 27, 2025
6466b41
fix tests
0xForerunner May 27, 2025
52bf461
cleanup, fix tests
0xForerunner May 28, 2025
9c18cc0
cleanup
0xForerunner May 28, 2025
20538b1
comment
0xForerunner May 28, 2025
7235a3c
Merge branch 'main' into forerunner/inconsistant-miner-api
0xForerunner May 28, 2025
b3fc0b7
remove stray comment
0xForerunner May 28, 2025
c51e27b
add tests
0xForerunner May 28, 2025
89e6b70
Extra test
ferranbt May 28, 2025
99904fa
add more logs
ferranbt May 28, 2025
507a17a
cleanup and fix test
0xForerunner May 28, 2025
d28ea6c
Add additional test case
0xForerunner May 28, 2025
f292652
add additional test case
0xForerunner May 28, 2025
36d67ae
remove stray comment
0xForerunner May 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{net::SocketAddr, path::PathBuf};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};

use alloy_rpc_types_engine::JwtSecret;
use clap::{Parser, Subcommand};
use eyre::bail;
use jsonrpsee::{RpcModule, server::Server};
use parking_lot::Mutex;
use tokio::signal::unix::{SignalKind, signal as unix_signal};
use tracing::{Level, info};

Expand Down Expand Up @@ -157,11 +158,12 @@ impl Args {

let (probe_layer, probes) = ProbeLayer::new();

let execution_mode = Arc::new(Mutex::new(self.execution_mode));
let rollup_boost = RollupBoostServer::new(
l2_client,
builder_client,
self.execution_mode,
probes,
execution_mode.clone(),
probes.clone(),
self.health_check_interval,
self.max_unsafe_interval,
);
Expand All @@ -182,6 +184,8 @@ impl Args {
l2_auth_jwt,
builder_args.builder_url,
builder_auth_jwt,
probes,
execution_mode,
));

let server = Server::builder()
Expand Down
222 changes: 222 additions & 0 deletions src/consistent_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
use std::{
sync::{Arc, atomic::AtomicBool},
time::Duration,
};

use eyre::{bail, eyre};
use http::{request, response};
use jsonrpsee::{
core::{BoxError, http_helpers},
http_client::{HttpBody, HttpRequest, HttpResponse},
};
use parking_lot::Mutex;
use tokio::{sync::watch, task::JoinHandle};
use tracing::{error, info, warn};

use crate::{ExecutionMode, Health, HttpClient, Probes};

/// A request manager that ensures requests are sent consistently
/// across both the builder and l2 client.
#[derive(Clone, Debug)]
pub struct ConsistentRequest {
// JSON-RPC method name attached to forwarded requests and log lines.
method: String,
// Client for the canonical L2 execution node; its response is returned to callers.
l2_client: HttpClient,
// Client for the block builder; failures here trigger retries and mode changes.
builder_client: HttpClient,
// True while a builder failure has forced execution mode to Disabled,
// so a later successful retry knows to re-enable it.
has_disabled_execution_mode: Arc<AtomicBool>,
// Hands incoming (parts, body) pairs to the background task spawned in `new`.
req_tx: watch::Sender<Option<(request::Parts, Vec<u8>)>>,
// Receives the l2 response (or error) published by `send_to_l2`.
res_rx: watch::Receiver<Option<Result<(response::Parts, Vec<u8>), BoxError>>>,
// Health probe handle; set to PartialContent on builder/l2 inconsistency.
probes: Arc<Probes>,
// Shared execution mode toggled between Enabled/Disabled on builder failures.
execution_mode: Arc<Mutex<ExecutionMode>>,
}

impl ConsistentRequest {
/// Creates a new `ConsistentRequest` instance.
pub fn new(
method: String,
l2_client: HttpClient,
builder_client: HttpClient,
probes: Arc<Probes>,
execution_mode: Arc<Mutex<ExecutionMode>>,
) -> Self {
let (req_tx, mut req_rx) = watch::channel(None);
let (res_tx, mut res_rx) = watch::channel(None);
req_rx.mark_unchanged();
res_rx.mark_unchanged();

let has_disabled_execution_mode = Arc::new(AtomicBool::new(false));

let manager = Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider putting the manager inside an Arc and share the same instance across many tasks

method,
l2_client,
builder_client,
has_disabled_execution_mode,
req_tx,
res_rx,
probes,
execution_mode,
};

let clone = manager.clone();
tokio::spawn(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting this future to a separate async function - something like async fn handle_requests

let mut attempt: Option<JoinHandle<_>> = None;
loop {
req_rx
.changed()
.await
.expect("channel should always be open");

if let Some(attempt) = attempt {
if !attempt.is_finished() {
error!(target: "proxy::call", method = clone.method, message = "request cancelled");
attempt.abort();
}
}

let (parts, body) = req_rx
.borrow_and_update()
.as_ref()
.expect("value should always be Some")
.clone();

let mut clone = clone.clone();
let res_tx_clone = res_tx.clone();
attempt = Some(tokio::spawn(async move {
clone
.send_with_retry_cancel_safe(parts, body, res_tx_clone)
.await
}));
}
});

manager
}

/// This function may be cancelled at any time from an incoming request.
/// We should ensure we're in a valid state at all await points.
async fn send_with_retry_cancel_safe(
&mut self,
parts: request::Parts,
body: Vec<u8>,
res_tx: watch::Sender<Option<Result<(response::Parts, Vec<u8>), BoxError>>>,
) -> eyre::Result<()> {
// We send the l2 request first, because we need to avoid the situation where the
// l2 fails and the builder succeeds. If this were to happen, it would be too dangerous to
// return the l2 error response back to the caller, since the builder would now have an
// invalid state. We can return early if the l2 request fails.
let mut manager = self.clone();
let parts_clone = parts.clone();
let body_clone = body.clone();
let res_sender_clone = res_tx.clone();
tokio::spawn(async move {
manager
.send_to_l2(parts_clone, body_clone, res_sender_clone)
.await
})
.await??;

loop {
let mut manager = self.clone();
let parts_clone = parts.clone();
let body_clone = body.clone();
match tokio::spawn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async move { manager.send_to_builder(parts_clone, body_clone).await },
)
.await?
{
Ok(_) => return Ok(()),
Err(_) => {
tokio::time::sleep(Duration::from_millis(200)).await;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error logging would be useful here

}
}
}
}

/// Forwards the request to the l2 client and publishes the outcome on
/// `res_tx` so the caller blocked in [`Self::send`] gets the l2 response as
/// soon as it is available.
///
/// Returns `Err` if the l2 request failed (the caller then skips the builder
/// entirely), if the response body could not be read, or if the response
/// channel is closed.
async fn send_to_l2(
    &mut self,
    parts: request::Parts,
    body: Vec<u8>,
    res_tx: watch::Sender<Option<Result<(response::Parts, Vec<u8>), BoxError>>>,
) -> eyre::Result<()> {
    // `parts` and `body` are owned and not used again, so no clones needed.
    let l2_req = HttpRequest::from_parts(parts, HttpBody::from(body));
    let l2_res = self.l2_client.forward(l2_req, self.method.clone()).await;

    // Return the l2 response asap
    let (res, l2_res_parts) = match l2_res {
        Ok(t) => {
            let (parts, body) = t.into_parts();
            let (body_bytes, _) =
                http_helpers::read_body(&parts.headers, body, u32::MAX).await?;
            (Ok(()), Ok((parts, body_bytes)))
        }
        Err(e) => (Err(eyre!("failed to send request to l2: {e}")), Err(e)),
    };
    res_tx.send(Some(l2_res_parts))?;
    res
}

/// Forwards the request to the builder and reconciles execution mode with the
/// outcome.
///
/// On success, re-enables execution mode if a previous failure from this
/// instance had disabled it. On failure, disables execution mode (if enabled)
/// and marks health as `PartialContent`, then returns an error so the caller
/// retries.
async fn send_to_builder(&mut self, parts: request::Parts, body: Vec<u8>) -> eyre::Result<()> {
    // `parts` and `body` are owned and not used again, so no clones needed.
    let builder_req = HttpRequest::from_parts(parts, HttpBody::from(body));
    let builder_res = self
        .builder_client
        .forward(builder_req, self.method.clone())
        .await;

    match builder_res {
        Ok(_) => {
            // A successful retry after we disabled execution means the builder
            // is consistent with l2 again, so re-enable it.
            if self
                .has_disabled_execution_mode
                .load(std::sync::atomic::Ordering::SeqCst)
            {
                let mut execution_mode = self.execution_mode.lock();
                *execution_mode = ExecutionMode::Enabled;
                // Drop before acquiring health lock
                drop(execution_mode);
                self.has_disabled_execution_mode
                    .store(false, std::sync::atomic::Ordering::SeqCst);
                info!(target: "proxy::call", message = "setting execution mode to Enabled");
                // NOTE(review): health is not reset here; it is expected to
                // recover when the builder returns the next payload.
            }
            Ok(())
        }
        Err(e) => {
            // l2 request succeeded, but builder request failed
            // This state can only be recovered from if either the builder is restarted
            // or if a retry eventually goes through.
            error!(target: "proxy::call", method = self.method, message = "inconsistent responses from builder and L2");
            let mut execution_mode = self.execution_mode.lock();
            if *execution_mode == ExecutionMode::Enabled {
                *execution_mode = ExecutionMode::Disabled;
                // Drop before acquiring health lock
                drop(execution_mode);
                self.has_disabled_execution_mode
                    .store(true, std::sync::atomic::Ordering::SeqCst);
                warn!(target: "proxy::call", message = "setting execution mode to Disabled");
                // This health status will likely be later set back to healthy
                // but this should be enough to trigger a new leader election.
                self.probes.set_health(Health::PartialContent);
            };

            bail!("failed to send request to builder: {e}");
        }
    }
}

/// Send a request, ensuring consistent responses from both the builder and l2 client.
///
/// Hands the request to the background task spawned in `new`, then waits for
/// and returns the l2 response; builder delivery and retries continue in the
/// background.
pub async fn send(
    &mut self,
    parts: request::Parts,
    body_bytes: Vec<u8>,
) -> Result<HttpResponse, BoxError> {
    // `parts`/`body_bytes` are owned and not used again, so no clones needed.
    self.req_tx.send(Some((parts, body_bytes)))?;

    // Wait for the l2 result published by `send_to_l2`.
    self.res_rx.changed().await?;
    let res = self.res_rx.borrow_and_update();
    match res.as_ref().expect("value should always be Some") {
        Ok((parts, body)) => Ok(HttpResponse::from_parts(
            parts.to_owned(),
            HttpBody::from(body.to_owned()),
        )),
        Err(e) => Err(format!("error sending consistent request: {e}").into()),
    }
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![allow(clippy::complexity)]
use dotenv as _;

mod client;
Expand Down Expand Up @@ -27,3 +28,5 @@ pub use probe::*;

mod health;
pub use health::*;

mod consistent_request;
Loading
Loading