diff --git a/op-acceptance-tests/tests/base/conductor/health_test.go b/op-acceptance-tests/tests/base/conductor/health_test.go new file mode 100644 index 00000000000..e474c1cc3b6 --- /dev/null +++ b/op-acceptance-tests/tests/base/conductor/health_test.go @@ -0,0 +1,49 @@ +package conductor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/presets" + "github.com/ethereum-optimism/optimism/op-devstack/sysgo" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +func TestConductorMarksStoppedSequencerUnhealthy(gt *testing.T) { + t := devtest.SerialT(gt) + sysgo.SkipOnKonaNode(t, "not supported") + + sys := presets.NewMinimalWithConductors(t, + presets.WithConductorHealthCheck(1, 3, 3600), + ) + conductor := conductorForChain(t, sys.ConductorSets, sys.L2Chain.Escape().ChainID()) + ctx := t.Ctx() + + require.Eventually(t, func() bool { + return conductorHealthy(ctx, conductor) + }, 30*time.Second, time.Second, "conductor should start healthy") + + sys.L2CL.StopSequencer() + + require.Eventually(t, func() bool { + return !conductorHealthy(ctx, conductor) + }, 30*time.Second, time.Second, "stopped sequencer should become unhealthy") +} + +func conductorForChain(t devtest.T, conductorSets map[eth.ChainID]dsl.ConductorSet, chainID eth.ChainID) *dsl.Conductor { + conductors := conductorSets[chainID] + require.NotEmpty(t, conductors, "expected conductors for chain %s", chainID) + return conductors[0] +} + +func conductorHealthy(ctx context.Context, conductor *dsl.Conductor) bool { + callCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx) + return err == nil && healthy +} diff --git a/op-acceptance-tests/tests/supernode/interop/reorg/conductor_reorg_health_test.go b/op-acceptance-tests/tests/supernode/interop/reorg/conductor_reorg_health_test.go new file mode 100644 index 00000000000..628e47861bf --- /dev/null +++ b/op-acceptance-tests/tests/supernode/interop/reorg/conductor_reorg_health_test.go @@ -0,0 +1,139 @@ +package reorg + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/presets" + "github.com/ethereum-optimism/optimism/op-service/bigs" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +func TestSupernodeInteropInvalidMessageReorgKeepsConductorHealthy(gt *testing.T) { + t := devtest.SerialT(gt) + sys := presets.NewTwoL2SupernodeInteropWithConductors(t, 0, + presets.WithConductorHealthCheck(5, 5, 3600), + presets.WithConductorHealthCheckMinPeerCount(2), + presets.WithConductorInteropReorgLeniency(), + ) + + ctx := t.Ctx() + l2BConductor := conductorForChain(t, sys.ConductorSets, sys.L2B.Escape().ChainID()) + require.Eventually(t, func() bool { + return conductorHealthy(ctx, l2BConductor) + }, 30*time.Second, time.Second, "chain B conductor should start healthy") + + alice := sys.FunderA.NewFundedEOA(eth.OneEther) + bob := sys.FunderB.NewFundedEOA(eth.OneEther) + eventLoggerA := alice.DeployEventLogger() + + sys.L2B.CatchUpTo(sys.L2A) + sys.L2A.CatchUpTo(sys.L2B) + + paused := sys.Supernode.EnsureInteropPaused(sys.L2ACL, sys.L2BCL, 10) + t.Logger().Info("interop paused", "paused", paused) + + rng := rand.New(rand.NewSource(12345)) + initMsg := alice.SendRandomInitMessage(rng, eventLoggerA, 2, 10) + sys.L2B.WaitForBlock() + + execMsg := bob.SendInvalidExecMessage(initMsg) + invalidBlockNumber := bigs.Uint64Strict(execMsg.BlockNumber()) + invalidBlockHash := execMsg.BlockHash() + invalidBlockTimestamp := sys.L2B.TimestampForBlockNum(invalidBlockNumber) + + require.Eventually(t, func() bool { + return sys.L2BCL.SyncStatus().LocalSafeL2.Number >= invalidBlockNumber + }, 60*time.Second, time.Second, "invalid block should become locally safe") + + stopHealthWatch := watchConductorHealth(ctx, l2BConductor) + sys.Supernode.ResumeInterop() + require.Eventually(t, func() bool { + currentBlock, err := sys.L2ELB.Escape().EthClient().BlockRefByNumber(ctx, invalidBlockNumber) + if err != nil { + if !errors.Is(eth.MaybeAsNotFoundErr(err), ethereum.NotFound) { + t.Logger().Warn("unexpected error checking block", + "block_number", invalidBlockNumber, + "err", err, + ) + } + return false + } + return currentBlock.Hash != invalidBlockHash + }, 60*time.Second, time.Second, "reset should replace the invalid block") + + sys.Supernode.AwaitValidatedTimestamp(invalidBlockTimestamp) + sys.L2ELB.AssertTxNotInBlock(invalidBlockNumber, execMsg.Receipt.TxHash) + require.NoError(t, stopHealthWatch(), "chain B conductor should stay healthy during invalid-message reorg") + require.True(t, conductorHealthy(ctx, l2BConductor), "chain B conductor should remain healthy after invalid-message reorg") +} + +func conductorForChain(t devtest.T, conductorSets map[eth.ChainID]dsl.ConductorSet, chainID eth.ChainID) *dsl.Conductor { + conductors := conductorSets[chainID] + require.NotEmpty(t, conductors, "expected conductors for chain %s", chainID) + return conductors[0] +} + +func conductorHealthy(ctx context.Context, conductor *dsl.Conductor) bool { + callCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx) + return err == nil && healthy +} + +func watchConductorHealth(ctx context.Context, conductor *dsl.Conductor) func() error { + watchCtx, cancel := context.WithCancel(ctx) + failures := make(chan error, 1) + done := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-watchCtx.Done(): + return + case <-ticker.C: + callCtx, callCancel := context.WithTimeout(watchCtx, 5*time.Second) + healthy, err := conductor.Escape().RpcAPI().SequencerHealthy(callCtx) + callCancel() + if watchCtx.Err() != nil { + return + } + if err != nil { + select { + case failures <- fmt.Errorf("fetch conductor health: %w", err): + default: + } + return + } + if !healthy { + select { + case failures <- errors.New("conductor reported sequencer unhealthy"): + default: + } + return + } + } + } + }() + return func() error { + cancel() + <-done + select { + case err := <-failures: + return err + default: + return nil + } + } +} diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go index 056df88c97a..f1ace1ca33a 100644 --- a/op-conductor/conductor/config.go +++ b/op-conductor/conductor/config.go @@ -192,15 +192,17 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { RollupBoostNextHealthcheckURL: ctx.String(flags.RollupBoostNextHealthcheckURL.Name), Paused: ctx.Bool(flags.Paused.Name), HealthCheck: HealthCheckConfig{ - Interval: ctx.Uint64(flags.HealthCheckInterval.Name), - UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name), - SafeEnabled: ctx.Bool(flags.HealthCheckSafeEnabled.Name), - SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name), - MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name), - ExecutionP2pEnabled: ctx.Bool(flags.HealthcheckExecutionP2pEnabled.Name), - ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name), - ExecutionP2pRPCUrl: executionP2pRpcUrl, - ExecutionP2pCheckApi: executionP2pCheckApi, + Interval: ctx.Uint64(flags.HealthCheckInterval.Name), + UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name), + SafeEnabled: ctx.Bool(flags.HealthCheckSafeEnabled.Name), + SafeInterval: ctx.Uint64(flags.HealthCheckSafeInterval.Name), + MinPeerCount: ctx.Uint64(flags.HealthCheckMinPeerCount.Name), + InteropReorgLeniency: ctx.Bool(flags.HealthCheckInteropReorgLeniency.Name), + InteropReorgLeniencyWindowSize: ctx.Uint64(flags.HealthCheckInteropReorgLeniencyWindowSize.Name), + ExecutionP2pEnabled: ctx.Bool(flags.HealthcheckExecutionP2pEnabled.Name), + ExecutionP2pMinPeerCount: ctx.Uint64(flags.HealthcheckExecutionP2pMinPeerCount.Name), + ExecutionP2pRPCUrl: executionP2pRpcUrl, + ExecutionP2pCheckApi: executionP2pCheckApi, RollupBoostPartialHealthinessToleranceLimit: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceLimit.Name), RollupBoostPartialHealthinessToleranceIntervalSeconds: ctx.Uint64(flags.HealthCheckRollupBoostPartialHealthinessToleranceIntervalSeconds.Name), }, @@ -234,6 +236,14 @@ type HealthCheckConfig struct { // MinPeerCount is the minimum number of peers required for the sequencer to be healthy. MinPeerCount uint64 + // InteropReorgLeniency enables experimental rolling-window health leniency + // for interop reorg recovery. + InteropReorgLeniency bool + + // InteropReorgLeniencyWindowSize is the number of observations in the shared + // rolling/recovery window when interop reorg leniency is enabled. + InteropReorgLeniencyWindowSize uint64 + // ExecutionP2pEnabled is whether to enable EL P2P checks. ExecutionP2pEnabled bool @@ -263,6 +273,9 @@ func (c *HealthCheckConfig) Check() error { if c.MinPeerCount == 0 { return fmt.Errorf("missing minimum peer count") } + if c.InteropReorgLeniency && c.InteropReorgLeniencyWindowSize == 0 { + return fmt.Errorf("missing interop reorg leniency window size") + } if c.ExecutionP2pEnabled { if c.ExecutionP2pMinPeerCount == 0 { return fmt.Errorf("missing minimum el p2p peers") diff --git a/op-conductor/conductor/config_test.go b/op-conductor/conductor/config_test.go index a5744c3863a..b06e66755df 100644 --- a/op-conductor/conductor/config_test.go +++ b/op-conductor/conductor/config_test.go @@ -3,7 +3,11 @@ package conductor import ( "testing" + "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + + conductorFlags "github.com/ethereum-optimism/optimism/op-conductor/flags" ) func TestConfigCheckRollupBoostAndNextMutuallyExclusive(t *testing.T) { @@ -23,3 +27,53 @@ func TestConfigCheckRollupBoostAndNextMutuallyExclusive(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "only one of rollup-boost or rollup-boost next healthchecks can be enabled") } + +func TestHealthCheckConfigRequiresInteropReorgLeniencyWindowSize(t *testing.T) { + cfg := &HealthCheckConfig{ + Interval: 1, + UnsafeInterval: 2, + SafeInterval: 3, + MinPeerCount: 1, + InteropReorgLeniency: true, + } + + err := cfg.Check() + require.Error(t, err) + require.Contains(t, err.Error(), "missing interop reorg leniency window size") +} + +func TestNewConfigReadsInteropReorgLeniencyEnvVar(t *testing.T) { + t.Setenv("OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY", "true") + t.Setenv("OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY_WINDOW_SIZE", "7") + t.Cleanup(func() { + conductorFlags.HealthCheckInteropReorgLeniency.Value = false + conductorFlags.HealthCheckInteropReorgLeniencyWindowSize.Value = 5 + }) + + var cfg *Config + app := cli.NewApp() + app.Flags = conductorFlags.Flags + app.Action = func(ctx *cli.Context) error { + var err error + cfg, err = NewConfig(ctx, log.New()) + return err + } + + err := app.Run([]string{ + "op-conductor", + "--consensus.addr", "127.0.0.1", + "--consensus.port", "0", + "--raft.server.id", "server-1", + "--raft.storage.dir", t.TempDir(), + "--node.rpc", "http://node.example", + "--execution.rpc", "http://exec.example", + "--healthcheck.interval", "1", + "--healthcheck.unsafe-interval", "2", + "--healthcheck.min-peer-count", "1", + "--network", "op-sepolia", + }) + require.NoError(t, err) + require.NotNil(t, cfg) + require.True(t, cfg.HealthCheck.InteropReorgLeniency) + require.Equal(t, uint64(7), cfg.HealthCheck.InteropReorgLeniencyWindowSize) +} diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index 2f37d3f38f1..0faa2beccc4 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -253,6 +253,8 @@ func (c *OpConductor) initHealthMonitor(ctx context.Context) error { c.cfg.HealthCheck.SafeInterval, c.cfg.HealthCheck.MinPeerCount, c.cfg.HealthCheck.SafeEnabled, + c.cfg.HealthCheck.InteropReorgLeniency, + c.cfg.HealthCheck.InteropReorgLeniencyWindowSize, &c.cfg.RollupCfg, node, p2p, diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go index f69add935f9..5415da098e6 100644 --- a/op-conductor/flags/flags.go +++ b/op-conductor/flags/flags.go @@ -141,6 +141,20 @@ var ( Usage: "Minimum number of peers required to be considered healthy", EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "HEALTHCHECK_MIN_PEER_COUNT"), } + HealthCheckInteropReorgLeniency = &cli.BoolFlag{ + Name: "beta.healthcheck.interop-reorg-leniency", + Usage: "Enable experimental conductor health-check leniency for interop reorg recovery.", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY"), + Value: false, + Hidden: true, + } + HealthCheckInteropReorgLeniencyWindowSize = &cli.Uint64Flag{ + Name: "beta.healthcheck.interop-reorg-leniency-window-size", + Usage: "Number of observations in the experimental conductor health-check leniency rolling window.", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY_WINDOW_SIZE"), + Value: 5, + Hidden: true, + } Paused = &cli.BoolFlag{ Name: "paused", Usage: "Whether the conductor is paused", @@ -231,6 +245,8 @@ var optionalFlags = []cli.Flag{ RaftBootstrap, HealthCheckSafeEnabled, HealthCheckSafeInterval, + HealthCheckInteropReorgLeniency, + HealthCheckInteropReorgLeniencyWindowSize, RaftSnapshotInterval, RaftSnapshotThreshold, RaftTrailingLogs, diff --git a/op-conductor/flags/flags_test.go b/op-conductor/flags/flags_test.go index 812d9afb640..1e57525d0fd 100644 --- a/op-conductor/flags/flags_test.go +++ b/op-conductor/flags/flags_test.go @@ -53,6 +53,13 @@ func TestBetaFlags(t *testing.T) { } } +func TestHealthCheckInteropReorgLeniencyFlag(t *testing.T) { + require.Equal(t, "beta.healthcheck.interop-reorg-leniency", HealthCheckInteropReorgLeniency.Name) + require.Equal(t, []string{"OP_CONDUCTOR_BETA_HEALTHCHECK_INTEROP_REORG_LENIENCY"}, HealthCheckInteropReorgLeniency.EnvVars) + require.True(t, HealthCheckInteropReorgLeniency.Hidden) + require.False(t, HealthCheckInteropReorgLeniency.Value) +} + func TestHasEnvVar(t *testing.T) { for _, flag := range Flags { flag := flag diff --git a/op-conductor/health/monitor.go b/op-conductor/health/monitor.go index 64c815b07da..ca4c6b532dd 100644 --- a/op-conductor/health/monitor.go +++ b/op-conductor/health/monitor.go @@ -24,6 +24,27 @@ var ( ErrRollupBoostNotHealthy = errors.New("rollup boost is not healthy") ) +const ( + // defaultRecoveringWindowSize is the shared health-check grace window used when + // interop reorg leniency is enabled. Unsafe-head recovery uses it to require + // unsafe-head time to outpace wall-clock time. Sync-status RPC availability + // and CL peer-count checks use it as a rolling window: all failed observations + // fail the check, all successful observations restore Success, and mixed or + // not-yet-full windows remain Inconclusive. + defaultRecoveringWindowSize = 5 +) + +type rollingWindowState string + +const ( + // rollingWindowSuccess means the full rolling window contains only successes. + rollingWindowSuccess rollingWindowState = "Success" + // rollingWindowFailed means the full rolling window contains only failures. + rollingWindowFailed rollingWindowState = "Failed" + // rollingWindowInconclusive means the window is not full or contains mixed results. + rollingWindowInconclusive rollingWindowState = "Inconclusive" +) + // HealthMonitor defines the interface for monitoring the health of the sequencer. type HealthMonitor interface { // Subscribe returns a channel that will be notified for every health check. @@ -38,11 +59,17 @@ type HealthMonitor interface { // interval is the interval between health checks measured in seconds. // safeInterval is the interval between safe head progress measured in seconds. // minPeerCount is the minimum number of peers required for the sequencer to be healthy. +// interopReorgLeniency enables experimental interop reorg recovery leniency. +// recoveringWindowSize is the number of observations used by lenient rolling/recovery windows. // rollupBoostHealthChecker is an optional health checker for rollup-boost (either standard or next client). -func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, rollupBoostHealthChecker client.RollupBoostHealthChecker, elP2pClient client.ElP2PClient, minElP2pPeers uint64, rollupBoostToleratePartialHealthinessToleranceLimit uint64, rollupBoostToleratePartialHealthinessToleranceIntervalSeconds uint64) HealthMonitor { +func NewSequencerHealthMonitor(log log.Logger, metricer metrics.Metricer, interval, unsafeInterval, safeInterval, minPeerCount uint64, safeEnabled bool, interopReorgLeniency bool, recoveringWindowSize uint64, rollupCfg *rollup.Config, node dial.RollupClientInterface, p2p apis.P2PClient, rollupBoostHealthChecker client.RollupBoostHealthChecker, elP2pClient client.ElP2PClient, minElP2pPeers uint64, rollupBoostToleratePartialHealthinessToleranceLimit uint64, rollupBoostToleratePartialHealthinessToleranceIntervalSeconds uint64) HealthMonitor { + if metricer == nil { + metricer = metrics.NoopMetrics + } + hm := &SequencerHealthMonitor{ log: log, - metrics: metrics, + metrics: metricer, interval: interval, healthUpdateCh: make(chan error), rollupCfg: rollupCfg, @@ -50,6 +77,8 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva safeEnabled: safeEnabled, safeInterval: safeInterval, minPeerCount: minPeerCount, + interopReorgLeniency: interopReorgLeniency, + recoveringWindowSize: normalizeRecoveringWindowSize(recoveringWindowSize), timeProviderFn: currentTimeProvider, node: node, p2p: p2p, @@ -71,6 +100,7 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva panic(fmt.Errorf("failed to setup health monitor: %w", err)) } } + hm.recordStaticHealthCheckMetrics() return hm } @@ -88,15 +118,19 @@ type SequencerHealthMonitor struct { cancel context.CancelFunc wg sync.WaitGroup - rollupCfg *rollup.Config - unsafeInterval uint64 - safeEnabled bool - safeInterval uint64 - minPeerCount uint64 - interval uint64 - healthUpdateCh chan error - lastSeenUnsafeNum uint64 - lastSeenUnsafeTime uint64 + rollupCfg *rollup.Config + unsafeInterval uint64 + safeEnabled bool + safeInterval uint64 + minPeerCount uint64 + interopReorgLeniency bool + recoveringWindowSize uint64 + interval uint64 + healthUpdateCh chan error + lastSeenUnsafeNum uint64 + lastSeenUnsafeTime uint64 + syncStatusWindow rollingWindowTracker + peerCountWindow rollingWindowTracker timeProviderFn func() uint64 @@ -106,16 +140,175 @@ type SequencerHealthMonitor struct { elP2p *ElP2pHealthMonitor rollupBoostPartialHealthinessToleranceLimit uint64 rollupBoostPartialHealthinessToleranceCounter *timeBoundedRotatingCounter + + // Recovering state. When pollsInRecovery is zero, the sequencer is not recovering. + initialLagInRecovery uint64 + recoveryWindowStartLag uint64 + recoveryWindowStartWallTime uint64 + recoveryWindowStartUnsafe uint64 + recoveryWindowStartNum uint64 + pollsInRecovery uint64 + pollsInRecoveryWindow uint64 } var _ HealthMonitor = (*SequencerHealthMonitor)(nil) +type rollingWindowTracker struct { + observations []bool + next uint64 + count uint64 + successes uint64 +} + +func normalizeRecoveringWindowSize(windowSize uint64) uint64 { + if windowSize == 0 { + return defaultRecoveringWindowSize + } + return windowSize +} + +func (hm *SequencerHealthMonitor) recoveringWindowSizeOrDefault() uint64 { + return normalizeRecoveringWindowSize(hm.recoveringWindowSize) +} + +func (t *rollingWindowTracker) observe(success bool, windowSize uint64) rollingWindowState { + windowSize = normalizeRecoveringWindowSize(windowSize) + if t.count > windowSize || uint64(len(t.observations)) > windowSize { + t.observations = nil + t.next = 0 + t.count = 0 + t.successes = 0 + } + if t.count < windowSize { + t.observations = append(t.observations, success) + t.count++ + if success { + t.successes++ + } + return t.state(windowSize) + } + + if t.observations[t.next] { + t.successes-- + } + t.observations[t.next] = success + if success { + t.successes++ + } + t.next = (t.next + 1) % windowSize + return t.state(windowSize) +} + +func (t *rollingWindowTracker) state(windowSize uint64) rollingWindowState { + switch { + case t.count < windowSize: + return rollingWindowInconclusive + case t.successes == windowSize: + return rollingWindowSuccess + case t.successes == 0: + return rollingWindowFailed + default: + return rollingWindowInconclusive + } +} + +func (hm *SequencerHealthMonitor) recordStaticHealthCheckMetrics() { + if hm.metrics == nil { + return + } + hm.metrics.RecordHealthCheckConfig( + hm.interval, + hm.unsafeInterval, + hm.safeInterval, + hm.minPeerCount, + hm.recoveringWindowSizeOrDefault(), + hm.safeEnabled, + hm.interopReorgLeniency, + ) + if !hm.safeEnabled { + hm.metrics.RecordHealthCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusDisabled) + } + hm.metrics.RecordUnsafeHeadRecovery(false, 0, 0, 0, 0, 0, 0, 0) +} + +func (hm *SequencerHealthMonitor) recordHealthCheckHeads(statusUnsafeNum, statusUnsafeTime, statusSafeNum, statusSafeTime, unsafeLag, safeLag uint64) { + if hm.metrics == nil { + return + } + hm.metrics.RecordHealthCheckHeads(statusUnsafeNum, statusUnsafeTime, statusSafeNum, statusSafeTime, unsafeLag, safeLag) +} + +func (hm *SequencerHealthMonitor) recordPeerCount(peerCount uint64) { + if hm.metrics == nil { + return + } + hm.metrics.RecordHealthCheckPeerCount(peerCount, hm.minPeerCount) +} + +func (hm *SequencerHealthMonitor) recordRollingWindow(check metrics.HealthCheck, tracker rollingWindowTracker, state rollingWindowState) { + if hm.metrics == nil { + return + } + successes := tracker.successes + failures := tracker.count - successes + hm.metrics.RecordHealthCheckWindow(check, metricWindowState(state), successes, failures, hm.recoveringWindowSizeOrDefault()) +} + +func (hm *SequencerHealthMonitor) recordCheckStatus(check metrics.HealthCheck, status metrics.HealthCheckStatus) { + if hm.metrics == nil { + return + } + hm.metrics.RecordHealthCheckStatus(check, status) +} + +func (hm *SequencerHealthMonitor) recordCheckFailure(check metrics.HealthCheck, reason metrics.HealthCheckFailureReason) { + if hm.metrics == nil { + return + } + hm.metrics.RecordHealthCheckFailure(check, reason) +} + +func (hm *SequencerHealthMonitor) recordUnsafeHeadRecovery(active bool, currentLag, wallElapsed, unsafeElapsed uint64) { + if hm.metrics == nil { + return + } + hm.metrics.RecordUnsafeHeadRecovery( + active, + currentLag, + hm.initialLagInRecovery, + hm.recoveryWindowStartLag, + wallElapsed, + unsafeElapsed, + hm.pollsInRecovery, + hm.pollsInRecoveryWindow, + ) +} + +func (hm *SequencerHealthMonitor) recordUnsafeHeadRecoveryEvent(event metrics.HealthCheckRecoveryEvent) { + if hm.metrics == nil { + return + } + hm.metrics.RecordUnsafeHeadRecoveryEvent(event) +} + +func metricWindowState(state rollingWindowState) metrics.HealthCheckWindowState { + switch state { + case rollingWindowSuccess: + return metrics.HealthCheckWindowStateSuccess + case rollingWindowFailed: + return metrics.HealthCheckWindowStateFailed + default: + return metrics.HealthCheckWindowStateInconclusive + } +} + // Start implements HealthMonitor. func (hm *SequencerHealthMonitor) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) hm.cancel = cancel hm.log.Info("starting health monitor") + hm.recordStaticHealthCheckMetrics() hm.wg.Add(1) go hm.loop(ctx) @@ -164,7 +357,9 @@ func (hm *SequencerHealthMonitor) loop(ctx context.Context) { } // healthCheck checks the health of the sequencer by 3 criteria: -// 1. unsafe head is not too far behind now (measured by unsafeInterval) +// 1. unsafe head is not too far behind now. When interop reorg leniency is +// enabled, a lagging unsafe head may stay healthy while recovering faster than +// wall clock. // 2. safe head is progressing every configured batch submission interval // 3. peer count is above the configured minimum func (hm *SequencerHealthMonitor) healthCheck(ctx context.Context) error { @@ -217,11 +412,21 @@ func (hm *SequencerHealthMonitor) checkNode(ctx context.Context) error { } func (hm *SequencerHealthMonitor) checkNodeSyncStatus(ctx context.Context) error { + if hm.interopReorgLeniency { + return hm.checkNodeSyncStatusInteropReorgLenient(ctx) + } + return hm.checkNodeSyncStatusStrict(ctx) +} + +func (hm *SequencerHealthMonitor) checkNodeSyncStatusStrict(ctx context.Context) error { status, err := hm.node.SyncStatus(ctx) if err != nil { hm.log.Error("health monitor failed to get sync status", "err", err) + hm.recordCheckStatus(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckFailureReasonRPCError) return ErrSequencerConnectionDown } + hm.recordCheckStatus(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckStatusHealthy) now := hm.timeProviderFn() @@ -231,6 +436,9 @@ func (hm *SequencerHealthMonitor) checkNodeSyncStatus(ctx context.Context) error } curUnsafeTimeDiff := calculateTimeDiff(now, status.UnsafeL2.Time) + curSafeTimeDiff := calculateTimeDiff(now, status.SafeL2.Time) + hm.recordHealthCheckHeads(status.UnsafeL2.Number, status.UnsafeL2.Time, status.SafeL2.Number, status.SafeL2.Time, curUnsafeTimeDiff, curSafeTimeDiff) + hm.recordUnsafeHeadRecovery(false, curUnsafeTimeDiff, 0, 0) if curUnsafeTimeDiff > hm.unsafeInterval { hm.log.Error( "unsafe head is falling behind the unsafe interval", @@ -240,10 +448,149 @@ func (hm *SequencerHealthMonitor) checkNodeSyncStatus(ctx context.Context) error "unsafe_interval", hm.unsafeInterval, "cur_unsafe_time_diff", curUnsafeTimeDiff, ) + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckUnsafeLag, metrics.HealthCheckFailureReasonLagExceeded) return ErrSequencerNotHealthy } + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusHealthy) - if hm.safeEnabled && calculateTimeDiff(now, status.SafeL2.Time) > hm.safeInterval { + if !hm.safeEnabled { + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusDisabled) + return nil + } + + if curSafeTimeDiff > hm.safeInterval { + hm.log.Error( + "safe head is not progressing as expected", + "now", now, + "safe_head_num", status.SafeL2.Number, + "safe_head_time", status.SafeL2.Time, + "safe_interval", hm.safeInterval, + ) + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckSafeLag, metrics.HealthCheckFailureReasonLagExceeded) + return ErrSequencerNotHealthy + } + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusHealthy) + + return nil +} + +func (hm *SequencerHealthMonitor) checkNodeSyncStatusInteropReorgLenient(ctx context.Context) error { + windowSize := hm.recoveringWindowSizeOrDefault() + status, err := hm.node.SyncStatus(ctx) + if err != nil { + state := hm.syncStatusWindow.observe(false, windowSize) + hm.recordRollingWindow(metrics.HealthCheckSyncStatusRPC, hm.syncStatusWindow, state) + if state == rollingWindowFailed { + hm.log.Error( + "health monitor failed to get sync status", + "err", err, + "window_state", state, + "window_size", windowSize, + ) + hm.recordCheckStatus(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckFailureReasonRPCError) + return ErrSequencerConnectionDown + } + hm.log.Warn( + "health monitor temporarily failed to get sync status", + "err", err, + "window_state", state, + "window_size", windowSize, + ) + hm.recordCheckStatus(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckStatusWarning) + return nil + } + state := hm.syncStatusWindow.observe(true, windowSize) + hm.recordRollingWindow(metrics.HealthCheckSyncStatusRPC, hm.syncStatusWindow, state) + hm.recordCheckStatus(metrics.HealthCheckSyncStatusRPC, metrics.HealthCheckStatusHealthy) + + now := hm.timeProviderFn() + + if status.UnsafeL2.Number > hm.lastSeenUnsafeNum { + hm.lastSeenUnsafeNum = status.UnsafeL2.Number + hm.lastSeenUnsafeTime = now + } + + curUnsafeLag := calculateTimeDiff(now, status.UnsafeL2.Time) + curSafeLag := calculateTimeDiff(now, status.SafeL2.Time) + hm.recordHealthCheckHeads(status.UnsafeL2.Number, status.UnsafeL2.Time, status.SafeL2.Number, status.SafeL2.Time, curUnsafeLag, curSafeLag) + switch { + case curUnsafeLag <= hm.unsafeInterval: + if hm.pollsInRecovery > 0 { + hm.log.Info( + "sequencer recovered from unsafe-head lag", + "polls_in_recovery", hm.pollsInRecovery, + "initial_lag_in_recovery", hm.initialLagInRecovery, + "cur_unsafe_lag", curUnsafeLag, + ) + hm.recordUnsafeHeadRecoveryEvent(metrics.HealthCheckRecoveryRecovered) + hm.clearUnsafeHeadRecovery() + } + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusHealthy) + hm.recordUnsafeHeadRecovery(false, curUnsafeLag, 0, 0) + case hm.pollsInRecovery == 0: + hm.initialLagInRecovery = curUnsafeLag + hm.pollsInRecovery = 1 + hm.resetUnsafeHeadRecoveryWindow(now, status.UnsafeL2.Time, status.UnsafeL2.Number, curUnsafeLag) + hm.recordUnsafeHeadRecoveryEvent(metrics.HealthCheckRecoveryEntered) + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusRecovering) + hm.recordUnsafeHeadRecovery(true, curUnsafeLag, 0, 0) + hm.log.Info( + "sequencer entering unsafe-head recovery", + "cur_unsafe_lag", curUnsafeLag, + "unsafe_interval", hm.unsafeInterval, + "unsafe_head_num", status.UnsafeL2.Number, + ) + default: + hm.pollsInRecovery++ + hm.pollsInRecoveryWindow++ + wallElapsed := calculateTimeDiff(now, hm.recoveryWindowStartWallTime) + unsafeElapsed := calculateTimeDiff(status.UnsafeL2.Time, hm.recoveryWindowStartUnsafe) + if unsafeElapsed > wallElapsed { + hm.log.Info( + "unsafe-head outpacing wall clock during recovery", + "cur_unsafe_lag", curUnsafeLag, + "previous_window_start_lag", hm.recoveryWindowStartLag, + "wall_elapsed", wallElapsed, + "unsafe_elapsed", unsafeElapsed, + "polls_in_recovery", hm.pollsInRecovery, + "polls_in_recovery_window", hm.pollsInRecoveryWindow, + ) + hm.resetUnsafeHeadRecoveryWindow(now, status.UnsafeL2.Time, status.UnsafeL2.Number, curUnsafeLag) + hm.recordUnsafeHeadRecoveryEvent(metrics.HealthCheckRecoveryWindowReset) + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusRecovering) + hm.recordUnsafeHeadRecovery(true, curUnsafeLag, 0, 0) + break + } + if hm.pollsInRecoveryWindow >= windowSize { + hm.log.Error( + "unsafe-head recovery not outpacing wall clock", + "cur_unsafe_lag", curUnsafeLag, + "recovery_window_start_lag", hm.recoveryWindowStartLag, + "recovery_window_start_num", hm.recoveryWindowStartNum, + "wall_elapsed", wallElapsed, + "unsafe_elapsed", unsafeElapsed, + "polls_in_recovery", hm.pollsInRecovery, + "polls_in_recovery_window", hm.pollsInRecoveryWindow, + ) + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckUnsafeLag, metrics.HealthCheckFailureReasonRecoveryWindowStalled) + hm.recordUnsafeHeadRecoveryEvent(metrics.HealthCheckRecoveryFailed) + hm.recordUnsafeHeadRecovery(true, curUnsafeLag, wallElapsed, unsafeElapsed) + return ErrSequencerNotHealthy + } + hm.recordCheckStatus(metrics.HealthCheckUnsafeLag, metrics.HealthCheckStatusRecovering) + hm.recordUnsafeHeadRecovery(true, curUnsafeLag, wallElapsed, unsafeElapsed) + } + + if !hm.safeEnabled { + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusDisabled) + return nil + } + + if curSafeLag > hm.safeInterval { hm.log.Error( "safe head is not progressing as expected", "now", now, @@ -251,22 +598,94 @@ func (hm *SequencerHealthMonitor) checkNodeSyncStatus(ctx context.Context) error "safe_head_time", status.SafeL2.Time, "safe_interval", hm.safeInterval, ) + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckSafeLag, metrics.HealthCheckFailureReasonLagExceeded) return ErrSequencerNotHealthy } + hm.recordCheckStatus(metrics.HealthCheckSafeLag, metrics.HealthCheckStatusHealthy) return nil } +func (hm *SequencerHealthMonitor) resetUnsafeHeadRecoveryWindow(now, unsafeTime, unsafeNum, curUnsafeLag uint64) { + hm.recoveryWindowStartLag = curUnsafeLag + hm.recoveryWindowStartWallTime = now + hm.recoveryWindowStartUnsafe = unsafeTime + hm.recoveryWindowStartNum = unsafeNum + hm.pollsInRecoveryWindow = 1 +} + +func (hm *SequencerHealthMonitor) clearUnsafeHeadRecovery() { + hm.initialLagInRecovery = 0 + hm.recoveryWindowStartLag = 0 + hm.recoveryWindowStartWallTime = 0 + hm.recoveryWindowStartUnsafe = 0 + hm.recoveryWindowStartNum = 0 + hm.pollsInRecovery = 0 + hm.pollsInRecoveryWindow = 0 +} + func (hm *SequencerHealthMonitor) checkNodePeerCount(ctx context.Context) error { + if hm.interopReorgLeniency { + return hm.checkNodePeerCountInteropReorgLenient(ctx) + } + return hm.checkNodePeerCountStrict(ctx) +} + +func (hm *SequencerHealthMonitor) checkNodePeerCountStrict(ctx context.Context) error { stats, err := hm.p2p.PeerStats(ctx) if err != nil { hm.log.Error("health monitor failed to get peer stats", "err", err) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckPeerCount, metrics.HealthCheckFailureReasonRPCError) return ErrSequencerConnectionDown } if uint64(stats.Connected) < hm.minPeerCount { + hm.recordPeerCount(uint64(stats.Connected)) hm.log.Error("peer count is below minimum", "connected", stats.Connected, "minPeerCount", hm.minPeerCount) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckPeerCount, metrics.HealthCheckFailureReasonPeerCountBelowMin) return ErrSequencerNotHealthy } + hm.recordPeerCount(uint64(stats.Connected)) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusHealthy) + + return nil +} + +func (hm *SequencerHealthMonitor) checkNodePeerCountInteropReorgLenient(ctx context.Context) error { + windowSize := hm.recoveringWindowSizeOrDefault() + stats, err := hm.p2p.PeerStats(ctx) + if err != nil { + state := hm.peerCountWindow.observe(false, windowSize) + hm.recordRollingWindow(metrics.HealthCheckPeerCount, hm.peerCountWindow, state) + if state == rollingWindowFailed { + hm.log.Error("health monitor failed to get peer stats", "err", err, "window_state", state, "window_size", windowSize) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckPeerCount, metrics.HealthCheckFailureReasonRPCError) + return ErrSequencerConnectionDown + } + hm.log.Warn("health monitor temporarily failed to get peer stats", "err", err, "window_state", state, "window_size", windowSize) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusWarning) + return nil + } + hm.recordPeerCount(uint64(stats.Connected)) + if uint64(stats.Connected) < hm.minPeerCount { + state := hm.peerCountWindow.observe(false, windowSize) + hm.recordRollingWindow(metrics.HealthCheckPeerCount, hm.peerCountWindow, state) + if state == rollingWindowFailed { + hm.log.Error("peer count is below minimum", "connected", stats.Connected, "minPeerCount", hm.minPeerCount, "window_state", state, "window_size", windowSize) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusUnhealthy) + hm.recordCheckFailure(metrics.HealthCheckPeerCount, metrics.HealthCheckFailureReasonPeerCountBelowMin) + return ErrSequencerNotHealthy + } + hm.log.Warn("peer count is temporarily below minimum", "connected", stats.Connected, "minPeerCount", hm.minPeerCount, "window_state", state, "window_size", windowSize) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusWarning) + return nil + } + state := hm.peerCountWindow.observe(true, windowSize) + hm.recordRollingWindow(metrics.HealthCheckPeerCount, hm.peerCountWindow, state) + hm.recordCheckStatus(metrics.HealthCheckPeerCount, metrics.HealthCheckStatusHealthy) return nil } diff --git a/op-conductor/health/monitor_test.go b/op-conductor/health/monitor_test.go index 7eae904e350..875bf2c870c 100644 --- a/op-conductor/health/monitor_test.go +++ b/op-conductor/health/monitor_test.go @@ -7,7 +7,9 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/ethereum-optimism/optimism/op-conductor/client" @@ -96,6 +98,22 @@ func (s *HealthMonitorTestSuite) SetupMonitor( type monitorOpts func(*SequencerHealthMonitor) +func withInteropReorgLeniency(monitor *SequencerHealthMonitor) { + monitor.interopReorgLeniency = true +} + +func withRecoveringWindowSize(windowSize uint64) monitorOpts { + return func(monitor *SequencerHealthMonitor) { + monitor.recoveringWindowSize = windowSize + } +} + +func withMetrics(metricer metrics.Metricer) monitorOpts { + return func(monitor *SequencerHealthMonitor) { + monitor.metrics = metricer + } +} + // SetupMonitorWithRollupBoost creates a HealthMonitor that includes a RollupBoostHealthChecker func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost( now, unsafeInterval, safeInterval uint64, @@ -145,28 +163,620 @@ func (s *HealthMonitorTestSuite) SetupMonitorWithRollupBoost( return monitor } -func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() { - s.T().Parallel() +func newSyncStatusMonitor(t *testing.T, now, unsafeInterval, safeInterval uint64, mockRollupClient *testutils.MockRollupClient, opts ...monitorOpts) (*SequencerHealthMonitor, *timeProvider) { + t.Helper() + tp := &timeProvider{now: now} + monitor := &SequencerHealthMonitor{ + log: testlog.Logger(t, log.LevelDebug), + metrics: &metrics.NoopMetricsImpl{}, + rollupCfg: &rollup.Config{BlockTime: blockTime}, + unsafeInterval: unsafeInterval, + safeInterval: safeInterval, + safeEnabled: true, + timeProviderFn: tp.Now, + node: mockRollupClient, + } + for _, opt := range opts { + opt(monitor) + } + return monitor, tp +} + +func newPeerCountMonitor(t *testing.T, minPeerCount uint64, mockP2P *p2pMocks.API, opts ...monitorOpts) *SequencerHealthMonitor { + t.Helper() + monitor := &SequencerHealthMonitor{ + log: testlog.Logger(t, log.LevelDebug), + metrics: &metrics.NoopMetricsImpl{}, + minPeerCount: minPeerCount, + p2p: mockP2P, + } + for _, opt := range opts { + opt(monitor) + } + return monitor +} + +func conductorMetricValue(t *testing.T, metricer *metrics.Metrics, name string, labels map[string]string) float64 { + t.Helper() + metricFamilies, err := metricer.Registry().Gather() + require.NoError(t, err) + for _, family := range metricFamilies { + if family.GetName() != name { + continue + } + for _, metric := range family.GetMetric() { + if metricLabelsMatch(metric, labels) { + return metricSampleValue(t, metric) + } + } + } + t.Fatalf("metric %s with labels %v not found", name, labels) + return 0 +} + +func metricLabelsMatch(metric *dto.Metric, labels map[string]string) bool { + actual := make(map[string]string, len(metric.GetLabel())) + for _, label := range metric.GetLabel() { + actual[label.GetName()] = label.GetValue() + } + for key, want := range labels { + if actual[key] != want { + return false + } + } + return len(actual) == len(labels) +} + +func metricSampleValue(t *testing.T, metric *dto.Metric) float64 { + t.Helper() + if metric.GetGauge() != nil { + return metric.GetGauge().GetValue() + } + if metric.GetCounter() != nil { + return metric.GetCounter().GetValue() + } + t.Fatalf("metric has no gauge or counter sample") + return 0 +} + +func TestUnsafeHeadCatchingUpStaysHealthy(t *testing.T) { now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + unsafeTime uint64 + unsafeNum uint64 + }{ + {now: now, unsafeTime: now - 2, unsafeNum: 5}, + {now: now + 2, unsafeTime: now, unsafeNum: 6}, + {now: now + 22, unsafeTime: now + 2, unsafeNum: 7}, + {now: now + 24, unsafeTime: now + 4, unsafeNum: 8}, + {now: now + 26, unsafeTime: now + 8, unsafeNum: 9}, + {now: now + 28, unsafeTime: now + 14, unsafeNum: 10}, + {now: now + 30, unsafeTime: now + 20, unsafeNum: 11}, + {now: now + 32, unsafeTime: now + 28, unsafeNum: 12}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.unsafeTime, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } +} +func TestDeepUnsafeHeadRecoveryOutpacesWallClockStaysHealthy(t *testing.T) { + now := uint64(time.Now().Unix()) rc := &testutils.MockRollupClient{} - ss1 := mockSyncStatus(now-1, 1, now-3, 0) - rc.ExpectSyncStatus(ss1, nil) - rc.ExpectSyncStatus(ss1, nil) + polls := []struct { + now uint64 + unsafeTime uint64 + unsafeNum uint64 + }{ + {now: now, unsafeTime: now - 2, unsafeNum: 100}, + {now: now + 2, unsafeTime: now - 600, unsafeNum: 1}, + {now: now + 4, unsafeTime: now - 590, unsafeNum: 2}, + {now: now + 6, unsafeTime: now - 580, unsafeNum: 3}, + {now: now + 8, unsafeTime: now - 570, unsafeNum: 4}, + {now: now + 10, unsafeTime: now - 560, unsafeNum: 5}, + {now: now + 12, unsafeTime: now - 550, unsafeNum: 6}, + {now: now + 14, unsafeTime: now - 540, unsafeNum: 7}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.unsafeTime, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 2, 3600, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } +} + +func TestStoppedSequencerStillMarkedUnhealthy(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + err error + }{ + {now: now}, + {now: now + 2}, + {now: now + 22}, + {now: now + 24}, + {now: now + 26}, + {now: now + 28}, + {now: now + 30, err: ErrSequencerNotHealthy}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(now, 6, poll.now, 6), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.Equal(t, poll.err, monitor.checkNodeSyncStatus(context.Background())) + } +} + +func TestUnsafeHeadRecoveryAtNormalCadenceMarkedUnhealthy(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + unsafeTime uint64 + unsafeNum uint64 + err error + }{ + {now: now, unsafeTime: now - 2, unsafeNum: 5}, + {now: now + 2, unsafeTime: now - 100, unsafeNum: 6}, + {now: now + 4, unsafeTime: now - 98, unsafeNum: 7}, + {now: now + 6, unsafeTime: now - 96, unsafeNum: 8}, + {now: now + 8, unsafeTime: now - 94, unsafeNum: 9}, + {now: now + 10, unsafeTime: now - 92, unsafeNum: 10, err: ErrSequencerNotHealthy}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.unsafeTime, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 2, 3600, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.Equal(t, poll.err, monitor.checkNodeSyncStatus(context.Background())) + } +} + +func TestTransientSyncStatusFailureAfterHealthyPollStaysHealthy(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + syncErr := errors.New("optimism_syncStatus unavailable") + + rc.ExpectSyncStatus(mockSyncStatus(now, 5, now, 5), nil) + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(nil, syncErr) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodeSyncStatus(context.Background())) +} + +func TestInitialSyncStatusFailureToleratedUntilWindowFails(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(nil, errors.New("optimism_syncStatus unavailable")) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodeSyncStatus(context.Background())) +} + +func TestLenientSyncStatusFailureUsesConfiguredWindowSize(t *testing.T) { + now := uint64(time.Now().Unix()) + metricer := metrics.NewMetrics() + rc := &testutils.MockRollupClient{} + windowSize := uint64(3) + for i := uint64(0); i < windowSize; i++ { + rc.ExpectSyncStatus(nil, errors.New("optimism_syncStatus unavailable")) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency, withRecoveringWindowSize(windowSize), withMetrics(metricer)) + for i := uint64(1); i < windowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodeSyncStatus(context.Background())) + require.Equal(t, float64(windowSize), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_size", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + })) +} + +func TestMixedSyncStatusFailureWindowStaysHealthy(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + syncErr := errors.New("optimism_syncStatus unavailable") + polls := []struct { + status *eth.SyncStatus + err error + }{ + {err: syncErr}, + {status: mockSyncStatus(now, 5, now, 5)}, + {err: syncErr}, + {status: mockSyncStatus(now, 6, now, 6)}, + {err: syncErr}, + {err: syncErr}, + {status: mockSyncStatus(now, 7, now, 7)}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(poll.status, poll.err) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for range polls { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } +} + +func TestSingleSyncStatusSuccessDoesNotResetWindow(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + syncErr := errors.New("optimism_syncStatus unavailable") + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(nil, syncErr) + } + rc.ExpectSyncStatus(mockSyncStatus(now, 5, now, 5), nil) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(nil, syncErr) + } + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(mockSyncStatus(now, 6+i, now, 6+i), nil) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + require.NoError(t, monitor.checkNodeSyncStatus(context.Background()), "single success after failures should still be inconclusive") + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background()), "recovering successes should not immediately restore Success state") + } + require.NoError(t, monitor.checkNodeSyncStatus(context.Background()), "full window of successes should restore Success state") +} + +func TestPollsInRecoveryNotResetOnSecondRegression(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + unsafeTime uint64 + unsafeNum uint64 + err error + }{ + {now: now, unsafeTime: now, unsafeNum: 5}, + {now: now + 2, unsafeTime: now + 2, unsafeNum: 6}, + {now: now + 17, unsafeTime: now + 2, unsafeNum: 7}, + {now: now + 19, unsafeTime: now + 7, unsafeNum: 8}, + {now: now + 21, unsafeTime: now + 3, unsafeNum: 9}, + {now: now + 23, unsafeTime: now + 3, unsafeNum: 9}, + {now: now + 25, unsafeTime: now + 3, unsafeNum: 9}, + {now: now + 27, unsafeTime: now + 3, unsafeNum: 9, err: ErrSequencerNotHealthy}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.unsafeTime, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.Equal(t, poll.err, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, uint64(15), monitor.initialLagInRecovery) + require.Equal(t, uint64(12), monitor.recoveryWindowStartLag) + require.Equal(t, uint64(6), monitor.pollsInRecovery) + require.Equal(t, uint64(defaultRecoveringWindowSize), monitor.pollsInRecoveryWindow) +} + +func TestUnsafeHeadRecoveryPlateauAboveUnsafeIntervalMarkedUnhealthy(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + unsafeLag uint64 + unsafeNum uint64 + err error + }{ + {now: now, unsafeLag: 25, unsafeNum: 5}, + {now: now + 2, unsafeLag: 24, unsafeNum: 6}, + {now: now + 4, unsafeLag: 24, unsafeNum: 7}, + {now: now + 6, unsafeLag: 24, unsafeNum: 8}, + {now: now + 8, unsafeLag: 24, unsafeNum: 9}, + {now: now + 10, unsafeLag: 24, unsafeNum: 10, err: ErrSequencerNotHealthy}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.now-poll.unsafeLag, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency) + for _, poll := range polls { + tp.now = poll.now + require.Equal(t, poll.err, monitor.checkNodeSyncStatus(context.Background())) + } +} + +func TestUnsafeHeadRecoveryUsesConfiguredWindowSize(t *testing.T) { + now := uint64(time.Now().Unix()) + windowSize := uint64(3) + rc := &testutils.MockRollupClient{} + polls := []struct { + now uint64 + unsafeLag uint64 + unsafeNum uint64 + err error + }{ + {now: now, unsafeLag: 25, unsafeNum: 5}, + {now: now + 2, unsafeLag: 24, unsafeNum: 6}, + {now: now + 4, unsafeLag: 24, unsafeNum: 7}, + {now: now + 6, unsafeLag: 24, unsafeNum: 8, err: ErrSequencerNotHealthy}, + } + for _, poll := range polls { + rc.ExpectSyncStatus(mockSyncStatus(poll.now-poll.unsafeLag, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency, withRecoveringWindowSize(windowSize)) + for _, poll := range polls { + tp.now = poll.now + require.Equal(t, poll.err, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, windowSize, monitor.pollsInRecoveryWindow) +} + +func TestUnsafeHeadRecoveryRecordsTransitionMetrics(t *testing.T) { + now := uint64(time.Now().Unix()) + metricer := metrics.NewMetrics() + rc := &testutils.MockRollupClient{} + recoveryPolls := []struct { + now uint64 + unsafeTime uint64 + unsafeNum uint64 + }{ + {now: now, unsafeTime: now - 20, unsafeNum: 1}, + {now: now + 1, unsafeTime: now - 10, unsafeNum: 2}, + {now: now + 2, unsafeTime: now - 7, unsafeNum: 3}, + } + for _, poll := range recoveryPolls { + rc.ExpectSyncStatus(mockSyncStatus(poll.unsafeTime, poll.unsafeNum, poll.now, poll.unsafeNum), nil) + } + + monitor, tp := newSyncStatusMonitor(t, now, 10, 100, rc, withInteropReorgLeniency, withMetrics(metricer)) + for _, poll := range recoveryPolls { + tp.now = poll.now + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_recovery_events_count", map[string]string{ + "event": string(metrics.HealthCheckRecoveryEntered), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_recovery_events_count", map[string]string{ + "event": string(metrics.HealthCheckRecoveryWindowReset), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_recovery_events_count", map[string]string{ + "event": string(metrics.HealthCheckRecoveryRecovered), + })) + require.Equal(t, float64(0), conductorMetricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_active", nil)) + require.Equal(t, float64(9), conductorMetricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_current_lag_seconds", nil)) + + failureMetrics := metrics.NewMetrics() + failingRC := &testutils.MockRollupClient{} + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + failingRC.ExpectSyncStatus(mockSyncStatus(now-20, i+1, now+i, i+1), nil) + } + + failingMonitor, failingTP := newSyncStatusMonitor(t, now, 10, 100, failingRC, withInteropReorgLeniency, withMetrics(failureMetrics)) + for i := uint64(0); i < defaultRecoveringWindowSize-1; i++ { + failingTP.now = now + i + require.NoError(t, failingMonitor.checkNodeSyncStatus(context.Background())) + } + failingTP.now = now + defaultRecoveringWindowSize - 1 + require.Equal(t, ErrSequencerNotHealthy, failingMonitor.checkNodeSyncStatus(context.Background())) + require.Equal(t, float64(1), conductorMetricValue(t, failureMetrics, "op_conductor_healthcheck_interop_reorg_recovery_events_count", map[string]string{ + "event": string(metrics.HealthCheckRecoveryFailed), + })) + require.Equal(t, float64(1), conductorMetricValue(t, failureMetrics, "op_conductor_healthcheck_unsafe_head_recovery_active", nil)) + require.Equal(t, float64(defaultRecoveringWindowSize), conductorMetricValue(t, failureMetrics, "op_conductor_healthcheck_unsafe_head_recovery_polls", nil)) + require.Equal(t, float64(1), conductorMetricValue(t, failureMetrics, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckUnsafeLag), + "status": string(metrics.HealthCheckStatusUnhealthy), + })) +} + +func TestDefaultStrictUnsafeLagFailsImmediately(t *testing.T) { + now := uint64(time.Now().Unix()) + rc := &testutils.MockRollupClient{} + rc.ExpectSyncStatus(mockSyncStatus(now-11, 7, now, 7), nil) + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc) + tp.now = now + require.Equal(t, ErrSequencerNotHealthy, monitor.checkNodeSyncStatus(context.Background())) +} + +func TestDefaultStrictUnsafeLagRecordsMetrics(t *testing.T) { + now := uint64(time.Now().Unix()) + metricer := metrics.NewMetrics() + rc := &testutils.MockRollupClient{} + rc.ExpectSyncStatus(mockSyncStatus(now-11, 7, now, 7), nil) + + monitor, tp := newSyncStatusMonitor(t, now, 10, 60, rc, withMetrics(metricer)) + tp.now = now + require.Equal(t, ErrSequencerNotHealthy, monitor.checkNodeSyncStatus(context.Background())) + + require.Equal(t, float64(11), conductorMetricValue(t, metricer, "op_conductor_healthcheck_unsafe_lag_seconds", nil)) + require.Equal(t, float64(7), conductorMetricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_number", nil)) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckUnsafeLag), + "status": string(metrics.HealthCheckStatusUnhealthy), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_failures_count", map[string]string{ + "check": string(metrics.HealthCheckUnsafeLag), + "reason": string(metrics.HealthCheckFailureReasonLagExceeded), + })) +} + +func TestDefaultStrictSyncStatusRPCErrorFailsImmediately(t *testing.T) { + rc := &testutils.MockRollupClient{} + rc.ExpectSyncStatus(nil, errors.New("optimism_syncStatus unavailable")) + + monitor, _ := newSyncStatusMonitor(t, uint64(time.Now().Unix()), 10, 60, rc) + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodeSyncStatus(context.Background())) +} + +func TestLenientSyncStatusRPCFailureRecordsMetrics(t *testing.T) { + now := uint64(time.Now().Unix()) + metricer := metrics.NewMetrics() + rc := &testutils.MockRollupClient{} + syncErr := errors.New("optimism_syncStatus unavailable") + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + rc.ExpectSyncStatus(nil, syncErr) + } + + monitor, _ := newSyncStatusMonitor(t, now, 10, 60, rc, withInteropReorgLeniency, withMetrics(metricer)) + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "status": string(metrics.HealthCheckStatusWarning), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_observations", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "result": "failure", + })) + + for i := uint64(1); i < defaultRecoveringWindowSize-1; i++ { + require.NoError(t, monitor.checkNodeSyncStatus(context.Background())) + } + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodeSyncStatus(context.Background())) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_state", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "state": string(metrics.HealthCheckWindowStateFailed), + })) + require.Equal(t, float64(defaultRecoveringWindowSize), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_observations", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "result": "failure", + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_filled", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "status": string(metrics.HealthCheckStatusUnhealthy), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_failures_count", map[string]string{ + "check": string(metrics.HealthCheckSyncStatusRPC), + "reason": string(metrics.HealthCheckFailureReasonRPCError), + })) +} + +func TestDefaultStrictPeerStatsRPCErrorFailsImmediately(t *testing.T) { + pc := &p2pMocks.API{} + pc.EXPECT().PeerStats(mock.Anything).Return(nil, errors.New("p2p unavailable")).Once() + + monitor := newPeerCountMonitor(t, minPeerCount, pc) + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodePeerCount(context.Background())) + pc.AssertExpectations(t) +} + +func TestDefaultStrictLowPeerCountFailsImmediately(t *testing.T) { + pc := &p2pMocks.API{} + pc.EXPECT().PeerStats(mock.Anything).Return(&apis.PeerStats{Connected: unhealthyPeerCount}, nil).Once() + + monitor := newPeerCountMonitor(t, minPeerCount, pc) + require.Equal(t, ErrSequencerNotHealthy, monitor.checkNodePeerCount(context.Background())) + pc.AssertExpectations(t) +} + +func TestLenientLowPeerCountRecordsMetrics(t *testing.T) { + metricer := metrics.NewMetrics() + pc := &p2pMocks.API{} + pc.EXPECT().PeerStats(mock.Anything).Return(&apis.PeerStats{Connected: unhealthyPeerCount}, nil).Times(defaultRecoveringWindowSize) + + monitor := newPeerCountMonitor(t, minPeerCount, pc, withInteropReorgLeniency, withMetrics(metricer)) + require.NoError(t, monitor.checkNodePeerCount(context.Background())) + require.Equal(t, float64(unhealthyPeerCount), conductorMetricValue(t, metricer, "op_conductor_healthcheck_cl_peer_count", nil)) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckPeerCount), + "status": string(metrics.HealthCheckStatusWarning), + })) + + for i := uint64(1); i < defaultRecoveringWindowSize-1; i++ { + require.NoError(t, monitor.checkNodePeerCount(context.Background())) + } + require.Equal(t, ErrSequencerNotHealthy, monitor.checkNodePeerCount(context.Background())) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_window_state", map[string]string{ + "check": string(metrics.HealthCheckPeerCount), + "state": string(metrics.HealthCheckWindowStateFailed), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(metrics.HealthCheckPeerCount), + "status": string(metrics.HealthCheckStatusUnhealthy), + })) + require.Equal(t, float64(1), conductorMetricValue(t, metricer, "op_conductor_healthcheck_check_failures_count", map[string]string{ + "check": string(metrics.HealthCheckPeerCount), + "reason": string(metrics.HealthCheckFailureReasonPeerCountBelowMin), + })) + pc.AssertExpectations(t) +} + +func (s *HealthMonitorTestSuite) TestUnhealthyLowPeerCount() { + s.T().Parallel() pc := &p2pMocks.API{} ps1 := &apis.PeerStats{ Connected: unhealthyPeerCount, } - pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil).Times(1) + pc.EXPECT().PeerStats(mock.Anything).Return(ps1, nil).Times(defaultRecoveringWindowSize) - monitor := s.SetupMonitor(now, 60, 60, rc, pc, nil) + monitor := newPeerCountMonitor(s.T(), s.minPeerCount, pc, withInteropReorgLeniency) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + s.NoError(monitor.checkNodePeerCount(context.Background())) + } + s.Equal(ErrSequencerNotHealthy, monitor.checkNodePeerCount(context.Background())) + pc.AssertExpectations(s.T()) +} - healthUpdateCh := monitor.Subscribe() - healthFailure := <-healthUpdateCh - s.NotNil(healthFailure) +func TestPeerStatsRPCErrorWindow(t *testing.T) { + pc := &p2pMocks.API{} + pc.EXPECT().PeerStats(mock.Anything).Return(nil, errors.New("p2p unavailable")).Times(defaultRecoveringWindowSize) - s.NoError(monitor.Stop()) + monitor := newPeerCountMonitor(t, minPeerCount, pc, withInteropReorgLeniency) + for i := uint64(1); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodePeerCount(context.Background())) + } + require.Equal(t, ErrSequencerConnectionDown, monitor.checkNodePeerCount(context.Background())) + pc.AssertExpectations(t) +} + +func TestPeerCountMixedSuccessFailureWindowStaysHealthy(t *testing.T) { + pc := &p2pMocks.API{} + pc.EXPECT().PeerStats(mock.Anything).Return(nil, errors.New("p2p unavailable")).Once() + pc.EXPECT().PeerStats(mock.Anything).Return(&apis.PeerStats{Connected: healthyPeerCount}, nil).Once() + pc.EXPECT().PeerStats(mock.Anything).Return(&apis.PeerStats{Connected: unhealthyPeerCount}, nil).Once() + pc.EXPECT().PeerStats(mock.Anything).Return(&apis.PeerStats{Connected: healthyPeerCount}, nil).Once() + pc.EXPECT().PeerStats(mock.Anything).Return(nil, errors.New("p2p unavailable")).Once() + + monitor := newPeerCountMonitor(t, minPeerCount, pc, withInteropReorgLeniency) + for i := uint64(0); i < defaultRecoveringWindowSize; i++ { + require.NoError(t, monitor.checkNodePeerCount(context.Background())) + } + pc.AssertExpectations(t) } func (s *HealthMonitorTestSuite) TestUnhealthyLowElP2pPeerCount() { @@ -202,30 +812,23 @@ func (s *HealthMonitorTestSuite) TestUnhealthyUnsafeHeadNotProgressing() { rc := &testutils.MockRollupClient{} ss1 := mockSyncStatus(now, 5, now-8, 1) - unsafeBlocksInterval := 10 - for i := 0; i < unsafeBlocksInterval+2; i++ { + unsafeBlocksInterval := uint64(10) + for i := uint64(0); i < unsafeBlocksInterval+defaultRecoveringWindowSize+1; i++ { rc.ExpectSyncStatus(ss1, nil) } - elP2pClient := &clientmocks.ElP2PClient{} - elP2pClient.EXPECT().PeerCount(mock.Anything).Return(healthyElP2pPeerCount, nil) - - monitor := s.SetupMonitor(now, uint64(unsafeBlocksInterval), 60, rc, nil, elP2pClient) - healthUpdateCh := monitor.Subscribe() - - // once the unsafe interval is surpassed, we should expect "unsafe head is falling behind the unsafe interval" - for i := 0; i < unsafeBlocksInterval+2; i++ { - healthFailure := <-healthUpdateCh - if i <= unsafeBlocksInterval { + monitor, tp := newSyncStatusMonitor(s.T(), now, unsafeBlocksInterval, 60, rc, withInteropReorgLeniency) + for i := uint64(0); i < unsafeBlocksInterval+defaultRecoveringWindowSize+1; i++ { + tp.now = now + i + healthFailure := monitor.checkNodeSyncStatus(context.Background()) + if i < unsafeBlocksInterval+defaultRecoveringWindowSize { s.Nil(healthFailure) s.Equal(now, monitor.lastSeenUnsafeTime) s.Equal(uint64(5), monitor.lastSeenUnsafeNum) } else { - s.NotNil(healthFailure) + s.Equal(ErrSequencerNotHealthy, healthFailure) } } - - s.NoError(monitor.Stop()) } func (s *HealthMonitorTestSuite) TestUnhealthySafeHeadNotProgressing() { diff --git a/op-conductor/metrics/metrics.go b/op-conductor/metrics/metrics.go index 05e95e54f3c..752a3cb29ee 100644 --- a/op-conductor/metrics/metrics.go +++ b/op-conductor/metrics/metrics.go @@ -10,6 +10,51 @@ import ( const Namespace = "op_conductor" +type HealthCheck string + +const ( + HealthCheckSyncStatusRPC HealthCheck = "sync_status_rpc" + HealthCheckUnsafeLag HealthCheck = "unsafe_lag" + HealthCheckSafeLag HealthCheck = "safe_lag" + HealthCheckPeerCount HealthCheck = "peer_count" +) + +type HealthCheckStatus string + +const ( + HealthCheckStatusHealthy HealthCheckStatus = "healthy" + HealthCheckStatusWarning HealthCheckStatus = "warning" + HealthCheckStatusRecovering HealthCheckStatus = "recovering" + HealthCheckStatusUnhealthy HealthCheckStatus = "unhealthy" + HealthCheckStatusDisabled HealthCheckStatus = "disabled" +) + +type HealthCheckWindowState string + +const ( + HealthCheckWindowStateSuccess HealthCheckWindowState = "success" + HealthCheckWindowStateFailed HealthCheckWindowState = "failed" + HealthCheckWindowStateInconclusive HealthCheckWindowState = "inconclusive" +) + +type HealthCheckFailureReason string + +const ( + HealthCheckFailureReasonRPCError HealthCheckFailureReason = "rpc_error" + HealthCheckFailureReasonLagExceeded HealthCheckFailureReason = "lag_exceeded" + HealthCheckFailureReasonPeerCountBelowMin HealthCheckFailureReason = "peer_count_below_min" + HealthCheckFailureReasonRecoveryWindowStalled HealthCheckFailureReason = "recovery_window_stalled" +) + +type HealthCheckRecoveryEvent string + +const ( + HealthCheckRecoveryEntered HealthCheckRecoveryEvent = "entered" + HealthCheckRecoveryWindowReset HealthCheckRecoveryEvent = "window_reset" + HealthCheckRecoveryRecovered HealthCheckRecoveryEvent = "recovered" + HealthCheckRecoveryFailed HealthCheckRecoveryEvent = "failed" +) + type Metricer interface { RecordInfo(version string) RecordUp() @@ -21,6 +66,14 @@ type Metricer interface { RecordLoopExecutionTime(duration float64) RecordRollupBoostConnectionAttempts(success bool, source string) RecordWebSocketClientCount(count int) + RecordHealthCheckConfig(interval, unsafeInterval, safeInterval, minPeerCount, interopReorgLeniencyWindowSize uint64, safeEnabled, interopReorgLeniency bool) + RecordHealthCheckHeads(unsafeNumber, unsafeTimestamp, safeNumber, safeTimestamp, unsafeLag, safeLag uint64) + RecordHealthCheckPeerCount(peerCount, minPeerCount uint64) + RecordHealthCheckWindow(check HealthCheck, state HealthCheckWindowState, successes, failures, windowSize uint64) + RecordHealthCheckStatus(check HealthCheck, status HealthCheckStatus) + RecordHealthCheckFailure(check HealthCheck, reason HealthCheckFailureReason) + RecordUnsafeHeadRecovery(active bool, currentLag, initialLag, windowStartLag, wallElapsed, unsafeElapsed, polls, pollsInWindow uint64) + RecordUnsafeHeadRecoveryEvent(event HealthCheckRecoveryEvent) opmetrics.RPCMetricer } @@ -46,6 +99,36 @@ type Metrics struct { loopExecutionTime prometheus.Histogram webSocketClients prometheus.Gauge + + healthCheckIntervalSeconds prometheus.Gauge + healthCheckUnsafeIntervalSeconds prometheus.Gauge + healthCheckSafeIntervalSeconds prometheus.Gauge + healthCheckSafeEnabled prometheus.Gauge + healthCheckInteropReorgLeniencyEnabled prometheus.Gauge + healthCheckInteropReorgLeniencyWindowSize prometheus.Gauge + healthCheckUnsafeLagSeconds prometheus.Gauge + healthCheckSafeLagSeconds prometheus.Gauge + healthCheckUnsafeHeadNumber prometheus.Gauge + healthCheckUnsafeHeadTimestamp prometheus.Gauge + healthCheckSafeHeadNumber prometheus.Gauge + healthCheckSafeHeadTimestamp prometheus.Gauge + healthCheckCLPeerCount prometheus.Gauge + healthCheckMinPeerCount prometheus.Gauge + healthCheckUnsafeHeadRecoveryActive prometheus.Gauge + healthCheckUnsafeHeadRecoveryCurrentLag prometheus.Gauge + healthCheckUnsafeHeadRecoveryInitialLag prometheus.Gauge + healthCheckUnsafeHeadRecoveryWindowStartLag prometheus.Gauge + healthCheckUnsafeHeadRecoveryWallElapsed prometheus.Gauge + healthCheckUnsafeHeadRecoveryUnsafeElapsed prometheus.Gauge + healthCheckUnsafeHeadRecoveryPolls prometheus.Gauge + healthCheckUnsafeHeadRecoveryPollsInWindow prometheus.Gauge + healthCheckWindowState *prometheus.GaugeVec + healthCheckWindowObservations *prometheus.GaugeVec + healthCheckWindowFilled *prometheus.GaugeVec + healthCheckWindowSize *prometheus.GaugeVec + healthCheckStatus *prometheus.GaugeVec + healthCheckFailures *prometheus.CounterVec + healthCheckRecoveryEvents *prometheus.CounterVec } func (m *Metrics) Registry() *prometheus.Registry { @@ -122,6 +205,151 @@ func NewMetrics() *Metrics { Name: "websocket_clients_connected", Help: "Number of WebSocket clients currently connected to the hub", }), + healthCheckIntervalSeconds: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_interval_seconds", + Help: "Configured interval between conductor health checks, in seconds", + }), + healthCheckUnsafeIntervalSeconds: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_interval_seconds", + Help: "Configured maximum unsafe-head lag, in seconds", + }), + healthCheckSafeIntervalSeconds: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_safe_interval_seconds", + Help: "Configured maximum safe-head lag, in seconds", + }), + healthCheckSafeEnabled: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_safe_enabled", + Help: "1 if conductor safe-head health checks are enabled", + }), + healthCheckInteropReorgLeniencyEnabled: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_interop_reorg_leniency_enabled", + Help: "1 if conductor interop reorg health-check leniency is enabled", + }), + healthCheckInteropReorgLeniencyWindowSize: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_interop_reorg_leniency_window_size", + Help: "Configured number of observations in the conductor interop reorg health-check leniency window", + }), + healthCheckUnsafeLagSeconds: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_lag_seconds", + Help: "Current unsafe-head lag, in seconds", + }), + healthCheckSafeLagSeconds: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_safe_lag_seconds", + Help: "Current safe-head lag, in seconds", + }), + healthCheckUnsafeHeadNumber: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_number", + Help: "Current unsafe L2 head block number observed by the conductor health monitor", + }), + healthCheckUnsafeHeadTimestamp: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_timestamp", + Help: "Current unsafe L2 head timestamp observed by the conductor health monitor", + }), + healthCheckSafeHeadNumber: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_safe_head_number", + Help: "Current safe L2 head block number observed by the conductor health monitor", + }), + healthCheckSafeHeadTimestamp: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_safe_head_timestamp", + Help: "Current safe L2 head timestamp observed by the conductor health monitor", + }), + healthCheckCLPeerCount: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_cl_peer_count", + Help: "Current consensus-layer peer count observed by the conductor health monitor", + }), + healthCheckMinPeerCount: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_min_peer_count", + Help: "Configured minimum consensus-layer peer count for the conductor health monitor", + }), + healthCheckUnsafeHeadRecoveryActive: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_active", + Help: "1 if unsafe-head recovery leniency is currently active", + }), + healthCheckUnsafeHeadRecoveryCurrentLag: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_current_lag_seconds", + Help: "Current unsafe-head lag while recovery leniency is active, in seconds", + }), + healthCheckUnsafeHeadRecoveryInitialLag: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_initial_lag_seconds", + Help: "Initial unsafe-head lag when the current recovery episode began, in seconds", + }), + healthCheckUnsafeHeadRecoveryWindowStartLag: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_window_start_lag_seconds", + Help: "Unsafe-head lag at the start of the current recovery window, in seconds", + }), + healthCheckUnsafeHeadRecoveryWallElapsed: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_wall_elapsed_seconds", + Help: "Wall-clock elapsed time in the current unsafe-head recovery window, in seconds", + }), + healthCheckUnsafeHeadRecoveryUnsafeElapsed: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_unsafe_elapsed_seconds", + Help: "Unsafe-head timestamp elapsed time in the current recovery window, in seconds", + }), + healthCheckUnsafeHeadRecoveryPolls: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_polls", + Help: "Number of health-check polls in the current unsafe-head recovery episode", + }), + healthCheckUnsafeHeadRecoveryPollsInWindow: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_unsafe_head_recovery_polls_in_window", + Help: "Number of health-check polls in the current unsafe-head recovery window", + }), + healthCheckWindowState: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_window_state", + Help: "One-hot state of a conductor health-check rolling window", + }, []string{"check", "state"}), + healthCheckWindowObservations: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_window_observations", + Help: "Current conductor health-check rolling-window observations by result", + }, []string{"check", "result"}), + healthCheckWindowFilled: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_window_filled", + Help: "1 if a conductor health-check rolling window has reached its configured size", + }, []string{"check"}), + healthCheckWindowSize: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_window_size", + Help: "Configured size of a conductor health-check rolling window", + }, []string{"check"}), + healthCheckStatus: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "healthcheck_check_status", + Help: "One-hot conductor health-check status by bounded check and status labels", + }, []string{"check", "status"}), + healthCheckFailures: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "healthcheck_check_failures_count", + Help: "Number of conductor health-check failures by bounded check and reason labels", + }, []string{"check", "reason"}), + healthCheckRecoveryEvents: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "healthcheck_interop_reorg_recovery_events_count", + Help: "Number of unsafe-head recovery events from conductor interop reorg health-check leniency", + }, []string{"event"}), } } @@ -183,3 +411,80 @@ func (m *Metrics) RecordRollupBoostConnectionAttempts(success bool, source strin func (m *Metrics) RecordWebSocketClientCount(count int) { m.webSocketClients.Set(float64(count)) } + +func (m *Metrics) RecordHealthCheckConfig(interval, unsafeInterval, safeInterval, minPeerCount, interopReorgLeniencyWindowSize uint64, safeEnabled, interopReorgLeniency bool) { + m.healthCheckIntervalSeconds.Set(float64(interval)) + m.healthCheckUnsafeIntervalSeconds.Set(float64(unsafeInterval)) + m.healthCheckSafeIntervalSeconds.Set(float64(safeInterval)) + m.healthCheckSafeEnabled.Set(boolToFloat(safeEnabled)) + m.healthCheckInteropReorgLeniencyEnabled.Set(boolToFloat(interopReorgLeniency)) + m.healthCheckInteropReorgLeniencyWindowSize.Set(float64(interopReorgLeniencyWindowSize)) + m.healthCheckMinPeerCount.Set(float64(minPeerCount)) +} + +func (m *Metrics) RecordHealthCheckHeads(unsafeNumber, unsafeTimestamp, safeNumber, safeTimestamp, unsafeLag, safeLag uint64) { + m.healthCheckUnsafeHeadNumber.Set(float64(unsafeNumber)) + m.healthCheckUnsafeHeadTimestamp.Set(float64(unsafeTimestamp)) + m.healthCheckUnsafeLagSeconds.Set(float64(unsafeLag)) + m.healthCheckSafeHeadNumber.Set(float64(safeNumber)) + m.healthCheckSafeHeadTimestamp.Set(float64(safeTimestamp)) + m.healthCheckSafeLagSeconds.Set(float64(safeLag)) +} + +func (m *Metrics) RecordHealthCheckPeerCount(peerCount, minPeerCount uint64) { + m.healthCheckCLPeerCount.Set(float64(peerCount)) + m.healthCheckMinPeerCount.Set(float64(minPeerCount)) +} + +func (m *Metrics) RecordHealthCheckWindow(check HealthCheck, state HealthCheckWindowState, successes, failures, windowSize uint64) { + for _, windowState := range []HealthCheckWindowState{ + HealthCheckWindowStateSuccess, + HealthCheckWindowStateFailed, + HealthCheckWindowStateInconclusive, + } { + m.healthCheckWindowState.WithLabelValues(string(check), string(windowState)).Set(boolToFloat(windowState == state)) + } + + m.healthCheckWindowObservations.WithLabelValues(string(check), "success").Set(float64(successes)) + m.healthCheckWindowObservations.WithLabelValues(string(check), "failure").Set(float64(failures)) + m.healthCheckWindowSize.WithLabelValues(string(check)).Set(float64(windowSize)) + m.healthCheckWindowFilled.WithLabelValues(string(check)).Set(boolToFloat(successes+failures >= windowSize)) +} + +func (m *Metrics) RecordHealthCheckStatus(check HealthCheck, status HealthCheckStatus) { + for _, checkStatus := range []HealthCheckStatus{ + HealthCheckStatusHealthy, + HealthCheckStatusWarning, + HealthCheckStatusRecovering, + HealthCheckStatusUnhealthy, + HealthCheckStatusDisabled, + } { + m.healthCheckStatus.WithLabelValues(string(check), string(checkStatus)).Set(boolToFloat(checkStatus == status)) + } +} + +func (m *Metrics) RecordHealthCheckFailure(check HealthCheck, reason HealthCheckFailureReason) { + m.healthCheckFailures.WithLabelValues(string(check), string(reason)).Inc() +} + +func (m *Metrics) RecordUnsafeHeadRecovery(active bool, currentLag, initialLag, windowStartLag, wallElapsed, unsafeElapsed, polls, pollsInWindow uint64) { + m.healthCheckUnsafeHeadRecoveryActive.Set(boolToFloat(active)) + m.healthCheckUnsafeHeadRecoveryCurrentLag.Set(float64(currentLag)) + m.healthCheckUnsafeHeadRecoveryInitialLag.Set(float64(initialLag)) + m.healthCheckUnsafeHeadRecoveryWindowStartLag.Set(float64(windowStartLag)) + m.healthCheckUnsafeHeadRecoveryWallElapsed.Set(float64(wallElapsed)) + m.healthCheckUnsafeHeadRecoveryUnsafeElapsed.Set(float64(unsafeElapsed)) + m.healthCheckUnsafeHeadRecoveryPolls.Set(float64(polls)) + m.healthCheckUnsafeHeadRecoveryPollsInWindow.Set(float64(pollsInWindow)) +} + +func (m *Metrics) RecordUnsafeHeadRecoveryEvent(event HealthCheckRecoveryEvent) { + m.healthCheckRecoveryEvents.WithLabelValues(string(event)).Inc() +} + +func boolToFloat(v bool) float64 { + if v { + return 1 + } + return 0 +} diff --git a/op-conductor/metrics/metrics_test.go b/op-conductor/metrics/metrics_test.go new file mode 100644 index 00000000000..88b0345cf6b --- /dev/null +++ b/op-conductor/metrics/metrics_test.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "testing" + + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestHealthCheckDebugGauges(t *testing.T) { + metricer := NewMetrics() + + metricer.RecordHealthCheckConfig(1, 10, 60, 2, 7, true, true) + metricer.RecordHealthCheckHeads(100, 1234, 90, 1200, 5, 39) + metricer.RecordHealthCheckPeerCount(3, 2) + metricer.RecordUnsafeHeadRecovery(true, 12, 20, 15, 4, 8, 3, 2) + + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_interval_seconds", nil)) + require.Equal(t, float64(10), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_interval_seconds", nil)) + require.Equal(t, float64(60), metricValue(t, metricer, "op_conductor_healthcheck_safe_interval_seconds", nil)) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_safe_enabled", nil)) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_leniency_enabled", nil)) + require.Equal(t, float64(7), metricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_leniency_window_size", nil)) + require.Equal(t, float64(100), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_number", nil)) + require.Equal(t, float64(1234), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_timestamp", nil)) + require.Equal(t, float64(5), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_lag_seconds", nil)) + require.Equal(t, float64(90), metricValue(t, metricer, "op_conductor_healthcheck_safe_head_number", nil)) + require.Equal(t, float64(1200), metricValue(t, metricer, "op_conductor_healthcheck_safe_head_timestamp", nil)) + require.Equal(t, float64(39), metricValue(t, metricer, "op_conductor_healthcheck_safe_lag_seconds", nil)) + require.Equal(t, float64(3), metricValue(t, metricer, "op_conductor_healthcheck_cl_peer_count", nil)) + require.Equal(t, float64(2), metricValue(t, metricer, "op_conductor_healthcheck_min_peer_count", nil)) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_active", nil)) + require.Equal(t, float64(12), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_current_lag_seconds", nil)) + require.Equal(t, float64(20), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_initial_lag_seconds", nil)) + require.Equal(t, float64(15), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_window_start_lag_seconds", nil)) + require.Equal(t, float64(4), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_wall_elapsed_seconds", nil)) + require.Equal(t, float64(8), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_unsafe_elapsed_seconds", nil)) + require.Equal(t, float64(3), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_polls", nil)) + require.Equal(t, float64(2), metricValue(t, metricer, "op_conductor_healthcheck_unsafe_head_recovery_polls_in_window", nil)) +} + +func TestHealthCheckBoundedLabelMetrics(t *testing.T) { + metricer := NewMetrics() + + metricer.RecordHealthCheckStatus(HealthCheckUnsafeLag, HealthCheckStatusRecovering) + metricer.RecordHealthCheckWindow(HealthCheckSyncStatusRPC, HealthCheckWindowStateInconclusive, 2, 3, 5) + metricer.RecordHealthCheckFailure(HealthCheckPeerCount, HealthCheckFailureReasonPeerCountBelowMin) + metricer.RecordUnsafeHeadRecoveryEvent(HealthCheckRecoveryEntered) + + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(HealthCheckUnsafeLag), + "status": string(HealthCheckStatusRecovering), + })) + require.Equal(t, float64(0), metricValue(t, metricer, "op_conductor_healthcheck_check_status", map[string]string{ + "check": string(HealthCheckUnsafeLag), + "status": string(HealthCheckStatusUnhealthy), + })) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_window_state", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + "state": string(HealthCheckWindowStateInconclusive), + })) + require.Equal(t, float64(0), metricValue(t, metricer, "op_conductor_healthcheck_window_state", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + "state": string(HealthCheckWindowStateFailed), + })) + require.Equal(t, float64(2), metricValue(t, metricer, "op_conductor_healthcheck_window_observations", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + "result": "success", + })) + require.Equal(t, float64(3), metricValue(t, metricer, "op_conductor_healthcheck_window_observations", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + "result": "failure", + })) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_window_filled", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + })) + require.Equal(t, float64(5), metricValue(t, metricer, "op_conductor_healthcheck_window_size", map[string]string{ + "check": string(HealthCheckSyncStatusRPC), + })) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_check_failures_count", map[string]string{ + "check": string(HealthCheckPeerCount), + "reason": string(HealthCheckFailureReasonPeerCountBelowMin), + })) + require.Equal(t, float64(1), metricValue(t, metricer, "op_conductor_healthcheck_interop_reorg_recovery_events_count", map[string]string{ + "event": string(HealthCheckRecoveryEntered), + })) +} + +func metricValue(t *testing.T, metricer *Metrics, name string, labels map[string]string) float64 { + t.Helper() + metricFamilies, err := metricer.Registry().Gather() + require.NoError(t, err) + for _, family := range metricFamilies { + if family.GetName() != name { + continue + } + for _, metric := range family.GetMetric() { + if labelsMatch(metric, labels) { + return sampleValue(t, metric) + } + } + } + t.Fatalf("metric %s with labels %v not found", name, labels) + return 0 +} + +func labelsMatch(metric *dto.Metric, labels map[string]string) bool { + actual := make(map[string]string, len(metric.GetLabel())) + for _, label := range metric.GetLabel() { + actual[label.GetName()] = label.GetValue() + } + for key, want := range labels { + if actual[key] != want { + return false + } + } + return len(actual) == len(labels) +} + +func sampleValue(t *testing.T, metric *dto.Metric) float64 { + t.Helper() + if metric.GetGauge() != nil { + return metric.GetGauge().GetValue() + } + if metric.GetCounter() != nil { + return metric.GetCounter().GetValue() + } + t.Fatalf("metric has no gauge or counter sample") + return 0 +} diff --git a/op-conductor/metrics/noop.go b/op-conductor/metrics/noop.go index 832d7258875..fa2a4ecd72e 100644 --- a/op-conductor/metrics/noop.go +++ b/op-conductor/metrics/noop.go @@ -18,3 +18,16 @@ func (*NoopMetricsImpl) RecordHealthCheck(success bool, err error) func (*NoopMetricsImpl) RecordLoopExecutionTime(duration float64) {} func (*NoopMetricsImpl) RecordRollupBoostConnectionAttempts(success bool, source string) {} func (*NoopMetricsImpl) RecordWebSocketClientCount(count int) {} +func (*NoopMetricsImpl) RecordHealthCheckConfig(interval, unsafeInterval, safeInterval, minPeerCount, interopReorgLeniencyWindowSize uint64, safeEnabled, interopReorgLeniency bool) { +} +func (*NoopMetricsImpl) RecordHealthCheckHeads(unsafeNumber, unsafeTimestamp, safeNumber, safeTimestamp, unsafeLag, safeLag uint64) { +} +func (*NoopMetricsImpl) RecordHealthCheckPeerCount(peerCount, minPeerCount uint64) {} +func (*NoopMetricsImpl) RecordHealthCheckWindow(check HealthCheck, state HealthCheckWindowState, successes, failures, windowSize uint64) { +} +func (*NoopMetricsImpl) RecordHealthCheckStatus(check HealthCheck, status HealthCheckStatus) {} +func (*NoopMetricsImpl) RecordHealthCheckFailure(check HealthCheck, reason HealthCheckFailureReason) { +} +func (*NoopMetricsImpl) RecordUnsafeHeadRecovery(active bool, currentLag, initialLag, windowStartLag, wallElapsed, unsafeElapsed, polls, pollsInWindow uint64) { +} +func (*NoopMetricsImpl) RecordUnsafeHeadRecoveryEvent(event HealthCheckRecoveryEvent) {} diff --git a/op-devstack/presets/option_validation.go b/op-devstack/presets/option_validation.go index 06a132c95aa..fb15e6f9503 100644 --- a/op-devstack/presets/option_validation.go +++ b/op-devstack/presets/option_validation.go @@ -30,6 +30,7 @@ const ( optionKindInteropFilter optionKindPreGenesisSuperGame optionKindSkipHonestProposer + optionKindConductor ) const allOptionKinds = optionKindDeployer | @@ -50,7 +51,8 @@ const allOptionKinds = optionKindDeployer | optionKindInteropLogBackfill | optionKindInteropFilter | optionKindPreGenesisSuperGame | - optionKindSkipHonestProposer + optionKindSkipHonestProposer | + optionKindConductor var optionKindLabels = []struct { kind optionKinds @@ -75,6 +77,7 @@ var optionKindLabels = []struct { {kind: optionKindInteropFilter, label: "interop filter"}, {kind: optionKindPreGenesisSuperGame, label: "pre-genesis super game"}, {kind: optionKindSkipHonestProposer, label: "skip honest proposer"}, + {kind: optionKindConductor, label: "conductor options"}, } func (k optionKinds) String() string { @@ -130,7 +133,8 @@ const minimalWithConductorsPresetSupportedOptionKinds = optionKindDeployer | optionKindRespectedGameType | optionKindTimeTravel | optionKindAfterBuild | - optionKindProofValidation + optionKindProofValidation | + optionKindConductor const simpleWithSyncTesterPresetSupportedOptionKinds = minimalPresetSupportedOptionKinds | optionKindGlobalSyncTesterEL @@ -156,5 +160,8 @@ const twoL2SupernodeInteropPresetSupportedOptionKinds = optionKindDeployer | optionKindInteropFilter | optionKindPreGenesisSuperGame +const twoL2SupernodeInteropWithConductorsPresetSupportedOptionKinds = twoL2SupernodeInteropPresetSupportedOptionKinds | + optionKindConductor + const singleChainWithFlashblocksPresetSupportedOptionKinds = optionKindDeployer | optionKindOPRBuilder diff --git a/op-devstack/presets/options.go b/op-devstack/presets/options.go index eabcc890df3..f951cd81a30 100644 --- a/op-devstack/presets/options.go +++ b/op-devstack/presets/options.go @@ -4,6 +4,7 @@ import ( "time" gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types" + opconductor "github.com/ethereum-optimism/optimism/op-conductor/conductor" "github.com/ethereum-optimism/optimism/op-devstack/sysgo" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -260,6 +261,63 @@ func WithInteropFilter() Option { } } +func WithConductorHealthCheck(interval, unsafeInterval, safeInterval uint64) Option { + return option{ + kinds: optionKindConductor, + applyFn: func(cfg *sysgo.PresetConfig) { + minPeerCount := uint64(1) + interopReorgLeniency := false + interopReorgLeniencyWindowSize := uint64(5) + if cfg.ConductorHealthCheck != nil { + minPeerCount = cfg.ConductorHealthCheck.MinPeerCount + interopReorgLeniency = cfg.ConductorHealthCheck.InteropReorgLeniency + if cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize != 0 { + interopReorgLeniencyWindowSize = cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize + } + } + cfg.ConductorHealthCheck = &opconductor.HealthCheckConfig{ + Interval: interval, + UnsafeInterval: unsafeInterval, + SafeInterval: safeInterval, + MinPeerCount: minPeerCount, + InteropReorgLeniency: interopReorgLeniency, + InteropReorgLeniencyWindowSize: interopReorgLeniencyWindowSize, + } + }, + } +} + +func WithConductorHealthCheckMinPeerCount(minPeerCount uint64) Option { + return option{ + kinds: optionKindConductor, + applyFn: func(cfg *sysgo.PresetConfig) { + ensureConductorHealthCheck(cfg).MinPeerCount = minPeerCount + }, + } +} + +func WithConductorInteropReorgLeniency() Option { + return option{ + kinds: optionKindConductor, + applyFn: func(cfg *sysgo.PresetConfig) { + ensureConductorHealthCheck(cfg).InteropReorgLeniency = true + }, + } +} + +func ensureConductorHealthCheck(cfg *sysgo.PresetConfig) *opconductor.HealthCheckConfig { + if cfg.ConductorHealthCheck == nil { + cfg.ConductorHealthCheck = &opconductor.HealthCheckConfig{ + Interval: 3600, + UnsafeInterval: 3600, + SafeInterval: 3600, + MinPeerCount: 1, + InteropReorgLeniencyWindowSize: 5, + } + } + return cfg.ConductorHealthCheck +} + func WithRequireInteropNotAtGenesis() Option { return option{ kinds: optionKindRequireInteropNotAtGen, diff --git a/op-devstack/presets/options_test.go b/op-devstack/presets/options_test.go index 749684bc7c0..fdcf30bfc09 100644 --- a/op-devstack/presets/options_test.go +++ b/op-devstack/presets/options_test.go @@ -32,6 +32,13 @@ func TestOptionKindsFromCompositeOptions(t *testing.T) { ) }) + t.Run("WithConductorInteropReorgLeniency", func(t *testing.T) { + require.Equal(t, + optionKindConductor, + WithConductorInteropReorgLeniency().optionKinds(), + ) + }) + t.Run("nil adapters do not claim support kinds", func(t *testing.T) { require.Zero(t, WithDeployerOptions(nil).optionKinds()) require.Zero(t, WithLocalContractSourcesAt("").optionKinds()) @@ -50,6 +57,82 @@ func TestWithLocalContractSourcesAt(t *testing.T) { require.Equal(t, "/tmp/contracts-bedrock", cfg.LocalContractArtifactsPath) } +func TestWithConductorInteropReorgLeniency(t *testing.T) { + t.Run("sets leniency with default health check intervals", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{WithConductorInteropReorgLeniency()}) + require.NotNil(t, cfg.ConductorHealthCheck) + require.NoError(t, cfg.ConductorHealthCheck.Check()) + require.True(t, cfg.ConductorHealthCheck.InteropReorgLeniency) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize) + }) + + t.Run("preserves leniency when health check option follows", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{ + WithConductorInteropReorgLeniency(), + WithConductorHealthCheck(5, 6, 7), + }) + require.NotNil(t, cfg.ConductorHealthCheck) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.Interval) + require.Equal(t, uint64(6), cfg.ConductorHealthCheck.UnsafeInterval) + require.Equal(t, uint64(7), cfg.ConductorHealthCheck.SafeInterval) + require.True(t, cfg.ConductorHealthCheck.InteropReorgLeniency) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize) + }) + + t.Run("sets leniency when applied after health check option", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{ + WithConductorHealthCheck(5, 6, 7), + WithConductorInteropReorgLeniency(), + }) + require.NotNil(t, cfg.ConductorHealthCheck) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.Interval) + require.Equal(t, uint64(6), cfg.ConductorHealthCheck.UnsafeInterval) + require.Equal(t, uint64(7), cfg.ConductorHealthCheck.SafeInterval) + require.True(t, cfg.ConductorHealthCheck.InteropReorgLeniency) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize) + }) +} + +func TestWithConductorHealthCheckMinPeerCount(t *testing.T) { + t.Run("health check option defaults min peer count to one", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{WithConductorHealthCheck(5, 6, 7)}) + require.NotNil(t, cfg.ConductorHealthCheck) + require.Equal(t, uint64(1), cfg.ConductorHealthCheck.MinPeerCount) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.InteropReorgLeniencyWindowSize) + }) + + t.Run("sets min peer count with default intervals", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{WithConductorHealthCheckMinPeerCount(2)}) + require.NotNil(t, cfg.ConductorHealthCheck) + require.NoError(t, cfg.ConductorHealthCheck.Check()) + require.Equal(t, uint64(2), cfg.ConductorHealthCheck.MinPeerCount) + }) + + t.Run("preserves min peer count when health check option follows", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{ + WithConductorHealthCheckMinPeerCount(2), + WithConductorHealthCheck(5, 6, 7), + }) + require.NotNil(t, cfg.ConductorHealthCheck) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.Interval) + require.Equal(t, uint64(6), cfg.ConductorHealthCheck.UnsafeInterval) + require.Equal(t, uint64(7), cfg.ConductorHealthCheck.SafeInterval) + require.Equal(t, uint64(2), cfg.ConductorHealthCheck.MinPeerCount) + }) + + t.Run("overrides min peer count when applied after health check option", func(t *testing.T) { + cfg, _ := collectPresetConfig([]Option{ + WithConductorHealthCheck(5, 6, 7), + WithConductorHealthCheckMinPeerCount(3), + }) + require.NotNil(t, cfg.ConductorHealthCheck) + require.Equal(t, uint64(5), cfg.ConductorHealthCheck.Interval) + require.Equal(t, uint64(6), cfg.ConductorHealthCheck.UnsafeInterval) + require.Equal(t, uint64(7), cfg.ConductorHealthCheck.SafeInterval) + require.Equal(t, uint64(3), cfg.ConductorHealthCheck.MinPeerCount) + }) +} + func TestUnsupportedPresetOptionKinds(t *testing.T) { builderOpt := sysgo.OPRBuilderNodeOptionFn(func(devtest.CommonT, sysgo.ComponentTarget, *sysgo.OPRBuilderNodeConfig) {}) diff --git a/op-devstack/presets/twol2.go b/op-devstack/presets/twol2.go index ff1d3444b85..c9b8a9e6fa7 100644 --- a/op-devstack/presets/twol2.go +++ b/op-devstack/presets/twol2.go @@ -95,6 +95,12 @@ func (s *TwoL2SupernodeInterop) L2UserRPCURLs() []string { return []string{s.L2ELA.Escape().UserRPC(), s.L2ELB.Escape().UserRPC()} } +type TwoL2SupernodeInteropWithConductors struct { + *TwoL2SupernodeInterop + + ConductorSets map[eth.ChainID]dsl.ConductorSet +} + // AdvanceTime advances the time-travel clock if enabled. func (s *TwoL2SupernodeInterop) AdvanceTime(amount time.Duration) { s.T.Require().NotNil(s.timeTravel, "attempting to advance time on incompatible system") @@ -120,6 +126,13 @@ func NewTwoL2SupernodeInterop(t devtest.T, delaySeconds uint64, opts ...Option) return twoL2SupernodeInteropFromRuntime(t, sysgo.NewTwoL2SupernodeInteropRuntimeWithConfig(t, delaySeconds, presetCfg)) } +func NewTwoL2SupernodeInteropWithConductors(t devtest.T, delaySeconds uint64, opts ...Option) *TwoL2SupernodeInteropWithConductors { + presetCfg, presetOpts := collectSupportedPresetConfig(t, "NewTwoL2SupernodeInteropWithConductors", opts, twoL2SupernodeInteropWithConductorsPresetSupportedOptionKinds) + out := twoL2SupernodeInteropWithConductorsFromRuntime(t, sysgo.NewTwoL2SupernodeInteropWithConductorsRuntimeWithConfig(t, delaySeconds, presetCfg)) + presetOpts.applyPreset(out) + return out +} + // ============================================================================= // Same-Timestamp Test Setup // ============================================================================= diff --git a/op-devstack/presets/twol2_from_runtime.go b/op-devstack/presets/twol2_from_runtime.go index 68865b2ee3d..db4a73c4399 100644 --- a/op-devstack/presets/twol2_from_runtime.go +++ b/op-devstack/presets/twol2_from_runtime.go @@ -1,9 +1,13 @@ package presets import ( + "sort" + "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/stack" "github.com/ethereum-optimism/optimism/op-devstack/sysgo" + "github.com/ethereum-optimism/optimism/op-service/eth" ) type twoL2RuntimeComponents struct { @@ -151,6 +155,45 @@ func twoL2SupernodeInteropFromRuntime(t devtest.T, runtime *sysgo.MultiChainRunt return preset } +func twoL2SupernodeInteropWithConductorsFromRuntime(t devtest.T, runtime *sysgo.MultiChainRuntime) *TwoL2SupernodeInteropWithConductors { + base := twoL2SupernodeInteropFromRuntime(t, runtime) + chainA := runtime.Chains["l2a"] + chainB := runtime.Chains["l2b"] + t.Require().NotNil(chainA, "missing l2a supernode chain") + t.Require().NotNil(chainB, "missing l2b supernode chain") + t.Require().NotNil(chainA.Conductors, "missing l2a conductors") + t.Require().NotNil(chainB.Conductors, "missing l2b conductors") + + conductorSets := map[eth.ChainID]dsl.ConductorSet{ + chainA.Network.ChainID(): addConductorsToL2Network(t, base.L2A, chainA.Network.ChainID(), chainA.Conductors), + chainB.Network.ChainID(): addConductorsToL2Network(t, base.L2B, chainB.Network.ChainID(), chainB.Conductors), + } + return &TwoL2SupernodeInteropWithConductors{ + TwoL2SupernodeInterop: base, + ConductorSets: conductorSets, + } +} + +func addConductorsToL2Network(t devtest.T, l2 *dsl.L2Network, chainID eth.ChainID, conductors map[string]*sysgo.Conductor) dsl.ConductorSet { + names := make([]string, 0, len(conductors)) + for name := range conductors { + names = append(names, name) + } + sort.Strings(names) + + frontends := make([]stack.Conductor, 0, len(names)) + l2Net, ok := l2.Escape().(*presetL2Network) + t.Require().True(ok, "expected preset L2 network") + for _, name := range names { + conductor := conductors[name] + t.Require().NotNil(conductor, "missing conductor %s", name) + frontend := newConductorFrontend(t, name, chainID, conductor.HTTPEndpoint()) + l2Net.AddConductor(frontend) + frontends = append(frontends, frontend) + } + return dsl.NewConductorSet(frontends) +} + func twoL2SupernodeFollowL2FromRuntime(t devtest.T, runtime *sysgo.MultiChainRuntime) *TwoL2SupernodeFollowL2 { base := twoL2SupernodeInteropFromRuntime(t, runtime) chainA := runtime.Chains["l2a"] diff --git a/op-devstack/sysgo/l2_cl_p2p_util.go b/op-devstack/sysgo/l2_cl_p2p_util.go index b5fda39a510..a0837fafae6 100644 --- a/op-devstack/sysgo/l2_cl_p2p_util.go +++ b/op-devstack/sysgo/l2_cl_p2p_util.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/sources" - "github.com/ethereum-optimism/optimism/op-service/testreq" ) func GetP2PClient(ctx context.Context, logger log.Logger, l2CLNode L2CLNode) (*sources.P2PClient, error) { @@ -51,32 +50,3 @@ func GetPeers(ctx context.Context, p2pClient *sources.P2PClient) (*apis.PeerDump } return peerDump, nil } - -type p2pClientsAndPeers struct { - client1 *sources.P2PClient - client2 *sources.P2PClient - peerInfo1 *apis.PeerInfo - peerInfo2 *apis.PeerInfo -} - -func getP2PClientsAndPeers(ctx context.Context, logger log.Logger, - require *testreq.Assertions, l2CL1, l2CL2 L2CLNode) *p2pClientsAndPeers { - p2pClient1, err := GetP2PClient(ctx, logger, l2CL1) - require.NoError(err) - p2pClient2, err := GetP2PClient(ctx, logger, l2CL2) - require.NoError(err) - - peerInfo1, err := GetPeerInfo(ctx, p2pClient1) - require.NoError(err) - peerInfo2, err := GetPeerInfo(ctx, p2pClient2) - require.NoError(err) - - require.True(len(peerInfo1.Addresses) > 0 && len(peerInfo2.Addresses) > 0, "malformed peer info") - - return &p2pClientsAndPeers{ - client1: p2pClient1, - client2: p2pClient2, - peerInfo1: peerInfo1, - peerInfo2: peerInfo2, - } -} diff --git a/op-devstack/sysgo/l2_conductor.go b/op-devstack/sysgo/l2_conductor.go index d28fafbbaa6..60fd685600b 100644 --- a/op-devstack/sysgo/l2_conductor.go +++ b/op-devstack/sysgo/l2_conductor.go @@ -9,10 +9,11 @@ type Conductor struct { name string chainID eth.ChainID - serverID string - consensusEndpoint string - rpcEndpoint string - service *opconductor.OpConductor + serverID string + consensusEndpoint string + rpcEndpoint string + initialUnsafePayload *eth.ExecutionPayloadEnvelope + service *opconductor.OpConductor } func (c *Conductor) ServerID() string { diff --git a/op-devstack/sysgo/multichain_proofs.go b/op-devstack/sysgo/multichain_proofs.go index 38662def65d..d2ac40a32ff 100644 --- a/op-devstack/sysgo/multichain_proofs.go +++ b/op-devstack/sysgo/multichain_proofs.go @@ -150,7 +150,7 @@ func attachSuperChallengerAndProposer( func NewTwoL2SupernodeProofsRuntimeWithConfig(t devtest.T, interopAtGenesis bool, cfg PresetConfig) *MultiChainRuntime { cfg = withSuperProofsDeployerFeature(cfg) - runtime, _ := newTwoL2SupernodeRuntimeWithConfig(t, interopAtGenesis, 0, cfg) + runtime, _ := newTwoL2SupernodeRuntimeWithConfig(t, interopAtGenesis, 0, cfg, false) attachTestSequencerToRuntime(t, runtime, "test-sequencer-2l2") return attachSupernodeSuperProofs(t, runtime, cfg) } diff --git a/op-devstack/sysgo/multichain_supernode_runtime.go b/op-devstack/sysgo/multichain_supernode_runtime.go index dfbfefdd538..ebeb37c0001 100644 --- a/op-devstack/sysgo/multichain_supernode_runtime.go +++ b/op-devstack/sysgo/multichain_supernode_runtime.go @@ -6,6 +6,7 @@ import ( "fmt" "sort" "strconv" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common/hexutil" @@ -66,7 +67,7 @@ func NewTwoL2SupernodeInteropRuntime(t devtest.T, delaySeconds uint64) *MultiCha } func NewTwoL2SupernodeInteropRuntimeWithConfig(t devtest.T, delaySeconds uint64, cfg PresetConfig) *MultiChainRuntime { - base, activationTime := newTwoL2SupernodeRuntimeWithConfig(t, true, delaySeconds, cfg) + base, activationTime := newTwoL2SupernodeRuntimeWithConfig(t, true, delaySeconds, cfg, false) chainA := base.Chains["l2a"] chainB := base.Chains["l2b"] t.Require().NotNil(chainA, "missing l2a supernode chain") @@ -83,6 +84,26 @@ func NewTwoL2SupernodeInteropRuntimeWithConfig(t devtest.T, delaySeconds uint64, return base } +func NewTwoL2SupernodeInteropWithConductorsRuntimeWithConfig(t devtest.T, delaySeconds uint64, cfg PresetConfig) *MultiChainRuntime { + base, activationTime := newTwoL2SupernodeRuntimeWithConfig(t, true, delaySeconds, cfg, true) + chainA := base.Chains["l2a"] + chainB := base.Chains["l2b"] + t.Require().NotNil(chainA, "missing l2a supernode chain") + t.Require().NotNil(chainB, "missing l2b supernode chain") + t.Require().NotNil(chainA.Conductors, "missing l2a conductors") + t.Require().NotNil(chainB.Conductors, "missing l2b conductors") + attachTestSequencerToRuntime(t, base, "test-sequencer-2l2") + + t.Logger().Info("configured supernode interop runtime with conductors", + "genesis_time", chainA.Network.rollupCfg.Genesis.L2Time, + "activation_time", activationTime, + "delay_seconds", delaySeconds, + ) + + base.DelaySeconds = delaySeconds + return base +} + func NewTwoL2SupernodeFollowL2RuntimeWithConfig(t devtest.T, delaySeconds uint64, cfg PresetConfig) *MultiChainRuntime { runtime := NewTwoL2SupernodeInteropRuntimeWithConfig(t, delaySeconds, cfg) addMultiChainFollowL2Node(t, runtime, "l2a", "follower") @@ -91,11 +112,11 @@ func NewTwoL2SupernodeFollowL2RuntimeWithConfig(t devtest.T, delaySeconds uint64 } func newTwoL2SupernodeRuntime(t devtest.T, enableInterop bool, delaySeconds uint64) (*MultiChainRuntime, uint64) { - return newTwoL2SupernodeRuntimeWithConfig(t, enableInterop, delaySeconds, PresetConfig{}) + return newTwoL2SupernodeRuntimeWithConfig(t, enableInterop, delaySeconds, PresetConfig{}, false) } func NewTwoL2SupernodeRuntimeWithConfig(t devtest.T, cfg PresetConfig) *MultiChainRuntime { - runtime, _ := newTwoL2SupernodeRuntimeWithConfig(t, false, 0, cfg) + runtime, _ := newTwoL2SupernodeRuntimeWithConfig(t, false, 0, cfg, false) return runtime } @@ -205,7 +226,7 @@ func newSingleChainSupernodeRuntimeWithConfig(t devtest.T, interopAtGenesis bool } } -func newTwoL2SupernodeRuntimeWithConfig(t devtest.T, enableInterop bool, delaySeconds uint64, cfg PresetConfig) (*MultiChainRuntime, uint64) { +func newTwoL2SupernodeRuntimeWithConfig(t devtest.T, enableInterop bool, delaySeconds uint64, cfg PresetConfig, enableConductors bool) (*MultiChainRuntime, uint64) { require := t.Require() keys, err := devkeys.NewMnemonicDevKeys(devkeys.TestMnemonic) @@ -279,7 +300,15 @@ func newTwoL2SupernodeRuntimeWithConfig(t devtest.T, enableInterop bool, delaySe require.NoError(err, "failed to override message expiry window") } - supernode, l2ACL, l2BCL := startTwoL2SharedSupernode( + var conductorEndpoints map[eth.ChainID]*atomic.Value + if enableConductors { + conductorEndpoints = map[eth.ChainID]*atomic.Value{ + l2ANet.ChainID(): newConductorRPCEndpoint(), + l2BNet.ChainID(): newConductorRPCEndpoint(), + } + } + + supernode, l2ASupernodeCL, l2BSupernodeCL := startTwoL2SharedSupernode( t, l1Net, l1EL, @@ -292,7 +321,56 @@ func newTwoL2SupernodeRuntimeWithConfig(t devtest.T, enableInterop bool, delaySe interopActivationTimestamp, cfg.InteropLogBackfillDepth, jwtSecret, + !enableConductors, ) + var l2ACL L2CLNode = l2ASupernodeCL + var l2BCL L2CLNode = l2BSupernodeCL + + var l2AConductors map[string]*Conductor + var l2BConductors map[string]*Conductor + var l2AFollowers map[string]*SingleChainNodeRuntime + var l2BFollowers map[string]*SingleChainNodeRuntime + if enableConductors { + conductorCfg := conductorConfigFromPreset(cfg) + var conductorNodeDepSet depset.DependencySet + if depSet != nil { + conductorNodeDepSet = depSet + } else { + conductorNodeDepSet = wb.outFullCfgSet.DependencySet + } + l2ACL = startConductorControlledSequencerCL(t, keys, l1Net, l1EL, l1CL, l2ANet, l2AEL, l2ASupernodeCL, conductorNodeDepSet, jwtSecret, conductorEndpoints[l2ANet.ChainID()]) + l2BCL = startConductorControlledSequencerCL(t, keys, l1Net, l1EL, l1CL, l2BNet, l2BEL, l2BSupernodeCL, conductorNodeDepSet, jwtSecret, conductorEndpoints[l2BNet.ChainID()]) + + l2AFollowers = startConductorHealthPeers(t, keys, l1Net, l1EL, l1CL, l2ANet, l2AEL, l2ACL, conductorNodeDepSet, conductorCfg.HealthCheck.MinPeerCount) + l2BFollowers = startConductorHealthPeers(t, keys, l1Net, l1EL, l1CL, l2BNet, l2BEL, l2BCL, conductorNodeDepSet, conductorCfg.HealthCheck.MinPeerCount) + + l2AConductor := startConductorForRPC( + t, + "sequencer", + l2ANet, + l2ACL.UserRPC(), + l2AEL.UserRPC(), + true, + false, + conductorEndpoints[l2ANet.ChainID()], + conductorCfg, + ) + l2BConductor := startConductorForRPC( + t, + "sequencer", + l2BNet, + l2BCL.UserRPC(), + l2BEL.UserRPC(), + true, + false, + conductorEndpoints[l2BNet.ChainID()], + conductorCfg, + ) + startConductorCluster(t, l2AConductor, nil) + startConductorCluster(t, l2BConductor, nil) + l2AConductors = map[string]*Conductor{"sequencer": l2AConductor} + l2BConductors = map[string]*Conductor{"sequencer": l2BConductor} + } l2ABatcher := startMinimalBatcher(t, keys, l2ANet, l1EL, l2ACL, l2AEL, cfg.BatcherOptions...) l2AProposer := startMinimalProposer(t, keys, l2ANet, l1EL, l2ACL) @@ -329,20 +407,24 @@ func newTwoL2SupernodeRuntimeWithConfig(t devtest.T, enableInterop bool, delaySe L1CL: l1CL, Chains: map[string]*MultiChainNodeRuntime{ "l2a": { - Name: "l2a", - Network: l2ANet, - EL: l2AEL, - CL: l2ACL, - Batcher: l2ABatcher, - Proposer: l2AProposer, + Name: "l2a", + Network: l2ANet, + EL: l2AEL, + CL: l2ACL, + Batcher: l2ABatcher, + Proposer: l2AProposer, + Followers: l2AFollowers, + Conductors: l2AConductors, }, "l2b": { - Name: "l2b", - Network: l2BNet, - EL: l2BEL, - CL: l2BCL, - Batcher: l2BBatcher, - Proposer: l2BProposer, + Name: "l2b", + Network: l2BNet, + EL: l2BEL, + CL: l2BCL, + Batcher: l2BBatcher, + Proposer: l2BProposer, + Followers: l2BFollowers, + Conductors: l2BConductors, }, }, Supernode: supernode, @@ -429,33 +511,96 @@ func addMultiChainFollowL2Node(t devtest.T, runtime *MultiChainRuntime, chainKey t.Require().NotNil(chain, "missing %s runtime chain", chainKey) t.Require().NotNil(chain.CL, "%s runtime chain missing CL follow source", chainKey) - jwtPath := chain.EL.JWTPath() + node := startMultiChainFollowL2Node(t, runtime.Keys, runtime.L1Network, runtime.L1EL, runtime.L1CL, chain.Network, chain.EL, chain.CL, runtime.DependencySet, name) + if chain.Followers == nil { + chain.Followers = make(map[string]*SingleChainNodeRuntime) + } + chain.Followers[name] = node + return node +} + +func startConductorHealthPeers( + t devtest.T, + keys devkeys.Keys, + l1Net *L1Network, + l1EL L1ELNode, + l1CL *L1CLNode, + l2Net *L2Network, + l2EL L2ELNode, + l2CL L2CLNode, + dependencySet depset.DependencySet, + peerCount uint64, +) map[string]*SingleChainNodeRuntime { + followers := make(map[string]*SingleChainNodeRuntime) + for i := uint64(1); i <= peerCount; i++ { + name := fmt.Sprintf("conductor-health-peer-%d", i) + node := startMultiChainFollowL2Node(t, keys, l1Net, l1EL, l1CL, l2Net, l2EL, l2CL, dependencySet, name) + followers[node.Name] = node + } + return followers +} + +func startConductorControlledSequencerCL( + t devtest.T, + keys devkeys.Keys, + l1Net *L1Network, + l1EL L1ELNode, + l1CL *L1CLNode, + l2Net *L2Network, + l2EL L2ELNode, + followSource L2CLNode, + dependencySet depset.DependencySet, + jwtSecret [32]byte, + conductorRPCEndpoint *atomic.Value, +) *OpNode { + sequencerCL := startL2CLNode(t, keys, l1Net, l2Net, l1EL, l1CL, l2EL, jwtSecret, l2CLNodeStartConfig{ + Key: "sequencer", + IsSequencer: true, + NoDiscovery: true, + EnableReqResp: true, + UseReqResp: true, + L2FollowSource: followSource.UserRPC(), + DependencySet: dependencySet, + ConductorRPCEndpoint: conductorRPCEndpoint, + }) + connectL2CLPeers(t, t.Logger(), followSource, sequencerCL) + return sequencerCL +} + +func startMultiChainFollowL2Node( + t devtest.T, + keys devkeys.Keys, + l1Net *L1Network, + l1EL L1ELNode, + l1CL *L1CLNode, + l2Net *L2Network, + l2EL L2ELNode, + l2CL L2CLNode, + dependencySet depset.DependencySet, + name string, +) *SingleChainNodeRuntime { + jwtPath := l2EL.JWTPath() jwtSecret := readJWTSecretFromPath(t, jwtPath) - l2EL := startL2ELNode(t, chain.Network, jwtPath, jwtSecret, name, NewELNodeIdentity(0)) - l2CL := startL2CLNode(t, runtime.Keys, runtime.L1Network, chain.Network, runtime.L1EL, runtime.L1CL, l2EL, jwtSecret, l2CLNodeStartConfig{ + followerEL := startL2ELNode(t, l2Net, jwtPath, jwtSecret, name, NewELNodeIdentity(0)) + followerCL := startL2CLNode(t, keys, l1Net, l2Net, l1EL, l1CL, followerEL, jwtSecret, l2CLNodeStartConfig{ Key: name, IsSequencer: false, NoDiscovery: true, EnableReqResp: false, UseReqResp: false, - L2FollowSource: chain.CL.UserRPC(), - DependencySet: runtime.DependencySet, + L2FollowSource: l2CL.UserRPC(), + DependencySet: dependencySet, }) - connectL2ELPeers(t, t.Logger(), chain.EL.UserRPC(), l2EL.UserRPC(), false) - connectL2CLPeers(t, t.Logger(), chain.CL, l2CL) + connectL2ELPeers(t, t.Logger(), l2EL.UserRPC(), followerEL.UserRPC(), false) + connectL2CLPeers(t, t.Logger(), l2CL, followerCL) - node := &SingleChainNodeRuntime{ + return &SingleChainNodeRuntime{ Name: name, IsSequencer: false, - EL: l2EL, - CL: l2CL, - } - if chain.Followers == nil { - chain.Followers = make(map[string]*SingleChainNodeRuntime) + EL: followerEL, + CL: followerCL, } - chain.Followers[name] = node - return node } func startTwoL2SharedSupernode( @@ -471,19 +616,24 @@ func startTwoL2SharedSupernode( interopActivationTimestamp *uint64, interopLogBackfillDepth time.Duration, jwtSecret [32]byte, + virtualSequencers bool, ) (*SuperNode, *SuperNodeProxy, *SuperNodeProxy) { require := t.Require() logger := t.Logger().New("component", "supernode") makeNodeCfg := func(l2Net *L2Network, l2EL L2ELNode) *opnodeconfig.Config { - p2pKey, err := l2Net.keys.Secret(devkeys.SequencerP2PRole.Key(l2Net.ChainID().ToBig())) - require.NoError(err, "need p2p key for supernode virtual sequencer") + var sequencerP2PKeyHex string + if virtualSequencers { + p2pKey, err := l2Net.keys.Secret(devkeys.SequencerP2PRole.Key(l2Net.ChainID().ToBig())) + require.NoError(err, "need p2p key for supernode virtual sequencer") + sequencerP2PKeyHex = hex.EncodeToString(crypto.FromECDSA(p2pKey)) + } p2pConfig, p2pSignerSetup := newDevstackP2PConfig( t, logger.New("chain_id", l2Net.ChainID().String(), "component", "supernode-p2p"), l2Net.rollupCfg.BlockTime, false, true, - hex.EncodeToString(crypto.FromECDSA(p2pKey)), + sequencerP2PKeyHex, ) cfg := &opnodeconfig.Config{ L1: &opnodeconfig.L1EndpointConfig{ @@ -503,7 +653,7 @@ func startTwoL2SharedSupernode( }, DependencySet: depSet, Beacon: &opnodeconfig.L1BeaconEndpointConfig{BeaconAddr: l1CL.beaconHTTPAddr}, - Driver: driver.Config{SequencerEnabled: true, SequencerConfDepth: 2}, + Driver: driver.Config{SequencerEnabled: virtualSequencers, SequencerConfDepth: 2}, Rollup: *l2Net.rollupCfg, P2PSigner: p2pSignerSetup, RPC: oprpc.CLIConfig{ListenAddr: "127.0.0.1", ListenPort: 0, EnableAdmin: true}, diff --git a/op-devstack/sysgo/preset_config.go b/op-devstack/sysgo/preset_config.go index d3be28e7ebc..4d4c9340a28 100644 --- a/op-devstack/sysgo/preset_config.go +++ b/op-devstack/sysgo/preset_config.go @@ -4,6 +4,7 @@ import ( "time" gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types" + opconductor "github.com/ethereum-optimism/optimism/op-conductor/conductor" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -36,6 +37,9 @@ type PresetConfig struct { PreGenesisSuperGame *PreGenesisSuperGameConfig // SkipHonestProposer skips starting op-proposer. SkipHonestProposer bool + // ConductorHealthCheck overrides the default slow conductor health check + // intervals for tests that intentionally exercise health transitions. + ConductorHealthCheck *opconductor.HealthCheckConfig } func NewPresetConfig() PresetConfig { diff --git a/op-devstack/sysgo/runtime_state.go b/op-devstack/sysgo/runtime_state.go index 9527efe6224..5c8c0fbe8f6 100644 --- a/op-devstack/sysgo/runtime_state.go +++ b/op-devstack/sysgo/runtime_state.go @@ -121,13 +121,14 @@ func (r *SingleChainRuntime) VMConfig(t devtest.T, dir string) *vm.Config { } type MultiChainNodeRuntime struct { - Name string - Network *L2Network - EL L2ELNode - CL L2CLNode - Batcher *L2Batcher - Proposer *L2Proposer - Followers map[string]*SingleChainNodeRuntime + Name string + Network *L2Network + EL L2ELNode + CL L2CLNode + Batcher *L2Batcher + Proposer *L2Proposer + Followers map[string]*SingleChainNodeRuntime + Conductors map[string]*Conductor } type MultiChainRuntime struct { diff --git a/op-devstack/sysgo/singlechain_build.go b/op-devstack/sysgo/singlechain_build.go index caa9c9742e4..21d265b7d67 100644 --- a/op-devstack/sysgo/singlechain_build.go +++ b/op-devstack/sysgo/singlechain_build.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "flag" "fmt" + "sync/atomic" "time" "github.com/urfave/cli/v2" @@ -240,27 +241,51 @@ func connectL2CLPeers(t devtest.T, logger log.Logger, l2CL1, l2CL2 L2CLNode) { require := t.Require() ctx := t.Ctx() - p := getP2PClientsAndPeers(ctx, logger, require, l2CL1, l2CL2) - - connectPeer := func(p2pClient *sources.P2PClient, multiAddress string) { - err := retry.Do0(ctx, 6, retry.Exponential(), func() error { - return p2pClient.ConnectPeer(ctx, multiAddress) - }) - require.NoError(err, "failed to connect L2CL peer") - } - - connectPeer(p.client1, p.peerInfo2.Addresses[0]) - connectPeer(p.client2, p.peerInfo1.Addresses[0]) + err := retry.Do0(ctx, 6, retry.Exponential(), func() error { + p2pClient1, err := GetP2PClient(ctx, logger, l2CL1) + if err != nil { + return err + } + p2pClient2, err := GetP2PClient(ctx, logger, l2CL2) + if err != nil { + return err + } + peerInfo1, err := GetPeerInfo(ctx, p2pClient1) + if err != nil { + return err + } + peerInfo2, err := GetPeerInfo(ctx, p2pClient2) + if err != nil { + return err + } + if len(peerInfo1.Addresses) == 0 || len(peerInfo2.Addresses) == 0 { + return fmt.Errorf("malformed peer info") + } - peerDump1, err := GetPeers(ctx, p.client1) - require.NoError(err) - peerDump2, err := GetPeers(ctx, p.client2) - require.NoError(err) + if err := p2pClient1.ConnectPeer(ctx, peerInfo2.Addresses[0]); err != nil { + return fmt.Errorf("connect cl1 to cl2: %w", err) + } + if err := p2pClient2.ConnectPeer(ctx, peerInfo1.Addresses[0]); err != nil { + return fmt.Errorf("connect cl2 to cl1: %w", err) + } - _, ok1 := peerDump1.Peers[p.peerInfo2.PeerID.String()] - require.True(ok1, "peer register invalid (cl1 missing cl2)") - _, ok2 := peerDump2.Peers[p.peerInfo1.PeerID.String()] - require.True(ok2, "peer register invalid (cl2 missing cl1)") + peerDump1, err := GetPeers(ctx, p2pClient1) + if err != nil { + return err + } + peerDump2, err := GetPeers(ctx, p2pClient2) + if err != nil { + return err + } + if _, ok := peerDump1.Peers[peerInfo2.PeerID.String()]; !ok { + return fmt.Errorf("peer register invalid (cl1 missing cl2)") + } + if _, ok := peerDump2.Peers[peerInfo1.PeerID.String()]; !ok { + return fmt.Errorf("peer register invalid (cl2 missing cl1)") + } + return nil + }) + require.NoError(err, "failed to connect L2CL peer") } func startSequencerCL( @@ -287,6 +312,8 @@ type l2CLNodeStartConfig struct { L2FollowSource string DependencySet depset.DependencySet L2CLOptions []L2CLOption + + ConductorRPCEndpoint *atomic.Value } func startL2CLNode( @@ -416,7 +443,7 @@ func startL2CLNode( SkipSyncStartCheck: false, SupportsPostFinalizationELSync: false, L2FollowSourceEndpoint: cfg.FollowSource, - NeedInitialResetEngine: false, + NeedInitialResetEngine: cfg.IsSequencer && cfg.FollowSource != "", OffsetELSafe: cfg.OffsetELSafe, }, ConfigPersistence: config.DisabledConfigPersistence{}, @@ -431,6 +458,9 @@ func startL2CLNode( IgnoreMissingPectraBlobSchedule: false, ExperimentalOPStackAPI: true, } + if startCfg.ConductorRPCEndpoint != nil { + configureOpNodeConfigForConductor(nodeCfg, startCfg.ConductorRPCEndpoint) + } l2CL := &OpNode{ name: startCfg.Key, opNode: nil, diff --git a/op-devstack/sysgo/singlechain_variants.go b/op-devstack/sysgo/singlechain_variants.go index 53b8050a227..1733c77b9b7 100644 --- a/op-devstack/sysgo/singlechain_variants.go +++ b/op-devstack/sysgo/singlechain_variants.go @@ -8,13 +8,20 @@ import ( "sync/atomic" "time" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + opconductor "github.com/ethereum-optimism/optimism/op-conductor/conductor" "github.com/ethereum-optimism/optimism/op-devstack/devtest" + opnodeconfig "github.com/ethereum-optimism/optimism/op-node/config" + "github.com/ethereum-optimism/optimism/op-node/p2p" + opclient "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/endpoint" "github.com/ethereum-optimism/optimism/op-service/eth" oplog "github.com/ethereum-optimism/optimism/op-service/log" "github.com/ethereum-optimism/optimism/op-service/retry" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" + "github.com/ethereum-optimism/optimism/op-service/sources" synctesterconfig "github.com/ethereum-optimism/optimism/op-sync-tester/config" "github.com/ethereum-optimism/optimism/op-sync-tester/synctester" stconf "github.com/ethereum-optimism/optimism/op-sync-tester/synctester/backend/config" @@ -116,10 +123,14 @@ func NewMinimalWithConductorsRuntimeWithConfig(t devtest.T, cfg PresetConfig) *S nodeB := addSingleChainOpNode(t, runtime, "b", true, "", cfg.GlobalL2CLOptions...) nodeC := addSingleChainOpNode(t, runtime, "c", true, "", cfg.GlobalL2CLOptions...) - conductorA := startConductorNode(t, "sequencer", runtime.L2Network, runtime.L2CL.(*OpNode), runtime.L2EL, true, false) - conductorB := startConductorNode(t, "b", runtime.L2Network, nodeB.CL.(*OpNode), nodeB.EL, false, true) - conductorC := startConductorNode(t, "c", runtime.L2Network, nodeC.CL.(*OpNode), nodeC.EL, false, true) + conductorCfg := conductorConfigFromPreset(cfg) + conductorA := startConductorNode(t, "sequencer", runtime.L2Network, runtime.L2CL.(*OpNode), runtime.L2EL, true, false, conductorCfg) + conductorB := startConductorNode(t, "b", runtime.L2Network, nodeB.CL.(*OpNode), nodeB.EL, false, true, conductorCfg) + conductorC := startConductorNode(t, "c", runtime.L2Network, nodeC.CL.(*OpNode), nodeC.EL, false, true, conductorCfg) + connectSingleChainCLPeer(t, runtime.L2CL, nodeB.CL) + connectSingleChainCLPeer(t, runtime.L2CL, nodeC.CL) startConductorCluster(t, conductorA, []*Conductor{conductorB, conductorC}) + waitForRollupSequencerActive(t, runtime.L2CL.UserRPC()) runtime.Conductors = map[string]*Conductor{ "sequencer": conductorA, @@ -173,6 +184,61 @@ func addSingleChainOpNode( return node } +type conductorNodeConfig struct { + HealthCheck opconductor.HealthCheckConfig +} + +func conductorConfigFromPreset(cfg PresetConfig) conductorNodeConfig { + healthCfg := opconductor.HealthCheckConfig{ + Interval: 3600, + UnsafeInterval: 3600, + SafeInterval: 3600, + MinPeerCount: 1, + InteropReorgLeniencyWindowSize: 5, + } + if cfg.ConductorHealthCheck != nil { + healthCfg = *cfg.ConductorHealthCheck + } + return conductorNodeConfig{ + HealthCheck: healthCfg, + } +} + +func newConductorRPCEndpoint() *atomic.Value { + var conductorRPCEndpoint atomic.Value + conductorRPCEndpoint.Store("") + return &conductorRPCEndpoint +} + +func configureOpNodeConfigForConductor(cfg *opnodeconfig.Config, conductorRPCEndpoint *atomic.Value) { + cfg.ConductorEnabled = true + cfg.ConductorRpcTimeout = 5 * time.Second + cfg.ConductorRpc = conductorRPCFromEndpoint(conductorRPCEndpoint) + cfg.Driver.SequencerStopped = true +} + +func configureOpNodeForConductor(opNode *OpNode, conductorRPCEndpoint *atomic.Value) { + configureOpNodeConfigForConductor(opNode.cfg, conductorRPCEndpoint) + if p2pCfg, ok := opNode.cfg.P2P.(*p2p.Config); ok { + p2pCfg.Store = dssync.MutexWrap(ds.NewMapDatastore()) + } +} + +func conductorRPCFromEndpoint(conductorRPCEndpoint *atomic.Value) func(ctx context.Context) (string, error) { + return func(ctx context.Context) (string, error) { + for { + if endpoint, _ := conductorRPCEndpoint.Load().(string); endpoint != "" { + return endpoint, nil + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(100 * time.Millisecond): + } + } + } +} + func startSyncTesterService(t devtest.T, chainRPCs map[eth.ChainID]string) *SyncTesterService { require := t.Require() syncTesters := make(map[sttypes.SyncTesterID]*stconf.SyncTesterEntry) @@ -234,31 +300,31 @@ func startConductorNode( l2EL L2ELNode, bootstrap bool, paused bool, + conductorCfg conductorNodeConfig, +) *Conductor { + conductorRPCEndpoint := newConductorRPCEndpoint() + configureOpNodeForConductor(opNode, conductorRPCEndpoint) + opNode.Stop() + opNode.Start() + + return startConductorForRPC(t, conductorName, l2Net, opNode.UserRPC(), l2EL.UserRPC(), bootstrap, paused, conductorRPCEndpoint, conductorCfg) +} + +func startConductorForRPC( + t devtest.T, + conductorName string, + l2Net *L2Network, + nodeRPC string, + executionRPC string, + bootstrap bool, + paused bool, + conductorRPCEndpoint *atomic.Value, + conductorCfg conductorNodeConfig, ) *Conductor { require := t.Require() serverID := conductorName require.NotEmpty(serverID, "conductor ID key cannot be empty") - var conductorRPCEndpoint atomic.Value - conductorRPCEndpoint.Store("") - opNode.cfg.ConductorEnabled = true - opNode.cfg.ConductorRpcTimeout = 5 * time.Second - opNode.cfg.ConductorRpc = func(ctx context.Context) (string, error) { - for { - if endpoint, _ := conductorRPCEndpoint.Load().(string); endpoint != "" { - return endpoint, nil - } - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-time.After(100 * time.Millisecond): - } - } - } - opNode.cfg.Driver.SequencerStopped = true - opNode.Stop() - opNode.Start() - cfg := opconductor.Config{ ConsensusAddr: "127.0.0.1", ConsensusPort: 0, @@ -271,17 +337,12 @@ func startConductorNode( RaftTrailingLogs: 10240, RaftHeartbeatTimeout: 1000 * time.Millisecond, RaftLeaderLeaseTimeout: 500 * time.Millisecond, - NodeRPC: opNode.UserRPC(), - ExecutionRPC: l2EL.UserRPC(), + NodeRPC: nodeRPC, + ExecutionRPC: executionRPC, Paused: paused, - HealthCheck: opconductor.HealthCheckConfig{ - Interval: 3600, - UnsafeInterval: 3600, - SafeInterval: 3600, - MinPeerCount: 1, - }, - RollupCfg: *l2Net.rollupCfg, - RPCEnableProxy: false, + HealthCheck: conductorCfg.HealthCheck, + RollupCfg: *l2Net.rollupCfg, + RPCEnableProxy: false, LogConfig: oplog.CLIConfig{ Level: log.LevelInfo, Format: oplog.FormatText, @@ -294,6 +355,7 @@ func startConductorNode( } logger := t.Logger().New("component", "conductor", "name", conductorName, "chain", l2Net.ChainID()) + initialUnsafePayload := fetchInitialUnsafePayload(t, nodeRPC, executionRPC) svc, err := opconductor.New(t.Ctx(), &cfg, logger, "0.0.1") require.NoError(err) require.NoError(svc.Start(t.Ctx())) @@ -307,17 +369,78 @@ func startConductorNode( }) out := &Conductor{ - name: conductorName, - chainID: l2Net.ChainID(), - serverID: serverID, - consensusEndpoint: svc.ConsensusEndpoint(), - rpcEndpoint: svc.HTTPEndpoint(), - service: svc, + name: conductorName, + chainID: l2Net.ChainID(), + serverID: serverID, + consensusEndpoint: svc.ConsensusEndpoint(), + rpcEndpoint: svc.HTTPEndpoint(), + initialUnsafePayload: initialUnsafePayload, + service: svc, + } + if conductorRPCEndpoint != nil { + conductorRPCEndpoint.Store(svc.HTTPEndpoint()) } - conductorRPCEndpoint.Store(svc.HTTPEndpoint()) return out } +func fetchInitialUnsafePayload(t devtest.T, nodeRPC string, executionRPC string) *eth.ExecutionPayloadEnvelope { + nodeRPCClient, err := opclient.NewRPC(t.Ctx(), t.Logger(), nodeRPC) + t.Require().NoError(err, "dial node RPC for conductor unsafe head") + defer nodeRPCClient.Close() + rollupClient := sources.NewRollupClient(nodeRPCClient) + + executionRPCClient, err := opclient.NewRPC(t.Ctx(), t.Logger(), executionRPC) + t.Require().NoError(err, "dial execution RPC for conductor unsafe head") + defer executionRPCClient.Close() + ethClient, err := sources.NewEthClient(executionRPCClient, t.Logger(), nil, sources.DefaultEthClientConfig(10)) + t.Require().NoError(err, "create execution client for conductor unsafe head") + + var payload *eth.ExecutionPayloadEnvelope + err = retry.Do0(t.Ctx(), 120, retry.Fixed(250*time.Millisecond), func() error { + status, err := rollupClient.SyncStatus(t.Ctx()) + if err != nil { + return fmt.Errorf("fetch node sync status: %w", err) + } + if status == nil { + return errors.New("node sync status is nil") + } + currentPayload, err := ethClient.PayloadByHash(t.Ctx(), status.UnsafeL2.Hash) + if err != nil { + return fmt.Errorf("fetch unsafe payload %s: %w", status.UnsafeL2, err) + } + if currentPayload.ExecutionPayload.BlockHash != status.UnsafeL2.Hash || + uint64(currentPayload.ExecutionPayload.BlockNumber) != status.UnsafeL2.Number { + return fmt.Errorf("unsafe payload %s does not match sync status %s", currentPayload.ID(), status.UnsafeL2) + } + payload = currentPayload + return nil + }) + t.Require().NoError(err, "fetch conductor initial unsafe payload") + return payload +} + +func waitForRollupSequencerActive(t devtest.T, nodeRPC string) { + require := t.Require() + rpcClient, err := opclient.NewRPC(t.Ctx(), t.Logger(), nodeRPC) + require.NoError(err, "dial op-node RPC for sequencer active check") + defer rpcClient.Close() + rollupClient := sources.NewRollupClient(rpcClient) + + ctx, cancel := context.WithTimeout(t.Ctx(), 30*time.Second) + defer cancel() + err = retry.Do0(ctx, 60, retry.Fixed(500*time.Millisecond), func() error { + active, err := rollupClient.SequencerActive(ctx) + if err != nil { + return err + } + if !active { + return errors.New("sequencer is not active yet") + } + return nil + }) + require.NoError(err, "sequencer never became active") +} + func startConductorCluster(t devtest.T, bootstrap *Conductor, members []*Conductor) { require := t.Require() ctx, cancel := context.WithTimeout(t.Ctx(), 90*time.Second) @@ -331,6 +454,13 @@ func startConductorCluster(t devtest.T, bootstrap *Conductor, members []*Conduct }) require.NoError(err, "bootstrap conductor never became leader") + if bootstrap.initialUnsafePayload != nil { + err = retry.Do0(ctx, 40, retry.Fixed(250*time.Millisecond), func() error { + return bootstrap.service.CommitUnsafePayload(ctx, bootstrap.initialUnsafePayload) + }) + require.NoError(err, "failed to seed conductor unsafe head") + } + for _, member := range members { err := retry.Do0(ctx, 40, retry.Fixed(250*time.Millisecond), func() error { return bootstrap.service.AddServerAsNonvoter(ctx, member.ServerID(), member.ConsensusEndpoint(), 0)