Skip to content

Commit 9265700

Browse files
klkvrshekhirinmattsse
authored andcommitted
feat: parallelize recovery (#20169)
Co-authored-by: Alexey Shekhirin <[email protected]> Co-authored-by: Matthias Seitz <[email protected]>
1 parent 0b07657 commit 9265700

File tree

7 files changed

+131
-35
lines changed

7 files changed

+131
-35
lines changed

crates/engine/tree/benches/state_root_task.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,15 @@ fn bench_state_root(c: &mut Criterion) {
229229
black_box({
230230
let mut handle = payload_processor.spawn(
231231
Default::default(),
232-
core::iter::empty::<
233-
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
234-
>(),
232+
(
233+
core::iter::empty::<
234+
Result<
235+
Recovered<TransactionSigned>,
236+
core::convert::Infallible,
237+
>,
238+
>(),
239+
std::convert::identity,
240+
),
235241
StateProviderBuilder::new(provider.clone(), genesis_hash, None),
236242
OverlayStateProviderFactory::new(provider),
237243
&TreeConfig::default(),

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use executor::WorkloadExecutor;
2121
use multiproof::{SparseTrieUpdate, *};
2222
use parking_lot::RwLock;
2323
use prewarm::PrewarmMetrics;
24+
use rayon::iter::{ParallelBridge, ParallelIterator};
2425
use reth_engine_primitives::ExecutableTxIterator;
2526
use reth_evm::{
2627
execute::{ExecutableTxFor, WithTxEnv},
@@ -40,6 +41,7 @@ use reth_trie_sparse::{
4041
};
4142
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
4243
use std::{
44+
collections::BTreeMap,
4345
sync::{
4446
atomic::AtomicBool,
4547
mpsc::{self, channel},
@@ -312,21 +314,50 @@ where
312314
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
313315
usize,
314316
) {
317+
let (transactions, convert) = transactions.into();
318+
let transactions = transactions.into_iter();
315319
// Get the transaction count for prewarming task
316320
// Use upper bound if available (more accurate), otherwise use lower bound
317321
let (lower, upper) = transactions.size_hint();
318322
let transaction_count_hint = upper.unwrap_or(lower);
319323

324+
// Spawn a task that iterates through all transactions in parallel and sends them to the
325+
// main task.
326+
let (tx, rx) = mpsc::channel();
327+
self.executor.spawn_blocking(move || {
328+
transactions.enumerate().par_bridge().for_each_with(tx, |sender, (idx, tx)| {
329+
let tx = convert(tx);
330+
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
331+
let _ = sender.send((idx, tx));
332+
});
333+
});
334+
335+
// Spawn a task that processes out-of-order transactions from the task above and sends them
336+
// to prewarming and execution tasks.
320337
let (prewarm_tx, prewarm_rx) = mpsc::channel();
321338
let (execute_tx, execute_rx) = mpsc::channel();
322339
self.executor.spawn_blocking(move || {
323-
for tx in transactions {
324-
let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx: Arc::new(tx) });
340+
let mut next_for_execution = 0;
341+
let mut queue = BTreeMap::new();
342+
while let Ok((idx, tx)) = rx.recv() {
325343
// only send Ok(_) variants to prewarming task
326344
if let Ok(tx) = &tx {
327345
let _ = prewarm_tx.send(tx.clone());
328346
}
329-
let _ = execute_tx.send(tx);
347+
348+
if next_for_execution == idx {
349+
let _ = execute_tx.send(tx);
350+
next_for_execution += 1;
351+
352+
while let Some(entry) = queue.first_entry() &&
353+
*entry.key() == next_for_execution
354+
{
355+
let _ = execute_tx.send(entry.remove());
356+
next_for_execution += 1;
357+
}
358+
} else {
359+
queue.insert(idx, tx);
360+
}
330361
}
331362
});
332363

@@ -1017,13 +1048,19 @@ mod tests {
10171048

10181049
let provider_factory = BlockchainProvider::new(factory).unwrap();
10191050

1020-
let mut handle = payload_processor.spawn(
1021-
Default::default(),
1022-
core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
1023-
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1024-
OverlayStateProviderFactory::new(provider_factory),
1025-
&TreeConfig::default(),
1026-
);
1051+
let mut handle =
1052+
payload_processor.spawn(
1053+
Default::default(),
1054+
(
1055+
core::iter::empty::<
1056+
Result<Recovered<TransactionSigned>, core::convert::Infallible>,
1057+
>(),
1058+
std::convert::identity,
1059+
),
1060+
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
1061+
OverlayStateProviderFactory::new(provider_factory),
1062+
&TreeConfig::default(),
1063+
);
10271064

10281065
let mut state_hook = handle.state_hook();
10291066

crates/engine/tree/src/tree/payload_validator.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,31 @@ where
213213
Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
214214
{
215215
match input {
216-
BlockOrPayload::Payload(payload) => Ok(Either::Left(
217-
self.evm_config
216+
BlockOrPayload::Payload(payload) => {
217+
let (iter, convert) = self
218+
.evm_config
218219
.tx_iterator_for_payload(payload)
219220
.map_err(NewPayloadError::other)?
220-
.map(|res| res.map(Either::Left).map_err(NewPayloadError::other)),
221-
)),
221+
.into();
222+
223+
let iter = Either::Left(iter.into_iter().map(Either::Left));
224+
let convert = move |tx| {
225+
let Either::Left(tx) = tx else { unreachable!() };
226+
convert(tx).map(Either::Left).map_err(Either::Left)
227+
};
228+
229+
// Box the closure to satisfy the `Fn` bound both here and in the branch below
230+
Ok((iter, Box::new(convert) as Box<dyn Fn(_) -> _ + Send + Sync + 'static>))
231+
}
222232
BlockOrPayload::Block(block) => {
223-
Ok(Either::Right(block.body().clone_transactions().into_iter().map(|tx| {
224-
Ok(Either::Right(tx.try_into_recovered().map_err(NewPayloadError::other)?))
225-
})))
233+
let iter =
234+
Either::Right(block.body().clone_transactions().into_iter().map(Either::Right));
235+
let convert = move |tx: Either<_, N::SignedTx>| {
236+
let Either::Right(tx) = tx else { unreachable!() };
237+
tx.try_into_recovered().map(Either::Right).map_err(Either::Right)
238+
};
239+
240+
Ok((iter, Box::new(convert)))
226241
}
227242
}
228243
}

crates/ethereum/evm/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,12 +289,15 @@ where
289289
&self,
290290
payload: &ExecutionData,
291291
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
292-
Ok(payload.payload.transactions().clone().into_iter().map(|tx| {
292+
let txs = payload.payload.transactions().clone().into_iter();
293+
let convert = |tx: Bytes| {
293294
let tx =
294295
TxTy::<Self::Primitives>::decode_2718_exact(tx.as_ref()).map_err(AnyError::new)?;
295296
let signer = tx.try_recover().map_err(AnyError::new)?;
296297
Ok::<_, AnyError>(tx.with_signer(signer))
297-
}))
298+
};
299+
300+
Ok((txs, convert))
298301
}
299302
}
300303

crates/evm/evm/src/engine.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,51 @@ pub trait ConfigureEngineEvm<ExecutionData>: ConfigureEvm {
1818
) -> Result<impl ExecutableTxIterator<Self>, Self::Error>;
1919
}
2020

21-
/// Iterator over executable transactions.
22-
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
23-
Iterator<Item = Result<Self::Tx, Self::Error>> + Send + 'static
24-
{
21+
/// A helper trait representing a pair of a "raw" transactions iterator and a closure that can be
22+
/// used to convert them to an executable transaction. This tuple is used in the engine to
23+
/// parallelize heavy work like decoding or recovery.
24+
pub trait ExecutableTxTuple: Into<(Self::Iter, Self::Convert)> + Send + 'static {
25+
/// Raw transaction that can be converted to an [`ExecutableTxTuple::Tx`]
26+
///
27+
/// This can be any type that can be converted to an [`ExecutableTxTuple::Tx`]. For example,
28+
/// an unrecovered transaction or just the transaction bytes.
29+
type RawTx: Send + Sync + 'static;
2530
/// The executable transaction type iterator yields.
26-
type Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static;
31+
type Tx: Clone + Send + Sync + 'static;
2732
/// Errors that may occur while recovering or decoding transactions.
2833
type Error: core::error::Error + Send + Sync + 'static;
34+
35+
/// Iterator over [`ExecutableTxTuple::Tx`]
36+
type Iter: Iterator<Item = Self::RawTx> + Send + 'static;
37+
/// Closure that can be used to convert a [`ExecutableTxTuple::RawTx`] to a
38+
/// [`ExecutableTxTuple::Tx`]. This might involve heavy work like decoding or recovery
39+
/// and will be parallelized in the engine.
40+
type Convert: Fn(Self::RawTx) -> Result<Self::Tx, Self::Error> + Send + Sync + 'static;
2941
}
3042

31-
impl<Evm: ConfigureEvm, Tx, Err, T> ExecutableTxIterator<Evm> for T
43+
impl<RawTx, Tx, Err, I, F> ExecutableTxTuple for (I, F)
3244
where
33-
Tx: ExecutableTxFor<Evm> + Clone + Send + Sync + 'static,
45+
RawTx: Send + Sync + 'static,
46+
Tx: Clone + Send + Sync + 'static,
3447
Err: core::error::Error + Send + Sync + 'static,
35-
T: Iterator<Item = Result<Tx, Err>> + Send + 'static,
48+
I: Iterator<Item = RawTx> + Send + 'static,
49+
F: Fn(RawTx) -> Result<Tx, Err> + Send + Sync + 'static,
3650
{
51+
type RawTx = RawTx;
3752
type Tx = Tx;
3853
type Error = Err;
54+
55+
type Iter = I;
56+
type Convert = F;
57+
}
58+
59+
/// Iterator over executable transactions.
60+
pub trait ExecutableTxIterator<Evm: ConfigureEvm>:
61+
ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
62+
{
63+
}
64+
65+
impl<T, Evm: ConfigureEvm> ExecutableTxIterator<Evm> for T where
66+
T: ExecutableTxTuple<Tx: ExecutableTxFor<Evm>>
67+
{
3968
}

crates/optimism/evm/src/lib.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use alloy_consensus::{BlockHeader, Header};
1616
use alloy_eips::Decodable2718;
1717
use alloy_evm::{EvmFactory, FromRecoveredTx, FromTxWithEncoded};
1818
use alloy_op_evm::block::{receipt_builder::OpReceiptBuilder, OpTxEnv};
19-
use alloy_primitives::U256;
19+
use alloy_primitives::{Bytes, U256};
2020
use core::fmt::Debug;
2121
use op_alloy_consensus::EIP1559ParamError;
2222
use op_alloy_rpc_types_engine::OpExecutionData;
@@ -265,12 +265,15 @@ where
265265
&self,
266266
payload: &OpExecutionData,
267267
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
268-
Ok(payload.payload.transactions().clone().into_iter().map(|encoded| {
268+
let transactions = payload.payload.transactions().clone().into_iter();
269+
let convert = |encoded: Bytes| {
269270
let tx = TxTy::<Self::Primitives>::decode_2718_exact(encoded.as_ref())
270271
.map_err(AnyError::new)?;
271272
let signer = tx.try_recover().map_err(AnyError::new)?;
272273
Ok::<_, AnyError>(WithEncoded::new(encoded, tx.with_signer(signer)))
273-
}))
274+
};
275+
276+
Ok((transactions, convert))
274277
}
275278
}
276279

examples/custom-node/src/evm/config.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use reth_op::{
2525
primitives::SignedTransaction,
2626
};
2727
use reth_rpc_api::eth::helpers::pending_block::BuildPendingEnv;
28+
use revm_primitives::Bytes;
2829
use std::sync::Arc;
2930

3031
#[derive(Debug, Clone)]
@@ -126,13 +127,15 @@ impl ConfigureEngineEvm<CustomExecutionData> for CustomEvmConfig {
126127
&self,
127128
payload: &CustomExecutionData,
128129
) -> Result<impl ExecutableTxIterator<Self>, Self::Error> {
129-
Ok(payload.inner.payload.transactions().clone().into_iter().map(|encoded| {
130+
let transactions = payload.inner.payload.transactions().clone().into_iter();
131+
let convert = |encoded: Bytes| {
130132
let tx = CustomTransaction::decode_2718_exact(encoded.as_ref())
131133
.map_err(Into::into)
132134
.map_err(PayloadError::Decode)?;
133135
let signer = tx.try_recover().map_err(NewPayloadError::other)?;
134136
Ok::<_, NewPayloadError>(WithEncoded::new(encoded, tx.with_signer(signer)))
135-
}))
137+
};
138+
Ok((transactions, convert))
136139
}
137140
}
138141

0 commit comments

Comments
 (0)