.changelog/6306.trivial.md (empty file added)
go/consensus/cometbft/abci/state.go (9 changes: 5 additions & 4 deletions)
@@ -782,10 +782,11 @@ func newApplicationState(ctx context.Context, upgrader upgrade.Backend, cfg *App
}, nil
},
}
s.checkpointer, err = checkpoint.NewCheckpointer(s.ctx, ndb, ldb.Checkpointer(), checkpointerCfg)
if err != nil {
return nil, fmt.Errorf("state: failed to create checkpointer: %w", err)
}
s.checkpointer = checkpoint.NewCheckpointer(ndb, ldb.Checkpointer(), checkpointerCfg)
go func() {
err := s.checkpointer.Serve(ctx)
s.logger.Error("checkpointer stopped", "err", err)
}()
}

go s.metricsWorker()
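The hunk above is the caller-side half of the refactoring in go/storage/mkvs/checkpoint/checkpointer.go below: NewCheckpointer no longer takes a context or returns an error, and it no longer spawns its own worker goroutine; the caller now runs the returned checkpointer explicitly via Serve. A minimal sketch of the new wiring follows; the package name, helper name, and import paths (notably the db alias) are assumptions based only on the identifiers visible in this diff.

package consumer // hypothetical example package

import (
	"context"

	"github.com/oasisprotocol/oasis-core/go/logging"
	"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
	db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" // assumed import path
)

// startCheckpointer shows the post-refactoring pattern: construct the
// checkpointer (construction can no longer fail), then own and run its
// Serve loop in a goroutine controlled by the caller's context.
func startCheckpointer(ctx context.Context, ndb db.NodeDB, creator checkpoint.Creator, cfg checkpoint.CheckpointerConfig, logger *logging.Logger) checkpoint.Checkpointer {
	cp := checkpoint.NewCheckpointer(ndb, creator, cfg)
	go func() {
		// Serve blocks until ctx is cancelled or the loop stops for another reason.
		err := cp.Serve(ctx)
		logger.Error("checkpointer stopped", "err", err)
	}()
	return cp
}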
go/oasis-node/cmd/node/node_control.go (4 changes: 2 additions & 2 deletions)
@@ -312,8 +312,8 @@ func (n *Node) getRuntimeStatus(ctx context.Context) (map[common.Namespace]contr
}

// Fetch storage worker status.
if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil {
status.Storage, err = storageNode.GetStatus(ctx)
if storageWorker := n.StorageWorker.GetRuntime(rt.ID()); storageWorker != nil {
status.Storage, err = storageWorker.GetStatus(ctx)
if err != nil {
logger.Error("failed to fetch storage worker status", "err", err)
}
go/oasis-test-runner/oasis/log.go (4 changes: 2 additions & 2 deletions)
@@ -8,7 +8,7 @@ import (
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
storageWorker "github.com/oasisprotocol/oasis-core/go/worker/storage/committee"
)

// LogAssertEvent returns a handler which checks whether a specific log event was
@@ -116,7 +116,7 @@ func LogAssertRoothashRoothashReindexing() log.WatcherHandlerFactory {
// LogAssertCheckpointSync returns a handler which checks whether initial storage sync from
// a checkpoint was successful or not.
func LogAssertCheckpointSync() log.WatcherHandlerFactory {
return LogAssertEvent(workerStorage.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
return LogAssertEvent(storageWorker.LogEventCheckpointSyncSuccess, "checkpoint sync did not succeed")
}

// LogAssertDiscrepancyMajorityFailure returns a handler which checks whether a discrepancy resolution
go/storage/mkvs/checkpoint/checkpointer.go (246 changes: 121 additions & 125 deletions)
@@ -72,7 +72,7 @@ type CreationParameters struct {
ChunkerThreads uint16
}

// Checkpointer is a checkpointer.
// Checkpointer is responsible for creating the storage snapshots (checkpoints).
type Checkpointer interface {
// NotifyNewVersion notifies the checkpointer that a new version has been finalized.
NotifyNewVersion(version uint64)
@@ -95,6 +95,9 @@ type Checkpointer interface {
// intervals; after unpausing, a checkpoint won't be created immediately, but the checkpointer
// will wait for the next regular event.
Pause(pause bool)

// Serve starts running the checkpointer.
Serve(ctx context.Context) error
}

type checkpointer struct {
@@ -112,6 +115,23 @@ type checkpointer struct {
logger *logging.Logger
}

// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and
// will automatically generate the configured number of checkpoints.
func NewCheckpointer(ndb db.NodeDB, creator Creator, cfg CheckpointerConfig) Checkpointer {
return &checkpointer{
cfg: cfg,
ndb: ndb,
creator: creator,
notifyCh: channels.NewRingChannel(1),
forceCh: channels.NewRingChannel(1),
flushCh: channels.NewRingChannel(1),
statusCh: make(chan struct{}),
pausedCh: make(chan bool),
cpNotifier: pubsub.NewBroker(false),
logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace),
}
}

// Implements Checkpointer.
func (c *checkpointer) NotifyNewVersion(version uint64) {
c.notifyCh.In() <- version
@@ -140,6 +160,106 @@ func (c *checkpointer) Pause(pause bool) {
c.pausedCh <- pause
}

func (c *checkpointer) Serve(ctx context.Context) error {
c.logger.Debug("storage checkpointer started",
"check_interval", c.cfg.CheckInterval,
)
defer func() {
c.logger.Debug("storage checkpointer terminating")
}()

paused := false

for {
var interval time.Duration
switch c.cfg.CheckInterval {
case CheckIntervalDisabled:
interval = CheckIntervalDisabled
default:
interval = random.GetRandomValueFromInterval(
checkpointIntervalRandomizationFactor,
rand.Float64(),
c.cfg.CheckInterval,
)
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(interval):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
}

var (
version uint64
force bool
)
select {
case <-ctx.Done():
return ctx.Err()
case v := <-c.notifyCh.Out():
version = v.(uint64)
case v := <-c.forceCh.Out():
version = v.(uint64)
force = true
}

// Fetch current checkpoint parameters.
params := c.cfg.Parameters
if params == nil && c.cfg.GetParameters != nil {
var err error
params, err = c.cfg.GetParameters(ctx)
if err != nil {
c.logger.Error("failed to get checkpoint parameters",
"err", err,
"version", version,
)
continue
}
}
if params == nil {
c.logger.Error("no checkpoint parameters")
continue
}

// Don't checkpoint if checkpoints are disabled.
switch {
case force:
// Always checkpoint when forced.
case paused:
continue
case params.Interval == 0:
continue
case c.cfg.CheckInterval == CheckIntervalDisabled:
continue
default:
}

var err error
switch force {
case false:
err = c.maybeCheckpoint(ctx, version, params)
case true:
err = c.checkpoint(ctx, version, params)
}
if err != nil {
c.logger.Error("failed to checkpoint",
"version", version,
"err", err,
)
continue
}

// Emit status update if someone is listening. This is only used in tests.
select {
case c.statusCh <- struct{}{}:
default:
}
}
}

func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) {
// Notify watchers about the checkpoint we are about to make.
c.cpNotifier.Broadcast(version)
@@ -284,127 +404,3 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para

return nil
}

func (c *checkpointer) worker(ctx context.Context) {
c.logger.Debug("storage checkpointer started",
"check_interval", c.cfg.CheckInterval,
)
defer func() {
c.logger.Debug("storage checkpointer terminating")
}()

paused := false

for {
var interval time.Duration
switch c.cfg.CheckInterval {
case CheckIntervalDisabled:
interval = CheckIntervalDisabled
default:
interval = random.GetRandomValueFromInterval(
checkpointIntervalRandomizationFactor,
rand.Float64(),
c.cfg.CheckInterval,
)
}

select {
case <-ctx.Done():
return
case <-time.After(interval):
case <-c.flushCh.Out():
case paused = <-c.pausedCh:
continue
}

var (
version uint64
force bool
)
select {
case <-ctx.Done():
return
case v := <-c.notifyCh.Out():
version = v.(uint64)
case v := <-c.forceCh.Out():
version = v.(uint64)
force = true
}

// Fetch current checkpoint parameters.
params := c.cfg.Parameters
if params == nil && c.cfg.GetParameters != nil {
var err error
params, err = c.cfg.GetParameters(ctx)
if err != nil {
c.logger.Error("failed to get checkpoint parameters",
"err", err,
"version", version,
)
continue
}
}
if params == nil {
c.logger.Error("no checkpoint parameters")
continue
}

// Don't checkpoint if checkpoints are disabled.
switch {
case force:
// Always checkpoint when forced.
case paused:
continue
case params.Interval == 0:
continue
case c.cfg.CheckInterval == CheckIntervalDisabled:
continue
default:
}

var err error
switch force {
case false:
err = c.maybeCheckpoint(ctx, version, params)
case true:
err = c.checkpoint(ctx, version, params)
}
if err != nil {
c.logger.Error("failed to checkpoint",
"version", version,
"err", err,
)
continue
}

// Emit status update if someone is listening. This is only used in tests.
select {
case c.statusCh <- struct{}{}:
default:
}
}
}

// NewCheckpointer creates a new checkpointer that can be notified of new finalized versions and
// will automatically generate the configured number of checkpoints.
func NewCheckpointer(
ctx context.Context,
ndb db.NodeDB,
creator Creator,
cfg CheckpointerConfig,
) (Checkpointer, error) {
c := &checkpointer{
cfg: cfg,
ndb: ndb,
creator: creator,
notifyCh: channels.NewRingChannel(1),
forceCh: channels.NewRingChannel(1),
flushCh: channels.NewRingChannel(1),
statusCh: make(chan struct{}),
pausedCh: make(chan bool),
cpNotifier: pubsub.NewBroker(false),
logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace),
}
go c.worker(ctx)
return c, nil
}
go/storage/mkvs/checkpoint/checkpointer_test.go (19 changes: 15 additions & 4 deletions)
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

@@ -25,7 +26,12 @@ const (

func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, interval uint64, preExistingData bool) {
require := require.New(t)
ctx := context.Background()

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// Initialize a database.
dir, err := os.MkdirTemp("", "mkvs.checkpointer")
@@ -69,8 +75,8 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte
fc, err := NewFileCreator(filepath.Join(dir, "checkpoints"), ndb)
require.NoError(err, "NewFileCreator")

// Create a checkpointer.
cp, err := NewCheckpointer(ctx, ndb, fc, CheckpointerConfig{
// Create and run a checkpointer.
cp := NewCheckpointer(ndb, fc, CheckpointerConfig{
Name: "test",
Namespace: testNs,
CheckInterval: testCheckInterval,
@@ -89,7 +95,12 @@ func testCheckpointer(t *testing.T, factory dbApi.Factory, earliestVersion, inte
return ndb.GetRootsForVersion(version)
},
})
require.NoError(err, "NewCheckpointer")
wg.Go(func() {
err := cp.Serve(ctx)
if err != context.Canceled {
require.NoError(err)
}
})

// Start watching checkpoints.
cpCh, sub, err := cp.WatchCheckpoints()
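The test changes above capture the intended lifecycle: Serve runs in a goroutine tracked by a sync.WaitGroup, the checkpointer is driven through its notification API, and shutdown happens by cancelling the context, with context.Canceled treated as the expected exit error. A condensed sketch of that flow follows; the package name and helper are hypothetical, and it assumes the checkpointer is configured (as in the test above) so that the notified versions actually produce a checkpoint.

package checkpoint_test // hypothetical example package

import (
	"context"
	"errors"
	"sync"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
)

// runAndShutdown mirrors the test's lifecycle handling: run Serve under a
// WaitGroup, drive the checkpointer, then tear down via context cancellation.
func runAndShutdown(t *testing.T, cp checkpoint.Checkpointer) {
	require := require.New(t)

	var wg sync.WaitGroup
	defer wg.Wait()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs before wg.Wait, so Serve is unblocked first

	wg.Add(1)
	go func() {
		defer wg.Done()
		// context.Canceled is the expected exit error once cancel() is called.
		if err := cp.Serve(ctx); !errors.Is(err, context.Canceled) {
			require.NoError(err)
		}
	}()

	// Subscribe before notifying so no checkpoint announcements are missed.
	cpCh, _, err := cp.WatchCheckpoints()
	require.NoError(err, "WatchCheckpoints")

	// Feed finalized versions; which of them become checkpoints depends on the
	// configured Interval and CheckInterval.
	for version := uint64(1); version <= 5; version++ {
		cp.NotifyNewVersion(version)
	}

	// Wait for at least one checkpoint announcement, then shut down.
	<-cpCh
}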