1
1
use crate :: client:: http:: HttpClient ;
2
- use crate :: server:: PayloadSource ;
2
+ use crate :: server:: { ExecutionMode , PayloadSource } ;
3
+ use crate :: { Health , Probes } ;
3
4
use alloy_rpc_types_engine:: JwtSecret ;
4
5
use http:: Uri ;
5
6
use jsonrpsee:: core:: { BoxError , http_helpers} ;
6
7
use jsonrpsee:: http_client:: { HttpBody , HttpRequest , HttpResponse } ;
8
+ use parking_lot:: Mutex ;
9
+ use std:: sync:: Arc ;
7
10
use std:: task:: { Context , Poll } ;
8
11
use std:: { future:: Future , pin:: Pin } ;
9
12
use tokio:: sync:: mpsc:: { UnboundedReceiver , UnboundedSender } ;
10
13
use tokio:: sync:: oneshot;
11
14
use tower:: { Layer , Service } ;
12
- use tracing:: info;
15
+ use tracing:: { error , info, warn } ;
13
16
14
17
const ENGINE_METHOD : & str = "engine_" ;
15
18
16
19
/// Requests that should be forwarded to both the builder and default execution client
17
- const FORWARD_REQUESTS : [ & str ; 6 ] = [
20
+ const FORWARD_REQUESTS : [ & str ; 2 ] = [
18
21
"eth_sendRawTransaction" ,
19
22
"eth_sendRawTransactionConditional" ,
23
+ ] ;
24
+
25
+ /// Handle these similar to FORWARD_REQUESTS, but enforce consistant responses
26
+ /// between the builder and default execution client.
27
+ const MINER_REQUESTS : [ & str ; 4 ] = [
20
28
"miner_setExtra" ,
21
29
"miner_setGasPrice" ,
22
30
"miner_setGasLimit" ,
@@ -29,6 +37,8 @@ pub struct ProxyLayer {
29
37
l2_auth_secret : JwtSecret ,
30
38
builder_auth_rpc : Uri ,
31
39
builder_auth_secret : JwtSecret ,
40
+ probes : Arc < Probes > ,
41
+ execution_mode : Arc < Mutex < ExecutionMode > > ,
32
42
}
33
43
34
44
impl ProxyLayer {
@@ -37,12 +47,16 @@ impl ProxyLayer {
37
47
l2_auth_secret : JwtSecret ,
38
48
builder_auth_rpc : Uri ,
39
49
builder_auth_secret : JwtSecret ,
50
+ probes : Arc < Probes > ,
51
+ execution_mode : Arc < Mutex < ExecutionMode > > ,
40
52
) -> Self {
41
53
ProxyLayer {
42
54
l2_auth_rpc,
43
55
l2_auth_secret,
44
56
builder_auth_rpc,
45
57
builder_auth_secret,
58
+ probes,
59
+ execution_mode,
46
60
}
47
61
}
48
62
}
75
89
l2_client,
76
90
builder_client,
77
91
engine_tx : tx,
92
+ probes : self . probes . clone ( ) ,
93
+ execution_mode : self . execution_mode . clone ( ) ,
78
94
} ;
79
95
80
96
service. process_engine_queue ( rx) ;
@@ -98,6 +114,8 @@ pub struct ProxyService<S> {
98
114
l2_client : HttpClient ,
99
115
builder_client : HttpClient ,
100
116
engine_tx : EngineResponseTx ,
117
+ probes : Arc < Probes > ,
118
+ execution_mode : Arc < Mutex < ExecutionMode > > ,
101
119
}
102
120
103
121
impl < S > ProxyService < S >
@@ -179,8 +197,32 @@ where
179
197
} ) ;
180
198
181
199
let l2_req = HttpRequest :: from_parts ( parts, HttpBody :: from ( body_bytes) ) ;
182
- info ! ( target: "proxy::call" , message = "forward request to default execution client" , ?method) ;
183
200
service. l2_client . forward ( l2_req, method) . await
201
+ } else if MINER_REQUESTS . contains ( & method. as_str ( ) ) {
202
+ // miner api, send to both the
203
+ // default execution client and the builder
204
+ let builder_req =
205
+ HttpRequest :: from_parts ( parts. clone ( ) , HttpBody :: from ( body_bytes. clone ( ) ) ) ;
206
+ let builder_method = method. clone ( ) ;
207
+ let mut builder_client = service. builder_client . clone ( ) ;
208
+
209
+ let l2_req = HttpRequest :: from_parts ( parts, HttpBody :: from ( body_bytes) ) ;
210
+ let ( builder_res, l2_res) = tokio:: join!(
211
+ builder_client. forward( builder_req, builder_method) ,
212
+ service. l2_client. forward( l2_req, method)
213
+ ) ;
214
+ if builder_res. is_ok ( ) != l2_res. is_ok ( ) {
215
+ error ! ( target: "proxy::call" , message = "inconsistent miner api responses from builder and L2" ) ;
216
+ let mut execution_mode = service. execution_mode . lock ( ) ;
217
+ if * execution_mode == ExecutionMode :: Enabled {
218
+ * execution_mode = ExecutionMode :: Fallback ;
219
+ // Drop before aquiring health lock
220
+ drop ( execution_mode) ;
221
+ warn ! ( target: "proxy::call" , message = "setting execution mode to Fallback" ) ;
222
+ service. probes . set_health ( Health :: PartialContent ) ;
223
+ }
224
+ }
225
+ l2_res
184
226
} else {
185
227
// If the request should not be forwarded, send directly to the
186
228
// default execution client
@@ -220,7 +262,7 @@ mod tests {
220
262
use std:: {
221
263
net:: { IpAddr , SocketAddr } ,
222
264
str:: FromStr ,
223
- sync:: { Arc , Mutex } ,
265
+ sync:: Arc ,
224
266
} ;
225
267
use tokio:: net:: TcpListener ;
226
268
use tokio:: task:: JoinHandle ;
@@ -246,12 +288,15 @@ mod tests {
246
288
async fn new ( ) -> eyre:: Result < Self > {
247
289
let builder = MockHttpServer :: serve ( ) . await ?;
248
290
let l2 = MockHttpServer :: serve ( ) . await ?;
291
+ let execution_mode = Arc :: new ( Mutex :: new ( ExecutionMode :: Enabled ) ) ;
292
+ let probes = Arc :: new ( Probes :: default ( ) ) ;
249
293
let middleware = tower:: ServiceBuilder :: new ( ) . layer ( ProxyLayer :: new (
250
294
format ! ( "http://{}:{}" , l2. addr. ip( ) , l2. addr. port( ) ) . parse :: < Uri > ( ) ?,
251
295
JwtSecret :: random ( ) ,
252
296
format ! ( "http://{}:{}" , builder. addr. ip( ) , builder. addr. port( ) ) . parse :: < Uri > ( ) ?,
253
297
JwtSecret :: random ( ) ,
254
- // None,
298
+ probes. clone ( ) ,
299
+ execution_mode. clone ( ) ,
255
300
) ) ;
256
301
257
302
let temp_listener = TcpListener :: bind ( "0.0.0.0:0" ) . await ?;
@@ -361,7 +406,7 @@ mod tests {
361
406
}
362
407
} ;
363
408
364
- requests. lock ( ) . unwrap ( ) . push ( request_body. clone ( ) ) ;
409
+ requests. lock ( ) . push ( request_body. clone ( ) ) ;
365
410
366
411
let method = request_body[ "method" ] . as_str ( ) . unwrap_or_default ( ) ;
367
412
@@ -439,7 +484,8 @@ mod tests {
439
484
}
440
485
441
486
async fn health_check ( ) {
442
- let proxy_server = spawn_proxy_server ( ) . await ;
487
+ let execution_mode = Arc :: new ( Mutex :: new ( ExecutionMode :: Enabled ) ) ;
488
+ let proxy_server = spawn_proxy_server ( execution_mode) . await ;
443
489
// Create a new HTTP client
444
490
let client: Client < HttpConnector , HttpBody > =
445
491
Client :: builder ( TokioExecutor :: new ( ) ) . build_http ( ) ;
@@ -456,8 +502,9 @@ mod tests {
456
502
}
457
503
458
504
async fn send_request ( method : & str ) -> Result < String , ClientError > {
505
+ let execution_mode = Arc :: new ( Mutex :: new ( ExecutionMode :: Enabled ) ) ;
459
506
let server = spawn_server ( ) . await ;
460
- let proxy_server = spawn_proxy_server ( ) . await ;
507
+ let proxy_server = spawn_proxy_server ( execution_mode ) . await ;
461
508
let proxy_client = HttpClient :: builder ( )
462
509
. build ( format ! ( "http://{ADDR}:{PORT}" ) )
463
510
. unwrap ( ) ;
@@ -494,7 +541,7 @@ mod tests {
494
541
}
495
542
496
543
/// Spawn a new RPC server with a proxy layer.
497
- async fn spawn_proxy_server ( ) -> ServerHandle {
544
+ async fn spawn_proxy_server ( execution_mode : Arc < Mutex < ExecutionMode > > ) -> ServerHandle {
498
545
let addr = format ! ( "{ADDR}:{PORT}" ) ;
499
546
500
547
let jwt = JwtSecret :: random ( ) ;
@@ -505,9 +552,16 @@ mod tests {
505
552
. parse :: < Uri > ( )
506
553
. unwrap ( ) ;
507
554
508
- let ( probe_layer, _probes ) = ProbeLayer :: new ( ) ;
555
+ let ( probe_layer, probes ) = ProbeLayer :: new ( ) ;
509
556
510
- let proxy_layer = ProxyLayer :: new ( l2_auth_uri. clone ( ) , jwt, l2_auth_uri, jwt) ;
557
+ let proxy_layer = ProxyLayer :: new (
558
+ l2_auth_uri. clone ( ) ,
559
+ jwt,
560
+ l2_auth_uri,
561
+ jwt,
562
+ probes,
563
+ execution_mode,
564
+ ) ;
511
565
512
566
// Create a layered server
513
567
let server = ServerBuilder :: default ( )
@@ -557,7 +611,7 @@ mod tests {
557
611
558
612
// Assert the builder received the correct payload
559
613
let builder = & test_harness. builder ;
560
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
614
+ let builder_requests = builder. requests . lock ( ) ;
561
615
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
562
616
assert_eq ! ( builder_requests. len( ) , 1 ) ;
563
617
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
@@ -566,7 +620,7 @@ mod tests {
566
620
567
621
// Assert the l2 received the correct payload
568
622
let l2 = & test_harness. l2 ;
569
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
623
+ let l2_requests = l2. requests . lock ( ) ;
570
624
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
571
625
assert_eq ! ( l2_requests. len( ) , 1 ) ;
572
626
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -593,15 +647,15 @@ mod tests {
593
647
594
648
// Assert the builder received the correct payload
595
649
let builder = & test_harness. builder ;
596
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
650
+ let builder_requests = builder. requests . lock ( ) ;
597
651
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
598
652
assert_eq ! ( builder_requests. len( ) , 1 ) ;
599
653
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
600
654
assert_eq ! ( builder_req[ "params" ] [ 0 ] , expected_tx) ;
601
655
602
656
// Assert the l2 received the correct payload
603
657
let l2 = & test_harness. l2 ;
604
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
658
+ let l2_requests = l2. requests . lock ( ) ;
605
659
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
606
660
assert_eq ! ( l2_requests. len( ) , 1 ) ;
607
661
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -630,7 +684,7 @@ mod tests {
630
684
let expected_conditionals = json ! ( transact_conditionals) ;
631
685
// Assert the builder received the correct payload
632
686
let builder = & test_harness. builder ;
633
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
687
+ let builder_requests = builder. requests . lock ( ) ;
634
688
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
635
689
assert_eq ! ( builder_requests. len( ) , 1 ) ;
636
690
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
@@ -639,7 +693,7 @@ mod tests {
639
693
640
694
// Assert the l2 received the correct payload
641
695
let l2 = & test_harness. l2 ;
642
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
696
+ let l2_requests = l2. requests . lock ( ) ;
643
697
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
644
698
assert_eq ! ( l2_requests. len( ) , 1 ) ;
645
699
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -666,15 +720,15 @@ mod tests {
666
720
667
721
// Assert the builder received the correct payload
668
722
let builder = & test_harness. builder ;
669
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
723
+ let builder_requests = builder. requests . lock ( ) ;
670
724
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
671
725
assert_eq ! ( builder_requests. len( ) , 1 ) ;
672
726
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
673
727
assert_eq ! ( builder_req[ "params" ] [ 0 ] , expected_extra) ;
674
728
675
729
// Assert the l2 received the correct payload
676
730
let l2 = & test_harness. l2 ;
677
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
731
+ let l2_requests = l2. requests . lock ( ) ;
678
732
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
679
733
assert_eq ! ( l2_requests. len( ) , 1 ) ;
680
734
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -700,15 +754,15 @@ mod tests {
700
754
701
755
// Assert the builder received the correct payload
702
756
let builder = & test_harness. builder ;
703
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
757
+ let builder_requests = builder. requests . lock ( ) ;
704
758
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
705
759
assert_eq ! ( builder_requests. len( ) , 1 ) ;
706
760
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
707
761
assert_eq ! ( builder_req[ "params" ] [ 0 ] , expected_price) ;
708
762
709
763
// Assert the l2 received the correct payload
710
764
let l2 = & test_harness. l2 ;
711
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
765
+ let l2_requests = l2. requests . lock ( ) ;
712
766
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
713
767
assert_eq ! ( l2_requests. len( ) , 1 ) ;
714
768
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -735,15 +789,15 @@ mod tests {
735
789
736
790
// Assert the builder received the correct payload
737
791
let builder = & test_harness. builder ;
738
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
792
+ let builder_requests = builder. requests . lock ( ) ;
739
793
let builder_req = builder_requests. first ( ) . unwrap ( ) ;
740
794
assert_eq ! ( builder_requests. len( ) , 1 ) ;
741
795
assert_eq ! ( builder_req[ "method" ] , expected_method) ;
742
796
assert_eq ! ( builder_req[ "params" ] [ 0 ] , expected_price) ;
743
797
744
798
// Assert the l2 received the correct payload
745
799
let l2 = & test_harness. l2 ;
746
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
800
+ let l2_requests = l2. requests . lock ( ) ;
747
801
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
748
802
assert_eq ! ( l2_requests. len( ) , 1 ) ;
749
803
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
@@ -769,12 +823,12 @@ mod tests {
769
823
770
824
// Assert the builder has not received the payload
771
825
let builder = & test_harness. builder ;
772
- let builder_requests = builder. requests . lock ( ) . unwrap ( ) ;
826
+ let builder_requests = builder. requests . lock ( ) ;
773
827
assert_eq ! ( builder_requests. len( ) , 0 ) ;
774
828
775
829
// Assert the l2 auth received the correct payload
776
830
let l2 = & test_harness. l2 ;
777
- let l2_requests = l2. requests . lock ( ) . unwrap ( ) ;
831
+ let l2_requests = l2. requests . lock ( ) ;
778
832
let l2_req = l2_requests. first ( ) . unwrap ( ) ;
779
833
assert_eq ! ( l2_requests. len( ) , 1 ) ;
780
834
assert_eq ! ( l2_req[ "method" ] , expected_method) ;
0 commit comments