From a795d5060b0de7f26abe74e75884909a076d0f02 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Sat, 2 Aug 2025 18:28:12 +0200 Subject: [PATCH 01/10] go/worker/storage: Rename committee package to statesync Also rename node to worker, to avoid confusion. --- go/oasis-node/cmd/node/node_control.go | 4 +- go/oasis-test-runner/oasis/log.go | 4 +- .../storage/committee/checkpoint_sync.go | 90 ++-- go/worker/storage/committee/metrics.go | 4 +- .../storage/committee/{node.go => worker.go} | 476 +++++++++--------- go/worker/storage/service_internal.go | 6 +- go/worker/storage/worker.go | 18 +- 7 files changed, 305 insertions(+), 297 deletions(-) rename go/worker/storage/committee/{node.go => worker.go} (73%) diff --git a/go/oasis-node/cmd/node/node_control.go b/go/oasis-node/cmd/node/node_control.go index 9310d0780f5..450d00e6b04 100644 --- a/go/oasis-node/cmd/node/node_control.go +++ b/go/oasis-node/cmd/node/node_control.go @@ -312,8 +312,8 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr } // Fetch storage worker status. - if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil { - status.Storage, err = storageNode.GetStatus(ctx) + if storageWorker := n.StorageWorker.GetRuntime(rt.ID()); storageWorker != nil { + status.Storage, err = storageWorker.GetStatus(ctx) if err != nil { logger.Error("failed to fetch storage worker status", "err", err) } diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index cd38354d83f..af33a5cd156 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -8,7 +8,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" - workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" + storageWorker "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" ) // LogAssertEvent returns a handler which checks whether a specific log event was @@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory { // LogAssertCheckpointSync returns a handler which checks whether initial storage sync from // a checkpoint was successful or not. func LogAssertCheckpointSync() log.WatcherHandlerFactory { - return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") + return LogAssertEvent(storageWorker.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed") } // LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index ad553272a90..cfed8ed7d26 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -21,7 +21,7 @@ import ( const ( // cpListsTimeout is the timeout for fetching checkpoints from all nodes. cpListsTimeout = 30 * time.Second - // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from a node. + // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from the remote peer. cpRestoreTimeout = 60 * time.Second checkpointStatusDone = 0 @@ -37,7 +37,7 @@ var ErrNoUsableCheckpoints = errors.New("storage: no checkpoint could be synced" // CheckpointSyncConfig is the checkpoint sync configuration. 
type CheckpointSyncConfig struct { - // Disabled specifies whether checkpoint sync should be disabled. In this case the node will + // Disabled specifies whether checkpoint sync should be disabled. In this case the storage worker will // only sync by applying all diffs from genesis. Disabled bool @@ -81,7 +81,7 @@ func (h *chunkHeap) Pop() any { return ret } -func (n *Node) checkpointChunkFetcher( +func (w *Worker) checkpointChunkFetcher( ctx context.Context, chunkDispatchCh chan *chunk, chunkReturnCh chan *chunk, @@ -103,9 +103,9 @@ func (n *Node) checkpointChunkFetcher( defer cancel() // Fetch chunk from peers. - rsp, pf, err := n.fetchChunk(chunkCtx, chunk) + rsp, pf, err := w.fetchChunk(chunkCtx, chunk) if err != nil { - n.logger.Error("failed to fetch chunk from peers", + w.logger.Error("failed to fetch chunk from peers", "err", err, "chunk", chunk.Index, ) @@ -114,7 +114,7 @@ func (n *Node) checkpointChunkFetcher( } // Restore fetched chunk. - done, err := n.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) + done, err := w.localStorage.Checkpointer().RestoreChunk(chunkCtx, chunk.Index, bytes.NewBuffer(rsp)) cancel() switch { @@ -124,7 +124,7 @@ func (n *Node) checkpointChunkFetcher( chunkReturnCh <- nil return case err != nil: - n.logger.Error("chunk restoration failed", + w.logger.Error("chunk restoration failed", "chunk", chunk.Index, "root", chunk.Root, "err", err, @@ -157,8 +157,8 @@ func (n *Node) checkpointChunkFetcher( // fetchChunk fetches chunk using checkpoint sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { - rsp1, pf, err := n.checkpointSync.GetCheckpointChunk( +func (w *Worker) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFeedback, error) { + rsp1, pf, err := w.checkpointSync.GetCheckpointChunk( ctx, &checkpointsync.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -175,7 +175,7 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp1.Chunk, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetCheckpointChunk( + rsp2, pf, err := w.legacyStorageSync.GetCheckpointChunk( ctx, &synclegacy.GetCheckpointChunkRequest{ Version: chunk.Version, @@ -194,8 +194,8 @@ func (n *Node) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.PeerFe return rsp2.Chunk, pf, nil } -func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { - if err := n.localStorage.Checkpointer().StartRestore(n.ctx, check.Metadata); err != nil { +func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { + if err := w.localStorage.Checkpointer().StartRestore(w.ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. return checkpointStatusBail, fmt.Errorf("can't start checkpoint restore: %w", err) @@ -208,9 +208,9 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } // Abort has to succeed even if we were interrupted by context cancellation. 
ctx := context.Background() - if err := n.localStorage.Checkpointer().AbortRestore(ctx); err != nil { + if err := w.localStorage.Checkpointer().AbortRestore(ctx); err != nil { cpStatus = checkpointStatusBail - n.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", + w.logger.Error("error while aborting checkpoint restore on handler exit, aborting sync", "err", err, ) } @@ -222,7 +222,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq chunkReturnCh := make(chan *chunk, maxParallelRequests) errorCh := make(chan int, maxParallelRequests) - ctx, cancel := context.WithCancel(n.ctx) + ctx, cancel := context.WithCancel(w.ctx) // Spawn the worker group to fetch and restore checkpoint chunks. var workerGroup sync.WaitGroup @@ -231,7 +231,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq workerGroup.Add(1) go func() { defer workerGroup.Done() - n.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) + w.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) }() } go func() { @@ -264,7 +264,7 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq checkpoint: check, }) } - n.logger.Debug("checkpoint chunks prepared for dispatch", + w.logger.Debug("checkpoint chunks prepared for dispatch", "chunks", len(check.Chunks), "checkpoint_root", check.Root, ) @@ -283,8 +283,8 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } select { - case <-n.ctx.Done(): - return checkpointStatusBail, n.ctx.Err() + case <-w.ctx.Done(): + return checkpointStatusBail, w.ctx.Err() case returned := <-chunkReturnCh: if returned == nil { @@ -313,13 +313,13 @@ func (n *Node) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelReq } } -func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { - ctx, cancel := context.WithTimeout(n.ctx, cpListsTimeout) +func (w *Worker) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { + ctx, cancel := context.WithTimeout(w.ctx, cpListsTimeout) defer cancel() - list, err := n.fetchCheckpoints(ctx) + list, err := w.fetchCheckpoints(ctx) if err != nil { - n.logger.Error("failed to retrieve any checkpoints", + w.logger.Error("failed to retrieve any checkpoints", "err", err, ) return nil, err @@ -334,15 +334,15 @@ func (n *Node) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { // fetchCheckpoints fetches checkpoints using checkpoint sync p2p protocol client. // // In case of no peers, error or no checkpoints, it fallbacks to the legacy storage sync protocol. 
-func (n *Node) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { - list1, err := n.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ +func (w *Worker) fetchCheckpoints(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + list1, err := w.checkpointSync.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{ Version: 1, }) if err == nil && len(list1) > 0 { // if NO error and at least one checkpoint return list1, nil } - list2, err := n.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ + list2, err := w.legacyStorageSync.GetCheckpoints(ctx, &synclegacy.GetCheckpointsRequest{ Version: 1, }) if err != nil { @@ -369,8 +369,8 @@ func sortCheckpoints(s []*checkpointsync.Checkpoint) { }) } -func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { - namespace := n.commonNode.Runtime.ID() +func (w *Worker) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { + namespace := w.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. return false @@ -380,12 +380,12 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas return false } - blk, err := n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, cp.Root.Version) + blk, err := w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, cp.Root.Version) if err != nil { - n.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) + w.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) return false } - _, lastIORoot, lastStateRoot := n.GetLastSynced() + _, lastIORoot, lastStateRoot := w.GetLastSynced() lastVersions := map[storageApi.RootType]uint64{ storageApi.RootTypeIO: lastIORoot.Version, storageApi.RootTypeState: lastStateRoot.Version, @@ -401,18 +401,18 @@ func (n *Node) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMas } } } - n.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) + w.logger.Info("checkpoint for unknown root skipped", "root", cp.Root) return false } -func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { +func (w *Worker) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { // Store roots and round info for checkpoints that finished syncing. // Round and namespace info will get overwritten as rounds are skipped // for errors, driven by remainingRoots. var syncState blockSummary // Fetch checkpoints from peers. 
- cps, err := n.getCheckpointList() + cps, err := w.getCheckpointList() if err != nil { return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err) } @@ -440,8 +440,8 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc if !multipartRunning { return } - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { - n.logger.Error("error aborting multipart restore on exit from syncer", + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + w.logger.Error("error aborting multipart restore on exit from syncer", "err", err, ) } @@ -449,7 +449,7 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc for _, check := range cps { - if check.Root.Version < genesisRound || !n.checkCheckpointUsable(check, remainingRoots, genesisRound) { + if check.Root.Version < genesisRound || !w.checkCheckpointUsable(check, remainingRoots, genesisRound) { continue } @@ -458,10 +458,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc // previous retries. Aborting multipart works with no multipart in // progress too. multipartRunning = false - if err := n.localStorage.NodeDB().AbortMultipartInsert(); err != nil { + if err := w.localStorage.NodeDB().AbortMultipartInsert(); err != nil { return nil, fmt.Errorf("error aborting previous multipart restore: %w", err) } - if err := n.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { + if err := w.localStorage.NodeDB().StartMultipartInsert(check.Root.Version); err != nil { return nil, fmt.Errorf("error starting multipart insert for round %d: %w", check.Root.Version, err) } multipartRunning = true @@ -486,18 +486,18 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } } - status, err := n.handleCheckpoint(check, n.checkpointSyncCfg.ChunkFetcherCount) + status, err := w.handleCheckpoint(check, w.checkpointSyncCfg.ChunkFetcherCount) switch status { case checkpointStatusDone: - n.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) + w.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) syncState.Namespace = check.Root.Namespace syncState.Round = check.Root.Version syncState.Roots = append(syncState.Roots, check.Root) remainingRoots.remove(check.Root.Type) if remainingRoots.isEmpty() { - if err = n.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { - n.logger.Error("can't finalize version after all checkpoints restored", + if err = w.localStorage.NodeDB().Finalize(syncState.Roots); err != nil { + w.logger.Error("can't finalize version after all checkpoints restored", "err", err, "version", prevVersion, "roots", syncState.Roots, @@ -510,10 +510,10 @@ func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bloc } continue case checkpointStatusNext: - n.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) + w.logger.Info("error trying to restore from checkpoint, trying next most recent", "root", check.Root, "err", err) continue case checkpointStatusBail: - n.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) + w.logger.Error("error trying to restore from checkpoint, unrecoverable", "root", check.Root, "err", err) return nil, fmt.Errorf("error restoring from checkpoints: %w", err) } } diff --git a/go/worker/storage/committee/metrics.go b/go/worker/storage/committee/metrics.go index 
7f641f71fdd..6b3ad5e0216 100644 --- a/go/worker/storage/committee/metrics.go +++ b/go/worker/storage/committee/metrics.go @@ -49,9 +49,9 @@ var ( prometheusOnce sync.Once ) -func (n *Node) getMetricLabels() prometheus.Labels { +func (w *Worker) getMetricLabels() prometheus.Labels { return prometheus.Labels{ - "runtime": n.commonNode.Runtime.ID().String(), + "runtime": w.commonNode.Runtime.ID().String(), } } diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/worker.go similarity index 73% rename from go/worker/storage/committee/node.go rename to go/worker/storage/committee/worker.go index a1318a0802f..d487d0676bc 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/worker.go @@ -1,3 +1,5 @@ +// Package committee defines the logic responsible for initializing, syncing, +// and pruning of the runtime state using the relevant p2p protocol clients. package committee import ( @@ -39,7 +41,7 @@ import ( ) var ( - _ committee.NodeHooks = (*Node)(nil) + _ committee.NodeHooks = (*Worker)(nil) // ErrNonLocalBackend is the error returned when the storage backend doesn't implement the LocalBackend interface. ErrNonLocalBackend = errors.New("storage: storage backend doesn't support local storage") @@ -118,8 +120,18 @@ type finalizeResult struct { err error } -// Node watches blocks for storage changes. -type Node struct { +// Worker is the runtime storage worker, responsible for syncing state +// that corresponds to the incoming runtime block headers received from the +// consensus service. +// +// In addition this worker is responsible for: +// 1. Initializing the runtime state, possibly using checkpoints (if configured). +// 2. Pruning the state as specified by the configuration. +// 3. Optionally creating runtime state checkpoints (used by other nodes) for the state sync. +// 4. Creating (and optionally advertising) statesync p2p protocol clients and servers. +// 5. Registering node availability when it has synced sufficiently close to +// the latest known block header. +type Worker struct { commonNode *committee.Node roleProvider registration.RoleProvider @@ -162,21 +174,22 @@ type Node struct { initCh chan struct{} } -func NewNode( +// New creates a new storage worker. +func New( commonNode *committee.Node, roleProvider registration.RoleProvider, rpcRoleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, localStorage storageApi.LocalBackend, checkpointSyncCfg *CheckpointSyncConfig, -) (*Node, error) { +) (*Worker, error) { initMetrics() // Create the fetcher pool. fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - n := &Node{ + w := &Worker{ commonNode: commonNode, roleProvider: roleProvider, @@ -208,21 +221,21 @@ func NewNode( } // Initialize sync state. - n.syncedState.Round = defaultUndefinedRound + w.syncedState.Round = defaultUndefinedRound - n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + w.ctx, w.ctxCancel = context.WithCancel(context.Background()) // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. - checkpointer, err := n.newCheckpointer(n.ctx, commonNode, localStorage) + checkpointer, err := w.newCheckpointer(w.ctx, commonNode, localStorage) if err != nil { return nil, fmt.Errorf("failed to create checkpointer: %w", err) } - n.checkpointer = checkpointer + w.checkpointer = checkpointer // Register prune handler. 
commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ - logger: n.logger, - node: n, + logger: w.logger, + worker: w, }) // Advertise and serve p2p protocols. @@ -236,14 +249,14 @@ func NewNode( } // Create p2p protocol clients. - n.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - n.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - return n, nil + return w, nil } -func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { +func (w *Worker) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -300,100 +313,95 @@ func (n *Node) newCheckpointer(ctx context.Context, commonNode *committee.Node, // Service interface. -// Name returns the service name. -func (n *Node) Name() string { - return "committee node" -} - // Start causes the worker to start responding to CometBFT new block events. -func (n *Node) Start() error { - go n.worker() +func (w *Worker) Start() error { + go w.worker() if config.GlobalConfig.Storage.Checkpointer.Enabled { - go n.consensusCheckpointSyncer() + go w.consensusCheckpointSyncer() } return nil } // Stop causes the worker to stop watching and shut down. -func (n *Node) Stop() { - n.statusLock.Lock() - n.status = api.StatusStopping - n.statusLock.Unlock() +func (w *Worker) Stop() { + w.statusLock.Lock() + w.status = api.StatusStopping + w.statusLock.Unlock() - n.fetchPool.Stop() + w.fetchPool.Stop() - n.ctxCancel() + w.ctxCancel() } // Quit returns a channel that will be closed when the worker stops. -func (n *Node) Quit() <-chan struct{} { - return n.quitCh +func (w *Worker) Quit() <-chan struct{} { + return w.quitCh } // Cleanup cleans up any leftover state after the worker is stopped. -func (n *Node) Cleanup() { +func (w *Worker) Cleanup() { // Nothing to do here? } // Initialized returns a channel that will be closed once the worker finished starting up. -func (n *Node) Initialized() <-chan struct{} { - return n.initCh +func (w *Worker) Initialized() <-chan struct{} { + return w.initCh } -// GetStatus returns the storage committee node status. -func (n *Node) GetStatus(context.Context) (*api.Status, error) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +// GetStatus returns the state sync worker status. 
+func (w *Worker) GetStatus(context.Context) (*api.Status, error) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() - n.statusLock.RLock() - defer n.statusLock.RUnlock() + w.statusLock.RLock() + defer w.statusLock.RUnlock() return &api.Status{ - LastFinalizedRound: n.syncedState.Round, - Status: n.status, + LastFinalizedRound: w.syncedState.Round, + Status: w.status, }, nil } -func (n *Node) PauseCheckpointer(pause bool) error { +func (w *Worker) PauseCheckpointer(pause bool) error { if !commonFlags.DebugDontBlameOasis() { return api.ErrCantPauseCheckpointer } - n.checkpointer.Pause(pause) + w.checkpointer.Pause(pause) return nil } -// GetLocalStorage returns the local storage backend used by this storage node. -func (n *Node) GetLocalStorage() storageApi.LocalBackend { - return n.localStorage +// GetLocalStorage returns the local storage backend used by the worker. +func (w *Worker) GetLocalStorage() storageApi.LocalBackend { + return w.localStorage } // NodeHooks implementation. // HandleNewBlockEarlyLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { +func (w *Worker) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { // Nothing to do here. } // HandleNewBlockLocked is guarded by CrossNode. -func (n *Node) HandleNewBlockLocked(bi *runtime.BlockInfo) { +func (w *Worker) HandleNewBlockLocked(bi *runtime.BlockInfo) { // Notify the state syncer that there is a new block. - n.blockCh.In() <- bi.RuntimeBlock + w.blockCh.In() <- bi.RuntimeBlock } // HandleRuntimeHostEventLocked is guarded by CrossNode. -func (n *Node) HandleRuntimeHostEventLocked(*host.Event) { +func (w *Worker) HandleRuntimeHostEventLocked(*host.Event) { // Nothing to do here. } // Watcher implementation. // GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to. -func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { - n.syncedLock.RLock() - defer n.syncedLock.RUnlock() +func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { + w.syncedLock.RLock() + defer w.syncedLock.RUnlock() var io, state storageApi.Root - for _, root := range n.syncedState.Roots { + for _, root := range w.syncedState.Roots { switch root.Type { case storageApi.RootTypeIO: io = root @@ -402,10 +410,10 @@ func (n *Node) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { } } - return n.syncedState.Round, io, state + return w.syncedState.Round, io, state } -func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { +func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { result := &fetchedDiff{ fetched: false, pf: rpc.NewNopPeerFeedback(), @@ -415,13 +423,13 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } defer func() { select { - case n.diffCh <- result: - case <-n.ctx.Done(): + case w.diffCh <- result: + case <-w.ctx.Done(): } }() // Check if the new root doesn't already exist. - if n.localStorage.NodeDB().HasRoot(thisRoot) { + if w.localStorage.NodeDB().HasRoot(thisRoot) { return } @@ -436,15 +444,15 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { } // New root does not yet exist in storage and we need to fetch it from a peer. 
- n.logger.Debug("calling GetDiff", + w.logger.Debug("calling GetDiff", "old_root", prevRoot, "new_root", thisRoot, ) - ctx, cancel := context.WithCancel(n.ctx) + ctx, cancel := context.WithCancel(w.ctx) defer cancel() - wl, pf, err := n.getDiff(ctx, prevRoot, thisRoot) + wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) if err != nil { result.err = err return @@ -456,35 +464,35 @@ func (n *Node) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { // getDiff fetches writelog using diff sync p2p protocol client. // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (n *Node) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - rsp1, pf, err := n.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) +func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err == nil { // if NO error return rsp1.WriteLog, pf, nil } - rsp2, pf, err := n.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { return nil, nil, err } return rsp2.WriteLog, pf, nil } -func (n *Node) finalize(summary *blockSummary) { - err := n.localStorage.NodeDB().Finalize(summary.Roots) +func (w *Worker) finalize(summary *blockSummary) { + err := w.localStorage.NodeDB().Finalize(summary.Roots) switch err { case nil: - n.logger.Debug("storage round finalized", + w.logger.Debug("storage round finalized", "round", summary.Round, ) case storageApi.ErrAlreadyFinalized: // This can happen if we are restoring after a roothash migration or if // we crashed before updating the sync state. - n.logger.Warn("storage round already finalized", + w.logger.Warn("storage round already finalized", "round", summary.Round, ) err = nil default: - n.logger.Error("failed to finalize storage round", + w.logger.Error("failed to finalize storage round", "err", err, "round", summary.Round, ) @@ -496,31 +504,31 @@ func (n *Node) finalize(summary *blockSummary) { } select { - case n.finalizeCh <- result: - case <-n.ctx.Done(): + case w.finalizeCh <- result: + case <-w.ctx.Done(): } } -func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { - n.logger.Info("initializing storage at genesis") +func (w *Worker) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { + w.logger.Info("initializing storage at genesis") // Check what the latest finalized version in the database is as we may be using a database // from a previous version or network. - latestVersion, alreadyInitialized := n.localStorage.NodeDB().GetLatestVersion() + latestVersion, alreadyInitialized := w.localStorage.NodeDB().GetLatestVersion() // Finalize any versions that were not yet finalized in the old database. This is only possible // as long as there is only one non-finalized root per version. Note that we also cannot be sure // that any of these roots are valid, but this is fine as long as the final version matches the // genesis root. 
if alreadyInitialized { - n.logger.Debug("already initialized, finalizing any non-finalized versions", + w.logger.Debug("already initialized, finalizing any non-finalized versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion + 1; v < genesisBlock.Header.Round; v++ { - roots, err := n.localStorage.NodeDB().GetRootsForVersion(v) + roots, err := w.localStorage.NodeDB().GetRootsForVersion(v) if err != nil { return fmt.Errorf("failed to fetch roots for version %d: %w", v, err) } @@ -535,7 +543,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e break // We must have exactly one non-finalized state root to continue. } - err = n.localStorage.NodeDB().Finalize(stateRoots) + err = w.localStorage.NodeDB().Finalize(stateRoots) if err != nil { return fmt.Errorf("failed to finalize version %d: %w", v, err) } @@ -559,14 +567,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e maybeRoot := stateRoot maybeRoot.Version = latestVersion - if n.localStorage.NodeDB().HasRoot(maybeRoot) { - n.logger.Debug("latest version earlier than genesis state root, filling in versions", + if w.localStorage.NodeDB().HasRoot(maybeRoot) { + w.logger.Debug("latest version earlier than genesis state root, filling in versions", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, ) for v := latestVersion; v < stateRoot.Version; v++ { - err := n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err := w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ Namespace: rt.ID, RootType: storageApi.RootTypeState, SrcRound: v, @@ -579,7 +587,7 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e return fmt.Errorf("failed to fill in version %d: %w", v, err) } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ Namespace: rt.ID, Version: v + 1, Type: storageApi.RootTypeState, @@ -594,14 +602,14 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e } default: // Latest finalized version is the same or ahead, root must exist. - compatible = n.localStorage.NodeDB().HasRoot(stateRoot) + compatible = w.localStorage.NodeDB().HasRoot(stateRoot) } // If we are incompatible and the local version is greater or the same as the genesis version, // we cannot do anything. If the local version is lower we assume the node will sync from a // different node. if !compatible && latestVersion >= stateRoot.Version { - n.logger.Error("existing state is incompatible with runtime genesis state", + w.logger.Error("existing state is incompatible with runtime genesis state", "genesis_state_root", genesisBlock.Header.StateRoot, "genesis_round", genesisBlock.Header.Round, "latest_version", latestVersion, @@ -611,46 +619,46 @@ func (n *Node) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) e if !compatible { // Database is empty, so assume the state will be replicated from another node. 
- n.logger.Warn("non-empty state root but no state available, assuming replication", + w.logger.Warn("non-empty state root but no state available, assuming replication", "state_root", genesisBlock.Header.StateRoot, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true } return nil } -func (n *Node) flushSyncedState(summary *blockSummary) (uint64, error) { - n.syncedLock.Lock() - defer n.syncedLock.Unlock() +func (w *Worker) flushSyncedState(summary *blockSummary) (uint64, error) { + w.syncedLock.Lock() + defer w.syncedLock.Unlock() - n.syncedState = *summary - if err := n.commonNode.Runtime.History().StorageSyncCheckpoint(n.syncedState.Round); err != nil { + w.syncedState = *summary + if err := w.commonNode.Runtime.History().StorageSyncCheckpoint(w.syncedState.Round); err != nil { return 0, err } - return n.syncedState.Round, nil + return w.syncedState.Round, nil } -func (n *Node) consensusCheckpointSyncer() { +func (w *Worker) consensusCheckpointSyncer() { // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The // reason why we do this is to make it faster for storage nodes that use consensus state sync // to catch up as exactly the right checkpoint will be available. - consensusCp := n.commonNode.Consensus.Checkpointer() + consensusCp := w.commonNode.Consensus.Checkpointer() if consensusCp == nil { return } // Wait for the common node to be initialized. select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): + case <-w.commonNode.Initialized(): + case <-w.ctx.Done(): return } // Determine the maximum number of consensus checkpoints to keep. - consensusParams, err := n.commonNode.Consensus.Core().GetParameters(n.ctx, consensus.HeightLatest) + consensusParams, err := w.commonNode.Consensus.Core().GetParameters(w.ctx, consensus.HeightLatest) if err != nil { - n.logger.Error("failed to fetch consensus parameters", + w.logger.Error("failed to fetch consensus parameters", "err", err, ) return @@ -658,7 +666,7 @@ func (n *Node) consensusCheckpointSyncer() { ch, sub, err := consensusCp.WatchCheckpoints() if err != nil { - n.logger.Error("failed to watch checkpoints", + w.logger.Error("failed to watch checkpoints", "err", err, ) return @@ -679,9 +687,9 @@ func (n *Node) consensusCheckpointSyncer() { }() for { select { - case <-n.quitCh: + case <-w.quitCh: return - case <-n.ctx.Done(): + case <-w.ctx.Done(): return case version := <-ch: // We need to wait for the next version as that is what will be in the consensus @@ -692,15 +700,15 @@ func (n *Node) consensusCheckpointSyncer() { versions = versions[1:] } - n.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", + w.logger.Debug("consensus checkpoint detected, queuing runtime checkpoint", "version", version+1, "num_versions", len(versions), ) if blkCh == nil { - blkCh, blkSub, err = n.commonNode.Consensus.Core().WatchBlocks(n.ctx) + blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(w.ctx) if err != nil { - n.logger.Error("failed to watch blocks", + w.logger.Error("failed to watch blocks", "err", err, ) continue @@ -709,7 +717,7 @@ func (n *Node) consensusCheckpointSyncer() { case blk := <-blkCh: // If there's nothing remaining, unsubscribe. 
if len(versions) == 0 { - n.logger.Debug("no more queued consensus checkpoint versions") + w.logger.Debug("no more queued consensus checkpoint versions") blkSub.Close() blkSub = nil @@ -727,12 +735,12 @@ func (n *Node) consensusCheckpointSyncer() { // Lookup what runtime round corresponds to the given consensus layer version and make // sure we checkpoint it. - blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), + blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(w.ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), Height: int64(version), }) if err != nil { - n.logger.Error("failed to get runtime block corresponding to consensus checkpoint", + w.logger.Error("failed to get runtime block corresponding to consensus checkpoint", "err", err, "height", version, ) @@ -741,11 +749,11 @@ func (n *Node) consensusCheckpointSyncer() { // We may have not yet synced the corresponding runtime round locally. In this case // we need to wait until this is the case. - n.syncedLock.RLock() - lastSyncedRound := n.syncedState.Round - n.syncedLock.RUnlock() + w.syncedLock.RLock() + lastSyncedRound := w.syncedState.Round + w.syncedLock.RUnlock() if blk.Header.Round > lastSyncedRound { - n.logger.Debug("runtime round not available yet for checkpoint, waiting", + w.logger.Debug("runtime round not available yet for checkpoint, waiting", "height", version, "round", blk.Header.Round, "last_synced_round", lastSyncedRound, @@ -755,12 +763,12 @@ func (n *Node) consensusCheckpointSyncer() { } // Force runtime storage checkpointer to create a checkpoint at this round. - n.logger.Info("consensus checkpoint, force runtime checkpoint", + w.logger.Info("consensus checkpoint, force runtime checkpoint", "height", version, "round", blk.Header.Round, ) - n.checkpointer.ForceCheckpoint(blk.Header.Round) + w.checkpointer.ForceCheckpoint(blk.Header.Round) } versions = newVersions } @@ -768,105 +776,105 @@ func (n *Node) consensusCheckpointSyncer() { } // This is only called from the main worker goroutine, so no locking should be necessary. 
-func (n *Node) nudgeAvailability(lastSynced, latest uint64) { - if lastSynced == n.undefinedRound || latest == n.undefinedRound { +func (w *Worker) nudgeAvailability(lastSynced, latest uint64) { + if lastSynced == w.undefinedRound || latest == w.undefinedRound { return } - if latest-lastSynced < maximumRoundDelayForAvailability && !n.roleAvailable { - n.roleProvider.SetAvailable(func(_ *node.Node) error { + if latest-lastSynced < maximumRoundDelayForAvailability && !w.roleAvailable { + w.roleProvider.SetAvailable(func(_ *node.Node) error { return nil }) - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetAvailable(func(_ *node.Node) error { return nil }) } - n.roleAvailable = true + w.roleAvailable = true } - if latest-lastSynced > minimumRoundDelayForUnavailability && n.roleAvailable { - n.roleProvider.SetUnavailable() - if n.rpcRoleProvider != nil { - n.rpcRoleProvider.SetUnavailable() + if latest-lastSynced > minimumRoundDelayForUnavailability && w.roleAvailable { + w.roleProvider.SetUnavailable() + if w.rpcRoleProvider != nil { + w.rpcRoleProvider.SetUnavailable() } - n.roleAvailable = false + w.roleAvailable = false } } -func (n *Node) worker() { // nolint: gocyclo - defer close(n.quitCh) - defer close(n.diffCh) +func (w *Worker) worker() { // nolint: gocyclo + defer close(w.quitCh) + defer close(w.diffCh) // Wait for the common node to be initialized. select { - case <-n.commonNode.Initialized(): - case <-n.ctx.Done(): - close(n.initCh) + case <-w.commonNode.Initialized(): + case <-w.ctx.Done(): + close(w.initCh) return } - n.logger.Info("starting committee node") + w.logger.Info("starting") - n.statusLock.Lock() - n.status = api.StatusStarting - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusStarting + w.statusLock.Unlock() // Determine genesis block. - genesisBlock, err := n.commonNode.Consensus.RootHash().GetGenesisBlock(n.ctx, &roothashApi.RuntimeRequest{ - RuntimeID: n.commonNode.Runtime.ID(), + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(w.ctx, &roothashApi.RuntimeRequest{ + RuntimeID: w.commonNode.Runtime.ID(), Height: consensus.HeightLatest, }) if err != nil { - n.logger.Error("can't retrieve genesis block", "err", err) + w.logger.Error("can't retrieve genesis block", "err", err) return } - n.undefinedRound = genesisBlock.Header.Round - 1 + w.undefinedRound = genesisBlock.Header.Round - 1 // Determine last finalized storage version. - if version, dbNonEmpty := n.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { + if version, dbNonEmpty := w.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { var blk *block.Block - blk, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, version) + blk, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, version) switch err { case nil: // Set last synced version to last finalized storage version. - if _, err = n.flushSyncedState(summaryFromBlock(blk)); err != nil { - n.logger.Error("failed to flush synced state", "err", err) + if _, err = w.flushSyncedState(summaryFromBlock(blk)); err != nil { + w.logger.Error("failed to flush synced state", "err", err) return } default: // Failed to fetch historic block. This is fine when the network just went through a // dump/restore upgrade and we don't have any information before genesis. We treat the // database as unsynced and will proceed to either use checkpoints or sync iteratively. 
- n.logger.Warn("failed to fetch historic block", + w.logger.Warn("failed to fetch historic block", "err", err, "round", version, ) } } - n.syncedLock.RLock() - cachedLastRound := n.syncedState.Round - n.syncedLock.RUnlock() + w.syncedLock.RLock() + cachedLastRound := w.syncedState.Round + w.syncedLock.RUnlock() if cachedLastRound == defaultUndefinedRound || cachedLastRound < genesisBlock.Header.Round { - cachedLastRound = n.undefinedRound + cachedLastRound = w.undefinedRound } // Initialize genesis from the runtime descriptor. - isInitialStartup := (cachedLastRound == n.undefinedRound) + isInitialStartup := (cachedLastRound == w.undefinedRound) if isInitialStartup { - n.statusLock.Lock() - n.status = api.StatusInitializingGenesis - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusInitializingGenesis + w.statusLock.Unlock() var rt *registryApi.Runtime - rt, err = n.commonNode.Runtime.ActiveDescriptor(n.ctx) + rt, err = w.commonNode.Runtime.ActiveDescriptor(w.ctx) if err != nil { - n.logger.Error("failed to retrieve runtime registry descriptor", + w.logger.Error("failed to retrieve runtime registry descriptor", "err", err, ) return } - if err = n.initGenesis(rt, genesisBlock); err != nil { - n.logger.Error("failed to initialize storage at genesis", + if err = w.initGenesis(rt, genesisBlock); err != nil { + w.logger.Error("failed to initialize storage at genesis", "err", err, ) return @@ -874,28 +882,28 @@ func (n *Node) worker() { // nolint: gocyclo } // Notify the checkpointer of the genesis round so it can be checkpointed. - if n.checkpointer != nil { - n.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) - n.checkpointer.Flush() + if w.checkpointer != nil { + w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) + w.checkpointer.Flush() } // Check if we are able to fetch the first block that we would be syncing if we used iterative // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we // must wait for a later checkpoint to become available. - if !n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncStartCheck - n.statusLock.Unlock() + if !w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncStartCheck + w.statusLock.Unlock() // Determine what is the first round that we would need to sync. iterativeSyncStart := cachedLastRound - if iterativeSyncStart == n.undefinedRound { + if iterativeSyncStart == w.undefinedRound { iterativeSyncStart++ } // Check if we actually have information about that round. This assumes that any reindexing // was already performed (the common node would not indicate being initialized otherwise). - _, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, iterativeSyncStart) + _, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, iterativeSyncStart) SyncStartCheck: switch { case err == nil: @@ -903,7 +911,7 @@ func (n *Node) worker() { // nolint: gocyclo // No information is available about the initial round. Query the earliest historic // block and check if that block has the genesis state root and empty I/O root. var earlyBlk *block.Block - earlyBlk, err = n.commonNode.Runtime.History().GetEarliestBlock(n.ctx) + earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(w.ctx) switch err { case nil: // Make sure the state root is still the same as at genesis time. @@ -917,13 +925,13 @@ func (n *Node) worker() { // nolint: gocyclo // If this is the case, we can start syncing from this round instead. 
Fill in the // remaining versions to make sure they actually exist in the database. - n.logger.Debug("filling in versions to genesis", + w.logger.Debug("filling in versions to genesis", "genesis_round", genesisBlock.Header.Round, "earliest_round", earlyBlk.Header.Round, ) for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ + Namespace: w.commonNode.Runtime.ID(), RootType: storageApi.RootTypeState, SrcRound: v, SrcRoot: genesisBlock.Header.StateRoot, @@ -937,31 +945,31 @@ func (n *Node) worker() { // nolint: gocyclo // Ignore already finalized versions. continue default: - n.logger.Error("failed to fill in version", + w.logger.Error("failed to fill in version", "version", v, "err", err, ) return } - err = n.localStorage.NodeDB().Finalize([]storageApi.Root{{ - Namespace: n.commonNode.Runtime.ID(), + err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ + Namespace: w.commonNode.Runtime.ID(), Version: v + 1, Type: storageApi.RootTypeState, Hash: genesisBlock.Header.StateRoot, // We can ignore I/O roots. }}) if err != nil { - n.logger.Error("failed to finalize filled in version", + w.logger.Error("failed to finalize filled in version", "version", v, "err", err, ) return } } - cachedLastRound, err = n.flushSyncedState(summaryFromBlock(earlyBlk)) + cachedLastRound, err = w.flushSyncedState(summaryFromBlock(earlyBlk)) if err != nil { - n.logger.Error("failed to flush synced state", + w.logger.Error("failed to flush synced state", "err", err, ) return @@ -970,26 +978,26 @@ func (n *Node) worker() { // nolint: gocyclo break SyncStartCheck default: // This should never happen as the block should exist. - n.logger.Warn("failed to query earliest block in local history", + w.logger.Warn("failed to query earliest block in local history", "err", err, ) } // No information is available about this round, force checkpoint sync. - n.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", + w.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", "round", iterativeSyncStart, ) - n.checkpointSyncForced = true + w.checkpointSyncForced = true default: // Unknown error while fetching block information, abort. - n.logger.Error("failed to query block", + w.logger.Error("failed to query block", "err", err, ) return } } - n.logger.Info("worker initialized", + w.logger.Info("worker initialized", "genesis_round", genesisBlock.Header.Round, "last_synced", cachedLastRound, ) @@ -1008,10 +1016,10 @@ func (n *Node) worker() { // nolint: gocyclo // to a later state which may not be desired given that checkpoint sync has been explicitly // disabled via config. 
// - if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced { - n.statusLock.Lock() - n.status = api.StatusSyncingCheckpoints - n.statusLock.Unlock() + if (isInitialStartup && !w.checkpointSyncCfg.Disabled) || w.checkpointSyncForced { + w.statusLock.Lock() + w.status = api.StatusSyncingCheckpoints + w.statusLock.Unlock() var ( summary *blockSummary @@ -1019,17 +1027,17 @@ func (n *Node) worker() { // nolint: gocyclo ) CheckpointSyncRetry: for { - summary, err = n.syncCheckpoints(genesisBlock.Header.Round, n.checkpointSyncCfg.Disabled) + summary, err = w.syncCheckpoints(genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled) if err == nil { break } attempt++ - switch n.checkpointSyncForced { + switch w.checkpointSyncForced { case true: // We have no other options but to perform a checkpoint sync as we are missing // either state or authoritative blocks. - n.logger.Info("checkpoint sync required, retrying", + w.logger.Info("checkpoint sync required, retrying", "err", err, "attempt", attempt, ) @@ -1041,36 +1049,36 @@ func (n *Node) worker() { // nolint: gocyclo // Try syncing again. The main reason for this is the sync failing due to a // checkpoint pruning race condition (where nodes list a checkpoint which is // then deleted just before we request its chunks). One retry is enough. - n.logger.Info("first checkpoint sync failed, trying once more", "err", err) + w.logger.Info("first checkpoint sync failed, trying once more", "err", err) } // Delay before retrying. select { case <-time.After(checkpointSyncRetryDelay): - case <-n.ctx.Done(): + case <-w.ctx.Done(): return } } if err != nil { - n.logger.Info("checkpoint sync failed", "err", err) + w.logger.Info("checkpoint sync failed", "err", err) } else { - cachedLastRound, err = n.flushSyncedState(summary) + cachedLastRound, err = w.flushSyncedState(summary) if err != nil { - n.logger.Error("failed to flush synced state", + w.logger.Error("failed to flush synced state", "err", err, ) return } lastFullyAppliedRound = cachedLastRound - n.logger.Info("checkpoint sync succeeded", + w.logger.Info("checkpoint sync succeeded", logging.LogEvent, LogEventCheckpointSyncSuccess, ) } } - close(n.initCh) + close(w.initCh) // Don't register availability immediately, we want to know first how far behind consensus we are. 
- latestBlockRound := n.undefinedRound + latestBlockRound := w.undefinedRound heartbeat := heartbeat{} heartbeat.reset() @@ -1097,10 +1105,10 @@ func (n *Node) worker() { // nolint: gocyclo syncingRounds[i] = syncing if i == latestBlockRound { - storageWorkerLastPendingRound.With(n.getMetricLabels()).Set(float64(i)) + storageWorkerLastPendingRound.With(w.getMetricLabels()).Set(float64(i)) } } - n.logger.Debug("preparing round sync", + w.logger.Debug("preparing round sync", "round", i, "outstanding_mask", syncing.outstanding, "awaiting_retry", syncing.awaitingRetry, @@ -1128,18 +1136,18 @@ func (n *Node) worker() { // nolint: gocyclo if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { syncing.scheduleDiff(rootType) wg.Add(1) - n.fetchPool.Submit(func() { + w.fetchPool.Submit(func() { defer wg.Done() - n.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) + w.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) }) } } } } - n.statusLock.Lock() - n.status = api.StatusSyncingRounds - n.statusLock.Unlock() + w.statusLock.Lock() + w.status = api.StatusSyncingRounds + w.statusLock.Unlock() pendingApply := &minRoundQueue{} pendingFinalize := &minRoundQueue{} @@ -1163,7 +1171,7 @@ mainLoop: // Apply the write log if one exists. err = nil if lastDiff.fetched { - err = n.localStorage.Apply(n.ctx, &storageApi.ApplyRequest{ + err = w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ Namespace: lastDiff.thisRoot.Namespace, RootType: lastDiff.thisRoot.Type, SrcRound: lastDiff.prevRoot.Version, @@ -1178,7 +1186,7 @@ mainLoop: case errors.Is(err, storageApi.ErrExpectedRootMismatch): lastDiff.pf.RecordBadPeer() default: - n.logger.Error("can't apply write log", + w.logger.Error("can't apply write log", "err", err, "old_root", lastDiff.prevRoot, "new_root", lastDiff.thisRoot, @@ -1198,14 +1206,14 @@ mainLoop: } // We have fully synced the given round. - n.logger.Debug("finished syncing round", "round", lastDiff.round) + w.logger.Debug("finished syncing round", "round", lastDiff.round) delete(syncingRounds, lastDiff.round) summary := summaryCache[lastDiff.round] delete(summaryCache, lastDiff.round-1) lastFullyAppliedRound = lastDiff.round - storageWorkerLastSyncedRound.With(n.getMetricLabels()).Set(float64(lastDiff.round)) - storageWorkerRoundSyncLatency.With(n.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) + storageWorkerLastSyncedRound.With(w.getMetricLabels()).Set(float64(lastDiff.round)) + storageWorkerRoundSyncLatency.With(w.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) // Finalize storage for this round. This happens asynchronously // with respect to Apply operations for subsequent rounds. @@ -1222,15 +1230,15 @@ mainLoop: wg.Add(1) go func() { // Don't block fetching and applying remaining rounds. defer wg.Done() - n.finalize(lastSummary) + w.finalize(lastSummary) }() continue } select { - case inBlk := <-n.blockCh.Out(): + case inBlk := <-w.blockCh.Out(): blk := inBlk.(*block.Block) - n.logger.Debug("incoming block", + w.logger.Debug("incoming block", "round", blk.Header.Round, "last_synced", lastFullyAppliedRound, "last_finalized", cachedLastRound, @@ -1238,9 +1246,9 @@ mainLoop: // Check if we're far enough to reasonably register as available. 
latestBlockRound = blk.Header.Round - n.nudgeAvailability(cachedLastRound, latestBlockRound) + w.nudgeAvailability(cachedLastRound, latestBlockRound) - if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == n.undefinedRound { + if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == w.undefinedRound { dummy := blockSummary{ Namespace: blk.Header.Namespace, Round: lastFullyAppliedRound + 1, @@ -1264,7 +1272,7 @@ mainLoop: // since the undefined round may be unsigned -1 and in this case the loop // would not do any iterations. startSummaryRound := lastFullyAppliedRound - if startSummaryRound == n.undefinedRound { + if startSummaryRound == w.undefinedRound { startSummaryRound++ } for i := startSummaryRound; i < blk.Header.Round; i++ { @@ -1272,9 +1280,9 @@ mainLoop: continue } var oldBlock *block.Block - oldBlock, err = n.commonNode.Runtime.History().GetCommittedBlock(n.ctx, i) + oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, i) if err != nil { - n.logger.Error("can't get block for round", + w.logger.Error("can't get block for round", "err", err, "round", i, "current_round", blk.Header.Round, @@ -1291,14 +1299,14 @@ mainLoop: heartbeat.reset() case <-heartbeat.C: - if latestBlockRound != n.undefinedRound { - n.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) + if latestBlockRound != w.undefinedRound { + w.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) triggerRoundFetches() } - case item := <-n.diffCh: + case item := <-w.diffCh: if item.err != nil { - n.logger.Error("error calling getdiff", + w.logger.Error("error calling getdiff", "err", item.err, "round", item.round, "old_root", item.prevRoot, @@ -1315,35 +1323,35 @@ mainLoop: // when we're syncing and are far behind. triggerRoundFetches() - case finalized := <-n.finalizeCh: + case finalized := <-w.finalizeCh: // If finalization failed, things start falling apart. // There's no point redoing it, since it's probably not a transient // error, and cachedLastRound also can't be updated legitimately. if finalized.err != nil { // Request a node shutdown given that syncing is effectively blocked. - _ = n.commonNode.HostNode.RequestShutdown(n.ctx, false) + _ = w.commonNode.HostNode.RequestShutdown(w.ctx, false) break mainLoop } // No further sync or out of order handling needed here, since // only one finalize at a time is triggered (for round cachedLastRound+1) - cachedLastRound, err = n.flushSyncedState(finalized.summary) + cachedLastRound, err = w.flushSyncedState(finalized.summary) if err != nil { - n.logger.Error("failed to flush synced state", + w.logger.Error("failed to flush synced state", "err", err, ) } - storageWorkerLastFullRound.With(n.getMetricLabels()).Set(float64(finalized.summary.Round)) + storageWorkerLastFullRound.With(w.getMetricLabels()).Set(float64(finalized.summary.Round)) // Check if we're far enough to reasonably register as available. - n.nudgeAvailability(cachedLastRound, latestBlockRound) + w.nudgeAvailability(cachedLastRound, latestBlockRound) // Notify the checkpointer that there is a new finalized round. 
if config.GlobalConfig.Storage.Checkpointer.Enabled { - n.checkpointer.NotifyNewVersion(finalized.summary.Round) + w.checkpointer.NotifyNewVersion(finalized.summary.Round) } - case <-n.ctx.Done(): + case <-w.ctx.Done(): break mainLoop } } @@ -1356,12 +1364,12 @@ mainLoop: type pruneHandler struct { logger *logging.Logger - node *Node + worker *Worker } func (p *pruneHandler) Prune(rounds []uint64) error { // Make sure we never prune past what was synced. - lastSycnedRound, _, _ := p.node.GetLastSynced() + lastSycnedRound, _, _ := p.worker.GetLastSynced() for _, round := range rounds { if round >= lastSycnedRound { @@ -1375,7 +1383,7 @@ func (p *pruneHandler) Prune(rounds []uint64) error { p.logger.Debug("pruning storage for round", "round", round) // Prune given block. - err := p.node.localStorage.NodeDB().Prune(round) + err := p.worker.localStorage.NodeDB().Prune(round) switch err { case nil: case mkvsDB.ErrNotEarliest: diff --git a/go/worker/storage/service_internal.go b/go/worker/storage/service_internal.go index cf5d0ecf064..bd1843e0379 100644 --- a/go/worker/storage/service_internal.go +++ b/go/worker/storage/service_internal.go @@ -9,12 +9,12 @@ import ( var _ api.StorageWorker = (*Worker)(nil) func (w *Worker) GetLastSyncedRound(_ context.Context, request *api.GetLastSyncedRoundRequest) (*api.GetLastSyncedRoundResponse, error) { - node := w.runtimes[request.RuntimeID] - if node == nil { + worker := w.runtimes[request.RuntimeID] + if worker == nil { return nil, api.ErrRuntimeNotFound } - round, ioRoot, stateRoot := node.GetLastSynced() + round, ioRoot, stateRoot := worker.GetLastSynced() return &api.GetLastSyncedRoundResponse{ Round: round, IORoot: ioRoot, diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index f49988bd1c7..95bd92e766b 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -15,7 +15,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/worker/storage/committee" ) -// Worker is a worker handling storage operations. +// Worker is a worker handling storage operations for all common worker runtimes. type Worker struct { enabled bool @@ -26,7 +26,7 @@ type Worker struct { initCh chan struct{} quitCh chan struct{} - runtimes map[common.Namespace]*committee.Node + runtimes map[common.Namespace]*committee.Worker } // New constructs a new storage worker. @@ -44,14 +44,14 @@ func New( logger: logging.GetLogger("worker/storage"), initCh: make(chan struct{}), quitCh: make(chan struct{}), - runtimes: make(map[common.Namespace]*committee.Node), + runtimes: make(map[common.Namespace]*committee.Worker), } if !enabled { return s, nil } - // Start storage node for every runtime. + // Register the storage worker for every runtime. 
 	for id, rt := range s.commonWorker.GetRuntimes() {
 		if err := s.registerRuntime(rt); err != nil {
 			return nil, fmt.Errorf("failed to create storage worker for runtime %s: %w", id, err)
@@ -90,7 +90,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 		return fmt.Errorf("can't create local storage backend: %w", err)
 	}
 
-	node, err := committee.NewNode(
+	worker, err := committee.New(
 		commonNode,
 		rp,
 		rpRPC,
@@ -105,8 +105,8 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 		return err
 	}
 	commonNode.Runtime.RegisterStorage(localStorage)
-	commonNode.AddHooks(node)
-	w.runtimes[id] = node
+	commonNode.AddHooks(worker)
+	w.runtimes[id] = worker
 
 	w.logger.Info("new runtime registered",
 		"runtime_id", id,
@@ -115,7 +115,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
 	return nil
 }
 
-// Name returns the service name.
+// Name returns the worker name.
 func (w *Worker) Name() string {
 	return "storage worker"
 }
@@ -196,6 +196,6 @@ func (w *Worker) Cleanup() {
 // GetRuntime returns a storage committee node for the given runtime (if available).
 //
 // In case the runtime with the specified id was not configured for this node it returns nil.
-func (w *Worker) GetRuntime(id common.Namespace) *committee.Node {
+func (w *Worker) GetRuntime(id common.Namespace) *committee.Worker {
 	return w.runtimes[id]
 }

From 9a824a7bd4c6028e937150b5a4d236cca798bc5a Mon Sep 17 00:00:00 2001
From: Martin Tomazic
Date: Sat, 2 Aug 2025 21:57:31 +0200
Subject: [PATCH 02/10] go/worker/storage/statesync: Move pruning to separate file

---
 go/worker/storage/committee/prune.go  | 48 +++++++++++++++++++++++++++
 go/worker/storage/committee/worker.go | 41 -----------------------
 2 files changed, 48 insertions(+), 41 deletions(-)
 create mode 100644 go/worker/storage/committee/prune.go

diff --git a/go/worker/storage/committee/prune.go b/go/worker/storage/committee/prune.go
new file mode 100644
index 00000000000..f61f96ce1e7
--- /dev/null
+++ b/go/worker/storage/committee/prune.go
@@ -0,0 +1,48 @@
+package committee
+
+import (
+	"fmt"
+
+	"github.com/oasisprotocol/oasis-core/go/common/logging"
+	mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
+)
+
+type pruneHandler struct {
+	logger *logging.Logger
+	worker *Worker
+}
+
+func (p *pruneHandler) Prune(rounds []uint64) error {
+	// Make sure we never prune past what was synced.
+	lastSycnedRound, _, _ := p.worker.GetLastSynced()
+
+	for _, round := range rounds {
+		if round >= lastSycnedRound {
+			return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)",
+				lastSycnedRound,
+			)
+		}
+
+		// TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet.
+
+		p.logger.Debug("pruning storage for round", "round", round)
+
+		// Prune given block.
+ err := p.worker.localStorage.NodeDB().Prune(round) + switch err { + case nil: + case mkvsDB.ErrNotEarliest: + p.logger.Debug("skipping non-earliest round", + "round", round, + ) + continue + default: + p.logger.Error("failed to prune block", + "err", err, + ) + return err + } + } + + return nil +} diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index d487d0676bc..a8afce86425 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -29,7 +29,6 @@ import ( storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" - mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" @@ -1361,43 +1360,3 @@ mainLoop: // some new blocks, but only as many as were already in-flight at the point when the main // context was canceled. } - -type pruneHandler struct { - logger *logging.Logger - worker *Worker -} - -func (p *pruneHandler) Prune(rounds []uint64) error { - // Make sure we never prune past what was synced. - lastSycnedRound, _, _ := p.worker.GetLastSynced() - - for _, round := range rounds { - if round >= lastSycnedRound { - return fmt.Errorf("worker/storage: tried to prune past last synced round (last synced: %d)", - lastSycnedRound, - ) - } - - // TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. - - p.logger.Debug("pruning storage for round", "round", round) - - // Prune given block. - err := p.worker.localStorage.NodeDB().Prune(round) - switch err { - case nil: - case mkvsDB.ErrNotEarliest: - p.logger.Debug("skipping non-earliest round", - "round", round, - ) - continue - default: - p.logger.Error("failed to prune block", - "err", err, - ) - return err - } - } - - return nil -} From 5c016ed4d4e42de239b149b96d7df0c652681946 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Sun, 24 Aug 2025 21:24:25 +0200 Subject: [PATCH 03/10] go/worker/storage/statesync: Create genesis checkpoint later Previously, genesis checkpoint was created right after the state may be initialized from the runtime descriptor. If this is not the case it is first fetched from the peers. Thus, we should force the genesis checkpoint only after checkpoint sync finishes. Finally, redundant nil check was removed. --- go/worker/storage/committee/worker.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index a8afce86425..c29bc81d0a3 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -880,12 +880,6 @@ func (w *Worker) worker() { // nolint: gocyclo } } - // Notify the checkpointer of the genesis round so it can be checkpointed. - if w.checkpointer != nil { - w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) - w.checkpointer.Flush() - } - // Check if we are able to fetch the first block that we would be syncing if we used iterative // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we // must wait for a later checkpoint to become available. 
@@ -1076,6 +1070,10 @@ func (w *Worker) worker() { // nolint: gocyclo
 	}
 	close(w.initCh)
 
+	// Notify the checkpointer of the genesis round so it can be checkpointed.
+	w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round)
+	w.checkpointer.Flush()
+
 	// Don't register availability immediately, we want to know first how far behind consensus we are.
 	latestBlockRound := w.undefinedRound
 

From 9d1cce323b440024af34208c59769c189f2dcb27 Mon Sep 17 00:00:00 2001
From: Martin Tomazic
Date: Sun, 24 Aug 2025 21:51:39 +0200
Subject: [PATCH 04/10] go/storage/mkvs: Add a Serve method to the checkpointer

This is desirable so that workers that initialize a new checkpointer
don't need to accept a context; instead, the checkpointer's lifetime and
initialization are handled by the worker's Serve method.
---
 go/consensus/cometbft/abci/state.go           |  9 ++++----
 go/storage/mkvs/checkpoint/checkpointer.go    | 22 ++++++++-----------
 .../mkvs/checkpoint/checkpointer_test.go      | 19 ++++++++++++----
 go/worker/storage/committee/worker.go         | 11 ++++++----
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/go/consensus/cometbft/abci/state.go b/go/consensus/cometbft/abci/state.go
index e580cd3e720..ced587f45de 100644
--- a/go/consensus/cometbft/abci/state.go
+++ b/go/consensus/cometbft/abci/state.go
@@ -782,10 +782,11 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
 				}, nil
 			},
 		}
-		s.checkpointer, err = checkpoint.NewCheckpointer(s.ctx, ndb, ldb.Checkpointer(), checkpointerCfg)
-		if err != nil {
-			return nil, fmt.Errorf("state: failed to create checkpointer: %w", err)
-		}
+		s.checkpointer = checkpoint.NewCheckpointer(ndb, ldb.Checkpointer(), checkpointerCfg)
+		go func() {
+			err := s.checkpointer.Serve(ctx)
+			s.logger.Error("checkpointer stopped", "err", err)
+		}()
 	}
 
 	go s.metricsWorker()
diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go
index 7a43a834871..723584f98d0 100644
--- a/go/storage/mkvs/checkpoint/checkpointer.go
+++ b/go/storage/mkvs/checkpoint/checkpointer.go
@@ -72,7 +72,7 @@ type CreationParameters struct {
 	ChunkerThreads uint16
 }
 
-// Checkpointer is a checkpointer.
+// Checkpointer is responsible for creating the storage snapshots (checkpoints).
 type Checkpointer interface {
 	// NotifyNewVersion notifies the checkpointer that a new version has been finalized.
 	NotifyNewVersion(version uint64)
@@ -95,6 +95,9 @@ type Checkpointer interface {
 	// intervals; after unpausing, a checkpoint won't be created immediately, but the checkpointer
 	// will wait for the next regular event.
 	Pause(pause bool)
+
+	// Serve starts running the checkpointer.
+ Serve(ctx context.Context) error } type checkpointer struct { @@ -285,7 +288,7 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para return nil } -func (c *checkpointer) worker(ctx context.Context) { +func (c *checkpointer) Serve(ctx context.Context) error { c.logger.Debug("storage checkpointer started", "check_interval", c.cfg.CheckInterval, ) @@ -310,7 +313,7 @@ func (c *checkpointer) worker(ctx context.Context) { select { case <-ctx.Done(): - return + return ctx.Err() case <-time.After(interval): case <-c.flushCh.Out(): case paused = <-c.pausedCh: @@ -323,7 +326,7 @@ func (c *checkpointer) worker(ctx context.Context) { ) select { case <-ctx.Done(): - return + return ctx.Err() case v := <-c.notifyCh.Out(): version = v.(uint64) case v := <-c.forceCh.Out(): @@ -387,13 +390,8 @@ func (c *checkpointer) worker(ctx context.Context) { // NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and // will automatically generate the configured number of checkpoints. -func NewCheckpointer( - ctx context.Context, - ndb db.NodeDB, - creator Creator, - cfg CheckpointerConfig, -) (Checkpointer, error) { - c := &checkpointer{ +func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer { + return &checkpointer{ cfg: cfg, ndb: ndb, creator: creator, @@ -405,6 +403,4 @@ func NewCheckpointer( cpNotifier: pubsub.NewBroker(false), logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), } - go c.worker(ctx) - return c, nil } diff --git a/go/storage/mkvs/checkpoint/checkpointer_test.go b/go/storage/mkvs/checkpoint/checkpointer_test.go index 9782f904b85..67e3e5ea193 100644 --- a/go/storage/mkvs/checkpoint/checkpointer_test.go +++ b/go/storage/mkvs/checkpoint/checkpointer_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -25,7 +26,12 @@ const ( func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, interval uint64, preExistingData bool) { require := require.New(t) - ctx := context.Background() + + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() // Initialize a database. dir, err := os.MkdirTemp("", "mkvs.checkpointer") @@ -69,8 +75,8 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte fc, err := NewFileCreator(filepath.Join(dir, "checkpoints"), ndb) require.NoError(err, "NewFileCreator") - // Create a checkpointer. - cp, err := NewCheckpointer(ctx, ndb, fc, CheckpointerConfig{ + // Create and run a checkpointer. + cp := NewCheckpointer(ndb, fc, CheckpointerConfig{ Name: "test", Namespace: testNs, CheckInterval: testCheckInterval, @@ -89,7 +95,12 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte return ndb.GetRootsForVersion(version) }, }) - require.NoError(err, "NewCheckpointer") + wg.Go(func() { + err := cp.Serve(ctx) + if err != context.Canceled { + require.NoError(err) + } + }) // Start watching checkpoints. cpCh, sub, err := cp.WatchCheckpoints() diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index c29bc81d0a3..8865c5d52de 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -225,11 +225,15 @@ func New( w.ctx, w.ctxCancel = context.WithCancel(context.Background()) // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. 
- checkpointer, err := w.newCheckpointer(w.ctx, commonNode, localStorage) + checkpointer, err := w.newCheckpointer(commonNode, localStorage) if err != nil { return nil, fmt.Errorf("failed to create checkpointer: %w", err) } w.checkpointer = checkpointer + go func() { + err := w.checkpointer.Serve(w.ctx) + w.logger.Error("checkpointer stopped", "err", err) + }() // Register prune handler. commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ @@ -255,7 +259,7 @@ func New( return w, nil } -func (w *Worker) newCheckpointer(ctx context.Context, commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { +func (w *Worker) newCheckpointer(commonNode *committee.Node, localStorage storageApi.LocalBackend) (checkpoint.Checkpointer, error) { checkInterval := checkpoint.CheckIntervalDisabled if config.GlobalConfig.Storage.Checkpointer.Enabled { checkInterval = config.GlobalConfig.Storage.Checkpointer.CheckInterval @@ -303,11 +307,10 @@ func (w *Worker) newCheckpointer(ctx context.Context, commonNode *committee.Node } return checkpoint.NewCheckpointer( - ctx, localStorage.NodeDB(), localStorage.Checkpointer(), checkpointerCfg, - ) + ), nil } // Service interface. From affcc6cd012e00d35855e9d66b8dd6f8b4b691f1 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 27 Aug 2025 10:10:18 +0200 Subject: [PATCH 05/10] go/storage/mkvs/checkpoint: Move methods around for readability --- go/storage/mkvs/checkpoint/checkpointer.go | 234 ++++++++++----------- 1 file changed, 117 insertions(+), 117 deletions(-) diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go index 723584f98d0..b42d7291e44 100644 --- a/go/storage/mkvs/checkpoint/checkpointer.go +++ b/go/storage/mkvs/checkpoint/checkpointer.go @@ -115,6 +115,23 @@ type checkpointer struct { logger *logging.Logger } +// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and +// will automatically generate the configured number of checkpoints. +func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer { + return &checkpointer{ + cfg: cfg, + ndb: ndb, + creator: creator, + notifyCh: channels.NewRingChannel(1), + forceCh: channels.NewRingChannel(1), + flushCh: channels.NewRingChannel(1), + statusCh: make(chan struct{}), + pausedCh: make(chan bool), + cpNotifier: pubsub.NewBroker(false), + logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), + } +} + // Implements Checkpointer. 
func (c *checkpointer) NotifyNewVersion(version uint64) { c.notifyCh.In() <- version @@ -143,6 +160,106 @@ func (c *checkpointer) Pause(pause bool) { c.pausedCh <- pause } +func (c *checkpointer) Serve(ctx context.Context) error { + c.logger.Debug("storage checkpointer started", + "check_interval", c.cfg.CheckInterval, + ) + defer func() { + c.logger.Debug("storage checkpointer terminating") + }() + + paused := false + + for { + var interval time.Duration + switch c.cfg.CheckInterval { + case CheckIntervalDisabled: + interval = CheckIntervalDisabled + default: + interval = random.GetRandomValueFromInterval( + checkpointIntervalRandomizationFactor, + rand.Float64(), + c.cfg.CheckInterval, + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(interval): + case <-c.flushCh.Out(): + case paused = <-c.pausedCh: + continue + } + + var ( + version uint64 + force bool + ) + select { + case <-ctx.Done(): + return ctx.Err() + case v := <-c.notifyCh.Out(): + version = v.(uint64) + case v := <-c.forceCh.Out(): + version = v.(uint64) + force = true + } + + // Fetch current checkpoint parameters. + params := c.cfg.Parameters + if params == nil && c.cfg.GetParameters != nil { + var err error + params, err = c.cfg.GetParameters(ctx) + if err != nil { + c.logger.Error("failed to get checkpoint parameters", + "err", err, + "version", version, + ) + continue + } + } + if params == nil { + c.logger.Error("no checkpoint parameters") + continue + } + + // Don't checkpoint if checkpoints are disabled. + switch { + case force: + // Always checkpoint when forced. + case paused: + continue + case params.Interval == 0: + continue + case c.cfg.CheckInterval == CheckIntervalDisabled: + continue + default: + } + + var err error + switch force { + case false: + err = c.maybeCheckpoint(ctx, version, params) + case true: + err = c.checkpoint(ctx, version, params) + } + if err != nil { + c.logger.Error("failed to checkpoint", + "version", version, + "err", err, + ) + continue + } + + // Emit status update if someone is listening. This is only used in tests. + select { + case c.statusCh <- struct{}{}: + default: + } + } +} + func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) { // Notify watchers about the checkpoint we are about to make. c.cpNotifier.Broadcast(version) @@ -287,120 +404,3 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para return nil } - -func (c *checkpointer) Serve(ctx context.Context) error { - c.logger.Debug("storage checkpointer started", - "check_interval", c.cfg.CheckInterval, - ) - defer func() { - c.logger.Debug("storage checkpointer terminating") - }() - - paused := false - - for { - var interval time.Duration - switch c.cfg.CheckInterval { - case CheckIntervalDisabled: - interval = CheckIntervalDisabled - default: - interval = random.GetRandomValueFromInterval( - checkpointIntervalRandomizationFactor, - rand.Float64(), - c.cfg.CheckInterval, - ) - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(interval): - case <-c.flushCh.Out(): - case paused = <-c.pausedCh: - continue - } - - var ( - version uint64 - force bool - ) - select { - case <-ctx.Done(): - return ctx.Err() - case v := <-c.notifyCh.Out(): - version = v.(uint64) - case v := <-c.forceCh.Out(): - version = v.(uint64) - force = true - } - - // Fetch current checkpoint parameters. 
- params := c.cfg.Parameters - if params == nil && c.cfg.GetParameters != nil { - var err error - params, err = c.cfg.GetParameters(ctx) - if err != nil { - c.logger.Error("failed to get checkpoint parameters", - "err", err, - "version", version, - ) - continue - } - } - if params == nil { - c.logger.Error("no checkpoint parameters") - continue - } - - // Don't checkpoint if checkpoints are disabled. - switch { - case force: - // Always checkpoint when forced. - case paused: - continue - case params.Interval == 0: - continue - case c.cfg.CheckInterval == CheckIntervalDisabled: - continue - default: - } - - var err error - switch force { - case false: - err = c.maybeCheckpoint(ctx, version, params) - case true: - err = c.checkpoint(ctx, version, params) - } - if err != nil { - c.logger.Error("failed to checkpoint", - "version", version, - "err", err, - ) - continue - } - - // Emit status update if someone is listening. This is only used in tests. - select { - case c.statusCh <- struct{}{}: - default: - } - } -} - -// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and -// will automatically generate the configured number of checkpoints. -func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer { - return &checkpointer{ - cfg: cfg, - ndb: ndb, - creator: creator, - notifyCh: channels.NewRingChannel(1), - forceCh: channels.NewRingChannel(1), - flushCh: channels.NewRingChannel(1), - statusCh: make(chan struct{}), - pausedCh: make(chan bool), - cpNotifier: pubsub.NewBroker(false), - logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), - } -} From fb734746e6988c5b3b8842408bc7a0a962c8227b Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 25 Aug 2025 11:14:53 +0200 Subject: [PATCH 06/10] go/worker/storage/statesync: Add explicit timeout for fetching diff --- go/worker/storage/committee/checkpoint_sync.go | 2 +- go/worker/storage/committee/worker.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index cfed8ed7d26..44b6c2d20d5 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -22,7 +22,7 @@ const ( // cpListsTimeout is the timeout for fetching checkpoints from all nodes. cpListsTimeout = 30 * time.Second // cpRestoreTimeout is the timeout for restoring a checkpoint chunk from the remote peer. - cpRestoreTimeout = 60 * time.Second + cpRestoreTimeout = time.Minute checkpointStatusDone = 0 checkpointStatusNext = 1 diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 8865c5d52de..390950e2a8c 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -71,6 +71,9 @@ const ( // same checkpoint hashes. The current value was chosen based on the benchmarks // done on the modern developer machine. chunkerThreads = 12 + + // diffResponseTimeout is the maximum time for fetching storage diff from the peer. 
+ diffResponseTimeout = 15 * time.Second ) type roundItem interface { @@ -451,10 +454,7 @@ func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { "new_root", thisRoot, ) - ctx, cancel := context.WithCancel(w.ctx) - defer cancel() - - wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) + wl, pf, err := w.getDiff(w.ctx, prevRoot, thisRoot) if err != nil { result.err = err return @@ -467,11 +467,15 @@ func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { // // In case of no peers or error, it fallbacks to the legacy storage sync protocol. func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + ctx, cancel := context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err == nil { // if NO error return rsp1.WriteLog, pf, nil } + ctx, cancel = context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) if err != nil { return nil, nil, err From d620addf92be25d6b652e002a6467ca811e59f7a Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Fri, 29 Aug 2025 21:28:42 +0200 Subject: [PATCH 07/10] go/worker/storage/statesync: Fix deadlock on the cleanup Previously if the context was not canceled the fetcher might be sending the diffs on a channel that cannot be emptied, since we are already out of the main for loop, resulting in wg.Wait to never complete. --- go/worker/storage/committee/worker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 390950e2a8c..22a8798d8b4 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -824,6 +824,9 @@ func (w *Worker) worker() { // nolint: gocyclo w.status = api.StatusStarting w.statusLock.Unlock() + var wg sync.WaitGroup + defer wg.Wait() + // Determine genesis block. genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(w.ctx, &roothashApi.RuntimeRequest{ RuntimeID: w.commonNode.Runtime.ID(), @@ -1087,7 +1090,6 @@ func (w *Worker) worker() { // nolint: gocyclo heartbeat := heartbeat{} heartbeat.reset() - var wg sync.WaitGroup syncingRounds := make(map[uint64]*inFlight) summaryCache := make(map[uint64]*blockSummary) triggerRoundFetches := func() { @@ -1360,7 +1362,6 @@ mainLoop: } } - wg.Wait() // blockCh will be garbage-collected without being closed. It can potentially still contain // some new blocks, but only as many as were already in-flight at the point when the main // context was canceled. From c4748dba6da35dada0360179ec9a5bca034a018f Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 27 Aug 2025 11:43:16 +0200 Subject: [PATCH 08/10] go/worker/storage/statesync: Reduce the scope of workers This also serves as step towards passing the context explicitly. 
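
For illustration only (not part of this diff), a minimal, self-contained
sketch of the scoping this change moves towards; the worker type and its
resources below are made-up stand-ins for the fetch pool and the
checkpointer:

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    type worker struct{}

    // serve owns all of its resources: they are created after serve starts
    // and released via defer before it returns, so nothing outlives the
    // passed context.
    func (w *worker) serve(ctx context.Context) error {
        var wg sync.WaitGroup
        defer wg.Wait()

        // Canceled before wg.Wait runs (defers run in LIFO order), so helper
        // goroutines are always released.
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        // Stand-in for the fetch pool: scoped to this call only.
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()

        // Stand-in for the checkpointer: driven from serve, not from Start.
        wg.Add(1)
        go func() {
            defer wg.Done()
            <-ctx.Done()
        }()

        for {
            select {
            case <-ticker.C:
                fmt.Println("tick")
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
        defer cancel()
        fmt.Println((&worker{}).serve(ctx))
    }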
--- go/worker/storage/committee/worker.go | 49 ++++++++++++++------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 22a8798d8b4..8a46855be39 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -150,8 +150,6 @@ type Worker struct { undefinedRound uint64 - fetchPool *workerpool.Pool - workerCommonCfg workerCommon.Config checkpointer checkpoint.Checkpointer @@ -187,10 +185,6 @@ func New( ) (*Worker, error) { initMetrics() - // Create the fetcher pool. - fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) - fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - w := &Worker{ commonNode: commonNode, @@ -203,8 +197,6 @@ func New( localStorage: localStorage, - fetchPool: fetchPool, - checkpointSyncCfg: checkpointSyncCfg, status: api.StatusInitializing, @@ -233,10 +225,6 @@ func New( return nil, fmt.Errorf("failed to create checkpointer: %w", err) } w.checkpointer = checkpointer - go func() { - err := w.checkpointer.Serve(w.ctx) - w.logger.Error("checkpointer stopped", "err", err) - }() // Register prune handler. commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ @@ -321,20 +309,11 @@ func (w *Worker) newCheckpointer(commonNode *committee.Node, localStorage storag // Start causes the worker to start responding to CometBFT new block events. func (w *Worker) Start() error { go w.worker() - if config.GlobalConfig.Storage.Checkpointer.Enabled { - go w.consensusCheckpointSyncer() - } return nil } // Stop causes the worker to stop watching and shut down. func (w *Worker) Stop() { - w.statusLock.Lock() - w.status = api.StatusStopping - w.statusLock.Unlock() - - w.fetchPool.Stop() - w.ctxCancel() } @@ -819,7 +798,6 @@ func (w *Worker) worker() { // nolint: gocyclo } w.logger.Info("starting") - w.statusLock.Lock() w.status = api.StatusStarting w.statusLock.Unlock() @@ -827,6 +805,25 @@ func (w *Worker) worker() { // nolint: gocyclo var wg sync.WaitGroup defer wg.Wait() + ctx, cancel := context.WithCancel(w.ctx) + defer cancel() + + wg.Go(func() { + <-ctx.Done() + w.statusLock.Lock() + w.status = api.StatusStopping + w.statusLock.Unlock() + }) + defer w.logger.Info("stopped") + + wg.Go(func() { + err := w.checkpointer.Serve(ctx) + w.logger.Error("checkpointer stopped", "err", err) + }) + if config.GlobalConfig.Storage.Checkpointer.Enabled { + go w.consensusCheckpointSyncer() + } + // Determine genesis block. genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(w.ctx, &roothashApi.RuntimeRequest{ RuntimeID: w.commonNode.Runtime.ID(), @@ -1079,6 +1076,7 @@ func (w *Worker) worker() { // nolint: gocyclo } } close(w.initCh) + w.logger.Info("initialized") // Notify the checkpointer of the genesis round so it can be checkpointed. 
w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) @@ -1092,6 +1090,11 @@ func (w *Worker) worker() { // nolint: gocyclo syncingRounds := make(map[uint64]*inFlight) summaryCache := make(map[uint64]*blockSummary) + + fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String()) + fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) + defer fetchPool.Stop() + triggerRoundFetches := func() { for i := lastFullyAppliedRound + 1; i <= latestBlockRound; i++ { syncing, ok := syncingRounds[i] @@ -1142,7 +1145,7 @@ func (w *Worker) worker() { // nolint: gocyclo if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { syncing.scheduleDiff(rootType) wg.Add(1) - w.fetchPool.Submit(func() { + fetchPool.Submit(func() { defer wg.Done() w.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) }) From 18bebc03c9ef8fdb95a9d0d5f65cc2fdf33ce785 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Sun, 3 Aug 2025 00:57:34 +0200 Subject: [PATCH 09/10] go/worker/storage/statesync: Pass context explicitly In addition, committee storage worker now implements the Service interface. The corresponding BackgroundService methods (already not used) have been removed. Similarly, the storage worker was internally refactored to Service interface to ease eventual removal of the BackgroundService. Additionally, observe that the parent (storage worker) is registered as background service, thus upon error inside committee worker there is no need to manually request the node shutdown. Finally, panicking has been replaced with error. Semantic changed slightly: Previously storage worker would wait for all committee workers to finish. Now it will terminate when the first one finishes. This was already the case if the committee worker panicked. --- .changelog/6306.trivial.md | 0 .../storage/committee/checkpoint_sync.go | 28 +-- go/worker/storage/committee/worker.go | 160 ++++++------------ go/worker/storage/worker.go | 68 +++++--- 4 files changed, 108 insertions(+), 148 deletions(-) create mode 100644 .changelog/6306.trivial.md diff --git a/.changelog/6306.trivial.md b/.changelog/6306.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/worker/storage/committee/checkpoint_sync.go b/go/worker/storage/committee/checkpoint_sync.go index 44b6c2d20d5..9d38936158e 100644 --- a/go/worker/storage/committee/checkpoint_sync.go +++ b/go/worker/storage/committee/checkpoint_sync.go @@ -194,8 +194,8 @@ func (w *Worker) fetchChunk(ctx context.Context, chunk *chunk) ([]byte, rpc.Peer return rsp2.Chunk, pf, nil } -func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { - if err := w.localStorage.Checkpointer().StartRestore(w.ctx, check.Metadata); err != nil { +func (w *Worker) handleCheckpoint(ctx context.Context, check *checkpointsync.Checkpoint, maxParallelRequests uint) (cpStatus int, rerr error) { + if err := w.localStorage.Checkpointer().StartRestore(ctx, check.Metadata); err != nil { // Any previous restores were already aborted by the driver up the call stack, so // things should have been going smoothly here; bail. 
return checkpointStatusBail, fmt.Errorf("can't start checkpoint restore: %w", err) @@ -222,7 +222,7 @@ func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelR chunkReturnCh := make(chan *chunk, maxParallelRequests) errorCh := make(chan int, maxParallelRequests) - ctx, cancel := context.WithCancel(w.ctx) + chunkCtx, cancel := context.WithCancel(ctx) // Spawn the worker group to fetch and restore checkpoint chunks. var workerGroup sync.WaitGroup @@ -231,7 +231,7 @@ func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelR workerGroup.Add(1) go func() { defer workerGroup.Done() - w.checkpointChunkFetcher(ctx, chunkDispatchCh, chunkReturnCh, errorCh) + w.checkpointChunkFetcher(chunkCtx, chunkDispatchCh, chunkReturnCh, errorCh) }() } go func() { @@ -283,8 +283,8 @@ func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelR } select { - case <-w.ctx.Done(): - return checkpointStatusBail, w.ctx.Err() + case <-ctx.Done(): + return checkpointStatusBail, ctx.Err() case returned := <-chunkReturnCh: if returned == nil { @@ -313,8 +313,8 @@ func (w *Worker) handleCheckpoint(check *checkpointsync.Checkpoint, maxParallelR } } -func (w *Worker) getCheckpointList() ([]*checkpointsync.Checkpoint, error) { - ctx, cancel := context.WithTimeout(w.ctx, cpListsTimeout) +func (w *Worker) getCheckpointList(ctx context.Context) ([]*checkpointsync.Checkpoint, error) { + ctx, cancel := context.WithTimeout(ctx, cpListsTimeout) defer cancel() list, err := w.fetchCheckpoints(ctx) @@ -369,7 +369,7 @@ func sortCheckpoints(s []*checkpointsync.Checkpoint) { }) } -func (w *Worker) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { +func (w *Worker) checkCheckpointUsable(ctx context.Context, cp *checkpointsync.Checkpoint, remainingMask outstandingMask, genesisRound uint64) bool { namespace := w.commonNode.Runtime.ID() if !namespace.Equal(&cp.Root.Namespace) { // Not for the right runtime. @@ -380,7 +380,7 @@ func (w *Worker) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingM return false } - blk, err := w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, cp.Root.Version) + blk, err := w.commonNode.Runtime.History().GetCommittedBlock(ctx, cp.Root.Version) if err != nil { w.logger.Error("can't get block information for checkpoint, skipping", "err", err, "root", cp.Root) return false @@ -405,14 +405,14 @@ func (w *Worker) checkCheckpointUsable(cp *checkpointsync.Checkpoint, remainingM return false } -func (w *Worker) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { +func (w *Worker) syncCheckpoints(ctx context.Context, genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) { // Store roots and round info for checkpoints that finished syncing. // Round and namespace info will get overwritten as rounds are skipped // for errors, driven by remainingRoots. var syncState blockSummary // Fetch checkpoints from peers. 
- cps, err := w.getCheckpointList() + cps, err := w.getCheckpointList(ctx) if err != nil { return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err) } @@ -449,7 +449,7 @@ func (w *Worker) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bl for _, check := range cps { - if check.Root.Version < genesisRound || !w.checkCheckpointUsable(check, remainingRoots, genesisRound) { + if check.Root.Version < genesisRound || !w.checkCheckpointUsable(ctx, check, remainingRoots, genesisRound) { continue } @@ -486,7 +486,7 @@ func (w *Worker) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*bl } } - status, err := w.handleCheckpoint(check, w.checkpointSyncCfg.ChunkFetcherCount) + status, err := w.handleCheckpoint(ctx, check, w.checkpointSyncCfg.ChunkFetcherCount) switch status { case checkpointStatusDone: w.logger.Info("successfully restored from checkpoint", "root", check.Root, "mask", mask) diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 8a46855be39..a96dee0d01a 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -166,11 +166,6 @@ type Worker struct { diffCh chan *fetchedDiff finalizeCh chan finalizeResult - ctx context.Context - ctxCancel context.CancelFunc - - quitCh chan struct{} - initCh chan struct{} } @@ -205,7 +200,6 @@ func New( diffCh: make(chan *fetchedDiff), finalizeCh: make(chan finalizeResult), - quitCh: make(chan struct{}), initCh: make(chan struct{}), } @@ -217,8 +211,6 @@ func New( // Initialize sync state. w.syncedState.Round = defaultUndefinedRound - w.ctx, w.ctxCancel = context.WithCancel(context.Background()) - // Create a checkpointer (even if checkpointing is disabled) to ensure the genesis checkpoint is available. checkpointer, err := w.newCheckpointer(commonNode, localStorage) if err != nil { @@ -304,29 +296,6 @@ func (w *Worker) newCheckpointer(commonNode *committee.Node, localStorage storag ), nil } -// Service interface. - -// Start causes the worker to start responding to CometBFT new block events. -func (w *Worker) Start() error { - go w.worker() - return nil -} - -// Stop causes the worker to stop watching and shut down. -func (w *Worker) Stop() { - w.ctxCancel() -} - -// Quit returns a channel that will be closed when the worker stops. -func (w *Worker) Quit() <-chan struct{} { - return w.quitCh -} - -// Cleanup cleans up any leftover state after the worker is stopped. -func (w *Worker) Cleanup() { - // Nothing to do here? -} - // Initialized returns a channel that will be closed once the worker finished starting up. 
func (w *Worker) Initialized() <-chan struct{} { return w.initCh @@ -397,7 +366,7 @@ func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { return w.syncedState.Round, io, state } -func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { +func (w *Worker) fetchDiff(ctx context.Context, round uint64, prevRoot, thisRoot storageApi.Root) { result := &fetchedDiff{ fetched: false, pf: rpc.NewNopPeerFeedback(), @@ -408,7 +377,7 @@ func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { defer func() { select { case w.diffCh <- result: - case <-w.ctx.Done(): + case <-ctx.Done(): } }() @@ -433,7 +402,7 @@ func (w *Worker) fetchDiff(round uint64, prevRoot, thisRoot storageApi.Root) { "new_root", thisRoot, ) - wl, pf, err := w.getDiff(w.ctx, prevRoot, thisRoot) + wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) if err != nil { result.err = err return @@ -462,7 +431,7 @@ func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root return rsp2.WriteLog, pf, nil } -func (w *Worker) finalize(summary *blockSummary) { +func (w *Worker) finalize(ctx context.Context, summary *blockSummary) { err := w.localStorage.NodeDB().Finalize(summary.Roots) switch err { case nil: @@ -490,11 +459,11 @@ func (w *Worker) finalize(summary *blockSummary) { select { case w.finalizeCh <- result: - case <-w.ctx.Done(): + case <-ctx.Done(): } } -func (w *Worker) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) error { +func (w *Worker) initGenesis(ctx context.Context, rt *registryApi.Runtime, genesisBlock *block.Block) error { w.logger.Info("initializing storage at genesis") // Check what the latest finalized version in the database is as we may be using a database @@ -559,7 +528,7 @@ func (w *Worker) initGenesis(rt *registryApi.Runtime, genesisBlock *block.Block) "latest_version", latestVersion, ) for v := latestVersion; v < stateRoot.Version; v++ { - err := w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ + err := w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: rt.ID, RootType: storageApi.RootTypeState, SrcRound: v, @@ -624,7 +593,7 @@ func (w *Worker) flushSyncedState(summary *blockSummary) (uint64, error) { return w.syncedState.Round, nil } -func (w *Worker) consensusCheckpointSyncer() { +func (w *Worker) consensusCheckpointSyncer(ctx context.Context) { // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The // reason why we do this is to make it faster for storage nodes that use consensus state sync // to catch up as exactly the right checkpoint will be available. @@ -636,12 +605,12 @@ func (w *Worker) consensusCheckpointSyncer() { // Wait for the common node to be initialized. select { case <-w.commonNode.Initialized(): - case <-w.ctx.Done(): + case <-ctx.Done(): return } // Determine the maximum number of consensus checkpoints to keep. 
- consensusParams, err := w.commonNode.Consensus.Core().GetParameters(w.ctx, consensus.HeightLatest) + consensusParams, err := w.commonNode.Consensus.Core().GetParameters(ctx, consensus.HeightLatest) if err != nil { w.logger.Error("failed to fetch consensus parameters", "err", err, @@ -672,9 +641,7 @@ func (w *Worker) consensusCheckpointSyncer() { }() for { select { - case <-w.quitCh: - return - case <-w.ctx.Done(): + case <-ctx.Done(): return case version := <-ch: // We need to wait for the next version as that is what will be in the consensus @@ -691,7 +658,7 @@ func (w *Worker) consensusCheckpointSyncer() { ) if blkCh == nil { - blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(w.ctx) + blkCh, blkSub, err = w.commonNode.Consensus.Core().WatchBlocks(ctx) if err != nil { w.logger.Error("failed to watch blocks", "err", err, @@ -720,7 +687,7 @@ func (w *Worker) consensusCheckpointSyncer() { // Lookup what runtime round corresponds to the given consensus layer version and make // sure we checkpoint it. - blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(w.ctx, &roothashApi.RuntimeRequest{ + blk, err := w.commonNode.Consensus.RootHash().GetLatestBlock(ctx, &roothashApi.RuntimeRequest{ RuntimeID: w.commonNode.Runtime.ID(), Height: int64(version), }) @@ -785,16 +752,16 @@ func (w *Worker) nudgeAvailability(lastSynced, latest uint64) { } } -func (w *Worker) worker() { // nolint: gocyclo - defer close(w.quitCh) +// Serve runs the storage worker. +func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo defer close(w.diffCh) // Wait for the common node to be initialized. select { case <-w.commonNode.Initialized(): - case <-w.ctx.Done(): + case <-ctx.Done(): close(w.initCh) - return + return ctx.Err() } w.logger.Info("starting") @@ -805,7 +772,7 @@ func (w *Worker) worker() { // nolint: gocyclo var wg sync.WaitGroup defer wg.Wait() - ctx, cancel := context.WithCancel(w.ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() wg.Go(func() { @@ -821,30 +788,30 @@ func (w *Worker) worker() { // nolint: gocyclo w.logger.Error("checkpointer stopped", "err", err) }) if config.GlobalConfig.Storage.Checkpointer.Enabled { - go w.consensusCheckpointSyncer() + wg.Go(func() { + w.consensusCheckpointSyncer(ctx) + }) } // Determine genesis block. - genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(w.ctx, &roothashApi.RuntimeRequest{ + genesisBlock, err := w.commonNode.Consensus.RootHash().GetGenesisBlock(ctx, &roothashApi.RuntimeRequest{ RuntimeID: w.commonNode.Runtime.ID(), Height: consensus.HeightLatest, }) if err != nil { - w.logger.Error("can't retrieve genesis block", "err", err) - return + return fmt.Errorf("can't retrieve genesis block: %w", err) } w.undefinedRound = genesisBlock.Header.Round - 1 // Determine last finalized storage version. if version, dbNonEmpty := w.localStorage.NodeDB().GetLatestVersion(); dbNonEmpty { var blk *block.Block - blk, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, version) + blk, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, version) switch err { case nil: // Set last synced version to last finalized storage version. if _, err = w.flushSyncedState(summaryFromBlock(blk)); err != nil { - w.logger.Error("failed to flush synced state", "err", err) - return + return fmt.Errorf("failed to flush synced state: %w", err) } default: // Failed to fetch historic block. 
This is fine when the network just went through a @@ -872,18 +839,12 @@ func (w *Worker) worker() { // nolint: gocyclo w.statusLock.Unlock() var rt *registryApi.Runtime - rt, err = w.commonNode.Runtime.ActiveDescriptor(w.ctx) + rt, err = w.commonNode.Runtime.ActiveDescriptor(ctx) if err != nil { - w.logger.Error("failed to retrieve runtime registry descriptor", - "err", err, - ) - return + return fmt.Errorf("failed to retrieve runtime registry descriptor: %w", err) } - if err = w.initGenesis(rt, genesisBlock); err != nil { - w.logger.Error("failed to initialize storage at genesis", - "err", err, - ) - return + if err = w.initGenesis(ctx, rt, genesisBlock); err != nil { + return fmt.Errorf("failed to initialize storage at genesis: %w", err) } } @@ -903,7 +864,7 @@ func (w *Worker) worker() { // nolint: gocyclo // Check if we actually have information about that round. This assumes that any reindexing // was already performed (the common node would not indicate being initialized otherwise). - _, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, iterativeSyncStart) + _, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, iterativeSyncStart) SyncStartCheck: switch { case err == nil: @@ -911,7 +872,7 @@ func (w *Worker) worker() { // nolint: gocyclo // No information is available about the initial round. Query the earliest historic // block and check if that block has the genesis state root and empty I/O root. var earlyBlk *block.Block - earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(w.ctx) + earlyBlk, err = w.commonNode.Runtime.History().GetEarliestBlock(ctx) switch err { case nil: // Make sure the state root is still the same as at genesis time. @@ -930,7 +891,7 @@ func (w *Worker) worker() { // nolint: gocyclo "earliest_round", earlyBlk.Header.Round, ) for v := genesisBlock.Header.Round; v < earlyBlk.Header.Round; v++ { - err = w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: w.commonNode.Runtime.ID(), RootType: storageApi.RootTypeState, SrcRound: v, @@ -945,11 +906,7 @@ func (w *Worker) worker() { // nolint: gocyclo // Ignore already finalized versions. continue default: - w.logger.Error("failed to fill in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to fill in version %d: %w", v, err) } err = w.localStorage.NodeDB().Finalize([]storageApi.Root{{ @@ -960,19 +917,12 @@ func (w *Worker) worker() { // nolint: gocyclo // We can ignore I/O roots. }}) if err != nil { - w.logger.Error("failed to finalize filled in version", - "version", v, - "err", err, - ) - return + return fmt.Errorf("failed to finalize filled in version %v: %w", v, err) } } cachedLastRound, err = w.flushSyncedState(summaryFromBlock(earlyBlk)) if err != nil { - w.logger.Error("failed to flush synced state", - "err", err, - ) - return + return fmt.Errorf("failed to flush synced state: %w", err) } // No need to force a checkpoint sync. break SyncStartCheck @@ -990,10 +940,7 @@ func (w *Worker) worker() { // nolint: gocyclo w.checkpointSyncForced = true default: // Unknown error while fetching block information, abort. 
- w.logger.Error("failed to query block", - "err", err, - ) - return + return fmt.Errorf("failed to query block: %w", err) } } @@ -1027,7 +974,7 @@ func (w *Worker) worker() { // nolint: gocyclo ) CheckpointSyncRetry: for { - summary, err = w.syncCheckpoints(genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled) + summary, err = w.syncCheckpoints(ctx, genesisBlock.Header.Round, w.checkpointSyncCfg.Disabled) if err == nil { break } @@ -1055,8 +1002,8 @@ func (w *Worker) worker() { // nolint: gocyclo // Delay before retrying. select { case <-time.After(checkpointSyncRetryDelay): - case <-w.ctx.Done(): - return + case <-ctx.Done(): + return ctx.Err() } } if err != nil { @@ -1064,10 +1011,7 @@ func (w *Worker) worker() { // nolint: gocyclo } else { cachedLastRound, err = w.flushSyncedState(summary) if err != nil { - w.logger.Error("failed to flush synced state", - "err", err, - ) - return + return fmt.Errorf("failed to flush synced state %w", err) } lastFullyAppliedRound = cachedLastRound w.logger.Info("checkpoint sync succeeded", @@ -1147,7 +1091,7 @@ func (w *Worker) worker() { // nolint: gocyclo wg.Add(1) fetchPool.Submit(func() { defer wg.Done() - w.fetchDiff(this.Round, prevRoots[i], this.Roots[i]) + w.fetchDiff(ctx, this.Round, prevRoots[i], this.Roots[i]) }) } } @@ -1166,7 +1110,6 @@ func (w *Worker) worker() { // nolint: gocyclo // including all missing rounds since the last fully applied one. Fetched diffs are then applied // in round order, ensuring no gaps. Once a round has all its roots applied, background finalization // for that round is triggered asynchronously, not blocking concurrent fetching and diff application. -mainLoop: for { // Drain the Apply and Finalize queues first, before waiting for new events in the select below. @@ -1180,7 +1123,7 @@ mainLoop: // Apply the write log if one exists. err = nil if lastDiff.fetched { - err = w.localStorage.Apply(w.ctx, &storageApi.ApplyRequest{ + err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ Namespace: lastDiff.thisRoot.Namespace, RootType: lastDiff.thisRoot.Type, SrcRound: lastDiff.prevRoot.Version, @@ -1239,7 +1182,7 @@ mainLoop: wg.Add(1) go func() { // Don't block fetching and applying remaining rounds. defer wg.Done() - w.finalize(lastSummary) + w.finalize(ctx, lastSummary) }() continue } @@ -1289,14 +1232,9 @@ mainLoop: continue } var oldBlock *block.Block - oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(w.ctx, i) + oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, i) if err != nil { - w.logger.Error("can't get block for round", - "err", err, - "round", i, - "current_round", blk.Header.Round, - ) - panic("can't get block in storage worker") + return fmt.Errorf("failed to get block for round %d (current round: %d): %w", i, blk.Header.Round, err) } summaryCache[i] = summaryFromBlock(oldBlock) } @@ -1337,9 +1275,7 @@ mainLoop: // There's no point redoing it, since it's probably not a transient // error, and cachedLastRound also can't be updated legitimately. if finalized.err != nil { - // Request a node shutdown given that syncing is effectively blocked. 
- _ = w.commonNode.HostNode.RequestShutdown(w.ctx, false) - break mainLoop + return fmt.Errorf("failed to finalize (round: %d): %w", finalized.summary.Round, finalized.err) } // No further sync or out of order handling needed here, since @@ -1360,8 +1296,8 @@ mainLoop: w.checkpointer.NotifyNewVersion(finalized.summary.Round) } - case <-w.ctx.Done(): - break mainLoop + case <-ctx.Done(): + return ctx.Err() } } diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index 95bd92e766b..9797db4ac45 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,8 +1,11 @@ package storage import ( + "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" @@ -27,6 +30,9 @@ type Worker struct { quitCh chan struct{} runtimes map[common.Namespace]*committee.Worker + + ctx context.Context + cancel context.CancelFunc } // New constructs a new storage worker. @@ -35,6 +41,7 @@ func New( commonWorker *workerCommon.Worker, registration *registration.Worker, ) (*Worker, error) { + ctx, cancel := context.WithCancel(context.Background()) enabled := config.GlobalConfig.Mode.HasLocalStorage() && len(commonWorker.GetRuntimes()) > 0 s := &Worker{ @@ -45,6 +52,8 @@ func New( initCh: make(chan struct{}), quitCh: make(chan struct{}), runtimes: make(map[common.Namespace]*committee.Worker), + ctx: ctx, + cancel: cancel, } if !enabled { @@ -133,6 +142,21 @@ func (w *Worker) Initialized() <-chan struct{} { // Start starts the storage service. func (w *Worker) Start() error { + go func() { + if err := w.Serve(w.ctx); err != nil { + w.logger.Error("worker stopped", "error", err) + } + }() + return nil +} + +// Serve starts a state sync worker for each of the configured runtime, unless +// disabled. +// +// If any state sync worker returns an error, it cancels the remaining ones and +// waits for all of them to finish. The error from the first failing worker is +// returned. +func (w *Worker) Serve(ctx context.Context) error { if !w.enabled { w.logger.Info("not starting storage worker as it is disabled") @@ -142,34 +166,34 @@ func (w *Worker) Start() error { return nil } - // Wait for all runtimes to terminate. - go func() { - defer close(w.quitCh) - - for _, r := range w.runtimes { - <-r.Quit() - } + w.logger.Info("starting", "num_runtimes", len(w.runtimes)) + defer func() { + close(w.quitCh) + w.logger.Info("stopped") }() - // Start all runtimes and wait for initialization. go func() { - w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes)) - - for _, r := range w.runtimes { - _ = r.Start() - } - - // Wait for runtimes to be initialized. for _, r := range w.runtimes { <-r.Initialized() } - - w.logger.Info("storage worker started") - + w.logger.Info("initialized") close(w.initCh) }() - return nil + return w.serve(ctx) +} + +func (w *Worker) serve(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + for id, r := range w.runtimes { + g.Go(func() error { + if err := r.Serve(ctx); err != nil { + return fmt.Errorf("storage worker failed (runtimeID: %s): %w", id, err) + } + return nil + }) + } + return g.Wait() } // Stop halts the service. @@ -179,9 +203,9 @@ func (w *Worker) Stop() { return } - for _, r := range w.runtimes { - r.Stop() - } + w.logger.Info("stopping") + w.cancel() + <-w.quitCh } // Quit returns a channel that will be closed when the service terminates. 
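
For illustration only (not part of the change), a minimal, self-contained
sketch of the errgroup semantics the new Serve method relies on; the
serveRuntime function and the runtime IDs are made-up stand-ins for the
per-runtime state sync workers:

    package main

    import (
        "context"
        "errors"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // serveRuntime stands in for a per-runtime state sync worker's Serve.
    func serveRuntime(ctx context.Context, id string, fail bool) error {
        if fail {
            return errors.New(id + ": sync failed")
        }
        // A healthy worker runs until its context is canceled.
        <-ctx.Done()
        return ctx.Err()
    }

    func main() {
        g, ctx := errgroup.WithContext(context.Background())
        g.Go(func() error { return serveRuntime(ctx, "rt-a", false) })
        g.Go(func() error { return serveRuntime(ctx, "rt-b", true) })
        // rt-b's error cancels ctx, which also stops rt-a; Wait still waits
        // for both goroutines and returns the first error (rt-b's).
        fmt.Println(g.Wait())
    }
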
From 91ed0dac960b8860c02a423ce87e6f5a7b9423a8 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Fri, 29 Aug 2025 21:50:13 +0200 Subject: [PATCH 10/10] go/worker/storage/committee: Define lastFullyAppliedRound later --- go/worker/storage/committee/worker.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index a96dee0d01a..8548b526c51 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -949,8 +949,6 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo "last_synced", cachedLastRound, ) - lastFullyAppliedRound := cachedLastRound - // Try to perform initial sync from state and io checkpoints if either: // // - Checkpoint sync has been forced because there is insufficient information available to use @@ -1013,7 +1011,6 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo if err != nil { return fmt.Errorf("failed to flush synced state %w", err) } - lastFullyAppliedRound = cachedLastRound w.logger.Info("checkpoint sync succeeded", logging.LogEvent, LogEventCheckpointSyncSuccess, ) @@ -1028,6 +1025,7 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo // Don't register availability immediately, we want to know first how far behind consensus we are. latestBlockRound := w.undefinedRound + lastFullyAppliedRound := cachedLastRound heartbeat := heartbeat{} heartbeat.reset()