Skip to content

Commit f66aad3

Browse files
authored
Calculate megabundle as soon as it's received (#112)
* Calculate megabundle as soon as its received * Make event loop non-blocking and queue tasks and interrupts
1 parent 1d7171c commit f66aad3

File tree

3 files changed

+82
-31
lines changed

3 files changed

+82
-31
lines changed

core/tx_pool.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,14 @@ type TxPool struct {
253253
locals *accountSet // Set of local transaction to exempt from eviction rules
254254
journal *txJournal // Journal of local transaction to back up to disk
255255

256-
pending map[common.Address]*txList // All currently processable transactions
257-
queue map[common.Address]*txList // Queued but non-processable transactions
258-
beats map[common.Address]time.Time // Last heartbeat from each known account
259-
mevBundles []types.MevBundle
260-
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
261-
all *txLookup // All transactions to allow lookups
262-
priced *txPricedList // All transactions sorted by price
256+
pending map[common.Address]*txList // All currently processable transactions
257+
queue map[common.Address]*txList // Queued but non-processable transactions
258+
beats map[common.Address]time.Time // Last heartbeat from each known account
259+
mevBundles []types.MevBundle
260+
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
261+
NewMegabundleHooks []func(common.Address, *types.MevBundle)
262+
all *txLookup // All transactions to allow lookups
263+
priced *txPricedList // All transactions sorted by price
263264

264265
chainHeadCh chan ChainHeadEvent
265266
chainHeadSub event.Subscription
@@ -630,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio
630631
return errors.New("megabundle from non-trusted address")
631632
}
632633

633-
pool.megabundles[relayAddr] = types.MevBundle{
634+
megabundle := types.MevBundle{
634635
Txs: txs,
635636
BlockNumber: blockNumber,
636637
MinTimestamp: minTimestamp,
637638
MaxTimestamp: maxTimestamp,
638639
RevertingTxHashes: revertingTxHashes,
639640
}
641+
642+
pool.megabundles[relayAddr] = megabundle
643+
644+
for _, hook := range pool.NewMegabundleHooks {
645+
go hook(relayAddr, &megabundle)
646+
}
647+
640648
return nil
641649
}
642650

miner/multi_worker.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
105105
}))
106106
}
107107

108+
relayWorkerMap := make(map[common.Address]*worker)
109+
108110
for i := 0; i < len(config.TrustedRelays); i++ {
109-
workers = append(workers,
110-
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
111-
isFlashbots: true,
112-
isMegabundleWorker: true,
113-
queue: queue,
114-
relayAddr: config.TrustedRelays[i],
115-
}))
111+
relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
112+
isFlashbots: true,
113+
isMegabundleWorker: true,
114+
queue: queue,
115+
relayAddr: config.TrustedRelays[i],
116+
})
117+
workers = append(workers, relayWorker)
118+
relayWorkerMap[config.TrustedRelays[i]] = relayWorker
116119
}
117120

121+
eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) {
122+
worker, found := relayWorkerMap[relayAddr]
123+
if !found {
124+
return
125+
}
126+
127+
select {
128+
case worker.newMegabundleCh <- megabundle:
129+
default:
130+
}
131+
})
132+
118133
log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
119134
return &multiWorker{
120135
regularWorker: regularWorker,

miner/worker.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ type worker struct {
157157
exitCh chan struct{}
158158
resubmitIntervalCh chan time.Duration
159159
resubmitAdjustCh chan *intervalAdjust
160+
newMegabundleCh chan *types.MevBundle
160161

161162
wg sync.WaitGroup
162163

@@ -240,15 +241,17 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
240241
txsCh: make(chan core.NewTxsEvent, txChanSize),
241242
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
242243
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
243-
newWorkCh: make(chan *newWorkReq),
244+
newWorkCh: make(chan *newWorkReq, 1),
244245
taskCh: taskCh,
245246
resultCh: make(chan *types.Block, resultQueueSize),
246247
exitCh: exitCh,
247248
startCh: make(chan struct{}, 1),
249+
newMegabundleCh: make(chan *types.MevBundle),
248250
resubmitIntervalCh: make(chan time.Duration),
249251
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
250252
flashbots: flashbots,
251253
}
254+
252255
// Subscribe NewTxsEvent for tx pool
253256
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
254257
// Subscribe events for blockchain
@@ -391,26 +394,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
391394
func (w *worker) newWorkLoop(recommit time.Duration) {
392395
defer w.wg.Done()
393396
var (
394-
interrupt *int32
395-
minRecommit = recommit // minimal resubmit interval specified by user.
396-
timestamp int64 // timestamp for each round of mining.
397+
runningInterrupt *int32 // Running task interrupt
398+
queuedInterrupt *int32 // Queued task interrupt
399+
minRecommit = recommit // minimal resubmit interval specified by user.
400+
timestamp int64 // timestamp for each round of mining.
397401
)
398402

399403
timer := time.NewTimer(0)
400404
defer timer.Stop()
401405
<-timer.C // discard the initial tick
402406

403-
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
407+
// commit aborts in-flight transaction execution with highest seen signal and resubmits a new one
404408
commit := func(noempty bool, s int32) {
405-
if interrupt != nil {
406-
atomic.StoreInt32(interrupt, s)
407-
}
408-
interrupt = new(int32)
409409
select {
410-
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
411410
case <-w.exitCh:
412411
return
412+
case queuedRequest := <-w.newWorkCh:
413+
// Previously queued request wasn't started yet, update the request and resubmit
414+
queuedRequest.noempty = queuedRequest.noempty || noempty
415+
queuedRequest.timestamp = timestamp
416+
w.newWorkCh <- queuedRequest // guaranteed to be nonblocking
417+
default:
418+
// Previously queued request has already started, cycle interrupt pointer and submit new work
419+
runningInterrupt = queuedInterrupt
420+
queuedInterrupt = new(int32)
421+
422+
w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking
413423
}
424+
425+
if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) {
426+
atomic.StoreInt32(runningInterrupt, s)
427+
}
428+
414429
timer.Reset(recommit)
415430
atomic.StoreInt32(&w.newTxs, 0)
416431
}
@@ -437,6 +452,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
437452
timestamp = time.Now().Unix()
438453
commit(false, commitInterruptNewHead)
439454

455+
case <-w.newMegabundleCh:
456+
if w.isRunning() {
457+
commit(true, commitInterruptNone)
458+
}
459+
440460
case <-timer.C:
441461
// If mining is running resubmit a new work cycle periodically to pull in
442462
// higher priced transactions. Disable this overhead for pending blocks.
@@ -500,7 +520,10 @@ func (w *worker) mainLoop() {
500520
for {
501521
select {
502522
case req := <-w.newWorkCh:
503-
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
523+
// Don't start if the work has already been interrupted
524+
if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone {
525+
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
526+
}
504527

505528
case ev := <-w.chainSideCh:
506529
// Short circuit for duplicate side blocks
@@ -761,10 +784,10 @@ func (w *worker) generateEnv(parent *types.Block, header *types.Header) (*enviro
761784
// makeCurrent creates a new environment for the current cycle.
762785
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
763786
env, err := w.generateEnv(parent, header)
764-
env.state.StartPrefetcher("miner")
765787
if err != nil {
766788
return err
767789
}
790+
env.state.StartPrefetcher("miner")
768791

769792
// Swap out the old work with the new one, terminating any leftover prefetcher
770793
// processes in the mean time and starting a new one.
@@ -869,7 +892,6 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
869892
// (2) worker start or restart, the interrupt signal is 1
870893
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
871894
// For the first two cases, the semi-finished work will be discarded.
872-
// For the third case, the semi-finished work will be submitted to the consensus engine.
873895
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
874896
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
875897
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
@@ -881,8 +903,11 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
881903
ratio: ratio,
882904
inc: true,
883905
}
906+
return false
884907
}
885-
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
908+
909+
// Discard the work as new head is present
910+
return true
886911
}
887912
// If we don't have enough gas for any further transactions then we're done
888913
if w.current.gasPool.Gas() < params.TxGas {
@@ -982,7 +1007,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
9821007
// (2) worker start or restart, the interrupt signal is 1
9831008
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
9841009
// For the first two cases, the semi-finished work will be discarded.
985-
// For the third case, the semi-finished work will be submitted to the consensus engine.
9861010
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
9871011
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
9881012
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
@@ -994,8 +1018,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
9941018
ratio: ratio,
9951019
inc: true,
9961020
}
1021+
return false
9971022
}
998-
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
1023+
1024+
// Discard the work as new head is present
1025+
return true
9991026
}
10001027
// If we don't have enough gas for any further transactions then we're done
10011028
if w.current.gasPool.Gas() < params.TxGas {
@@ -1223,6 +1250,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
12231250
if err != nil {
12241251
return // no valid megabundle for this relay, nothing to do
12251252
}
1253+
12261254
// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
12271255
// Megabundles API focuses on speed and runs everything in one cycle.
12281256
coinbaseBalanceBefore := w.current.state.GetBalance(w.coinbase)

0 commit comments

Comments
 (0)