Skip to content

Commit b81156a

Browse files
committed
*: use txFeed in SubscribeQueuedTransactions
1 parent ddea6a4 commit b81156a

File tree

13 files changed

+20
-423
lines changed

13 files changed

+20
-423
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,6 @@ type BlobPool struct {
353353
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
354354
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
355355

356-
queuedTxFeed event.Feed
357-
358356
lock sync.RWMutex // Mutex protecting the pool during reorg handling
359357
}
360358

@@ -1744,8 +1742,6 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
17441742
return err
17451743
}
17461744

1747-
go p.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})
1748-
17491745
// If the address is not yet known, request exclusivity to track the account
17501746
// only by this subpool until all transactions are evicted
17511747
from, _ := types.Sender(p.signer, tx) // already validated above
@@ -2090,11 +2086,6 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool
20902086
}
20912087
}
20922088

2093-
// SubscribeQueuedTransactions subscribes to new queued transaction events.
2094-
func (p *BlobPool) SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
2095-
return p.queuedTxFeed.Subscribe(ch)
2096-
}
2097-
20982089
// Nonce returns the next nonce of an account, with all transactions executable
20992090
// by the pool already applied on top.
21002091
func (p *BlobPool) Nonce(addr common.Address) uint64 {

core/txpool/legacypool/legacypool.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,6 @@ type LegacyPool struct {
234234
signer types.Signer
235235
mu sync.RWMutex
236236

237-
queuedTxFeed event.Feed
238-
239237
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
240238
currentState *state.StateDB // Current state in the blockchain head
241239
pendingNonces *noncer // Pending state tracking virtual nonces
@@ -406,11 +404,6 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
406404
return pool.txFeed.Subscribe(ch)
407405
}
408406

409-
// SubscribeQueuedTransactions subscribes to new queued transaction events.
410-
func (pool *LegacyPool) SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
411-
return pool.queuedTxFeed.Subscribe(ch)
412-
}
413-
414407
// SetGasTip updates the minimum gas tip required by the transaction pool for a
415408
// new transaction, and drops all transactions below this threshold.
416409
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
@@ -677,9 +670,6 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
677670
// already validated by this point
678671
from, _ := types.Sender(pool.signer, tx)
679672

680-
// Broadcast a new tx anyway if it's valid
681-
go pool.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})
682-
683673
// If the address is not yet known, request exclusivity to track the account
684674
// only by this subpool until all transactions are evicted
685675
var (

core/txpool/legacypool/legacypool_test.go

Lines changed: 0 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -320,34 +320,6 @@ func validateEvents(events chan core.NewTxsEvent, count int) error {
320320
return nil
321321
}
322322

323-
// validateQueuedEvents checks that the correct number of transaction addition events
324-
// were fired on the pool's event feed.
325-
func validateQueuedEvents(events chan core.NewQueuedTxsEvent, count int) error {
326-
var received []*types.Transaction
327-
328-
for len(received) < count {
329-
select {
330-
case ev := <-events:
331-
received = append(received, ev.Txs...)
332-
case <-time.After(time.Second):
333-
return fmt.Errorf("event #%d not fired", len(received))
334-
}
335-
}
336-
if len(received) > count {
337-
return fmt.Errorf("more than %d events fired: %v", count, received[count:])
338-
}
339-
select {
340-
case ev := <-events:
341-
return fmt.Errorf("more than %d events fired: %v", count, ev.Txs)
342-
343-
case <-time.After(50 * time.Millisecond):
344-
// This branch should be "default", but it's a data race between goroutines,
345-
// reading the event channel and pushing into it, so better wait a bit ensuring
346-
// really nothing gets injected.
347-
}
348-
return nil
349-
}
350-
351323
func deriveSender(tx *types.Transaction) (common.Address, error) {
352324
return types.Sender(types.HomesteadSigner{}, tx)
353325
}
@@ -469,145 +441,6 @@ func TestInvalidTransactions(t *testing.T) {
469441
}
470442
}
471443

472-
func TestSubscribePendingAndQueuedTransactions(t *testing.T) {
473-
t.Parallel()
474-
475-
// Create the pool to test the pricing enforcement with
476-
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
477-
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
478-
479-
pool := New(testTxPoolConfig, blockchain)
480-
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
481-
defer pool.Close()
482-
483-
// Keep listening for authenticated transactions.
484-
events := make(chan core.NewQueuedTxsEvent, 32)
485-
sub := pool.SubscribeQueuedTransactions(events)
486-
defer sub.Unsubscribe()
487-
488-
createAccounts := func(numbers int, txpool *LegacyPool) []*ecdsa.PrivateKey {
489-
keys := []*ecdsa.PrivateKey{}
490-
for i := 0; i < numbers; i++ {
491-
key, _ := crypto.GenerateKey()
492-
txpool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), uint256.NewInt(1000000), tracing.BalanceChangeUnspecified)
493-
494-
keys = append(keys, key)
495-
}
496-
return keys
497-
}
498-
499-
tests := []struct {
500-
name string
501-
setup func(txPool *LegacyPool)
502-
validEvents int
503-
}{
504-
{
505-
name: "valid transaction",
506-
validEvents: 10,
507-
setup: func(txPool *LegacyPool) {
508-
keys := createAccounts(4, txPool)
509-
txs := types.Transactions{}
510-
511-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))
512-
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[0]))
513-
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[0]))
514-
515-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[1]))
516-
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[1]))
517-
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[1]))
518-
519-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[2]))
520-
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[2]))
521-
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[2]))
522-
523-
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
524-
525-
pool.addRemotes(txs)
526-
pool.addRemoteSync(ltx)
527-
},
528-
},
529-
530-
{
531-
name: "duplicate transaction",
532-
validEvents: 1,
533-
setup: func(txPool *LegacyPool) {
534-
keys := createAccounts(1, txPool)
535-
txs := types.Transactions{}
536-
537-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
538-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
539-
540-
pool.addRemotesSync(txs)
541-
},
542-
},
543-
{
544-
name: "bump gas price",
545-
validEvents: 2,
546-
setup: func(txPool *LegacyPool) {
547-
keys := createAccounts(1, txPool)
548-
txs := types.Transactions{}
549-
550-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
551-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))
552-
553-
pool.addRemotesSync(txs)
554-
},
555-
},
556-
{
557-
name: "duplicate nonce with different gas price",
558-
validEvents: 2,
559-
setup: func(txPool *LegacyPool) {
560-
keys := createAccounts(1, txPool)
561-
txs := types.Transactions{}
562-
563-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
564-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))
565-
566-
pool.addRemotesSync(txs)
567-
},
568-
},
569-
{
570-
name: "duplicate nonce with different gas limit",
571-
validEvents: 2,
572-
setup: func(txPool *LegacyPool) {
573-
keys := createAccounts(1, txPool)
574-
txs := types.Transactions{}
575-
576-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
577-
txs = append(txs, pricedTransaction(0, 80000, big.NewInt(1), keys[0]))
578-
579-
pool.addRemotesSync(txs)
580-
},
581-
},
582-
{
583-
name: "discontinuous nonce",
584-
validEvents: 2,
585-
setup: func(txPool *LegacyPool) {
586-
keys := createAccounts(1, txPool)
587-
txs := types.Transactions{}
588-
589-
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
590-
txs = append(txs, pricedTransaction(10000, 80000, big.NewInt(1), keys[0]))
591-
592-
pool.addRemotesSync(txs)
593-
},
594-
},
595-
}
596-
597-
for _, tt := range tests {
598-
t.Run(tt.name, func(t *testing.T) {
599-
tt.setup(pool)
600-
601-
if err := validateQueuedEvents(events, tt.validEvents); err != nil {
602-
t.Fatalf("event firing failed: %v", err)
603-
}
604-
if err := validatePoolInternals(pool); err != nil {
605-
t.Fatalf("pool internal state corrupted: %v", err)
606-
}
607-
})
608-
}
609-
}
610-
611444
func TestQueue(t *testing.T) {
612445
t.Parallel()
613446

core/txpool/subpool.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ type SubPool interface {
158158
// or also for reorged out ones.
159159
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
160160

161-
// SubscribeQueuedTransactions subscribes to new queued transaction events.
162-
SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription
163-
164161
// Nonce returns the next nonce of an account, with all transactions executable
165162
// by the pool already applied on top.
166163
Nonce(addr common.Address) uint64

core/txpool/txpool.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -385,15 +385,6 @@ func (p *TxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEve
385385
return p.subs.Track(event.JoinSubscriptions(subs...))
386386
}
387387

388-
// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
389-
func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
390-
subs := make([]event.Subscription, len(p.subpools))
391-
for i, subpool := range p.subpools {
392-
subs[i] = subpool.SubscribeQueuedTransactions(ch)
393-
}
394-
return p.subs.Track(event.JoinSubscriptions(subs...))
395-
}
396-
397388
// SubscribeTransactions registers a subscription for new transaction events,
398389
// supporting feeding only newly seen or also resurrected transactions.
399390
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {

eth/api_backend.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,10 +403,6 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
403403
return b.eth.txPool.SubscribeTransactions(ch, true)
404404
}
405405

406-
func (b *EthAPIBackend) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
407-
return b.eth.txPool.SubscribeNewQueuedTxsEvent(ch)
408-
}
409-
410406
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {
411407
prog := b.eth.Downloader().Progress()
412408
if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil {

eth/filters/api.go

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -205,75 +205,6 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
205205
return rpcSub, nil
206206
}
207207

208-
// NewQueuedTransactionFilter creates a filter that fetches queued transaction hashes
209-
// as transactions enter the pending state.
210-
//
211-
// It is part of the filter package because this filter can be used through the
212-
// `eth_getFilterChanges` polling method that is also used for log filters.
213-
func (api *FilterAPI) NewQueuedTransactionFilter() rpc.ID {
214-
var (
215-
queuedTxs = make(chan []*types.Transaction)
216-
queuedTxSub = api.events.SubscribeQueuedTxs(queuedTxs)
217-
deadline = 5 * time.Minute
218-
)
219-
220-
api.filtersMu.Lock()
221-
api.filters[queuedTxSub.ID] = &filter{typ: QueuedTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: queuedTxSub}
222-
api.filtersMu.Unlock()
223-
224-
go func() {
225-
for {
226-
select {
227-
case ph := <-queuedTxs:
228-
api.filtersMu.Lock()
229-
if f, found := api.filters[queuedTxSub.ID]; found {
230-
f.txs = append(f.txs, ph...)
231-
}
232-
api.filtersMu.Unlock()
233-
case <-queuedTxSub.Err():
234-
api.filtersMu.Lock()
235-
delete(api.filters, queuedTxSub.ID)
236-
api.filtersMu.Unlock()
237-
return
238-
}
239-
}
240-
}()
241-
242-
return queuedTxSub.ID
243-
}
244-
245-
// NewQueuedTransactions creates a subscription that is triggered each time a transaction
246-
// enters the transaction pool and was signed from one of the transactions this nodes manages.
247-
func (api *FilterAPI) NewQueuedTransactions(ctx context.Context) (*rpc.Subscription, error) {
248-
notifier, supported := rpc.NotifierFromContext(ctx)
249-
if !supported {
250-
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
251-
}
252-
253-
rpcSub := notifier.CreateSubscription()
254-
255-
go func() {
256-
txs := make(chan []*types.Transaction, 128)
257-
queuedTxSub := api.events.SubscribeQueuedTxs(txs)
258-
259-
for {
260-
select {
261-
case hashes := <-txs:
262-
// To keep the original behaviour, send a single tx hash in one notification.
263-
// TODO(rjl493456442) Send a batch of tx hashes in one notification
264-
for _, h := range hashes {
265-
notifier.Notify(rpcSub.ID, h)
266-
}
267-
case <-rpcSub.Err():
268-
queuedTxSub.Unsubscribe()
269-
return
270-
}
271-
}
272-
}()
273-
274-
return rpcSub, nil
275-
}
276-
277208
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
278209
// It is part of the filter package since polling goes with eth_getFilterChanges.
279210
func (api *FilterAPI) NewBlockFilter() rpc.ID {
@@ -639,10 +570,6 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
639570
f.txs = nil
640571
return hashes, nil
641572
}
642-
case QueuedTransactionsSubscription:
643-
txs := f.txs
644-
f.txs = nil
645-
return returnTransactions(txs), nil
646573
case LogsSubscription:
647574
logs := f.logs
648575
f.logs = nil
@@ -671,15 +598,6 @@ func returnLogs(logs []*types.Log) []*types.Log {
671598
return logs
672599
}
673600

674-
// returnTxs is a helper that will return an empty transaction array case the given transationcs array is nil,
675-
// otherwise the given transactions array is returned.
676-
func returnTransactions(txs []*types.Transaction) []*types.Transaction {
677-
if txs == nil {
678-
return []*types.Transaction{}
679-
}
680-
return txs
681-
}
682-
683601
// UnmarshalJSON sets *args fields with given data.
684602
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
685603
type input struct {

0 commit comments

Comments
 (0)