Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changelog/5751.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
go: Split storage sync p2p protocol

The storage sync protocol was split into two independent protocols
(checkpoint sync and diff sync).

This change was made because there may be fewer nodes that expose checkpoints
than nodes that expose storage diffs. Previously, this could lead to issues
with state sync when a node was connected to peers that supported the storage
sync protocol but had no checkpoints available.

This was done in a backwards-compatible manner, so both protocols are still
advertised and used. Eventually, we plan to remove the legacy protocol.
4 changes: 2 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/client"
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
)

type byzantine struct {
Expand Down Expand Up @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode(
if err != nil {
return nil, fmt.Errorf("initializing storage node failed: %w", err)
}
b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage))
b.p2p.service.RegisterProtocolServer(synclegacy.NewServer(b.chainContext, b.runtimeID, storage))
b.storage = storage

// Wait for activation epoch.
Expand Down
4 changes: 4 additions & 0 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ func (c *client) CallMulti(
) ([]any, []PeerFeedback, error) {
c.logger.Debug("call multiple", "method", method)

if len(peers) == 0 {
return nil, nil, fmt.Errorf("no peers given to service the request")
}

co := NewCallMultiOptions(opts...)

// Prepare the request.
Expand Down
3 changes: 1 addition & 2 deletions go/worker/common/p2p/txsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
)

Expand Down Expand Up @@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes

// NewClient creates a new transaction sync protocol client.
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
pid := ProtocolID(chainContext, runtimeID)
mgr := rpc.NewPeerManager(p2p, pid)
rc := rpc.NewClient(p2p.Host(), pid)
rc.RegisterListener(mgr)
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/p2p/txsync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func init() {

protocols := make([]core.ProtocolID, len(n.Runtimes))
for i, rt := range n.Runtimes {
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion)
protocols[i] = ProtocolID(chainContext, rt.ID)
}

return protocols
Expand Down
3 changes: 1 addition & 2 deletions go/worker/keymanager/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
p2p "github.com/oasisprotocol/oasis-core/go/p2p/api"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
)

Expand Down Expand Up @@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p

// NewClient creates a new keymanager protocol client.
func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client {
pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion)
pid := ProtocolID(chainContext, keymanagerID)
mgr := rpc.NewPeerManager(p2p, pid)
rc := rpc.NewClient(p2p.Host(), pid)
rc.RegisterListener(mgr)
Expand Down
2 changes: 1 addition & 1 deletion go/worker/keymanager/p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func init() {

protocols := make([]core.ProtocolID, len(n.Runtimes))
for i, rt := range n.Runtimes {
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion)
protocols[i] = ProtocolID(chainContext, rt.ID)
}

return protocols
Expand Down
98 changes: 80 additions & 18 deletions go/worker/storage/committee/checkpoint_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"sync"
"time"

"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy"
)

const (
Expand Down Expand Up @@ -55,7 +57,7 @@ type chunk struct {
*checkpoint.ChunkMetadata

// checkpoint points to the checkpoint this chunk originated from.
checkpoint *storageSync.Checkpoint
checkpoint *checkpointsync.Checkpoint
}

type chunkHeap struct {
Expand Down Expand Up @@ -101,12 +103,7 @@ func (n *Node) checkpointChunkFetcher(
defer cancel()

// Fetch chunk from peers.
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
Version: chunk.Version,
Root: chunk.Root,
Index: chunk.Index,
Digest: chunk.Digest,
}, chunk.checkpoint)
rsp, pf, err := n.fetchChunk(chunkCtx, chunk)
if err != nil {
n.logger.Error("failed to fetch chunk from peers",
"err", err,
Expand All @@ -117,7 +114,7 @@ func (n *Node) checkpointChunkFetcher(
}

// Restore fetched chunk.
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp.Chunk))
done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp))
cancel()

switch {
Expand Down Expand Up @@ -157,7 +154,47 @@ func (n *Node) checkpointChunkFetcher(
}
}

func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
// fetchChunk retrieves the raw bytes of the given chunk from peers, trying the
// checkpoint sync p2p protocol client first.
//
// On any error from the checkpoint sync client, the identical request is
// retried over the legacy storage sync protocol client. The error from the
// first attempt is discarded; only the legacy attempt's error is returned.
//
// It returns the chunk bytes together with the peer feedback handle of
// whichever protocol served the request.
func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) {
	// Preferred path: dedicated checkpoint sync protocol.
	req := &checkpointsync.GetCheckpointChunkRequest{
		Version: chunk.Version,
		Root:    chunk.Root,
		Index:   chunk.Index,
		Digest:  chunk.Digest,
	}
	cp := &checkpointsync.Checkpoint{
		Metadata: chunk.checkpoint.Metadata,
		Peers:    chunk.checkpoint.Peers,
	}
	if rsp, pf, err := n.checkpointSync.GetCheckpointChunk(ctx, req, cp); err == nil {
		return rsp.Chunk, pf, nil
	}

	// Fall back to the legacy storage sync protocol, mirroring the request
	// and checkpoint into the legacy package's types.
	legacyReq := &synclegacy.GetCheckpointChunkRequest{
		Version: chunk.Version,
		Root:    chunk.Root,
		Index:   chunk.Index,
		Digest:  chunk.Digest,
	}
	legacyCp := &synclegacy.Checkpoint{
		Metadata: chunk.checkpoint.Metadata,
		Peers:    chunk.checkpoint.Peers,
	}
	legacyRsp, legacyPf, err := n.legacyStorageSync.GetCheckpointChunk(ctx, legacyReq, legacyCp)
	if err != nil {
		return nil, nil, err
	}
	return legacyRsp.Chunk, legacyPf, nil
}

func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
// Any previous restores were already aborted by the driver up the call stack, so
// things should have been going smoothly here; bail.
Expand Down Expand Up @@ -276,13 +313,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
}
}

func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
defer cancel()

list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
Version: 1,
})
list, err := n.fetchCheckpoints(ctx)
if err != nil {
n.logger.Error("failed to retrieve any checkpoints",
"err", err,
Expand All @@ -296,9 +331,36 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
return list, nil
}

// fetchCheckpoints retrieves the list of available checkpoints from peers,
// trying the checkpoint sync p2p protocol client first.
//
// If that call returns an error or an empty list, it falls back to the legacy
// storage sync protocol client and converts the legacy results into
// checkpointsync.Checkpoint values (carrying over Metadata and Peers).
// Only an error from the legacy attempt is returned to the caller.
func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) {
	req := &checkpointsync.GetCheckpointsRequest{
		Version: 1,
	}
	if cps, err := n.checkpointSync.GetCheckpoints(ctx, req); err == nil && len(cps) > 0 {
		return cps, nil
	}

	legacyReq := &synclegacy.GetCheckpointsRequest{
		Version: 1,
	}
	legacyCps, err := n.legacyStorageSync.GetCheckpoints(ctx, legacyReq)
	if err != nil {
		return nil, err
	}

	// Left as a nil slice when the legacy list is empty, so callers observe
	// the same result as before.
	var converted []*checkpointsync.Checkpoint
	for i := range legacyCps {
		converted = append(converted, &checkpointsync.Checkpoint{
			Metadata: legacyCps[i].Metadata,
			Peers:    legacyCps[i].Peers,
		})
	}
	return converted, nil
}

// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
func sortCheckpoints(s []*storageSync.Checkpoint) {
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
return cmp.Or(
cmp.Compare(b.Root.Version, a.Root.Version),
cmp.Compare(len(b.Peers), len(a.Peers)),
Expand All @@ -307,7 +369,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
})
}

func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
namespace := n.commonNode.Runtime.ID()
if !namespace.Equal(&cp.Root.Namespace) {
// Not for the right runtime.
Expand Down Expand Up @@ -357,7 +419,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc

// If we only want the genesis checkpoint, filter it out.
if wantOnlyGenesis && len(cps) > 0 {
var filteredCps []*storageSync.Checkpoint
var filteredCps []*checkpointsync.Checkpoint
for _, cp := range cps {
if cp.Root.Version == genesisRound {
filteredCps = append(filteredCps, cp)
Expand Down
14 changes: 7 additions & 7 deletions go/worker/storage/committee/checkpoint_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,35 @@ import (
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

func TestSortCheckpoints(t *testing.T) {
cp1 := &sync.Checkpoint{
cp1 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp2 := &sync.Checkpoint{
cp2 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}
cp3 := &sync.Checkpoint{
cp3 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp4 := &sync.Checkpoint{
cp4 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
Expand All @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}

s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}

sortCheckpoints(s)

assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
}
Loading
Loading