diff --git a/.changelog/6306.trivial.md b/.changelog/6306.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/consensus/cometbft/abci/state.go b/go/consensus/cometbft/abci/state.go index e580cd3e720..ced587f45de 100644 --- a/go/consensus/cometbft/abci/state.go +++ b/go/consensus/cometbft/abci/state.go @@ -782,10 +782,11 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App }, nil }, } - s.checkpointer, err = checkpoint.NewCheckpointer(s.ctx, ndb, ldb.Checkpointer(), checkpointerCfg) - if err != nil { - return nil, fmt.Errorf("state: failed to create checkpointer: %w", err) - } + s.checkpointer = checkpoint.NewCheckpointer(ndb, ldb.Checkpointer(), checkpointerCfg) + go func() { + err := s.checkpointer.Serve(ctx) + s.logger.Error("checkpointer stopped", "err", err) + }() } go s.metricsWorker() diff --git a/go/oasis-node/cmd/node/node_control.go b/go/oasis-node/cmd/node/node_control.go index 9310d0780f5..450d00e6b04 100644 --- a/go/oasis-node/cmd/node/node_control.go +++ b/go/oasis-node/cmd/node/node_control.go @@ -312,8 +312,8 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr } // Fetch storage worker status. - if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil { - status.Storage, err = storageNode.GetStatus(ctx) + if storageWorker := n.StorageWorker.GetRuntime(rt.ID()); storageWorker != nil { + status.Storage, err = storageWorker.GetStatus(ctx) if err != nil { logger.Error("failed to fetch storage worker status", "err", err) } diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index cd38354d83f..af33a5cd156 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -8,7 +8,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" - workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + storageWorker "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" ) // LogAssertEvent returns a handler which checks whether a specific log event was @@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory { // LogAssertCheckpointSync returns a handler which checks whether initial storage sync from // a checkpoint was successful or not. func LogAssertCheckpointSync() log.WatcherHandlerFactory { - return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") + return LogAssertEvent(storageWorker.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") } // LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go index 7a43a834871..b42d7291e44 100644 --- a/go/storage/mkvs/checkpoint/checkpointer.go +++ b/go/storage/mkvs/checkpoint/checkpointer.go @@ -72,7 +72,7 @@ type CreationParameters struct { ChunkerThreads uint16 } -// Checkpointer is a checkpointer. +// Checkpointer is responsible for creating the storage snapshots (checkpoints). type Checkpointer interface { // NotifyNewVersion notifies the checkpointer that a new version has been finalized. 
NotifyNewVersion(version uint64) @@ -95,6 +95,9 @@ type Checkpointer interface { // intervals; after unpausing, a checkpoint won't be created immediately, but the checkpointer // will wait for the next regular event. Pause(pause bool) + + // Serve starts running the checkpointer. + Serve(ctx context.Context) error } type checkpointer struct { @@ -112,6 +115,23 @@ type checkpointer struct { logger *logging.Logger } +// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and +// will automatically generate the configured number of checkpoints. +func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer { + return &checkpointer{ + cfg: cfg, + ndb: ndb, + creator: creator, + notifyCh: channels.NewRingChannel(1), + forceCh: channels.NewRingChannel(1), + flushCh: channels.NewRingChannel(1), + statusCh: make(chan struct{}), + pausedCh: make(chan bool), + cpNotifier: pubsub.NewBroker(false), + logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), + } +} + // Implements Checkpointer. func (c *checkpointer) NotifyNewVersion(version uint64) { c.notifyCh.In() <- version @@ -140,6 +160,106 @@ func (c *checkpointer) Pause(pause bool) { c.pausedCh <- pause } +func (c *checkpointer) Serve(ctx context.Context) error { + c.logger.Debug("storage checkpointer started", + "check_interval", c.cfg.CheckInterval, + ) + defer func() { + c.logger.Debug("storage checkpointer terminating") + }() + + paused := false + + for { + var interval time.Duration + switch c.cfg.CheckInterval { + case CheckIntervalDisabled: + interval = CheckIntervalDisabled + default: + interval = random.GetRandomValueFromInterval( + checkpointIntervalRandomizationFactor, + rand.Float64(), + c.cfg.CheckInterval, + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(interval): + case <-c.flushCh.Out(): + case paused = <-c.pausedCh: + continue + } + + var ( + version uint64 + force bool + ) + select { + case <-ctx.Done(): + return ctx.Err() + case v := <-c.notifyCh.Out(): + version = v.(uint64) + case v := <-c.forceCh.Out(): + version = v.(uint64) + force = true + } + + // Fetch current checkpoint parameters. + params := c.cfg.Parameters + if params == nil && c.cfg.GetParameters != nil { + var err error + params, err = c.cfg.GetParameters(ctx) + if err != nil { + c.logger.Error("failed to get checkpoint parameters", + "err", err, + "version", version, + ) + continue + } + } + if params == nil { + c.logger.Error("no checkpoint parameters") + continue + } + + // Don't checkpoint if checkpoints are disabled. + switch { + case force: + // Always checkpoint when forced. + case paused: + continue + case params.Interval == 0: + continue + case c.cfg.CheckInterval == CheckIntervalDisabled: + continue + default: + } + + var err error + switch force { + case false: + err = c.maybeCheckpoint(ctx, version, params) + case true: + err = c.checkpoint(ctx, version, params) + } + if err != nil { + c.logger.Error("failed to checkpoint", + "version", version, + "err", err, + ) + continue + } + + // Emit status update if someone is listening. This is only used in tests. + select { + case c.statusCh <- struct{}{}: + default: + } + } +} + func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) { // Notify watchers about the checkpoint we are about to make. 
c.cpNotifier.Broadcast(version) @@ -284,127 +404,3 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para return nil } - -func (c *checkpointer) worker(ctx context.Context) { - c.logger.Debug("storage checkpointer started", - "check_interval", c.cfg.CheckInterval, - ) - defer func() { - c.logger.Debug("storage checkpointer terminating") - }() - - paused := false - - for { - var interval time.Duration - switch c.cfg.CheckInterval { - case CheckIntervalDisabled: - interval = CheckIntervalDisabled - default: - interval = random.GetRandomValueFromInterval( - checkpointIntervalRandomizationFactor, - rand.Float64(), - c.cfg.CheckInterval, - ) - } - - select { - case <-ctx.Done(): - return - case <-time.After(interval): - case <-c.flushCh.Out(): - case paused = <-c.pausedCh: - continue - } - - var ( - version uint64 - force bool - ) - select { - case <-ctx.Done(): - return - case v := <-c.notifyCh.Out(): - version = v.(uint64) - case v := <-c.forceCh.Out(): - version = v.(uint64) - force = true - } - - // Fetch current checkpoint parameters. - params := c.cfg.Parameters - if params == nil && c.cfg.GetParameters != nil { - var err error - params, err = c.cfg.GetParameters(ctx) - if err != nil { - c.logger.Error("failed to get checkpoint parameters", - "err", err, - "version", version, - ) - continue - } - } - if params == nil { - c.logger.Error("no checkpoint parameters") - continue - } - - // Don't checkpoint if checkpoints are disabled. - switch { - case force: - // Always checkpoint when forced. - case paused: - continue - case params.Interval == 0: - continue - case c.cfg.CheckInterval == CheckIntervalDisabled: - continue - default: - } - - var err error - switch force { - case false: - err = c.maybeCheckpoint(ctx, version, params) - case true: - err = c.checkpoint(ctx, version, params) - } - if err != nil { - c.logger.Error("failed to checkpoint", - "version", version, - "err", err, - ) - continue - } - - // Emit status update if someone is listening. This is only used in tests. - select { - case c.statusCh <- struct{}{}: - default: - } - } -} - -// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and -// will automatically generate the configured number of checkpoints. -func NewCheckpointer( - ctx context.Context, - ndb db.NodeDB, - creator Creator, - cfg CheckpointerConfig, -) (Checkpointer, error) { - c := &checkpointer{ - cfg: cfg, - ndb: ndb, - creator: creator, - notifyCh: channels.NewRingChannel(1), - forceCh: channels.NewRingChannel(1), - flushCh: channels.NewRingChannel(1), - statusCh: make(chan struct{}), - pausedCh: make(chan bool), - cpNotifier: pubsub.NewBroker(false), - logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), - } - go c.worker(ctx) - return c, nil -} diff --git a/go/storage/mkvs/checkpoint/checkpointer_test.go b/go/storage/mkvs/checkpoint/checkpointer_test.go index 9782f904b85..67e3e5ea193 100644 --- a/go/storage/mkvs/checkpoint/checkpointer_test.go +++ b/go/storage/mkvs/checkpoint/checkpointer_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -25,7 +26,12 @@ const ( func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, interval uint64, preExistingData bool) { require := require.New(t) - ctx := context.Background() + + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() // Initialize a database. 
dir, err := os.MkdirTemp("", "mkvs.checkpointer") @@ -69,8 +75,8 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte fc, err := NewFileCreator(filepath.Join(dir, "checkpoints"), ndb) require.NoError(err, "NewFileCreator") - // Create a checkpointer. - cp, err := NewCheckpointer(ctx, ndb, fc, CheckpointerConfig{ + // Create and run a checkpointer. + cp := NewCheckpointer(ndb, fc, CheckpointerConfig{ Name: "test", Namespace: testNs, CheckInterval: testCheckInterval, @@ -89,7 +95,12 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte return ndb.GetRootsForVersion(version) }, }) - require.NoError(err, "NewCheckpointer") + wg.Go(func() { + err := cp.Serve(ctx) + if err != context.Canceled { + require.NoError(err) + } + }) // Start watching checkpoints. cpCh, sub, err := cp.WatchCheckpoints() diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index ad553272a90..9d38936158e 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -21,8 +21,8 @@ import ( const ( // cpListsTimeout is the timeout for fetching checkpoints from all nodes. cpListsTimeout = 30 * time.Second - // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from a node. - cpRestoreTimeout = 60 * time.Second + // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from the remote peer. + cpRestoreTimeout = time.Minute checkpointStatusDone = 0 checkpointStatusNext = 1 @@ -37,7 +37,7 @@ var ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced" // CheckpointSyncConfig is the checkpoint sync configuration. type CheckpointSyncConfig struct { - // Disabled specifies whether checkpoint sync should be disabled. In this case the node will + // Disabled specifies whether checkpoint sync should be disabled. In this case the storage worker will // only sync by applying all diffs from genesis. Disabled bool @@ -81,7 +81,7 @@ func (h *chunkHeap) Pop() any { return ret } -func (n *Node) checkpointChunkFetcher( +func (w *Worker) checkpointChunkFetcher( ctx context.Context, chunkDispatchCh chan *chunk, chunkReturnCh chan *chunk, @@ -103,9 +103,9 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.fetchChunk(chunkCtx, chunk) + rsp, pf, err := w.fetchChunk(chunkCtx, chunk) if err != nil { - n.logger.Error("failed to fetch chunk from peers", + w.logger.Error("failed to fetch chunk from peers", "err", err, "chunk", chunk.Index, ) @@ -114,7 +114,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. - done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) + done, err := w.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -124,7 +124,7 @@ func (n *Node) checkpointChunkFetcher( chunkReturnCh <- nil return case err != nil: - n.logger.Error("chunk restoration failed", + w.logger.Error("chunk restoration failed", "chunk", chunk.Index, "root", chunk.Root, "err", err, @@ -157,8 +157,8 @@ func (n *Node) checkpointChunkFetcher( // fetchChunk fetches chunk using checkpoint sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { - rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( +func (w *Worker) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := w.checkpointSync.GetCheckpointChunk( ctx, &checkpointsync.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -175,7 +175,7 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp1.Chunk, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + rsp2, pf, err := w.legacyStorageSync.GetCheckpointChunk( ctx, &synclegacy.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -194,8 +194,8 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp2.Chunk, pf, nil } -func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { - if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { +func (w *Worker) handleCheckpoint(ctx context.Context, check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { + if err := w.localStorage.Checkpointer().StartRestore(ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. return checkpointStatusBail, fmt.Errorf("can't start checkpoint restore: %w", err) @@ -208,9 +208,9 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } // Abort has to succeed even if we were interrupted by context cancellation. ctx := context.Background() - if err := n.localStorage.Checkpointer().AbortRestore(ctx); err != nil { + if err := w.localStorage.Checkpointer().AbortRestore(ctx); err != nil { cpStatus = checkpointStatusBail - n.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", + w.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", "err", err, ) } @@ -222,7 +222,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq chunkReturnCh := make(chan *chunk, maxParallelRequests) errorCh := make(chan int, maxParallelRequests) - ctx, cancel := context.WithCancel(n.ctx) + chunkCtx, cancel := context.WithCancel(ctx) // Spawn the worker group to fetch and restore checkpoint chunks. 
var workerGroup sync.WaitGroup @@ -231,7 +231,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq workerGroup.Add(1) go func() { defer workerGroup.Done() - n.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) + w.checkpointChunkFetcher(chunkCtx, chunkDispatchCh, chunkReturnCh, errorCh) }() } go func() { @@ -264,7 +264,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq checkpoint: check, }) } - n.logger.Debug("checkpoint chunks prepared for dispatch", + w.logger.Debug("checkpoint chunks prepared for dispatch", "chunks", len(check.Chunks), "checkpoint_root", check.Root, ) @@ -283,8 +283,8 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } select { - case <-n.ctx.Done(): - return checkpointStatusBail, n.ctx.Err() + case <-ctx.Done(): + return checkpointStatusBail, ctx.Err() case returned := <-chunkReturnCh: if returned == nil { @@ -313,13 +313,13 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } } -func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { - ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) +func (w *Worker) getCheckpointList(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + ctx, cancel := context.WithTimeout(ctx, cpListsTimeout) defer cancel() - list, err := n.fetchCheckpoints(ctx) + list, err := w.fetchCheckpoints(ctx) if err != nil { - n.logger.Error("failed to retrieve any checkpoints", + w.logger.Error("failed to retrieve any checkpoints", "err", err, ) return nil, err @@ -334,15 +334,15 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { // fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. // // In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { - list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ +func (w *Worker) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := w.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint return list1, nil } - list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + list2, err := w.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -369,8 +369,8 @@ func sortCheckpoints(s []*checkpointsync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { - namespace := n.commonNode.Runtime.ID() +func (w *Worker) checkCheckpointUsable(ctx context.Context, cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { + namespace := w.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. 
return false @@ -380,12 +380,12 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas return false } - blk, err := n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, cp.Root.Version) + blk, err := w.commonNode.Runtime.History().GetCommittedBlock(ctx, cp.Root.Version) if err != nil { - n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) + w.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) return false } - _, lastIORoot, lastStateRoot := n.GetLastSynced() + _, lastIORoot, lastStateRoot := w.GetLastSynced() lastVersions := map[storageApi.RootType]uint64{ storageApi.RootTypeIO: lastIORoot.Version, storageApi.RootTypeState: lastStateRoot.Version, @@ -401,18 +401,18 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas } } } - n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) + w.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) return false } -func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { +func (w *Worker) syncCheckpoints(ctx context.Context, genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { // Store roots and round info for checkpoints that finished syncing. // Round and namespace info will get overwritten as rounds are skipped // for errors, driven by remainingRoots. var syncState blockSummary // Fetch checkpoints from peers. - cps, err := n.getCheckpointList() + cps, err := w.getCheckpointList(ctx) if err != nil { return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err) } @@ -440,8 +440,8 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc if !multipartRunning { return } - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { - n.logger.Error("error aborting multipart restore on exit from syncer", + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + w.logger.Error("error aborting multipart restore on exit from syncer", "err", err, ) } @@ -449,7 +449,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc for _, check := range cps { - if check.Root.Version < genesisRound || !n.checkCheckpointUsable(check, remainingRoots, genesisRound) { + if check.Root.Version < genesisRound || !w.checkCheckpointUsable(ctx, check, remainingRoots, genesisRound) { continue } @@ -458,10 +458,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // previous retries. Aborting multipart works with no multipart in // progress too. 
multipartRunning = false - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { return nil, fmt.Errorf("error aborting previous multipart restore: %w", err) } - if err := n.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { + if err := w.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { return nil, fmt.Errorf("error starting multipart insert for round %d: %w", check.Root.Version, err) } multipartRunning = true @@ -486,18 +486,18 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } } - status, err := n.handleCheckpoint(check, n.checkpointSyncCfg.ChunkFetcherCount) + status, err := w.handleCheckpoint(ctx, check, w.checkpointSyncCfg.ChunkFetcherCount) switch status { case checkpointStatusDone: - n.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) + w.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) syncState.Namespace = check.Root.Namespace syncState.Round = check.Root.Version syncState.Roots = append(syncState.Roots, check.Root) remainingRoots.remove(check.Root.Type) if remainingRoots.isEmpty() { - if err = n.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { - n.logger.Error("can't finalize version after all checkpoints restored", + if err = w.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { + w.logger.Error("can't finalize version after all checkpoints restored", "err", err, "version", prevVersion, "roots", syncState.Roots, @@ -510,10 +510,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } continue case checkpointStatusNext: - n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) + w.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) continue case checkpointStatusBail: - n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) + w.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) return nil, fmt.Errorf("error restoring from checkpoints: %w", err) } } diff --git a/go/worker/storage/committee/metrics.go b/go/worker/storage/committee/metrics.go index 7f641f71fdd..6b3ad5e0216 100644 --- a/go/worker/storage/committee/metrics.go +++ b/go/worker/storage/committee/metrics.go @@ -49,9 +49,9 @@ var ( prometheusOnce sync.Once ) -func (n *Node) getMetricLabels() prometheus.Labels { +func (w *Worker) getMetricLabels() prometheus.Labels { return prometheus.Labels{ - "runtime": n.commonNode.Runtime.ID().String(), + "runtime": w.commonNode.Runtime.ID().String(), } } diff --git a/go/worker/storage/committee/prune.go b/go/worker/storage/committee/prune.go new file mode 100644 index 00000000000..f61f96ce1e7 --- /dev/null +++ b/go/worker/storage/committee/prune.go @@ -0,0 +1,48 @@ +package committee + +import ( + "fmt" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" +) + +type pruneHandler struct { + logger *logging.Logger + worker *Worker +} + +func (p *pruneHandler) Prune(rounds []uint64) error { + // Make sure we never prune past what was synced. 
+ lastSycnedRound, _, _ := p.worker.GetLastSynced() + + for _, round := range rounds { + if round >= lastSycnedRound { + return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", + lastSycnedRound, + ) + } + + // Old suggestion: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. + + p.logger.Debug("pruning storage for round", "round", round) + + // Prune given block. + err := p.worker.localStorage.NodeDB().Prune(round) + switch err { + case nil: + case mkvsDB.ErrNotEarliest: + p.logger.Debug("skipping non-earliest round", + "round", round, + ) + continue + default: + p.logger.Error("failed to prune block", + "err", err, + ) + return err + } + } + + return nil +} diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/worker.go similarity index 69% rename from go/worker/storage/committee/node.go rename to go/worker/storage/committee/worker.go index a1318a0802f..8548b526c51 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/worker.go @@ -1,3 +1,5 @@ +// Package committee defines the logic responsible for initializing, syncing, +// and pruning of the runtime state using the relevant p2p protocol clients. package committee import ( @@ -27,7 +29,6 @@ import ( storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" @@ -39,7 +40,7 @@ import ( ) var ( - _ committee.NodeHooks = (*Node)(nil) + _ committee.NodeHooks = (*Worker)(nil) // ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") @@ -70,6 +71,9 @@ const ( // same checkpoint hashes. The current value was chosen based on the benchmarks // done on the modern developer machine. chunkerThreads = 12 + + // diffResponseTimeout is the maximum time for fetching storage diff from the peer. + diffResponseTimeout = 15 * time.Second ) type roundItem interface { @@ -118,8 +122,18 @@ type finalizeResult struct { err error } -// Node watches blocks for storage changes. -type Node struct { +// Worker is the runtime storage worker, responsible for syncing state +// that corresponds to the incoming runtime block headers received from the +// consensus service. +// +// In addition this worker is responsible for: +// 1. Initializing the runtime state, possibly using checkpoints (if configured). +// 2. Pruning the state as specified by the configuration. +// 3. Optionally creating runtime state checkpoints (used by other nodes) for the state sync. +// 4. Creating (and optionally advertising) statesync p2p protocol clients and servers. +// 5. Registering node availability when it has synced sufficiently close to +// the latest known block header. 
+type Worker struct { commonNode *committee.Node roleProvider registration.RoleProvider @@ -136,8 +150,6 @@ type Node struct { undefinedRound uint64 - fetchPool *workerpool.Pool - workerCommonCfg workerCommon.Config checkpointer checkpoint.Checkpointer @@ -154,29 +166,21 @@ type Node struct { diffCh chan *fetchedDiff finalizeCh chan finalizeResult - ctx context.Context - ctxCancel context.CancelFunc - - quitCh chan struct{} - initCh chan struct{} } -func NewNode( +// New creates a new storage worker. +func New( commonNode *committee.Node, roleProvider registration.RoleProvider, rpcRoleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, localStorage storageApi.LocalBackend, checkpointSyncCfg *CheckpointSyncConfig, -) (*Node, error) { +) (*Worker, error) { initMetrics() - // Create the fetcher pool. - fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) - fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - - n := &Node{ + w := &Worker{ commonNode: commonNode, roleProvider: roleProvider, @@ -188,8 +192,6 @@ func NewNode( localStorage: localStorage, - fetchPool: fetchPool, - checkpointSyncCfg: checkpointSyncCfg, status: api.StatusInitializing, @@ -198,7 +200,6 @@ func NewNode( diffCh: make(chan *fetchedDiff), finalizeCh: make(chan finalizeResult), - quitCh: make(chan struct{}), initCh: make(chan struct{}), } @@ -208,21 +209,19 @@ func NewNode( } // Initialize sync state. - n.syncedState.Round = defaultUndefinedRound - - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + w.syncedState.Round = defaultUndefinedRound // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. - checkpointer, err := n.newCheckpointer(n.ctx, commonNode, localStorage) + checkpointer, err := w.newCheckpointer(commonNode, localStorage) if err != nil { return nil, fmt.Errorf("failed to create checkpointer: %w", err) } - n.checkpointer = checkpointer + w.checkpointer = checkpointer // Register prune handler. commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, + logger: w.logger, + worker: w, }) // Advertise and serve p2p protocols. @@ -236,14 +235,14 @@ func NewNode( } // Create p2p protocol clients. 
- n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - return n, nil + return w, nil } -func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { +func (w *Worker) newCheckpointer(commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -291,109 +290,71 @@ func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, } return checkpoint.NewCheckpointer( - ctx, localStorage.NodeDB(), localStorage.Checkpointer(), checkpointerCfg, - ) -} - -// Service interface. - -// Name returns the service name. -func (n *Node) Name() string { - return "committee node" -} - -// Start causes the worker to start responding to CometBFT new block events. -func (n *Node) Start() error { - go n.worker() - if config.GlobalConfig.Storage.Checkpointer.Enabled { - go n.consensusCheckpointSyncer() - } - return nil -} - -// Stop causes the worker to stop watching and shut down. -func (n *Node) Stop() { - n.statusLock.Lock() - n.status = api.StatusStopping - n.statusLock.Unlock() - - n.fetchPool.Stop() - - n.ctxCancel() -} - -// Quit returns a channel that will be closed when the worker stops. -func (n *Node) Quit() <-chan struct{} { - return n.quitCh -} - -// Cleanup cleans up any leftover state after the worker is stopped. -func (n *Node) Cleanup() { - // Nothing to do here? + ), nil } // Initialized returns a channel that will be closed once the worker finished starting up. -func (n *Node) Initialized() <-chan struct{} { - return n.initCh +func (w *Worker) Initialized() <-chan struct{} { + return w.initCh } -// GetStatus returns the storage committee node status. -func (n *Node) GetStatus(context.Context) (*api.Status, error) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +// GetStatus returns the state sync worker status. +func (w *Worker) GetStatus(context.Context) (*api.Status, error) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() - n.statusLock.RLock() - defer n.statusLock.RUnlock() + w.statusLock.RLock() + defer w.statusLock.RUnlock() return &api.Status{ - LastFinalizedRound: n.syncedState.Round, - Status: n.status, + LastFinalizedRound: w.syncedState.Round, + Status: w.status, }, nil } -func (n *Node) PauseCheckpointer(pause bool) error { +func (w *Worker) PauseCheckpointer(pause bool) error { if !commonFlags.DebugDontBlameOasis() { return api.ErrCantPauseCheckpointer } - n.checkpointer.Pause(pause) + w.checkpointer.Pause(pause) return nil } -// GetLocalStorage returns the local storage backend used by this storage node. -func (n *Node) GetLocalStorage() storageApi.LocalBackend { - return n.localStorage +// GetLocalStorage returns the local storage backend used by the worker. 
+func (w *Worker) GetLocalStorage() storageApi.LocalBackend { + return w.localStorage } // NodeHooks implementation. // HandleNewBlockEarlyLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { +func (w *Worker) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { // Nothing to do here. } // HandleNewBlockLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) { +func (w *Worker) HandleNewBlockLocked(bi *runtime.BlockInfo) { // Notify the state syncer that there is a new block. - n.blockCh.In() <- bi.RuntimeBlock + w.blockCh.In() <- bi.RuntimeBlock } // HandleRuntimeHostEventLocked is guarded by CrossNode. -func (n *Node) HandleRuntimeHostEventLocked(*host.Event) { +func (w *Worker) HandleRuntimeHostEventLocked(*host.Event) { // Nothing to do here. } // Watcher implementation. // GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to. -func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() var io, state storageApi.Root - for _, root := range n.syncedState.Roots { + for _, root := range w.syncedState.Roots { switch root.Type { case storageApi.RootTypeIO: io = root @@ -402,10 +363,10 @@ func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { } } - return n.syncedState.Round, io, state + return w.syncedState.Round, io, state } -func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { +func (w *Worker) fetchDiff(ctx context.Context, round uint64, prevRoot, thisRoot storageApi.Root) { result := &fetchedDiff{ fetched: false, pf: rpc.NewNopPeerFeedback(), @@ -415,13 +376,13 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } defer func() { select { - case n.diffCh <- result: - case <-n.ctx.Done(): + case w.diffCh <- result: + case <-ctx.Done(): } }() // Check if the new root doesn't already exist. - if n.localStorage.NodeDB().HasRoot(thisRoot) { + if w.localStorage.NodeDB().HasRoot(thisRoot) { return } @@ -436,15 +397,12 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } // New root does not yet exist in storage and we need to fetch it from a peer. - n.logger.Debug("calling GetDiff", + w.logger.Debug("calling GetDiff", "old_root", prevRoot, "new_root", thisRoot, ) - ctx, cancel := context.WithCancel(n.ctx) - defer cancel() - - wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) + wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) if err != nil { result.err = err return @@ -456,35 +414,39 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { // getDiff fetches writelog using diff sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) +func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + ctx, cancel := context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() + rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err == nil { // if NO error return rsp1.WriteLog, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + ctx, cancel = context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() + rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { return nil, nil, err } return rsp2.WriteLog, pf, nil } -func (n *Node) finalize(summary *blockSummary) { - err := n.localStorage.NodeDB().Finalize(summary.Roots) +func (w *Worker) finalize(ctx context.Context, summary *blockSummary) { + err := w.localStorage.NodeDB().Finalize(summary.Roots) switch err { case nil: - n.logger.Debug("storage round finalized", + w.logger.Debug("storage round finalized", "round", summary.Round, ) case storageApi.ErrAlreadyFinalized: // This can happen if we are restoring after a roothash migration or if // we crashed before updating the sync state. - n.logger.Warn("storage round already finalized", + w.logger.Warn("storage round already finalized", "round", summary.Round, ) err = nil default: - n.logger.Error("failed to finalize storage round", + w.logger.Error("failed to finalize storage round", "err", err, "round", summary.Round, ) @@ -496,31 +458,31 @@ func (n *Node) finalize(summary *blockSummary) { } select { - case n.finalizeCh <- result: - case <-n.ctx.Done(): + case w.finalizeCh <- result: + case <-ctx.Done(): } } -func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { - n.logger.Info("initializing storage at genesis") +func (w *Worker) initGenesis(ctx context.Context, rt *registryApi.Runtime, genesisBlock *block.Block) error { + w.logger.Info("initializing storage at genesis") // Check what the latest finalized version in the database is as we may be using a database // from a previous version or network. - latestVersion, alreadyInitialized := n.localStorage.NodeDB().GetLatestVersion() + latestVersion, alreadyInitialized := w.localStorage.NodeDB().GetLatestVersion() // Finalize any versions that were not yet finalized in the old database. This is only possible // as long as there is only one non-finalized root per version. Note that we also cannot be sure // that any of these roots are valid, but this is fine as long as the final version matches the // genesis root. 
if alreadyInitialized { - n.logger.Debug("already initialized, finalizing any non-finalized versions", + w.logger.Debug("already initialized, finalizing any non-finalized versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion + 1; v < genesisBlock.Header.Round; v++ { - roots, err := n.localStorage.NodeDB().GetRootsForVersion(v) + roots, err := w.localStorage.NodeDB().GetRootsForVersion(v) if err != nil { return fmt.Errorf("failed to fetch roots for version %d: %w", v, err) } @@ -535,7 +497,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e break // We must have exactly one non-finalized state root to continue. } - err = n.localStorage.NodeDB().Finalize(stateRoots) + err = w.localStorage.NodeDB().Finalize(stateRoots) if err != nil { return fmt.Errorf("failed to finalize version %d: %w", v, err) } @@ -559,14 +521,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e maybeRoot := stateRoot maybeRoot.Version = latestVersion - if n.localStorage.NodeDB().HasRoot(maybeRoot) { - n.logger.Debug("latest version earlier than genesis state root, filling in versions", + if w.localStorage.NodeDB().HasRoot(maybeRoot) { + w.logger.Debug("latest version earlier than genesis state root, filling in versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion; v < stateRoot.Version; v++ { - err := n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err := w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: rt.ID, RootType: storageApi.RootTypeState, SrcRound: v, @@ -579,7 +541,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e return fmt.Errorf("failed to fill in version %d: %w", v, err) } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ Namespace: rt.ID, Version: v + 1, Type: storageApi.RootTypeState, @@ -594,14 +556,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e } default: // Latest finalized version is the same or ahead, root must exist. - compatible = n.localStorage.NodeDB().HasRoot(stateRoot) + compatible = w.localStorage.NodeDB().HasRoot(stateRoot) } // If we are incompatible and the local version is greater or the same as the genesis version, // we cannot do anything. If the local version is lower we assume the node will sync from a // different node. if !compatible && latestVersion >= stateRoot.Version { - n.logger.Error("existing state is incompatible with runtime genesis state", + w.logger.Error("existing state is incompatible with runtime genesis state", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, @@ -611,46 +573,46 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e if !compatible { // Database is empty, so assume the state will be replicated from another node. 
- n.logger.Warn("non-empty state root but no state available, assuming replication", + w.logger.Warn("non-empty state root but no state available, assuming replication", "state_root", genesisBlock.Header.StateRoot, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true } return nil } -func (n *Node) flushSyncedState(summary *blockSummary) (uint64, error) { - n.syncedLock.Lock() - defer n.syncedLock.Unlock() +func (w *Worker) flushSyncedState(summary *blockSummary) (uint64, error) { + w.syncedLock.Lock() + defer w.syncedLock.Unlock() - n.syncedState = *summary - if err := n.commonNode.Runtime.History().StorageSyncCheckpoint(n.syncedState.Round); err != nil { + w.syncedState = *summary + if err := w.commonNode.Runtime.History().StorageSyncCheckpoint(w.syncedState.Round); err != nil { return 0, err } - return n.syncedState.Round, nil + return w.syncedState.Round, nil } -func (n *Node) consensusCheckpointSyncer() { +func (w *Worker) consensusCheckpointSyncer(ctx context.Context) { // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The // reason why we do this is to make it faster for storage nodes that use consensus state sync // to catch up as exactly the right checkpoint will be available. - consensusCp := n.commonNode.Consensus.Checkpointer() + consensusCp := w.commonNode.Consensus.Checkpointer() if consensusCp == nil { return } // Wait for the common node to be initialized. select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): + case <-w.commonNode.Initialized(): + case <-ctx.Done(): return } // Determine the maximum number of consensus checkpoints to keep. - consensusParams, err := n.commonNode.Consensus.Core().GetParameters(n.ctx, consensus.HeightLatest) + consensusParams, err := w.commonNode.Consensus.Core().GetParameters(ctx, consensus.HeightLatest) if err != nil { - n.logger.Error("failed to fetch consensus parameters", + w.logger.Error("failed to fetch consensus parameters", "err", err, ) return @@ -658,7 +620,7 @@ func (n *Node) consensusCheckpointSyncer() { ch, sub, err := consensusCp.WatchCheckpoints() if err != nil { - n.logger.Error("failed to watch checkpoints", + w.logger.Error("failed to watch checkpoints", "err", err, ) return @@ -679,9 +641,7 @@ func (n *Node) consensusCheckpointSyncer() { }() for { select { - case <-n.quitCh: - return - case <-n.ctx.Done(): + case <-ctx.Done(): return case version := <-ch: // We need to wait for the next version as that is what will be in the consensus @@ -692,15 +652,15 @@ func (n *Node) consensusCheckpointSyncer() { versions = versions[1:] } - n.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", + w.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", "version", version+1, "num_versions", len(versions), ) if blkCh == nil { - blkCh, blkSub, err = n.commonNode.Consensus.Core().WatchBlocks(n.ctx) + blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(ctx) if err != nil { - n.logger.Error("failed to watch blocks", + w.logger.Error("failed to watch blocks", "err", err, ) continue @@ -709,7 +669,7 @@ func (n *Node) consensusCheckpointSyncer() { case blk := <-blkCh: // If there's nothing remaining, unsubscribe. 
if len(versions) == 0 { - n.logger.Debug("no more queued consensus checkpoint versions") + w.logger.Debug("no more queued consensus checkpoint versions") blkSub.Close() blkSub = nil @@ -727,12 +687,12 @@ func (n *Node) consensusCheckpointSyncer() { // Lookup what runtime round corresponds to the given consensus layer version and make // sure we checkpoint it. - blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), + blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), Height: int64(version), }) if err != nil { - n.logger.Error("failed to get runtime block corresponding to consensus checkpoint", + w.logger.Error("failed to get runtime block corresponding to consensus checkpoint", "err", err, "height", version, ) @@ -741,11 +701,11 @@ func (n *Node) consensusCheckpointSyncer() { // We may have not yet synced the corresponding runtime round locally. In this case // we need to wait until this is the case. - n.syncedLock.RLock() - lastSyncedRound := n.syncedState.Round - n.syncedLock.RUnlock() + w.syncedLock.RLock() + lastSyncedRound := w.syncedState.Round + w.syncedLock.RUnlock() if blk.Header.Round > lastSyncedRound { - n.logger.Debug("runtime round not available yet for checkpoint, waiting", + w.logger.Debug("runtime round not available yet for checkpoint, waiting", "height", version, "round", blk.Header.Round, "last_synced_round", lastSyncedRound, @@ -755,12 +715,12 @@ func (n *Node) consensusCheckpointSyncer() { } // Force runtime storage checkpointer to create a checkpoint at this round. - n.logger.Info("consensus checkpoint, force runtime checkpoint", + w.logger.Info("consensus checkpoint, force runtime checkpoint", "height", version, "round", blk.Header.Round, ) - n.checkpointer.ForceCheckpoint(blk.Header.Round) + w.checkpointer.ForceCheckpoint(blk.Header.Round) } versions = newVersions } @@ -768,134 +728,143 @@ func (n *Node) consensusCheckpointSyncer() { } // This is only called from the main worker goroutine, so no locking should be necessary. -func (n *Node) nudgeAvailability(lastSynced, latest uint64) { - if lastSynced == n.undefinedRound || latest == n.undefinedRound { +func (w *Worker) nudgeAvailability(lastSynced, latest uint64) { + if lastSynced == w.undefinedRound || latest == w.undefinedRound { return } - if latest-lastSynced < maximumRoundDelayForAvailability && !n.roleAvailable { - n.roleProvider.SetAvailable(func(_ *node.Node) error { + if latest-lastSynced < maximumRoundDelayForAvailability && !w.roleAvailable { + w.roleProvider.SetAvailable(func(_ *node.Node) error { return nil }) - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { return nil }) } - n.roleAvailable = true + w.roleAvailable = true } - if latest-lastSynced > minimumRoundDelayForUnavailability && n.roleAvailable { - n.roleProvider.SetUnavailable() - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetUnavailable() + if latest-lastSynced > minimumRoundDelayForUnavailability && w.roleAvailable { + w.roleProvider.SetUnavailable() + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetUnavailable() } - n.roleAvailable = false + w.roleAvailable = false } } -func (n *Node) worker() { // nolint: gocyclo - defer close(n.quitCh) - defer close(n.diffCh) +// Serve runs the storage worker. 
+func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo + defer close(w.diffCh) // Wait for the common node to be initialized. select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - close(n.initCh) - return + case <-w.commonNode.Initialized(): + case <-ctx.Done(): + close(w.initCh) + return ctx.Err() } - n.logger.Info("starting committee node") + w.logger.Info("starting") + w.statusLock.Lock() + w.status = api.StatusStarting + w.statusLock.Unlock() - n.statusLock.Lock() - n.status = api.StatusStarting - n.statusLock.Unlock() + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + wg.Go(func() { + <-ctx.Done() + w.statusLock.Lock() + w.status = api.StatusStopping + w.statusLock.Unlock() + }) + defer w.logger.Info("stopped") + + wg.Go(func() { + err := w.checkpointer.Serve(ctx) + w.logger.Error("checkpointer stopped", "err", err) + }) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + wg.Go(func() { + w.consensusCheckpointSyncer(ctx) + }) + } // Determine genesis block. - genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), Height: consensus.HeightLatest, }) if err != nil { - n.logger.Error("can't retrieve genesis block", "err", err) - return + return fmt.Errorf("can't retrieve genesis block: %w", err) } - n.undefinedRound = genesisBlock.Header.Round - 1 + w.undefinedRound = genesisBlock.Header.Round - 1 // Determine last finalized storage version. - if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { + if version, dbNonEmpty := w.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { var blk *block.Block - blk, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, version) + blk, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, version) switch err { case nil: // Set last synced version to last finalized storage version. - if _, err = n.flushSyncedState(summaryFromBlock(blk)); err != nil { - n.logger.Error("failed to flush synced state", "err", err) - return + if _, err = w.flushSyncedState(summaryFromBlock(blk)); err != nil { + return fmt.Errorf("failed to flush synced state: %w", err) } default: // Failed to fetch historic block. This is fine when the network just went through a // dump/restore upgrade and we don't have any information before genesis. We treat the // database as unsynced and will proceed to either use checkpoints or sync iteratively. - n.logger.Warn("failed to fetch historic block", + w.logger.Warn("failed to fetch historic block", "err", err, "round", version, ) } } - n.syncedLock.RLock() - cachedLastRound := n.syncedState.Round - n.syncedLock.RUnlock() + w.syncedLock.RLock() + cachedLastRound := w.syncedState.Round + w.syncedLock.RUnlock() if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round { - cachedLastRound = n.undefinedRound + cachedLastRound = w.undefinedRound } // Initialize genesis from the runtime descriptor. 
- isInitialStartup := (cachedLastRound == n.undefinedRound) + isInitialStartup := (cachedLastRound == w.undefinedRound) if isInitialStartup { - n.statusLock.Lock() - n.status = api.StatusInitializingGenesis - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusInitializingGenesis + w.statusLock.Unlock() var rt *registryApi.Runtime - rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx) + rt, err = w.commonNode.Runtime.ActiveDescriptor(ctx) if err != nil { - n.logger.Error("failed to retrieve runtime registry descriptor", - "err", err, - ) - return + return fmt.Errorf("failed to retrieve runtime registry descriptor: %w", err) } - if err = n.initGenesis(rt, genesisBlock); err != nil { - n.logger.Error("failed to initialize storage at genesis", - "err", err, - ) - return + if err = w.initGenesis(ctx, rt, genesisBlock); err != nil { + return fmt.Errorf("failed to initialize storage at genesis: %w", err) } } - // Notify the checkpointer of the genesis round so it can be checkpointed. - if n.checkpointer != nil { - n.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) - n.checkpointer.Flush() - } - // Check if we are able to fetch the first block that we would be syncing if we used iterative // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we // must wait for a later checkpoint to become available. - if !n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncStartCheck - n.statusLock.Unlock() + if !w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncStartCheck + w.statusLock.Unlock() // Determine what is the first round that we would need to sync. iterativeSyncStart := cachedLastRound - if iterativeSyncStart == n.undefinedRound { + if iterativeSyncStart == w.undefinedRound { iterativeSyncStart++ } // Check if we actually have information about that round. This assumes that any reindexing // was already performed (the common node would not indicate being initialized otherwise). - _, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, iterativeSyncStart) + _, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, iterativeSyncStart) SyncStartCheck: switch { case err == nil: @@ -903,7 +872,7 @@ func (n *Node) worker() { // nolint: gocyclo // No information is available about the initial round. Query the earliest historic // block and check if that block has the genesis state root and empty I/O root. var earlyBlk *block.Block - earlyBlk, err = n.commonNode.Runtime.History().GetEarliestBlock(n.ctx) + earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(ctx) switch err { case nil: // Make sure the state root is still the same as at genesis time. @@ -917,13 +886,13 @@ func (n *Node) worker() { // nolint: gocyclo // If this is the case, we can start syncing from this round instead. Fill in the // remaining versions to make sure they actually exist in the database. 
- n.logger.Debug("filling in versions to genesis", + w.logger.Debug("filling in versions to genesis", "genesis_round", genesisBlock.Header.Round, "earliest_round", earlyBlk.Header.Round, ) for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ + Namespace: w.commonNode.Runtime.ID(), RootType: storageApi.RootTypeState, SrcRound: v, SrcRoot: genesisBlock.Header.StateRoot, @@ -937,65 +906,49 @@ func (n *Node) worker() { // nolint: gocyclo // Ignore already finalized versions. continue default: - n.logger.Error("failed to fill in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to fill in version %d: %w", v, err) } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ + Namespace: w.commonNode.Runtime.ID(), Version: v + 1, Type: storageApi.RootTypeState, Hash: genesisBlock.Header.StateRoot, // We can ignore I/O roots. }}) if err != nil { - n.logger.Error("failed to finalize filled in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to finalize filled in version %v: %w", v, err) } } - cachedLastRound, err = n.flushSyncedState(summaryFromBlock(earlyBlk)) + cachedLastRound, err = w.flushSyncedState(summaryFromBlock(earlyBlk)) if err != nil { - n.logger.Error("failed to flush synced state", - "err", err, - ) - return + return fmt.Errorf("failed to flush synced state: %w", err) } // No need to force a checkpoint sync. break SyncStartCheck default: // This should never happen as the block should exist. - n.logger.Warn("failed to query earliest block in local history", + w.logger.Warn("failed to query earliest block in local history", "err", err, ) } // No information is available about this round, force checkpoint sync. - n.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", + w.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", "round", iterativeSyncStart, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true default: // Unknown error while fetching block information, abort. - n.logger.Error("failed to query block", - "err", err, - ) - return + return fmt.Errorf("failed to query block: %w", err) } } - n.logger.Info("worker initialized", + w.logger.Info("worker initialized", "genesis_round", genesisBlock.Header.Round, "last_synced", cachedLastRound, ) - lastFullyAppliedRound := cachedLastRound - // Try to perform initial sync from state and io checkpoints if either: // // - Checkpoint sync has been forced because there is insufficient information available to use @@ -1008,10 +961,10 @@ func (n *Node) worker() { // nolint: gocyclo // to a later state which may not be desired given that checkpoint sync has been explicitly // disabled via config. 
 	//
-	if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced {
-		n.statusLock.Lock()
-		n.status = api.StatusSyncingCheckpoints
-		n.statusLock.Unlock()
+	if (isInitialStartup && !w.checkpointSyncCfg.Disabled) || w.checkpointSyncForced {
+		w.statusLock.Lock()
+		w.status = api.StatusSyncingCheckpoints
+		w.statusLock.Unlock()
 
 		var (
 			summary *blockSummary
@@ -1019,17 +972,17 @@ func (n *Node) worker() { // nolint: gocyclo
 		)
 	CheckpointSyncRetry:
 		for {
-			summary, err = n.syncCheckpoints(genesisBlock.Header.Round, n.checkpointSyncCfg.Disabled)
+			summary, err = w.syncCheckpoints(ctx, genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled)
 			if err == nil {
 				break
 			}
 
 			attempt++
-			switch n.checkpointSyncForced {
+			switch w.checkpointSyncForced {
 			case true:
 				// We have no other options but to perform a checkpoint sync as we are missing
 				// either state or authoritative blocks.
-				n.logger.Info("checkpoint sync required, retrying",
+				w.logger.Info("checkpoint sync required, retrying",
 					"err", err,
 					"attempt", attempt,
 				)
@@ -1041,43 +994,49 @@ func (n *Node) worker() { // nolint: gocyclo
 				// Try syncing again. The main reason for this is the sync failing due to a
 				// checkpoint pruning race condition (where nodes list a checkpoint which is
 				// then deleted just before we request its chunks). One retry is enough.
-				n.logger.Info("first checkpoint sync failed, trying once more", "err", err)
+				w.logger.Info("first checkpoint sync failed, trying once more", "err", err)
 			}
 
 			// Delay before retrying.
 			select {
 			case <-time.After(checkpointSyncRetryDelay):
-			case <-n.ctx.Done():
-				return
+			case <-ctx.Done():
				return ctx.Err()
 			}
 		}
 		if err != nil {
-			n.logger.Info("checkpoint sync failed", "err", err)
+			w.logger.Info("checkpoint sync failed", "err", err)
 		} else {
-			cachedLastRound, err = n.flushSyncedState(summary)
+			cachedLastRound, err = w.flushSyncedState(summary)
 			if err != nil {
-				n.logger.Error("failed to flush synced state",
-					"err", err,
-				)
-				return
+				return fmt.Errorf("failed to flush synced state: %w", err)
 			}
-			lastFullyAppliedRound = cachedLastRound
 
-			n.logger.Info("checkpoint sync succeeded",
+			w.logger.Info("checkpoint sync succeeded",
 				logging.LogEvent, LogEventCheckpointSyncSuccess,
 			)
 		}
 	}
-	close(n.initCh)
+	close(w.initCh)
+	w.logger.Info("initialized")
+
+	// Notify the checkpointer of the genesis round so it can be checkpointed.
+	w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round)
+	w.checkpointer.Flush()
 
 	// Don't register availability immediately, we want to know first how far behind consensus we are.
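The loop above retries checkpoint sync with a fixed delay, aborting promptly on context cancellation, retrying indefinitely when checkpoint sync is forced and only once otherwise. A rough, self-contained sketch of that retry shape; the helper name, the callback and the delay parameter are assumptions for illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryCheckpointSync retries fn until it succeeds. When forced is false it
// gives up after the second failure (one retry); when forced it keeps trying
// until the context is canceled.
func retryCheckpointSync(ctx context.Context, forced bool, delay time.Duration, fn func(context.Context) error) error {
	var attempt int
	for {
		err := fn(ctx)
		if err == nil {
			return nil
		}

		attempt++
		if !forced && attempt > 1 {
			return fmt.Errorf("checkpoint sync failed: %w", err)
		}

		// Delay before retrying, but react to cancellation immediately.
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	calls := 0
	err := retryCheckpointSync(context.Background(), false, 10*time.Millisecond, func(context.Context) error {
		calls++
		if calls < 2 {
			return errors.New("checkpoint pruned while downloading chunks")
		}
		return nil // Second attempt succeeds.
	})
	fmt.Println(calls, err) // 2 <nil>
}
```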
- latestBlockRound := n.undefinedRound + latestBlockRound := w.undefinedRound + lastFullyAppliedRound := cachedLastRound heartbeat := heartbeat{} heartbeat.reset() - var wg sync.WaitGroup syncingRounds := make(map[uint64]*inFlight) summaryCache := make(map[uint64]*blockSummary) + + fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String()) + fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) + defer fetchPool.Stop() + triggerRoundFetches := func() { for i := lastFullyAppliedRound + 1; i <= latestBlockRound; i++ { syncing, ok := syncingRounds[i] @@ -1097,10 +1056,10 @@ func (n *Node) worker() { // nolint: gocyclo syncingRounds[i] = syncing if i == latestBlockRound { - storageWorkerLastPendingRound.With(n.getMetricLabels()).Set(float64(i)) + storageWorkerLastPendingRound.With(w.getMetricLabels()).Set(float64(i)) } } - n.logger.Debug("preparing round sync", + w.logger.Debug("preparing round sync", "round", i, "outstanding_mask", syncing.outstanding, "awaiting_retry", syncing.awaitingRetry, @@ -1128,18 +1087,18 @@ func (n *Node) worker() { // nolint: gocyclo if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { syncing.scheduleDiff(rootType) wg.Add(1) - n.fetchPool.Submit(func() { + fetchPool.Submit(func() { defer wg.Done() - n.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) + w.fetchDiff(ctx, this.Round, prevRoots[i], this.Roots[i]) }) } } } } - n.statusLock.Lock() - n.status = api.StatusSyncingRounds - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusSyncingRounds + w.statusLock.Unlock() pendingApply := &minRoundQueue{} pendingFinalize := &minRoundQueue{} @@ -1149,7 +1108,6 @@ func (n *Node) worker() { // nolint: gocyclo // including all missing rounds since the last fully applied one. Fetched diffs are then applied // in round order, ensuring no gaps. Once a round has all its roots applied, background finalization // for that round is triggered asynchronously, not blocking concurrent fetching and diff application. -mainLoop: for { // Drain the Apply and Finalize queues first, before waiting for new events in the select below. @@ -1163,7 +1121,7 @@ mainLoop: // Apply the write log if one exists. err = nil if lastDiff.fetched { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: lastDiff.thisRoot.Namespace, RootType: lastDiff.thisRoot.Type, SrcRound: lastDiff.prevRoot.Version, @@ -1178,7 +1136,7 @@ mainLoop: case errors.Is(err, storageApi.ErrExpectedRootMismatch): lastDiff.pf.RecordBadPeer() default: - n.logger.Error("can't apply write log", + w.logger.Error("can't apply write log", "err", err, "old_root", lastDiff.prevRoot, "new_root", lastDiff.thisRoot, @@ -1198,14 +1156,14 @@ mainLoop: } // We have fully synced the given round. 
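`pendingApply` and `pendingFinalize` are min-ordered queues keyed by round, which is what lets concurrently fetched diffs be applied strictly in round order. A sketch of what such a min-round queue could look like on top of `container/heap`; this is an assumed shape, not the actual `minRoundQueue` implementation:

```go
package main

import (
	"container/heap"
	"fmt"
)

// roundItem is anything ordered by its runtime round.
type roundItem struct {
	round uint64
	label string
}

// minRoundQueue is a min-heap of items keyed by round.
type minRoundQueue []*roundItem

func (q minRoundQueue) Len() int           { return len(q) }
func (q minRoundQueue) Less(i, j int) bool { return q[i].round < q[j].round }
func (q minRoundQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *minRoundQueue) Push(x any)        { *q = append(*q, x.(*roundItem)) }
func (q *minRoundQueue) Pop() any {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

func main() {
	pendingApply := &minRoundQueue{}
	heap.Init(pendingApply)

	// Diffs finish fetching out of order...
	for _, r := range []uint64{12, 10, 11} {
		heap.Push(pendingApply, &roundItem{round: r, label: "state diff"})
	}

	// ...but are popped lowest-round first.
	for pendingApply.Len() > 0 {
		item := heap.Pop(pendingApply).(*roundItem)
		fmt.Println(item.round, item.label) // 10, 11, 12
	}
}
```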
- n.logger.Debug("finished syncing round", "round", lastDiff.round) + w.logger.Debug("finished syncing round", "round", lastDiff.round) delete(syncingRounds, lastDiff.round) summary := summaryCache[lastDiff.round] delete(summaryCache, lastDiff.round-1) lastFullyAppliedRound = lastDiff.round - storageWorkerLastSyncedRound.With(n.getMetricLabels()).Set(float64(lastDiff.round)) - storageWorkerRoundSyncLatency.With(n.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) + storageWorkerLastSyncedRound.With(w.getMetricLabels()).Set(float64(lastDiff.round)) + storageWorkerRoundSyncLatency.With(w.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) // Finalize storage for this round. This happens asynchronously // with respect to Apply operations for subsequent rounds. @@ -1222,15 +1180,15 @@ mainLoop: wg.Add(1) go func() { // Don't block fetching and applying remaining rounds. defer wg.Done() - n.finalize(lastSummary) + w.finalize(ctx, lastSummary) }() continue } select { - case inBlk := <-n.blockCh.Out(): + case inBlk := <-w.blockCh.Out(): blk := inBlk.(*block.Block) - n.logger.Debug("incoming block", + w.logger.Debug("incoming block", "round", blk.Header.Round, "last_synced", lastFullyAppliedRound, "last_finalized", cachedLastRound, @@ -1238,9 +1196,9 @@ mainLoop: // Check if we're far enough to reasonably register as available. latestBlockRound = blk.Header.Round - n.nudgeAvailability(cachedLastRound, latestBlockRound) + w.nudgeAvailability(cachedLastRound, latestBlockRound) - if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == n.undefinedRound { + if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == w.undefinedRound { dummy := blockSummary{ Namespace: blk.Header.Namespace, Round: lastFullyAppliedRound + 1, @@ -1264,7 +1222,7 @@ mainLoop: // since the undefined round may be unsigned -1 and in this case the loop // would not do any iterations. startSummaryRound := lastFullyAppliedRound - if startSummaryRound == n.undefinedRound { + if startSummaryRound == w.undefinedRound { startSummaryRound++ } for i := startSummaryRound; i < blk.Header.Round; i++ { @@ -1272,14 +1230,9 @@ mainLoop: continue } var oldBlock *block.Block - oldBlock, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, i) + oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, i) if err != nil { - n.logger.Error("can't get block for round", - "err", err, - "round", i, - "current_round", blk.Header.Round, - ) - panic("can't get block in storage worker") + return fmt.Errorf("failed to get block for round %d (current round: %d): %w", i, blk.Header.Round, err) } summaryCache[i] = summaryFromBlock(oldBlock) } @@ -1291,14 +1244,14 @@ mainLoop: heartbeat.reset() case <-heartbeat.C: - if latestBlockRound != n.undefinedRound { - n.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) + if latestBlockRound != w.undefinedRound { + w.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) triggerRoundFetches() } - case item := <-n.diffCh: + case item := <-w.diffCh: if item.err != nil { - n.logger.Error("error calling getdiff", + w.logger.Error("error calling getdiff", "err", item.err, "round", item.round, "old_root", item.prevRoot, @@ -1315,81 +1268,38 @@ mainLoop: // when we're syncing and are far behind. triggerRoundFetches() - case finalized := <-n.finalizeCh: + case finalized := <-w.finalizeCh: // If finalization failed, things start falling apart. 
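A round only counts as synced once every root diff up to it has been applied, and finalization of a completed round is handed off asynchronously so it never blocks applying diffs for later rounds. A minimal sketch of that drain-in-contiguous-order-then-finalize pattern; the map-based bookkeeping and the goroutine hand-off are illustrative, not the worker's actual data structures:

```go
package main

import (
	"fmt"
	"sync"
)

// drainInOrder advances lastApplied over the contiguous run of fetched rounds
// and returns the rounds that are now ready to be finalized.
func drainInOrder(fetched map[uint64]bool, lastApplied uint64) (uint64, []uint64) {
	var ready []uint64
	for fetched[lastApplied+1] {
		lastApplied++
		delete(fetched, lastApplied)
		ready = append(ready, lastApplied)
	}
	return lastApplied, ready
}

func main() {
	fetched := map[uint64]bool{11: true, 13: true}
	lastApplied := uint64(10)

	// Round 12 is still missing, so only round 11 drains; 13 has to wait.
	lastApplied, ready := drainInOrder(fetched, lastApplied)
	fmt.Println(lastApplied, ready) // 11 [11]

	// Finalization is dispatched asynchronously so it does not block applying
	// diffs for subsequent rounds.
	var wg sync.WaitGroup
	for _, round := range ready {
		wg.Add(1)
		go func(r uint64) {
			defer wg.Done()
			fmt.Println("finalizing round", r)
		}(round)
	}
	wg.Wait()
}
```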
// There's no point redoing it, since it's probably not a transient // error, and cachedLastRound also can't be updated legitimately. if finalized.err != nil { - // Request a node shutdown given that syncing is effectively blocked. - _ = n.commonNode.HostNode.RequestShutdown(n.ctx, false) - break mainLoop + return fmt.Errorf("failed to finalize (round: %d): %w", finalized.summary.Round, finalized.err) } // No further sync or out of order handling needed here, since // only one finalize at a time is triggered (for round cachedLastRound+1) - cachedLastRound, err = n.flushSyncedState(finalized.summary) + cachedLastRound, err = w.flushSyncedState(finalized.summary) if err != nil { - n.logger.Error("failed to flush synced state", + w.logger.Error("failed to flush synced state", "err", err, ) } - storageWorkerLastFullRound.With(n.getMetricLabels()).Set(float64(finalized.summary.Round)) + storageWorkerLastFullRound.With(w.getMetricLabels()).Set(float64(finalized.summary.Round)) // Check if we're far enough to reasonably register as available. - n.nudgeAvailability(cachedLastRound, latestBlockRound) + w.nudgeAvailability(cachedLastRound, latestBlockRound) // Notify the checkpointer that there is a new finalized round. if config.GlobalConfig.Storage.Checkpointer.Enabled { - n.checkpointer.NotifyNewVersion(finalized.summary.Round) + w.checkpointer.NotifyNewVersion(finalized.summary.Round) } - case <-n.ctx.Done(): - break mainLoop + case <-ctx.Done(): + return ctx.Err() } } - wg.Wait() // blockCh will be garbage-collected without being closed. It can potentially still contain // some new blocks, but only as many as were already in-flight at the point when the main // context was canceled. } - -type pruneHandler struct { - logger *logging.Logger - node *Node -} - -func (p *pruneHandler) Prune(rounds []uint64) error { - // Make sure we never prune past what was synced. - lastSycnedRound, _, _ := p.node.GetLastSynced() - - for _, round := range rounds { - if round >= lastSycnedRound { - return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", - lastSycnedRound, - ) - } - - // TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. - - p.logger.Debug("pruning storage for round", "round", round) - - // Prune given block. 
- err := p.node.localStorage.NodeDB().Prune(round) - switch err { - case nil: - case mkvsDB.ErrNotEarliest: - p.logger.Debug("skipping non-earliest round", - "round", round, - ) - continue - default: - p.logger.Error("failed to prune block", - "err", err, - ) - return err - } - } - - return nil -} diff --git a/go/worker/storage/service_internal.go b/go/worker/storage/service_internal.go index cf5d0ecf064..bd1843e0379 100644 --- a/go/worker/storage/service_internal.go +++ b/go/worker/storage/service_internal.go @@ -9,12 +9,12 @@ import ( var _ api.StorageWorker = (*Worker)(nil) func (w *Worker) GetLastSyncedRound(_ context.Context, request *api.GetLastSyncedRoundRequest) (*api.GetLastSyncedRoundResponse, error) { - node := w.runtimes[request.RuntimeID] - if node == nil { + worker := w.runtimes[request.RuntimeID] + if worker == nil { return nil, api.ErrRuntimeNotFound } - round, ioRoot, stateRoot := node.GetLastSynced() + round, ioRoot, stateRoot := worker.GetLastSynced() return &api.GetLastSyncedRoundResponse{ Round: round, IORoot: ioRoot, diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index f49988bd1c7..9797db4ac45 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,8 +1,11 @@ package storage import ( + "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -15,7 +18,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" ) -// Worker is a worker handling storage operations. +// Worker is a worker handling storage operations for all common worker runtimes. type Worker struct { enabled bool @@ -26,7 +29,10 @@ type Worker struct { initCh chan struct{} quitCh chan struct{} - runtimes map[common.Namespace]*committee.Node + runtimes map[common.Namespace]*committee.Worker + + ctx context.Context + cancel context.CancelFunc } // New constructs a new storage worker. @@ -35,6 +41,7 @@ func New( commonWorker *workerCommon.Worker, registration *registration.Worker, ) (*Worker, error) { + ctx, cancel := context.WithCancel(context.Background()) enabled := config.GlobalConfig.Mode.HasLocalStorage() && len(commonWorker.GetRuntimes()) > 0 s := &Worker{ @@ -44,14 +51,16 @@ func New( logger: logging.GetLogger("worker/storage"), initCh: make(chan struct{}), quitCh: make(chan struct{}), - runtimes: make(map[common.Namespace]*committee.Node), + runtimes: make(map[common.Namespace]*committee.Worker), + ctx: ctx, + cancel: cancel, } if !enabled { return s, nil } - // Start storage node for every runtime. + // Register the storage worker for every runtime. 
 	for id, rt := range s.commonWorker.GetRuntimes() {
 		if err := s.registerRuntime(rt); err != nil {
 			return nil, fmt.Errorf("failed to create storage worker for runtime %s: %w", id, err)
 		}
@@ -90,7 +99,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 		return fmt.Errorf("can't create local storage backend: %w", err)
 	}
 
-	node, err := committee.NewNode(
+	worker, err := committee.New(
 		commonNode,
 		rp,
 		rpRPC,
@@ -105,8 +114,8 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 		return err
 	}
 	commonNode.Runtime.RegisterStorage(localStorage)
-	commonNode.AddHooks(node)
-	w.runtimes[id] = node
+	commonNode.AddHooks(worker)
+	w.runtimes[id] = worker
 
 	w.logger.Info("new runtime registered",
 		"runtime_id", id,
@@ -115,7 +124,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 	return nil
 }
 
-// Name returns the service name.
+// Name returns the worker name.
 func (w *Worker) Name() string {
 	return "storage worker"
 }
@@ -133,6 +142,21 @@ func (w *Worker) Initialized() <-chan struct{} {
 
 // Start starts the storage service.
 func (w *Worker) Start() error {
+	go func() {
+		if err := w.Serve(w.ctx); err != nil {
+			w.logger.Error("worker stopped", "err", err)
+		}
+	}()
+	return nil
+}
+
+// Serve starts a state sync worker for each of the configured runtimes, unless
+// disabled.
+//
+// If any state sync worker returns an error, it cancels the remaining ones and
+// waits for all of them to finish. The error from the first failing worker is
+// returned.
+func (w *Worker) Serve(ctx context.Context) error {
 	if !w.enabled {
 		w.logger.Info("not starting storage worker as it is disabled")
 
@@ -142,34 +166,34 @@ func (w *Worker) Start() error {
 		return nil
 	}
 
-	// Wait for all runtimes to terminate.
-	go func() {
-		defer close(w.quitCh)
-
-		for _, r := range w.runtimes {
-			<-r.Quit()
-		}
+	w.logger.Info("starting", "num_runtimes", len(w.runtimes))
+	defer func() {
+		close(w.quitCh)
+		w.logger.Info("stopped")
 	}()
 
-	// Start all runtimes and wait for initialization.
 	go func() {
-		w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes))
-
-		for _, r := range w.runtimes {
-			_ = r.Start()
-		}
-
-		// Wait for runtimes to be initialized.
 		for _, r := range w.runtimes {
 			<-r.Initialized()
 		}
-
-		w.logger.Info("storage worker started")
-
+		w.logger.Info("initialized")
 		close(w.initCh)
 	}()
 
-	return nil
+	return w.serve(ctx)
+}
+
+func (w *Worker) serve(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+	for id, r := range w.runtimes {
+		g.Go(func() error {
+			if err := r.Serve(ctx); err != nil {
+				return fmt.Errorf("storage worker failed (runtimeID: %s): %w", id, err)
+			}
+			return nil
+		})
+	}
+	return g.Wait()
 }
 
 // Stop halts the service.
@@ -179,9 +203,9 @@ func (w *Worker) Stop() {
 		return
 	}
 
-	for _, r := range w.runtimes {
-		r.Stop()
-	}
+	w.logger.Info("stopping")
+	w.cancel()
+	<-w.quitCh
 }
 
 // Quit returns a channel that will be closed when the service terminates.
@@ -196,6 +220,6 @@ func (w *Worker) Cleanup() {
 // GetRuntime returns a storage committee node for the given runtime (if available).
 //
 // In case the runtime with the specified id was not configured for this node it returns nil.
-func (w *Worker) GetRuntime(id common.Namespace) *committee.Node {
+func (w *Worker) GetRuntime(id common.Namespace) *committee.Worker {
 	return w.runtimes[id]
 }
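The new `Serve`/`serve` split runs one state sync worker per configured runtime under an errgroup: the first worker to fail cancels the shared context, and `g.Wait()` returns that first error once all siblings have stopped. A self-contained sketch of this cancel-on-first-error pattern; the runtime IDs and the failing worker are made up for illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// serveRuntime pretends to be a per-runtime state sync worker: it runs until
// its context is canceled, except for the runtime that fails immediately.
func serveRuntime(ctx context.Context, id string) error {
	if id == "runtime-b" {
		return errors.New("storage sync failed")
	}
	<-ctx.Done() // Healthy workers run until a sibling fails or Stop is called.
	return ctx.Err()
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	for _, id := range []string{"runtime-a", "runtime-b", "runtime-c"} {
		id := id // Not needed on Go 1.22+, kept for older toolchains.
		g.Go(func() error {
			if err := serveRuntime(ctx, id); err != nil && !errors.Is(err, context.Canceled) {
				return fmt.Errorf("storage worker failed (runtimeID: %s): %w", id, err)
			}
			return nil
		})
	}

	// Wait returns the first non-nil error after all workers have finished.
	fmt.Println(g.Wait()) // storage worker failed (runtimeID: runtime-b): storage sync failed
}
```

Treating sibling cancellation (`context.Canceled`) as a clean exit keeps the reported error focused on the worker that actually failed rather than on the workers that were shut down because of it.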