Skip to content

Commit f6857a3

Browse files
authored
Merge pull request #133 from getamis/feature/protocol-consensus-interface
cmd, consensus, eth, ethstats: modify consensus interface to handle different consensus messages
2 parents 3838cb6 + 6572173 commit f6857a3

25 files changed

+369
-514
lines changed

cmd/geth/misccmd.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ func version(ctx *cli.Context) error {
102102
fmt.Println("Git Commit:", gitCommit)
103103
}
104104
fmt.Println("Architecture:", runtime.GOARCH)
105-
fmt.Println("Protocol Versions:", eth.ProtocolVersions)
106105
fmt.Println("Network Id:", eth.DefaultConfig.NetworkId)
107106
fmt.Println("Go Version:", runtime.Version())
108107
fmt.Println("Operating System:", runtime.GOOS)

consensus/clique/clique.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,3 +654,8 @@ func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API {
654654
Public: false,
655655
}}
656656
}
657+
658+
// Protocol implements consensus.Engine.Protocol
659+
func (c *Clique) Protocol() consensus.Protocol {
660+
return consensus.EthProtocol
661+
}

consensus/consensus.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/ethereum/go-ethereum/common"
2222
"github.com/ethereum/go-ethereum/core/state"
2323
"github.com/ethereum/go-ethereum/core/types"
24+
"github.com/ethereum/go-ethereum/p2p"
2425
"github.com/ethereum/go-ethereum/params"
2526
"github.com/ethereum/go-ethereum/rpc"
2627
)
@@ -90,6 +91,18 @@ type Engine interface {
9091

9192
// APIs returns the RPC APIs this consensus engine provides.
9293
APIs(chain ChainReader) []rpc.API
94+
95+
// Protocol returns the protocol for this consensus
96+
Protocol() Protocol
97+
}
98+
99+
// Handler should be implemented is the consensus needs to handle and send peer's message
100+
type Handler interface {
101+
// HandleMsg handles a message from peer
102+
HandleMsg(address common.Address, data p2p.Msg) (bool, error)
103+
104+
// SetBroadcaster sets the broadcaster to send message to peers
105+
SetBroadcaster(Broadcaster)
93106
}
94107

95108
// PoW is a consensus engine based on proof-of-work.
@@ -104,12 +117,6 @@ type PoW interface {
104117
type Istanbul interface {
105118
Engine
106119

107-
// HandleMsg handles a message from peer
108-
HandleMsg(addr common.Address, data []byte) error
109-
110-
// NewChainHead is called if a new chain head block comes
111-
NewChainHead(block *types.Block) error
112-
113120
// Start starts the engine
114121
Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error
115122

consensus/ethash/ethash.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,3 +597,8 @@ func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API {
597597
func SeedHash(block uint64) []byte {
598598
return seedHash(block)
599599
}
600+
601+
// Protocol implements consensus.Engine.Protocol
602+
func (ethash *Ethash) Protocol() consensus.Protocol {
603+
return consensus.EthProtocol
604+
}

consensus/istanbul/backend.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package istanbul
1818

1919
import (
20-
"math/big"
2120
"time"
2221

2322
"github.com/ethereum/go-ethereum/common"
@@ -59,15 +58,9 @@ type Backend interface {
5958
// the given validator
6059
CheckSignature(data []byte, addr common.Address, sig []byte) error
6160

62-
// HasBlock checks if the combination of the given hash and height matches any existing blocks
63-
HasBlock(hash common.Hash, number *big.Int) bool
64-
6561
// GetProposer returns the proposer of the given block height
6662
GetProposer(number uint64) common.Address
6763

68-
// ParentValidators returns the validator set of the given proposal's parent block
69-
ParentValidators(proposal Proposal) ValidatorSet
70-
7164
// LastProposal retrieves latest committed proposal and the address of proposer
7265
LastProposal() (Proposal, common.Address)
7366
}

consensus/istanbul/backend/backend.go

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package backend
1818

1919
import (
2020
"crypto/ecdsa"
21-
"math/big"
2221
"sync"
2322
"time"
2423

@@ -40,6 +39,8 @@ import (
4039
func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.PrivateKey, db ethdb.Database) consensus.Istanbul {
4140
// Allocate the snapshot caches and create the engine
4241
recents, _ := lru.NewARC(inmemorySnapshots)
42+
recentMessages, _ := lru.NewARC(inmemoryPeers)
43+
knownMessages, _ := lru.NewARC(inmemoryMessages)
4344
backend := &backend{
4445
config: config,
4546
eventMux: eventMux,
@@ -52,6 +53,8 @@ func New(config *istanbul.Config, eventMux *event.TypeMux, privateKey *ecdsa.Pri
5253
recents: recents,
5354
candidates: make(map[common.Address]bool),
5455
coreStarted: false,
56+
recentMessages: recentMessages,
57+
knownMessages: knownMessages,
5558
}
5659
backend.core = istanbulCore.New(backend, backend.config)
5760
return backend
@@ -84,6 +87,13 @@ type backend struct {
8487
candidatesLock sync.RWMutex
8588
// Snapshots for recent block to speed up reorgs
8689
recents *lru.ARCCache
90+
91+
// event subscription for ChainHeadEvent event
92+
eventSub *event.TypeMuxSubscription
93+
broadcaster consensus.Broadcaster
94+
95+
recentMessages *lru.ARCCache // the cache of peer's messages
96+
knownMessages *lru.ARCCache // the cache of self messages
8797
}
8898

8999
// Address implements istanbul.Backend.Address
@@ -110,18 +120,36 @@ func (sb *backend) Broadcast(valSet istanbul.ValidatorSet, payload []byte) error
110120

111121
// Broadcast implements istanbul.Backend.Gossip
112122
func (sb *backend) Gossip(valSet istanbul.ValidatorSet, payload []byte) error {
123+
hash := istanbul.RLPHash(payload)
124+
sb.knownMessages.Add(hash, true)
125+
113126
targets := make(map[common.Address]bool)
114127
for _, val := range valSet.List() {
115128
if val.Address() != sb.Address() {
116129
targets[val.Address()] = true
117130
}
118131
}
119132

120-
if len(targets) > 0 {
121-
go sb.eventMux.Post(istanbul.ConsensusDataEvent{
122-
Targets: targets,
123-
Data: payload,
124-
})
133+
if sb.broadcaster != nil && len(targets) > 0 {
134+
ps := sb.broadcaster.FindPeers(targets)
135+
for addr, p := range ps {
136+
ms, ok := sb.recentMessages.Get(addr)
137+
var m *lru.ARCCache
138+
if ok {
139+
m, _ = ms.(*lru.ARCCache)
140+
if _, k := m.Get(hash); k {
141+
// This peer had this event, skip it
142+
continue
143+
}
144+
} else {
145+
m, _ = lru.NewARC(inmemoryMessages)
146+
}
147+
148+
m.Add(hash, true)
149+
sb.recentMessages.Add(addr, m)
150+
151+
go p.Send(istanbulMsg, payload)
152+
}
125153
}
126154
return nil
127155
}
@@ -162,10 +190,10 @@ func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
162190
if _, err := sb.inserter(types.Blocks{block}); err != nil {
163191
return err
164192
}
165-
msg := istanbul.NewCommittedEvent{
166-
Block: block,
193+
194+
if sb.broadcaster != nil {
195+
go sb.broadcaster.BroadcastBlock(block, false)
167196
}
168-
go sb.eventMux.Post(msg)
169197
return nil
170198
}
171199

@@ -222,11 +250,6 @@ func (sb *backend) CheckSignature(data []byte, address common.Address, sig []byt
222250
return nil
223251
}
224252

225-
// HasBlock implements istanbul.Backend.HashBlock
226-
func (sb *backend) HasBlock(hash common.Hash, number *big.Int) bool {
227-
return sb.chain.GetHeader(hash, number.Uint64()) != nil
228-
}
229-
230253
// GetProposer implements istanbul.Backend.GetProposer
231254
func (sb *backend) GetProposer(number uint64) common.Address {
232255
if h := sb.chain.GetHeaderByNumber(number); h != nil {
@@ -236,14 +259,6 @@ func (sb *backend) GetProposer(number uint64) common.Address {
236259
return common.Address{}
237260
}
238261

239-
// ParentValidators implements istanbul.Backend.GetParentValidators
240-
func (sb *backend) ParentValidators(proposal istanbul.Proposal) istanbul.ValidatorSet {
241-
if block, ok := proposal.(*types.Block); ok {
242-
return sb.getValidators(block.Number().Uint64()-1, block.ParentHash())
243-
}
244-
return validator.NewSet(nil, sb.config.ProposerPolicy)
245-
}
246-
247262
func (sb *backend) getValidators(number uint64, hash common.Hash) istanbul.ValidatorSet {
248263
snap, err := sb.snapshot(sb.chain, number, hash, nil)
249264
if err != nil {

consensus/istanbul/backend/backend_test.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -177,19 +177,6 @@ func TestCommit(t *testing.T) {
177177
}
178178
}
179179

180-
func TestHasBlock(t *testing.T) {
181-
chain, engine := newBlockChain(1)
182-
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
183-
finalBlock, _ := engine.Seal(chain, block, nil)
184-
chain.InsertChain(types.Blocks{finalBlock})
185-
if engine.HasBlock(block.Hash(), finalBlock.Number()) {
186-
t.Errorf("error mismatch: have true, want false")
187-
}
188-
if !engine.HasBlock(finalBlock.Hash(), finalBlock.Number()) {
189-
t.Errorf("error mismatch: have false, want true")
190-
}
191-
}
192-
193180
func TestGetProposer(t *testing.T) {
194181
chain, engine := newBlockChain(1)
195182
block := makeBlock(chain, engine, chain.Genesis())
@@ -201,20 +188,6 @@ func TestGetProposer(t *testing.T) {
201188
}
202189
}
203190

204-
func TestParentValidators(t *testing.T) {
205-
chain, engine := newBlockChain(1)
206-
block := makeBlock(chain, engine, chain.Genesis())
207-
chain.InsertChain(types.Blocks{block})
208-
expected := engine.Validators(block).List()
209-
//Block without seal will make empty validator set
210-
block = makeBlockWithoutSeal(chain, engine, block)
211-
chain.InsertChain(types.Blocks{block})
212-
actual := engine.ParentValidators(block).List()
213-
if len(expected) != len(actual) || expected[0] != actual[0] {
214-
t.Errorf("validator set mismatch: have %v, want %v", actual, expected)
215-
}
216-
}
217-
218191
/**
219192
* SimpleBackend
220193
* Private key: bb047e5940b6d83354d9432db7c449ac8fca2248008aaa7271369880f9f11cc1

consensus/istanbul/backend/engine.go

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/ethereum/go-ethereum/consensus/istanbul"
3030
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
3131
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
32+
"github.com/ethereum/go-ethereum/core"
3233
"github.com/ethereum/go-ethereum/core/state"
3334
"github.com/ethereum/go-ethereum/core/types"
3435
"github.com/ethereum/go-ethereum/crypto/sha3"
@@ -41,6 +42,8 @@ import (
4142
const (
4243
checkpointInterval = 1024 // Number of blocks after which to save the vote snapshot to the database
4344
inmemorySnapshots = 128 // Number of recent vote snapshots to keep in memory
45+
inmemoryPeers = 40
46+
inmemoryMessages = 100
4447
)
4548

4649
var (
@@ -482,39 +485,6 @@ func (sb *backend) APIs(chain consensus.ChainReader) []rpc.API {
482485
}}
483486
}
484487

485-
// HandleMsg implements consensus.Istanbul.HandleMsg
486-
func (sb *backend) HandleMsg(addr common.Address, data []byte) error {
487-
sb.coreMu.Lock()
488-
defer sb.coreMu.Unlock()
489-
if !sb.coreStarted {
490-
return istanbul.ErrStoppedEngine
491-
}
492-
493-
go sb.istanbulEventMux.Post(istanbul.MessageEvent{
494-
Payload: data,
495-
})
496-
return nil
497-
}
498-
499-
// NewChainHead implements consensus.Istanbul.NewChainHead
500-
func (sb *backend) NewChainHead(block *types.Block) error {
501-
sb.coreMu.Lock()
502-
defer sb.coreMu.Unlock()
503-
if !sb.coreStarted {
504-
return istanbul.ErrStoppedEngine
505-
}
506-
p, err := sb.Author(block.Header())
507-
if err != nil {
508-
sb.logger.Error("Failed to get block proposer", "err", err)
509-
return err
510-
}
511-
go sb.istanbulEventMux.Post(istanbul.FinalCommittedEvent{
512-
Proposal: block,
513-
Proposer: p,
514-
})
515-
return nil
516-
}
517-
518488
// Start implements consensus.Istanbul.Start
519489
func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error {
520490
sb.coreMu.Lock()
@@ -550,6 +520,11 @@ func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks
550520
if err := sb.core.Start(lastSequence, lastProposer, lastProposal); err != nil {
551521
return err
552522
}
523+
524+
// subscribe for chain head event
525+
sb.eventSub = sb.eventMux.Subscribe(core.ChainHeadEvent{})
526+
go sb.eventLoop()
527+
553528
sb.coreStarted = true
554529
return nil
555530
}
@@ -565,6 +540,7 @@ func (sb *backend) Stop() error {
565540
return err
566541
}
567542
sb.coreStarted = false
543+
sb.eventSub.Unsubscribe()
568544
return nil
569545
}
570546

0 commit comments

Comments
 (0)