Skip to content

Commit 4b3e9a0

Browse files
committed
add error count and retries
1 parent fb1bc05 commit 4b3e9a0

File tree

5 files changed

+168
-10
lines changed

5 files changed

+168
-10
lines changed

metrics/exports.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ var (
1818
FlashblocksLandedCount otelapi.Int64Gauge
1919
FlashblocksMissedCount otelapi.Int64Gauge
2020

21-
FlashtestationsLandedCount otelapi.Int64Gauge
22-
FlashtestationsMissedCount otelapi.Int64Gauge
23-
RegisteredFlashtestationsCount otelapi.Int64Gauge
24-
WorkloadAddedToPolicyCount otelapi.Int64Gauge
21+
FlashtestationsLandedCount otelapi.Int64Gauge
22+
FlashtestationsMissedCount otelapi.Int64Gauge
23+
RegisteredFlashtestationsCount otelapi.Int64Gauge
24+
RegisteredFlashtestationsErrorCount otelapi.Int64Gauge
25+
WorkloadAddedToPolicyCount otelapi.Int64Gauge
26+
WorkloadAddedToPolicyErrorCount otelapi.Int64Gauge
2527

2628
ReorgsCount otelapi.Int64Counter
2729
ReorgDepth otelapi.Int64Gauge
@@ -60,7 +62,9 @@ var (
6062
setupFlashtestationsLandedCount,
6163
setupFlashtestationsMissedCount,
6264
setupRegisteredFlashtestationsCount,
65+
setupRegisteredFlashtestationsErrorCount,
6366
setupWorkloadAddedToPolicyCount,
67+
setupWorkloadAddedToPolicyErrorCount,
6468

6569
setupReorgsCount,
6670
setupReorgDepth,

metrics/metrics.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,17 @@ func setupRegisteredFlashtestationsCount(ctx context.Context, _ *config.ProbeTx)
166166
return nil
167167
}
168168

169+
func setupRegisteredFlashtestationsErrorCount(ctx context.Context, _ *config.ProbeTx) error {
170+
m, err := meter.Int64Gauge("registered_flashtestations_error_count",
171+
otelapi.WithDescription("registered flashtestations error count"),
172+
)
173+
if err != nil {
174+
return err
175+
}
176+
RegisteredFlashtestationsErrorCount = m
177+
return nil
178+
}
179+
169180
func setupWorkloadAddedToPolicyCount(ctx context.Context, _ *config.ProbeTx) error {
170181
m, err := meter.Int64Gauge("workload_added_to_policy_count",
171182
otelapi.WithDescription("workload added to policy count"),
@@ -177,6 +188,17 @@ func setupWorkloadAddedToPolicyCount(ctx context.Context, _ *config.ProbeTx) err
177188
return nil
178189
}
179190

191+
func setupWorkloadAddedToPolicyErrorCount(ctx context.Context, _ *config.ProbeTx) error {
192+
m, err := meter.Int64Gauge("workload_added_to_policy_error_count",
193+
otelapi.WithDescription("workload added to policy error count"),
194+
)
195+
if err != nil {
196+
return err
197+
}
198+
WorkloadAddedToPolicyErrorCount = m
199+
return nil
200+
}
201+
180202
func setupBlocksSeenCount(ctx context.Context, _ *config.ProbeTx) error {
181203
m, err := meter.Int64Gauge("blocks_seen_count",
182204
otelapi.WithDescription("blocks seen by the monitor"),

rpc/call.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package rpc
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"sync"
8+
"time"
79

810
"github.com/ethereum/go-ethereum/ethclient"
911
"github.com/flashbots/chain-monitor/utils"
@@ -177,3 +179,118 @@ func callFallbackThenMainWithResult[R any](
177179
var _nil R
178180
return _nil, utils.FlattenErrors(errs)
179181
}
182+
183+
// callMainThenFallbackWithBackoff calls the main RPC then fallback RPCs with exponential backoff
184+
func callMainThenFallbackWithBackoff(
185+
ctx context.Context,
186+
rpc *RPC,
187+
call func(ctx context.Context, rpc *ethclient.Client) error,
188+
maxRetries int,
189+
initialDelay time.Duration,
190+
) error {
191+
_, err := callMainThenFallbackWithBackoffAndResult(ctx, rpc, func(ctx context.Context, cli *ethclient.Client) (struct{}, error) {
192+
return struct{}{}, call(ctx, cli)
193+
}, maxRetries, initialDelay)
194+
return err
195+
}
196+
197+
// callMainThenFallbackWithBackoffAndResult calls the main RPC then fallback RPCs with exponential backoff
198+
func callMainThenFallbackWithBackoffAndResult[R any](
199+
ctx context.Context,
200+
rpc *RPC,
201+
call func(ctx context.Context, rpc *ethclient.Client) (R, error),
202+
maxRetries int,
203+
initialDelay time.Duration,
204+
) (R, error) {
205+
errs := make([]error, 0, len(rpc.fallback)+1)
206+
207+
// Try main with exponential backoff
208+
if res, err := retryWithBackoff(ctx, rpc, rpc.main, call, maxRetries, initialDelay); err == nil {
209+
return res, nil
210+
} else {
211+
errs = append(errs, fmt.Errorf("%s: %w", rpc.url.main, err))
212+
}
213+
214+
// Try fallback RPCs with exponential backoff
215+
for idx, fallback := range rpc.fallback {
216+
if res, err := retryWithBackoff(ctx, rpc, fallback, call, maxRetries, initialDelay); err == nil {
217+
return res, nil
218+
} else {
219+
errs = append(errs, fmt.Errorf("%s: %w", rpc.url.fallback[idx], err))
220+
}
221+
}
222+
223+
var _nil R
224+
return _nil, utils.FlattenErrors(errs)
225+
}
226+
227+
// callFallbackThenMainWithBackoffAndResult calls fallback RPCs then main RPC with exponential backoff
228+
func callFallbackThenMainWithBackoffAndResult[R any](
229+
ctx context.Context,
230+
rpc *RPC,
231+
call func(ctx context.Context, rpc *ethclient.Client) (R, error),
232+
maxRetries int,
233+
initialDelay time.Duration,
234+
) (R, error) {
235+
errs := make([]error, 0, len(rpc.fallback)+1)
236+
237+
// Try fallback RPCs with exponential backoff
238+
for idx, fallback := range rpc.fallback {
239+
if res, err := retryWithBackoff(ctx, rpc, fallback, call, maxRetries, initialDelay); err == nil {
240+
return res, nil
241+
} else {
242+
errs = append(errs, fmt.Errorf("%s: %w", rpc.url.fallback[idx], err))
243+
}
244+
}
245+
246+
// Try main with exponential backoff
247+
if res, err := retryWithBackoff(ctx, rpc, rpc.main, call, maxRetries, initialDelay); err == nil {
248+
return res, nil
249+
} else {
250+
errs = append(errs, fmt.Errorf("%s: %w", rpc.url.main, err))
251+
}
252+
253+
var _nil R
254+
return _nil, utils.FlattenErrors(errs)
255+
}
256+
257+
// retryWithBackoff retries a call with exponential backoff
258+
func retryWithBackoff[R any](
259+
ctx context.Context,
260+
rpc *RPC,
261+
client *ethclient.Client,
262+
call func(ctx context.Context, rpc *ethclient.Client) (R, error),
263+
maxRetries int,
264+
initialDelay time.Duration,
265+
) (R, error) {
266+
var lastErr error
267+
var result R
268+
269+
for attempt := 0; attempt <= maxRetries; attempt++ {
270+
_ctx, cancel := context.WithTimeout(ctx, rpc.timeout)
271+
res, err := call(_ctx, client)
272+
cancel()
273+
274+
if err == nil {
275+
return res, nil
276+
}
277+
278+
lastErr = err
279+
280+
// Don't sleep after the last attempt
281+
if attempt < maxRetries {
282+
// Calculate exponential backoff delay: initialDelay * 2^attempt
283+
delay := time.Duration(float64(initialDelay) * math.Pow(2, float64(attempt)))
284+
285+
// Wait for the backoff delay or context cancellation
286+
select {
287+
case <-ctx.Done():
288+
var _nil R
289+
return _nil, ctx.Err()
290+
case <-time.After(delay):
291+
}
292+
}
293+
}
294+
295+
return result, lastErr
296+
}

rpc/rpc.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,18 +275,18 @@ func (rpc *RPC) SendTransaction(ctx context.Context, tx *ethtypes.Transaction) e
275275
return err
276276
}
277277
rawTx := hexutil.Encode(data)
278-
return callMainThenFallback(ctx, rpc, func(ctx context.Context, cli *ethclient.Client) error {
278+
return callMainThenFallbackWithBackoff(ctx, rpc, func(ctx context.Context, cli *ethclient.Client) error {
279279
var txhash hexutil.Bytes
280280
return rpc.callCheckingNetworkID(ctx, cli, ethrpc.BatchElem{
281281
Method: "eth_sendRawTransaction",
282282
Args: []interface{}{rawTx},
283283
Result: &txhash,
284284
})
285-
})
285+
}, 3, 100*time.Millisecond) // 3 retries with 100ms initial delay
286286
}
287287

288288
func (rpc *RPC) TransactionReceipt(ctx context.Context, txHash ethcommon.Hash) (*ethtypes.Receipt, error) {
289-
return callFallbackThenMainWithResult(ctx, rpc, func(ctx context.Context, cli *ethclient.Client) (*ethtypes.Receipt, error) {
289+
return callFallbackThenMainWithBackoffAndResult(ctx, rpc, func(ctx context.Context, cli *ethclient.Client) (*ethtypes.Receipt, error) {
290290
var receipt *ethtypes.Receipt
291291
err := rpc.callCheckingNetworkID(ctx, cli, ethrpc.BatchElem{
292292
Method: "eth_getTransactionReceipt",
@@ -297,5 +297,5 @@ func (rpc *RPC) TransactionReceipt(ctx context.Context, txHash ethcommon.Hash) (
297297
return nil, err
298298
}
299299
return receipt, nil
300-
})
300+
}, 3, 100*time.Millisecond) // 3 retries with 100ms initial delay
301301
}

server/l2.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ type L2 struct {
7373
flashtestationsLanded int64
7474
flashtestationsMissed int64
7575

76-
addWorkloadSeen int64
77-
registrationsSeen int64
76+
addWorkloadSeen int64
77+
addWorkloadError int64
78+
registrationsSeen int64
79+
registrationsError int64
7880

7981
processBlockFailuresCount uint
8082

@@ -522,6 +524,10 @@ func (l2 *L2) processBlock(ctx context.Context, blockNumber uint64) error {
522524
if l2.cfg.MonitorBuilderPolicyContract != "" && l2.isBuilderPolicyAddWorkloadIdTx(tx) {
523525
go func() {
524526
l2.handleAddWorkloadIdTx(ctx, tx.Hash())
527+
metrics.WorkloadAddedToPolicyErrorCount.Record(ctx, l2.addWorkloadError, otelapi.WithAttributes(
528+
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
529+
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(l2.chainID.Int64())},
530+
))
525531
}()
526532
}
527533

@@ -533,6 +539,10 @@ func (l2 *L2) processBlock(ctx context.Context, blockNumber uint64) error {
533539
if l2.cfg.MonitorFlashtestationRegistryContract != "" && l2.isFlashtestationsRegisterTx(tx) {
534540
go func() {
535541
l2.handleRegistrationTx(ctx, tx.Hash())
542+
metrics.RegisteredFlashtestationsErrorCount.Record(ctx, l2.registrationsError, otelapi.WithAttributes(
543+
attribute.KeyValue{Key: "kind", Value: attribute.StringValue("l2")},
544+
attribute.KeyValue{Key: "network_id", Value: attribute.Int64Value(l2.chainID.Int64())},
545+
))
536546
}()
537547
}
538548

@@ -1146,6 +1156,7 @@ func (l2 *L2) handleRegistrationTx(ctx context.Context, txHash ethcommon.Hash) {
11461156
zap.Error(err),
11471157
zap.String("tx", txHash.Hex()),
11481158
)
1159+
l2.registrationsError++
11491160
return
11501161
}
11511162

@@ -1155,6 +1166,7 @@ func (l2 *L2) handleRegistrationTx(ctx context.Context, txHash ethcommon.Hash) {
11551166
zap.Error(err),
11561167
zap.String("tx", txHash.Hex()),
11571168
)
1169+
l2.registrationsError++
11581170
return
11591171
}
11601172

@@ -1181,13 +1193,15 @@ func (l2 *L2) handleAddWorkloadIdTx(ctx context.Context, txHash ethcommon.Hash)
11811193
zap.Error(err),
11821194
zap.String("tx", txHash.Hex()),
11831195
)
1196+
l2.addWorkloadError++
11841197
return
11851198
}
11861199

11871200
if receipt.Status == ethtypes.ReceiptStatusFailed {
11881201
l.Warn("Add workload id transaction did not succeed",
11891202
zap.String("tx", txHash.Hex()),
11901203
)
1204+
l2.addWorkloadError++
11911205
return
11921206
}
11931207

@@ -1211,6 +1225,7 @@ func (l2 *L2) handleAddWorkloadIdTx(ctx context.Context, txHash ethcommon.Hash)
12111225
}
12121226
}
12131227

1228+
l2.addWorkloadError++
12141229
l.Warn("WorkloadAddedToPolicy event not found in transaction",
12151230
zap.String("tx", txHash.Hex()),
12161231
)

0 commit comments

Comments
 (0)