Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/6446.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/storage: Fix runtime state pruner

Disabling pruning shouldn't prune any data including stale one
(data marked as ready to be pruned) from the previous run when
node had pruning enabled.
5 changes: 5 additions & 0 deletions go/runtime/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ type PruneConfig struct {
NumKept uint64 `yaml:"num_kept"`
}

// PruningEnabled returns true when pruning is enabled.
func (p PruneConfig) PruningEnabled() bool {
return p.Strategy != "none"
}

// IndexerConfig is history indexer configuration.
type IndexerConfig struct {
// BatchSize is max number of blocks committed in a batch during history reindex.
Expand Down
28 changes: 19 additions & 9 deletions go/worker/storage/committee/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,19 @@ type Worker struct {
diffCh chan *fetchedDiff
finalizeCh chan finalizeResult

pruneInterval time.Duration
pruneCfg PruneConfig

initCh chan struct{}
}

// PruneConfig configures pruning of the state DB.
type PruneConfig struct {
// Enabled is true when pruning is enabled.
Enabled bool
// Interval specifies frequency at which pruning will be triggered.
Interval time.Duration
}

// New creates a new storage worker.
func New(
commonNode *committee.Node,
Expand All @@ -176,7 +184,7 @@ func New(
workerCommonCfg workerCommon.Config,
localStorage storageApi.LocalBackend,
checkpointSyncCfg *CheckpointSyncConfig,
pruneInterval time.Duration,
pruneCfg PruneConfig,
) (*Worker, error) {
initMetrics()

Expand All @@ -199,7 +207,7 @@ func New(
diffCh: make(chan *fetchedDiff),
finalizeCh: make(chan finalizeResult),

pruneInterval: pruneInterval,
pruneCfg: pruneCfg,

initCh: make(chan struct{}),
}
Expand Down Expand Up @@ -1012,12 +1020,14 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo
close(w.initCh)
w.logger.Info("initialized")

statePruner := newPruner(w.localStorage.NodeDB(), w.commonNode.Runtime.History(), w.pruneInterval)
wg.Go(func() {
if err := statePruner.serve(ctx); err != nil && !errors.Is(err, context.Canceled) {
w.logger.Error("state pruner failed: %w", err)
}
})
if w.pruneCfg.Enabled {
statePruner := newPruner(w.localStorage.NodeDB(), w.commonNode.Runtime.History(), w.pruneCfg.Interval)
wg.Go(func() {
if err := statePruner.serve(ctx); err != nil && !errors.Is(err, context.Canceled) {
w.logger.Error("state pruner failed: %w", err)
}
})
}

// Notify the checkpointer of the genesis round so it can be checkpointed.
w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round)
Expand Down
5 changes: 4 additions & 1 deletion go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {
Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled,
ChunkFetcherCount: config.GlobalConfig.Storage.FetcherCount,
},
config.GlobalConfig.Runtime.Prune.Interval,
committee.PruneConfig{
Enabled: config.GlobalConfig.Runtime.Prune.PruningEnabled(),
Interval: config.GlobalConfig.Runtime.Prune.Interval,
},
)
if err != nil {
return err
Expand Down
Loading