Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batcher): altda parallel submitted blobs respect strict holocene order #21

Open
wants to merge 6 commits into
base: eigenda-develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions .github/workflows/test-golang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ jobs:
strategy:
matrix:
packages:
- op-batcher
- op-node
- op-e2e/system/altda
- op-e2e/actions/altda
Expand All @@ -68,7 +67,6 @@ jobs:
with:
path: packages/contracts-bedrock/forge-artifacts
key: ${{ runner.os }}-forge-${{ hashFiles('packages/contracts-bedrock/src/**/*.sol') }}

# Cache has been stored in the build-and-cache-contracts job, so if this fails there's a problem
- name: Check cache restore
if: steps.cache-restore.outputs.cache-hit != 'true'
Expand Down Expand Up @@ -96,3 +94,38 @@ jobs:
- name: Run tests
run: |
go test -timeout=10m ./${{ matrix.packages }}/...

# We test the batcher independently because one of its tests relies on the failpoint tool
# See the test-failpoint job in op-batcher/justfile for more details
test-batcher:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: jdx/mise-action@v2
with:
version: 2024.12.14 # [default: latest] mise version to install
install: true # [default: true] run `mise install`
cache: true # [default: true] cache mise using GitHub's cache
experimental: true # [default: false] enable experimental features

# We use mise to install golang instead of the setup-go action,
# so we need to do the cache setup ourselves
- name: Go Module Cache
uses: actions/cache@v3
id: go-cache
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

# Add explicit download on cache miss
# go test runs `go mod download` implicitly, but this separation is nice to see how long downloading vs running tests takes
- name: Download Go modules
if: steps.go-cache.outputs.cache-hit != 'true'
run: go mod download

- name: Run tests
working-directory: op-batcher
run: just test
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/multiformats/go-multiaddr v0.14.0
github.com/multiformats/go-multiaddr-dns v0.4.1
github.com/olekukonko/tablewriter v0.0.5
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.20.5
Expand Down Expand Up @@ -185,6 +186,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/errors v0.11.4 // indirect
github.com/pion/datachannel v1.5.8 // indirect
github.com/pion/dtls/v2 v2.2.12 // indirect
github.com/pion/ice/v2 v2.3.34 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9F
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pion/datachannel v1.5.8 h1:ph1P1NsGkazkjrvyMfhRBUAWMxugJjq2HfQifaOoSNo=
github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu8QzbL3tI=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
Expand Down
2 changes: 1 addition & 1 deletion justfiles/go.just
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ go_fuzz FUZZ TIME='10s' PKG='': (go_test PKG _EXTRALDFLAGS "-fuzztime" TIME "-fu

[private]
go_generate SELECTOR *FLAGS:
go generate -v {{FLAGS}} {{SELECTOR}}
go generate -v {{FLAGS}} {{SELECTOR}}
1 change: 1 addition & 0 deletions mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ just = "1.37.0"
"go:gotest.tools/gotestsum" = "1.12.0"
"go:github.com/vektra/mockery/v2" = "2.46.0"
"go:github.com/golangci/golangci-lint/cmd/golangci-lint" = "1.61.0"
"go:github.com/pingcap/failpoint/failpoint-toolexec" = "v0.0.0-20240528011301-b51a646c7c86"

# Python dependencies
"pipx:slither-analyzer" = "0.10.2"
Expand Down
8 changes: 6 additions & 2 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func (c CLIConfig) Check() error {
return nil
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
func (c CLIConfig) NewDAClient() (*DAClient, error) {
err := c.Check()
if err != nil {
return nil, err
}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
Expand Down
6 changes: 4 additions & 2 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down Expand Up @@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down
8 changes: 6 additions & 2 deletions op-alt-da/damgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type DA struct {
}

// NewAltDA creates a new AltDA instance with the given log and CLIConfig.
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) {
daClient, err := cli.NewDAClient()
if err != nil {
return nil, err
}
return NewAltDAWithStorage(log, cfg, daClient, metrics), nil
}

// NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface.
Expand Down
108 changes: 101 additions & 7 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package altda

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sync"
Expand All @@ -16,22 +18,53 @@ import (
)

// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
// in unit tests. MockDAClient is goroutine-safe.
type MockDAClient struct {
CommitmentType CommitmentType
store ethdb.KeyValueStore
log log.Logger
mu sync.Mutex
CommitmentType CommitmentType
GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing)
store ethdb.KeyValueStore
StoreCount int
log log.Logger
dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc.
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

// NewMockDAClient returns a MockDAClient backed by an in-memory
// key-value store that issues Keccak256 hash commitments.
func NewMockDAClient(log log.Logger) *MockDAClient {
	client := &MockDAClient{
		CommitmentType: Keccak256CommitmentType,
		store:          memorydb.New(),
		StoreCount:     0,
		log:            log,
	}
	return client
}

// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments.
// Its commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments.
litt3 marked this conversation as resolved.
Show resolved Hide resolved
// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules.
// NewCountingGenericCommitmentMockDAClient returns a MockDAClient that
// hands out generic commitments which are simply big-endian uint16
// counters (0, 1, 2, ...) rather than real hash/altda commitments, so
// tests can assert that commitments arrive in Holocene strict order.
func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient {
	client := &MockDAClient{
		CommitmentType:         GenericCommitmentType,
		GenericCommitmentCount: 0,
		store:                  memorydb.New(),
		StoreCount:             0,
		log:                    log,
	}
	return client
}

// DropEveryNthPut configures the mock to behave like a DA server that
// drops (errors on) every nth SetInput request, counted over the
// client's lifetime, to exercise the batcher's error handling.
// n == 0 disables dropping, n == 1 makes every put error, n == 2 every
// second put, and so on. Safe for concurrent use (guarded by c.mu).
func (c *MockDAClient) DropEveryNthPut(n uint) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
bytes, err := c.store.Get(key.Encode())
if err != nil {
return nil, ErrNotFound
Expand All @@ -40,12 +73,42 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte
}

func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
key := NewCommitmentData(c.CommitmentType, data)
return key, c.store.Put(key.Encode(), data)
c.mu.Lock()
defer c.mu.Unlock()
c.setInputRequestCount++
var key CommitmentData
if c.CommitmentType == GenericCommitmentType {
countCommitment := make([]byte, 2)
binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount)
key = NewGenericCommitment(countCommitment)
} else {
key = NewKeccak256Commitment(data)
}
var action string = "put"
if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 {
action = "dropped"
}
c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data))
if action == "dropped" {
return nil, errors.New("put dropped")
}
err := c.store.Put(key.Encode(), data)
if err == nil {
c.GenericCommitmentCount++
c.StoreCount++
}
return key, err
}

func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Deleting data", "key", key)
err := c.store.Delete(key)
litt3 marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
c.StoreCount--
}
return err
}

type DAErrFaker struct {
Expand Down Expand Up @@ -111,6 +174,12 @@ type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
// outOfOrderResponses is a flag that, when set, causes the server to send responses out of order.
// It will only respond to pairs of requests, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
outOfOrderResponses bool
oooMu sync.Mutex
oooWaitChan chan struct{}
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
Expand All @@ -130,6 +199,21 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {

// HandlePut optionally delays the request by putRequestLatency and, when
// out-of-order mode is on, pairs up concurrent put requests so that the
// second request of each pair is answered before the first; it then
// delegates the actual put handling to the embedded DAServer.
func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
	time.Sleep(s.putRequestLatency)
	// NOTE(review): outOfOrderResponses (and putRequestLatency above) are
	// read here without holding oooMu, while SetOutOfOrderResponses writes
	// the flag unsynchronized — a data race if toggled while requests are
	// in flight. TODO confirm callers only toggle before serving traffic.
	if s.outOfOrderResponses {
		s.oooMu.Lock()
		if s.oooWaitChan == nil {
			// First request of a pair: park on a fresh channel until the
			// second request arrives, then sleep 1s so the peer's response
			// is written first.
			s.log.Info("Received put request while in out-of-order mode, waiting for next request")
			s.oooWaitChan = make(chan struct{})
			s.oooMu.Unlock()
			<-s.oooWaitChan
			time.Sleep(1 * time.Second)
		} else {
			// Second request of the pair: release the parked first request
			// and fall through to respond immediately.
			s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one")
			close(s.oooWaitChan)
			s.oooWaitChan = nil
			s.oooMu.Unlock()
		}
	}
	s.DAServer.HandlePut(w, r)
}

Expand All @@ -147,13 +231,23 @@ func (s *FakeDAServer) Start() error {
}

// SetPutRequestLatency makes every subsequent put request sleep for the
// given duration before being handled, to simulate a slow DA server.
// NOTE(review): the field is written without synchronization while
// HandlePut reads it — TODO confirm it is only set before serving.
func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
	s.log.Info("Setting put request latency", "latency", latency)
	s.putRequestLatency = latency
}

// SetGetRequestLatency makes every subsequent get request sleep for the
// given duration before being handled, to simulate a slow DA server.
// NOTE(review): the field is written without synchronization while
// HandleGet reads it — TODO confirm it is only set before serving.
func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
	s.log.Info("Setting get request latency", "latency", latency)
	s.getRequestLatency = latency
}

// SetOutOfOrderResponses toggles out-of-order mode. When ooo is true the
// server only responds to pairs of put requests, returning the second
// response first and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out-of-order
// responses while still respecting Holocene's strict ordering rules.
// NOTE(review): the flag is written without holding oooMu while
// HandlePut reads it — TODO confirm it is only toggled before serving.
func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) {
	s.log.Info("Setting out of order responses", "ooo", ooo)
	s.outOfOrderResponses = ooo
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
Expand Down
65 changes: 65 additions & 0 deletions op-alt-da/damock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package altda

import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
)

// TestFakeDAServer_OutOfOrderResponses verifies that a FakeDAServer in
// out-of-order mode answers the second of two concurrent put requests
// before the first one.
func TestFakeDAServer_OutOfOrderResponses(t *testing.T) {
	logger := testlog.Logger(t, log.LevelDebug)
	daServer := NewFakeDAServer("localhost", 0, logger)
	daServer.SetOutOfOrderResponses(true)

	// Channel to track completion order; buffered so neither goroutine
	// blocks on send after its HandlePut returns.
	completionOrder := make(chan int, 2)

	// Start two concurrent requests
	var wg sync.WaitGroup
	wg.Add(2)

	// First request: in out-of-order mode this one parks inside HandlePut
	// until the second request arrives.
	go func() {
		defer wg.Done()
		w := httptest.NewRecorder()
		r := httptest.NewRequest("PUT", "/data", nil)

		daServer.HandlePut(w, r)
		completionOrder <- 1
	}()

	// Small delay to ensure first request starts first.
	// NOTE(review): sleep-based ordering is best-effort and could flake
	// on a heavily loaded machine — TODO consider a sync handshake.
	time.Sleep(100 * time.Millisecond)

	// Second request: should be released immediately and finish first.
	go func() {
		defer wg.Done()
		w := httptest.NewRecorder()
		r := httptest.NewRequest("PUT", "/data", nil)

		daServer.HandlePut(w, r)
		completionOrder <- 2
	}()

	// Wait for both requests to complete
	wg.Wait()
	close(completionOrder)

	// Check completion order
	var order []int
	for n := range completionOrder {
		order = append(order, n)
	}

	// Second request should complete before first
	if len(order) != 2 {
		t.Fatalf("expected 2 requests to complete, got %d", len(order))
	}
	if order[0] != 2 || order[1] != 1 {
		t.Errorf("expected completion order [2,1], got %v", order)
	}
}
Loading
Loading