Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions op-acceptance-tests/tests/base/conductor/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package conductor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-devstack/devtest"
"github.com/ethereum-optimism/optimism/op-devstack/dsl"
"github.com/ethereum-optimism/optimism/op-devstack/presets"
"github.com/ethereum-optimism/optimism/op-devstack/sysgo"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

func TestConductorMarksStoppedSequencerUnhealthy(gt *testing.T) {
t := devtest.SerialT(gt)
sysgo.SkipOnKonaNode(t, "not supported")

sys := presets.NewMinimalWithConductors(t,
presets.WithConductorHealthCheck(1, 3, 3600),
)
conductor := conductorForChain(t, sys.ConductorSets, sys.L2Chain.Escape().ChainID())
ctx := t.Ctx()

require.Eventually(t, func() bool {
return conductorHealthy(ctx, conductor)
}, 30*time.Second, time.Second, "conductor should start healthy")

sys.L2CL.StopSequencer()

require.Eventually(t, func() bool {
return !conductorHealthy(ctx, conductor)
}, 30*time.Second, time.Second, "stopped sequencer should become unhealthy")
}

func conductorForChain(t devtest.T, conductorSets map[eth.ChainID]dsl.ConductorSet, chainID eth.ChainID) *dsl.Conductor {
conductors := conductorSets[chainID]
require.NotEmpty(t, conductors, "expected conductors for chain %s", chainID)
return conductors[0]
}

func conductorHealthy(ctx context.Context, conductor *dsl.Conductor) bool {
callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx)
return err == nil && healthy
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package reorg

import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
"time"

"github.com/ethereum/go-ethereum"
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-devstack/devtest"
"github.com/ethereum-optimism/optimism/op-devstack/dsl"
"github.com/ethereum-optimism/optimism/op-devstack/presets"
"github.com/ethereum-optimism/optimism/op-service/bigs"
"github.com/ethereum-optimism/optimism/op-service/eth"
)

func TestSupernodeInteropInvalidMessageReorgKeepsConductorHealthy(gt *testing.T) {
t := devtest.SerialT(gt)
sys := presets.NewTwoL2SupernodeInteropWithConductors(t, 0,
presets.WithConductorHealthCheck(5, 5, 3600),
presets.WithConductorHealthCheckMinPeerCount(2),
presets.WithConductorInteropReorgLeniency(),
)

ctx := t.Ctx()
l2BConductor := conductorForChain(t, sys.ConductorSets, sys.L2B.Escape().ChainID())
require.Eventually(t, func() bool {
return conductorHealthy(ctx, l2BConductor)
}, 30*time.Second, time.Second, "chain B conductor should start healthy")

alice := sys.FunderA.NewFundedEOA(eth.OneEther)
bob := sys.FunderB.NewFundedEOA(eth.OneEther)
eventLoggerA := alice.DeployEventLogger()

sys.L2B.CatchUpTo(sys.L2A)
sys.L2A.CatchUpTo(sys.L2B)

paused := sys.Supernode.EnsureInteropPaused(sys.L2ACL, sys.L2BCL, 10)
t.Logger().Info("interop paused", "paused", paused)

rng := rand.New(rand.NewSource(12345))
initMsg := alice.SendRandomInitMessage(rng, eventLoggerA, 2, 10)
sys.L2B.WaitForBlock()

execMsg := bob.SendInvalidExecMessage(initMsg)
invalidBlockNumber := bigs.Uint64Strict(execMsg.BlockNumber())
invalidBlockHash := execMsg.BlockHash()
invalidBlockTimestamp := sys.L2B.TimestampForBlockNum(invalidBlockNumber)

require.Eventually(t, func() bool {
return sys.L2BCL.SyncStatus().LocalSafeL2.Number >= invalidBlockNumber
}, 60*time.Second, time.Second, "invalid block should become locally safe")

stopHealthWatch := watchConductorHealth(ctx, l2BConductor)
sys.Supernode.ResumeInterop()
require.Eventually(t, func() bool {
currentBlock, err := sys.L2ELB.Escape().EthClient().BlockRefByNumber(ctx, invalidBlockNumber)
if err != nil {
if !errors.Is(eth.MaybeAsNotFoundErr(err), ethereum.NotFound) {
t.Logger().Warn("unexpected error checking block",
"block_number", invalidBlockNumber,
"err", err,
)
}
return false
}
return currentBlock.Hash != invalidBlockHash
}, 60*time.Second, time.Second, "reset should replace the invalid block")

sys.Supernode.AwaitValidatedTimestamp(invalidBlockTimestamp)
sys.L2ELB.AssertTxNotInBlock(invalidBlockNumber, execMsg.Receipt.TxHash)
require.NoError(t, stopHealthWatch(), "chain B conductor should stay healthy during invalid-message reorg")
require.True(t, conductorHealthy(ctx, l2BConductor), "chain B conductor should remain healthy after invalid-message reorg")
}

func conductorForChain(t devtest.T, conductorSets map[eth.ChainID]dsl.ConductorSet, chainID eth.ChainID) *dsl.Conductor {
conductors := conductorSets[chainID]
require.NotEmpty(t, conductors, "expected conductors for chain %s", chainID)
return conductors[0]
}

func conductorHealthy(ctx context.Context, conductor *dsl.Conductor) bool {
callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx)
return err == nil && healthy
}

func watchConductorHealth(ctx context.Context, conductor *dsl.Conductor) func() error {
watchCtx, cancel := context.WithCancel(ctx)
failures := make(chan error, 1)
done := make(chan struct{})
go func() {
defer close(done)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-watchCtx.Done():
return
case <-ticker.C:
callCtx, callCancel := context.WithTimeout(watchCtx, 5*time.Second)
healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx)
callCancel()
if watchCtx.Err() != nil {
return
}
if err != nil {
select {
case failures <- fmt.Errorf("fetch conductor health: %w", err):
default:
}
return
}
if !healthy {
select {
case failures <- errors.New("conductor reported sequencer unhealthy"):
default:
}
return
}
}
}
}()
return func() error {
cancel()
<-done
select {
case err := <-failures:
return err
default:
return nil
}
}
}
31 changes: 22 additions & 9 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,17 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
RollupBoostNextHealthcheckURL: ctx.String(flags.RollupBoostNextHealthcheckURL.Name),
Paused: ctx.Bool(flags.Paused.Name),
HealthCheck: HealthCheckConfig{
Interval: ctx.Uint64(flags.HealthCheckInterval.Name),
UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name),
SafeEnabled: ctx.Bool(flags.HealthCheckSafeEnabled.Name),
SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name),
MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
ExecutionP2pEnabled: ctx.Bool(flags.HealthcheckExecutionP2pEnabled.Name),
ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name),
ExecutionP2pRPCUrl: executionP2pRpcUrl,
ExecutionP2pCheckApi: executionP2pCheckApi,
Interval: ctx.Uint64(flags.HealthCheckInterval.Name),
UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name),
SafeEnabled: ctx.Bool(flags.HealthCheckSafeEnabled.Name),
SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name),
MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name),
InteropReorgLeniency: ctx.Bool(flags.HealthCheckInteropReorgLeniency.Name),
InteropReorgLeniencyWindowSize: ctx.Uint64(flags.HealthCheckInteropReorgLeniencyWindowSize.Name),
ExecutionP2pEnabled: ctx.Bool(flags.HealthcheckExecutionP2pEnabled.Name),
ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name),
ExecutionP2pRPCUrl: executionP2pRpcUrl,
ExecutionP2pCheckApi: executionP2pCheckApi,
RollupBoostPartialHealthinessToleranceLimit: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceLimit.Name),
RollupBoostPartialHealthinessToleranceIntervalSeconds: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds.Name),
},
Expand Down Expand Up @@ -234,6 +236,14 @@ type HealthCheckConfig struct {
// MinPeerCount is the minimum number of peers required for the sequencer to be healthy.
MinPeerCount uint64

// InteropReorgLeniency enables experimental rolling-window health leniency
// for interop reorg recovery.
InteropReorgLeniency bool

// InteropReorgLeniencyWindowSize is the number of observations in the shared
// rolling/recovery window when interop reorg leniency is enabled.
InteropReorgLeniencyWindowSize uint64

// ExecutionP2pEnabled is whether to enable EL P2P checks.
ExecutionP2pEnabled bool

Expand Down Expand Up @@ -263,6 +273,9 @@ func (c *HealthCheckConfig) Check() error {
if c.MinPeerCount == 0 {
return fmt.Errorf("missing minimum peer count")
}
if c.InteropReorgLeniency && c.InteropReorgLeniencyWindowSize == 0 {
return fmt.Errorf("missing interop reorg leniency window size")
}
if c.ExecutionP2pEnabled {
if c.ExecutionP2pMinPeerCount == 0 {
return fmt.Errorf("missing minimum el p2p peers")
Expand Down
54 changes: 54 additions & 0 deletions op-conductor/conductor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package conductor
import (
"testing"

"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"

conductorFlags "github.com/ethereum-optimism/optimism/op-conductor/flags"
)

func TestConfigCheckRollupBoostAndNextMutuallyExclusive(t *testing.T) {
Expand All @@ -23,3 +27,53 @@ func TestConfigCheckRollupBoostAndNextMutuallyExclusive(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "only one of rollup-boost or rollup-boost next healthchecks can be enabled")
}

func TestHealthCheckConfigRequiresInteropReorgLeniencyWindowSize(t *testing.T) {
cfg := &HealthCheckConfig{
Interval: 1,
UnsafeInterval: 2,
SafeInterval: 3,
MinPeerCount: 1,
InteropReorgLeniency: true,
}

err := cfg.Check()
require.Error(t, err)
require.Contains(t, err.Error(), "missing interop reorg leniency window size")
}

func TestNewConfigReadsInteropReorgLeniencyEnvVar(t *testing.T) {
t.Setenv("OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY", "true")
t.Setenv("OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY_WINDOW_SIZE", "7")
t.Cleanup(func() {
conductorFlags.HealthCheckInteropReorgLeniency.Value = false
conductorFlags.HealthCheckInteropReorgLeniencyWindowSize.Value = 5
})

var cfg *Config
app := cli.NewApp()
app.Flags = conductorFlags.Flags
app.Action = func(ctx *cli.Context) error {
var err error
cfg, err = NewConfig(ctx, log.New())
return err
}

err := app.Run([]string{
"op-conductor",
"--consensus.addr", "127.0.0.1",
"--consensus.port", "0",
"--raft.server.id", "server-1",
"--raft.storage.dir", t.TempDir(),
"--node.rpc", "http://node.example",
"--execution.rpc", "http://exec.example",
"--healthcheck.interval", "1",
"--healthcheck.unsafe-interval", "2",
"--healthcheck.min-peer-count", "1",
"--network", "op-sepolia",
})
require.NoError(t, err)
require.NotNil(t, cfg)
require.True(t, cfg.HealthCheck.InteropReorgLeniency)
require.Equal(t, uint64(7), cfg.HealthCheck.InteropReorgLeniencyWindowSize)
}
2 changes: 2 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error {
c.cfg.HealthCheck.SafeInterval,
c.cfg.HealthCheck.MinPeerCount,
c.cfg.HealthCheck.SafeEnabled,
c.cfg.HealthCheck.InteropReorgLeniency,
c.cfg.HealthCheck.InteropReorgLeniencyWindowSize,
&c.cfg.RollupCfg,
node,
p2p,
Expand Down
16 changes: 16 additions & 0 deletions op-conductor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ var (
Usage: "Minimum number of peers required to be considered healthy",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_MIN_PEER_COUNT"),
}
HealthCheckInteropReorgLeniency = &cli.BoolFlag{
Name: "beta.healthcheck.interop-reorg-leniency",
Usage: "Enable experimental conductor health-check leniency for interop reorg recovery.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY"),
Value: false,
Hidden: true,
}
HealthCheckInteropReorgLeniencyWindowSize = &cli.Uint64Flag{
Name: "beta.healthcheck.interop-reorg-leniency-window-size",
Usage: "Number of observations in the experimental conductor health-check leniency rolling window.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY_WINDOW_SIZE"),
Value: 5,
Hidden: true,
}
Paused = &cli.BoolFlag{
Name: "paused",
Usage: "Whether the conductor is paused",
Expand Down Expand Up @@ -231,6 +245,8 @@ var optionalFlags = []cli.Flag{
RaftBootstrap,
HealthCheckSafeEnabled,
HealthCheckSafeInterval,
HealthCheckInteropReorgLeniency,
HealthCheckInteropReorgLeniencyWindowSize,
RaftSnapshotInterval,
RaftSnapshotThreshold,
RaftTrailingLogs,
Expand Down
7 changes: 7 additions & 0 deletions op-conductor/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func TestBetaFlags(t *testing.T) {
}
}

func TestHealthCheckInteropReorgLeniencyFlag(t *testing.T) {
require.Equal(t, "beta.healthcheck.interop-reorg-leniency", HealthCheckInteropReorgLeniency.Name)
require.Equal(t, []string{"OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY"}, HealthCheckInteropReorgLeniency.EnvVars)
require.True(t, HealthCheckInteropReorgLeniency.Hidden)
require.False(t, HealthCheckInteropReorgLeniency.Value)
}

func TestHasEnvVar(t *testing.T) {
for _, flag := range Flags {
flag := flag
Expand Down
Loading
Loading