
Conversation

@julienrbrt
Member

Follow-up of #2797. Blocked on #2797 being merged.

Implements part of #2797 (comment)

@github-actions
Contributor

github-actions bot commented Nov 14, 2025

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build:         ✅ passed
Format:        ⏩ skipped
Lint:          ✅ passed
Breaking:      ✅ passed
Updated (UTC): Nov 17, 2025, 2:51 PM

@claude
Contributor

claude bot commented Nov 14, 2025

Claude finished @julienrbrt's task


Comprehensive Code Review ✅

I've completed a thorough review of the async force inclusion fetching implementation. Here are my findings across all focus areas:


1. Code Quality & Architecture

Strengths

  • Well-structured modular design: The separation between ForcedInclusionRetriever (DA layer) and BasedSequencer is clean and follows the codebase's zero-dependency core pattern
  • Idiomatic Go patterns: Good use of atomic types, context.Context, sync.WaitGroup, and proper cleanup patterns
  • Clear interfaces: The ForcedInclusionRetriever interface in block/public.go provides a clean public API

Issues & Concerns

Critical: Unbounded spinning in CAS loops ⚠️

block/internal/da/forced_inclusion_retriever.go:59-72, 74-88, 103-117, 179-188

The lock-free epochCache implementation has infinite CAS retry loops without backoff:

func (c *epochCache) set(epochStart uint64, event *ForcedInclusionEvent) {
    for {
        oldEventsPtr := c.events.Load()
        // ... copy and update ...
        if c.events.CompareAndSwap(oldEventsPtr, &newEvents) {
            return
        }
        // No backoff or limit: may spin and burn CPU under high contention
    }
}

Problem: Under high contention (e.g., concurrent calls from background fetcher + sync requests), these loops could spin indefinitely, wasting CPU cycles. While CAS loops are appropriate here, adding a yield or short backoff after failures is a best practice.

Recommendation: Add runtime.Gosched() after failed CAS attempts:

if c.events.CompareAndSwap(oldEventsPtr, &newEvents) {
    return
}
runtime.Gosched() // Yield to scheduler

Medium: Race condition window in SetDAHeight ⚠️

block/internal/da/forced_inclusion_retriever.go:178-188

func (r *ForcedInclusionRetriever) SetDAHeight(height uint64) {
    for {
        current := r.currentDAHeight.Load()
        if height <= current {
            return  // Early return - good!
        }
        if r.currentDAHeight.CompareAndSwap(current, height) {
            return
        }
    }
}

Issue: The CAS correctly enforces a monotonic maximum: if SetDAHeight(105) and SetDAHeight(110) race and 105 wins first, the 110 call still succeeds on retry. But when heights are set from different sources (DA worker vs retriever), silently dropped updates could mask ordering bugs.

Recommendation: Consider adding debug logging when updates are skipped to detect potential issues:

if height <= current {
    r.logger.Debug().Uint64("current", current).Uint64("attempted", height).Msg("skipping stale DA height update")
    return
}

Medium: Background fetcher starts immediately without validation

block/internal/da/forced_inclusion_retriever.go:160-164

if client.HasForcedInclusionNamespace() {
    r.fetcherWg.Add(1)
    go r.backgroundFetcher()
}

The background goroutine starts immediately in the constructor, which is generally considered an anti-pattern. This makes lifecycle management harder and testing more complex.

Recommendation: Add an explicit Start() method or pass a context to the constructor. This allows callers to control when background work begins.
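
A minimal sketch of that pattern (type and method names here are illustrative, not the PR's actual API; requires the context and sync imports):

type Retriever struct {
    wg     sync.WaitGroup
    cancel context.CancelFunc
}

// NewRetriever only wires dependencies; no goroutines start here.
func NewRetriever() *Retriever { return &Retriever{} }

// Start launches background work under a caller-supplied context,
// so callers and tests decide when (and whether) it begins.
func (r *Retriever) Start(ctx context.Context) {
    ctx, r.cancel = context.WithCancel(ctx)
    r.wg.Add(1)
    go func() {
        defer r.wg.Done()
        <-ctx.Done() // the real fetch loop would live here
    }()
}

// Stop cancels background work and blocks until the goroutine exits.
func (r *Retriever) Stop() {
    if r.cancel != nil {
        r.cancel()
    }
    r.wg.Wait()
}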

Low: Magic numbers without constants

block/internal/da/forced_inclusion_retriever.go:154, 270, 292-294

  • newEpochCache(10) - hardcoded max samples
  • context.WithTimeout(r.fetcherCtx, 30*time.Second) - hardcoded timeout
  • Cleanup keeps "last 5 epochs" - hardcoded

Recommendation: Extract as named constants with documentation explaining the rationale.
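
For example (names are suggestions; the values are the ones currently hardcoded):

const (
    // maxEpochCacheSamples bounds the fetch-time samples used by the
    // adaptive window calculation.
    maxEpochCacheSamples = 10
    // epochFetchTimeout caps a single background fetch against the DA layer.
    epochFetchTimeout = 30 * time.Second
    // retainedEpochs is how many trailing epochs the cache keeps.
    retainedEpochs = 5
)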


2. Error Handling & Resilience

Strengths

  • Proper context handling throughout with timeouts
  • Good use of sentinel errors (ErrForceInclusionNotConfigured, ErrHeightFromFuture)
  • Defensive nil checks in cleanup functions

Issues

Critical: Background fetcher silently drops errors ⚠️

block/internal/da/forced_inclusion_retriever.go:273-276

event, err := r.fetchEpochSync(ctx, epochStart)
if err != nil {
    r.logger.Debug().Err(err).Uint64("epoch_start", epochStart).Msg("failed to fetch epoch in background")
    return  // Error silently dropped!
}

Problem: The background fetcher logs failures at DEBUG level and continues. Under persistent DA failures, this means:

  1. No cache warming occurs (defeating the purpose)
  2. Synchronous requests will always block
  3. No alerting that the async optimization isn't working
  4. Debug logs may not be visible in production

Recommendation:

  • Log at WARN or ERROR level for persistent failures
  • Consider exponential backoff for repeated failures
  • Add metrics to track background fetch success/failure rates
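
A sketch of the backoff idea inside the fetch path (the failure threshold and one-minute cap are illustrative; field names follow the code quoted above):

backoff := defaultFetchInterval
consecutiveFails := 0
for {
    event, err := r.fetchEpochSync(ctx, epochStart)
    if err == nil {
        r.epochCache.set(epochStart, event)
        return
    }
    consecutiveFails++
    if consecutiveFails >= 3 {
        r.logger.Warn().Err(err).Int("consecutive_fails", consecutiveFails).Msg("background epoch fetch failing repeatedly")
    } else {
        r.logger.Debug().Err(err).Msg("background epoch fetch failed; retrying")
    }
    backoff = min(backoff*2, time.Minute) // exponential backoff, capped
    select {
    case <-ctx.Done():
        return
    case <-time.After(backoff):
    }
}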

Medium: Incomplete error wrapping

block/internal/da/forced_inclusion_retriever.go:367, 377

return nil, fmt.Errorf("%w: epoch start height %d not yet available", coreda.ErrHeightFromFuture, epochStart)

Good use of %w, but some error paths in processForcedInclusionBlobs don't wrap errors, making debugging harder:

return fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", height, result.Message)

Since result.Message is a plain string rather than an error value, switching this to %w means introducing or reusing a sentinel error that callers can match with errors.Is.

Low: No timeout configuration

sequencers/based/sequencer.go:92

forcedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight)

The sequencer passes through the request context directly. If the caller doesn't set a timeout, this could block indefinitely.

Recommendation: Set a maximum timeout in the sequencer to protect against long DA fetches.
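
A sketch (the constant name is illustrative):

// Bound the DA fetch regardless of what the caller's context carries.
fetchCtx, cancel := context.WithTimeout(ctx, maxForcedInclusionFetchTimeout)
defer cancel()
forcedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(fetchCtx, currentDAHeight)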


3. Security

Issues

Critical: No validation of cached events before serving ⚠️

block/internal/da/forced_inclusion_retriever.go:329-334

if cachedEvent, ok := r.epochCache.get(epochStart); ok {
    r.logger.Debug().Uint64("epoch_start", epochStart).Int("tx_count", len(cachedEvent.Txs)).Msg("using cached forced inclusion transactions")
    return cachedEvent, nil  // Returns pointer to cached event!
}

Problem: The cache returns a pointer to the shared cached event. Callers can mutate cachedEvent.Txs (it's a slice), affecting all subsequent retrievals. This could lead to:

  1. Data races if callers process events concurrently
  2. Corruption of cached data
  3. Security issues if malicious code modifies transactions

Recommendation: Return a deep copy of cached events:

if cachedEvent, ok := r.epochCache.get(epochStart); ok {
    // Return a copy to prevent mutation
    eventCopy := &ForcedInclusionEvent{
        StartDaHeight: cachedEvent.StartDaHeight,
        EndDaHeight:   cachedEvent.EndDaHeight,
        Txs:           make([][]byte, len(cachedEvent.Txs)),
    }
    copy(eventCopy.Txs, cachedEvent.Txs)
    return eventCopy, nil
}

Medium: Blob size validation only in sequencer, not retriever

sequencers/based/sequencer.go:130-138

Validation happens in the sequencer but not during fetch. If extremely large blobs are fetched, they'll be cached and consume memory even if they'll be rejected later.

Recommendation: Add size validation in processForcedInclusionBlobs before caching.
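
For example (maxForcedInclusionBlobSize is an illustrative constant mirroring the sequencer-side limit; blobs and event stand in for the locals in processForcedInclusionBlobs):

for _, blob := range blobs {
    if len(blob) > maxForcedInclusionBlobSize {
        r.logger.Warn().Int("size", len(blob)).Msg("dropping oversized forced inclusion blob before caching")
        continue
    }
    event.Txs = append(event.Txs, blob)
}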

Low: No rate limiting on DA requests

The background fetcher runs every 2 seconds without backoff on failures. This could overwhelm the DA layer if it's experiencing issues.


4. Performance & Resource Efficiency

Strengths

  • Excellent async prefetching design with adaptive window sizing
  • Efficient lock-free cache using atomics
  • Good use of time-based metrics for adaptive behavior

Issues

Critical: Memory leak potential - unbounded cache growth ⚠️

block/internal/da/forced_inclusion_retriever.go:283-299

r.epochCache.set(epochStart, event)

// Cleanup old epochs (keep last 5 epochs)
if epochStart >= r.genesis.DAStartHeight+r.daEpochSize*5 {
    cleanupBefore := epochStart - r.daEpochSize*5
    // ...
    r.epochCache.cleanup(cleanupBefore)
}

Problem: Cleanup only happens when fetching new epochs. If the background fetcher stops fetching (DA is down, errors, etc.), the cache never gets cleaned up. Over time, this could accumulate stale entries.

Scenario:

  1. Background fetcher runs for days, caching hundreds of epochs
  2. DA goes down for a period
  3. Cache cleanup stops (no new fetches trigger it)
  4. Memory keeps growing with stale data

Recommendation: Add periodic cleanup timer or use an LRU cache with max size.
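
A sketch of a dedicated cleanup goroutine (the interval is illustrative; field names follow the code quoted above, and r.fetcherWg.Add(1) is assumed at spawn):

func (r *ForcedInclusionRetriever) cleanupLoop() {
    defer r.fetcherWg.Done()
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    for {
        select {
        case <-r.fetcherCtx.Done():
            return
        case <-ticker.C:
            // Cleanup keyed off the current DA height, not off fetch success.
            current := r.currentDAHeight.Load()
            if keep := r.daEpochSize * 5; current > r.genesis.DAStartHeight+keep {
                r.epochCache.cleanup(current - keep)
            }
        }
    }
}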

Medium: Unnecessary allocations in CAS loops

block/internal/da/forced_inclusion_retriever.go:60-72

for {
    oldEventsPtr := c.events.Load()
    oldEvents := *oldEventsPtr
    newEvents := make(map[uint64]*ForcedInclusionEvent, len(oldEvents)+1)  // Allocated every retry!
    for k, v := range oldEvents {
        newEvents[k] = v
    }
    // ...
}

On CAS failure, the allocation and copy are wasted. For a cache with 5-10 entries, this is likely fine, but it's inefficient.

Recommendation: Consider using a simple sync.RWMutex instead. The lock-free approach adds complexity without clear benefits for this use case (low contention expected).
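
A mutex-based sketch with the same surface as epochCache:

type epochCache struct {
    mu     sync.RWMutex
    events map[uint64]*ForcedInclusionEvent
}

func (c *epochCache) set(epochStart uint64, event *ForcedInclusionEvent) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.events[epochStart] = event
}

func (c *epochCache) get(epochStart uint64) (*ForcedInclusionEvent, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    event, ok := c.events[epochStart]
    return event, ok
}

This also gives a single natural place to return the defensive copy recommended in the security section.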

Medium: Background fetcher timing issue

block/internal/da/forced_inclusion_retriever.go:222-235

ticker := time.NewTicker(defaultFetchInterval)
for {
    select {
    case <-r.fetcherCtx.Done():
        return
    case <-ticker.C:
        r.fetchNextEpoch()  // This can take 30+ seconds!
    }
}

If fetchNextEpoch() takes longer than defaultFetchInterval (2s), a new tick is already pending when it returns (time.Ticker buffers one tick and drops the rest), so fetches run back-to-back with no spacing. Combined with the 30-second per-fetch timeout, the loop can end up fetching continuously.

Recommendation: Reset/recreate the ticker after each fetch completes, or use a different scheduling pattern.
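
A sketch using a timer that is re-armed only after the fetch returns:

timer := time.NewTimer(defaultFetchInterval)
defer timer.Stop()
for {
    select {
    case <-r.fetcherCtx.Done():
        return
    case <-timer.C:
        r.fetchNextEpoch()                // may run up to the 30s timeout
        timer.Reset(defaultFetchInterval) // next fetch starts a full interval after this one ends
    }
}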

Low: Sequencer queue uses full copy-on-write

sequencers/based/sequencer.go:141-149

for {
    oldQueuePtr := s.txQueue.Load()
    oldQueue := *oldQueuePtr
    newQueue := append(oldQueue, tx)  // Copies entire queue every tx!
    if s.txQueue.CompareAndSwap(oldQueuePtr, &newQueue) {
        validTxs++
        break
    }
}

For each forced inclusion tx, the entire queue is copied. With large epochs (hundreds of txs), this could be expensive.

Recommendation: Consider batching queue updates or using a concurrent queue implementation.
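
A batched sketch (validBatch is an assumed local holding the already-validated transactions for this epoch):

for {
    oldQueuePtr := s.txQueue.Load()
    oldQueue := *oldQueuePtr
    // One copy and one CAS for the whole batch instead of one per tx.
    newQueue := make([][]byte, 0, len(oldQueue)+len(validBatch))
    newQueue = append(newQueue, oldQueue...)
    newQueue = append(newQueue, validBatch...)
    if s.txQueue.CompareAndSwap(oldQueuePtr, &newQueue) {
        validTxs += len(validBatch)
        break
    }
}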


5. Concurrency Safety

Strengths

  • Good use of atomic operations for shared state
  • Proper WaitGroup usage for goroutine lifecycle
  • Context-based cancellation

Issues

Critical: Data race in epochCache.get() ⚠️

block/internal/da/forced_inclusion_retriever.go:53-57

func (c *epochCache) get(epochStart uint64) (*ForcedInclusionEvent, bool) {
    events := c.events.Load()
    event, ok := (*events)[epochStart]  // Returns pointer from map!
    return event, ok
}

Problem: The method returns a pointer to a ForcedInclusionEvent stored in the map. While the map itself is atomically loaded, the event data (particularly event.Txs slice) can be modified by callers without synchronization. This is a classic race condition.

Data race scenario:

  1. Background fetcher caches event at epoch 100
  2. Caller A retrieves event via get(100) and starts reading Txs
  3. Caller B retrieves same event and modifies Txs
  4. DATA RACE: Caller A reads while Caller B writes

Note: This overlaps the security concern above but is more severe from a concurrency standpoint; the Go race detector would flag it.

Recommendation: Must return defensive copies or document that callers must not mutate returned events.

Medium: BasedSequencer queue race

sequencers/based/sequencer.go:170-206

The createBatchFromQueue uses CAS correctly, but the pattern where the queue is checked for emptiness and then processed creates a TOCTOU (time-of-check-time-of-use) issue:

queuePtr := s.txQueue.Load()
queue := *queuePtr
if len(queue) == 0 {
    return &coresequencer.Batch{Transactions: nil}
}

Between the length check and the later CAS, another goroutine can modify the queue. The CAS handles this correctly, but every retry repeats the full load and copy, so the pattern gets expensive under contention.


6. Testing & Reliability

Strengths

  • Comprehensive unit tests with good edge case coverage
  • Proper use of mocks
  • Tests for epoch boundaries, errors, and multi-height epochs

Issues

Critical: Missing test for background fetcher lifecycle ⚠️

There are no tests verifying:

  • Background fetcher properly stops on StopBackgroundFetcher()
  • No goroutine leaks when retriever is destroyed
  • Background fetcher handles context cancellation correctly
  • What happens if DA is unreachable for extended periods

Recommendation: Add tests using goroutine leak detection (e.g., goleak) and verify clean shutdown.
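
A sketch of such a test (go.uber.org/goleak as a test dependency; newTestRetriever is a hypothetical helper wiring a mock DA client):

func TestBackgroundFetcherShutdown(t *testing.T) {
    defer goleak.VerifyNone(t)

    r := newTestRetriever(t)
    r.StopBackgroundFetcher()
    // VerifyNone fails the test if the fetcher goroutine is still alive.
}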

Critical: No concurrency tests ⚠️

Given the heavy use of lock-free atomics and CAS loops, there are no tests exercising concurrent access:

  • Multiple goroutines calling RetrieveForcedIncludedTxs concurrently
  • Background fetcher running while sync requests come in
  • Concurrent SetDAHeight calls
  • Race detector testing

Recommendation: Add property-based tests or stress tests with go test -race.
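
A minimal stress-test sketch, meaningful only under go test -race (newTestRetriever is the same hypothetical helper):

func TestRetrieveForcedIncludedTxsConcurrent(t *testing.T) {
    r := newTestRetriever(t)
    var wg sync.WaitGroup
    for i := 0; i < 32; i++ {
        wg.Add(1)
        go func(height uint64) {
            defer wg.Done()
            // Overlap reads and height updates on a few contended heights.
            _, _ = r.RetrieveForcedIncludedTxs(context.Background(), height)
            r.SetDAHeight(height)
        }(uint64(100 + i%4))
    }
    wg.Wait()
}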

Medium: Insufficient test coverage (58.06%)

Per codecov, 104 lines are missing coverage, primarily in:

  • block/internal/syncing/da_retriever.go - 97 missing lines (47.12% coverage)
  • Error handling paths
  • Background fetcher logic

Recommendation: Focus on testing error paths and the background fetcher behavior.

Low: Tests don't verify adaptive window calculation

block/internal/da/forced_inclusion_retriever.go:195-216

The calculateAdaptiveEpochWindow() function has no direct tests. This is a key performance feature that should be tested with various average fetch times.


7. Documentation & Developer Experience

Strengths

  • Clear function and type comments
  • Good logging with structured fields
  • Exported types properly documented

Issues

Medium: Missing package-level documentation

Neither block/internal/da/ nor sequencers/based/ have package docs explaining the async fetching architecture and how components interact.

Recommendation: Add package comments explaining:

  • The async prefetching strategy
  • How background fetcher coordinates with sync requests
  • Adaptive window sizing rationale
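
For instance, a possible shape for the block/internal/da package comment (wording is a suggestion based on the behavior reviewed here):

// Package da retrieves forced inclusion transactions from the DA layer.
//
// A background fetcher prefetches upcoming epochs into a small in-memory
// cache so synchronous RetrieveForcedIncludedTxs calls usually hit the
// cache instead of blocking on the DA network. The prefetch lag window
// adapts to observed fetch latency: slow DA responses widen it, fast
// responses narrow it.
package da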

Medium: AI-generated code disclaimer

The author noted "this func and what it calls is AI" in the review comments. While AI-generated code isn't inherently problematic, it should be thoroughly reviewed and tested, especially for concurrent/performance-critical code like this.

Recommendation: Ensure all AI-generated sections have comprehensive tests and have been manually reviewed for correctness.

Low: Inconsistent logging levels

Background fetcher uses DEBUG for errors, but these should be WARN/ERROR for operational visibility.


Summary & Priority Recommendations

Must Fix Before Merge (Critical) 🔴

  1. Data race in epochCache.get(): Return deep copies of cached events to prevent races and mutation
  2. Missing concurrency tests: Add tests with -race flag and concurrent access patterns
  3. Background fetcher error handling: Log at appropriate levels and add metrics
  4. Memory leak potential: Add periodic cleanup or bounded cache
  5. CAS loop spinning: Add runtime.Gosched() or retry limits to bound CPU burn under contention

Should Fix (High Priority) 🟡

  1. Test coverage: Add tests for background fetcher lifecycle and error paths
  2. Blob size validation: Move to retriever before caching
  3. Background fetcher lifecycle: Don't start in constructor, add explicit Start() method
  4. Package documentation: Document the async architecture

Nice to Have (Medium Priority) 🟢

  1. Magic numbers: Extract constants
  2. Sequencer timeout: Set maximum fetch timeout
  3. Adaptive window tests: Test the calculation logic
  4. Performance optimizations: Consider sync.RWMutex instead of lock-free for cache

Positive Notes ✅

  • The async prefetching design is excellent and will significantly improve performance
  • Adaptive window sizing based on fetch times is a clever optimization
  • The separation of concerns between components is clean
  • Most error paths are handled correctly
  • Test coverage for basic functionality is solid

The core idea and implementation are sound. The issues identified are primarily around concurrency safety, error visibility, and resource management - all fixable with targeted improvements.


@julienrbrt changed the title from "feat: optimize force inclusion fetching" to "feat: async force inclusion fetching" on Nov 14, 2025
@codecov

codecov bot commented Nov 14, 2025

Codecov Report

❌ Patch coverage is 53.99061% with 98 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (julien/fi@c65c330). Learn more about missing BASE report.

Files with missing lines                         Patch %   Lines
block/internal/da/forced_inclusion_retriever.go  37.41%    96 Missing and 1 partial ⚠️
block/internal/syncing/syncer.go                 0.00%     1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             julien/fi    #2842   +/-   ##
============================================
  Coverage             ?   64.77%           
============================================
  Files                ?       85           
  Lines                ?     7838           
  Branches             ?        0           
============================================
  Hits                 ?     5077           
  Misses               ?     2199           
  Partials             ?      562           
Flag       Coverage Δ
combined   64.77% <53.99%> (?)

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.


// calculateAdaptiveEpochWindow calculates the epoch lag window based on average fetch time
func (r *daRetriever) calculateAdaptiveEpochWindow() uint64 {
@julienrbrt
Member Author

this func and what it calls is AI. i think it makes sense.

Merges forced inclusion implementation from julien/fi branch while preserving
existing features from julien/async-fi. Key changes:

- Refactored forced inclusion into clean block/internal/da package
- Added ForcedInclusionRetriever and DAClient abstractions
- Updated based sequencer to use new ForcedInclusionRetriever interface
- Updated single sequencer with forced inclusion support
- Added size validation for forced inclusion transactions
- Removed async background fetcher in favor of simpler synchronous approach
- Updated tests to match new behavior
- Kept atomic.Pointer queue in based sequencer for thread safety

The merge preserves forced inclusion functionality while adopting the
cleaner architecture from julien/fi.
