diff --git a/.github/workflows/kurtosis-devnet.yml b/.github/workflows/kurtosis-devnet.yml
index 05663291eb93..a1036ee13319 100644
--- a/.github/workflows/kurtosis-devnet.yml
+++ b/.github/workflows/kurtosis-devnet.yml
@@ -5,20 +5,20 @@ on:
     branches: [eigenda-develop]
   pull_request:
 
+env:
+  MISE_VERSION: 2024.12.14
+
 jobs:
   # This is an optimism devnet which talks to the eigenda holesky testnet via an eigenda-proxy.
   # TODO: we should connect this to an eigenda kurtosis devnet instead of using our holesky testnet.
   run_op_eigenda_holesky_devnet:
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout Repository
-        uses: actions/checkout@v4
+      - uses: actions/checkout@v4
       - uses: jdx/mise-action@v2
         with:
-          version: 2024.12.14 # [default: latest] mise version to install
-          install: true # [default: true] run `mise install`
-          cache: true # [default: true] cache mise using GitHub's cache
-          experimental: true # [default: false] enable experimental features
+          version: ${{ env.MISE_VERSION }}
+          experimental: true # Needed by the just eigenda-holesky-devnet command below
       # These secrets get injected into the eigenda-holesky.yaml kurtosis config file
       - name: Create EigenDA secrets file
@@ -31,7 +31,16 @@ jobs:
             }
           }
           EOF
-      - name: Run Starlark
+      - run: just eigenda-holesky-devnet
+        working-directory: kurtosis-devnet
+
+  run_op_eigenda_memstore_devnet:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: jdx/mise-action@v2
+        with:
+          version: ${{ env.MISE_VERSION }}
+          experimental: true
+      - run: just eigenda-memstore-devnet
         working-directory: kurtosis-devnet
-      - run: |
-          just eigenda-holesky-devnet
diff --git a/kurtosis-devnet/eigenda-memstore.yaml b/kurtosis-devnet/eigenda-memstore.yaml
new file mode 100644
index 000000000000..3d8cd1a643d5
--- /dev/null
+++ b/kurtosis-devnet/eigenda-memstore.yaml
@@ -0,0 +1,101 @@
+# This devnet uses an eigenda-proxy running in memstore mode, which simulates an
+# EigenDA network in memory (generating random certs).
+# Unlike eigenda-holesky.yaml, it requires no eigenda-secrets.json file and no
+# connection to the eigenda holesky testnet.
+# TODO: Connect this with an eigenda v1 kurtosis devnet.
+# See https://github.com/Layr-Labs/avs-devnet/blob/main/examples/eigenda.yaml
+{{- $context := or . (dict)}}
+---
+optimism_package:
+  altda_deploy_config:
+    use_altda: true
+    # We use the generic commitment which means that the dachallenge contract won't get deployed.
+    # We align with l2beat's analysis that the da_challenge contract is not economically viable,
+    # so even if a rollup fails over to keccak commitments, not using the da_challenge contract is fine
+    # (it has the same security as using it).
+    # See https://l2beat.com/scaling/projects/redstone#da-layer-risk-analysis and
+    # https://discord.com/channels/1244729134312198194/1260612364865245224/1290294353688002562 for
+    # an economic analysis of the da challenge contract.
+    da_commitment_type: GenericCommitment
+    da_challenge_window: 16
+    da_resolve_window: 16
+    da_bond_size: 0
+    da_resolver_refund_percentage: 0
+  chains:
+    - participants:
+        - el_type: op-geth
+          # latest tag is currently broken until the next stable release, see https://github.com/ethereum-optimism/op-geth/pull/515
+          # Also see discussion in https://discord.com/channels/1244729134312198194/1260624141497798706/1342556343495692320
+          el_image: "us-docker.pkg.dev/oplabs-tools-artifacts/images/op-geth:optimism"
+          el_log_level: ""
+          el_extra_env_vars: {}
+          el_extra_labels: {}
+          el_extra_params: []
+          cl_type: op-node
+          cl_image: {{ localDockerImage "op-node" }}
+          cl_log_level: "debug"
+          cl_extra_env_vars: {}
+          cl_extra_labels: {}
+          cl_extra_params: []
+          count: 1
+      network_params:
+        network: "kurtosis"
+        network_id: "2151908"
+        seconds_per_slot: 2
+        name: "op-kurtosis"
+        fjord_time_offset: 0
+        granite_time_offset: 0
+        holocene_time_offset: 0
+        fund_dev_accounts: true
+      batcher_params:
+        image: {{ localDockerImage "op-batcher" }}
+        extra_params:
+          - --altda.max-concurrent-da-requests=1
+          - --max-channel-duration=25
+          - --target-num-frames=1
+          - --max-l1-tx-size-bytes=1000
+          - --batch-type=1
+      proposer_params:
+        image: {{ localDockerImage "op-proposer" }}
+        extra_params: []
+        game_type: 1
+        proposal_interval: 10m
+      challenger_params:
+        # TODO: re-enable once we start testing secure integrations
+        enabled: false
+        image: {{ localDockerImage "op-challenger" }}
+        cannon_prestate_path: ""
+        cannon_prestates_url: "http://fileserver/proofs/op-program/cannon"
+        extra_params: []
+      da_server_params:
+        image: ghcr.io/layr-labs/eigenda-proxy:v1.6.4
+        cmd:
+          - --addr
+          - 0.0.0.0
+          - --port
+          - "3100"
+          - --memstore.enabled
+          - --memstore.expiration
+          - "30m"
+      additional_services:
+        - da_server
+  global_log_level: "info"
+  global_node_selectors: {}
+  global_tolerations: []
+  persistent: false
+ethereum_package:
+  participants:
+    - el_type: geth
+      cl_type: teku
+  network_params:
+    preset: minimal
+    genesis_delay: 5
+    additional_preloaded_contracts: |
+      {
+        "0x4e59b44847b379578588920cA78FbF26c0B4956C": {
+          "balance": "0ETH",
+          "code": "0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe03601600081602082378035828234f58015156039578182fd5b8082525050506014600cf3",
+          "storage": {},
+          "nonce": "1"
+        }
+      }
diff --git a/kurtosis-devnet/justfile b/kurtosis-devnet/justfile
index 94830b2ec346..082cf963acff 100644
--- a/kurtosis-devnet/justfile
+++ b/kurtosis-devnet/justfile
@@ -74,9 +74,83 @@ devnet-test DEVNET *TEST:
 # Devnet recipes
 
 # EigenDA devnet that uses eigenda-proxy connected to eigenda holesky testnet network
+[group('eigenda')]
 eigenda-holesky-devnet: (devnet "eigenda-holesky.yaml" "eigenda-secrets.json" "eigenda-holesky")
 
+[group('eigenda')]
 eigenda-holesky-devnet-clean:
     kurtosis enclave rm eigenda-holesky-devnet --force
 
+# EigenDA devnet that uses the eigenda-proxy in memstore mode (simulates an eigenda network and returns random certs)
+[group('eigenda')]
+eigenda-memstore-devnet: (devnet "eigenda-memstore.yaml")
+
+[group('eigenda')]
+eigenda-memstore-devnet-clean:
+    kurtosis enclave rm eigenda-memstore-devnet --force
+
+# Cause proxy to start returning 503 errors to batcher, as a signal
+# to fail over to ethDA. Use `eigenda-memstore-devnet-failback` to revert.
+[group('eigenda')]
+eigenda-memstore-devnet-failover:
+    #!/usr/bin/env bash
+    PROXY_ENDPOINT=$(kurtosis port print eigenda-memstore-devnet da-server-op-kurtosis http)
+    curl -X PATCH $PROXY_ENDPOINT/memstore/config -d '{"PutReturnsFailoverError": true}'
+
+[group('eigenda')]
+eigenda-memstore-devnet-failback:
+    #!/usr/bin/env bash
+    PROXY_ENDPOINT=$(kurtosis port print eigenda-memstore-devnet da-server-op-kurtosis http)
+    curl -X PATCH $PROXY_ENDPOINT/memstore/config -d '{"PutReturnsFailoverError": false}'
+
+[group('eigenda')]
+eigenda-memstore-devnet-sync-status:
+    #!/usr/bin/env bash
+    OPNODE_ENDPOINT=$(kurtosis port print eigenda-memstore-devnet op-cl-1-op-node-op-geth-op-kurtosis http)
+    cast rpc optimism_syncStatus --rpc-url $OPNODE_ENDPOINT | jq
+
+[group('eigenda')]
+eigenda-memstore-devnet-configs-l1-l2:
+    #!/usr/bin/env bash
+    echo "OP-NODE ROLLUP CONFIG:"
+    OPNODE_ENDPOINT=$(kurtosis port print eigenda-memstore-devnet op-cl-1-op-node-op-geth-op-kurtosis http)
+    cast rpc optimism_rollupConfig --rpc-url $OPNODE_ENDPOINT | jq
+    echo "TEKU L1-CL SPEC:"
+    TEKU_ENDPOINT=$(kurtosis port print eigenda-memstore-devnet cl-1-teku-geth http)
+    curl $TEKU_ENDPOINT/eth/v1/config/spec | jq
+
+# We unfortunately have to restart the batcher in this ugly way right now just to change even a single flag.
+# This is because op's kurtosis setup right now is not idempotent: if we change a param in eigenda-memstore.yaml
+# and rerun `just eigenda-memstore-devnet`, the entire devnet gets spun up again, which takes a long time.
+# Track progress for fixing this in https://github.com/ethereum-optimism/optimism/issues/14390.
+# Kurtosis also doesn't have a simple way to update a running service's config, like `kubectl edit` for k8s.
+# See https://github.com/kurtosis-tech/kurtosis/issues/2628 for this issue.
+# Restart batcher with new flags or image.
+[group('eigenda')]
+eigenda-memstore-devnet-restart-batcher:
+    #!/usr/bin/env bash
+    # IMAGE=op-batcher:eigenda-memstore-devnet
+    IMAGE=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-batcher:v1.10.0
+    kurtosis service add eigenda-memstore-devnet op-batcher-op-kurtosis \
+        $IMAGE \
+        --ports "http=8548,metrics=9001" \
+        -- op-batcher \
+        --l2-eth-rpc=http://op-el-1-op-geth-op-node-op-kurtosis:8545 \
+        --rollup-rpc=http://op-cl-1-op-node-op-geth-op-kurtosis:8547 \
+        --poll-interval=1s \
+        --sub-safety-margin=6 \
+        --num-confirmations=1 \
+        --safe-abort-nonce-too-low-count=3 \
+        --resubmission-timeout=30s \
+        --rpc.addr=0.0.0.0 \
+        --rpc.port=8548 \
+        --rpc.enable-admin \
+        --metrics.enabled \
+        --metrics.addr=0.0.0.0 \
+        --metrics.port=9001 \
+        --l1-eth-rpc=http://el-1-geth-teku:8545 \
+        --private-key=0xb3d2d558e3491a3709b7c451100a0366b5872520c7aa020c17a0e7fa35b6a8df \
+        --data-availability-type=calldata \
+        --altda.enabled=True \
+        --altda.da-server=http://da-server-op-kurtosis:3100 \
+        --altda.da-service \
+        --altda.max-concurrent-da-requests=1 \
+        --max-channel-duration=25 \
+        --target-num-frames=1 \
+        --max-l1-tx-size-bytes=1000 \
+        --batch-type=1
 
 # Simple devnet
 simple-devnet: (devnet "simple.yaml")
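The failover and failback recipes above are thin wrappers around the proxy's memstore config endpoint. Below is a minimal Go sketch of the same toggle; it assumes only what the recipes themselves show (a PATCH to `/memstore/config` with a `PutReturnsFailoverError` JSON field), with the endpoint being whatever `kurtosis port print` returns.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// setMemstoreFailover mirrors the curl calls in the justfile recipes above:
// PATCH <proxy>/memstore/config with {"PutReturnsFailoverError": <bool>}.
// The endpoint path and payload shape are taken from the recipes; everything
// else here is illustrative scaffolding.
func setMemstoreFailover(proxyEndpoint string, failover bool) error {
	body := fmt.Sprintf(`{"PutReturnsFailoverError": %t}`, failover)
	req, err := http.NewRequest(http.MethodPatch, proxyEndpoint+"/memstore/config", bytes.NewBufferString(body))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status toggling failover: %s", resp.Status)
	}
	return nil
}

func main() {
	// e.g. the value of `kurtosis port print eigenda-memstore-devnet da-server-op-kurtosis http`
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: failover <proxy-endpoint>")
		os.Exit(1)
	}
	if err := setMemstoreFailover(os.Args[1], true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```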
diff --git a/op-alt-da/daclient.go b/op-alt-da/daclient.go
index 9f0bdab11fbd..dc690bbbbc88 100644
--- a/op-alt-da/daclient.go
+++ b/op-alt-da/daclient.go
@@ -16,6 +16,11 @@ var ErrNotFound = errors.New("not found")
 // ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
 var ErrInvalidInput = errors.New("invalid input")
 
+// ErrAltDADown is returned when the alt DA returns a 503 status code.
+// It is used to signify that the alt DA is down and the client should fail over to the eth DA.
+// See https://github.com/ethereum-optimism/specs/issues/434
+var ErrAltDADown = errors.New("alt DA is down: failover to eth DA")
+
 // DAClient is an HTTP client to communicate with a DA storage service.
 // It creates commitments and retrieves input data + verifies if needed.
 type DAClient struct {
@@ -131,6 +136,9 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er
 		return nil, err
 	}
 	defer resp.Body.Close()
+	if resp.StatusCode == http.StatusServiceUnavailable {
+		return nil, ErrAltDADown
+	}
 	if resp.StatusCode != http.StatusOK {
 		return nil, fmt.Errorf("failed to store data: %v", resp.StatusCode)
 	}
diff --git a/op-alt-da/damock.go b/op-alt-da/damock.go
index ad388d0b2653..03cbfc4e99d7 100644
--- a/op-alt-da/damock.go
+++ b/op-alt-da/damock.go
@@ -105,12 +105,16 @@ func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block
 }
 
 // FakeDAServer is a fake DA server for e2e tests.
-// It is a small wrapper around DAServer that allows for setting request latencies,
-// to mimic a DA service with slow responses (eg. eigenDA with 10 min batching interval).
+// It is a small wrapper around DAServer that allows for setting:
+//   - request latencies, to mimic a DA service with slow responses
+//     (eg. eigenDA with 10 min batching interval).
+//   - response status codes, to mimic a DA service that is down.
 type FakeDAServer struct {
 	*DAServer
 	putRequestLatency time.Duration
 	getRequestLatency time.Duration
+	// the next failoverCount Put requests will return a 503 status code, for failover testing
+	failoverCount uint64
 }
 
 func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
@@ -130,6 +134,11 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
 
 func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
 	time.Sleep(s.putRequestLatency)
+	if s.failoverCount > 0 {
+		w.WriteHeader(http.StatusServiceUnavailable)
+		s.failoverCount--
+		return
+	}
 	s.DAServer.HandlePut(w, r)
 }
 
@@ -154,6 +163,11 @@ func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
 	s.getRequestLatency = latency
 }
 
+// SetPutFailoverForNRequests makes the next n Put requests return a 503 status code.
+func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) {
+	s.failoverCount = n
+}
+
 type MemStore struct {
 	db   map[string][]byte
 	lock sync.RWMutex
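A test would drive the fake outage roughly as follows. This is a hedged sketch: `NewFakeDAServer` and `SetPutFailoverForNRequests` are the names from the diff above, while `Start`, `HttpEndpoint`, and the POST `/put` route are assumed to come from the embedded `DAServer` and may differ.

```go
package altda_test

import (
	"net/http"
	"strings"
	"testing"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// Sketch: observe the 503-then-recover behavior of FakeDAServer.
func TestFakeDAServerFailover(t *testing.T) {
	logger := testlog.Logger(t, log.LevelInfo)
	srv := altda.NewFakeDAServer("127.0.0.1", 0, logger)
	require.NoError(t, srv.Start()) // Start/HttpEndpoint assumed from the embedded DAServer

	srv.SetPutFailoverForNRequests(2)

	for i := 0; i < 3; i++ {
		resp, err := http.Post(srv.HttpEndpoint()+"/put", "application/octet-stream", strings.NewReader("some frame data"))
		require.NoError(t, err)
		resp.Body.Close()
		if i < 2 {
			// the first two Puts fall inside the failover window
			require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
		} else {
			// the server has "recovered": Puts succeed again
			require.Equal(t, http.StatusOK, resp.StatusCode)
		}
	}
}
```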
diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go
index 6b936c112d34..751110839250 100644
--- a/op-batcher/batcher/channel.go
+++ b/op-batcher/batcher/channel.go
@@ -45,8 +45,9 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
 }
 
 // TxFailed records a transaction as failed. It will attempt to resubmit the data
-// in the failed transaction.
-func (c *channel) TxFailed(id string) {
+// in the failed transaction. failoverToEthDA should be set to true when altDA is in
+// use and is down: it switches the channel to submit its frames to ethDA instead.
+func (c *channel) TxFailed(id string, failoverToEthDA bool) {
 	if data, ok := c.pendingTransactions[id]; ok {
 		c.log.Trace("marked transaction as failed", "id", id)
 		// Rewind to the first frame of the failed tx
@@ -57,7 +58,16 @@ func (c *channel) TxFailed(id string) {
 	} else {
 		c.log.Warn("unknown transaction marked as failed", "id", id)
 	}
-
+	if failoverToEthDA {
+		// We fail over to calldata txs because in altda mode the channel and channelManager
+		// are configured to use a calldataConfigManager, as opposed to DynamicEthChannelConfig
+		// which can use both calldata and blobs. Failover should happen extremely rarely,
+		// and only while the altDA is down, so we can afford to be inefficient here.
+		// TODO: figure out how to switch to blobs/auto instead. Might need to make the
+		// batcherService.initChannelConfig function stateless so that we can reuse it.
+		c.log.Info("Failing over to calldata txs", "id", c.ID())
+		c.cfg.DaType = DaTypeCalldata
+	}
 	c.metr.RecordBatchTxFailed()
 }
 
@@ -132,14 +142,14 @@ func (c *channel) ID() derive.ChannelID {
 // NextTxData should only be called after HasTxData returned true.
 func (c *channel) NextTxData() txData {
 	nf := c.cfg.MaxFramesPerTx()
-	txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
+	txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType}
 	for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
 		frame := c.channelBuilder.NextFrame()
 		txdata.frames = append(txdata.frames, frame)
 	}
 
 	id := txdata.ID().String()
-	c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
+	c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType)
 	c.pendingTransactions[id] = txdata
 
 	return txdata
@@ -147,7 +157,7 @@ func (c *channel) NextTxData() txData {
 
 func (c *channel) HasTxData() bool {
 	if c.IsFull() || // If the channel is full, we should start to submit it
-		!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
+		c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx
 		return c.channelBuilder.HasPendingFrame()
 	}
 	// Collect enough frames if channel is not full yet
diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go
index bf0f5ffb4adb..5054bb398950 100644
--- a/op-batcher/batcher/channel_config.go
+++ b/op-batcher/batcher/channel_config.go
@@ -46,9 +46,12 @@ type ChannelConfig struct {
 	// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
 	BatchType uint
 
-	// UseBlobs indicates that this channel should be sent as a multi-blob
-	// transaction with one blob per frame.
-	UseBlobs bool
+	// DaType indicates how the frames in this channel should be sent to the L1.
+	DaType DaType
+}
+
+func (cc ChannelConfig) UseBlobs() bool {
+	return cc.DaType == DaTypeBlob
 }
 
 // ChannelConfig returns a copy of the receiver.
@@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() { } func (cc *ChannelConfig) MaxFramesPerTx() int { - if !cc.UseBlobs { + if cc.DaType == DaTypeCalldata { return 1 } return cc.TargetNumFrames diff --git a/op-batcher/batcher/channel_config_provider_test.go b/op-batcher/batcher/channel_config_provider_test.go index 95e51a921e5f..fccc26d64921 100644 --- a/op-batcher/batcher/channel_config_provider_test.go +++ b/op-batcher/batcher/channel_config_provider_test.go @@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) { calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } tests := []struct { diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 1ea412c4b433..309a4aa77eb2 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -92,12 +92,13 @@ func (s *channelManager) pendingBlocks() int { } // TxFailed records a transaction as failed. It will attempt to resubmit the data -// in the failed transaction. -func (s *channelManager) TxFailed(_id txID) { +// in the failed transaction. failoverToEthDA should be set to true when using altDA +// and altDA is down. This will switch the channel to submit frames to ethDA instead. +func (s *channelManager) TxFailed(_id txID, failoverToEthDA bool) { id := _id.String() if channel, ok := s.txChannels[id]; ok { delete(s.txChannels, id) - channel.TxFailed(id) + channel.TxFailed(id, failoverToEthDA) } else { s.log.Warn("transaction from unknown channel marked as failed", "id", id) } @@ -207,16 +208,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, erro newCfg := s.cfgProvider.ChannelConfig(isPectra) // No change: - if newCfg.UseBlobs == s.defaultCfg.UseBlobs { + if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() { s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type", - "useBlobs", s.defaultCfg.UseBlobs) + "useBlobs", s.defaultCfg.UseBlobs()) return s.nextTxData(channel) } // Change: s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...", - "useBlobsBefore", s.defaultCfg.UseBlobs, - "useBlobsAfter", newCfg.UseBlobs) + "useBlobsBefore", s.defaultCfg.UseBlobs(), + "useBlobsAfter", newCfg.UseBlobs()) // Invalidate the channel so its blocks // get requeued: @@ -317,7 +318,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { "compression_algo", cfg.CompressorConfig.CompressionAlgo, "target_num_frames", cfg.TargetNumFrames, "max_frame_size", cfg.MaxFrameSize, - "use_blobs", cfg.UseBlobs, + "da_type", cfg.DaType.String(), ) s.metr.RecordChannelOpened(pc.ID(), s.pendingBlocks()) diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index d7c8abcd87e9..460320e515c6 100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -211,7 +211,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) { require.ErrorIs(err, io.EOF) // requeue frame - m.TxFailed(txdata0.ID()) + m.TxFailed(txdata0.ID(), false) txdata1, err := m.TxData(eth.BlockID{}, false) require.NoError(err) @@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger, calldataCfg := ChannelConfig{ MaxFrameSize: 120_000 - 1, TargetNumFrames: 1, + 
DaType: DaTypeCalldata, } blobCfg := ChannelConfig{ MaxFrameSize: eth.MaxBlobDataSize - 1, TargetNumFrames: 3, // gets closest to amortized fixed tx costs - UseBlobs: true, + DaType: DaTypeBlob, } calldataCfg.InitNoneCompressor() blobCfg.InitNoneCompressor() @@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) { cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig) - require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob) // Seed channel manager with a block rng := rand.New(rand.NewSource(99)) @@ -385,8 +386,8 @@ func TestChannelManager_TxData(t *testing.T) { } require.Equal(t, tc.numExpectedAssessments, cfg.assessments) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob) - require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob) + require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob) }) } diff --git a/op-batcher/batcher/channel_test.go b/op-batcher/batcher/channel_test.go index b36ce9311bce..3b847e420196 100644 --- a/op-batcher/batcher/channel_test.go +++ b/op-batcher/batcher/channel_test.go @@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) { const n = 6 lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: false, + DaType: DaTypeCalldata, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, @@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) { const n = eth.MaxBlobsPerBlobTx lgr := testlog.Logger(t, log.LevelWarn) ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{ - UseBlobs: true, + DaType: DaTypeBlob, TargetNumFrames: n, CompressorConfig: compressor.Config{ CompressionAlgo: derive.Zlib, @@ -305,13 +305,13 @@ func TestChannelTxFailed(t *testing.T) { // Trying to mark an unknown pending transaction as failed // shouldn't modify state - m.TxFailed(zeroFrameTxID(0)) + m.TxFailed(zeroFrameTxID(0), false) require.Equal(t, 0, m.currentChannel.PendingFrames()) require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()]) // Now we still have a pending transaction // Let's mark it as failed - m.TxFailed(expectedChannelID) + m.TxFailed(expectedChannelID, false) require.Empty(t, m.currentChannel.pendingTransactions) // There should be a frame in the pending channel now require.Equal(t, 1, m.currentChannel.PendingFrames()) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index fb791d4d2166..393ce1a88dc0 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -780,14 +780,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh // publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1. 
 func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
-	// sanity checks
-	if nf := len(txdata.frames); nf != 1 {
-		l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
-	}
-	if txdata.asBlob {
-		l.Log.Crit("Unexpected blob txdata with AltDA enabled")
-	}
-
 	// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
 	// since it may take a while for the request to return.
 	goroutineSpawned := daGroup.TryGo(func() error {
@@ -827,16 +819,17 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
 // The method will block if the queue's MaxPendingTransactions is exceeded.
 func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
 	var err error
-
-	// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
-	if l.Config.UseAltDA {
+	var candidate *txmgr.TxCandidate
+	switch txdata.daType {
+	case DaTypeAltDA:
+		if !l.Config.UseAltDA {
+			l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
+		}
+		// post the txdata to the DA Provider and replace it with the commitment
 		l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
 		// we return nil to allow publishStateToL1 to keep processing the next txdata
 		return nil
-	}
-
-	var candidate *txmgr.TxCandidate
-	if txdata.asBlob {
+	case DaTypeBlob:
 		if candidate, err = l.blobTxCandidate(txdata); err != nil {
 			// We could potentially fall through and try a calldata tx instead, but this would
 			// likely result in the chain spending more in gas fees than it is tuned for, so best
@@ -844,12 +837,14 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef
 			// or configuration issue.
 			return fmt.Errorf("could not create blob tx candidate: %w", err)
 		}
-	} else {
+	case DaTypeCalldata:
 		// sanity check
 		if nf := len(txdata.frames); nf != 1 {
 			l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
 		}
 		candidate = l.calldataTxCandidate(txdata.CallData())
+	default:
+		l.Log.Crit("Unknown DA type", "da_type", txdata.daType)
 	}
 
 	l.sendTx(txdata, false, candidate, queue, receiptsCh)
@@ -867,7 +862,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
 		candidate.GasLimit = intrinsicGas
 	}
 
-	queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
+	queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh)
 }
 
 func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
@@ -906,17 +901,18 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
 func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
 	l.channelMgrMutex.Lock()
 	defer l.channelMgrMutex.Unlock()
+	failover := errors.Is(err, altda.ErrAltDADown)
 	if err != nil {
-		l.Log.Warn("DA request failed", logFields(id, err)...)
+		l.Log.Warn("DA request failed", append([]interface{}{"failoverToEthDA", failover}, logFields(id, err)...)...)
 	}
-	l.channelMgr.TxFailed(id)
+	l.channelMgr.TxFailed(id, failover)
 }
 
 func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
 	l.channelMgrMutex.Lock()
 	defer l.channelMgrMutex.Unlock()
 	l.Log.Warn("Transaction failed to send", logFields(id, err)...)
-	l.channelMgr.TxFailed(id)
+	l.channelMgr.TxFailed(id, false)
 }
 
 func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
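Condensing the driver changes: only `ErrAltDADown` (the 503 translation from daclient.go) flips a channel to ethDA; any other DA error just requeues the frames on the same DA type. A standalone toy model of that rule follows; the names are simplified stand-ins, not the real op-batcher types.

```go
package main

import (
	"errors"
	"fmt"
)

// Toy model of the failover rule wired through recordFailedDARequest ->
// channelManager.TxFailed -> channel.TxFailed in the hunks above.
var errAltDADown = errors.New("alt DA is down: failover to eth DA")

type channel struct{ daType string }

// txFailed requeues the failed frames (elided here) and, only on
// errAltDADown, permanently flips this channel's remaining frames to
// calldata (ethDA).
func (c *channel) txFailed(err error) {
	if errors.Is(err, errAltDADown) {
		c.daType = "calldata"
	}
}

func main() {
	c := &channel{daType: "alt_da"}

	c.txFailed(errors.New("request timed out")) // ordinary failure: retried on altDA
	fmt.Println(c.daType)                       // alt_da

	c.txFailed(errAltDADown) // 503 from the da-server: fail over
	fmt.Println(c.daType)    // calldata
}
```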
diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go
index f884c57b3eab..03031d7a494e 100644
--- a/op-batcher/batcher/service.go
+++ b/op-batcher/batcher/service.go
@@ -218,30 +218,40 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
 		TargetNumFrames:    cfg.TargetNumFrames,
 		SubSafetyMargin:    cfg.SubSafetyMargin,
 		BatchType:          cfg.BatchType,
+		// DaType: set below
 	}
 
-	switch cfg.DataAvailabilityType {
-	case flags.BlobsType, flags.AutoType:
-		if !cfg.TestUseMaxTxSizeForBlobs {
-			// account for version byte prefix
-			cc.MaxFrameSize = eth.MaxBlobDataSize - 1
+	if bs.UseAltDA {
+		if cfg.DataAvailabilityType == flags.CalldataType {
+			cc.DaType = DaTypeAltDA
+		} else {
+			return fmt.Errorf("altDA is currently only supported with the calldata DA type")
 		}
-		cc.UseBlobs = true
-	case flags.CalldataType: // do nothing
-	default:
-		return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
-	}
+		if cc.MaxFrameSize > altda.MaxInputSize {
+			return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
+		}
+	} else {
 
-	if bs.UseAltDA && cc.MaxFrameSize > altda.MaxInputSize {
-		return fmt.Errorf("max frame size %d exceeds altDA max input size %d", cc.MaxFrameSize, altda.MaxInputSize)
+		switch cfg.DataAvailabilityType {
+		case flags.BlobsType, flags.AutoType:
+			if !cfg.TestUseMaxTxSizeForBlobs {
+				// account for version byte prefix
+				cc.MaxFrameSize = eth.MaxBlobDataSize - 1
+			}
+			cc.DaType = DaTypeBlob
+		case flags.CalldataType:
+			cc.DaType = DaTypeCalldata
+		default:
+			return fmt.Errorf("unknown data availability type: %v", cfg.DataAvailabilityType)
+		}
 	}
 
 	cc.InitCompressorConfig(cfg.ApproxComprRatio, cfg.Compressor, cfg.CompressionAlgo)
 
-	if cc.UseBlobs && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
+	if cc.UseBlobs() && !bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
 		return errors.New("cannot use Blobs before Ecotone")
 	}
-	if !cc.UseBlobs && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
+	if !cc.UseBlobs() && bs.RollupConfig.IsEcotone(uint64(time.Now().Unix())) {
 		bs.Log.Warn("Ecotone upgrade is active, but batcher is not configured to use Blobs!")
 	}
@@ -273,7 +283,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
 		calldataCC := cc
 		calldataCC.TargetNumFrames = 1
 		calldataCC.MaxFrameSize = 120_000
-		calldataCC.UseBlobs = false
+		calldataCC.DaType = DaTypeCalldata
 		calldataCC.ReinitCompressorConfig()
 
 		bs.ChannelConfig = NewDynamicEthChannelConfig(bs.Log, 10*time.Second, bs.TxManager, cc, calldataCC)
diff --git a/op-batcher/batcher/test_batch_submitter.go b/op-batcher/batcher/test_batch_submitter.go
index 93083aa0dc6d..f497a81209dc 100644
--- a/op-batcher/batcher/test_batch_submitter.go
+++ b/op-batcher/batcher/test_batch_submitter.go
@@ -28,7 +28,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error {
 	var candidate *txmgr.TxCandidate
 	var err error
 	cc := l.channelMgr.cfgProvider.ChannelConfig(true)
-	if cc.UseBlobs {
+	if cc.UseBlobs() {
 		candidate = l.calldataTxCandidate([]byte{})
 	} else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil {
 		return err
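The new initChannelConfig branching reduces to a small decision table. Here is a standalone sketch of that table, mirroring the service.go hunk above; `resolveDaType` is a hypothetical condensation, not the real API.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveDaType is a hypothetical condensation of the branching that
// initChannelConfig now performs (see the service.go hunk above):
//   - altDA enabled  + calldata   -> alt_da
//   - altDA enabled  + blobs/auto -> error (not yet supported)
//   - altDA disabled + blobs/auto -> blob
//   - altDA disabled + calldata   -> calldata
func resolveDaType(useAltDA bool, availabilityType string) (string, error) {
	if useAltDA {
		if availabilityType == "calldata" {
			return "alt_da", nil
		}
		return "", errors.New("altDA is currently only supported with the calldata DA type")
	}
	switch availabilityType {
	case "blobs", "auto":
		return "blob", nil
	case "calldata":
		return "calldata", nil
	default:
		return "", fmt.Errorf("unknown data availability type: %v", availabilityType)
	}
}

func main() {
	for _, tc := range []struct {
		useAltDA bool
		daType   string
	}{{true, "calldata"}, {true, "blobs"}, {false, "auto"}, {false, "calldata"}} {
		got, err := resolveDaType(tc.useAltDA, tc.daType)
		fmt.Printf("useAltDA=%v type=%s -> %q err=%v\n", tc.useAltDA, tc.daType, got, err)
	}
}
```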
"github.com/ethereum-optimism/optimism/op-service/eth" ) +// DaType determines how txData is submitted to L1. +type DaType int + +const ( + // DaTypeCalldata means that the (single) frame in the txData is submitted as calldata. + DaTypeCalldata DaType = iota + // DaTypeBlob means that the frame(s) in the txData are submitted as ethereum 4844 blobs. + DaTypeBlob + // DaTypeAltDA means that the frame(s) in the txData are submitted to an altda da-server. + DaTypeAltDA +) + +func (d DaType) String() string { + switch d { + case DaTypeCalldata: + return "calldata" + case DaTypeBlob: + return "blob" + case DaTypeAltDA: + return "alt_da" + default: + return fmt.Sprintf("unknown_da_type_%d", d) + } +} + // txData represents the data for a single transaction. // // Note: The batcher currently sends exactly one frame per transaction. This @@ -16,7 +41,8 @@ import ( // different channels. type txData struct { frames []frameData - asBlob bool // indicates whether this should be sent as blob + // daType represents the DA type which the frames data will be submitted to. + daType DaType } func singleFrameTxData(frame frameData) txData { diff --git a/op-batcher/flags/flags.go b/op-batcher/flags/flags.go index dee068cdb8f4..405fa1fdf81f 100644 --- a/op-batcher/flags/flags.go +++ b/op-batcher/flags/flags.go @@ -82,8 +82,10 @@ var ( EnvVars: prefixEnvVars("MAX_BLOCKS_PER_SPAN_BATCH"), } TargetNumFramesFlag = &cli.IntFlag{ - Name: "target-num-frames", - Usage: "The target number of frames to create per channel. Controls number of blobs per blob tx, if using Blob DA.", + Name: "target-num-frames", + Usage: "The target number of frames to create per channel. " + + "Controls number of blobs per blob tx, if using Blob DA, " + + "or number of frames per blob, if using altDA.", Value: 1, EnvVars: prefixEnvVars("TARGET_NUM_FRAMES"), } diff --git a/op-e2e/e2eutils/geth/wait.go b/op-e2e/e2eutils/geth/wait.go index 8356058afda7..17c6f16226ca 100644 --- a/op-e2e/e2eutils/geth/wait.go +++ b/op-e2e/e2eutils/geth/wait.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum/go-ethereum" @@ -86,6 +87,31 @@ func WaitForTransaction(hash common.Hash, client *ethclient.Client, timeout time } } +// WaitForBlockWithTxFromSender waits for a block with a transaction from a specific sender address. +// It starts from the current block and checks the next nBlocks blocks. 
diff --git a/op-e2e/e2eutils/geth/wait.go b/op-e2e/e2eutils/geth/wait.go
index 8356058afda7..17c6f16226ca 100644
--- a/op-e2e/e2eutils/geth/wait.go
+++ b/op-e2e/e2eutils/geth/wait.go
@@ -8,6 +8,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum/go-ethereum"
@@ -86,6 +87,31 @@ func WaitForTransaction(hash common.Hash, client *ethclient.Client, timeout time
 	}
 }
 
+// WaitForBlockWithTxFromSender waits for a block with a transaction from a specific sender address.
+// It starts from the current block and checks the next nBlocks blocks.
+func WaitForBlockWithTxFromSender(sender common.Address, client *ethclient.Client, nBlocks uint64) (*types.Block, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	startBlockNum, err := client.BlockNumber(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for blockNum := startBlockNum; blockNum < startBlockNum+nBlocks; blockNum++ {
+		blockL1, err := WaitForBlock(big.NewInt(0).SetUint64(blockNum), client)
+		if err != nil {
+			return nil, err
+		}
+		batcherTxCount, err := transactions.TransactionsBySenderCount(blockL1, sender)
+		if err != nil {
+			return nil, err
+		}
+		if batcherTxCount > 0 {
+			return blockL1, nil
+		}
+	}
+	return nil, fmt.Errorf("no block with tx from sender %s found in the last %d blocks", sender.Hex(), nBlocks)
+}
+
 type waitForBlockOptions struct {
 	noChangeTimeout time.Duration
 	absoluteTimeout time.Duration
diff --git a/op-e2e/e2eutils/transactions/count.go b/op-e2e/e2eutils/transactions/count.go
index 0f4d41fe0478..7f9f05c2857f 100644
--- a/op-e2e/e2eutils/transactions/count.go
+++ b/op-e2e/e2eutils/transactions/count.go
@@ -5,7 +5,8 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
-func TransactionsBySender(block *types.Block, sender common.Address) (int64, error) {
+// TransactionsBySenderCount returns the number of transactions in the block that were sent by the given sender.
+func TransactionsBySenderCount(block *types.Block, sender common.Address) (int64, error) {
 	txCount := int64(0)
 	for _, tx := range block.Transactions() {
 		signer := types.NewCancunSigner(tx.ChainId())
@@ -19,3 +20,19 @@ func TransactionsBySender(block *types.Block, sender common.Address) (int64, err
 	}
 	return txCount, nil
 }
+
+// TransactionsBySender returns the transactions in the block that were sent by the given sender.
+func TransactionsBySender(block *types.Block, sender common.Address) ([]*types.Transaction, error) {
+	txs := make([]*types.Transaction, 0)
+	for _, tx := range block.Transactions() {
+		signer := types.NewCancunSigner(tx.ChainId())
+		txSender, err := types.Sender(signer, tx)
+		if err != nil {
+			return nil, err
+		}
+		if txSender == sender {
+			txs = append(txs, tx)
+		}
+	}
+	return txs, nil
+}
diff --git a/op-e2e/system/altda/concurrent_test.go b/op-e2e/system/altda/concurrent_test.go
index ef11a879dc70..19c0a0103bb4 100644
--- a/op-e2e/system/altda/concurrent_test.go
+++ b/op-e2e/system/altda/concurrent_test.go
@@ -73,7 +73,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
 		require.NoError(t, err, "Waiting for l1 blocks")
 		// there are possibly other services (proposer/challenger) in the background sending txs
 		// so we only count the batcher txs
-		batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
+		batcherTxCount, err := transactions.TransactionsBySenderCount(block, cfg.DeployConfig.BatchSenderAddress)
 		require.NoError(t, err)
 		if batcherTxCount > 1 {
 			return
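The failover test below tells ethDA and altDA batcher txs apart by the first byte of the tx data: frames submitted directly to L1 start with the derivation version byte 0x00 (params.DerivationVersion0), while altDA commitment txs start with 0x01 (the value the test itself checks for). A standalone sketch of that classification:

```go
package main

import "fmt"

// classifyBatcherTx mirrors the first-byte check in the failover test below:
// 0x00 (params.DerivationVersion0) marks an ethDA frame-carrying tx, and 0x01
// marks an altDA commitment tx; both prefix values are taken from the test.
func classifyBatcherTx(txData []byte) string {
	if len(txData) == 0 {
		return "unknown"
	}
	switch txData[0] {
	case 0x00:
		return "ethda"
	case 0x01:
		return "altda"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classifyBatcherTx([]byte{0x00, 0xaa})) // ethda: frame data follows
	fmt.Println(classifyBatcherTx([]byte{0x01, 0x00})) // altda: commitment follows
}
```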
"github.com/stretchr/testify/require" +) + +// TestBatcher_FailoverToEthDA_FallbackToAltDA tests that the batcher will failover to ethDA +// if the da-server returns 503. It also tests that the batcher successfully returns to normal +// behavior of posting batches to altda once it becomes available again +// (i.e. the da-server doesn't return 503 anymore). +func TestBatcher_FailoverToEthDA_FallbackToAltDA(t *testing.T) { + op_e2e.InitParallel(t) + + nChannelsFailover := uint64(2) + + cfg := e2esys.DefaultSystemConfig(t, e2esys.WithLogLevel(log.LevelCrit)) + cfg.DeployConfig.UseAltDA = true + cfg.DeployConfig.DACommitmentType = "GenericCommitment" + cfg.DeployConfig.DAChallengeWindow = 16 + cfg.DeployConfig.DAResolveWindow = 16 + cfg.DeployConfig.DABondSize = 1000000 + cfg.DeployConfig.DAResolverRefundPercentage = 0 + // With these settings, the batcher will post a single commitment per L1 block, + // so it's easy to trigger failover and observe the commitment changing on the next L1 block. + cfg.BatcherMaxPendingTransactions = 1 // no limit on parallel txs + cfg.BatcherMaxConcurrentDARequest = 1 + cfg.BatcherBatchType = 0 + // We make channels as small as possible, such that they contain a single commitment. + // This is because failover to ethDA happens on a per-channel basis (each new channel is sent to altDA first). + // Hence, we can quickly observe the failover (to ethda) and fallback (to altda) behavior. + // cfg.BatcherMaxL1TxSizeBytes = 1200 + // currently altda commitments can only be sent as calldata + cfg.DataAvailabilityType = flags.CalldataType + + sys, err := cfg.Start(t) + require.NoError(t, err, "Error starting up system") + defer sys.Close() + l1Client := sys.NodeClient("l1") + + startBlockL1, err := geth.WaitForBlockWithTxFromSender(cfg.DeployConfig.BatchSenderAddress, l1Client, 10) + require.NoError(t, err) + + // Simulate altda server returning 503 + sys.FakeAltDAServer.SetPutFailoverForNRequests(nChannelsFailover) + + countEthDACommitment := uint64(0) + + // There is some nondeterministic timing behavior that affects whether the batcher has already + // posted batches before seeing the above SetPutFailoverForNRequests behavior change. + // Most likely, sequence of blocks will be: altDA, ethDA, ethDA, altDA, altDA, altDA. + // 2 ethDA are expected (and checked for) because nChannelsFailover=2, so da-server will return 503 for 2 requests only, + // and the batcher always tries altda first for a new channel, and failsover to ethDA only if altda returns 503. 
+ for blockNumL1 := startBlockL1.NumberU64(); blockNumL1 < startBlockL1.NumberU64()+6; blockNumL1++ { + blockL1, err := geth.WaitForBlock(big.NewInt(0).SetUint64(blockNumL1), l1Client) + require.NoError(t, err) + batcherTxs, err := transactions.TransactionsBySender(blockL1, cfg.DeployConfig.BatchSenderAddress) + require.NoError(t, err) + require.Equal(t, 1, len(batcherTxs)) // sanity check: ensure BatcherMaxPendingTransactions=1 is working + batcherTx := batcherTxs[0] + if batcherTx.Data()[0] == 1 { + t.Log("blockL1", blockNumL1, "batcherTxType", "altda") + } else if batcherTx.Data()[0] == 0 { + t.Log("blockL1", blockNumL1, "batcherTxType", "ethda") + } else { + t.Fatalf("unexpected batcherTxType: %v", batcherTx.Data()[0]) + } + if batcherTx.Data()[0] == byte(params.DerivationVersion0) { + countEthDACommitment++ + } + } + require.Equal(t, nChannelsFailover, countEthDACommitment, "Expected %v ethDA commitments, got %v", nChannelsFailover, countEthDACommitment) + +} diff --git a/op-e2e/system/da/multi_test.go b/op-e2e/system/da/multi_test.go index 461270282008..e8b7ea6ff266 100644 --- a/op-e2e/system/da/multi_test.go +++ b/op-e2e/system/da/multi_test.go @@ -52,7 +52,7 @@ func TestBatcherMultiTx(t *testing.T) { block, err := l1Client.BlockByNumber(ctx, big.NewInt(int64(i))) require.NoError(t, err) - batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress) + batcherTxCount, err := transactions.TransactionsBySenderCount(block, cfg.DeployConfig.BatchSenderAddress) require.NoError(t, err) totalBatcherTxsCount += batcherTxCount diff --git a/op-e2e/system/e2esys/setup.go b/op-e2e/system/e2esys/setup.go index 12f9af850788..dc67991bc7df 100644 --- a/op-e2e/system/e2esys/setup.go +++ b/op-e2e/system/e2esys/setup.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "errors" "fmt" + "log/slog" "math/big" "net" "os" @@ -87,6 +88,7 @@ var ( type SystemConfigOpts struct { AllocType config.AllocType + LogLevel slog.Level } type SystemConfigOpt func(s *SystemConfigOpts) @@ -97,9 +99,16 @@ func WithAllocType(allocType config.AllocType) SystemConfigOpt { } } +func WithLogLevel(level slog.Level) SystemConfigOpt { + return func(s *SystemConfigOpts) { + s.LogLevel = level + } +} + func DefaultSystemConfig(t testing.TB, opts ...SystemConfigOpt) SystemConfig { sco := &SystemConfigOpts{ AllocType: config.DefaultAllocType, + LogLevel: slog.LevelInfo, } for _, opt := range opts { opt(sco) @@ -110,7 +119,7 @@ func DefaultSystemConfig(t testing.TB, opts ...SystemConfigOpt) SystemConfig { deployConfig := config.DeployConfig(sco.AllocType) deployConfig.L1GenesisBlockTimestamp = hexutil.Uint64(time.Now().Unix()) e2eutils.ApplyDeployConfigForks(deployConfig) - require.NoError(t, deployConfig.Check(testlog.Logger(t, log.LevelInfo)), + require.NoError(t, deployConfig.Check(testlog.Logger(t, sco.LogLevel).New("role", "config-check")), "Deploy config is invalid, do you need to run make devnet-allocs?") l1Deployments := config.L1Deployments(sco.AllocType) require.NoError(t, l1Deployments.Check(deployConfig)) @@ -172,11 +181,12 @@ func DefaultSystemConfig(t testing.TB, opts ...SystemConfigOpt) SystemConfig { }, }, Loggers: map[string]log.Logger{ - RoleVerif: testlog.Logger(t, log.LevelInfo).New("role", RoleVerif), - RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq), - "batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"), - "proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"), - "da-server": testlog.Logger(t, log.LevelInfo).New("role", "da-server"), + 
RoleVerif: testlog.Logger(t, sco.LogLevel).New("role", RoleVerif), + RoleSeq: testlog.Logger(t, sco.LogLevel).New("role", RoleSeq), + "batcher": testlog.Logger(t, sco.LogLevel).New("role", "batcher"), + "proposer": testlog.Logger(t, sco.LogLevel).New("role", "proposer"), + "da-server": testlog.Logger(t, sco.LogLevel).New("role", "da-server"), + "config-check": testlog.Logger(t, sco.LogLevel).New("role", "config-check"), }, GethOptions: map[string][]geth.GethOption{}, P2PTopology: nil, // no P2P connectivity by default @@ -275,12 +285,10 @@ type SystemConfig struct { // L1FinalizedDistance is the distance from the L1 head that L1 blocks will be artificially finalized on. L1FinalizedDistance uint64 - Premine map[common.Address]*big.Int - Nodes map[string]*rollupNode.Config // Per node config. Don't use populate rollup.Config - Loggers map[string]log.Logger - GethOptions map[string][]geth.GethOption - ProposerLogger log.Logger - BatcherLogger log.Logger + Premine map[common.Address]*big.Int + Nodes map[string]*rollupNode.Config // Per node config. Don't use populate rollup.Config + Loggers map[string]log.Logger + GethOptions map[string][]geth.GethOption ExternalL2Shim string @@ -551,7 +559,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, c = sys.TimeTravelClock } - if err := cfg.DeployConfig.Check(testlog.Logger(t, log.LevelInfo)); err != nil { + if err := cfg.DeployConfig.Check(cfg.Loggers["config-check"]); err != nil { return nil, err }
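End-to-end, the failover scenario is driven entirely from test code. A condensed usage sketch stitched from the helpers this diff adds (names as in the hunks above; the `sys.FakeAltDAServer` wiring is assumed to come from the existing e2e system setup):

```go
package altda

import (
	"testing"

	"github.com/ethereum-optimism/optimism/op-batcher/flags"
	"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// Condensed from TestBatcher_FailoverToEthDA_FallbackToAltDA above; the
// assertions on per-block commitment prefixes are elided.
func failoverSketch(t *testing.T) {
	// WithLogLevel is the new option from setup.go; LevelCrit silences the
	// very chatty e2e services during this long-running test.
	cfg := e2esys.DefaultSystemConfig(t, e2esys.WithLogLevel(log.LevelCrit))
	cfg.DeployConfig.UseAltDA = true
	cfg.DataAvailabilityType = flags.CalldataType // altDA requires calldata for now

	sys, err := cfg.Start(t)
	require.NoError(t, err)
	defer sys.Close()

	// Fail the next two da-server Puts with 503; the batcher should post two
	// ethDA (0x00-prefixed) txs and then return to altDA commitments.
	sys.FakeAltDAServer.SetPutFailoverForNRequests(2)
}
```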