fix(batcher): altda parallel submitted blobs respect strict holocene order #21

Open · wants to merge 6 commits into base: eigenda-develop
37 changes: 35 additions & 2 deletions .github/workflows/test-golang.yml
@@ -48,7 +48,6 @@ jobs:
strategy:
matrix:
packages:
- op-batcher
- op-node
- op-e2e/system/altda
- op-e2e/actions/altda
@@ -68,7 +67,6 @@ jobs:
with:
path: packages/contracts-bedrock/forge-artifacts
key: ${{ runner.os }}-forge-${{ hashFiles('packages/contracts-bedrock/src/**/*.sol') }}

# Cache has been stored in the build-and-cache-contracts job, so if this fails there's a problem
- name: Check cache restore
if: steps.cache-restore.outputs.cache-hit != 'true'
@@ -96,3 +94,38 @@ jobs:
- name: Run tests
run: |
go test -timeout=10m ./${{ matrix.packages }}/...
# We test the batcher independently because one of its tests relies on the failpoint tool
# See the test-failpoint job in op-batcher/justfile for more details
test-batcher:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: jdx/mise-action@v2
with:
version: 2024.12.14 # [default: latest] mise version to install
install: true # [default: true] run `mise install`
cache: true # [default: true] cache mise using GitHub's cache
experimental: true # [default: false] enable experimental features

# We use mise to install golang instead of the setup-go action,
# so we need to do the cache setup ourselves
- name: Go Module Cache
uses: actions/cache@v3
id: go-cache
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# Add explicit download on cache miss
# go test runs `go mod download` implicitly, but separating the steps makes it easier to see how long downloading vs running tests takes
- name: Download Go modules
if: steps.go-cache.outputs.cache-hit != 'true'
run: go mod download

- name: Run tests
working-directory: op-batcher
run: just test
2 changes: 2 additions & 0 deletions go.mod
@@ -44,6 +44,7 @@ require (
github.com/multiformats/go-multiaddr v0.14.0
github.com/multiformats/go-multiaddr-dns v0.4.1
github.com/olekukonko/tablewriter v0.0.5
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.20.5
@@ -189,6 +190,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/errors v0.11.4 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -640,6 +640,8 @@ github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9F
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo=
github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
2 changes: 1 addition & 1 deletion justfiles/go.just
@@ -28,4 +28,4 @@ go_fuzz FUZZ TIME='10s' PKG='': (go_test PKG _EXTRALDFLAGS "-fuzztime" TIME "-fu

[private]
go_generate SELECTOR *FLAGS:
go generate -v {{FLAGS}} {{SELECTOR}}
go generate -v {{FLAGS}} {{SELECTOR}}
1 change: 1 addition & 0 deletions mise.toml
@@ -19,6 +19,7 @@ just = "1.37.0"
"go:gotest.tools/gotestsum" = "1.12.0"
"go:github.com/vektra/mockery/v2" = "2.46.0"
"go:github.com/golangci/golangci-lint/cmd/golangci-lint" = "1.61.0"
"go:github.com/pingcap/failpoint/failpoint-toolexec" = "v0.0.0-20240528011301-b51a646c7c86"

# Python dependencies
"pipx:slither-analyzer" = "0.10.2"
8 changes: 6 additions & 2 deletions op-alt-da/cli.go
@@ -102,8 +102,12 @@ func (c CLIConfig) Check() error {
return nil
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
func (c CLIConfig) NewDAClient() (*DAClient, error) {
err := c.Check()
if err != nil {
return nil, err
}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
6 changes: 4 additions & 2 deletions op-alt-da/daclient_test.go
@@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

@@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

8 changes: 6 additions & 2 deletions op-alt-da/damgr.go
@@ -78,8 +78,12 @@ type DA struct {
}

// NewAltDA creates a new AltDA instance with the given log and CLIConfig.
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) {
daClient, err := cli.NewDAClient()
if err != nil {
return nil, err
}
return NewAltDAWithStorage(log, cfg, daClient, metrics), nil
}

// NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface.
112 changes: 105 additions & 7 deletions op-alt-da/damock.go
@@ -2,7 +2,9 @@ package altda

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sync"
@@ -16,22 +18,53 @@ import (
)

// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
// in unit tests. MockDAClient is goroutine-safe.
type MockDAClient struct {
CommitmentType CommitmentType
store ethdb.KeyValueStore
log log.Logger
mu sync.Mutex
CommitmentType CommitmentType
GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing)
store ethdb.KeyValueStore
StoreCount int
log log.Logger
dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc.
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

func NewMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: Keccak256CommitmentType,
store: memorydb.New(),
StoreCount: 0,
log: log,
}
}

// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments.
// Its commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments.
// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules.
func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: GenericCommitmentType,
GenericCommitmentCount: 0,
store: memorydb.New(),
StoreCount: 0,
log: log,
}
}

// Fakes a da server that drops/errors on every Nth put request.
// Useful for testing the batcher's error handling.
// 0 means nothing gets dropped, 1 means every put errors, etc.
func (c *MockDAClient) DropEveryNthPut(n uint) {
c.mu.Lock()
defer c.mu.Unlock()
c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
bytes, err := c.store.Get(key.Encode())
if err != nil {
return nil, ErrNotFound
@@ -40,12 +73,46 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte
}

func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
key := NewCommitmentData(c.CommitmentType, data)
return key, c.store.Put(key.Encode(), data)
c.mu.Lock()
defer c.mu.Unlock()
c.setInputRequestCount++
var key CommitmentData
if c.CommitmentType == GenericCommitmentType {
countCommitment := make([]byte, 2)
binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount)
key = NewGenericCommitment(countCommitment)
} else {
key = NewKeccak256Commitment(data)
}
var action string = "put"
if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 {
action = "dropped"
}
c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data))
if action == "dropped" {
return nil, errors.New("put dropped")
}
err := c.store.Put(key.Encode(), data)
if err == nil {
c.GenericCommitmentCount++
c.StoreCount++
}
return key, err
}

func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Deleting data", "key", key)
// memorydb.Delete() returns nil even when the key doesn't exist, so we need to check if the key exists
// before decrementing StoreCount.
var err error
if _, err = c.store.Get(key); err == nil {
if err = c.store.Delete(key); err == nil {
c.StoreCount--
}
}
return err
}

type DAErrFaker struct {
@@ -111,6 +178,12 @@ type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
// outOfOrderResponses is a flag that, when set, causes the server to send responses out of order.
// It will only respond to pairs of requests, returning the second response first and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out-of-order responses while still respecting holocene's strict ordering rules.
outOfOrderResponses bool
oooMu sync.Mutex
oooWaitChan chan struct{}
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
@@ -130,6 +203,21 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
if s.outOfOrderResponses {
s.oooMu.Lock()
if s.oooWaitChan == nil {
s.log.Info("Received put request while in out-of-order mode, waiting for next request")
s.oooWaitChan = make(chan struct{})
s.oooMu.Unlock()
<-s.oooWaitChan
time.Sleep(1 * time.Second)
} else {
s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one")
close(s.oooWaitChan)
s.oooWaitChan = nil
s.oooMu.Unlock()
}
}
s.DAServer.HandlePut(w, r)
}

@@ -147,13 +235,23 @@ func (s *FakeDAServer) Start() error {
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.log.Info("Setting put request latency", "latency", latency)
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.log.Info("Setting get request latency", "latency", latency)
s.getRequestLatency = latency
}

// SetOutOfOrderResponses, when ooo is true, causes the server to send responses out of order.
// It will only respond to pairs of requests, returning the second response first and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out-of-order responses while still respecting holocene's strict ordering rules.
func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) {
s.log.Info("Setting out of order responses", "ooo", ooo)
s.outOfOrderResponses = ooo
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
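Note: a minimal, self-contained sketch (not part of this diff) of how the new mock hooks can be exercised directly, e.g. as an external test next to damock_test.go; the test name and inputs are illustrative:

```go
package altda_test

import (
	"context"
	"testing"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// Exercises the counting-commitment mock directly: every 2nd put is dropped,
// and StoreCount only counts successful puts.
func TestMockDAClientDropEveryNthPutSketch(t *testing.T) {
	client := altda.NewCountingGenericCommitmentMockDAClient(testlog.Logger(t, log.LevelDebug))
	client.DropEveryNthPut(2)

	ctx := context.Background()
	comm0, err := client.SetInput(ctx, []byte("frame-0"))
	require.NoError(t, err) // 1st put succeeds; its commitment encodes counter 0
	_, err = client.SetInput(ctx, []byte("frame-1"))
	require.Error(t, err) // 2nd put is dropped

	data, err := client.GetInput(ctx, comm0)
	require.NoError(t, err)
	require.Equal(t, []byte("frame-0"), data)
	require.Equal(t, 1, client.StoreCount)
}
```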
65 changes: 65 additions & 0 deletions op-alt-da/damock_test.go
@@ -0,0 +1,65 @@
package altda

import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
)

func TestFakeDAServer_OutOfOrderResponses(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
daServer := NewFakeDAServer("localhost", 0, logger)
daServer.SetOutOfOrderResponses(true)

// Channel to track completion order
completionOrder := make(chan int, 2)

// Start two concurrent requests
var wg sync.WaitGroup
wg.Add(2)

// First request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 1
}()

// Small delay to ensure first request starts first
time.Sleep(100 * time.Millisecond)

// Second request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 2
}()

// Wait for both requests to complete
wg.Wait()
close(completionOrder)

// Check completion order
var order []int
for n := range completionOrder {
order = append(order, n)
}

// Second request should complete before first
if len(order) != 2 {
t.Fatalf("expected 2 requests to complete, got %d", len(order))
}
if order[0] != 2 || order[1] != 1 {
t.Errorf("expected completion order [2,1], got %v", order)
}
}
103 changes: 97 additions & 6 deletions op-batcher/batcher/channel.go
@@ -3,12 +3,14 @@ package batcher
import (
"math"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/pingcap/failpoint"
)

// channel is a lightweight wrapper around a ChannelBuilder which keeps track of pending
@@ -20,7 +22,16 @@ type channel struct {

// pending channel builder
channelBuilder *ChannelBuilder
// Set of unconfirmed txID -> tx data. For tx resubmission
// Temporary cache for altDACommitments that are received potentially out of order from the da layer.
// Map: first frameNumber in txData -> txData (that contains an altDACommitment)
// Once the txData whose first frame number matches altDAFrameCursor is received, it will be pulled out of the
// channel on the next driver iteration, and sent to L1.
altDACommitments map[uint16]txData
// Points to the next frame number to send to L1 in order to maintain holocene strict ordering rules.
// When altDACommitments[altDAFrameCursor] is non-nil, it will be sent to L1.
altDAFrameCursor uint16
// Set of unconfirmed txID -> tx data. For tx resubmission.
// Also used in altda mode for the entirety of the submission (data -> commitment -> tx).
pendingTransactions map[string]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[string]eth.BlockID
@@ -38,21 +49,64 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
metr: metr,
cfg: cfg,
channelBuilder: cb,
altDACommitments: make(map[uint16]txData),
pendingTransactions: make(map[string]txData),
confirmedTransactions: make(map[string]eth.BlockID),
minInclusionBlock: math.MaxUint64,
}
}

// CacheAltDACommitment caches the commitment received from the DA layer for the given txData.
// We cannot submit it directly to L1 yet, as we need to make sure the commitments are submitted in order,
// according to the holocene rules. Therefore, we cache the commitment and let the channelManager
// decide when to pull them out of the channel and send them to L1.
func (s *channel) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) {
if commitment == nil {
panic("expected non-nil commitment")
}
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
txData.altDACommitment = commitment
s.log.Debug("caching altDA commitment", "frame", txData.frames[0].id.frameNumber, "commitment", commitment.String())
s.altDACommitments[txData.frames[0].id.frameNumber] = txData
}

func (s *channel) rewindAltDAFrameCursor(txData txData) {
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
s.altDAFrameCursor = txData.frames[0].id.frameNumber
}

func (s *channel) AltDASubmissionFailed(id string) {
// We co-opt TxFailed to rewind the frame cursor.
// This will force a resubmit of all the following frames as well,
// even if they had already successfully been submitted and their commitment cached.
// Ideally we'd have another way but for simplicity and to not tangle the altda code
// too much with the non altda code, we reuse the FrameCursor feature.
// TODO: is there a better abstraction for altda channels? FrameCursors are not well suited
// since frames do not have to be sent in order to the altda, only their commitment does.
s.TxFailed(id)
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (c *channel) TxFailed(id string) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
if data.altDACommitment != nil {
// In altDA mode, we don't want to rewind the channelBuilder's frameCursor
// because that will lead to resubmitting the same data to the da layer.
// We simply need to rewind the altDAFrameCursor to the first frame of the failed txData,
// to force a resubmit of the cached altDACommitment.
c.rewindAltDAFrameCursor(data)
} else {
// Rewind to the first frame of the failed tx
// -- the frames are ordered, and we want to send them
// all again.
c.channelBuilder.RewindFrameCursor(data.Frames()[0])
}
delete(c.pendingTransactions, id)
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
@@ -89,7 +143,16 @@ func (c *channel) TxConfirmed(id string, inclusionBlock eth.BlockID) bool {
// and then reset this state so it can try to build a new channel.
if c.isTimedOut() {
c.metr.RecordChannelTimedOut(c.ID())
c.log.Warn("Channel timed out", "id", c.ID(), "min_inclusion_block", c.minInclusionBlock, "max_inclusion_block", c.maxInclusionBlock)
var chanFirstL2BlockNum, chanLastL2BlockNum uint64
if c.channelBuilder.blocks.Len() > 0 {
chanFirstL2Block, _ := c.channelBuilder.blocks.Peek()
chanLastL2Block, _ := c.channelBuilder.blocks.PeekN(c.channelBuilder.blocks.Len() - 1)
chanFirstL2BlockNum = chanFirstL2Block.NumberU64()
chanLastL2BlockNum = chanLastL2Block.NumberU64()
}
c.log.Warn("Channel timed out", "id", c.ID(),
"min_l1_inclusion_block", c.minInclusionBlock, "max_l1_inclusion_block", c.maxInclusionBlock,
"first_l2_block", chanFirstL2BlockNum, "last_l2_block", chanLastL2BlockNum)
return true
}

@@ -105,6 +168,12 @@ func (c *channel) Timeout() uint64 {
// A channel has timed out if the difference in L1 Inclusion blocks between
// the first & last included block is greater than or equal to the channel timeout.
func (c *channel) isTimedOut() bool {
// This is used in tests to inject timeouts. It is a noop when not in test mode.
// See op-batcher/justfile's test command for more info.
failpoint.Inject("channel.isTimedOut", func() {
c.log.Warn("channel.isTimedOut failpoint triggered, returning true")
failpoint.Return(true)
})
// Prior to the granite hard fork activating, the use of the shorter ChannelTimeout here may cause the batcher
// to believe the channel timed out when it was valid. It would then resubmit the blocks needlessly.
// This wastes batcher funds but doesn't cause any problems for the chain progressing safe head.
@@ -124,6 +193,28 @@ func (c *channel) ID() derive.ChannelID {
return c.channelBuilder.ID()
}

// NextAltDACommitment checks if it has already received the altDA commitment
// of the txData whose first frame is altDAFrameCursor. If it has, it returns
// the txData and true. Otherwise, it returns an empty txData and false.
func (c *channel) NextAltDACommitment() (txData, bool) {
if txData, ok := c.altDACommitments[c.altDAFrameCursor]; ok {
if txData.altDACommitment == nil {
panic("expected altDACommitment to be non-nil")
}
if len(txData.frames) == 0 {
panic("expected txData to have frames")
}
// update altDAFrameCursor to the first frame of the next txData
lastFrame := txData.frames[len(txData.frames)-1]
c.altDAFrameCursor = lastFrame.id.frameNumber + 1
// We also store it in pendingTransactions so that TxFailed can know
// that this tx's altDA commitment was already cached.
c.pendingTransactions[txData.ID().String()] = txData
return txData, true
}
return txData{}, false
}

// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
// If cfg.UseBlobs is false, it returns txData with a single frame.
// If cfg.UseBlobs is true, it will read frames from its channel builder
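Note: to make the cursor logic above concrete, here is a self-contained sketch of the ordering idea using simplified stand-in types (not the batcher's actual channel/txData structs): commitments cached out of order are only released for L1 submission once every earlier frame's commitment has been released.

```go
package main

import "fmt"

// cachedTx stands in for a txData whose altDA commitment has been received.
type cachedTx struct {
	firstFrame, lastFrame uint16
	commitment            []byte
}

// altDAOrderer mimics the altDACommitments map + altDAFrameCursor pair.
type altDAOrderer struct {
	cached map[uint16]cachedTx // keyed by first frame number
	cursor uint16              // next frame number that must go to L1
}

func (o *altDAOrderer) cache(tx cachedTx) { o.cached[tx.firstFrame] = tx }

// next returns the next cached commitment that may be sent to L1, if any,
// and advances the cursor past its frames.
func (o *altDAOrderer) next() (cachedTx, bool) {
	tx, ok := o.cached[o.cursor]
	if !ok {
		return cachedTx{}, false
	}
	o.cursor = tx.lastFrame + 1
	return tx, true
}

func main() {
	o := &altDAOrderer{cached: map[uint16]cachedTx{}}
	// The commitment for frames 2-3 arrives before the one for frames 0-1.
	o.cache(cachedTx{firstFrame: 2, lastFrame: 3, commitment: []byte{0x02}})
	if _, ok := o.next(); !ok {
		fmt.Println("commitment for frames 0-1 not cached yet, holding frames 2-3")
	}
	o.cache(cachedTx{firstFrame: 0, lastFrame: 1, commitment: []byte{0x01}})
	for tx, ok := o.next(); ok; tx, ok = o.next() {
		fmt.Printf("send commitment for frames %d-%d to L1\n", tx.firstFrame, tx.lastFrame)
	}
}
```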
57 changes: 55 additions & 2 deletions op-batcher/batcher/channel_manager.go
@@ -6,6 +6,7 @@ import (
"io"
"math"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -53,7 +54,7 @@ type channelManager struct {
currentChannel *channel
// channels to read frame data from, for writing batches onchain
channelQueue []*channel
// used to lookup channels by tx ID upon tx success / failure
// used to lookup channels by tx ID upon altda and tx success / failure
txChannels map[string]*channel
}

@@ -91,6 +92,40 @@ func (s *channelManager) pendingBlocks() int {
return s.blocks.Len() - s.blockCursor
}

// CacheAltDACommitment caches the commitment received from the DA layer for the given txData.
// We cannot submit it directly to L1 yet, as we need to make sure the commitments are submitted in order,
// according to the holocene rules. Therefore, we cache them and let the channelManager decide when to submit them.
func (s *channelManager) CacheAltDACommitment(txData txData, commitment altda.CommitmentData) {
if len(txData.frames) == 0 {
panic("no frames in txData")
}
firstFrame, lastFrame := txData.frames[0], txData.frames[len(txData.frames)-1]
if firstFrame.id.chID != lastFrame.id.chID {
// The current implementation caches commitments inside channels,
// so it assumes that a txData only contains frames from a single channel.
// If this ever panics (hopefully in tests...) it shouldn't be too hard to fix.
panic("commitment spans multiple channels")
}
if channel, ok := s.txChannels[txData.ID().String()]; ok {
channel.CacheAltDACommitment(txData, commitment)
} else {
s.log.Warn("Trying to cache altda commitment for txData from unknown channel. Probably some state reset (from reorg?) happened.", "id", txData.ID())
}
}

// AltDASubmissionFailed marks a DA submission as having failed to be submitted to the DA layer.
// The frames will be pushed back into the corresponding channel such that they can be pulled again by the
// driver main loop and resent to the DA layer.
func (s *channelManager) AltDASubmissionFailed(_id txID) {
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.AltDASubmissionFailed(id)
} else {
s.log.Warn("transaction from unknown channel marked as failed", "id", id)
}
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(_id txID) {
@@ -183,6 +218,20 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
return tx, nil
}

func (s *channelManager) getNextAltDACommitment() (txData, bool) {
for _, channel := range s.channelQueue {
// if all frames have already been sent to altda, skip this channel
if int(channel.altDAFrameCursor) == channel.channelBuilder.TotalFrames() {
continue
}
if txData, ok := channel.NextAltDACommitment(); ok {
return txData, true
}
break // We need to send the commitments in order, so we can't skip to the next channel
}
return emptyTxData, false
}

// TxData returns the next tx data that should be submitted to L1.
//
// If the current channel is
@@ -193,6 +242,10 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
// When switching DA type, the channelManager state will be rebuilt
// with a new ChannelConfig.
func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, error) {
// if any altda commitment is ready, return it
if txdata, ok := s.getNextAltDACommitment(); ok {
return txdata, nil
}
channel, err := s.getReadyChannel(l1Head)
if err != nil {
return emptyTxData, err
@@ -250,7 +303,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
}

dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len())
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.pendingBlocks())

// Short circuit if there is pending tx data or the channel manager is closed
if dataPending {
44 changes: 29 additions & 15 deletions op-batcher/batcher/driver.go
@@ -79,6 +79,10 @@ type RollupClient interface {
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
}

type AltDAClient interface {
SetInput(ctx context.Context, data []byte) (altda.CommitmentData, error)
}

// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct {
Log log.Logger
@@ -89,7 +93,7 @@ type DriverSetup struct {
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
AltDA *altda.DAClient
AltDA AltDAClient
ChannelOutFactory ChannelOutFactory
}

@@ -719,6 +723,12 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
}
l.Metr.RecordLatestL1Block(l1tip)

// In AltDA mode, before pulling data out of the state, we make sure
// that the daGroup has not reached the maximum number of goroutines.
// This is to prevent blocking the main event loop when submitting the data to the DA Provider.
if l.Config.UseAltDA && !daGroup.TryGo(func() error { return nil }) {
return io.EOF
}
// Collect next transaction data. This pulls data out of the channel, so we need to make sure
// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
l.channelMgrMutex.Lock()
@@ -778,8 +788,9 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.sendTx(txData{}, true, candidate, queue, receiptsCh)
}

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// publishToAltDAAndStoreCommitment posts the txdata to the DA Provider and stores the returned commitment
// in the state. The commitment will later be sent to the L1 while making sure to follow holocene's strict ordering rules.
func (l *BatchSubmitter) publishToAltDAAndStoreCommitment(txdata txData, daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
@@ -790,7 +801,7 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
daGroup.Go(func() error {
// TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs
// but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop
// to exit, which would wait on this DA call to finish, which would take a long time.
@@ -809,17 +820,12 @@ func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[t
}
return nil
}
l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
candidate := l.calldataTxCandidate(comm.TxData())
l.sendTx(txdata, false, candidate, queue, receiptsCh)
l.Log.Info("Sent txdata to altda layer and received commitment", "commitment", comm, "tx", txdata.ID())
l.channelMgrMutex.Lock()
l.channelMgr.CacheAltDACommitment(txdata, comm)
l.channelMgrMutex.Unlock()
return nil
})
if !goroutineSpawned {
// We couldn't start the goroutine because the errgroup.Group limit
// is already reached. Since we can't send the txdata, we have to
// return it for later processing. We use nil error to skip error logging.
l.recordFailedDARequest(txdata.ID(), nil)
}
}

// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
@@ -830,7 +836,15 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
if txdata.altDACommitment == nil {
l.publishToAltDAAndStoreCommitment(txdata, daGroup)
} else {
// This means the txdata was already sent to the DA Provider and we have the commitment
// so we can send the commitment to the L1
l.Log.Info("Sending altda commitment to L1", "commitment", txdata.altDACommitment, "tx", txdata.ID())
candidate := l.calldataTxCandidate(txdata.altDACommitment.TxData())
l.sendTx(txdata, false, candidate, queue, receiptsCh)
}
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}
@@ -909,7 +923,7 @@ func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
}
l.channelMgr.TxFailed(id)
l.channelMgr.AltDASubmissionFailed(id)
}

func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
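Note: the no-op TryGo check above relies on errgroup's limit semantics: once SetLimit(n) goroutines are in flight, TryGo returns false instead of blocking, so the main loop can return early (io.EOF) rather than stall. A standalone sketch of that pattern (the limit and the fake work are illustrative, not the batcher's actual configuration):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var daGroup errgroup.Group
	daGroup.SetLimit(2) // at most 2 concurrent DA submissions

	// Fill both slots with slow "DA puts".
	for i := 0; i < 2; i++ {
		i := i
		daGroup.Go(func() error {
			time.Sleep(200 * time.Millisecond) // stand-in for a slow DA request
			fmt.Println("submitted", i)
			return nil
		})
	}

	// Non-blocking capacity probe, mirroring the batcher's check: a no-op
	// TryGo fails while the group is at its limit, so the caller can skip
	// pulling more txdata out of the channel manager this iteration.
	if !daGroup.TryGo(func() error { return nil }) {
		fmt.Println("daGroup full, deferring further submissions")
	}

	_ = daGroup.Wait()
}
```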
282 changes: 282 additions & 0 deletions op-batcher/batcher/driver_test.go
@@ -2,15 +2,26 @@ package batcher

import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/big"
"testing"
"time"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
)

@@ -114,5 +125,276 @@ func TestBatchSubmitter_SafeL1Origin_FailsToResolveRollupClient(t *testing.T) {
ep.rollupClientErr = errors.New("failed to resolve rollup client")

_, err := bs.safeL1Origin(context.Background())
fmt.Println(err)
require.Error(t, err)
}

// ======= ALTDA TESTS =======

// fakeL1Client is just a dummy struct. All fault injection is done via the fakeTxMgr (which doesn't interact with this fakeL1Client).
type fakeL1Client struct {
}

func (f *fakeL1Client) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
if number == nil {
number = big.NewInt(0)
}
return &types.Header{
Number: number,
ParentHash: common.Hash{},
Time: 0,
}, nil
}
func (f *fakeL1Client) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return 0, nil
}

func altDASetup(t *testing.T, log log.Logger) (*BatchSubmitter, *mockL2EndpointProvider, *altda.MockDAClient, *testutils.FakeTxMgr) {
ep := newEndpointProvider()

rollupCfg := &rollup.Config{
Genesis: rollup.Genesis{L2: eth.BlockID{Number: 0}, L1: eth.BlockID{Number: genesisL1Origin}},
L2ChainID: big.NewInt(1234),
}
batcherCfg := BatcherConfig{
PollInterval: 10 * time.Millisecond,
UseAltDA: true,
}

fakeTxMgr := testutils.NewFakeTxMgr(log.With("subsystem", "fake-txmgr"), common.Address{0})
l1Client := &fakeL1Client{}

channelCfg := ChannelConfig{
// SeqWindowSize: 15,
// SubSafetyMargin: 4,
ChannelTimeout: 10,
MaxFrameSize: 150, // so that each channel has exactly 1 frame
TargetNumFrames: 1,
BatchType: derive.SingularBatchType,
CompressorConfig: compressor.Config{
Kind: compressor.NoneKind,
},
}
mockAltDAClient := altda.NewCountingGenericCommitmentMockDAClient(log.With("subsystem", "da-client"))
return NewBatchSubmitter(DriverSetup{
Log: log,
Metr: metrics.NoopMetrics,
RollupConfig: rollupCfg,
ChannelConfig: channelCfg,
Config: batcherCfg,
EndpointProvider: ep,
Txmgr: fakeTxMgr,
L1Client: l1Client,
AltDA: mockAltDAClient,
}), ep, mockAltDAClient, fakeTxMgr
}

func fakeSyncStatus(unsafeL2BlockNum uint64, L1BlockRef eth.L1BlockRef) *eth.SyncStatus {
return &eth.SyncStatus{
UnsafeL2: eth.L2BlockRef{
Number: unsafeL2BlockNum,
L1Origin: eth.BlockID{
Number: 0,
},
},
SafeL2: eth.L2BlockRef{
Number: 0,
L1Origin: eth.BlockID{
Number: 0,
},
},
HeadL1: L1BlockRef,
}
}

// There are 4 failure cases (unhappy paths) that the op-batcher has to deal with.
// They are outlined in https://github.com/ethereum-optimism/optimism/tree/develop/op-batcher#happy-path
// This test suite covers these 4 cases in the context of AltDA.
func TestBatchSubmitter_AltDA_FailureCase1_L2Reorg(t *testing.T) {
t.Parallel()
log := testlog.Logger(t, log.LevelDebug)
bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log)

L1Block0 := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, nil, nil, nil, types.DefaultBlockConfig)
L1Block0Ref := eth.L1BlockRef{
Hash: L1Block0.Hash(),
Number: L1Block0.NumberU64(),
}
// We return incremental syncStatuses to force the op-batcher to entirely process each L2 block one by one.
// To test multi channel behavior, we could return a sync status that is multiple blocks ahead of the current L2 block.
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(3, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(3, L1Block0Ref), nil)

L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0"))
L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash())
L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash())
L2Block2Prime := newMiniL2BlockWithNumberParentAndL1Information(1, big.NewInt(2), L2Block1.Hash(), 101, 0)
L2Block3Prime := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2Prime.Hash())

// L2block0 is the genesis block which is considered safe, so never loaded into the state.
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Twice().Return(L2Block1, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2Prime, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Twice().Return(L2Block3Prime, nil)

err := bs.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval
err = bs.StopBatchSubmitting(context.Background())
require.NoError(t, err)

// After the reorg, block 1 needs to be reprocessed, hence why we see 5 store calls: 1, 2, 1, 2', 3'
require.Equal(t, 5, mockAltDAClient.StoreCount)
require.Equal(t, uint64(5), fakeTxMgr.Nonce)

}

func TestBatchSubmitter_AltDA_FailureCase2_FailedL1Tx(t *testing.T) {
t.Parallel()
log := testlog.Logger(t, log.LevelDebug)
bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log)

L1Block0 := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, nil, nil, nil, types.DefaultBlockConfig)
L1Block0Ref := eth.L1BlockRef{
Hash: L1Block0.Hash(),
Number: L1Block0.NumberU64(),
}
// We return incremental syncStatuses to force the op-batcher to entirely process each L2 block one by one.
// To test multi channel behavior, we could return a sync status that is multiple blocks ahead of the current L2 block.
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(3, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(4, L1Block0Ref), nil)

L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0"))
L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash())
L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash())
L2Block3 := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2.Hash())
L2Block4 := newMiniL2BlockWithNumberParent(1, big.NewInt(4), L2Block3.Hash())

// L2block0 is the genesis block which is considered safe, so never loaded into the state.
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Once().Return(L2Block1, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Once().Return(L2Block3, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(4)).Once().Return(L2Block4, nil)

fakeTxMgr.ErrorEveryNthSend(2)
err := bs.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval
err = bs.StopBatchSubmitting(context.Background())
require.NoError(t, err)

require.Equal(t, 4, mockAltDAClient.StoreCount)
// TODO: we should probably also check that the commitments are in order?
require.Equal(t, uint64(4), fakeTxMgr.Nonce)
}

// FailpointTest, which can't be run normally, because it requires failpoint to be enabled.
// Run it via `just test-failpoint` from the op-batcher directory.
func TestBatchSubmitter_AltDA_FailureCase3_ChannelTimeout_FailpointTest(t *testing.T) {
t.Parallel()

// Failpoint injects code in channel.isTimedOut to return true 50% of the time, for up to 4 times.
err := failpoint.Enable("github.com/ethereum-optimism/optimism/op-batcher/batcher/channel.isTimedOut", "50%4*return(true)")
require.NoError(t, err)
t.Cleanup(func() {
_ = failpoint.Disable("github.com/ethereum-optimism/optimism/op-batcher/batcher/channel.isTimedOut")
})

log := testlog.Logger(t, log.LevelInfo)
bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log)
L1Block0 := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, nil, nil, nil, types.DefaultBlockConfig)
L1Block0Ref := eth.L1BlockRef{
Hash: L1Block0.Hash(),
Number: L1Block0.NumberU64(),
}
// We return incremental syncStatuses to force the op-batcher to entirely process each L2 block one by one.
// To test multi channel behavior, we could return a sync status that is multiple blocks ahead of the current L2 block.
// Removing the first 3 mock calls and always returning a syncStatus on block 4 makes the batcher load all 4 blocks at once,
// and send them in parallel. However, the current channel timeout logic is not robust to this: https://github.com/ethereum-optimism/optimism/issues/13283
// TODO: After that issue is fixed, we can remove the first 3 mock calls here to test with more concurrent channels.
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(1, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(2, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Times(10).Return(fakeSyncStatus(3, L1Block0Ref), nil)
ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(4, L1Block0Ref), nil)

L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0"))
L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash())
L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash())
L2Block3 := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2.Hash())
L2Block4 := newMiniL2BlockWithNumberParent(1, big.NewInt(4), L2Block3.Hash())

// L2block0 is the genesis block which is considered safe, so never loaded into the state.
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Once().Return(L2Block1, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Once().Return(L2Block3, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(4)).Once().Return(L2Block4, nil)

err = bs.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval
err = bs.StopBatchSubmitting(context.Background())
require.NoError(t, err)

log.Info("Number of commitments stored by the mockAltDAClient", "StoreCount", mockAltDAClient.StoreCount)
for i, txData := range fakeTxMgr.SuccessfullySentTxData {
// the mockAltDAClient uses counting commitments which take the last 2 bytes (bigEndian uint16),
// and the op-batcher adds the first 2 bytes for the version_byte and commitment_type.
// See https://specs.optimism.io/experimental/alt-da.html#input-commitment-submission
require.Equal(t, 4, len(txData))
// This is a very naive test, because we only check that the requests received by the fakeAltDAClient are sent to L1 in order.
// But this is neither necessary nor sufficient. It works right now because the frames are sent in order to the altDAClient (see TODO above).
// But ultimately it is the frames that need to be sent in order, not the counting commitments generated by the fakeAltDAClient.
// TODO: we should probably take the commitment and retrieve the frames from the fakeAltDAClient, and check that those were received in order somehow.
require.Equal(t, binary.BigEndian.Uint16(txData[2:4]), uint16(i), "altda commitments should be sent to L1 in order.")
}
}

func TestBatchSubmitter_AltDA_FailureCase4_FailedBlobSubmission(t *testing.T) {
t.Parallel()
log := testlog.Logger(t, log.LevelDebug)
bs, ep, mockAltDAClient, fakeTxMgr := altDASetup(t, log)

L1Block0 := types.NewBlock(&types.Header{
Number: big.NewInt(0),
}, nil, nil, nil, types.DefaultBlockConfig)
L1Block0Ref := eth.L1BlockRef{
Hash: L1Block0.Hash(),
Number: L1Block0.NumberU64(),
}
ep.rollupClient.Mock.On("SyncStatus").Return(fakeSyncStatus(4, L1Block0Ref), nil)

L2Block0 := newMiniL2BlockWithNumberParent(1, big.NewInt(0), common.HexToHash("0x0"))
L2Block1 := newMiniL2BlockWithNumberParent(1, big.NewInt(1), L2Block0.Hash())
L2Block2 := newMiniL2BlockWithNumberParent(1, big.NewInt(2), L2Block1.Hash())
L2Block3 := newMiniL2BlockWithNumberParent(1, big.NewInt(3), L2Block2.Hash())
L2Block4 := newMiniL2BlockWithNumberParent(1, big.NewInt(4), L2Block3.Hash())

// L2block0 is the genesis block which is considered safe, so never loaded into the state.
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(1)).Once().Return(L2Block1, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(2)).Once().Return(L2Block2, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(3)).Once().Return(L2Block3, nil)
ep.ethClient.Mock.On("BlockByNumber", big.NewInt(4)).Once().Return(L2Block4, nil)

mockAltDAClient.DropEveryNthPut(2)

err := bs.StartBatchSubmitting()
require.NoError(t, err)
time.Sleep(1 * time.Second) // 1 second is enough to process all blocks at 10ms poll interval
err = bs.StopBatchSubmitting(context.Background())
require.NoError(t, err)

require.Equal(t, 4, mockAltDAClient.StoreCount)
require.Equal(t, uint64(4), fakeTxMgr.Nonce)
}
5 changes: 3 additions & 2 deletions op-batcher/batcher/service.go
@@ -370,10 +370,11 @@ func (bs *BatcherService) initRPCServer(cfg *CLIConfig) error {

func (bs *BatcherService) initAltDA(cfg *CLIConfig) error {
config := cfg.AltDA
if err := config.Check(); err != nil {
daClient, err := config.NewDAClient()
if err != nil {
return err
}
bs.AltDA = config.NewDAClient()
bs.AltDA = daClient
bs.UseAltDA = config.Enabled
return nil
}
4 changes: 4 additions & 0 deletions op-batcher/batcher/tx_data.go
@@ -4,6 +4,7 @@ import (
"fmt"
"strings"

altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive/params"
"github.com/ethereum-optimism/optimism/op-service/eth"
@@ -17,6 +18,9 @@ import (
type txData struct {
frames []frameData
asBlob bool // indicates whether this should be sent as blob
// altDACommitment is non-nil when the frames have been sent to the alt-da server,
// and the received commitment needs to be sent to the L1.
altDACommitment altda.CommitmentData
}

func singleFrameTxData(frame frameData) txData {
8 changes: 7 additions & 1 deletion op-batcher/justfile
@@ -17,7 +17,13 @@ clean:
rm -f {{BINARY}}

# Run tests
test: (go_test "./...")
test: test-failpoint
just go_test "./..."

# Failpoint tests rely on https://github.com/pingcap/failpoint and are identified by a _FailpointTest suffix to the test name.
# We run them using a separate GOCACHE because they inject code into the compiled binaries, which could interfere with other tests or normal builds.
test-failpoint:
GOCACHE=/tmp/failpoint-cache just go_test "./batcher/..." "-toolexec" "$(mise which failpoint-toolexec)" "-run" ".*_FailpointTest$"

[private]
batcher_fuzz_task FUZZ TIME='10s': (go_fuzz FUZZ TIME "./batcher")
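Note: since the failpoint tests need the toolexec rewrite, here is a standalone sketch of the inject/enable pattern this diff uses (the package path and failpoint name are illustrative); it compiles as plain Go and only changes behavior when built with failpoint-toolexec, as the `test-failpoint` recipe does:

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// isTimedOut mirrors the pattern added to channel.isTimedOut above: the
// failpoint.Inject marker is a no-op in normal builds and only becomes live
// once the sources are rewritten by failpoint-toolexec and the failpoint
// is enabled.
func isTimedOut() bool {
	failpoint.Inject("example.isTimedOut", func() {
		failpoint.Return(true)
	})
	return false
}

func main() {
	// In a rewritten build, a term like "50%4*return(true)" makes the failpoint
	// fire ~50% of the time, at most 4 times; plain "return(true)" always fires.
	// The failpath prefix is the package import path ("main" is illustrative).
	_ = failpoint.Enable("main/example.isTimedOut", "return(true)")
	defer func() { _ = failpoint.Disable("main/example.isTimedOut") }()

	// Prints false under a normal `go build`; true once toolexec has injected the code.
	fmt.Println("timed out?", isTimedOut())
}
```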
19 changes: 14 additions & 5 deletions op-batcher/readme.md
@@ -38,23 +38,32 @@ In the happy path, the batcher periodically:
1. Enqueues unsafe blocks and dequeues safe blocks from the sequencer to its internal state.
2. Enqueues a new channel, if necessary.
3. Processes some unprocessed blocks into the current channel, triggers the compression of the block data and the creation of frames.
4. Sends frames from the channel queue to the DA layer as (e.g. to Ethereum L1 as calldata or blob transactions).
4. Sends frames from the channel queue to the DA layer (e.g. to Ethereum L1 as calldata or blob transactions).
5. If there is more transaction data to send, go to 2. Else wait for a tick and go to 0.


The `blockCursor` state variable tracks the next unprocessed block.
In each channel, the `frameCursor` tracks the next unsent frame.

### Failure Cases

### Reorgs
When an L2 unsafe reorg is detected, the batch submitter will reset its state, and wait for any in flight transactions to be ingested by the verifier nodes before starting work again.
#### Reorgs
When an L2 reorg (safe or unsafe) is detected, the batch submitter will reset its state, and wait for any in flight transactions to be ingested by the verifier nodes before starting work again.

### Tx Failed
> TODO: is this true?? `waitNodeSync()` seems to only wait for the node to sync to the L1 tip of the batcher's L1 node, or an older block where the last batcher tx was included, but not wait for inflight txs to be ingested.
#### Tx Failed
When a Tx fails, an asynchronous receipts handler is triggered. The channel from whence the Tx's frames came has its `frameCursor` rewound, so that all the frames can be resubmitted in order.

### Channel Times Out
> TODO: there might be an issue with this simple logic. See https://github.com/ethereum-optimism/optimism/issues/13283
#### Channel Times Out
When a Tx is confirmed, an asynchronous receipts handler is triggered. We only update the batcher's state if the channel timed out on chain. In that case, the `blockCursor` is rewound to the first block added to that channel, and the channel queue is cleared out. This allows the batcher to start fresh building a new channel starting from the same block -- it does not need to refetch blocks from the sequencer.

#### AltDA Submission Fails

> TODO: describe how the batcher handles this.
## Design Principles and Optimization Targets
At the current time, the batcher should be optimized for correctness, simplicity and robustness. It is considered preferable to prioritize these properties, even at the expense of other potentially desirable properties such as frugality. For example, it is preferable to have the batcher resubmit some data from time to time ("wasting" money on data availability costs) instead of avoiding that by e.g. adding some persistent state to the batcher.

2 changes: 1 addition & 1 deletion op-chain-ops/genesis/config.go
@@ -1127,7 +1127,7 @@ func (d *L1Deployments) Check(deployConfig *DeployConfig) error {
(name == "OptimismPortal" || name == "L2OutputOracle" || name == "L2OutputOracleProxy") {
continue
}
if !deployConfig.UseAltDA &&
if (!deployConfig.UseAltDA || deployConfig.DACommitmentType == altda.GenericCommitmentString) &&
(name == "DataAvailabilityChallenge" ||
name == "DataAvailabilityChallengeProxy") {
continue
13 changes: 7 additions & 6 deletions op-e2e/config/init.go
@@ -48,10 +48,11 @@ const (
type AllocType string

const (
AllocTypeStandard AllocType = "standard"
AllocTypeAltDA AllocType = "alt-da"
AllocTypeL2OO AllocType = "l2oo"
AllocTypeMTCannon AllocType = "mt-cannon"
AllocTypeStandard AllocType = "standard"
AllocTypeAltDA AllocType = "alt-da"
AllocTypeAltDAGeneric AllocType = "alt-da-generic"
AllocTypeL2OO AllocType = "l2oo"
AllocTypeMTCannon AllocType = "mt-cannon"

DefaultAllocType = AllocTypeStandard
)
@@ -65,14 +66,14 @@ func (a AllocType) Check() error {

func (a AllocType) UsesProofs() bool {
switch a {
case AllocTypeStandard, AllocTypeMTCannon, AllocTypeAltDA:
case AllocTypeStandard, AllocTypeMTCannon, AllocTypeAltDA, AllocTypeAltDAGeneric:
return true
default:
return false
}
}

var allocTypes = []AllocType{AllocTypeStandard, AllocTypeAltDA, AllocTypeL2OO, AllocTypeMTCannon}
var allocTypes = []AllocType{AllocTypeStandard, AllocTypeAltDA, AllocTypeAltDAGeneric, AllocTypeL2OO, AllocTypeMTCannon}

var (
// All of the following variables are set in the init function
77 changes: 66 additions & 11 deletions op-e2e/system/altda/concurrent_test.go
@@ -7,20 +7,22 @@ import (
"time"

op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-e2e/config"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions"
"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require"
)

// TestBatcherConcurrentAltDARequests tests that the batcher can submit parallel requests
// to the alt-da server. It does not check that the requests are correctly ordered and interpreted
// by op nodes.
func TestBatcherConcurrentAltDARequests(t *testing.T) {
op_e2e.InitParallel(t)

numL1TxsExpected := int64(10)

cfg := e2esys.DefaultSystemConfig(t)
// Manually configure these since the alt-DA values aren't
// set at all in the standard config unless UseAltDA is set.
@@ -32,11 +34,9 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
cfg.DeployConfig.DABondSize = 1000000
cfg.DeployConfig.DAResolverRefundPercentage = 0
cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs
// ensures that batcher txs are as small as possible
cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1
cfg.BatcherBatchType = 0
cfg.DataAvailabilityType = flags.CalldataType
cfg.BatcherMaxConcurrentDARequest = uint64(numL1TxsExpected)
cfg.BatcherMaxConcurrentDARequest = 2

// disable batcher because we start it manually below
cfg.DisableBatcher = true
@@ -46,14 +46,15 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
sys.Close()
})

// make every request take 5 seconds, such that only concurrent requests will be able to make progress fast enough
// make every request take 5 seconds, such that only if 2 altda requests are made
// concurrently will 2 batcher txs be able to land in a single L1 block
sys.FakeAltDAServer.SetPutRequestLatency(5 * time.Second)

l1Client := sys.NodeClient("l1")
l2Seq := sys.NodeClient("sequencer")

// we wait for numL1TxsExpected L2 blocks to have been produced, just to make sure the sequencer is working properly
_, err = geth.WaitForBlock(big.NewInt(numL1TxsExpected), l2Seq)
// we wait for 10 L2 blocks to have been produced, just to make sure the sequencer is working properly
_, err = geth.WaitForBlock(big.NewInt(10), l2Seq)
require.NoError(t, err, "Waiting for L2 blocks")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -65,8 +66,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
err = driver.StartBatchSubmitting()
require.NoError(t, err)

// Iterate over up to 10 blocks. The number of transactions sent by the batcher should
// exceed the number of blocks.
// We make sure that some block has more than 1 batcher tx
checkBlocks := 10
for i := 0; i < checkBlocks; i++ {
block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+int64(i)), l1Client)
@@ -82,3 +82,58 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {

t.Fatalf("did not find more than 1 batcher tx per block in %d blocks", checkBlocks)
}

// The Holocene fork enforced a new strict batch ordering rule, see https://specs.optimism.io/protocol/holocene/derivation.html
// This test makes sure that concurrent requests to the alt-da server that are responded out of order
// are submitted to the L1 chain in the correct order by the batcher.
func TestBatcherCanHandleOutOfOrderDAServerResponses(t *testing.T) {
op_e2e.InitParallel(t)
// Not sure whether WithAllocType is needed here, as the tests pass even without it
// (see mslipper's comments on the TestBatcherConcurrentAltDARequests test above).
// TODO: understand how the DeployConfigs are related to the AllocTypes
// I asked here https://discord.com/channels/1244729134312198194/1332175015180767265/1332456541067935834 but have yet to get an answer.
cfg := e2esys.HoloceneSystemConfig(t, new(hexutil.Uint64), e2esys.WithAllocType(config.AllocTypeAltDAGeneric))
cfg.DeployConfig.UseAltDA = true
cfg.DeployConfig.DACommitmentType = "GenericCommitment"
// TODO: figure out why the below are needed even in GenericCommitment mode which doesn't use the DAChallenge Contract
cfg.DeployConfig.DAChallengeWindow = 16
cfg.DeployConfig.DAResolveWindow = 16
cfg.DeployConfig.DABondSize = 1000000
cfg.DeployConfig.DAResolverRefundPercentage = 0
cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs
cfg.BatcherBatchType = 0
cfg.DataAvailabilityType = flags.CalldataType
cfg.BatcherMaxConcurrentDARequest = 2
cfg.BatcherMaxL1TxSizeBytes = 150 // enough to fit a single compressed empty L1 block, but not 2
cfg.Nodes["sequencer"].SafeDBPath = t.TempDir() // needed for SafeHeadAtL1Block() below

sys, err := cfg.Start(t)
require.NoError(t, err, "Error starting up system")
t.Cleanup(func() {
sys.Close()
})
sys.FakeAltDAServer.SetOutOfOrderResponses(true)

l1Client := sys.NodeClient("l1")
l2SeqCL := sys.RollupClient("sequencer")

checkBlocksL1 := int64(15)
l2SafeHeadMovedCount := 0
l2SafeHeadMovedCountExpected := 3
l2SafeHeadCur := uint64(0)
for i := int64(0); i < checkBlocksL1; i++ {
_, err := geth.WaitForBlock(big.NewInt(i), l1Client, geth.WithNoChangeTimeout(5*time.Minute))
require.NoError(t, err, "Waiting for l1 blocks")
newL2SafeHead, err := l2SeqCL.SafeHeadAtL1Block(context.Background(), uint64(i))
require.NoError(t, err)
if newL2SafeHead.SafeHead.Number > l2SafeHeadCur {
l2SafeHeadMovedCount++
l2SafeHeadCur = newL2SafeHead.SafeHead.Number
}
if l2SafeHeadMovedCount == l2SafeHeadMovedCountExpected {
return
}
}
t.Fatalf("L2SafeHead only advanced %d times (expected >= %d) in %d L1 blocks", l2SafeHeadMovedCount, l2SafeHeadMovedCountExpected, checkBlocksL1)

}
38 changes: 20 additions & 18 deletions op-e2e/system/e2esys/setup.go
@@ -789,6 +789,24 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
}
}

// The altDACLIConfig is shared by the batcher and rollup nodes.
var altDACLIConfig altda.CLIConfig
if cfg.DeployConfig.UseAltDA {
fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"])
if err := fakeAltDAServer.Start(); err != nil {
return nil, fmt.Errorf("failed to start fake altDA server: %w", err)
}
sys.FakeAltDAServer = fakeAltDAServer

altDACLIConfig = altda.CLIConfig{
Enabled: cfg.DeployConfig.UseAltDA,
DAServerURL: fakeAltDAServer.HttpEndpoint(),
VerifyOnRead: true,
GenericDA: true,
MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest,
}
}

// Rollup nodes

// Ensure we are looping through the nodes in alphabetical order
@@ -803,7 +821,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
if err := c.LoadPersisted(cfg.Loggers[name]); err != nil {
return nil, err
}

c.AltDA = altDACLIConfig
if p, ok := p2pNodes[name]; ok {
c.P2P = p

@@ -905,22 +923,6 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
batcherTargetNumFrames = 1
}

var batcherAltDACLIConfig altda.CLIConfig
if cfg.DeployConfig.UseAltDA {
fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"])
if err := fakeAltDAServer.Start(); err != nil {
return nil, fmt.Errorf("failed to start fake altDA server: %w", err)
}
sys.FakeAltDAServer = fakeAltDAServer

batcherAltDACLIConfig = altda.CLIConfig{
Enabled: cfg.DeployConfig.UseAltDA,
DAServerURL: fakeAltDAServer.HttpEndpoint(),
VerifyOnRead: true,
GenericDA: true,
MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest,
}
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances[RoleL1].UserRPC().RPC(),
L2EthRpc: sys.EthInstances[RoleSeq].UserRPC().RPC(),
@@ -943,7 +945,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: derive.Zlib,
AltDA: batcherAltDACLIConfig,
AltDA: altDACLIConfig,
}

// Apply batcher cli modifications
5 changes: 4 additions & 1 deletion op-node/node/node.go
@@ -426,7 +426,10 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
if cfg.AltDA.Enabled && err != nil {
return fmt.Errorf("failed to get altDA config: %w", err)
}
altDA := altda.NewAltDA(n.log, cfg.AltDA, rpCfg, n.metrics.AltDAMetrics)
altDA, err := altda.NewAltDA(n.log, cfg.AltDA, rpCfg, n.metrics.AltDAMetrics)
if err != nil {
return fmt.Errorf("failed to create altDA: %w", err)
}
if cfg.SafeDBPath != "" {
n.log.Info("Safe head database enabled", "path", cfg.SafeDBPath)
safeDB, err := safedb.NewSafeDB(n.log, cfg.SafeDBPath)
80 changes: 80 additions & 0 deletions op-service/testutils/fake_txmgr.go
@@ -0,0 +1,80 @@
package testutils

import (
"context"
"errors"
"math/big"

"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

// FakeTxMgr is a fake txmgr.TxManager for testing the op-batcher.
type FakeTxMgr struct {
log log.Logger
FromAddr common.Address
Closed bool
Nonce uint64
SuccessfullySentTxData [][]byte
errorEveryNthSend uint // 0 means never error, 1 means every send errors, etc.
sendCount uint
}

var _ txmgr.TxManager = (*FakeTxMgr)(nil)

func NewFakeTxMgr(log log.Logger, from common.Address) *FakeTxMgr {
return &FakeTxMgr{
log: log,
FromAddr: from,
SuccessfullySentTxData: make([][]byte, 0),
}
}

func (f *FakeTxMgr) ErrorEveryNthSend(n uint) {
f.errorEveryNthSend = n
}

func (f *FakeTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) {
// We currently only use the FakeTxMgr to test the op-batcher, which only uses SendAsync.
// Send makes it harder to track failures and nonce management (we'd probably need to add a mutex, etc.).
// We can implement this if/when it's needed.
panic("FakeTxMgr does not implement Send")
}
func (f *FakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
f.log.Debug("SendingAsync tx", "nonce", f.Nonce)
f.sendCount++
var sendResponse txmgr.SendResponse
if f.errorEveryNthSend != 0 && f.sendCount%f.errorEveryNthSend == 0 {
sendResponse.Err = errors.New("errorEveryNthSend")
} else {
sendResponse.Receipt = &types.Receipt{
BlockHash: common.Hash{},
BlockNumber: big.NewInt(0),
}
sendResponse.Nonce = f.Nonce
f.Nonce++
f.SuccessfullySentTxData = append(f.SuccessfullySentTxData, candidate.TxData)
}
ch <- sendResponse
}
func (f *FakeTxMgr) From() common.Address {
return f.FromAddr
}
func (f *FakeTxMgr) BlockNumber(ctx context.Context) (uint64, error) {
return 0, nil
}
func (f *FakeTxMgr) API() rpc.API {
return rpc.API{}
}
func (f *FakeTxMgr) Close() {
f.Closed = true
}
func (f *FakeTxMgr) IsClosed() bool {
return f.Closed
}
func (f *FakeTxMgr) SuggestGasPriceCaps(ctx context.Context) (tipCap *big.Int, baseFee *big.Int, blobBaseFee *big.Int, err error) {
return nil, nil, nil, nil
}