-
Notifications
You must be signed in to change notification settings - Fork 41
Handle Inconsistent Response in Miner Api #221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
6ea2e32
06ade97
d40f45b
549ff16
a8c50cb
bd28d7b
718a46a
ec50813
eb799a7
fb1a531
f07a8a3
d506285
2e9e185
d689308
4a2c45b
11aec74
e2fcd4a
b0b5c5e
28d4616
220f259
ee2d4d9
19260a8
c52f9bd
5dadef9
35290d2
52a4ea1
6466b41
52bf461
9c18cc0
20538b1
7235a3c
b3fc0b7
c51e27b
89e6b70
99904fa
507a17a
d28ea6c
f292652
36d67ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
use std::{ | ||
sync::{Arc, atomic::AtomicBool}, | ||
time::Duration, | ||
}; | ||
|
||
use eyre::{bail, eyre}; | ||
use http::{request, response}; | ||
use jsonrpsee::{ | ||
core::{BoxError, http_helpers}, | ||
http_client::{HttpBody, HttpRequest, HttpResponse}, | ||
}; | ||
use parking_lot::Mutex; | ||
use tokio::{sync::watch, task::JoinHandle}; | ||
use tracing::{error, info, warn}; | ||
|
||
use crate::{ExecutionMode, Health, HttpClient, Probes}; | ||
|
||
/// A request manager that ensures requests are sent consistently | ||
/// across both the builder and l2 client. | ||
#[derive(Clone, Debug)] | ||
pub struct ConsistentRequest { | ||
method: String, | ||
l2_client: HttpClient, | ||
builder_client: HttpClient, | ||
has_disabled_execution_mode: Arc<AtomicBool>, | ||
req_tx: watch::Sender<Option<(request::Parts, Vec<u8>)>>, | ||
res_rx: watch::Receiver<Option<Result<(response::Parts, Vec<u8>), BoxError>>>, | ||
probes: Arc<Probes>, | ||
execution_mode: Arc<Mutex<ExecutionMode>>, | ||
} | ||
|
||
impl ConsistentRequest { | ||
/// Creates a new `ConsistentRequest` instance. | ||
pub fn new( | ||
method: String, | ||
l2_client: HttpClient, | ||
builder_client: HttpClient, | ||
probes: Arc<Probes>, | ||
execution_mode: Arc<Mutex<ExecutionMode>>, | ||
) -> Self { | ||
let (req_tx, mut req_rx) = watch::channel(None); | ||
let (res_tx, mut res_rx) = watch::channel(None); | ||
req_rx.mark_unchanged(); | ||
res_rx.mark_unchanged(); | ||
|
||
let has_disabled_execution_mode = Arc::new(AtomicBool::new(false)); | ||
|
||
let manager = Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider putting the manager inside an |
||
method, | ||
l2_client, | ||
builder_client, | ||
has_disabled_execution_mode, | ||
req_tx, | ||
res_rx, | ||
probes, | ||
execution_mode, | ||
}; | ||
|
||
let clone = manager.clone(); | ||
tokio::spawn(async move { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider extracting this future to a separate async function - something like |
||
let mut attempt: Option<JoinHandle<_>> = None; | ||
loop { | ||
req_rx | ||
.changed() | ||
.await | ||
.expect("channel should always be open"); | ||
|
||
if let Some(attempt) = attempt { | ||
if !attempt.is_finished() { | ||
error!(target: "proxy::call", method = clone.method, message = "request cancelled"); | ||
attempt.abort(); | ||
} | ||
} | ||
|
||
let (parts, body) = req_rx | ||
.borrow_and_update() | ||
.as_ref() | ||
.expect("value should always be Some") | ||
.clone(); | ||
|
||
let mut clone = clone.clone(); | ||
let res_tx_clone = res_tx.clone(); | ||
attempt = Some(tokio::spawn(async move { | ||
clone | ||
.send_with_retry_cancel_safe(parts, body, res_tx_clone) | ||
.await | ||
})); | ||
} | ||
}); | ||
|
||
manager | ||
} | ||
|
||
/// This function may be cancelled at any time from an incoming request. | ||
/// We should ensure we're in a valid state at all await points. | ||
async fn send_with_retry_cancel_safe( | ||
&mut self, | ||
parts: request::Parts, | ||
body: Vec<u8>, | ||
res_tx: watch::Sender<Option<Result<(response::Parts, Vec<u8>), BoxError>>>, | ||
) -> eyre::Result<()> { | ||
// We send the l2 request first, because we need to avoid the situation where the | ||
// l2 fails and the builder succeeds. If this were to happen, it would be too dangerous to | ||
// return the l2 error response back to the caller, since the builder would now have an | ||
// invalid state. We can return early if the l2 request fails. | ||
let mut manager = self.clone(); | ||
let parts_clone = parts.clone(); | ||
let body_clone = body.clone(); | ||
let res_sender_clone = res_tx.clone(); | ||
0xForerunner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
tokio::spawn(async move { | ||
0xForerunner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
manager | ||
.send_to_l2(parts_clone, body_clone, res_sender_clone) | ||
.await | ||
}) | ||
.await??; | ||
|
||
loop { | ||
let mut manager = self.clone(); | ||
let parts_clone = parts.clone(); | ||
let body_clone = body.clone(); | ||
match tokio::spawn( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
async move { manager.send_to_builder(parts_clone, body_clone).await }, | ||
) | ||
.await? | ||
{ | ||
Ok(_) => return Ok(()), | ||
Err(_) => { | ||
tokio::time::sleep(Duration::from_millis(200)).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. error logging would be useful here |
||
} | ||
} | ||
} | ||
} | ||
|
||
async fn send_to_l2( | ||
&mut self, | ||
parts: request::Parts, | ||
body: Vec<u8>, | ||
res_tx: watch::Sender<Option<Result<(response::Parts, Vec<u8>), BoxError>>>, | ||
) -> eyre::Result<()> { | ||
let l2_req = HttpRequest::from_parts(parts.clone(), HttpBody::from(body.clone())); | ||
let l2_res = self.l2_client.forward(l2_req, self.method.clone()).await; | ||
|
||
// Return the l2 response asap | ||
let (res, l2_res_parts) = match l2_res { | ||
Ok(t) => { | ||
let (parts, body) = t.into_parts(); | ||
let (body_bytes, _) = | ||
http_helpers::read_body(&parts.headers, body, u32::MAX).await?; | ||
(Ok(()), Ok((parts, body_bytes))) | ||
} | ||
Err(e) => (Err(eyre!("failed to send request to l2: {e}")), Err(e)), | ||
}; | ||
res_tx.send(Some(l2_res_parts))?; | ||
res | ||
} | ||
|
||
async fn send_to_builder(&mut self, parts: request::Parts, body: Vec<u8>) -> eyre::Result<()> { | ||
let builder_req = HttpRequest::from_parts(parts.clone(), HttpBody::from(body.clone())); | ||
let builder_res = self | ||
.builder_client | ||
.forward(builder_req, self.method.clone()) | ||
.await; | ||
|
||
match builder_res { | ||
Ok(_) => { | ||
if self | ||
.has_disabled_execution_mode | ||
.load(std::sync::atomic::Ordering::SeqCst) | ||
{ | ||
let mut execution_mode = self.execution_mode.lock(); | ||
*execution_mode = ExecutionMode::Enabled; | ||
// Drop before aquiring health lock | ||
drop(execution_mode); | ||
self.has_disabled_execution_mode | ||
.store(false, std::sync::atomic::Ordering::SeqCst); | ||
info!(target: "proxy::call", message = "setting execution mode to Enabled"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The health should be automatically reset when the builder successfully returns the next payload. |
||
} | ||
Ok(()) | ||
} | ||
Err(e) => { | ||
// l2 request succeeded, but builder request failed | ||
// This state can only be recovered from if either the builder is restarted | ||
// or if a retry eventually goes through. | ||
error!(target: "proxy::call", method = self.method, message = "inconsistent responses from builder and L2"); | ||
let mut execution_mode = self.execution_mode.lock(); | ||
if *execution_mode == ExecutionMode::Enabled { | ||
*execution_mode = ExecutionMode::Disabled; | ||
// Drop before aquiring health lock | ||
drop(execution_mode); | ||
self.has_disabled_execution_mode | ||
.store(true, std::sync::atomic::Ordering::SeqCst); | ||
warn!(target: "proxy::call", message = "setting execution mode to Disabled"); | ||
// This health status will likely be later set back to healthy | ||
// but this should be enough to trigger a new leader election. | ||
self.probes.set_health(Health::PartialContent); | ||
}; | ||
|
||
bail!("failed to send request to builder: {e}"); | ||
} | ||
} | ||
} | ||
|
||
// Send a request, ensuring consistent responses from both the builder and l2 client. | ||
pub async fn send( | ||
&mut self, | ||
parts: request::Parts, | ||
body_bytes: Vec<u8>, | ||
) -> Result<HttpResponse, BoxError> { | ||
self.req_tx | ||
.send(Some((parts.clone(), body_bytes.clone())))?; | ||
|
||
self.res_rx.changed().await?; | ||
let res = self.res_rx.borrow_and_update(); | ||
match res.as_ref().expect("value should always be Some") { | ||
Ok((parts, body)) => Ok(HttpResponse::from_parts( | ||
parts.to_owned(), | ||
HttpBody::from(body.to_owned()), | ||
)), | ||
Err(e) => Err(format!("error sending consistent request: {e}").into()), | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.