Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changelog/5751.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
go: Split storage sync p2p protocol

Storage sync protocol was split into two independent protocols (checkpoint
and diff sync).

This change was made since there may be fewer nodes that expose checkpoints
than storage diffs. Previously, this could lead to issues with state sync
when a node was connected with peers that supported the storage sync
protocol but had no checkpoints available.

This was done in a backwards compatible manner, so that both the legacy and
the new protocols are still advertised and used. Eventually, we plan to
remove the legacy protocol.
5 changes: 5 additions & 0 deletions .changelog/6261.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/storage: Remove legacy storage sync p2p protocol

The node will now only serve the new checkpoint and diff sync
p2p protocols. This is backward compatible since legacy
clients still serve both the legacy and the new protocols.
4 changes: 2 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/worker/client"
storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

type byzantine struct {
Expand Down Expand Up @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode(
if err != nil {
return nil, fmt.Errorf("initializing storage node failed: %w", err)
}
b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage))
b.p2p.service.RegisterProtocolServer(checkpointsync.NewServer(b.chainContext, b.runtimeID, storage))
b.storage = storage

// Wait for activation epoch.
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-test-runner/oasis/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Compute struct { // nolint: maligned
storageBackend string
disablePublicRPC bool
checkpointSyncDisabled bool
legacySyncServerDisabled bool
checkpointCheckInterval time.Duration
checkpointParallelChunker bool
}
Expand All @@ -64,6 +65,7 @@ type ComputeCfg struct {
StorageBackend string
DisablePublicRPC bool
CheckpointSyncDisabled bool
LegacySyncServerDisabled bool
CheckpointCheckInterval time.Duration
CheckpointParallelChunker bool
}
Expand Down Expand Up @@ -170,6 +172,7 @@ func (worker *Compute) ModifyConfig() error {
worker.Config.Storage.Backend = worker.storageBackend
worker.Config.Storage.PublicRPCEnabled = !worker.disablePublicRPC
worker.Config.Storage.CheckpointSyncDisabled = worker.checkpointSyncDisabled
worker.Config.Storage.LegacySyncServerDisabled = worker.legacySyncServerDisabled
worker.Config.Storage.Checkpointer.Enabled = true
worker.Config.Storage.Checkpointer.CheckInterval = worker.checkpointCheckInterval
worker.Config.Storage.Checkpointer.ParallelChunker = worker.checkpointParallelChunker
Expand Down Expand Up @@ -236,6 +239,7 @@ func (net *Network) NewCompute(cfg *ComputeCfg) (*Compute, error) {
sentryIndices: cfg.SentryIndices,
disablePublicRPC: cfg.DisablePublicRPC,
checkpointSyncDisabled: cfg.CheckpointSyncDisabled,
legacySyncServerDisabled: cfg.LegacySyncServerDisabled,
checkpointCheckInterval: cfg.CheckpointCheckInterval,
checkpointParallelChunker: cfg.CheckpointParallelChunker,
sentryPubKey: sentryPubKey,
Expand Down
14 changes: 8 additions & 6 deletions go/oasis-test-runner/oasis/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ type ComputeWorkerFixture struct {

CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"`
CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"`
LegacySyncServerDisabled bool `json:"legacy_sync_server_disabled,omitempty"`
CheckpointParallelChunker bool `json:"checkpoint_parallel_chunker,omitempty"`

// Runtimes contains the indexes of the runtimes to enable.
Expand Down Expand Up @@ -449,12 +450,13 @@ func (f *ComputeWorkerFixture) Create(net *Network) (*Compute, error) {
CheckpointParallelChunker: f.CheckpointParallelChunker,
// The checkpoint syncing flag is intentionally flipped here.
// Syncing should normally be enabled, but normally disabled in tests.
CheckpointSyncDisabled: !f.CheckpointSyncEnabled,
DisablePublicRPC: f.DisablePublicRPC,
Runtimes: f.Runtimes,
RuntimeConfig: f.RuntimeConfig,
RuntimeProvisioner: f.RuntimeProvisioner,
RuntimeStatePaths: f.RuntimeStatePaths,
CheckpointSyncDisabled: !f.CheckpointSyncEnabled,
LegacySyncServerDisabled: f.LegacySyncServerDisabled,
DisablePublicRPC: f.DisablePublicRPC,
Runtimes: f.Runtimes,
RuntimeConfig: f.RuntimeConfig,
RuntimeProvisioner: f.RuntimeProvisioner,
RuntimeStatePaths: f.RuntimeStatePaths,
})
}

Expand Down
4 changes: 4 additions & 0 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ func (c *client) CallMulti(
) ([]any, []PeerFeedback, error) {
c.logger.Debug("call multiple", "method", method)

if len(peers) == 0 {
return nil, nil, fmt.Errorf("no peers given to service the request")
}

co := NewCallMultiOptions(opts...)

// Prepare the request.
Expand Down
3 changes: 1 addition & 2 deletions go/worker/common/p2p/txsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
)

Expand Down Expand Up @@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes

// NewClient creates a new transaction sync protocol client.
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion)
pid := ProtocolID(chainContext, runtimeID)
mgr := rpc.NewPeerManager(p2p, pid)
rc := rpc.NewClient(p2p.Host(), pid)
rc.RegisterListener(mgr)
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/p2p/txsync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func init() {

protocols := make([]core.ProtocolID, len(n.Runtimes))
for i, rt := range n.Runtimes {
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion)
protocols[i] = ProtocolID(chainContext, rt.ID)
}

return protocols
Expand Down
3 changes: 1 addition & 2 deletions go/worker/keymanager/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/oasisprotocol/oasis-core/go/common"
p2p "github.com/oasisprotocol/oasis-core/go/p2p/api"
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
)

Expand Down Expand Up @@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p

// NewClient creates a new keymanager protocol client.
func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client {
pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion)
pid := ProtocolID(chainContext, keymanagerID)
mgr := rpc.NewPeerManager(p2p, pid)
rc := rpc.NewClient(p2p.Host(), pid)
rc.RegisterListener(mgr)
Expand Down
2 changes: 1 addition & 1 deletion go/worker/keymanager/p2p/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func init() {

protocols := make([]core.ProtocolID, len(n.Runtimes))
for i, rt := range n.Runtimes {
protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion)
protocols[i] = ProtocolID(chainContext, rt.ID)
}

return protocols
Expand Down
37 changes: 22 additions & 15 deletions go/worker/storage/committee/checkpoint_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

const (
Expand Down Expand Up @@ -55,7 +55,7 @@ type chunk struct {
*checkpoint.ChunkMetadata

// checkpoint points to the checkpoint this chunk originated from.
checkpoint *storageSync.Checkpoint
checkpoint *checkpointsync.Checkpoint
}

type chunkHeap struct {
Expand Down Expand Up @@ -101,12 +101,19 @@ func (n *Node) checkpointChunkFetcher(
defer cancel()

// Fetch chunk from peers.
rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{
Version: chunk.Version,
Root: chunk.Root,
Index: chunk.Index,
Digest: chunk.Digest,
}, chunk.checkpoint)
rsp, pf, err := n.checkpointSync.GetCheckpointChunk(
chunkCtx,
&checkpointsync.GetCheckpointChunkRequest{
Version: chunk.Version,
Root: chunk.Root,
Index: chunk.Index,
Digest: chunk.Digest,
},
&checkpointsync.Checkpoint{
Metadata: chunk.checkpoint.Metadata,
Peers: chunk.checkpoint.Peers,
},
)
if err != nil {
n.logger.Error("failed to fetch chunk from peers",
"err", err,
Expand Down Expand Up @@ -157,7 +164,7 @@ func (n *Node) checkpointChunkFetcher(
}
}

func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) {
if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil {
// Any previous restores were already aborted by the driver up the call stack, so
// things should have been going smoothly here; bail.
Expand Down Expand Up @@ -276,11 +283,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques
}
}

func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) {
ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout)
defer cancel()

list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{
list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
Version: 1,
})
if err != nil {
Expand All @@ -297,8 +304,8 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) {
}

// sortCheckpoints sorts the slice in-place (descending by version, peers, hash).
func sortCheckpoints(s []*storageSync.Checkpoint) {
slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int {
func sortCheckpoints(s []*checkpointsync.Checkpoint) {
slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int {
return cmp.Or(
cmp.Compare(b.Root.Version, a.Root.Version),
cmp.Compare(len(b.Peers), len(a.Peers)),
Expand All @@ -307,7 +314,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) {
})
}

func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool {
namespace := n.commonNode.Runtime.ID()
if !namespace.Equal(&cp.Root.Namespace) {
// Not for the right runtime.
Expand Down Expand Up @@ -357,7 +364,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc

// If we only want the genesis checkpoint, filter it out.
if wantOnlyGenesis && len(cps) > 0 {
var filteredCps []*storageSync.Checkpoint
var filteredCps []*checkpointsync.Checkpoint
for _, cp := range cps {
if cp.Root.Version == genesisRound {
filteredCps = append(filteredCps, cp)
Expand Down
14 changes: 7 additions & 7 deletions go/worker/storage/committee/checkpoint_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,35 @@ import (
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync"
"github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync"
)

func TestSortCheckpoints(t *testing.T) {
cp1 := &sync.Checkpoint{
cp1 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp2 := &sync.Checkpoint{
cp2 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 2,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}
cp3 := &sync.Checkpoint{
cp3 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
},
},
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()},
}
cp4 := &sync.Checkpoint{
cp4 := &checkpointsync.Checkpoint{
Metadata: &checkpoint.Metadata{
Root: node.Root{
Version: 1,
Expand All @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) {
Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()},
}

s := []*sync.Checkpoint{cp2, cp3, cp4, cp1}
s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1}

sortCheckpoints(s)

assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4})
assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4})
}
Loading