diff --git a/.changelog/5751.feature.md b/.changelog/5751.feature.md new file mode 100644 index 00000000000..766413d7ce9 --- /dev/null +++ b/.changelog/5751.feature.md @@ -0,0 +1,12 @@ +go: Split storage sync p2p protocol + +Storage sync protocol was split into two independent protocols (checkpoint +and diff sync). + +This change was made since there may be fewer nodes that expose checkpoints +than storage diff. Previously, this could lead to issues with state sync +when a node was connected with peers that supported storage sync protocol +but had no checkpoints available. + +This was done in backwards compatible manner, so that both protocols are still +advertised and used. Eventually, we plan to remove legacy protocol. diff --git a/.changelog/6261.internal.md b/.changelog/6261.internal.md new file mode 100644 index 00000000000..3991e71fe89 --- /dev/null +++ b/.changelog/6261.internal.md @@ -0,0 +1,5 @@ +go/worker/storage: Remove legacy storage sync p2p protocol + +The node will now only server new checkpoint and diff sync +p2p protocols. This is backward compatible since legacy +clients still serve both legacy and new protocols. diff --git a/go/oasis-node/cmd/debug/byzantine/node.go b/go/oasis-node/cmd/debug/byzantine/node.go index 447def2d766..416f6d4acb8 100644 --- a/go/oasis-node/cmd/debug/byzantine/node.go +++ b/go/oasis-node/cmd/debug/byzantine/node.go @@ -22,7 +22,7 @@ import ( scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/client" - storageP2P "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) type byzantine struct { @@ -154,7 +154,7 @@ func initializeAndRegisterByzantineNode( if err != nil { return nil, fmt.Errorf("initializing storage node failed: %w", err) } - b.p2p.service.RegisterProtocolServer(storageP2P.NewServer(b.chainContext, b.runtimeID, storage)) + b.p2p.service.RegisterProtocolServer(checkpointsync.NewServer(b.chainContext, b.runtimeID, storage)) b.storage = storage // Wait for activation epoch. diff --git a/go/oasis-test-runner/oasis/compute.go b/go/oasis-test-runner/oasis/compute.go index 2bebd20a12a..b6d62043a35 100644 --- a/go/oasis-test-runner/oasis/compute.go +++ b/go/oasis-test-runner/oasis/compute.go @@ -46,6 +46,7 @@ type Compute struct { // nolint: maligned storageBackend string disablePublicRPC bool checkpointSyncDisabled bool + legacySyncServerDisabled bool checkpointCheckInterval time.Duration checkpointParallelChunker bool } @@ -64,6 +65,7 @@ type ComputeCfg struct { StorageBackend string DisablePublicRPC bool CheckpointSyncDisabled bool + LegacySyncServerDisabled bool CheckpointCheckInterval time.Duration CheckpointParallelChunker bool } @@ -170,6 +172,7 @@ func (worker *Compute) ModifyConfig() error { worker.Config.Storage.Backend = worker.storageBackend worker.Config.Storage.PublicRPCEnabled = !worker.disablePublicRPC worker.Config.Storage.CheckpointSyncDisabled = worker.checkpointSyncDisabled + worker.Config.Storage.LegacySyncServerDisabled = worker.legacySyncServerDisabled worker.Config.Storage.Checkpointer.Enabled = true worker.Config.Storage.Checkpointer.CheckInterval = worker.checkpointCheckInterval worker.Config.Storage.Checkpointer.ParallelChunker = worker.checkpointParallelChunker @@ -236,6 +239,7 @@ func (net *Network) NewCompute(cfg *ComputeCfg) (*Compute, error) { sentryIndices: cfg.SentryIndices, disablePublicRPC: cfg.DisablePublicRPC, checkpointSyncDisabled: cfg.CheckpointSyncDisabled, + legacySyncServerDisabled: cfg.LegacySyncServerDisabled, checkpointCheckInterval: cfg.CheckpointCheckInterval, checkpointParallelChunker: cfg.CheckpointParallelChunker, sentryPubKey: sentryPubKey, diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index fd358f787cb..597f2a948dd 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -411,6 +411,7 @@ type ComputeWorkerFixture struct { CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"` CheckpointSyncEnabled bool `json:"checkpoint_sync_enabled,omitempty"` + LegacySyncServerDisabled bool `json:"legacy_sync_server_disabled,omitempty"` CheckpointParallelChunker bool `json:"checkpoint_parallel_chunker,omitempty"` // Runtimes contains the indexes of the runtimes to enable. @@ -449,12 +450,13 @@ func (f *ComputeWorkerFixture) Create(net *Network) (*Compute, error) { CheckpointParallelChunker: f.CheckpointParallelChunker, // The checkpoint syncing flag is intentionally flipped here. // Syncing should normally be enabled, but normally disabled in tests. - CheckpointSyncDisabled: !f.CheckpointSyncEnabled, - DisablePublicRPC: f.DisablePublicRPC, - Runtimes: f.Runtimes, - RuntimeConfig: f.RuntimeConfig, - RuntimeProvisioner: f.RuntimeProvisioner, - RuntimeStatePaths: f.RuntimeStatePaths, + CheckpointSyncDisabled: !f.CheckpointSyncEnabled, + LegacySyncServerDisabled: f.LegacySyncServerDisabled, + DisablePublicRPC: f.DisablePublicRPC, + Runtimes: f.Runtimes, + RuntimeConfig: f.RuntimeConfig, + RuntimeProvisioner: f.RuntimeProvisioner, + RuntimeStatePaths: f.RuntimeStatePaths, }) } diff --git a/go/p2p/rpc/client.go b/go/p2p/rpc/client.go index 888045ad471..e471db89092 100644 --- a/go/p2p/rpc/client.go +++ b/go/p2p/rpc/client.go @@ -364,6 +364,10 @@ func (c *client) CallMulti( ) ([]any, []PeerFeedback, error) { c.logger.Debug("call multiple", "method", method) + if len(peers) == 0 { + return nil, nil, fmt.Errorf("no peers given to service the request") + } + co := NewCallMultiOptions(opts...) // Prepare the request. diff --git a/go/worker/common/p2p/txsync/client.go b/go/worker/common/p2p/txsync/client.go index 54d69f6242c..f293986b123 100644 --- a/go/worker/common/p2p/txsync/client.go +++ b/go/worker/common/p2p/txsync/client.go @@ -5,7 +5,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -82,7 +81,7 @@ func (c *client) GetTxs(ctx context.Context, request *GetTxsRequest) (*GetTxsRes // NewClient creates a new transaction sync protocol client. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/common/p2p/txsync/protocol.go b/go/worker/common/p2p/txsync/protocol.go index e5aef20bdd7..5d1e05f9676 100644 --- a/go/worker/common/p2p/txsync/protocol.go +++ b/go/worker/common/p2p/txsync/protocol.go @@ -47,7 +47,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, TxSyncProtocolID, TxSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/keymanager/p2p/client.go b/go/worker/keymanager/p2p/client.go index b7972d1168e..0eb0a3194ca 100644 --- a/go/worker/keymanager/p2p/client.go +++ b/go/worker/keymanager/p2p/client.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" p2p "github.com/oasisprotocol/oasis-core/go/p2p/api" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -46,7 +45,7 @@ func (c *client) CallEnclave(ctx context.Context, request *CallEnclaveRequest, p // NewClient creates a new keymanager protocol client. func NewClient(p2p p2p.Service, chainContext string, keymanagerID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, keymanagerID, KeyManagerProtocolID, KeyManagerProtocolVersion) + pid := ProtocolID(chainContext, keymanagerID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/keymanager/p2p/protocol.go b/go/worker/keymanager/p2p/protocol.go index f7dfda7f407..21d7dd383f0 100644 --- a/go/worker/keymanager/p2p/protocol.go +++ b/go/worker/keymanager/p2p/protocol.go @@ -50,7 +50,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, KeyManagerProtocolID, KeyManagerProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index c9236f4cada..ac2ccaa306b 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -13,7 +13,7 @@ import ( storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) const ( @@ -55,7 +55,7 @@ type chunk struct { *checkpoint.ChunkMetadata // checkpoint points to the checkpoint this chunk originated from. - checkpoint *storageSync.Checkpoint + checkpoint *checkpointsync.Checkpoint } type chunkHeap struct { @@ -101,12 +101,19 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.storageSync.GetCheckpointChunk(chunkCtx, &storageSync.GetCheckpointChunkRequest{ - Version: chunk.Version, - Root: chunk.Root, - Index: chunk.Index, - Digest: chunk.Digest, - }, chunk.checkpoint) + rsp, pf, err := n.checkpointSync.GetCheckpointChunk( + chunkCtx, + &checkpointsync.GetCheckpointChunkRequest{ + Version: chunk.Version, + Root: chunk.Root, + Index: chunk.Index, + Digest: chunk.Digest, + }, + &checkpointsync.Checkpoint{ + Metadata: chunk.checkpoint.Metadata, + Peers: chunk.checkpoint.Peers, + }, + ) if err != nil { n.logger.Error("failed to fetch chunk from peers", "err", err, @@ -157,7 +164,7 @@ func (n *Node) checkpointChunkFetcher( } } -func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { +func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. @@ -276,11 +283,11 @@ func (n *Node) handleCheckpoint(check *storageSync.Checkpoint, maxParallelReques } } -func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { +func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) defer cancel() - list, err := n.storageSync.GetCheckpoints(ctx, &storageSync.GetCheckpointsRequest{ + list, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -297,8 +304,8 @@ func (n *Node) getCheckpointList() ([]*storageSync.Checkpoint, error) { } // sortCheckpoints sorts the slice in-place (descending by version, peers, hash). -func sortCheckpoints(s []*storageSync.Checkpoint) { - slices.SortFunc(s, func(a, b *storageSync.Checkpoint) int { +func sortCheckpoints(s []*checkpointsync.Checkpoint) { + slices.SortFunc(s, func(a, b *checkpointsync.Checkpoint) int { return cmp.Or( cmp.Compare(b.Root.Version, a.Root.Version), cmp.Compare(len(b.Peers), len(a.Peers)), @@ -307,7 +314,7 @@ func sortCheckpoints(s []*storageSync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { +func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { namespace := n.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. @@ -357,7 +364,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // If we only want the genesis checkpoint, filter it out. if wantOnlyGenesis && len(cps) > 0 { - var filteredCps []*storageSync.Checkpoint + var filteredCps []*checkpointsync.Checkpoint for _, cp := range cps { if cp.Root.Version == genesisRound { filteredCps = append(filteredCps, cp) diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/committee/checkpoint_sync_test.go index 2e0ff2c206d..d39e50f3239 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/committee/checkpoint_sync_test.go @@ -8,11 +8,11 @@ import ( "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" ) func TestSortCheckpoints(t *testing.T) { - cp1 := &sync.Checkpoint{ + cp1 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -20,7 +20,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp2 := &sync.Checkpoint{ + cp2 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 2, @@ -28,7 +28,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - cp3 := &sync.Checkpoint{ + cp3 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -36,7 +36,7 @@ func TestSortCheckpoints(t *testing.T) { }, Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback(), rpc.NewNopPeerFeedback()}, } - cp4 := &sync.Checkpoint{ + cp4 := &checkpointsync.Checkpoint{ Metadata: &checkpoint.Metadata{ Root: node.Root{ Version: 1, @@ -45,9 +45,9 @@ func TestSortCheckpoints(t *testing.T) { Peers: []rpc.PeerFeedback{rpc.NewNopPeerFeedback()}, } - s := []*sync.Checkpoint{cp2, cp3, cp4, cp1} + s := []*checkpointsync.Checkpoint{cp2, cp3, cp4, cp1} sortCheckpoints(s) - assert.Equal(t, s, []*sync.Checkpoint{cp1, cp2, cp3, cp4}) + assert.Equal(t, s, []*checkpointsync.Checkpoint{cp1, cp2, cp3, cp4}) } diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 9832cb98648..fae4adc8701 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -32,8 +32,9 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" - storageSync "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/sync" ) var ( @@ -128,7 +129,8 @@ type Node struct { // nolint: maligned localStorage storageApi.LocalBackend - storageSync storageSync.Client + diffSync diffsync.Client + checkpointSync checkpointsync.Client undefinedRound uint64 @@ -208,8 +210,36 @@ func NewNode( n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - // Create a new checkpointer. Always create a checkpointer, even if checkpointing is disabled - // in configuration so we can ensure that the genesis checkpoint is available. + // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. + checkpointer, err := n.createCheckpointer(commonNode, localStorage) + if err != nil { + return nil, fmt.Errorf("failed to create checkpointer: %w", err) + } + n.checkpointer = checkpointer + + // Register prune handler. + commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ + logger: n.logger, + node: n, + }) + + // Advertise and serve p2p protocols. + commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + if rpcRoleProvider != nil { + commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + } + + // Create p2p protocol clients. + n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + + return n, nil +} + +func (n *Node) createCheckpointer(commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -255,33 +285,13 @@ func NewNode( return blk.Header.StorageRoots(), nil }, } - var err error - n.checkpointer, err = checkpoint.NewCheckpointer( + + return checkpoint.NewCheckpointer( n.ctx, localStorage.NodeDB(), localStorage.Checkpointer(), checkpointerCfg, ) - if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) - } - - // Register prune handler. - commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, - }) - - // Register storage sync service. - commonNode.P2P.RegisterProtocolServer(storageSync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - n.storageSync = storageSync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - - // Register storage pub service if configured. - if rpcRoleProvider != nil { - commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } - - return n, nil } // Service interface. @@ -430,7 +440,7 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { ctx, cancel := context.WithCancel(n.ctx) defer cancel() - rsp, pf, err := n.storageSync.GetDiff(ctx, &storageSync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + rsp, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { result.err = err return diff --git a/go/worker/storage/config/config.go b/go/worker/storage/config/config.go index d4f2c54aed9..77846089d04 100644 --- a/go/worker/storage/config/config.go +++ b/go/worker/storage/config/config.go @@ -20,6 +20,8 @@ type Config struct { PublicRPCEnabled bool `yaml:"public_rpc_enabled,omitempty"` // Disable initial storage sync from checkpoints. CheckpointSyncDisabled bool `yaml:"checkpoint_sync_disabled,omitempty"` + // Disable serving legacy storage sync p2p protocol. + LegacySyncServerDisabled bool `yaml:"legacy_sync_server_disabled,omitempty"` // Storage checkpointer configuration. Checkpointer CheckpointerConfig `yaml:"checkpointer,omitempty"` @@ -47,11 +49,12 @@ func (c *Config) Validate() error { // DefaultConfig returns the default configuration settings. func DefaultConfig() Config { return Config{ - Backend: "auto", - MaxCacheSize: "64mb", - FetcherCount: 4, - PublicRPCEnabled: false, - CheckpointSyncDisabled: false, + Backend: "auto", + MaxCacheSize: "64mb", + FetcherCount: 4, + PublicRPCEnabled: false, + CheckpointSyncDisabled: false, + LegacySyncServerDisabled: false, Checkpointer: CheckpointerConfig{ Enabled: false, CheckInterval: 1 * time.Minute, diff --git a/go/worker/storage/p2p/sync/client.go b/go/worker/storage/p2p/checkpointsync/client.go similarity index 62% rename from go/worker/storage/p2p/sync/client.go rename to go/worker/storage/p2p/checkpointsync/client.go index fdb8ed7379d..e364a8a54d7 100644 --- a/go/worker/storage/p2p/sync/client.go +++ b/go/worker/storage/p2p/checkpointsync/client.go @@ -1,4 +1,4 @@ -package sync +package checkpointsync import ( "context" @@ -7,26 +7,21 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) const ( // minProtocolPeers is the minimum number of peers from the registry we want to have connected - // for StorageSync protocol. + // for checkpoint sync protocol. minProtocolPeers = 5 - // totalProtocolPeers is the number of peers we want to have connected for StorageSync protocol. + // totalProtocolPeers is the number of peers we want to have connected for checkpoint sync protocol. totalProtocolPeers = 10 ) -// Client is a storage sync protocol client. +// Client is a checkpoint sync protocol client. type Client interface { - // GetDiff requests a write log of entries that must be applied to get from the first given root - // to the second one. - GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) - // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) @@ -47,26 +42,13 @@ type Checkpoint struct { } type client struct { - rcC rpc.Client - rcD rpc.Client - mgrC rpc.PeerManager - mgrD rpc.PeerManager -} - -func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { - var rsp GetDiffResponse - pf, err := c.rcD.CallOne(ctx, c.mgrD.GetBestPeers(), MethodGetDiff, request, &rsp, - rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), - ) - if err != nil { - return nil, nil, err - } - return &rsp, pf, nil + rc rpc.Client + mgr rpc.PeerManager } func (c *client) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Checkpoint, error) { var rsp GetCheckpointsResponse - rsps, pfs, err := c.rcC.CallMulti(ctx, c.mgrC.GetBestPeers(), MethodGetCheckpoints, request, rsp) + rsps, pfs, err := c.rc.CallMulti(ctx, c.mgr.GetBestPeers(), MethodGetCheckpoints, request, rsp) if err != nil { return nil, err } @@ -115,7 +97,7 @@ func (c *client) GetCheckpointChunk( } var rsp GetCheckpointChunkResponse - pf, err := c.rcC.CallOne(ctx, c.mgrC.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp, + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(opts...), MethodGetCheckpointChunk, request, &rsp, rpc.WithMaxPeerResponseTime(MaxGetCheckpointChunkResponseTime), ) if err != nil { @@ -124,27 +106,19 @@ func (c *client) GetCheckpointChunk( return &rsp, pf, nil } -// NewClient creates a new storage sync protocol client. +// NewClient creates a new checkpoint sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - // Use two separate clients and managers for the same protocol. This is to make sure that peers - // are scored differently between the two use cases (syncing diffs vs. syncing checkpoints). We - // could consider separating this into two protocols in the future. - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) - - rcC := rpc.NewClient(p2p.Host(), pid) - mgrC := rpc.NewPeerManager(p2p, pid) - rcC.RegisterListener(mgrC) - - rcD := rpc.NewClient(p2p.Host(), pid) - mgrD := rpc.NewPeerManager(p2p, pid) - rcD.RegisterListener(mgrD) + pid := ProtocolID(chainContext, runtimeID) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) return &client{ - rcC: rcC, - rcD: rcD, - mgrC: mgrC, - mgrD: mgrD, + rc: rc, + mgr: mgr, } } diff --git a/go/worker/storage/p2p/sync/protocol.go b/go/worker/storage/p2p/checkpointsync/protocol.go similarity index 56% rename from go/worker/storage/p2p/sync/protocol.go rename to go/worker/storage/p2p/checkpointsync/protocol.go index 7dca895e014..2b07d38e917 100644 --- a/go/worker/storage/p2p/sync/protocol.go +++ b/go/worker/storage/p2p/checkpointsync/protocol.go @@ -1,4 +1,6 @@ -package sync +// Package checkpointsync defines wire protocol together with client/server +// implementations for the checkpoint sync protocol, used for runtime state sync. +package checkpointsync import ( "time" @@ -11,36 +13,19 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/version" "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" "github.com/oasisprotocol/oasis-core/go/p2p/protocol" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) -// StorageSyncProtocolID is a unique protocol identifier for the storage sync protocol. -const StorageSyncProtocolID = "storagesync" +// CheckpointSyncProtocolID is a unique protocol identifier for the checkpoint sync protocol. +const CheckpointSyncProtocolID = "checkpointsync" -// StorageSyncProtocolVersion is the supported version of the storage sync protocol. -var StorageSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} +// CheckpointSyncProtocolVersion is the supported version of the checkpoint sync protocol. +var CheckpointSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} -// ProtocolID returns the runtime storage sync protocol ID. +// ProtocolID returns the runtime checkpoint sync protocol ID. func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { - return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) -} - -// Constants related to the GetDiff method. -const ( - MethodGetDiff = "GetDiff" - MaxGetDiffResponseTime = 15 * time.Second -) - -// GetDiffRequest is a GetDiff request. -type GetDiffRequest struct { - StartRoot storage.Root `json:"start_root"` - EndRoot storage.Root `json:"end_root"` -} - -// GetDiffResponse is a response to a GetDiff request. -type GetDiffResponse struct { - WriteLog storage.WriteLog `json:"write_log,omitempty"` + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, CheckpointSyncProtocolID, CheckpointSyncProtocolVersion) } // Constants related to the GetCheckpoints method. @@ -61,15 +46,15 @@ type GetCheckpointsResponse struct { // Constants related to the GetCheckpointChunk method. const ( MethodGetCheckpointChunk = "GetCheckpointChunk" - MaxGetCheckpointChunkResponseTime = 60 * time.Second + MaxGetCheckpointChunkResponseTime = time.Minute ) // GetCheckpointChunkRequest is a GetCheckpointChunk request. type GetCheckpointChunkRequest struct { - Version uint16 `json:"version"` - Root storage.Root `json:"root"` - Index uint64 `json:"index"` - Digest hash.Hash `json:"digest"` + Version uint16 `json:"version"` + Root api.Root `json:"root"` + Index uint64 `json:"index"` + Digest hash.Hash `json:"digest"` } // GetCheckpointChunkResponse is a response to a GetCheckpointChunk request. @@ -86,7 +71,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StorageSyncProtocolID, StorageSyncProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols diff --git a/go/worker/storage/p2p/sync/server.go b/go/worker/storage/p2p/checkpointsync/server.go similarity index 65% rename from go/worker/storage/p2p/sync/server.go rename to go/worker/storage/p2p/checkpointsync/server.go index 22b6731465a..2df4d0f68db 100644 --- a/go/worker/storage/p2p/sync/server.go +++ b/go/worker/storage/p2p/checkpointsync/server.go @@ -1,4 +1,4 @@ -package sync +package checkpointsync import ( "bytes" @@ -7,23 +7,16 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" - storage "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) type service struct { - backend storage.Backend + backend api.Backend } func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { switch method { - case MethodGetDiff: - var rq GetDiffRequest - if err := cbor.Unmarshal(body, &rq); err != nil { - return nil, rpc.ErrBadRequest - } - - return s.handleGetDiff(ctx, &rq) case MethodGetCheckpoints: var rq GetCheckpointsRequest if err := cbor.Unmarshal(body, &rq); err != nil { @@ -43,34 +36,6 @@ func (s *service) HandleRequest(ctx context.Context, method string, body cbor.Ra } } -func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { - it, err := s.backend.GetDiff(ctx, &storage.GetDiffRequest{ - StartRoot: request.StartRoot, - EndRoot: request.EndRoot, - }) - if err != nil { - return nil, err - } - - var rsp GetDiffResponse - for { - more, err := it.Next() - if err != nil { - return nil, err - } - if !more { - break - } - - chunk, err := it.Value() - if err != nil { - return nil, err - } - rsp.WriteLog = append(rsp.WriteLog, chunk) - } - return &rsp, nil -} - func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) (*GetCheckpointsResponse, error) { cps, err := s.backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ Version: request.Version, @@ -85,7 +50,7 @@ func (s *service) handleGetCheckpoints(ctx context.Context, request *GetCheckpoi } func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetCheckpointChunkRequest) (*GetCheckpointChunkResponse, error) { - // TODO: Use stream resource manager to track buffer use. + // Consider using stream resource manager to track buffer use. var buf bytes.Buffer err := s.backend.GetCheckpointChunk(ctx, &checkpoint.ChunkMetadata{ Version: request.Version, @@ -102,7 +67,7 @@ func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetChec }, nil } -// NewServer creates a new storage sync protocol server. -func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server { +// NewServer creates a new checkpoint sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) } diff --git a/go/worker/storage/p2p/diffsync/client.go b/go/worker/storage/p2p/diffsync/client.go new file mode 100644 index 00000000000..4c0363fe21d --- /dev/null +++ b/go/worker/storage/p2p/diffsync/client.go @@ -0,0 +1,57 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" +) + +const ( + // minProtocolPeers is the minimum number of peers from the registry we want to have connected + // for diff sync protocol. + minProtocolPeers = 5 + + // totalProtocolPeers is the number of peers we want to have connected for diff sync protocol. + totalProtocolPeers = 10 +) + +// Client is a diff sync protocol client. +type Client interface { + // GetDiff requests a write log of entries that must be applied to get from the first given root + // to the second one. + GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) +} + +type client struct { + rc rpc.Client + mgr rpc.PeerManager +} + +func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) { + var rsp GetDiffResponse + pf, err := c.rc.CallOne(ctx, c.mgr.GetBestPeers(), MethodGetDiff, request, &rsp, + rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime), + ) + if err != nil { + return nil, nil, err + } + return &rsp, pf, nil +} + +// NewClient creates a new diff sync protocol client. +// +// Moreover, it ensures underlying p2p service starts tracking protocol peers. +func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { + pid := ProtocolID(chainContext, runtimeID) + rc := rpc.NewClient(p2p.Host(), pid) + mgr := rpc.NewPeerManager(p2p, pid) + rc.RegisterListener(mgr) + + p2p.RegisterProtocol(pid, minProtocolPeers, totalProtocolPeers) + + return &client{ + rc: rc, + mgr: mgr, + } +} diff --git a/go/worker/storage/p2p/diffsync/protocol.go b/go/worker/storage/p2p/diffsync/protocol.go new file mode 100644 index 00000000000..e2949504039 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/protocol.go @@ -0,0 +1,61 @@ +// Package diffsync defines wire protocol together with client/server +// implementations for the diff sync protocol, used for runtime block sync. +package diffsync + +import ( + "time" + + "github.com/libp2p/go-libp2p/core" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/common/version" + "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" + "github.com/oasisprotocol/oasis-core/go/p2p/protocol" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +// DiffSyncProtocolID is a unique protocol identifier for the diff sync protocol. +const DiffSyncProtocolID = "diffsync" + +// DiffSyncProtocolVersion is the supported version of the diff sync protocol. +var DiffSyncProtocolVersion = version.Version{Major: 1, Minor: 0, Patch: 0} + +// ProtocolID returns the runtime diff sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, DiffSyncProtocolID, DiffSyncProtocolVersion) +} + +// Constants related to the GetDiff method. +const ( + MethodGetDiff = "GetDiff" + MaxGetDiffResponseTime = 15 * time.Second +) + +// GetDiffRequest is a GetDiff request. +type GetDiffRequest struct { + StartRoot api.Root `json:"start_root"` + EndRoot api.Root `json:"end_root"` +} + +// GetDiffResponse is a response to a GetDiff request. +type GetDiffResponse struct { + WriteLog api.WriteLog `json:"write_log,omitempty"` +} + +func init() { + peermgmt.RegisterNodeHandler(&peermgmt.NodeHandlerBundle{ + ProtocolsFn: func(n *node.Node, chainContext string) []core.ProtocolID { + if !n.HasRoles(node.RoleComputeWorker | node.RoleStorageRPC) { + return []core.ProtocolID{} + } + + protocols := make([]core.ProtocolID, len(n.Runtimes)) + for i, rt := range n.Runtimes { + protocols[i] = ProtocolID(chainContext, rt.ID) + } + + return protocols + }, + }) +} diff --git a/go/worker/storage/p2p/diffsync/server.go b/go/worker/storage/p2p/diffsync/server.go new file mode 100644 index 00000000000..4267f8dc6a8 --- /dev/null +++ b/go/worker/storage/p2p/diffsync/server.go @@ -0,0 +1,61 @@ +package diffsync + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/cbor" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/storage/api" +) + +type service struct { + backend api.Backend +} + +func (s *service) HandleRequest(ctx context.Context, method string, body cbor.RawMessage) (any, error) { + switch method { + case MethodGetDiff: + var rq GetDiffRequest + if err := cbor.Unmarshal(body, &rq); err != nil { + return nil, rpc.ErrBadRequest + } + + return s.handleGetDiff(ctx, &rq) + default: + return nil, rpc.ErrMethodNotSupported + } +} + +func (s *service) handleGetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, error) { + it, err := s.backend.GetDiff(ctx, &api.GetDiffRequest{ + StartRoot: request.StartRoot, + EndRoot: request.EndRoot, + }) + if err != nil { + return nil, err + } + + var rsp GetDiffResponse + for { + more, err := it.Next() + if err != nil { + return nil, err + } + if !more { + break + } + + chunk, err := it.Value() + if err != nil { + return nil, err + } + rsp.WriteLog = append(rsp.WriteLog, chunk) + } + return &rsp, nil +} + +// NewServer creates a new diff sync protocol server. +func NewServer(chainContext string, runtimeID common.Namespace, backend api.Backend) rpc.Server { + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) +} diff --git a/go/worker/storage/p2p/pub/client.go b/go/worker/storage/p2p/pub/client.go index 4b75b8f5d32..1590df41cb2 100644 --- a/go/worker/storage/p2p/pub/client.go +++ b/go/worker/storage/p2p/pub/client.go @@ -4,7 +4,6 @@ import ( "context" "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" ) @@ -64,7 +63,7 @@ func (c *client) Iterate(ctx context.Context, request *IterateRequest) (*ProofRe // NewClient creates a new storage pub protocol client. func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client { - pid := protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion) + pid := ProtocolID(chainContext, runtimeID) mgr := rpc.NewPeerManager(p2p, pid) rc := rpc.NewClient(p2p.Host(), pid) rc.RegisterListener(mgr) diff --git a/go/worker/storage/p2p/pub/protocol.go b/go/worker/storage/p2p/pub/protocol.go index e3641b77b78..48ac31eff0e 100644 --- a/go/worker/storage/p2p/pub/protocol.go +++ b/go/worker/storage/p2p/pub/protocol.go @@ -58,7 +58,7 @@ func init() { protocols := make([]core.ProtocolID, len(n.Runtimes)) for i, rt := range n.Runtimes { - protocols[i] = protocol.NewRuntimeProtocolID(chainContext, rt.ID, StoragePubProtocolID, StoragePubProtocolVersion) + protocols[i] = ProtocolID(chainContext, rt.ID) } return protocols