diff --git a/.changelog/6036.trivial.md b/.changelog/6036.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/.changelog/6308.trivial.md b/.changelog/6308.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/consensus/cometbft/abci/state.go b/go/consensus/cometbft/abci/state.go index e580cd3e720..21ea769f026 100644 --- a/go/consensus/cometbft/abci/state.go +++ b/go/consensus/cometbft/abci/state.go @@ -782,10 +782,11 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App }, nil }, } - s.checkpointer, err = checkpoint.NewCheckpointer(s.ctx, ndb, ldb.Checkpointer(), checkpointerCfg) - if err != nil { - return nil, fmt.Errorf("state: failed to create checkpointer: %w", err) - } + s.checkpointer = checkpoint.NewCheckpointer(ndb, ldb.Checkpointer(), checkpointerCfg) + go func() { + err := s.checkpointer.Serve(ctx) + s.logger.Error("checkpointer failed", "err", err) + }() } go s.metricsWorker() diff --git a/go/oasis-node/cmd/node/node_control.go b/go/oasis-node/cmd/node/node_control.go index 9310d0780f5..494bf9b2332 100644 --- a/go/oasis-node/cmd/node/node_control.go +++ b/go/oasis-node/cmd/node/node_control.go @@ -312,10 +312,10 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr } // Fetch storage worker status. 
- if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil { - status.Storage, err = storageNode.GetStatus(ctx) + if stateSync := n.StorageWorker.GetRuntime(rt.ID()); stateSync != nil { + status.Storage, err = stateSync.GetStatus(ctx) if err != nil { - logger.Error("failed to fetch storage worker status", "err", err) + logger.Error("failed to fetch state sync worker status", "err", err) } } diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index cd38354d83f..a46b126c18a 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -8,7 +8,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" - workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" ) // LogAssertEvent returns a handler which checks whether a specific log event was @@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory { // LogAssertCheckpointSync returns a handler which checks whether initial storage sync from // a checkpoint was successful or not. func LogAssertCheckpointSync() log.WatcherHandlerFactory { - return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") + return LogAssertEvent(statesync.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") } // LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go index 7a43a834871..fd64c8b0d50 100644 --- a/go/storage/mkvs/checkpoint/checkpointer.go +++ b/go/storage/mkvs/checkpoint/checkpointer.go @@ -72,7 +72,7 @@ type CreationParameters struct { ChunkerThreads uint16 } -// Checkpointer is a checkpointer. 
+// Checkpointer is responsible for creating the storage snapshots (checkpoints). type Checkpointer interface { // NotifyNewVersion notifies the checkpointer that a new version has been finalized. NotifyNewVersion(version uint64) @@ -88,6 +88,10 @@ type Checkpointer interface { // versions are emitted before the checkpointing process starts. WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) + // WatchCreatedCheckpoints returns a channel that produces a stream of checkpointed versions. The + // versions are emitted immediately after the checkpoint is created. + WatchCreatedCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) + // Flush makes the checkpointer immediately process any notifications. Flush() @@ -95,23 +99,46 @@ type Checkpointer interface { // intervals; after unpausing, a checkpoint won't be created immediately, but the checkpointer // will wait for the next regular event. Pause(pause bool) + + // Serve starts running the checkpointer. + Serve(ctx context.Context) error } type checkpointer struct { cfg CheckpointerConfig - ndb db.NodeDB - creator Creator - notifyCh *channels.RingChannel - forceCh *channels.RingChannel - flushCh *channels.RingChannel - statusCh chan struct{} - pausedCh chan bool - cpNotifier *pubsub.Broker + ndb db.NodeDB + creator Creator + notifyCh *channels.RingChannel + forceCh *channels.RingChannel + flushCh *channels.RingChannel + statusCh chan struct{} + pausedCh chan bool + cpNotifier *pubsub.Broker + cpCreatedNotifier *pubsub.Broker logger *logging.Logger } +// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and +// will automatically generate the configured number of checkpoints. 
+func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer { + c := &checkpointer{ + cfg: cfg, + ndb: ndb, + creator: creator, + notifyCh: channels.NewRingChannel(1), + forceCh: channels.NewRingChannel(1), + flushCh: channels.NewRingChannel(1), + statusCh: make(chan struct{}), + pausedCh: make(chan bool), + cpNotifier: pubsub.NewBroker(false), + cpCreatedNotifier: pubsub.NewBroker(false), + logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), + } + return c +} + // Implements Checkpointer. func (c *checkpointer) NotifyNewVersion(version uint64) { c.notifyCh.In() <- version @@ -131,6 +158,15 @@ func (c *checkpointer) WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscri return ch, sub, nil } +// Implements Checkpointer. +func (c *checkpointer) WatchCreatedCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) { + ch := make(chan uint64) + sub := c.cpCreatedNotifier.Subscribe() + sub.Unwrap(ch) + + return ch, sub, nil +} + // Implements Checkpointer. 
func (c *checkpointer) Flush() { c.flushCh.In() <- struct{}{} @@ -140,6 +176,106 @@ func (c *checkpointer) Pause(pause bool) { c.pausedCh <- pause } +func (c *checkpointer) Serve(ctx context.Context) error { + c.logger.Debug("storage checkpointer started", + "check_interval", c.cfg.CheckInterval, + ) + defer func() { + c.logger.Debug("storage checkpointer terminating") + }() + + paused := false + + for { + var interval time.Duration + switch c.cfg.CheckInterval { + case CheckIntervalDisabled: + interval = CheckIntervalDisabled + default: + interval = random.GetRandomValueFromInterval( + checkpointIntervalRandomizationFactor, + rand.Float64(), + c.cfg.CheckInterval, + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(interval): + case <-c.flushCh.Out(): + case paused = <-c.pausedCh: + continue + } + + var ( + version uint64 + force bool + ) + select { + case <-ctx.Done(): + return ctx.Err() + case v := <-c.notifyCh.Out(): + version = v.(uint64) + case v := <-c.forceCh.Out(): + version = v.(uint64) + force = true + } + + // Fetch current checkpoint parameters. + params := c.cfg.Parameters + if params == nil && c.cfg.GetParameters != nil { + var err error + params, err = c.cfg.GetParameters(ctx) + if err != nil { + c.logger.Error("failed to get checkpoint parameters", + "err", err, + "version", version, + ) + continue + } + } + if params == nil { + c.logger.Error("no checkpoint parameters") + continue + } + + // Don't checkpoint if checkpoints are disabled. + switch { + case force: + // Always checkpoint when forced. 
+ case paused: + continue + case params.Interval == 0: + continue + case c.cfg.CheckInterval == CheckIntervalDisabled: + continue + default: + } + + var err error + switch force { + case false: + err = c.maybeCheckpoint(ctx, version, params) + case true: + err = c.checkpoint(ctx, version, params) + } + if err != nil { + c.logger.Error("failed to checkpoint", + "version", version, + "err", err, + ) + continue + } + + // Emit status update if someone is listening. This is only used in tests. + select { + case c.statusCh <- struct{}{}: + default: + } + } +} + func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) { // Notify watchers about the checkpoint we are about to make. c.cpNotifier.Broadcast(version) @@ -191,6 +327,7 @@ func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *C return fmt.Errorf("checkpointer: failed to create checkpoint: %w", err) } } + c.cpCreatedNotifier.Broadcast(version) return nil } @@ -284,127 +421,3 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para return nil } - -func (c *checkpointer) worker(ctx context.Context) { - c.logger.Debug("storage checkpointer started", - "check_interval", c.cfg.CheckInterval, - ) - defer func() { - c.logger.Debug("storage checkpointer terminating") - }() - - paused := false - - for { - var interval time.Duration - switch c.cfg.CheckInterval { - case CheckIntervalDisabled: - interval = CheckIntervalDisabled - default: - interval = random.GetRandomValueFromInterval( - checkpointIntervalRandomizationFactor, - rand.Float64(), - c.cfg.CheckInterval, - ) - } - - select { - case <-ctx.Done(): - return - case <-time.After(interval): - case <-c.flushCh.Out(): - case paused = <-c.pausedCh: - continue - } - - var ( - version uint64 - force bool - ) - select { - case <-ctx.Done(): - return - case v := <-c.notifyCh.Out(): - version = v.(uint64) - case v := <-c.forceCh.Out(): - version = v.(uint64) - force = 
true - } - - // Fetch current checkpoint parameters. - params := c.cfg.Parameters - if params == nil && c.cfg.GetParameters != nil { - var err error - params, err = c.cfg.GetParameters(ctx) - if err != nil { - c.logger.Error("failed to get checkpoint parameters", - "err", err, - "version", version, - ) - continue - } - } - if params == nil { - c.logger.Error("no checkpoint parameters") - continue - } - - // Don't checkpoint if checkpoints are disabled. - switch { - case force: - // Always checkpoint when forced. - case paused: - continue - case params.Interval == 0: - continue - case c.cfg.CheckInterval == CheckIntervalDisabled: - continue - default: - } - - var err error - switch force { - case false: - err = c.maybeCheckpoint(ctx, version, params) - case true: - err = c.checkpoint(ctx, version, params) - } - if err != nil { - c.logger.Error("failed to checkpoint", - "version", version, - "err", err, - ) - continue - } - - // Emit status update if someone is listening. This is only used in tests. - select { - case c.statusCh <- struct{}{}: - default: - } - } -} - -// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and -// will automatically generate the configured number of checkpoints. 
-func NewCheckpointer( - ctx context.Context, - ndb db.NodeDB, - creator Creator, - cfg CheckpointerConfig, -) (Checkpointer, error) { - c := &checkpointer{ - cfg: cfg, - ndb: ndb, - creator: creator, - notifyCh: channels.NewRingChannel(1), - forceCh: channels.NewRingChannel(1), - flushCh: channels.NewRingChannel(1), - statusCh: make(chan struct{}), - pausedCh: make(chan bool), - cpNotifier: pubsub.NewBroker(false), - logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), - } - go c.worker(ctx) - return c, nil -} diff --git a/go/storage/mkvs/checkpoint/checkpointer_test.go b/go/storage/mkvs/checkpoint/checkpointer_test.go index 9782f904b85..e9994248590 100644 --- a/go/storage/mkvs/checkpoint/checkpointer_test.go +++ b/go/storage/mkvs/checkpoint/checkpointer_test.go @@ -25,7 +25,8 @@ const ( func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, interval uint64, preExistingData bool) { require := require.New(t) - ctx := context.Background() + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() // Initialize a database. dir, err := os.MkdirTemp("", "mkvs.checkpointer") @@ -70,7 +71,7 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte require.NoError(err, "NewFileCreator") // Create a checkpointer. - cp, err := NewCheckpointer(ctx, ndb, fc, CheckpointerConfig{ + cp := NewCheckpointer(ndb, fc, CheckpointerConfig{ Name: "test", Namespace: testNs, CheckInterval: testCheckInterval, @@ -89,7 +90,12 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte return ndb.GetRootsForVersion(version) }, }) - require.NoError(err, "NewCheckpointer") + go func() { + err := cp.Serve(ctx) + if err != context.Canceled { + require.NoError(err) + } + }() // Start watching checkpoints. 
cpCh, sub, err := cp.WatchCheckpoints() diff --git a/go/worker/storage/availabilitynudger/availability.go b/go/worker/storage/availabilitynudger/availability.go new file mode 100644 index 00000000000..be6a2e1338c --- /dev/null +++ b/go/worker/storage/availabilitynudger/availability.go @@ -0,0 +1,144 @@ +// Package availabilitynudger defines logic for updating the role providers. +package availabilitynudger + +import ( + "context" + "fmt" + "math" + + "github.com/eapache/channels" + + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/node" + "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + "github.com/oasisprotocol/oasis-core/go/worker/registration" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" +) + +const ( + // The maximum number of rounds the worker can be behind the chain before it's sensible for + // it to register as available. + maximumRoundDelayForAvailability = uint64(10) + + // The minimum number of rounds the worker can be behind the chain before it's sensible for + // it to stop advertising availability. + minimumRoundDelayForUnavailability = uint64(15) +) + +// Worker tracks the progress of last and last synced rounds +// and “nudges” role providers to mark themselves available or unavailable +// based on how closely the node is keeping up with consensus. +type Worker struct { + roleProvider registration.RoleProvider + rpcRoleProvider registration.RoleProvider + roleAvailable bool + + lastRound uint64 + lastSyncedRound uint64 + + blockCh *channels.InfiniteChannel + stateSync *statesync.Worker + + logger *logging.Logger +} + +// New creates a new worker that updates the availability to role providers. 
+func New(localProvider, rpcProvider registration.RoleProvider, blockCh *channels.InfiniteChannel, stateSync *statesync.Worker, runtimeID common.Namespace) *Worker { + return &Worker{ + roleProvider: localProvider, + rpcRoleProvider: rpcProvider, + lastRound: math.MaxUint64, + lastSyncedRound: math.MaxUint64, + blockCh: blockCh, + stateSync: stateSync, + logger: logging.GetLogger("worker/storage/availabilitynudger").With("runtime_id", runtimeID), + } +} + +// Serve starts the worker. +func (w *Worker) Serve(ctx context.Context) error { + w.logger.Info("started") + defer w.logger.Info("stopped") + + finalizeCh, sub, err := w.stateSync.WatchFinalizedRounds() + if err != nil { + return fmt.Errorf("failed to watch finalized rounds: %w", err) + } + defer sub.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case inBlk := <-w.blockCh.Out(): + blk := inBlk.(*block.Block) + w.setLastRound(blk.Header.Round) + case round := <-finalizeCh: + w.setLastSyncedRound(round) + } + w.updateAvailability() + } +} + +// setLastRound updates the last round number. +func (w *Worker) setLastRound(round uint64) { + w.lastRound = round +} + +// setLastSyncedRound updates the most recently synced round number. +func (w *Worker) setLastSyncedRound(round uint64) { + w.lastSyncedRound = round +} + +// updateAvailability updates the role's availability based on the gap +// between the last round and the last synced round. +func (w *Worker) updateAvailability() { + if w.lastRound == math.MaxUint64 || w.lastSyncedRound == math.MaxUint64 { + return + } + // NOTE(review): a previous guard returned early here when lastRound > lastSyncedRound; if + // lastSyncedRound ever exceeds lastRound, the uint64 subtraction below underflows to a + // huge lag and the node is marked unavailable — confirm this cannot happen. + + switch roundLag := w.lastRound - w.lastSyncedRound; { + case roundLag < maximumRoundDelayForAvailability: + w.markAvailable() + case roundLag > minimumRoundDelayForUnavailability: + w.markUnavailable() + } +} + +// markAvailable sets the role as available if it is not already. 
+func (w *Worker) markAvailable() { + if w.roleAvailable { + return + } + w.roleAvailable = true + + w.logger.Info("marking as available") + + if w.roleProvider != nil { + w.roleProvider.SetAvailable(func(*node.Node) error { return nil }) + } + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetAvailable(func(*node.Node) error { return nil }) + } +} + +// markUnavailable sets the role as unavailable if it is currently available. +func (w *Worker) markUnavailable() { + if !w.roleAvailable { + return + } + w.roleAvailable = false + + w.logger.Info("marking as unavailable") + + if w.roleProvider != nil { + w.roleProvider.SetUnavailable() + } + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetUnavailable() + } +} diff --git a/go/worker/storage/checkpointer/checkpointer.go b/go/worker/storage/checkpointer/checkpointer.go new file mode 100644 index 00000000000..61fb577359a --- /dev/null +++ b/go/worker/storage/checkpointer/checkpointer.go @@ -0,0 +1,323 @@ +// Package checkpointer defines logic for periodically creating checkpoints +// of the runtime state. +package checkpointer + +import ( + "context" + "fmt" + "time" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/pubsub" + consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" + commonFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" + roothashAPI "github.com/oasisprotocol/oasis-core/go/roothash/api" + storageAPI "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" + "github.com/oasisprotocol/oasis-core/go/worker/common/committee" + "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" +) + +// chunkerThreads is target number of subtrees during parallel checkpoint creation. +// It is intentionally non-configurable since we want operators to produce +// same checkpoint hashes. 
The current value was chosen based on the benchmarks +// done on the modern developer machine. +const chunkerThreads = 12 + +// Worker is responsible for creating runtime checkpoints for every consensus checkpoint, +// and notifying the checkpointer about the new finalized versions. +// +// If the checkpointer is disabled, it will wait until the state is initialized +// and ensure at least a checkpoint for the genesis height was created. +type Worker struct { + commonNode *committee.Node + localStorage storageAPI.LocalBackend + checkpointer checkpoint.Checkpointer + stateSync *statesync.Worker + cfg Config + logger *logging.Logger +} + +// Config is the worker configuration. +type Config struct { + // CheckpointerEnabled specifies whether creation of periodic runtime checkpoints is enabled. + CheckpointerEnabled bool + // CheckInterval is the interval on which to check if any checkpointing is needed. + CheckInterval time.Duration + // ParallelChunker specifies if the new parallel chunking algorithm can be used. + ParallelChunker bool +} + +// New creates a new worker. +func New(commonNode *committee.Node, localStorage storageAPI.LocalBackend, stateSync *statesync.Worker, cfg Config) (*Worker, error) { + checkInterval := checkpoint.CheckIntervalDisabled + if cfg.CheckpointerEnabled { + checkInterval = cfg.CheckInterval + } + checkpointerCfg := checkpoint.CheckpointerConfig{ + Name: "runtime", + Namespace: commonNode.Runtime.ID(), + CheckInterval: checkInterval, + RootsPerVersion: 2, // State root and I/O root. 
+ GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) { + rt, rerr := commonNode.Runtime.ActiveDescriptor(ctx) + if rerr != nil { + return nil, fmt.Errorf("failed to retrieve runtime descriptor: %w", rerr) + } + + blk, rerr := commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashAPI.RuntimeRequest{ + RuntimeID: rt.ID, + Height: consensusAPI.HeightLatest, + }) + if rerr != nil { + return nil, fmt.Errorf("failed to retrieve genesis block: %w", rerr) + } + + var threads uint16 + if cfg.ParallelChunker { + threads = chunkerThreads + } + + return &checkpoint.CreationParameters{ + Interval: rt.Storage.CheckpointInterval, + NumKept: rt.Storage.CheckpointNumKept, + ChunkSize: rt.Storage.CheckpointChunkSize, + InitialVersion: blk.Header.Round, + ChunkerThreads: threads, + }, nil + }, + GetRoots: func(ctx context.Context, version uint64) ([]storageAPI.Root, error) { + blk, berr := commonNode.Runtime.History().GetCommittedBlock(ctx, version) + if berr != nil { + return nil, berr + } + + return blk.Header.StorageRoots(), nil + }, + } + + checkpointer := checkpoint.NewCheckpointer( + localStorage.NodeDB(), + localStorage.Checkpointer(), + checkpointerCfg, + ) + + return &Worker{ + commonNode: commonNode, + localStorage: localStorage, + checkpointer: checkpointer, + stateSync: stateSync, + cfg: cfg, + logger: logging.GetLogger("worker/storage/checkpointer").With("runtime_id", commonNode.Runtime.ID()), + }, nil +} + +func (w *Worker) PauseCheckpointer(pause bool) error { + if !commonFlags.DebugDontBlameOasis() { + return api.ErrCantPauseCheckpointer + } + w.checkpointer.Pause(pause) + return nil +} + +// Serve runs the worker. +func (w *Worker) Serve(ctx context.Context) error { + w.logger.Info("started") + defer w.logger.Info("stopped") + + consensusCp := w.commonNode.Consensus.Checkpointer() + if consensusCp == nil { + return nil // TODO was existing code robust here? 
+ } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { // TODO make it more robust as worker should probably stop here? + if err := w.checkpointer.Serve(ctx); err != nil { + w.logger.Error("checkpointer failed", "err", err) + } + }() + + if err := w.ensureGenesisCheckpoint(ctx); err != nil { + return fmt.Errorf("failed to ensure genesis checkpoint was created: %w", err) + } + + if !w.cfg.CheckpointerEnabled { + return nil // We can return safely after creating the genesis checkpoint. + } + + // Determine the maximum number of consensus checkpoints to keep. + // TODO: This should probably be checked more than once, as params can change without the node + // being restarted. + consensusParams, err := w.commonNode.Consensus.Core().GetParameters(ctx, consensusAPI.HeightLatest) + if err != nil { + return fmt.Errorf("failed to fetch consensus parameters: %w", err) + } + + ch, sub, err := consensusCp.WatchCheckpoints() + if err != nil { + return fmt.Errorf("failed to watch checkpoints: %w", err) + } + defer sub.Close() + + finalizeCh, sub, err := w.stateSync.WatchFinalizedRounds() + if err != nil { + return fmt.Errorf("failed to watch finalized summaries: %w", err) + } + defer sub.Close() + + var ( + versions []uint64 + blkCh <-chan *consensusAPI.Block + blkSub pubsub.ClosableSubscription + ) + defer func() { + if blkCh != nil { + blkSub.Close() + blkSub = nil + blkCh = nil + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case version := <-ch: + // We need to wait for the next version as that is what will be in the consensus + // checkpoint. + versions = append(versions, version+1) + // Make sure that we limit the size of the checkpoint queue. 
+ if uint64(len(versions)) > consensusParams.Parameters.StateCheckpointNumKept { + versions = versions[1:] + } + + w.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", + "version", version+1, + "num_versions", len(versions), + ) + + if blkCh == nil { + blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(ctx) + if err != nil { + w.logger.Error("failed to watch blocks", + "err", err, + ) + continue + } + } + case blk := <-blkCh: + // If there's nothing remaining, unsubscribe. + if len(versions) == 0 { + w.logger.Debug("no more queued consensus checkpoint versions") + + blkSub.Close() + blkSub = nil + blkCh = nil + continue + } + + var newVersions []uint64 + for idx, version := range versions { + if version > uint64(blk.Height) { + // We need to wait for further versions. + newVersions = versions[idx:] + break + } + + // Lookup what runtime round corresponds to the given consensus layer version and make + // sure we checkpoint it. + blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(ctx, &roothashAPI.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), + Height: int64(version), + }) + if err != nil { + w.logger.Error("failed to get runtime block corresponding to consensus checkpoint", + "err", err, + "height", version, + ) + continue + } + + // We may have not yet synced the corresponding runtime round locally. In this case + // we need to wait until this is the case. + lastSyncedRound, _, _ := w.stateSync.GetLastSynced() + if blk.Header.Round > lastSyncedRound { + w.logger.Debug("runtime round not available yet for checkpoint, waiting", + "height", version, + "round", blk.Header.Round, + "last_synced_round", lastSyncedRound, + ) + newVersions = versions[idx:] + break + } + + // Force runtime storage checkpointer to create a checkpoint at this round. 
+ w.logger.Info("consensus checkpoint, force runtime checkpoint", + "height", version, + "round", blk.Header.Round, + ) + + w.checkpointer.ForceCheckpoint(blk.Header.Round) + } + versions = newVersions + case round := <-finalizeCh: + w.checkpointer.NotifyNewVersion(round) + } + } +} + +func (w *Worker) ensureGenesisCheckpoint(ctx context.Context) error { + // Wait for the common node to be initialized. + select { + case <-w.commonNode.Initialized(): + case <-ctx.Done(): + return ctx.Err() + } + + // Wait for state sync worker to be initialized which guarantees us to have the state initialized. + select { + case <-w.stateSync.Initialized(): + case <-ctx.Done(): + return ctx.Err() + } + + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashAPI.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), + Height: consensusAPI.HeightLatest, + }) + if err != nil { + return fmt.Errorf("can't retrieve genesis block: %w", err) + } + + ch, sub, err := w.checkpointer.WatchCreatedCheckpoints() + if err != nil { + return fmt.Errorf("failed to watch created checkpoints: %w", err) + } + defer sub.Close() + + _, err = w.localStorage.Checkpointer().GetCheckpoint(ctx, 1, genesisBlock.Header.StorageRootState()) + if err == nil { // if NOT error we already have a checkpoint. TODO: this is not robust, even though genesis has no io root. + return nil + } + + // Notify the checkpointer of the genesis round so it can be checkpointed. + if w.checkpointer != nil { + w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) + w.checkpointer.Flush() + } + + // TODO add timeout. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case r := <-ch: + if r != genesisBlock.Header.Round { + continue + } + return nil // genesis checkpoint created successfully. 
+ } + } +} diff --git a/go/worker/storage/runtime_worker.go b/go/worker/storage/runtime_worker.go new file mode 100644 index 00000000000..1d0429b9c05 --- /dev/null +++ b/go/worker/storage/runtime_worker.go @@ -0,0 +1,139 @@ +package storage + +import ( + "context" + "fmt" + + "github.com/eapache/channels" + "golang.org/x/sync/errgroup" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/config" + runtimeAPI "github.com/oasisprotocol/oasis-core/go/runtime/api" + "github.com/oasisprotocol/oasis-core/go/runtime/host" + "github.com/oasisprotocol/oasis-core/go/storage/api" + committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" + "github.com/oasisprotocol/oasis-core/go/worker/registration" + storageAPI "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/availabilitynudger" + "github.com/oasisprotocol/oasis-core/go/worker/storage/checkpointer" + "github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" +) + +// Worker is handling storage operations for a single runtime. 
+type worker struct { + commonNode *committeeCommon.Node + logger *logging.Logger + stateSync *statesync.Worker + checkpointer *checkpointer.Worker + availabilityNudger *availabilitynudger.Worker + stateSyncBlkCh *channels.InfiniteChannel + availabilityBlkCh *channels.InfiniteChannel +} + +func newRuntimeWorker( + commonNode *committeeCommon.Node, + rp registration.RoleProvider, + rpRPC registration.RoleProvider, + localStorage api.LocalBackend, + checkpointSyncCfg *statesync.CheckpointSyncConfig, + checkpointerEnabled bool, +) (*worker, error) { + worker := &worker{ + commonNode: commonNode, + logger: logging.GetLogger("worker/storage").With("runtimeID", commonNode.Runtime.ID()), + stateSyncBlkCh: channels.NewInfiniteChannel(), + availabilityBlkCh: channels.NewInfiniteChannel(), + } + + stateSync, err := statesync.New( + commonNode, + localStorage, + worker.stateSyncBlkCh, + checkpointSyncCfg, + ) + if err != nil { + return nil, fmt.Errorf("failed to create state sync worker: %w", err) + } + worker.stateSync = stateSync + + cpCfg := checkpointer.Config{ + CheckpointerEnabled: config.GlobalConfig.Storage.Checkpointer.Enabled, + CheckInterval: config.GlobalConfig.Storage.Checkpointer.CheckInterval, + ParallelChunker: config.GlobalConfig.Storage.Checkpointer.ParallelChunker, + } + checkpointer, err := checkpointer.New(commonNode, localStorage, stateSync, cpCfg) + if err != nil { + return nil, fmt.Errorf("failed to create checkpointer worker: %w", err) + } + worker.checkpointer = checkpointer + + worker.availabilityNudger = availabilitynudger.New(rp, rpRPC, worker.availabilityBlkCh, stateSync, commonNode.Runtime.ID()) + + return worker, nil +} + +// NodeHooks implementation. + +// HandleNewBlockEarlyLocked is guarded by CrossNode. +func (w *worker) HandleNewBlockEarlyLocked(*runtimeAPI.BlockInfo) { + // Nothing to do here. +} + +// HandleNewBlockLocked is guarded by CrossNode. 
+func (w *worker) HandleNewBlockLocked(bi *runtimeAPI.BlockInfo) { + // Notify the state syncer and availability nudger that there is a new block. + w.stateSyncBlkCh.In() <- bi.RuntimeBlock + w.availabilityBlkCh.In() <- bi.RuntimeBlock +} + +// HandleRuntimeHostEventLocked is guarded by CrossNode. +func (w *worker) HandleRuntimeHostEventLocked(*host.Event) { + // Nothing to do here. +} + +// Initialized returns a channel that will be closed once the worker finished starting up. +func (w *worker) Initialized() <-chan struct{} { + return w.stateSync.Initialized() +} + +func (w *worker) GetStatus(ctx context.Context) (*storageAPI.Status, error) { + return w.stateSync.GetStatus(ctx) +} + +func (w *worker) PauseCheckpointer(pause bool) error { + return w.checkpointer.PauseCheckpointer(pause) +} + +func (w *worker) serve(ctx context.Context) error { + w.logger.Info("started") + defer w.logger.Info("stopped") + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Create runtime checkpoint for every consensus checkpoint, to make it faster for storage nodes + // that use consensus state sync to catch up as exactly the right checkpoint will be available. + // Intentionally not part of the errgroup below as failing checkpointer should not stop state sync. 
+ go func() { + err := w.checkpointer.Serve(ctx) + if err != nil { + w.logger.Info("checkpointer worker failed", "err", err) + } + }() + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + if err := w.stateSync.Serve(ctx); err != nil { + return fmt.Errorf("state sync worker failed: %w", err) + } + return nil + }) + g.Go(func() error { + if err := w.availabilityNudger.Serve(ctx); err != nil { + return fmt.Errorf("availability nudger failed: %w", err) + } + return nil + }) + return g.Wait() +} diff --git a/go/worker/storage/service_internal.go b/go/worker/storage/service_internal.go index cf5d0ecf064..7c26399099a 100644 --- a/go/worker/storage/service_internal.go +++ b/go/worker/storage/service_internal.go @@ -14,7 +14,7 @@ func (w *Worker) GetLastSyncedRound(_ context.Context, request *api.GetLastSynce return nil, api.ErrRuntimeNotFound } - round, ioRoot, stateRoot := node.GetLastSynced() + round, ioRoot, stateRoot := node.stateSync.GetLastSynced() return &api.GetLastSyncedRoundResponse{ Round: round, IORoot: ioRoot, diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/statesync/checkpoint_sync.go similarity index 76% rename from go/worker/storage/committee/checkpoint_sync.go rename to go/worker/storage/statesync/checkpoint_sync.go index ad553272a90..91ce2743101 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/statesync/checkpoint_sync.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "bytes" @@ -21,8 +21,15 @@ import ( const ( // cpListsTimeout is the timeout for fetching checkpoints from all nodes. cpListsTimeout = 30 * time.Second - // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from a node. - cpRestoreTimeout = 60 * time.Second + + // cpRestoreChunkTimeout is the timeout for restoring a checkpoint chunk from the remote peer. 
+ cpRestoreChunkTimeout = 60 * time.Second + + // cpRestoreTimeout is the timeout for restoring the whole checkpoint from the remote peers. + // + // As of now it takes ~10-30 min to restore the state from the checkpoint, however the timeout + // should be significantly higher to account for the growing state. + cpRestoreTimeout = 12 * time.Hour checkpointStatusDone = 0 checkpointStatusNext = 1 @@ -37,7 +44,7 @@ var ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced" // CheckpointSyncConfig is the checkpoint sync configuration. type CheckpointSyncConfig struct { - // Disabled specifies whether checkpoint sync should be disabled. In this case the node will + // Disabled specifies whether checkpoint sync should be disabled. In this case the state sync worker will // only sync by applying all diffs from genesis. Disabled bool @@ -81,7 +88,7 @@ func (h *chunkHeap) Pop() any { return ret } -func (n *Node) checkpointChunkFetcher( +func (w *Worker) checkpointChunkFetcher( ctx context.Context, chunkDispatchCh chan *chunk, chunkReturnCh chan *chunk, @@ -99,13 +106,13 @@ func (n *Node) checkpointChunkFetcher( } } - chunkCtx, cancel := context.WithTimeout(ctx, cpRestoreTimeout) + chunkCtx, cancel := context.WithTimeout(ctx, cpRestoreChunkTimeout) defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.fetchChunk(chunkCtx, chunk) + rsp, pf, err := w.fetchChunk(chunkCtx, chunk) if err != nil { - n.logger.Error("failed to fetch chunk from peers", + w.logger.Error("failed to fetch chunk from peers", "err", err, "chunk", chunk.Index, ) @@ -114,7 +121,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. 
- done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) + done, err := w.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -124,7 +131,7 @@ func (n *Node) checkpointChunkFetcher( chunkReturnCh <- nil return case err != nil: - n.logger.Error("chunk restoration failed", + w.logger.Error("chunk restoration failed", "chunk", chunk.Index, "root", chunk.Root, "err", err, @@ -157,8 +164,8 @@ func (n *Node) checkpointChunkFetcher( // fetchChunk fetches chunk using checkpoint sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { - rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( +func (w *Worker) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := w.checkpointSync.GetCheckpointChunk( ctx, &checkpointsync.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -175,7 +182,7 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp1.Chunk, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + rsp2, pf, err := w.legacyStorageSync.GetCheckpointChunk( ctx, &synclegacy.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -194,8 +201,10 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp2.Chunk, pf, nil } -func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { - if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { +func (w *Worker) handleCheckpoint(ctx context.Context, check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { + ctx, cancel := context.WithTimeout(ctx, cpRestoreTimeout) + defer cancel() + if err := 
w.localStorage.Checkpointer().StartRestore(ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. return checkpointStatusBail, fmt.Errorf("can't start checkpoint restore: %w", err) @@ -208,9 +217,9 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } // Abort has to succeed even if we were interrupted by context cancellation. ctx := context.Background() - if err := n.localStorage.Checkpointer().AbortRestore(ctx); err != nil { + if err := w.localStorage.Checkpointer().AbortRestore(ctx); err != nil { cpStatus = checkpointStatusBail - n.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", + w.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", "err", err, ) } @@ -222,7 +231,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq chunkReturnCh := make(chan *chunk, maxParallelRequests) errorCh := make(chan int, maxParallelRequests) - ctx, cancel := context.WithCancel(n.ctx) + chunkCtx, cancel := context.WithCancel(ctx) // Spawn the worker group to fetch and restore checkpoint chunks. 
var workerGroup sync.WaitGroup @@ -231,7 +240,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq workerGroup.Add(1) go func() { defer workerGroup.Done() - n.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) + w.checkpointChunkFetcher(chunkCtx, chunkDispatchCh, chunkReturnCh, errorCh) }() } go func() { @@ -264,7 +273,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq checkpoint: check, }) } - n.logger.Debug("checkpoint chunks prepared for dispatch", + w.logger.Debug("checkpoint chunks prepared for dispatch", "chunks", len(check.Chunks), "checkpoint_root", check.Root, ) @@ -283,8 +292,8 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } select { - case <-n.ctx.Done(): - return checkpointStatusBail, n.ctx.Err() + case <-ctx.Done(): + return checkpointStatusBail, ctx.Err() case returned := <-chunkReturnCh: if returned == nil { @@ -313,13 +322,13 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } } -func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { - ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) +func (w *Worker) getCheckpointList(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + ctx, cancel := context.WithTimeout(ctx, cpListsTimeout) defer cancel() - list, err := n.fetchCheckpoints(ctx) + list, err := w.fetchCheckpoints(ctx) if err != nil { - n.logger.Error("failed to retrieve any checkpoints", + w.logger.Error("failed to retrieve any checkpoints", "err", err, ) return nil, err @@ -334,15 +343,15 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { // fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. // // In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { - list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ +func (w *Worker) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := w.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint return list1, nil } - list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + list2, err := w.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -369,8 +378,8 @@ func sortCheckpoints(s []*checkpointsync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { - namespace := n.commonNode.Runtime.ID() +func (w *Worker) checkCheckpointUsable(ctx context.Context, cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { + namespace := w.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. 
return false @@ -380,12 +389,12 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas return false } - blk, err := n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, cp.Root.Version) + blk, err := w.commonNode.Runtime.History().GetCommittedBlock(ctx, cp.Root.Version) if err != nil { - n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) + w.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) return false } - _, lastIORoot, lastStateRoot := n.GetLastSynced() + _, lastIORoot, lastStateRoot := w.GetLastSynced() lastVersions := map[storageApi.RootType]uint64{ storageApi.RootTypeIO: lastIORoot.Version, storageApi.RootTypeState: lastStateRoot.Version, @@ -401,18 +410,18 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas } } } - n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) + w.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) return false } -func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { +func (w *Worker) syncCheckpoints(ctx context.Context, genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { // Store roots and round info for checkpoints that finished syncing. // Round and namespace info will get overwritten as rounds are skipped // for errors, driven by remainingRoots. var syncState blockSummary // Fetch checkpoints from peers. 
- cps, err := n.getCheckpointList() + cps, err := w.getCheckpointList(ctx) if err != nil { return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err) } @@ -440,8 +449,8 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc if !multipartRunning { return } - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { - n.logger.Error("error aborting multipart restore on exit from syncer", + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + w.logger.Error("error aborting multipart restore on exit from syncer", "err", err, ) } @@ -449,7 +458,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc for _, check := range cps { - if check.Root.Version < genesisRound || !n.checkCheckpointUsable(check, remainingRoots, genesisRound) { + if check.Root.Version < genesisRound || !w.checkCheckpointUsable(ctx, check, remainingRoots, genesisRound) { continue } @@ -458,10 +467,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // previous retries. Aborting multipart works with no multipart in // progress too. 
multipartRunning = false - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { return nil, fmt.Errorf("error aborting previous multipart restore: %w", err) } - if err := n.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { + if err := w.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { return nil, fmt.Errorf("error starting multipart insert for round %d: %w", check.Root.Version, err) } multipartRunning = true @@ -486,18 +495,18 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } } - status, err := n.handleCheckpoint(check, n.checkpointSyncCfg.ChunkFetcherCount) + status, err := w.handleCheckpoint(ctx, check, w.checkpointSyncCfg.ChunkFetcherCount) switch status { case checkpointStatusDone: - n.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) + w.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) syncState.Namespace = check.Root.Namespace syncState.Round = check.Root.Version syncState.Roots = append(syncState.Roots, check.Root) remainingRoots.remove(check.Root.Type) if remainingRoots.isEmpty() { - if err = n.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { - n.logger.Error("can't finalize version after all checkpoints restored", + if err = w.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { + w.logger.Error("can't finalize version after all checkpoints restored", "err", err, "version", prevVersion, "roots", syncState.Roots, @@ -510,10 +519,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } continue case checkpointStatusNext: - n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) + w.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) 
continue case checkpointStatusBail: - n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) + w.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) return nil, fmt.Errorf("error restoring from checkpoints: %w", err) } } diff --git a/go/worker/storage/committee/checkpoint_sync_test.go b/go/worker/storage/statesync/checkpoint_sync_test.go similarity index 98% rename from go/worker/storage/committee/checkpoint_sync_test.go rename to go/worker/storage/statesync/checkpoint_sync_test.go index d39e50f3239..c9ac133c1bd 100644 --- a/go/worker/storage/committee/checkpoint_sync_test.go +++ b/go/worker/storage/statesync/checkpoint_sync_test.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "testing" diff --git a/go/worker/storage/committee/metrics.go b/go/worker/storage/statesync/metrics.go similarity index 78% rename from go/worker/storage/committee/metrics.go rename to go/worker/storage/statesync/metrics.go index 7f641f71fdd..5af29597b21 100644 --- a/go/worker/storage/committee/metrics.go +++ b/go/worker/storage/statesync/metrics.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "sync" @@ -7,7 +7,7 @@ import ( ) var ( - storageWorkerLastFullRound = prometheus.NewGaugeVec( + storageWorkerLastFinalizedRound = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "oasis_worker_storage_full_round", Help: "The last round that was fully synced and finalized.", @@ -15,7 +15,7 @@ var ( []string{"runtime"}, ) - storageWorkerLastSyncedRound = prometheus.NewGaugeVec( + storageWorkerLastFullyAppliedRound = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "oasis_worker_storage_synced_round", Help: "The last round that was synced but not yet finalized.", @@ -40,8 +40,8 @@ var ( ) storageWorkerCollectors = []prometheus.Collector{ - storageWorkerLastFullRound, - storageWorkerLastSyncedRound, + storageWorkerLastFinalizedRound, + 
storageWorkerLastFullyAppliedRound,
 	storageWorkerLastPendingRound,
 	storageWorkerRoundSyncLatency,
 }
@@ -49,9 +49,9 @@ var (
 	prometheusOnce sync.Once
 )
 
-func (n *Node) getMetricLabels() prometheus.Labels {
+func (w *Worker) getMetricLabels() prometheus.Labels {
 	return prometheus.Labels{
-		"runtime": n.commonNode.Runtime.ID().String(),
+		"runtime": w.commonNode.Runtime.ID().String(),
 	}
 }
diff --git a/go/worker/storage/statesync/prune.go b/go/worker/storage/statesync/prune.go
new file mode 100644
index 00000000000..8e1a9c3e20b
--- /dev/null
+++ b/go/worker/storage/statesync/prune.go
@@ -0,0 +1,48 @@
+package statesync
+
+import (
+	"fmt"
+
+	"github.com/oasisprotocol/oasis-core/go/common/logging"
+	mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
+)
+
+type pruneHandler struct {
+	logger *logging.Logger
+	worker *Worker
+}
+
+func (p *pruneHandler) Prune(rounds []uint64) error {
+	// Make sure we never prune past what was synced.
+	lastSyncedRound, _, _ := p.worker.GetLastSynced()
+
+	for _, round := range rounds {
+		if round >= lastSyncedRound {
+			return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)",
+				lastSyncedRound,
+			)
+		}
+
+		// Old suggestion: Make sure we don't prune rounds that need to be checkpointed but haven't been yet.
+
+		p.logger.Debug("pruning storage for round", "round", round)
+
+		// Prune given block.
+ err := p.worker.localStorage.NodeDB().Prune(round) + switch err { + case nil: + case mkvsDB.ErrNotEarliest: + p.logger.Debug("skipping non-earliest round", + "round", round, + ) + continue + default: + p.logger.Error("failed to prune block", + "err", err, + ) + return err + } + } + + return nil +} diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/statesync/state_sync.go similarity index 50% rename from go/worker/storage/committee/node.go rename to go/worker/storage/statesync/state_sync.go index a1318a0802f..af1aa522d09 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/statesync/state_sync.go @@ -1,4 +1,6 @@ -package committee +// Package statesync defines the logic responsible for initializing, syncing, +// and pruning of the runtime state using the relevant p2p protocol clients. +package statesync import ( "container/heap" @@ -12,38 +14,25 @@ import ( "github.com/eapache/channels" "github.com/oasisprotocol/oasis-core/go/common/logging" - "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/pubsub" "github.com/oasisprotocol/oasis-core/go/common/workerpool" "github.com/oasisprotocol/oasis-core/go/config" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" - commonFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" registryApi "github.com/oasisprotocol/oasis-core/go/registry/api" roothashApi "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" - runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" - "github.com/oasisprotocol/oasis-core/go/runtime/host" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" - "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - 
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" - "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" - storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) -var ( - _ committee.NodeHooks = (*Node)(nil) - - // ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. - ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") -) +// ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. +var ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") const ( // RoundLatest is a magic value for the latest round. @@ -53,23 +42,12 @@ const ( checkpointSyncRetryDelay = 10 * time.Second - // The maximum number of rounds the worker can be behind the chain before it's sensible for - // it to register as available. - maximumRoundDelayForAvailability = uint64(10) - - // The minimum number of rounds the worker can be behind the chain before it's sensible for - // it to stop advertising availability. - minimumRoundDelayForUnavailability = uint64(15) - // maxInFlightRounds is the maximum number of rounds that should be fetched before waiting // for them to be applied. maxInFlightRounds = 100 - // chunkerThreads is target number of subtrees during parallel checkpoint creation. - // It is intentionally non-configurable since we want operators to produce - // same checkpoint hashes. The current value was chosen based on the benchmarks - // done on the modern developer machine. 
- chunkerThreads = 12 + // diffResponseTimeout is the maximum time for fetching storage diff from the peer. + diffResponseTimeout = 15 * time.Second ) type roundItem interface { @@ -118,14 +96,17 @@ type finalizeResult struct { err error } -// Node watches blocks for storage changes. -type Node struct { +// Worker is the runtime state sync worker, responsible for syncing state +// that corresponds to the incoming runtime block headers received from the +// consensus service. +// +// In addition this worker is responsible for: +// 1. Initializing the runtime state, possibly using checkpoints (if configured). +// 2. Pruning the state as specified by the configuration. +// 3. Creating (and optionally advertising) statesync p2p protocol clients and servers. +type Worker struct { commonNode *committee.Node - roleProvider registration.RoleProvider - rpcRoleProvider registration.RoleProvider - roleAvailable bool - logger *logging.Logger localStorage storageApi.LocalBackend @@ -136,17 +117,14 @@ type Node struct { undefinedRound uint64 - fetchPool *workerpool.Pool - - workerCommonCfg workerCommon.Config - - checkpointer checkpoint.Checkpointer checkpointSyncCfg *CheckpointSyncConfig checkpointSyncForced bool syncedLock sync.RWMutex syncedState blockSummary + finalizedNotifier *pubsub.Broker + statusLock sync.RWMutex status api.StorageWorkerStatus @@ -154,75 +132,45 @@ type Node struct { diffCh chan *fetchedDiff finalizeCh chan finalizeResult - ctx context.Context - ctxCancel context.CancelFunc - - quitCh chan struct{} - initCh chan struct{} } -func NewNode( +// New creates a new state sync worker. +func New( commonNode *committee.Node, - roleProvider registration.RoleProvider, - rpcRoleProvider registration.RoleProvider, - workerCommonCfg workerCommon.Config, localStorage storageApi.LocalBackend, + blockCh *channels.InfiniteChannel, checkpointSyncCfg *CheckpointSyncConfig, -) (*Node, error) { +) (*Worker, error) { initMetrics() - // Create the fetcher pool. 
- fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) - fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - - n := &Node{ + w := &Worker{ commonNode: commonNode, - roleProvider: roleProvider, - rpcRoleProvider: rpcRoleProvider, - - logger: logging.GetLogger("worker/storage/committee").With("runtime_id", commonNode.Runtime.ID()), - - workerCommonCfg: workerCommonCfg, + logger: logging.GetLogger("worker/storage/statesync").With("runtime_id", commonNode.Runtime.ID()), localStorage: localStorage, - fetchPool: fetchPool, - checkpointSyncCfg: checkpointSyncCfg, + finalizedNotifier: pubsub.NewBroker(false), + status: api.StatusInitializing, - blockCh: channels.NewInfiniteChannel(), + blockCh: blockCh, diffCh: make(chan *fetchedDiff), finalizeCh: make(chan finalizeResult), - quitCh: make(chan struct{}), initCh: make(chan struct{}), } - // Validate checkpoint sync configuration. - if err := checkpointSyncCfg.Validate(); err != nil { - return nil, fmt.Errorf("bad checkpoint sync configuration: %w", err) - } - // Initialize sync state. - n.syncedState.Round = defaultUndefinedRound - - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) - - // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. - checkpointer, err := n.newCheckpointer(n.ctx, commonNode, localStorage) - if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) - } - n.checkpointer = checkpointer + w.syncedState.Round = defaultUndefinedRound // Register prune handler. commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, + logger: w.logger, + worker: w, }) // Advertise and serve p2p protocols. 
@@ -231,169 +179,55 @@ func NewNode( if config.GlobalConfig.Storage.Checkpointer.Enabled { commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } - if rpcRoleProvider != nil { - commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - } // Create p2p protocol clients. - n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - return n, nil -} - -func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { - checkInterval := checkpoint.CheckIntervalDisabled - if config.GlobalConfig.Storage.Checkpointer.Enabled { - checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval - } - checkpointerCfg := checkpoint.CheckpointerConfig{ - Name: "runtime", - Namespace: commonNode.Runtime.ID(), - CheckInterval: checkInterval, - RootsPerVersion: 2, // State root and I/O root. 
- GetParameters: func(ctx context.Context) (*checkpoint.CreationParameters, error) { - rt, rerr := commonNode.Runtime.ActiveDescriptor(ctx) - if rerr != nil { - return nil, fmt.Errorf("failed to retrieve runtime descriptor: %w", rerr) - } - - blk, rerr := commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ - RuntimeID: rt.ID, - Height: consensus.HeightLatest, - }) - if rerr != nil { - return nil, fmt.Errorf("failed to retrieve genesis block: %w", rerr) - } - - var threads uint16 - if config.GlobalConfig.Storage.Checkpointer.ParallelChunker { - threads = chunkerThreads - } - - return &checkpoint.CreationParameters{ - Interval: rt.Storage.CheckpointInterval, - NumKept: rt.Storage.CheckpointNumKept, - ChunkSize: rt.Storage.CheckpointChunkSize, - InitialVersion: blk.Header.Round, - ChunkerThreads: threads, - }, nil - }, - GetRoots: func(ctx context.Context, version uint64) ([]storageApi.Root, error) { - blk, berr := commonNode.Runtime.History().GetCommittedBlock(ctx, version) - if berr != nil { - return nil, berr - } - - return blk.Header.StorageRoots(), nil - }, - } - - return checkpoint.NewCheckpointer( - ctx, - localStorage.NodeDB(), - localStorage.Checkpointer(), - checkpointerCfg, - ) -} - -// Service interface. - -// Name returns the service name. -func (n *Node) Name() string { - return "committee node" -} - -// Start causes the worker to start responding to CometBFT new block events. -func (n *Node) Start() error { - go n.worker() - if config.GlobalConfig.Storage.Checkpointer.Enabled { - go n.consensusCheckpointSyncer() - } - return nil -} - -// Stop causes the worker to stop watching and shut down. -func (n *Node) Stop() { - n.statusLock.Lock() - n.status = api.StatusStopping - n.statusLock.Unlock() - - n.fetchPool.Stop() - - n.ctxCancel() -} - -// Quit returns a channel that will be closed when the worker stops. 
-func (n *Node) Quit() <-chan struct{} { - return n.quitCh -} - -// Cleanup cleans up any leftover state after the worker is stopped. -func (n *Node) Cleanup() { - // Nothing to do here? + return w, nil } // Initialized returns a channel that will be closed once the worker finished starting up. -func (n *Node) Initialized() <-chan struct{} { - return n.initCh +// +// If worker is initialized it is guaranteed that the storage has state available for the +// genesis height or higher. +func (w *Worker) Initialized() <-chan struct{} { + return w.initCh } -// GetStatus returns the storage committee node status. -func (n *Node) GetStatus(context.Context) (*api.Status, error) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +// GetStatus returns the state sync worker status. +func (w *Worker) GetStatus(context.Context) (*api.Status, error) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() - n.statusLock.RLock() - defer n.statusLock.RUnlock() + w.statusLock.RLock() + defer w.statusLock.RUnlock() return &api.Status{ - LastFinalizedRound: n.syncedState.Round, - Status: n.status, + LastFinalizedRound: w.syncedState.Round, + Status: w.status, }, nil } -func (n *Node) PauseCheckpointer(pause bool) error { - if !commonFlags.DebugDontBlameOasis() { - return api.ErrCantPauseCheckpointer - } - n.checkpointer.Pause(pause) - return nil -} +// WatchFinalizedRounds watches block rounds that have been successfully finalized. +func (w *Worker) WatchFinalizedRounds() (<-chan uint64, pubsub.ClosableSubscription, error) { + ch := make(chan uint64) + sub := w.finalizedNotifier.Subscribe() + sub.Unwrap(ch) -// GetLocalStorage returns the local storage backend used by this storage node. -func (n *Node) GetLocalStorage() storageApi.LocalBackend { - return n.localStorage -} - -// NodeHooks implementation. - -// HandleNewBlockEarlyLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { - // Nothing to do here. 
-} - -// HandleNewBlockLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) { - // Notify the state syncer that there is a new block. - n.blockCh.In() <- bi.RuntimeBlock -} - -// HandleRuntimeHostEventLocked is guarded by CrossNode. -func (n *Node) HandleRuntimeHostEventLocked(*host.Event) { - // Nothing to do here. + return ch, sub, nil } // Watcher implementation. // GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to. -func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() var io, state storageApi.Root - for _, root := range n.syncedState.Roots { + for _, root := range w.syncedState.Roots { switch root.Type { case storageApi.RootTypeIO: io = root @@ -402,10 +236,10 @@ func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { } } - return n.syncedState.Round, io, state + return w.syncedState.Round, io, state } -func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { +func (w *Worker) fetchDiff(ctx context.Context, round uint64, prevRoot, thisRoot storageApi.Root) { result := &fetchedDiff{ fetched: false, pf: rpc.NewNopPeerFeedback(), @@ -415,13 +249,13 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } defer func() { select { - case n.diffCh <- result: - case <-n.ctx.Done(): + case w.diffCh <- result: + case <-ctx.Done(): } }() // Check if the new root doesn't already exist. - if n.localStorage.NodeDB().HasRoot(thisRoot) { + if w.localStorage.NodeDB().HasRoot(thisRoot) { return } @@ -436,15 +270,15 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } // New root does not yet exist in storage and we need to fetch it from a peer. 
- n.logger.Debug("calling GetDiff", + w.logger.Debug("calling GetDiff", "old_root", prevRoot, "new_root", thisRoot, ) - ctx, cancel := context.WithCancel(n.ctx) + diffCtx, cancel := context.WithTimeout(ctx, diffResponseTimeout) defer cancel() - wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) + wl, pf, err := w.getDiff(diffCtx, prevRoot, thisRoot) if err != nil { result.err = err return @@ -456,35 +290,35 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { // getDiff fetches writelog using diff sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) +func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err == nil { // if NO error return rsp1.WriteLog, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { return nil, nil, err } return rsp2.WriteLog, pf, nil } -func (n *Node) finalize(summary *blockSummary) { - err := n.localStorage.NodeDB().Finalize(summary.Roots) +func (w *Worker) finalize(ctx context.Context, summary *blockSummary) { + err := w.localStorage.NodeDB().Finalize(summary.Roots) switch err { case nil: - n.logger.Debug("storage round finalized", + w.logger.Debug("storage round finalized", "round", summary.Round, ) case storageApi.ErrAlreadyFinalized: // This can happen if we are restoring after a roothash migration or if // we crashed before 
updating the sync state. - n.logger.Warn("storage round already finalized", + w.logger.Warn("storage round already finalized", "round", summary.Round, ) err = nil default: - n.logger.Error("failed to finalize storage round", + w.logger.Error("failed to finalize storage round", "err", err, "round", summary.Round, ) @@ -496,31 +330,31 @@ func (n *Node) finalize(summary *blockSummary) { } select { - case n.finalizeCh <- result: - case <-n.ctx.Done(): + case w.finalizeCh <- result: + case <-ctx.Done(): } } -func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { - n.logger.Info("initializing storage at genesis") +func (w *Worker) initGenesis(ctx context.Context, rt *registryApi.Runtime, genesisBlock *block.Block) error { + w.logger.Info("initializing storage at genesis") // Check what the latest finalized version in the database is as we may be using a database // from a previous version or network. - latestVersion, alreadyInitialized := n.localStorage.NodeDB().GetLatestVersion() + latestVersion, alreadyInitialized := w.localStorage.NodeDB().GetLatestVersion() // Finalize any versions that were not yet finalized in the old database. This is only possible // as long as there is only one non-finalized root per version. Note that we also cannot be sure // that any of these roots are valid, but this is fine as long as the final version matches the // genesis root. 
if alreadyInitialized { - n.logger.Debug("already initialized, finalizing any non-finalized versions", + w.logger.Debug("already initialized, finalizing any non-finalized versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion + 1; v < genesisBlock.Header.Round; v++ { - roots, err := n.localStorage.NodeDB().GetRootsForVersion(v) + roots, err := w.localStorage.NodeDB().GetRootsForVersion(v) if err != nil { return fmt.Errorf("failed to fetch roots for version %d: %w", v, err) } @@ -535,7 +369,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e break // We must have exactly one non-finalized state root to continue. } - err = n.localStorage.NodeDB().Finalize(stateRoots) + err = w.localStorage.NodeDB().Finalize(stateRoots) if err != nil { return fmt.Errorf("failed to finalize version %d: %w", v, err) } @@ -559,14 +393,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e maybeRoot := stateRoot maybeRoot.Version = latestVersion - if n.localStorage.NodeDB().HasRoot(maybeRoot) { - n.logger.Debug("latest version earlier than genesis state root, filling in versions", + if w.localStorage.NodeDB().HasRoot(maybeRoot) { + w.logger.Debug("latest version earlier than genesis state root, filling in versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion; v < stateRoot.Version; v++ { - err := n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err := w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: rt.ID, RootType: storageApi.RootTypeState, SrcRound: v, @@ -579,7 +413,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e return fmt.Errorf("failed to fill in version %d: %w", v, err) } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ + err = 
w.localStorage.NodeDB().Finalize([]storageApi.Root{{ Namespace: rt.ID, Version: v + 1, Type: storageApi.RootTypeState, @@ -594,14 +428,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e } default: // Latest finalized version is the same or ahead, root must exist. - compatible = n.localStorage.NodeDB().HasRoot(stateRoot) + compatible = w.localStorage.NodeDB().HasRoot(stateRoot) } // If we are incompatible and the local version is greater or the same as the genesis version, // we cannot do anything. If the local version is lower we assume the node will sync from a // different node. if !compatible && latestVersion >= stateRoot.Version { - n.logger.Error("existing state is incompatible with runtime genesis state", + w.logger.Error("existing state is incompatible with runtime genesis state", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, @@ -611,291 +445,130 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e if !compatible { // Database is empty, so assume the state will be replicated from another node. 
- n.logger.Warn("non-empty state root but no state available, assuming replication", + w.logger.Warn("non-empty state root but no state available, assuming replication", "state_root", genesisBlock.Header.StateRoot, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true } return nil } -func (n *Node) flushSyncedState(summary *blockSummary) (uint64, error) { - n.syncedLock.Lock() - defer n.syncedLock.Unlock() +func (w *Worker) flushSyncedState(summary *blockSummary) (uint64, error) { + w.syncedLock.Lock() + defer w.syncedLock.Unlock() + + w.syncedState = *summary + w.finalizedNotifier.Broadcast(summary.Round) - n.syncedState = *summary - if err := n.commonNode.Runtime.History().StorageSyncCheckpoint(n.syncedState.Round); err != nil { + if err := w.commonNode.Runtime.History().StorageSyncCheckpoint(w.syncedState.Round); err != nil { return 0, err } - return n.syncedState.Round, nil + return w.syncedState.Round, nil } -func (n *Node) consensusCheckpointSyncer() { - // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The - // reason why we do this is to make it faster for storage nodes that use consensus state sync - // to catch up as exactly the right checkpoint will be available. - consensusCp := n.commonNode.Consensus.Checkpointer() - if consensusCp == nil { - return - } +// Serve runs the state sync worker. +func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo + defer close(w.diffCh) // Wait for the common node to be initialized. select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - return + case <-w.commonNode.Initialized(): + case <-ctx.Done(): + close(w.initCh) + return ctx.Err() } - // Determine the maximum number of consensus checkpoints to keep. 
- consensusParams, err := n.commonNode.Consensus.Core().GetParameters(n.ctx, consensus.HeightLatest) - if err != nil { - n.logger.Error("failed to fetch consensus parameters", - "err", err, - ) - return - } + w.logger.Info("starting") + w.statusLock.Lock() + w.status = api.StatusStarting + w.statusLock.Unlock() - ch, sub, err := consensusCp.WatchCheckpoints() - if err != nil { - n.logger.Error("failed to watch checkpoints", - "err", err, - ) - return - } - defer sub.Close() + ctx, cancel := context.WithCancel(ctx) + defer cancel() - var ( - versions []uint64 - blkCh <-chan *consensus.Block - blkSub pubsub.ClosableSubscription - ) - defer func() { - if blkCh != nil { - blkSub.Close() - blkSub = nil - blkCh = nil - } - }() - for { + go func() { select { - case <-n.quitCh: - return - case <-n.ctx.Done(): - return - case version := <-ch: - // We need to wait for the next version as that is what will be in the consensus - // checkpoint. - versions = append(versions, version+1) - // Make sure that we limit the size of the checkpoint queue. - if uint64(len(versions)) > consensusParams.Parameters.StateCheckpointNumKept { - versions = versions[1:] - } - - n.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", - "version", version+1, - "num_versions", len(versions), - ) - - if blkCh == nil { - blkCh, blkSub, err = n.commonNode.Consensus.Core().WatchBlocks(n.ctx) - if err != nil { - n.logger.Error("failed to watch blocks", - "err", err, - ) - continue - } - } - case blk := <-blkCh: - // If there's nothing remaining, unsubscribe. - if len(versions) == 0 { - n.logger.Debug("no more queued consensus checkpoint versions") - - blkSub.Close() - blkSub = nil - blkCh = nil - continue - } - - var newVersions []uint64 - for idx, version := range versions { - if version > uint64(blk.Height) { - // We need to wait for further versions. 
- newVersions = versions[idx:] - break - } - - // Lookup what runtime round corresponds to the given consensus layer version and make - // sure we checkpoint it. - blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), - Height: int64(version), - }) - if err != nil { - n.logger.Error("failed to get runtime block corresponding to consensus checkpoint", - "err", err, - "height", version, - ) - continue - } - - // We may have not yet synced the corresponding runtime round locally. In this case - // we need to wait until this is the case. - n.syncedLock.RLock() - lastSyncedRound := n.syncedState.Round - n.syncedLock.RUnlock() - if blk.Header.Round > lastSyncedRound { - n.logger.Debug("runtime round not available yet for checkpoint, waiting", - "height", version, - "round", blk.Header.Round, - "last_synced_round", lastSyncedRound, - ) - newVersions = versions[idx:] - break - } - - // Force runtime storage checkpointer to create a checkpoint at this round. - n.logger.Info("consensus checkpoint, force runtime checkpoint", - "height", version, - "round", blk.Header.Round, - ) - - n.checkpointer.ForceCheckpoint(blk.Header.Round) - } - versions = newVersions + case <-ctx.Done(): + w.statusLock.Lock() + w.status = api.StatusStopping + w.statusLock.Unlock() } - } -} - -// This is only called from the main worker goroutine, so no locking should be necessary. 
-func (n *Node) nudgeAvailability(lastSynced, latest uint64) { - if lastSynced == n.undefinedRound || latest == n.undefinedRound { - return - } - if latest-lastSynced < maximumRoundDelayForAvailability && !n.roleAvailable { - n.roleProvider.SetAvailable(func(_ *node.Node) error { - return nil - }) - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { - return nil - }) - } - n.roleAvailable = true - } - if latest-lastSynced > minimumRoundDelayForUnavailability && n.roleAvailable { - n.roleProvider.SetUnavailable() - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetUnavailable() - } - n.roleAvailable = false - } -} - -func (n *Node) worker() { // nolint: gocyclo - defer close(n.quitCh) - defer close(n.diffCh) - - // Wait for the common node to be initialized. - select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - close(n.initCh) - return - } - - n.logger.Info("starting committee node") - - n.statusLock.Lock() - n.status = api.StatusStarting - n.statusLock.Unlock() + }() + defer w.logger.Info("stopped") // Determine genesis block. - genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), Height: consensus.HeightLatest, }) if err != nil { - n.logger.Error("can't retrieve genesis block", "err", err) - return + return fmt.Errorf("can't retrieve genesis block: %w", err) } - n.undefinedRound = genesisBlock.Header.Round - 1 + w.undefinedRound = genesisBlock.Header.Round - 1 // Determine last finalized storage version. 
- if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { + if version, dbNonEmpty := w.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { var blk *block.Block - blk, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, version) + blk, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, version) switch err { case nil: // Set last synced version to last finalized storage version. - if _, err = n.flushSyncedState(summaryFromBlock(blk)); err != nil { - n.logger.Error("failed to flush synced state", "err", err) - return + if _, err = w.flushSyncedState(summaryFromBlock(blk)); err != nil { + return fmt.Errorf("failed to flush synced state: %w", err) } default: // Failed to fetch historic block. This is fine when the network just went through a // dump/restore upgrade and we don't have any information before genesis. We treat the // database as unsynced and will proceed to either use checkpoints or sync iteratively. - n.logger.Warn("failed to fetch historic block", + w.logger.Warn("failed to fetch historic block", "err", err, "round", version, ) } } - n.syncedLock.RLock() - cachedLastRound := n.syncedState.Round - n.syncedLock.RUnlock() - if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round { - cachedLastRound = n.undefinedRound + w.syncedLock.RLock() + lastFinalizedRound := w.syncedState.Round + w.syncedLock.RUnlock() + if lastFinalizedRound == defaultUndefinedRound || lastFinalizedRound < genesisBlock.Header.Round { + lastFinalizedRound = w.undefinedRound } // Initialize genesis from the runtime descriptor. 
- isInitialStartup := (cachedLastRound == n.undefinedRound) + isInitialStartup := (lastFinalizedRound == w.undefinedRound) if isInitialStartup { - n.statusLock.Lock() - n.status = api.StatusInitializingGenesis - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusInitializingGenesis + w.statusLock.Unlock() var rt *registryApi.Runtime - rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx) + rt, err = w.commonNode.Runtime.ActiveDescriptor(ctx) if err != nil { - n.logger.Error("failed to retrieve runtime registry descriptor", - "err", err, - ) - return + return fmt.Errorf("failed to retrieve runtime registry descriptor: %w", err) } - if err = n.initGenesis(rt, genesisBlock); err != nil { - n.logger.Error("failed to initialize storage at genesis", - "err", err, - ) - return + if err = w.initGenesis(ctx, rt, genesisBlock); err != nil { + return fmt.Errorf("failed to initialize storage at genesis: %w", err) } } - // Notify the checkpointer of the genesis round so it can be checkpointed. - if n.checkpointer != nil { - n.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) - n.checkpointer.Flush() - } - // Check if we are able to fetch the first block that we would be syncing if we used iterative // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we // must wait for a later checkpoint to become available. - if !n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncStartCheck - n.statusLock.Unlock() + if !w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncStartCheck + w.statusLock.Unlock() // Determine what is the first round that we would need to sync. - iterativeSyncStart := cachedLastRound - if iterativeSyncStart == n.undefinedRound { + iterativeSyncStart := lastFinalizedRound + if iterativeSyncStart == w.undefinedRound { iterativeSyncStart++ } // Check if we actually have information about that round. 
This assumes that any reindexing // was already performed (the common node would not indicate being initialized otherwise). - _, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, iterativeSyncStart) + _, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, iterativeSyncStart) SyncStartCheck: switch { case err == nil: @@ -903,7 +576,7 @@ func (n *Node) worker() { // nolint: gocyclo // No information is available about the initial round. Query the earliest historic // block and check if that block has the genesis state root and empty I/O root. var earlyBlk *block.Block - earlyBlk, err = n.commonNode.Runtime.History().GetEarliestBlock(n.ctx) + earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(ctx) switch err { case nil: // Make sure the state root is still the same as at genesis time. @@ -917,13 +590,13 @@ func (n *Node) worker() { // nolint: gocyclo // If this is the case, we can start syncing from this round instead. Fill in the // remaining versions to make sure they actually exist in the database. - n.logger.Debug("filling in versions to genesis", + w.logger.Debug("filling in versions to genesis", "genesis_round", genesisBlock.Header.Round, "earliest_round", earlyBlk.Header.Round, ) for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ + Namespace: w.commonNode.Runtime.ID(), RootType: storageApi.RootTypeState, SrcRound: v, SrcRoot: genesisBlock.Header.StateRoot, @@ -937,65 +610,49 @@ func (n *Node) worker() { // nolint: gocyclo // Ignore already finalized versions. 
continue default: - n.logger.Error("failed to fill in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to fill in version %d: %w", v, err) } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ + Namespace: w.commonNode.Runtime.ID(), Version: v + 1, Type: storageApi.RootTypeState, Hash: genesisBlock.Header.StateRoot, // We can ignore I/O roots. }}) if err != nil { - n.logger.Error("failed to finalize filled in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to finalize filled in version %v: %w", v, err) } } - cachedLastRound, err = n.flushSyncedState(summaryFromBlock(earlyBlk)) + lastFinalizedRound, err = w.flushSyncedState(summaryFromBlock(earlyBlk)) if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - return + return fmt.Errorf("failed to flush synced state: %w", err) } // No need to force a checkpoint sync. break SyncStartCheck default: // This should never happen as the block should exist. - n.logger.Warn("failed to query earliest block in local history", + w.logger.Warn("failed to query earliest block in local history", "err", err, ) } // No information is available about this round, force checkpoint sync. - n.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", + w.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", "round", iterativeSyncStart, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true default: // Unknown error while fetching block information, abort. 
- n.logger.Error("failed to query block", - "err", err, - ) - return + return fmt.Errorf("failed to query block: %w", err) } } - n.logger.Info("worker initialized", + w.logger.Info("worker initialized", "genesis_round", genesisBlock.Header.Round, - "last_synced", cachedLastRound, + "last_finalized_round", lastFinalizedRound, ) - lastFullyAppliedRound := cachedLastRound - // Try to perform initial sync from state and io checkpoints if either: // // - Checkpoint sync has been forced because there is insufficient information available to use @@ -1008,10 +665,10 @@ func (n *Node) worker() { // nolint: gocyclo // to a later state which may not be desired given that checkpoint sync has been explicitly // disabled via config. // - if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncingCheckpoints - n.statusLock.Unlock() + if (isInitialStartup && !w.checkpointSyncCfg.Disabled) || w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncingCheckpoints + w.statusLock.Unlock() var ( summary *blockSummary @@ -1019,17 +676,17 @@ func (n *Node) worker() { // nolint: gocyclo ) CheckpointSyncRetry: for { - summary, err = n.syncCheckpoints(genesisBlock.Header.Round, n.checkpointSyncCfg.Disabled) + summary, err = w.syncCheckpoints(ctx, genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled) if err == nil { break } attempt++ - switch n.checkpointSyncForced { + switch w.checkpointSyncForced { case true: // We have no other options but to perform a checkpoint sync as we are missing // either state or authoritative blocks. - n.logger.Info("checkpoint sync required, retrying", + w.logger.Info("checkpoint sync required, retrying", "err", err, "attempt", attempt, ) @@ -1041,43 +698,47 @@ func (n *Node) worker() { // nolint: gocyclo // Try syncing again. 
The main reason for this is the sync failing due to a // checkpoint pruning race condition (where nodes list a checkpoint which is // then deleted just before we request its chunks). One retry is enough. - n.logger.Info("first checkpoint sync failed, trying once more", "err", err) + w.logger.Info("first checkpoint sync failed, trying once more", "err", err) } // Delay before retrying. select { case <-time.After(checkpointSyncRetryDelay): - case <-n.ctx.Done(): - return + case <-ctx.Done(): + return ctx.Err() } } if err != nil { - n.logger.Info("checkpoint sync failed", "err", err) + w.logger.Info("checkpoint sync failed", "err", err) } else { - cachedLastRound, err = n.flushSyncedState(summary) + lastFinalizedRound, err = w.flushSyncedState(summary) if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - return + return fmt.Errorf("failed to flush synced state %w", err) } - lastFullyAppliedRound = cachedLastRound - n.logger.Info("checkpoint sync succeeded", + w.logger.Info("checkpoint sync succeeded", logging.LogEvent, LogEventCheckpointSyncSuccess, ) } } - close(n.initCh) + close(w.initCh) + w.logger.Info("initialized") - // Don't register availability immediately, we want to know first how far behind consensus we are. - latestBlockRound := n.undefinedRound + // Main syncing loop: + err = nil + var wg sync.WaitGroup + + latestBlockRound := w.undefinedRound // Don't register availability immediately, we want to know first how far behind consensus we are. 
+ lastFullyAppliedRound := lastFinalizedRound + syncingRounds := make(map[uint64]*inFlight) + summaryCache := make(map[uint64]*blockSummary) + + fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String()) + fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) + defer fetchPool.Stop() heartbeat := heartbeat{} heartbeat.reset() - var wg sync.WaitGroup - syncingRounds := make(map[uint64]*inFlight) - summaryCache := make(map[uint64]*blockSummary) triggerRoundFetches := func() { for i := lastFullyAppliedRound + 1; i <= latestBlockRound; i++ { syncing, ok := syncingRounds[i] @@ -1097,10 +758,10 @@ func (n *Node) worker() { // nolint: gocyclo syncingRounds[i] = syncing if i == latestBlockRound { - storageWorkerLastPendingRound.With(n.getMetricLabels()).Set(float64(i)) + storageWorkerLastPendingRound.With(w.getMetricLabels()).Set(float64(i)) } } - n.logger.Debug("preparing round sync", + w.logger.Debug("preparing round sync", "round", i, "outstanding_mask", syncing.outstanding, "awaiting_retry", syncing.awaitingRetry, @@ -1128,18 +789,18 @@ func (n *Node) worker() { // nolint: gocyclo if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { syncing.scheduleDiff(rootType) wg.Add(1) - n.fetchPool.Submit(func() { + fetchPool.Submit(func() { defer wg.Done() - n.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) + w.fetchDiff(ctx, this.Round, prevRoots[i], this.Roots[i]) }) } } } } - n.statusLock.Lock() - n.status = api.StatusSyncingRounds - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusSyncingRounds + w.statusLock.Unlock() pendingApply := &minRoundQueue{} pendingFinalize := &minRoundQueue{} @@ -1163,7 +824,7 @@ mainLoop: // Apply the write log if one exists. 
err = nil if lastDiff.fetched { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: lastDiff.thisRoot.Namespace, RootType: lastDiff.thisRoot.Type, SrcRound: lastDiff.prevRoot.Version, @@ -1178,7 +839,7 @@ mainLoop: case errors.Is(err, storageApi.ErrExpectedRootMismatch): lastDiff.pf.RecordBadPeer() default: - n.logger.Error("can't apply write log", + w.logger.Error("can't apply write log", "err", err, "old_root", lastDiff.prevRoot, "new_root", lastDiff.thisRoot, @@ -1198,14 +859,14 @@ mainLoop: } // We have fully synced the given round. - n.logger.Debug("finished syncing round", "round", lastDiff.round) + w.logger.Debug("finished syncing round", "round", lastDiff.round) delete(syncingRounds, lastDiff.round) summary := summaryCache[lastDiff.round] delete(summaryCache, lastDiff.round-1) lastFullyAppliedRound = lastDiff.round - storageWorkerLastSyncedRound.With(n.getMetricLabels()).Set(float64(lastDiff.round)) - storageWorkerRoundSyncLatency.With(n.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) + storageWorkerLastFullyAppliedRound.With(w.getMetricLabels()).Set(float64(lastDiff.round)) + storageWorkerRoundSyncLatency.With(w.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) // Finalize storage for this round. This happens asynchronously // with respect to Apply operations for subsequent rounds. @@ -1217,30 +878,29 @@ mainLoop: // Check if any new rounds were fully applied and need to be finalized. // Only finalize if it's the round after the one that was finalized last. // As a consequence at most one finalization can be happening at the time. 
- if len(*pendingFinalize) > 0 && cachedLastRound+1 == (*pendingFinalize)[0].GetRound() { + if len(*pendingFinalize) > 0 && lastFinalizedRound+1 == (*pendingFinalize)[0].GetRound() { lastSummary := heap.Pop(pendingFinalize).(*blockSummary) wg.Add(1) go func() { // Don't block fetching and applying remaining rounds. defer wg.Done() - n.finalize(lastSummary) + w.finalize(ctx, lastSummary) }() continue } select { - case inBlk := <-n.blockCh.Out(): + case inBlk := <-w.blockCh.Out(): blk := inBlk.(*block.Block) - n.logger.Debug("incoming block", + w.logger.Debug("incoming block", "round", blk.Header.Round, - "last_synced", lastFullyAppliedRound, - "last_finalized", cachedLastRound, + "last_fully_applied", lastFullyAppliedRound, + "last_finalized", lastFinalizedRound, ) // Check if we're far enough to reasonably register as available. latestBlockRound = blk.Header.Round - n.nudgeAvailability(cachedLastRound, latestBlockRound) - if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == n.undefinedRound { + if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == w.undefinedRound { dummy := blockSummary{ Namespace: blk.Header.Namespace, Round: lastFullyAppliedRound + 1, @@ -1264,7 +924,7 @@ mainLoop: // since the undefined round may be unsigned -1 and in this case the loop // would not do any iterations. 
startSummaryRound := lastFullyAppliedRound - if startSummaryRound == n.undefinedRound { + if startSummaryRound == w.undefinedRound { startSummaryRound++ } for i := startSummaryRound; i < blk.Header.Round; i++ { @@ -1272,14 +932,10 @@ mainLoop: continue } var oldBlock *block.Block - oldBlock, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, i) + oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, i) if err != nil { - n.logger.Error("can't get block for round", - "err", err, - "round", i, - "current_round", blk.Header.Round, - ) - panic("can't get block in storage worker") + err = fmt.Errorf("failed to get block for round %d (current round: %d): %w", i, blk.Header.Round, err) + break mainLoop } summaryCache[i] = summaryFromBlock(oldBlock) } @@ -1291,14 +947,14 @@ mainLoop: heartbeat.reset() case <-heartbeat.C: - if latestBlockRound != n.undefinedRound { - n.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) + if latestBlockRound != w.undefinedRound { + w.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) triggerRoundFetches() } - case item := <-n.diffCh: + case item := <-w.diffCh: if item.err != nil { - n.logger.Error("error calling getdiff", + w.logger.Error("error calling getdiff", "err", item.err, "round", item.round, "old_root", item.prevRoot, @@ -1315,81 +971,36 @@ mainLoop: // when we're syncing and are far behind. triggerRoundFetches() - case finalized := <-n.finalizeCh: + case finalized := <-w.finalizeCh: // If finalization failed, things start falling apart. // There's no point redoing it, since it's probably not a transient - // error, and cachedLastRound also can't be updated legitimately. + // error, and lastFinalizedRound also can't be updated legitimately. if finalized.err != nil { - // Request a node shutdown given that syncing is effectively blocked. 
- _ = n.commonNode.HostNode.RequestShutdown(n.ctx, false) + w.logger.Error("failed to finalize", "err", err, "summary", finalized.summary) + err = fmt.Errorf("failed to finalize (round: %d): %w", finalized.summary.Round, finalized.err) break mainLoop } // No further sync or out of order handling needed here, since - // only one finalize at a time is triggered (for round cachedLastRound+1) - cachedLastRound, err = n.flushSyncedState(finalized.summary) + // only one finalize at a time is triggered (for round lastFinalizedRound+1) + lastFinalizedRound, err = w.flushSyncedState(finalized.summary) if err != nil { - n.logger.Error("failed to flush synced state", + w.logger.Error("failed to flush synced state", "err", err, ) } - storageWorkerLastFullRound.With(n.getMetricLabels()).Set(float64(finalized.summary.Round)) - - // Check if we're far enough to reasonably register as available. - n.nudgeAvailability(cachedLastRound, latestBlockRound) - - // Notify the checkpointer that there is a new finalized round. - if config.GlobalConfig.Storage.Checkpointer.Enabled { - n.checkpointer.NotifyNewVersion(finalized.summary.Round) - } + storageWorkerLastFinalizedRound.With(w.getMetricLabels()).Set(float64(finalized.summary.Round)) - case <-n.ctx.Done(): + case <-ctx.Done(): + err = ctx.Err() break mainLoop } } + cancel() // Ctx has to be canceled so that fetcher goroutines can be emptied. wg.Wait() // blockCh will be garbage-collected without being closed. It can potentially still contain // some new blocks, but only as many as were already in-flight at the point when the main // context was canceled. -} - -type pruneHandler struct { - logger *logging.Logger - node *Node -} - -func (p *pruneHandler) Prune(rounds []uint64) error { - // Make sure we never prune past what was synced. 
- lastSycnedRound, _, _ := p.node.GetLastSynced() - - for _, round := range rounds { - if round >= lastSycnedRound { - return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", - lastSycnedRound, - ) - } - - // TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. - - p.logger.Debug("pruning storage for round", "round", round) - - // Prune given block. - err := p.node.localStorage.NodeDB().Prune(round) - switch err { - case nil: - case mkvsDB.ErrNotEarliest: - p.logger.Debug("skipping non-earliest round", - "round", round, - ) - continue - default: - p.logger.Error("failed to prune block", - "err", err, - ) - return err - } - } - - return nil + return err } diff --git a/go/worker/storage/committee/utils.go b/go/worker/storage/statesync/utils.go similarity index 99% rename from go/worker/storage/committee/utils.go rename to go/worker/storage/statesync/utils.go index 863b9fc7bd0..88adb492b33 100644 --- a/go/worker/storage/committee/utils.go +++ b/go/worker/storage/statesync/utils.go @@ -1,4 +1,4 @@ -package committee +package statesync import ( "fmt" diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index f49988bd1c7..16ef4fe0d86 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,8 +1,11 @@ package storage import ( + "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -12,10 +15,11 @@ import ( committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" storageWorkerAPI "github.com/oasisprotocol/oasis-core/go/worker/storage/api" - "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" + 
"github.com/oasisprotocol/oasis-core/go/worker/storage/statesync" ) -// Worker is a worker handling storage operations. +// Worker is a worker handling storage operations for all common worker runtimes. type Worker struct { enabled bool @@ -26,7 +30,10 @@ type Worker struct { initCh chan struct{} quitCh chan struct{} - runtimes map[common.Namespace]*committee.Node + runtimes map[common.Namespace]*worker + + ctx context.Context + cancel context.CancelFunc } // New constructs a new storage worker. @@ -35,6 +42,7 @@ func New( commonWorker *workerCommon.Worker, registration *registration.Worker, ) (*Worker, error) { + ctx, cancel := context.WithCancel(context.Background()) enabled := config.GlobalConfig.Mode.HasLocalStorage() && len(commonWorker.GetRuntimes()) > 0 s := &Worker{ @@ -44,14 +52,16 @@ func New( logger: logging.GetLogger("worker/storage"), initCh: make(chan struct{}), quitCh: make(chan struct{}), - runtimes: make(map[common.Namespace]*committee.Node), + runtimes: make(map[common.Namespace]*worker), + ctx: ctx, + cancel: cancel, } if !enabled { return s, nil } - // Start storage node for every runtime. + // Register the state sync worker for every runtime. for id, rt := range s.commonWorker.GetRuntimes() { if err := s.registerRuntime(rt); err != nil { return nil, fmt.Errorf("failed to create storage worker for runtime %s: %w", id, err) @@ -70,6 +80,11 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { "runtime_id", id, ) + localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id) + if err != nil { + return fmt.Errorf("can't create local storage backend: %w", err) + } + // Since the storage node is always coupled with another role, make sure to not add any // particular role here. Instead this only serves to prevent registration until the storage node // is synced by making the role provider unavailable. 
@@ -84,29 +99,27 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return fmt.Errorf("failed to create rpc role provider: %w", err) } } - - localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id) - if err != nil { - return fmt.Errorf("can't create local storage backend: %w", err) + if rpRPC != nil { + commonNode.P2P.RegisterProtocolServer(pub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } - node, err := committee.NewNode( + worker, err := newRuntimeWorker( commonNode, rp, rpRPC, - w.commonWorker.GetConfig(), localStorage, - &committee.CheckpointSyncConfig{ + &statesync.CheckpointSyncConfig{ Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled, ChunkFetcherCount: config.GlobalConfig.Storage.FetcherCount, }, + config.GlobalConfig.Storage.Checkpointer.Enabled, ) if err != nil { return err } commonNode.Runtime.RegisterStorage(localStorage) - commonNode.AddHooks(node) - w.runtimes[id] = node + commonNode.AddHooks(worker) + w.runtimes[id] = worker w.logger.Info("new runtime registered", "runtime_id", id, @@ -115,7 +128,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return nil } -// Name returns the service name. +// Name returns the worker name. func (w *Worker) Name() string { return "storage worker" } @@ -133,6 +146,25 @@ func (w *Worker) Initialized() <-chan struct{} { // Start starts the storage service. func (w *Worker) Start() error { + go func() { + if err := w.Serve(w.ctx); err != nil { + w.logger.Error("failed", "error", err) + } + }() + return nil +} + +// Serve starts a state sync worker for each of the configured runtime, unless +// disabled. +// +// Once all workers have been initialized the init channel is closed. +// +// If any state sync worker returns an error, it cancels the remaining ones and +// waits for all of them to finish. The error from the first failing worker is +// returned. +// +// Finally, upon exit the quit channel is closed. 
+func (w *Worker) Serve(ctx context.Context) error { if !w.enabled { w.logger.Info("not starting storage worker as it is disabled") @@ -142,34 +174,35 @@ func (w *Worker) Start() error { return nil } - // Wait for all runtimes to terminate. - go func() { - defer close(w.quitCh) - - for _, r := range w.runtimes { - <-r.Quit() - } + w.logger.Info("starting", "num_runtimes", len(w.runtimes)) + defer func() { + close(w.quitCh) + w.logger.Info("stopped") }() - // Start all runtimes and wait for initialization. go func() { - w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes)) - - for _, r := range w.runtimes { - _ = r.Start() - } - - // Wait for runtimes to be initialized. for _, r := range w.runtimes { <-r.Initialized() } - - w.logger.Info("storage worker started") - + w.logger.Info("initialized") close(w.initCh) }() - return nil + return w.serve(ctx) +} + +func (w *Worker) serve(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + for id, r := range w.runtimes { + g.Go(func() error { + err := r.serve(ctx) + if err != nil { + return fmt.Errorf("runtime storage worker failed (runtimeID: %s): %w", id, err) + } + return nil + }) + } + return g.Wait() } // Stop halts the service. @@ -179,9 +212,9 @@ func (w *Worker) Stop() { return } - for _, r := range w.runtimes { - r.Stop() - } + w.logger.Info("stopping") + w.cancel() + <-w.quitCh } // Quit returns a channel that will be closed when the service terminates. @@ -193,9 +226,11 @@ func (w *Worker) Quit() <-chan struct{} { func (w *Worker) Cleanup() { } -// GetRuntime returns a storage committee node for the given runtime (if available). +// GetRuntime returns the state sync for the given runtime (if available). // // In case the runtime with the specified id was not configured for this node it returns nil. 
-func (w *Worker) GetRuntime(id common.Namespace) *committee.Node {
-	return w.runtimes[id]
+//
+// Suggestion: This is only used to get status, how about making this GetRuntimeStatus?
+func (w *Worker) GetRuntime(id common.Namespace) *statesync.Worker {
+	return w.runtimes[id].stateSync
+}