Skip to content

Commit cc4d405

Browse files
committed
*: use queuedTxFeed in SubscribeNewTxsEvent
1 parent ddea6a4 commit cc4d405

File tree

13 files changed

+30
-234
lines changed

13 files changed

+30
-234
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,7 +1744,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
17441744
return err
17451745
}
17461746

1747-
go p.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})
1747+
go p.queuedTxFeed.Send(core.NewTxsEvent{Txs: types.Transactions{tx}})
17481748

17491749
// If the address is not yet known, request exclusivity to track the account
17501750
// only by this subpool until all transactions are evicted
@@ -2091,7 +2091,7 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool
20912091
}
20922092

20932093
// SubscribeQueuedTransactions subscribes to new queued transaction events.
2094-
func (p *BlobPool) SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
2094+
func (p *BlobPool) SubscribeQueuedTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
20952095
return p.queuedTxFeed.Subscribe(ch)
20962096
}
20972097

core/txpool/legacypool/legacypool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs
407407
}
408408

409409
// SubscribeQueuedTransactions subscribes to new queued transaction events.
410-
func (pool *LegacyPool) SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
410+
func (pool *LegacyPool) SubscribeQueuedTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
411411
return pool.queuedTxFeed.Subscribe(ch)
412412
}
413413

@@ -678,7 +678,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
678678
from, _ := types.Sender(pool.signer, tx)
679679

680680
// Broadcast a new tx anyway if it's valid
681-
go pool.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})
681+
go pool.queuedTxFeed.Send(core.NewTxsEvent{Txs: types.Transactions{tx}})
682682

683683
// If the address is not yet known, request exclusivity to track the account
684684
// only by this subpool until all transactions are evicted

core/txpool/legacypool/legacypool_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func validateEvents(events chan core.NewTxsEvent, count int) error {
322322

323323
// validateQueuedEvents checks that the correct number of transaction addition events
324324
// were fired on the pool's event feed.
325-
func validateQueuedEvents(events chan core.NewQueuedTxsEvent, count int) error {
325+
func validateQueuedEvents(events chan core.NewTxsEvent, count int) error {
326326
var received []*types.Transaction
327327

328328
for len(received) < count {
@@ -481,7 +481,7 @@ func TestSubscribePendingAndQueuedTransactions(t *testing.T) {
481481
defer pool.Close()
482482

483483
// Keep listening for authenticated transactions.
484-
events := make(chan core.NewQueuedTxsEvent, 32)
484+
events := make(chan core.NewTxsEvent, 32)
485485
sub := pool.SubscribeQueuedTransactions(events)
486486
defer sub.Unsubscribe()
487487

core/txpool/subpool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ type SubPool interface {
159159
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
160160

161161
// SubscribeQueuedTransactions subscribes to new queued transaction events.
162-
SubscribeQueuedTransactions(ch chan<- core.NewQueuedTxsEvent) event.Subscription
162+
SubscribeQueuedTransactions(ch chan<- core.NewTxsEvent) event.Subscription
163163

164164
// Nonce returns the next nonce of an account, with all transactions executable
165165
// by the pool already applied on top.

core/txpool/txpool.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,8 +385,9 @@ func (p *TxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEve
385385
return p.subs.Track(event.JoinSubscriptions(subs...))
386386
}
387387

388-
// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
389-
func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
388+
// SubscribeNewQueuedTxsEvent registers a subscription of NewTxsEvent and
389+
// starts sending event to the given channel.
390+
func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
390391
subs := make([]event.Subscription, len(p.subpools))
391392
for i, subpool := range p.subpools {
392393
subs[i] = subpool.SubscribeQueuedTransactions(ch)

eth/api_backend.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,6 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
400400
}
401401

402402
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
403-
return b.eth.txPool.SubscribeTransactions(ch, true)
404-
}
405-
406-
func (b *EthAPIBackend) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
407403
return b.eth.txPool.SubscribeNewQueuedTxsEvent(ch)
408404
}
409405

eth/filters/api.go

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -205,75 +205,6 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
205205
return rpcSub, nil
206206
}
207207

208-
// NewQueuedTransactionFilter creates a filter that fetches queued transaction hashes
209-
// as transactions enter the pending state.
210-
//
211-
// It is part of the filter package because this filter can be used through the
212-
// `eth_getFilterChanges` polling method that is also used for log filters.
213-
func (api *FilterAPI) NewQueuedTransactionFilter() rpc.ID {
214-
var (
215-
queuedTxs = make(chan []*types.Transaction)
216-
queuedTxSub = api.events.SubscribeQueuedTxs(queuedTxs)
217-
deadline = 5 * time.Minute
218-
)
219-
220-
api.filtersMu.Lock()
221-
api.filters[queuedTxSub.ID] = &filter{typ: QueuedTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: queuedTxSub}
222-
api.filtersMu.Unlock()
223-
224-
go func() {
225-
for {
226-
select {
227-
case ph := <-queuedTxs:
228-
api.filtersMu.Lock()
229-
if f, found := api.filters[queuedTxSub.ID]; found {
230-
f.txs = append(f.txs, ph...)
231-
}
232-
api.filtersMu.Unlock()
233-
case <-queuedTxSub.Err():
234-
api.filtersMu.Lock()
235-
delete(api.filters, queuedTxSub.ID)
236-
api.filtersMu.Unlock()
237-
return
238-
}
239-
}
240-
}()
241-
242-
return queuedTxSub.ID
243-
}
244-
245-
// NewQueuedTransactions creates a subscription that is triggered each time a transaction
246-
// enters the transaction pool and was signed from one of the transactions this nodes manages.
247-
func (api *FilterAPI) NewQueuedTransactions(ctx context.Context) (*rpc.Subscription, error) {
248-
notifier, supported := rpc.NotifierFromContext(ctx)
249-
if !supported {
250-
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
251-
}
252-
253-
rpcSub := notifier.CreateSubscription()
254-
255-
go func() {
256-
txs := make(chan []*types.Transaction, 128)
257-
queuedTxSub := api.events.SubscribeQueuedTxs(txs)
258-
259-
for {
260-
select {
261-
case hashes := <-txs:
262-
// To keep the original behaviour, send a single tx hash in one notification.
263-
// TODO(rjl493456442) Send a batch of tx hashes in one notification
264-
for _, h := range hashes {
265-
notifier.Notify(rpcSub.ID, h)
266-
}
267-
case <-rpcSub.Err():
268-
queuedTxSub.Unsubscribe()
269-
return
270-
}
271-
}
272-
}()
273-
274-
return rpcSub, nil
275-
}
276-
277208
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
278209
// It is part of the filter package since polling goes with eth_getFilterChanges.
279210
func (api *FilterAPI) NewBlockFilter() rpc.ID {
@@ -639,10 +570,6 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
639570
f.txs = nil
640571
return hashes, nil
641572
}
642-
case QueuedTransactionsSubscription:
643-
txs := f.txs
644-
f.txs = nil
645-
return returnTransactions(txs), nil
646573
case LogsSubscription:
647574
logs := f.logs
648575
f.logs = nil
@@ -671,15 +598,6 @@ func returnLogs(logs []*types.Log) []*types.Log {
671598
return logs
672599
}
673600

674-
// returnTxs is a helper that will return an empty transaction array case the given transationcs array is nil,
675-
// otherwise the given transactions array is returned.
676-
func returnTransactions(txs []*types.Transaction) []*types.Transaction {
677-
if txs == nil {
678-
return []*types.Transaction{}
679-
}
680-
return txs
681-
}
682-
683601
// UnmarshalJSON sets *args fields with given data.
684602
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
685603
type input struct {

eth/filters/filter_system.go

Lines changed: 19 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ type Backend interface {
6868
ChainConfig() *params.ChainConfig
6969
HistoryPruningCutoff() uint64
7070
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
71-
SubscribeNewQueuedTxsEvent(chan<- core.NewQueuedTxsEvent) event.Subscription
7271
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
7372
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
7473
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
@@ -157,9 +156,6 @@ const (
157156
// PendingTransactionsSubscription queries for pending transactions entering
158157
// the pending state
159158
PendingTransactionsSubscription
160-
// QueuedTransactionsSubscription queries tx hashes for queued
161-
// transactions entering the queued state
162-
QueuedTransactionsSubscription
163159
// BlocksSubscription queries hashes for blocks that are imported
164160
BlocksSubscription
165161
// TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks
@@ -172,9 +168,6 @@ const (
172168
// txChanSize is the size of channel listening to NewTxsEvent.
173169
// The number is referenced from the size of tx pool.
174170
txChanSize = 4096
175-
// queuedTxChanSize is the size of channel listening to NewQueuedTxsEvent.
176-
// The number is referenced from the size of tx pool.
177-
queuedTxChanSize = 4096
178171
// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
179172
rmLogsChanSize = 10
180173
// logsChanSize is the size of channel listening to LogsEvent.
@@ -190,8 +183,6 @@ type subscription struct {
190183
logsCrit ethereum.FilterQuery
191184
logs chan []*types.Log
192185
txs chan []*types.Transaction
193-
hashes chan []common.Hash
194-
queuedTxs chan []*types.Transaction
195186
headers chan *types.Header
196187
receipts chan []*ReceiptWithTx
197188
txHashes map[common.Hash]bool // contains transaction hashes for transactionReceipts subscription filtering
@@ -206,20 +197,18 @@ type EventSystem struct {
206197
sys *FilterSystem
207198

208199
// Subscriptions
209-
txsSub event.Subscription // Subscription for new transaction event
210-
queuedTxsSub event.Subscription // Subscription for new queued transaction event
211-
logsSub event.Subscription // Subscription for new log event
212-
rmLogsSub event.Subscription // Subscription for removed log event
213-
chainSub event.Subscription // Subscription for new chain event
200+
txsSub event.Subscription // Subscription for new transaction event
201+
logsSub event.Subscription // Subscription for new log event
202+
rmLogsSub event.Subscription // Subscription for removed log event
203+
chainSub event.Subscription // Subscription for new chain event
214204

215205
// Channels
216-
install chan *subscription // install filter for event notification
217-
uninstall chan *subscription // remove filter for event notification
218-
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
219-
queuedTxsCh chan core.NewQueuedTxsEvent // Channel to receive new queued transactions event
220-
logsCh chan []*types.Log // Channel to receive new log event
221-
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
222-
chainCh chan core.ChainEvent // Channel to receive new chain event
206+
install chan *subscription // install filter for event notification
207+
uninstall chan *subscription // remove filter for event notification
208+
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
209+
logsCh chan []*types.Log // Channel to receive new log event
210+
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
211+
chainCh chan core.ChainEvent // Channel to receive new chain event
223212
}
224213

225214
// NewEventSystem creates a new manager that listens for event on the given mux,
@@ -230,26 +219,24 @@ type EventSystem struct {
230219
// or by stopping the given mux.
231220
func NewEventSystem(sys *FilterSystem) *EventSystem {
232221
m := &EventSystem{
233-
sys: sys,
234-
backend: sys.backend,
235-
install: make(chan *subscription),
236-
uninstall: make(chan *subscription),
237-
txsCh: make(chan core.NewTxsEvent, txChanSize),
238-
queuedTxsCh: make(chan core.NewQueuedTxsEvent, queuedTxChanSize),
239-
logsCh: make(chan []*types.Log, logsChanSize),
240-
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
241-
chainCh: make(chan core.ChainEvent, chainEvChanSize),
222+
sys: sys,
223+
backend: sys.backend,
224+
install: make(chan *subscription),
225+
uninstall: make(chan *subscription),
226+
txsCh: make(chan core.NewTxsEvent, txChanSize),
227+
logsCh: make(chan []*types.Log, logsChanSize),
228+
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
229+
chainCh: make(chan core.ChainEvent, chainEvChanSize),
242230
}
243231

244232
// Subscribe events
245233
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
246-
m.queuedTxsSub = m.backend.SubscribeNewQueuedTxsEvent(m.queuedTxsCh)
247234
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
248235
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
249236
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
250237

251238
// Make sure none of the subscriptions are empty
252-
if m.txsSub == nil || m.queuedTxsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil {
239+
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil {
253240
log.Crit("Subscribe for event system failed")
254241
}
255242

@@ -284,8 +271,6 @@ func (sub *Subscription) Unsubscribe() {
284271
break uninstallLoop
285272
case <-sub.f.logs:
286273
case <-sub.f.txs:
287-
case <-sub.f.hashes:
288-
case <-sub.f.queuedTxs:
289274
case <-sub.f.headers:
290275
case <-sub.f.receipts:
291276
}
@@ -372,8 +357,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
372357
created: time.Now(),
373358
logs: logs,
374359
txs: make(chan []*types.Transaction),
375-
hashes: make(chan []common.Hash),
376-
queuedTxs: make(chan []*types.Transaction),
377360
headers: make(chan *types.Header),
378361
receipts: make(chan []*ReceiptWithTx),
379362
installed: make(chan struct{}),
@@ -391,8 +374,6 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
391374
created: time.Now(),
392375
logs: make(chan []*types.Log),
393376
txs: make(chan []*types.Transaction),
394-
hashes: make(chan []common.Hash),
395-
queuedTxs: make(chan []*types.Transaction),
396377
headers: headers,
397378
receipts: make(chan []*ReceiptWithTx),
398379
installed: make(chan struct{}),
@@ -410,26 +391,7 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc
410391
created: time.Now(),
411392
logs: make(chan []*types.Log),
412393
txs: txs,
413-
queuedTxs: make(chan []*types.Transaction),
414-
headers: make(chan *types.Header),
415-
installed: make(chan struct{}),
416-
err: make(chan error),
417-
}
418-
return es.subscribe(sub)
419-
}
420-
421-
// SubscribeQueuedTxs creates a subscription that writes transaction hashes for
422-
// transactions that enter the transaction pool.
423-
func (es *EventSystem) SubscribeQueuedTxs(queuedTxs chan []*types.Transaction) *Subscription {
424-
sub := &subscription{
425-
id: rpc.NewID(),
426-
typ: QueuedTransactionsSubscription,
427-
created: time.Now(),
428-
logs: make(chan []*types.Log),
429-
hashes: make(chan []common.Hash),
430-
queuedTxs: queuedTxs,
431394
headers: make(chan *types.Header),
432-
receipts: make(chan []*ReceiptWithTx),
433395
installed: make(chan struct{}),
434396
err: make(chan error),
435397
}
@@ -493,20 +455,11 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
493455
}
494456
}
495457

496-
func (es *EventSystem) handleQueuedTxsEvent(filters filterIndex, ev core.NewQueuedTxsEvent) {
497-
txs := make([]*types.Transaction, 0, len(ev.Txs))
498-
txs = append(txs, ev.Txs...)
499-
for _, f := range filters[QueuedTransactionsSubscription] {
500-
f.queuedTxs <- txs
501-
}
502-
}
503-
504458
// eventLoop (un)installs filters and processes mux events.
505459
func (es *EventSystem) eventLoop() {
506460
// Ensure all subscriptions get cleaned up
507461
defer func() {
508462
es.txsSub.Unsubscribe()
509-
es.queuedTxsSub.Unsubscribe()
510463
es.logsSub.Unsubscribe()
511464
es.rmLogsSub.Unsubscribe()
512465
es.chainSub.Unsubscribe()
@@ -521,8 +474,6 @@ func (es *EventSystem) eventLoop() {
521474
select {
522475
case ev := <-es.txsCh:
523476
es.handleTxsEvent(index, ev)
524-
case ev := <-es.queuedTxsCh:
525-
es.handleQueuedTxsEvent(index, ev)
526477
case ev := <-es.logsCh:
527478
es.handleLogs(index, ev)
528479
case ev := <-es.rmLogsCh:
@@ -541,8 +492,6 @@ func (es *EventSystem) eventLoop() {
541492
// System stopped
542493
case <-es.txsSub.Err():
543494
return
544-
case <-es.queuedTxsSub.Err():
545-
return
546495
case <-es.logsSub.Err():
547496
return
548497
case <-es.rmLogsSub.Err():

0 commit comments

Comments
 (0)