diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml deleted file mode 100644 index 50c9fe7f7511..000000000000 --- a/.github/workflows/go.yml +++ /dev/null @@ -1,99 +0,0 @@ -on: - push: - branches: - - master - pull_request: - branches: - - master - workflow_dispatch: - -jobs: - lint: - name: Lint - runs-on: [self-hosted-ghr, size-s-x64] - steps: - - uses: actions/checkout@v4 - with: - submodules: false - - # Cache build tools to avoid downloading them each time - - uses: actions/cache@v4 - with: - path: build/cache - key: ${{ runner.os }}-build-tools-cache-${{ hashFiles('build/checksums.txt') }} - - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: 1.25 - cache: false - - - name: Run linters - run: | - go run build/ci.go lint - go run build/ci.go check_generate - go run build/ci.go check_baddeps - - keeper: - name: Keeper Builds - needs: test - runs-on: [self-hosted-ghr, size-l-x64] - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: '1.25' - cache: false - - - name: Build - run: go run build/ci.go keeper - - test-32bit: - name: "32bit tests" - needs: test - runs-on: [self-hosted-ghr, size-l-x64] - steps: - - uses: actions/checkout@v4 - with: - submodules: false - - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: '1.25' - cache: false - - - name: Install cross toolchain - run: | - apt-get update - apt-get -yq --no-install-suggests --no-install-recommends install gcc-multilib - - - name: Build - run: go run build/ci.go test -arch 386 -short -p 8 - - test: - name: Test - needs: lint - runs-on: [self-hosted-ghr, size-l-x64] - strategy: - matrix: - go: - - '1.25' - - '1.24' - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: ${{ matrix.go }} - cache: false - - - name: Run tests - run: go run build/ci.go test -p 8 diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 109b36836a04..628da5f4970f 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -72,6 +72,7 @@ var ( utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, utils.TxPoolRejournalFlag, + utils.TxPoolBroadcastPendingLocalTxFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, utils.TxPoolAccountSlotsFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5a7e40767cfb..16148cc868f2 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -404,11 +404,15 @@ var ( Value: ethconfig.Defaults.TxPool.Rejournal, Category: flags.TxPoolCategory, } + TxPoolBroadcastPendingLocalTxFlag = &cli.DurationFlag{ + Name: "txpool.broadcastpendinglocaltx", + Usage: "Time interval to broadcast the pending local transaction", + Value: legacypool.DefaultConfig.BroadcastPendingLocalTx, + } TxPoolPriceLimitFlag = &cli.Uint64Flag{ - Name: "txpool.pricelimit", - Usage: "Minimum gas price tip to enforce for acceptance into the pool", - Value: ethconfig.Defaults.TxPool.PriceLimit, - Category: flags.TxPoolCategory, + Name: "txpool.pricelimit", + Usage: "Minimum gas price tip to enforce for acceptance into the pool", + Value: ethconfig.Defaults.TxPool.PriceLimit, } TxPoolPriceBumpFlag = &cli.Uint64Flag{ Name: "txpool.pricebump", @@ -1516,6 +1520,9 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) { if ctx.IsSet(TxPoolRejournalFlag.Name) { cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name) } + if ctx.IsSet(TxPoolBroadcastPendingLocalTxFlag.Name) { + cfg.BroadcastPendingLocalTx = ctx.Duration(TxPoolBroadcastPendingLocalTxFlag.Name) + } if ctx.IsSet(TxPoolPriceLimitFlag.Name) { cfg.PriceLimit = ctx.Uint64(TxPoolPriceLimitFlag.Name) } diff --git a/core/blockchain.go b/core/blockchain.go index 61eab591539d..c5703efff5ea 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -118,10 +118,11 @@ var ( ) const ( - bodyCacheLimit = 256 - blockCacheLimit = 256 - receiptsCacheLimit = 32 - txLookupCacheLimit = 1024 + bodyCacheLimit = 256 + blockCacheLimit = 256 + receiptsCacheLimit = 32 + transferLogsCacheLimit = 32 + txLookupCacheLimit = 1024 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // @@ -319,10 +320,11 @@ type BlockChain struct { currentSafeBlock atomic.Pointer[types.Header] // Latest (consensus) safe block historyPrunePoint atomic.Pointer[history.PrunePoint] - bodyCache *lru.Cache[common.Hash, *types.Body] - bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] - receiptsCache *lru.Cache[common.Hash, []*types.Receipt] // Receipts cache with all fields derived - blockCache *lru.Cache[common.Hash, *types.Block] + bodyCache *lru.Cache[common.Hash, *types.Body] + bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] + receiptsCache *lru.Cache[common.Hash, []*types.Receipt] // Receipts cache with all fields derived + transferLogsCache *lru.Cache[common.Hash, []*types.TransferLog] // Cache for the most recent receipts per block + blockCache *lru.Cache[common.Hash, *types.Block] txLookupLock sync.RWMutex txLookupCache *lru.Cache[common.Hash, txLookup] @@ -372,19 +374,20 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, log.Info("") bc := &BlockChain{ - chainConfig: chainConfig, - cfg: cfg, - db: db, - triedb: triedb, - triegc: prque.New[int64, common.Hash](nil), - chainmu: syncx.NewClosableMutex(), - bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), - bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), - receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), - blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), - engine: engine, - logger: cfg.VmConfig.Tracer, + chainConfig: chainConfig, + cfg: cfg, + db: db, + triedb: triedb, + triegc: prque.New[int64, common.Hash](nil), + chainmu: syncx.NewClosableMutex(), + bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), + bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), + receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), + blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), + transferLogsCache: lru.NewCache[common.Hash, []*types.TransferLog](transferLogsCacheLimit), + engine: engine, + logger: cfg.VmConfig.Tracer, } bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { @@ -1068,6 +1071,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha // removed by the hc.SetHead function. rawdb.DeleteBody(db, hash, num) rawdb.DeleteReceipts(db, hash, num) + rawdb.DeleteTransferLogs(db, hash, num) } // Todo(rjl493456442) txlookup, log index, etc } @@ -1092,6 +1096,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.receiptsCache.Purge() + bc.transferLogsCache.Purge() bc.blockCache.Purge() bc.txLookupCache.Purge() @@ -1425,7 +1430,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Ensure genesis is in the ancient store if blockChain[0].NumberU64() == 1 { if frozen, _ := bc.db.Ancients(); frozen == 0 { - writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList}) + tfLogs, err := rawdb.ReadTransferLogs(bc.db, bc.genesisBlock.Hash(), frozen) + if err != nil { + return 0, err + } + writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList}, tfLogs) if err != nil { log.Error("Error writing genesis to ancients", "err", err) return 0, err @@ -1435,7 +1444,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } } // Write all chain data to ancients. - writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain) + writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, nil) if err != nil { log.Error("Error importing chain data to ancients", "err", err) return 0, err @@ -1495,6 +1504,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ rawdb.WriteBlock(batch, block) rawdb.WriteRawReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) + // We don't have transfer logs for fast sync blocks + rawdb.WriteMissingTransferLogs(batch, block.Hash(), block.NumberU64()) + // Write everything belongs to the blocks into the database. So that // we can ensure all components of body is completed(body, receipts) // except transaction indexes(will be created once sync is finished). @@ -1599,6 +1611,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. blockBatch := bc.db.NewBatch() rawdb.WriteBlock(blockBatch, block) rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WriteTransferLogs(blockBatch, block.Hash(), block.NumberU64(), statedb.TransferLogs()) rawdb.WritePreimages(blockBatch, statedb.Preimages()) if err := blockBatch.Write(); err != nil { log.Crit("Failed to write block into disk", "err", err) @@ -2771,7 +2784,7 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e first = headers[0].Number.Uint64() ) if first == 1 && frozen == 0 { - _, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList}) + _, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList}, nil) if err != nil { log.Error("Error writing genesis to ancients", "err", err) return 0, err diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 25dcdb512e10..c2c2d6af7b70 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -21,6 +21,8 @@ import ( "fmt" "math/big" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" @@ -524,10 +526,15 @@ func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscr } // GetTransferLogs retrieves the transfer logs for all transactions in a given block. -func (bc *BlockChain) GetTransferLogs(hash common.Hash) []*types.TransferLog { +func (bc *BlockChain) GetTransferLogs(hash common.Hash) ([]*types.TransferLog, error) { number, ok := rawdb.ReadHeaderNumber(bc.db, hash) if !ok { - return nil + return nil, ethereum.NotFound + } + transferLogs, err := rawdb.ReadTransferLogs(bc.db, hash, number) + if err != nil { + return nil, err } - return rawdb.ReadTransferLogs(bc.db, hash, number) + bc.transferLogsCache.Add(hash, transferLogs) + return transferLogs, nil } diff --git a/core/events.go b/core/events.go index ef0de3242621..414a79c0d02a 100644 --- a/core/events.go +++ b/core/events.go @@ -23,6 +23,12 @@ import ( // NewTxsEvent is posted when a batch of transactions enter the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } +// PendingLocalTxsEvent is posted when there are pending local transactions in the transaction pool. +type PendingLocalTxsEvent struct{ Txs []*types.Transaction } + +// NewQueuedTxsEvent is posted when a batch of transactions enter the transaction pool. +type NewQueuedTxsEvent struct{ Txs []*types.Transaction } + // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } diff --git a/core/genesis.go b/core/genesis.go index d0d490874d07..f25204671f20 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -567,6 +567,7 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo rawdb.WriteGenesisStateSpec(batch, block.Hash(), blob) rawdb.WriteBlock(batch, block) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), nil) + rawdb.WriteTransferLogs(db, block.Hash(), block.NumberU64(), nil) rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) rawdb.WriteHeadBlockHash(batch, block.Hash()) rawdb.WriteHeadFastBlockHash(batch, block.Hash()) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 1721b09dcd00..b77611f2cd82 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -19,6 +19,7 @@ package rawdb import ( "bytes" "encoding/binary" + "errors" "fmt" "math/big" "slices" @@ -33,6 +34,11 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +var ( + errNotFound = errors.New("not found") + errMissingTransferLogs = errors.New("missing transfer logs") +) + // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { var data []byte @@ -703,38 +709,80 @@ func ReadLogs(db ethdb.Reader, hash common.Hash, number uint64) [][]*types.Log { return logs } +// ReadTransferLogsRLP retrieves all the transfer logs belonging to a block in RLP encoding. +func ReadTransferLogsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. + data, _ := db.Ancient(ChainFreezerTransferLogTable, number) + if len(data) > 0 { + h, _ := db.Ancient(ChainFreezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + // Then try to look up the data in leveldb. + data, _ = db.Get(blockTransferLogsKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(ChainFreezerTransferLogTable, number) + if len(data) > 0 { + h, _ := db.Ancient(ChainFreezerHashTable, number) + if common.BytesToHash(h) == hash { + return data + } + } + return nil // Can't find the data anywhere. +} + // ReadTransferLogs retrieves all the transfer logs belonging to a block. -func ReadTransferLogs(db ethdb.KeyValueReader, hash common.Hash, number uint64) []*types.TransferLog { +func ReadTransferLogs(db ethdb.Reader, hash common.Hash, number uint64) ([]*types.TransferLog, error) { // Retrieve the flattened transfer log slice - data, _ := db.Get(append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) + data := ReadTransferLogsRLP(db, hash, number) if len(data) == 0 { - return nil + return nil, errNotFound } transferLogs := []*types.TransferLog{} if err := rlp.DecodeBytes(data, &transferLogs); err != nil { - log.Error("Invalid transfer log array RLP", "hash", hash, "err", err) - return nil + if string(data) == errMissingTransferLogs.Error() { + return nil, errMissingTransferLogs + } + log.Error("Invalid transfer log array RLP", "hash", hash, "number", number, "err", err) + return nil, err } - return transferLogs + return transferLogs, nil } // WriteTransferLogs stores all the transfer logs belonging to a block. func WriteTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64, transferLogs []*types.TransferLog) { bytes, err := rlp.EncodeToBytes(transferLogs) if err != nil { - log.Crit("Failed to encode block transfer logs", "err", err) + log.Crit("Failed to encode block transfer logs", "hash", hash, "number", number, "err", err) + } + // Store the flattened transfer log slice + if err := db.Put(blockTransferLogsKey(number, hash), bytes); err != nil { + log.Crit("Failed to store block transfer logs", "hash", hash, "number", number, "err", err) } +} + +// WriteMissingTransferLogs stores missing transfer logs message for a block. +func WriteMissingTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + bytes := []byte(errMissingTransferLogs.Error()) // Store the flattened transfer log slice - key := append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) - if err := db.Put(key, bytes); err != nil { - log.Crit("Failed to store block transfer logs", "err", err) + if err := db.Put(blockTransferLogsKey(number, hash), bytes); err != nil { + log.Crit("Failed to store block transfer logs", "hash", hash, "number", number, "err", err) } } // DeleteTransferLogs removes all transfer logs associated with a block hash. func DeleteTransferLogs(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { - if err := db.Delete(append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)); err != nil { - log.Crit("Failed to delete block transfer logs", "err", err) + if err := db.Delete(blockTransferLogsKey(number, hash)); err != nil { + log.Crit("Failed to delete block transfer logs", "hash", hash, "number", number, "err", err) } } @@ -763,11 +811,11 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) { } // WriteAncientBlocks writes entire block data into ancient store and returns the total written size. -func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []rlp.RawValue) (int64, error) { +func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []rlp.RawValue, transferLogs []*types.TransferLog) (int64, error) { return db.ModifyAncients(func(op ethdb.AncientWriteOp) error { for i, block := range blocks { header := block.Header() - if err := writeAncientBlock(op, block, header, receipts[i]); err != nil { + if err := writeAncientBlock(op, block, header, receipts[i], transferLogs); err != nil { return err } } @@ -775,7 +823,7 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts }) } -func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts rlp.RawValue) error { +func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts rlp.RawValue, transferLogs []*types.TransferLog) error { num := block.NumberU64() if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil { return fmt.Errorf("can't add block %d hash: %v", num, err) @@ -789,6 +837,21 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type if err := op.Append(ChainFreezerReceiptTable, num, receipts); err != nil { return fmt.Errorf("can't append block %d receipts: %v", num, err) } + // Transfer logs might be nil when fast sync. + // To keep complete ancient table, we append the specific string to indicate nil transfer logs. + var transferLogBlob []byte + if transferLogs != nil { + var err error + transferLogBlob, err = rlp.EncodeToBytes(transferLogs) + if err != nil { + log.Crit("Failed to RLP encode block transfer logs", "err", err) + } + } else { + transferLogBlob = []byte(errMissingTransferLogs.Error()) + } + if err := op.AppendRaw(ChainFreezerTransferLogTable, num, transferLogBlob); err != nil { + return fmt.Errorf("can't append block %d transfer logs: %v", num, err) + } return nil } @@ -811,6 +874,9 @@ func WriteAncientHeaderChain(db ethdb.AncientWriter, headers []*types.Header) (i if err := op.AppendRaw(ChainFreezerReceiptTable, num, nil); err != nil { return fmt.Errorf("can't append block %d receipts: %v", num, err) } + if err := op.AppendRaw(ChainFreezerTransferLogTable, num, nil); err != nil { + return fmt.Errorf("can't append block %d transfer logs: %v", num, err) + } } return nil }) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 26e6536c0324..464453d5c066 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -444,9 +444,12 @@ func TestAncientStorage(t *testing.T) { if blob := ReadCanonicalReceiptsRLP(db, number, &hash); len(blob) > 0 { t.Fatalf("non existent receipts returned") } + if blob := ReadTransferLogsRLP(db, hash, number); len(blob) > 0 { + t.Fatalf("non existent transfer logs returned") + } // Write and verify the header in the database - WriteAncientBlocks(db, []*types.Block{block}, types.EncodeBlockReceiptLists([]types.Receipts{nil})) + WriteAncientBlocks(db, []*types.Block{block}, types.EncodeBlockReceiptLists([]types.Receipts{nil}), nil) if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 { t.Fatalf("no header returned") @@ -460,6 +463,9 @@ func TestAncientStorage(t *testing.T) { if blob := ReadCanonicalReceiptsRLP(db, number, &hash); len(blob) == 0 { t.Fatalf("no receipts returned") } + if blob := ReadTransferLogsRLP(db, hash, number); len(blob) == 0 { + t.Fatalf("no transfer logs returned") + } // Use a fake hash for data retrieval, nothing should be returned. fakeHash := common.BytesToHash([]byte{0x01, 0x02, 0x03}) @@ -472,6 +478,48 @@ func TestAncientStorage(t *testing.T) { if blob := ReadReceiptsRLP(db, fakeHash, number); len(blob) != 0 { t.Fatalf("invalid receipts returned") } + if blob := ReadTransferLogsRLP(db, fakeHash, number); len(blob) != 0 { + t.Fatalf("invalid transfer logs returned") + } +} + +func TestAncientTransferLogStorageTransferLog(t *testing.T) { + // Freezer style fast import the chain. + frdir := t.TempDir() + + db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false) + if err != nil { + t.Fatalf("failed to create database with ancient backend") + } + // Create a test block + block := types.NewBlockWithHeader(&types.Header{ + Number: big.NewInt(0), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyRootHash, + ReceiptHash: types.EmptyRootHash, + }) + hash, number := block.Hash(), block.NumberU64() + // Write with nil transfer logs, should get nil transfer logs. + WriteAncientBlocks(db, []*types.Block{block}, types.EncodeBlockReceiptLists([]types.Receipts{nil}), nil) + if tlogs, err := ReadTransferLogs(db, hash, number); tlogs != nil || err != errMissingTransferLogs { + t.Fatalf("should return nil transfer logs and missing transfer logs error") + } + + // Create a test block + block2 := types.NewBlockWithHeader(&types.Header{ + Number: big.NewInt(1), + Extra: []byte("test block"), + UncleHash: types.EmptyUncleHash, + TxHash: types.EmptyRootHash, + ReceiptHash: types.EmptyRootHash, + }) + hash, number = block2.Hash(), block2.NumberU64() + // Write with nil transfer logs, should get nil transfer logs. + WriteAncientBlocks(db, []*types.Block{block2}, types.EncodeBlockReceiptLists([]types.Receipts{nil}), []*types.TransferLog{}) + if tlogs, err := ReadTransferLogs(db, hash, number); tlogs == nil || err != nil { + t.Fatalf("invalid transfer logs returned") + } } func TestWriteAncientHeaderChain(t *testing.T) { @@ -619,7 +667,7 @@ func BenchmarkWriteAncientBlocks(b *testing.B) { blocks := allBlocks[i : i+length] receipts := batchReceipts[:length] - writeSize, err := WriteAncientBlocks(db, blocks, types.EncodeBlockReceiptLists(receipts)) + writeSize, err := WriteAncientBlocks(db, blocks, types.EncodeBlockReceiptLists(receipts), nil) if err != nil { b.Fatal(err) } @@ -924,7 +972,7 @@ func TestHeadersRLPStorage(t *testing.T) { } receipts := make([]types.Receipts, 100) // Write first half to ancients - WriteAncientBlocks(db, chain[:50], types.EncodeBlockReceiptLists(receipts[:50])) + WriteAncientBlocks(db, chain[:50], types.EncodeBlockReceiptLists(receipts[:50]), nil) // Write second half to db for i := 50; i < 100; i++ { WriteCanonicalHash(db, chain[i].Hash(), chain[i].NumberU64()) @@ -978,12 +1026,12 @@ func TestTransferLogStorage(t *testing.T) { // Check that no transfer logs entries are in a pristine database hash := common.BytesToHash([]byte{0x03, 0x14}) - if ls := ReadTransferLogs(db, hash, 0); len(ls) != 0 { + if ls, err := ReadTransferLogs(db, hash, 0); len(ls) != 0 || err != errNotFound { t.Fatalf("non existent transfer logs returned: %v", ls) } // Insert the transfer log slice into the database and check presence WriteTransferLogs(db, hash, 0, transferLogs) - if ls := ReadTransferLogs(db, hash, 0); len(ls) == 0 { + if ls, err := ReadTransferLogs(db, hash, 0); len(ls) == 0 || err != nil { t.Fatalf("no transfer logs returned") } else { for i := 0; i < len(transferLogs); i++ { @@ -997,7 +1045,13 @@ func TestTransferLogStorage(t *testing.T) { } // Delete the transfer log slice and check purge DeleteTransferLogs(db, hash, 0) - if ls := ReadTransferLogs(db, hash, 0); len(ls) != 0 { + if ls, err := ReadTransferLogs(db, hash, 0); len(ls) != 0 || err != errNotFound { t.Fatalf("deleted transfer logs returned: %v", ls) } + // Insert missing transfer logs into the database and check error + hash2 := common.BytesToHash([]byte{0x07, 0x15}) + WriteMissingTransferLogs(db, hash2, 1) + if ls, err := ReadTransferLogs(db, hash2, 1); len(ls) != 0 || err != errMissingTransferLogs { + t.Fatalf("no transfer logs returned and should return missing transfer logs error") + } } diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index afec7848c882..588c005f9d41 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -35,6 +35,9 @@ const ( // ChainFreezerReceiptTable indicates the name of the freezer receipts table. ChainFreezerReceiptTable = "receipts" + + // chainFreezerTransferLogTable indicates the name of the freezer transfer logs table. + ChainFreezerTransferLogTable = "transfers" ) // chainFreezerTableConfigs configures the settings for tables in the chain freezer. @@ -42,10 +45,11 @@ const ( // tail truncation is disabled for the header and hash tables, as these are intended // to be retained long-term. var chainFreezerTableConfigs = map[string]freezerTableConfig{ - ChainFreezerHeaderTable: {noSnappy: false, prunable: false}, - ChainFreezerHashTable: {noSnappy: true, prunable: false}, - ChainFreezerBodiesTable: {noSnappy: false, prunable: true}, - ChainFreezerReceiptTable: {noSnappy: false, prunable: true}, + ChainFreezerHeaderTable: {noSnappy: false, prunable: false}, + ChainFreezerHashTable: {noSnappy: true, prunable: false}, + ChainFreezerBodiesTable: {noSnappy: false, prunable: true}, + ChainFreezerReceiptTable: {noSnappy: false, prunable: true}, + ChainFreezerTransferLogTable: {noSnappy: false, prunable: true}, } // freezerTableConfig contains the settings for a freezer table. diff --git a/core/rawdb/chain_freezer.go b/core/rawdb/chain_freezer.go index d33f7ce33d43..b7418225826d 100644 --- a/core/rawdb/chain_freezer.go +++ b/core/rawdb/chain_freezer.go @@ -327,6 +327,10 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash if len(receipts) == 0 { return fmt.Errorf("block receipts missing, can't freeze block %d", number) } + transferLogs := ReadTransferLogsRLP(nfdb, hash, number) + if len(transferLogs) == 0 { + return fmt.Errorf("block transfer logs missing, can't freeze block %d", number) + } // Write to the batch. if err := op.AppendRaw(ChainFreezerHashTable, number, hash[:]); err != nil { return fmt.Errorf("can't write hash to Freezer: %v", err) @@ -340,6 +344,10 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash if err := op.AppendRaw(ChainFreezerReceiptTable, number, receipts); err != nil { return fmt.Errorf("can't write receipts to Freezer: %v", err) } + if err := op.AppendRaw(ChainFreezerTransferLogTable, number, transferLogs); err != nil { + return fmt.Errorf("can't write transfer logs to Freezer: %v", err) + } + hashes = append(hashes, hash) } return nil diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 29483baa5f7b..6931b1fc1033 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -426,6 +426,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { filterMapRows stat filterMapLastBlock stat filterMapBlockLV stat + transferLog stat // Path-mode archive data stateIndex stat @@ -491,6 +492,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { storageTries.add(size) case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength: codes.add(size) + case bytes.HasPrefix(key, blockTranferLogsPrefix) && len(key) == (len(blockTranferLogsPrefix)+8+common.HashLength): + transferLog.add(size) case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): txLookups.add(size) case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): @@ -609,6 +612,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Bodies", bodies.sizeString(), bodies.countString()}, {"Key-Value store", "Receipt lists", receipts.sizeString(), receipts.countString()}, {"Key-Value store", "Difficulties (deprecated)", tds.sizeString(), tds.countString()}, + {"Key-Value store", "Transfer logs", transferLog.sizeString(), transferLog.sizeString()}, {"Key-Value store", "Block number->hash", numHashPairings.sizeString(), numHashPairings.countString()}, {"Key-Value store", "Block hash->number", hashNumPairings.sizeString(), hashNumPairings.countString()}, {"Key-Value store", "Transaction index", txLookups.sizeString(), txLookups.countString()}, diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 36848965a128..cb3adc2ebb99 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -215,6 +215,11 @@ func blockReceiptsKey(number uint64, hash common.Hash) []byte { return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) } +// blockTransferLogsKey = blockReceiptsPrefix + num (uint64 big endian) + hash +func blockTransferLogsKey(number uint64, hash common.Hash) []byte { + return append(append(blockTranferLogsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) +} + // txLookupKey = txLookupPrefix + hash func txLookupKey(hash common.Hash) []byte { return append(txLookupPrefix, hash.Bytes()...) diff --git a/core/state/statedb.go b/core/state/statedb.go index 3c0138d7cc7c..31b3e44a059f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -157,7 +157,7 @@ type StateDB struct { StorageUpdated atomic.Int64 // Number of storage slots updated during the state transition StorageDeleted atomic.Int64 // Number of storage slots deleted during the state transition - // transferLogs records trasfer logs for each transaction. + // transferLogs records transfer logs for each transaction. transferLogs map[common.Hash][]*types.TransferLog } diff --git a/core/txindexer_test.go b/core/txindexer_test.go index 71c78d506bc8..5d19bd5b368c 100644 --- a/core/txindexer_test.go +++ b/core/txindexer_test.go @@ -117,7 +117,7 @@ func TestTxIndexer(t *testing.T) { } for _, c := range cases { db, _ := rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{}) - rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...))) + rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...)), nil) // Index the initial blocks from ancient store indexer := &txIndexer{ @@ -237,7 +237,7 @@ func TestTxIndexerRepair(t *testing.T) { for _, c := range cases { db, _ := rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{}) encReceipts := types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...)) - rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts) + rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts, nil) // Index the initial blocks from ancient store indexer := &txIndexer{ @@ -428,7 +428,7 @@ func TestTxIndexerReport(t *testing.T) { for _, c := range cases { db, _ := rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{}) encReceipts := types.EncodeBlockReceiptLists(append([]types.Receipts{{}}, receipts...)) - rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts) + rawdb.WriteAncientBlocks(db, append([]*types.Block{gspec.ToBlock()}, blocks...), encReceipts, nil) // Index the initial blocks from ancient store indexer := &txIndexer{ diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index bfaf4d5b8e3c..545ce5b453eb 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1741,6 +1741,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { } return err } + // If the address is not yet known, request exclusivity to track the account // only by this subpool until all transactions are evicted from, _ := types.Sender(p.signer, tx) // already validated above diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index ceedc74a531e..e601f0a0b01f 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -135,10 +135,11 @@ type BlockChain interface { // Config are the configuration parameters of the transaction pool. type Config struct { - Locals []common.Address // Addresses that should be treated by default as local - NoLocals bool // Whether local transaction handling should be disabled - Journal string // Journal of local transactions to survive node restarts - Rejournal time.Duration // Time interval to regenerate the local transaction journal + Locals []common.Address // Addresses that should be treated by default as local + NoLocals bool // Whether local transaction handling should be disabled + Journal string // Journal of local transactions to survive node restarts + Rejournal time.Duration // Time interval to regenerate the local transaction journal + BroadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -153,8 +154,9 @@ type Config struct { // DefaultConfig contains the default configurations for the transaction pool. var DefaultConfig = Config{ - Journal: "transactions.rlp", - Rejournal: time.Hour, + Journal: "transactions.rlp", + Rejournal: time.Hour, + BroadcastPendingLocalTx: 5 * time.Minute, PriceLimit: 1, PriceBump: 10, diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index fb994d82086d..197f7b968059 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -29,6 +29,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -40,7 +42,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" - "github.com/holiman/uint256" ) var ( diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go index bb178f175e5d..cc8fe65962a3 100644 --- a/core/txpool/locals/tx_tracker.go +++ b/core/txpool/locals/tx_tracker.go @@ -23,9 +23,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -46,10 +48,13 @@ type TxTracker struct { all map[common.Hash]*types.Transaction // All tracked transactions byAddr map[common.Address]*legacypool.SortedMap // Transactions by address - journal *journal // Journal of local transaction to back up to disk - rejournal time.Duration // How often to rotate journal - pool *txpool.TxPool // The tx pool to interact with - signer types.Signer + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + broadcastPendingLocalTx time.Duration // Time interval to broadcast the local transaction + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + pendingLocalTxFeed event.Feed shutdownCh chan struct{} mu sync.Mutex @@ -57,7 +62,8 @@ type TxTracker struct { } // New creates a new TxTracker -func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool, + broadcastPendingLocalTxTime time.Duration) *TxTracker { pool := &TxTracker{ all: make(map[common.Hash]*types.Transaction), byAddr: make(map[common.Address]*legacypool.SortedMap), @@ -69,6 +75,7 @@ func New(journalPath string, journalTime time.Duration, chainConfig *params.Chai pool.journal = newTxJournal(journalPath) pool.rejournal = journalTime } + pool.broadcastPendingLocalTx = broadcastPendingLocalTxTime return pool } @@ -206,6 +213,10 @@ func (tracker *TxTracker) loop() { lastJournal = time.Now() timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. ) + + pendingLocalTxs := time.NewTicker(tracker.broadcastPendingLocalTx) + defer pendingLocalTxs.Stop() + for { select { case <-tracker.shutdownCh: @@ -221,6 +232,25 @@ func (tracker *TxTracker) loop() { tracker.pool.Add(resubmits, false) } timer.Reset(recheckInterval) + case <-pendingLocalTxs.C: + lTxs := types.Transactions{} + for addr, lazyTxs := range tracker.pool.Pending(txpool.PendingFilter{BlobTxs: false}) { + if _, ok := tracker.byAddr[addr]; !ok { + continue + } + for _, lazyTx := range lazyTxs { + lTxs = append(lTxs, lazyTx.Tx) + } + } + + if len(lTxs) > 0 { + go tracker.pendingLocalTxFeed.Send(core.PendingLocalTxsEvent{Txs: lTxs}) + } } } } + +// SubscribePendingLocalTransactions subscribes to pending local transaction events. +func (tracker *TxTracker) SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + return tracker.pendingLocalTxFeed.Subscribe(ch) +} diff --git a/core/txpool/locals/tx_tracker_test.go b/core/txpool/locals/tx_tracker_test.go index dde875460589..1a4e4809d416 100644 --- a/core/txpool/locals/tx_tracker_test.go +++ b/core/txpool/locals/tx_tracker_test.go @@ -84,7 +84,7 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { return &testEnv{ chain: chain, pool: pool, - tracker: New(journal, time.Minute, gspec.Config, pool), + tracker: New(journal, time.Minute, gspec.Config, pool, time.Minute), genDb: genDb, } } @@ -192,7 +192,7 @@ func TestJournal(t *testing.T) { env.tracker.recheck(true) // manually rejournal the tracker // Make sure all the transactions are properly journalled - trackerB := New(journalPath, time.Minute, gspec.Config, env.pool) + trackerB := New(journalPath, time.Minute, gspec.Config, env.pool, time.Minute) trackerB.journal.load(func(transactions []*types.Transaction) []error { trackerB.TrackAll(transactions) return nil diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 437861efca7c..43c0502eb460 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -57,6 +57,10 @@ type BlockChain interface { StateAt(root common.Hash) (*state.StateDB, error) } +type PendingLocalTxsPublisher interface { + SubscribePendingLocalTransactions(ch chan<- core.PendingLocalTxsEvent) event.Subscription +} + // TxPool is an aggregator for various transaction specific pools, collectively // tracking all the transactions deemed interesting by the node. Transactions // enter the pool when they are received from the network or submitted locally. @@ -74,6 +78,8 @@ type TxPool struct { term chan struct{} // Termination channel to detect a closed pool sync chan chan error // Testing / simulator channel to block until internal reset is done + + PendingLocalTxsPublisher PendingLocalTxsPublisher // Publisher for pending local transactions } // New creates a new transaction pool to gather, sort and filter inbound @@ -369,6 +375,16 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac return txs } +// SubscribePendingLocalTxsEvent registers a subscription of PendingLocalTxsEvent and +// starts sending event to the given channel. +func (p *TxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + var subs []event.Subscription + if p.PendingLocalTxsPublisher != nil { + subs = append(subs, p.PendingLocalTxsPublisher.SubscribePendingLocalTransactions(ch)) + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { diff --git a/eth/api_backend_test.go b/eth/api_backend_test.go index aa0539511b02..f69a5af37e09 100644 --- a/eth/api_backend_test.go +++ b/eth/api_backend_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -37,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) var ( @@ -75,7 +76,7 @@ func initBackend(withLocal bool) *EthAPIBackend { txPool: txpool, } if withLocal { - eth.localTxTracker = locals.New("", time.Minute, gspec.Config, txpool) + eth.localTxTracker = locals.New("", time.Minute, gspec.Config, txpool, time.Minute) } return &EthAPIBackend{ eth: eth, diff --git a/eth/api_debug.go b/eth/api_debug.go index 892e1032134b..e75da8663586 100644 --- a/eth/api_debug.go +++ b/eth/api_debug.go @@ -532,3 +532,14 @@ func (api *DebugAPI) ExecutionWitnessByHash(hash common.Hash) (*stateless.ExtWit return result.Witness().ToExtWitness(), nil } + +// GetTransferLogs is a debug API function that returns the transfer logs for a block hash, if known. +func (api *DebugAPI) GetTransferLogs(ctx context.Context, hash common.Hash) ([]*types.TransferLog, error) { + return api.eth.blockchain.GetTransferLogs(hash) +} + +// GetBlockReceipts returns all transaction receipts of the specified block. +func (api *DebugAPI) GetBlockReceipts(ctx context.Context, blockHash common.Hash) ([]map[string]interface{}, error) { + bc := ethapi.NewBlockChainAPI(api.eth.APIBackend) + return bc.GetBlockReceipts(ctx, rpc.BlockNumberOrHashWithHash(blockHash, true)) +} diff --git a/eth/backend.go b/eth/backend.go index 85095618222d..a5a6b184f52d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -320,8 +320,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) rejournal = time.Second } - eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) + broadcastPendingLocalTx := config.TxPool.BroadcastPendingLocalTx + if broadcastPendingLocalTx < time.Second { + log.Warn("Sanitizing invalid txpool broadcast local tx time", "provided", broadcastPendingLocalTx, "updated", time.Second) + broadcastPendingLocalTx = time.Second + } + eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool, broadcastPendingLocalTx) stack.RegisterLifecycle(eth.localTxTracker) + eth.txPool.PendingLocalTxsPublisher = eth.localTxTracker } // Permit the downloader to use the trie cache allowance during fast sync diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 09837a304505..82389ac984e7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -1133,11 +1133,12 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { } // Don't report anything until we have a meaningful progress var ( - headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable) - bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable) - receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable) + headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable) + bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable) + receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable) + transferLogBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerTransferLogTable) ) - syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes) + syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes + transferLogBytes) if syncedBytes == 0 { return } @@ -1175,11 +1176,12 @@ func (d *Downloader) reportSnapSyncProgress(force bool) { left = latest.Number.Uint64() - block.Number.Uint64() eta = time.Since(d.syncStartTime) / time.Duration(fetchedBlocks) * time.Duration(left) - progress = fmt.Sprintf("%.2f%%", float64(block.Number.Uint64())*100/float64(latest.Number.Uint64())) - headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString()) - bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.Number.Uint64()), common.StorageSize(bodyBytes).TerminalString()) - receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.Number.Uint64()), common.StorageSize(receiptBytes).TerminalString()) + progress = fmt.Sprintf("%.2f%%", float64(block.Number.Uint64())*100/float64(latest.Number.Uint64())) + headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString()) + bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.Number.Uint64()), common.StorageSize(bodyBytes).TerminalString()) + receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.Number.Uint64()), common.StorageSize(receiptBytes).TerminalString()) + transferLogs = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.Number.Uint64()), common.StorageSize(transferLogBytes).TerminalString()) ) - log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta)) + log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "transferLogs", transferLogs, "eta", common.PrettyDuration(eta)) d.syncLogTime = time.Now() } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index f10e6a277bb7..0ff142265d27 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -392,7 +392,6 @@ func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subsc logs: make(chan []*types.Log), txs: txs, headers: make(chan *types.Header), - receipts: make(chan []*ReceiptWithTx), installed: make(chan struct{}), err: make(chan error), } diff --git a/eth/handler.go b/eth/handler.go index ff970e2ba628..ef4bafbee966 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -58,6 +58,10 @@ const ( // All transactions with a higher size will be announced and need to be fetched // by the peer. txMaxBroadcastSize = 4096 + + // pendingLocalTxChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + pendingLocalTxChanSize = 4096 ) var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge @@ -92,6 +96,10 @@ type txPool interface { // can decide whether to receive notifications only for newly seen transactions // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + + // SubscribePendingLocalTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribePendingLocalTxsEvent(chan<- core.PendingLocalTxsEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -125,10 +133,12 @@ type handler struct { peers *peerSet txBroadcastKey [16]byte - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - blockRange *blockRangeState + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + blockRange *blockRangeState + pendingLocalTxsCh chan core.PendingLocalTxsEvent + pendingLocalTxsSub event.Subscription requiredBlocks map[uint64]common.Hash @@ -433,7 +443,10 @@ func (h *handler) Start(maxPeers int) { h.wg.Add(1) h.txsCh = make(chan core.NewTxsEvent, txChanSize) h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false) + h.pendingLocalTxsCh = make(chan core.PendingLocalTxsEvent, pendingLocalTxChanSize) + h.pendingLocalTxsSub = h.txpool.SubscribePendingLocalTxsEvent(h.pendingLocalTxsCh) go h.txBroadcastLoop() + go h.pendingLocalTxBroadcastLoop() // broadcast block range h.wg.Add(1) @@ -453,6 +466,7 @@ func (h *handler) Stop() { h.blockRange.stop() h.txFetcher.Stop() h.downloader.Terminate() + h.pendingLocalTxsSub.Unsubscribe() // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. @@ -528,6 +542,20 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { "bcastpeers", len(txset), "bcastcount", directCount, "annpeers", len(annos), "anncount", annCount) } +// BroadcastPendingLocalTxs will propagate a batch of transactions to all peers +func (h *handler) BroadcastPendingLocalTxs(txs types.Transactions) { + peers := h.peers.Clone() + // Build tx hashes + txHashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + txHashes[i] = tx.Hash() + } + for _, peer := range peers { + peer.AsyncSendTransactions(txHashes) + } + log.Info("Broadcast pending local transaction to all peers", "recipients", len(peers)) +} + // txBroadcastLoop announces new transactions to connected peers. func (h *handler) txBroadcastLoop() { defer h.wg.Done() @@ -604,6 +632,19 @@ func (h *handler) blockRangeLoop(st *blockRangeState) { } } +func (h *handler) pendingLocalTxBroadcastLoop() { + for { + select { + case event := <-h.pendingLocalTxsCh: + h.BroadcastPendingLocalTxs(event.Txs) + + // Err() channel will be closed when unsubscribing. + case <-h.pendingLocalTxsSub.Err(): + return + } + } +} + // blockRangeWhileSnapSyncing announces block range updates during snap sync. // Here we poll the CurrentSnapBlock on a timer and announce updates to it. func (h *handler) blockRangeWhileSnapSyncing(st *blockRangeState) { diff --git a/eth/handler_test.go b/eth/handler_test.go index b37e6227f42c..b63236918694 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -54,10 +54,10 @@ var ( // Its goal is to get around setting up a valid statedb for the balance and nonce // checks. type testTxPool struct { - pool map[common.Hash]*types.Transaction // Hash map of collected transactions - - txFeed event.Feed // Notification feed to allow waiting for inclusion - lock sync.RWMutex // Protects the transaction pool + pool map[common.Hash]*types.Transaction // Hash map of collected transactions + pendingLocalTxFeed event.Feed + txFeed event.Feed // Notification feed to allow waiting for inclusion + lock sync.RWMutex // Protects the transaction pool } // newTestTxPool creates a mock transaction pool. @@ -157,6 +157,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]* return pending } +func (p *testTxPool) SubscribePendingLocalTxsEvent(ch chan<- core.PendingLocalTxsEvent) event.Subscription { + return p.pendingLocalTxFeed.Subscribe(ch) +} + // SubscribeTransactions should return an event subscription of NewTxsEvent and // send events to the given channel. func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { diff --git a/eth/peer.go b/eth/peer.go index 5808c3a3c557..0117c4638b49 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -71,3 +71,15 @@ func (p *snapPeer) info() *snapPeerInfo { Version: p.Version(), } } + +// Clone clones a peers +func (ps *peerSet) Clone() []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + list = append(list, p) + } + return list +} diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 5008378da6a6..a8502bfac44b 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -392,6 +392,14 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) return sub, nil } +func (ec *Client) SubscribePendingTransactions(ctx context.Context, ch chan<- common.Hash) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") +} + +func (ec *Client) SubscribeQueuedTransactions(ctx context.Context, ch chan<- *types.Transaction) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions", true) +} + // State Access // NetworkID returns the network ID for this client. diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index d7cf47468c48..3a40ce5ce2d7 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -334,6 +334,15 @@ func (api *BlockChainAPI) GetBalance(ctx context.Context, address common.Address return (*hexutil.Big)(b), state.Error() } +// GetBalanceByHash returns the amount of wei for the given address in the state of the +// given block hash. The rpc.LatestBlockNumber and rpc.PendingBlockNumber meta +// block numbers are also allowed. +// Deprecated: can be replaced by GetBalance +func (api *BlockChainAPI) GetBalanceByHash(ctx context.Context, address common.Address, blockHash common.Hash) (*hexutil.Big, error) { + hash := rpc.BlockNumberOrHashWithHash(blockHash, false) + return api.GetBalance(ctx, address, hash) +} + // AccountResult structs for GetProof type AccountResult struct { Address common.Address `json:"address"` diff --git a/internal/jsre/deps/web3.js b/internal/jsre/deps/web3.js index 3a19dce06c24..739095ed72b4 100644 --- a/internal/jsre/deps/web3.js +++ b/internal/jsre/deps/web3.js @@ -1192,7 +1192,7 @@ module.exports = SolidityTypeInt; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file param.js * @author Marek Kotewicz * @date 2015 @@ -1211,7 +1211,7 @@ var SolidityParam = function (value, offset) { /** * This method should be used to get length of params's dynamic part - * + * * @method dynamicPartLength * @returns {Number} length of dynamic part (in bytes) */ @@ -1239,7 +1239,7 @@ SolidityParam.prototype.withOffset = function (offset) { * @param {SolidityParam} result of combination */ SolidityParam.prototype.combine = function (param) { - return new SolidityParam(this.value + param.value); + return new SolidityParam(this.value + param.value); }; /** @@ -1271,8 +1271,8 @@ SolidityParam.prototype.offsetAsBytes = function () { */ SolidityParam.prototype.staticPart = function () { if (!this.isDynamic()) { - return this.value; - } + return this.value; + } return this.offsetAsBytes(); }; @@ -1304,7 +1304,7 @@ SolidityParam.prototype.encode = function () { * @returns {String} */ SolidityParam.encodeList = function (params) { - + // updating offsets var totalOffset = params.length * 32; var offsetParams = params.map(function (param) { @@ -1746,13 +1746,13 @@ if (typeof XMLHttpRequest === 'undefined') { /** * Utils - * + * * @module utils */ /** * Utility functions - * + * * @class [utils] config * @constructor */ @@ -1819,7 +1819,7 @@ module.exports = { You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file sha3.js * @author Marek Kotewicz * @date 2015 @@ -2730,7 +2730,7 @@ module.exports = AllSolidityEvents; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file batch.js * @author Marek Kotewicz * @date 2015 @@ -2775,7 +2775,7 @@ Batch.prototype.execute = function () { requests[index].callback(null, (requests[index].format ? requests[index].format(result.result) : result.result)); } }); - }); + }); }; module.exports = Batch; @@ -2962,7 +2962,7 @@ var ContractFactory = function (eth, abi) { */ this.new = function () { /*jshint maxcomplexity: 7 */ - + var contract = new Contract(this.eth, this.abi); // parse arguments @@ -3110,7 +3110,7 @@ module.exports = ContractFactory; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file errors.js * @author Marek Kotewicz * @date 2015 @@ -3385,7 +3385,7 @@ var extend = function (web3) { } }; - ex.formatters = formatters; + ex.formatters = formatters; ex.utils = utils; ex.Method = Method; ex.Property = Property; @@ -4449,7 +4449,7 @@ module.exports = HttpProvider; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file iban.js * @author Marek Kotewicz * @date 2015 @@ -4649,7 +4649,7 @@ Iban.prototype.address = function () { var base36 = this._iban.substr(4); var asBn = new BigNumber(base36, 36); return padLeft(asBn.toString(16), 20); - } + } return ''; }; @@ -4694,7 +4694,7 @@ var IpcProvider = function (path, net) { var _this = this; this.responseCallbacks = {}; this.path = path; - + this.connection = net.connect({path: this.path}); this.connection.on('error', function(e){ @@ -4704,7 +4704,7 @@ var IpcProvider = function (path, net) { this.connection.on('end', function(){ _this._timeout(); - }); + }); // LISTEN FOR CONNECTION RESPONSES @@ -4743,7 +4743,7 @@ Will parse the response and make an array out of it. IpcProvider.prototype._parseResponse = function(data) { var _this = this, returnValues = []; - + // DE-CHUNKER var dechunkedData = data .replace(/\}[\n\r]?\{/g,'}|--|{') // }{ @@ -4847,7 +4847,7 @@ IpcProvider.prototype.send = function (payload) { try { result = JSON.parse(data); } catch(e) { - throw errors.InvalidResponse(data); + throw errors.InvalidResponse(data); } return result; @@ -5022,7 +5022,7 @@ Method.prototype.extractCallback = function (args) { /** * Should be called to check if the number of arguments is correct - * + * * @method validateArgs * @param {Array} arguments * @throws {Error} if it is not @@ -5035,7 +5035,7 @@ Method.prototype.validateArgs = function (args) { /** * Should be called to format input args of method - * + * * @method formatInput * @param {Array} * @return {Array} @@ -5089,7 +5089,7 @@ Method.prototype.attachToObject = function (obj) { obj[name[0]] = obj[name[0]] || {}; obj[name[0]][name[1]] = func; } else { - obj[name[0]] = func; + obj[name[0]] = func; } }; @@ -5152,8 +5152,8 @@ var DB = function (web3) { this._requestManager = web3._requestManager; var self = this; - - methods().forEach(function(method) { + + methods().forEach(function(method) { method.attachToObject(self); method.setRequestManager(web3._requestManager); }); @@ -5299,6 +5299,14 @@ var methods = function () { outputFormatter: formatters.outputBigNumberFormatter }); + var getBalanceByHash = new Method({ + name: 'getBalanceByHash', + call: 'eth_getBalanceByHash', + params: 2, + inputFormatter: [formatters.inputAddressFormatter, utils.toHex], + outputFormatter: formatters.outputBigNumberFormatter + }); + var getStorageAt = new Method({ name: 'getStorageAt', call: 'eth_getStorageAt', @@ -5417,6 +5425,13 @@ var methods = function () { inputFormatter: [formatters.inputCallFormatter, formatters.inputDefaultBlockNumberFormatter] }); + var callByHash = new Method({ + name: 'callByHash', + call: 'eth_callByHash', + params: 2, + inputFormatter: [formatters.inputCallFormatter, utils.toHex] + }); + var estimateGas = new Method({ name: 'estimateGas', call: 'eth_estimateGas', @@ -5427,6 +5442,7 @@ var methods = function () { return [ getBalance, + getBalanceByHash, getStorageAt, getCode, getBlock, @@ -5439,6 +5455,7 @@ var methods = function () { getTransactionReceipt, getTransactionCount, call, + callByHash, estimateGas, sendRawTransaction, signTransaction, @@ -5548,7 +5565,7 @@ var Net = function (web3) { var self = this; - properties().forEach(function(p) { + properties().forEach(function(p) { p.attachToObject(self); p.setRequestManager(web3._requestManager); }); @@ -5786,7 +5803,7 @@ module.exports = { You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file namereg.js * @author Marek Kotewicz * @date 2015 @@ -5973,7 +5990,7 @@ module.exports = Property; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file requestmanager.js * @author Jeffrey Wilcke * @author Marek Kotewicz @@ -6040,7 +6057,7 @@ RequestManager.prototype.sendAsync = function (data, callback) { if (err) { return callback(err); } - + if (!Jsonrpc.isValidResponse(result)) { return callback(errors.InvalidResponse(result)); } @@ -6073,7 +6090,7 @@ RequestManager.prototype.sendBatch = function (data, callback) { } callback(err, results); - }); + }); }; /** @@ -6177,7 +6194,7 @@ RequestManager.prototype.poll = function () { } var payload = Jsonrpc.toBatchPayload(pollsData); - + // map the request id to they poll id var pollsIdMap = {}; payload.forEach(function(load, index){ @@ -6207,7 +6224,7 @@ RequestManager.prototype.poll = function () { } else return false; }).filter(function (result) { - return !!result; + return !!result; }).filter(function (result) { var valid = Jsonrpc.isValidResponse(result); if (!valid) { @@ -6282,16 +6299,16 @@ var pollSyncing = function(self) { self.callbacks.forEach(function (callback) { if (self.lastSyncState !== sync) { - + // call the callback with true first so the app can stop anything, before receiving the sync data if(!self.lastSyncState && utils.isObject(sync)) callback(null, true); - + // call on the next CPU cycle, so the actions of the sync stop can be processes first setTimeout(function() { callback(null, sync); }, 0); - + self.lastSyncState = sync; } }); @@ -6346,7 +6363,7 @@ module.exports = IsSyncing; You should have received a copy of the GNU Lesser General Public License along with web3.js. If not, see . */ -/** +/** * @file transfer.js * @author Marek Kotewicz * @date 2015 @@ -6365,7 +6382,7 @@ var exchangeAbi = require('../contracts/SmartExchange.json'); * @param {Function} callback, callback */ var transfer = function (eth, from, to, value, callback) { - var iban = new Iban(to); + var iban = new Iban(to); if (!iban.isValid()) { throw new Error('invalid iban address'); } @@ -6373,7 +6390,7 @@ var transfer = function (eth, from, to, value, callback) { if (iban.isDirect()) { return transferToAddress(eth, from, iban.address(), value, callback); } - + if (!callback) { var address = eth.icapNamereg().addr(iban.institution()); return deposit(eth, from, address, value, iban.client()); @@ -6382,7 +6399,7 @@ var transfer = function (eth, from, to, value, callback) { eth.icapNamereg().addr(iban.institution(), function (err, address) { return deposit(eth, from, address, value, iban.client(), callback); }); - + }; /** diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 0aedffe2307a..e4cf3b434cce 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -474,6 +474,18 @@ web3._extend({ params: 1, inputFormatter: [null], }), + new web3._extend.Method({ + name: 'getTransferLogs', + call: 'debug_getTransferLogs', + params: 1, + inputFormatter: [null], + }), + new web3._extend.Method({ + name: 'getBlockReceipts', + call: 'debug_getBlockReceipts', + params: 1, + inputFormatter: [null], + }), ], properties: [] }); diff --git a/rpc/websocket.go b/rpc/websocket.go index 543ff617baac..48dc72d0cce7 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -38,7 +38,7 @@ const ( wsPingInterval = 30 * time.Second wsPingWriteTimeout = 5 * time.Second wsPongTimeout = 30 * time.Second - wsDefaultReadLimit = 32 * 1024 * 1024 + wsDefaultReadLimit = 128 * 1024 * 1024 ) var wsBufferPool = new(sync.Pool)