Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6476aaa
go/worker/storage: Rename committee package to statesync
martintomazic Aug 2, 2025
2a22222
go/worker/storage/statesync: Move pruning to separate file
martintomazic Aug 2, 2025
ce699b9
go/worker/storage/statesync: Create genesis checkpoint later
martintomazic Aug 24, 2025
06762da
go/storage/mkvs: Add a Serve method to the checkpointer
martintomazic Aug 24, 2025
97728c6
go/storage/mkvs/checkpoint: Move methods around for readability
martintomazic Aug 27, 2025
81a4381
go/worker/storage/statesync: Add explicit timeout for fetching diff
martintomazic Aug 25, 2025
874bca6
go/worker/storage/statesync: Reduce the scope of workers
martintomazic Aug 27, 2025
2cec7ce
go/worker/storage/statesync: Fix deadlock on the cleanup
martintomazic Aug 27, 2025
e699f36
go/worker/storage/statesync: Pass context explicitly
martintomazic Aug 2, 2025
f0172a3
go/worker/storage/statesync: Do not panic
martintomazic Aug 3, 2025
f5666e8
go/worker/storage/statesync: Improve var grouping for syncing
martintomazic Aug 27, 2025
17740ee
go/worker/storage/statesync: Improve variable names
martintomazic Aug 27, 2025
ce420f1
go/worker/storage/statesync: Add timeout to checkpoint restoration
martintomazic Aug 25, 2025
aad4b24
go/worker/storage: Remove unused method and config
martintomazic Aug 27, 2025
ce7ceb2
go/worker/storage: Create new runtime storage worker
martintomazic Aug 27, 2025
3762e46
worker/storage/statesync: Add WatchFinalizedRound method
martintomazic Aug 27, 2025
9d17682
go/worker/storage: Move checkpointer to a separate worker
martintomazic Aug 27, 2025
8bc3c32
go/worker/storage: Move availability nudger to separate worker
martintomazic Aug 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .changelog/6036.trivial.md
Empty file.
Empty file added .changelog/6308.trivial.md
Empty file.
9 changes: 5 additions & 4 deletions go/consensus/cometbft/abci/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,11 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
}, nil
},
}
s.checkpointer, err = checkpoint.NewCheckpointer(s.ctx, ndb, ldb.Checkpointer(), checkpointerCfg)
if err != nil {
return nil, fmt.Errorf("state: failed to create checkpointer: %w", err)
}
s.checkpointer = checkpoint.NewCheckpointer(ndb, ldb.Checkpointer(), checkpointerCfg)
go func() {
err := s.checkpointer.Serve(ctx)
s.logger.Error("checkpointer failed", "err", err)
}()
}

go s.metricsWorker()
Expand Down
6 changes: 3 additions & 3 deletions go/oasis-node/cmd/node/node_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr
}

// Fetch storage worker status.
if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil {
status.Storage, err = storageNode.GetStatus(ctx)
if stateSync := n.StorageWorker.GetRuntime(rt.ID()); stateSync != nil {
status.Storage, err = stateSync.GetStatus(ctx)
if err != nil {
logger.Error("failed to fetch storage worker status", "err", err)
logger.Error("failed to fetch state sync worker status", "err", err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions go/oasis-test-runner/oasis/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
"github.com/oasisprotocol/oasis-core/go/worker/storage/statesync"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
Expand Down Expand Up @@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
// LogAssertCheckpointSync returns a handler which checks whether initial storage sync from
// a checkpoint was successful or not.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
return LogAssertEvent(statesync.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
}

// LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution
Expand Down
279 changes: 146 additions & 133 deletions go/storage/mkvs/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type CreationParameters struct {
ChunkerThreads uint16
}

// Checkpointer is a checkpointer.
// Checkpointer is responsible for creating the storage snapshots (checkpoints).
type Checkpointer interface {
// NotifyNewVersion notifies the checkpointer that a new version has been finalized.
NotifyNewVersion(version uint64)
Expand All @@ -88,30 +88,57 @@ type Checkpointer interface {
// versions are emitted before the checkpointing process starts.
WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error)

// WatchCreatedCheckpoints returns a channel that produces a stream of checkpointed versions. The
// versions are emitted immediately after the checkpoint is created.
WatchCreatedCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error)

// Flush makes the checkpointer immediately process any notifications.
Flush()

// Pause pauses or unpauses the checkpointer. Pausing doesn't influence the checkpointing
// intervals; after unpausing, a checkpoint won't be created immediately, but the checkpointer
// will wait for the next regular event.
Pause(pause bool)

// Serve starts running the checkpointer.
Serve(ctx context.Context) error
}

type checkpointer struct {
cfg CheckpointerConfig

ndb db.NodeDB
creator Creator
notifyCh *channels.RingChannel
forceCh *channels.RingChannel
flushCh *channels.RingChannel
statusCh chan struct{}
pausedCh chan bool
cpNotifier *pubsub.Broker
ndb db.NodeDB
creator Creator
notifyCh *channels.RingChannel
forceCh *channels.RingChannel
flushCh *channels.RingChannel
statusCh chan struct{}
pausedCh chan bool
cpNotifier *pubsub.Broker
cpCreatedNotifier *pubsub.Broker

logger *logging.Logger
}

// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and
// will automatically generate the configured number of checkpoints.
func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer {
c := &checkpointer{
cfg: cfg,
ndb: ndb,
creator: creator,
notifyCh: channels.NewRingChannel(1),
forceCh: channels.NewRingChannel(1),
flushCh: channels.NewRingChannel(1),
statusCh: make(chan struct{}),
pausedCh: make(chan bool),
cpNotifier: pubsub.NewBroker(false),
cpCreatedNotifier: pubsub.NewBroker(false),
logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace),
}
return c
}

// Implements Checkpointer.
func (c *checkpointer) NotifyNewVersion(version uint64) {
c.notifyCh.In() <- version
Expand All @@ -131,6 +158,15 @@ func (c *checkpointer) WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscri
return ch, sub, nil
}

// Implements Checkpointer.
func (c *checkpointer) WatchCreatedCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) {
ch := make(chan uint64)
sub := c.cpCreatedNotifier.Subscribe()
sub.Unwrap(ch)

return ch, sub, nil
}

// Implements Checkpointer.
func (c *checkpointer) Flush() {
c.flushCh.In() <- struct{}{}
Expand All @@ -140,6 +176,106 @@ func (c *checkpointer) Pause(pause bool) {
c.pausedCh <- pause
}

func (c *checkpointer) Serve(ctx context.Context) error {
c.logger.Debug("storage checkpointer started",
"check_interval", c.cfg.CheckInterval,
)
defer func() {
c.logger.Debug("storage checkpointer terminating")
}()

paused := false

for {
var interval time.Duration
switch c.cfg.CheckInterval {
case CheckIntervalDisabled:
interval = CheckIntervalDisabled
default:
interval = random.GetRandomValueFromInterval(
checkpointIntervalRandomizationFactor,
rand.Float64(),
c.cfg.CheckInterval,
)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(interval):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
}

var (
version uint64
force bool
)
select {
case <-ctx.Done():
return ctx.Err()
case v := <-c.notifyCh.Out():
version = v.(uint64)
case v := <-c.forceCh.Out():
version = v.(uint64)
force = true
}

// Fetch current checkpoint parameters.
params := c.cfg.Parameters
if params == nil && c.cfg.GetParameters != nil {
var err error
params, err = c.cfg.GetParameters(ctx)
if err != nil {
c.logger.Error("failed to get checkpoint parameters",
"err", err,
"version", version,
)
continue
}
}
if params == nil {
c.logger.Error("no checkpoint parameters")
continue
}

// Don't checkpoint if checkpoints are disabled.
switch {
case force:
// Always checkpoint when forced.
case paused:
continue
case params.Interval == 0:
continue
case c.cfg.CheckInterval == CheckIntervalDisabled:
continue
default:
}

var err error
switch force {
case false:
err = c.maybeCheckpoint(ctx, version, params)
case true:
err = c.checkpoint(ctx, version, params)
}
if err != nil {
c.logger.Error("failed to checkpoint",
"version", version,
"err", err,
)
continue
}

// Emit status update if someone is listening. This is only used in tests.
select {
case c.statusCh <- struct{}{}:
default:
}
}
}

func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) {
// Notify watchers about the checkpoint we are about to make.
c.cpNotifier.Broadcast(version)
Expand Down Expand Up @@ -191,6 +327,7 @@ func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *C
return fmt.Errorf("checkpointer: failed to create checkpoint: %w", err)
}
}
c.cpCreatedNotifier.Broadcast(version)
return nil
}

Expand Down Expand Up @@ -284,127 +421,3 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para

return nil
}

func (c *checkpointer) worker(ctx context.Context) {
c.logger.Debug("storage checkpointer started",
"check_interval", c.cfg.CheckInterval,
)
defer func() {
c.logger.Debug("storage checkpointer terminating")
}()

paused := false

for {
var interval time.Duration
switch c.cfg.CheckInterval {
case CheckIntervalDisabled:
interval = CheckIntervalDisabled
default:
interval = random.GetRandomValueFromInterval(
checkpointIntervalRandomizationFactor,
rand.Float64(),
c.cfg.CheckInterval,
)
}

select {
case <-ctx.Done():
return
case <-time.After(interval):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
}

var (
version uint64
force bool
)
select {
case <-ctx.Done():
return
case v := <-c.notifyCh.Out():
version = v.(uint64)
case v := <-c.forceCh.Out():
version = v.(uint64)
force = true
}

// Fetch current checkpoint parameters.
params := c.cfg.Parameters
if params == nil && c.cfg.GetParameters != nil {
var err error
params, err = c.cfg.GetParameters(ctx)
if err != nil {
c.logger.Error("failed to get checkpoint parameters",
"err", err,
"version", version,
)
continue
}
}
if params == nil {
c.logger.Error("no checkpoint parameters")
continue
}

// Don't checkpoint if checkpoints are disabled.
switch {
case force:
// Always checkpoint when forced.
case paused:
continue
case params.Interval == 0:
continue
case c.cfg.CheckInterval == CheckIntervalDisabled:
continue
default:
}

var err error
switch force {
case false:
err = c.maybeCheckpoint(ctx, version, params)
case true:
err = c.checkpoint(ctx, version, params)
}
if err != nil {
c.logger.Error("failed to checkpoint",
"version", version,
"err", err,
)
continue
}

// Emit status update if someone is listening. This is only used in tests.
select {
case c.statusCh <- struct{}{}:
default:
}
}
}

// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and
// will automatically generate the configured number of checkpoints.
func NewCheckpointer(
ctx context.Context,
ndb db.NodeDB,
creator Creator,
cfg CheckpointerConfig,
) (Checkpointer, error) {
c := &checkpointer{
cfg: cfg,
ndb: ndb,
creator: creator,
notifyCh: channels.NewRingChannel(1),
forceCh: channels.NewRingChannel(1),
flushCh: channels.NewRingChannel(1),
statusCh: make(chan struct{}),
pausedCh: make(chan bool),
cpNotifier: pubsub.NewBroker(false),
logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace),
}
go c.worker(ctx)
return c, nil
}
Loading
Loading