
feat: Subscribe to forced inclusion ns events #3146

Draft
alpe wants to merge 12 commits into main from alex/2803_best_2worlds_fi_rebased2

Conversation


@alpe alpe commented Mar 9, 2026

Resolves #3145

Refactor DA Subscriptions and Follower Logic

Summary

This PR introduces a unified da.Subscriber primitive to manage the follow and catch-up lifecycle for DA blobs. Previously, the syncing (DAFollower) and forced inclusion (asyncBlockRetriever) systems duplicated aspects of this logic, leading to subtle bugs spanning error handling, backoff retries, and cache memory leaks.

By abstracting the subscription management into a dedicated Subscriber component, we centralize stream merging, height tracking, and fallback handling. Both DAFollower and asyncBlockRetriever now simply implement the SubscriberHandler interface to receive HandleEvent (for inline processing) and HandleCatchup callbacks.

Key Changes

  • New da.Subscriber Primitive: Encapsulates DA subscription logic. Manages an internal followLoop (for inline events) and a catchupLoop (for robust, sequential catch-up when falling behind). Includes support for merging events from multiple DA namespaces.
  • SubscriberHandler Interface: Extracted the core consumer logic into an interface with HandleEvent and HandleCatchup methods.
  • Refactored DAFollower & asyncBlockRetriever: Both components now compose a da.Subscriber and implement SubscriberHandler, reducing duplicate boilerplate and abstracting away the complex gap-filling logic.
  • Test Infrastructure Revamp: Deleted syncer_backoff_test.go and replaced its indirect testing with synchronous unit tests centered around da_follower_test.go and da/subscriber_test.go, improving test reliability and execution speed.
  • Bug Fixes: Resolved previous issues where ErrHeightFromFuture inappropriately triggered backoff retries, and fixed memory leaks associated with stale cache heights in the asyncBlockRetriever.

Architecture Overview

flowchart TD
    DAC[DA Client] -->|Emits SubscriptionEvents| SUB[da.Subscriber]
    
    subgraph da [da.Subscriber Component]
        FL["followLoop<br>Receives live events"] 
        CL["catchupLoop<br>Fills height gaps"]
        
        FL -->|Updates highest seen| CL
    end
    
    SUB --> FL
    
    FL -->|"HandleEvent (inline processing)"| SH{SubscriberHandler API}
    CL -->|"HandleCatchup (sequential processing)"| SH
    
    ABR["asyncBlockRetriever<br>Forced Inclusions"] -.->|implements| SH
    DAF["DAFollower<br>Syncing"] -.->|implements| SH


github-actions bot commented Mar 9, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build     | Format     | Lint      | Breaking  | Updated (UTC)
✅ passed | ⏩ skipped | ✅ passed | ✅ passed | Mar 11, 2026, 2:17 PM


claude bot commented Mar 9, 2026

Claude finished @alpe's task in 2m 16s.


Code Review

  • Gather context and understand the PR
  • Review core new primitive: block/internal/da/subscriber.go
  • Review refactored DAFollower (block/internal/syncing/da_follower.go)
  • Review refactored asyncBlockRetriever (block/internal/da/async_block_retriever.go)
  • Review supporting changes (interfaces, tracing, mocks, tests)
  • Review sequencer changes
  • Compile final review

Summary

This is a solid architectural refactoring that centralises DA subscription lifecycle management into a shared da.Subscriber primitive. The design is clean, the intent is clear, and the PR successfully eliminates meaningful duplication. The review below covers several bugs and concerns that should be addressed before merging.


Critical / Bugs

1. runCatchup infinite loop with no termination condition (subscriber.go:316)

runCatchup loops unconditionally as long as localDAHeight < highestSeenDAHeight. When highestSeenDAHeight hasn't been updated yet (e.g., at startup), the CAS at line 324 will succeed, HandleCatchup is called, and if it returns ErrHeightFromFuture, the function returns. That path is fine. However: if HandleCatchup keeps returning a non-future transient error (e.g., a network blip), shouldContinueCatchup applies the backoff — but then the outer for-loop retries immediately without re-checking ctx.Err() at the top of the next iteration until after the CAS. The ctx.Err() check at line 318 would be hit, but it relies on the context being cancelled rather than a signal. In practice this works but the flow is subtle. Consider using select at the retry point to be explicit.

2. Prefetch in HandleCatchup ignores ErrHeightFromFuture (async_block_retriever.go:193–206)

for h := daHeight + 1; h <= target; h++ {
    ...
    if err := f.fetchAndCacheBlock(ctx, h); err != nil {
        return err  // returns ErrHeightFromFuture, rolls back localDAHeight
    }
}

The prefetch loop returns ErrHeightFromFuture for any speculative future height that isn't available yet, which causes the subscriber's catchup loop to roll back localDAHeight to daHeight and retry — even though daHeight itself was already cached successfully. This means the node permanently stalls trying to prefetch a window that hasn't landed on DA yet: it fetches daHeight, succeeds, then fails the prefetch, rolls back, re-fetches daHeight (cache hit), then fails the prefetch again, ad infinitum.

3. HandleEvent in asyncBlockRetriever always returns an error when isInline=true (async_block_retriever.go:179)

func (f *asyncBlockRetriever) HandleEvent(..., isInline bool) error {
    f.cacheBlock(...)
    if isInline {
        return errors.New("async block retriever relies on catchup state machine")
    }
    return nil
}

The subscriber's rollback path for isInline=true is: if HandleEvent errors while isInline, the subscriber resets localDAHeight back. In asyncBlockRetriever this is intentional: it always wants catchup to drive height advancement, and the caching still happens (f.cacheBlock runs before the guard), which is correct. However, every inline event will then generate the debug log message "inline processing skipped/failed, rolling back" (subscriber.go:209), which is noise. Consider either suppressing that log for the intentional-refusal case, or having HandleEvent return a sentinel error that the subscriber can distinguish from a real failure. The current approach works but produces spurious debug-level log churn.


Architecture / Design

4. Subscriber.cancel nil-check in Stop (subscriber.go:119–124)

func (s *Subscriber) Stop() {
    if s.cancel != nil {
        s.cancel()
    }
    s.wg.Wait()
}

If Stop is called without a preceding Start (e.g., when len(namespaces)==0 and Start returns early without setting cancel), the wg.Wait() is harmless since wg.Add(2) was never called. But Start calls wg.Add(2) before setting s.cancel, which means there's a window in Start where cancel is not yet set but the goroutines have started. If Stop races with Start (unlikely but possible during shutdown), Stop may see cancel == nil and hang on wg.Wait() forever. Consider setting cancel before wg.Add.

5. Inline CAS logic is fragile (subscriber.go:201)

local := s.localDAHeight.Load()
isInline := ev.Height == local && s.localDAHeight.CompareAndSwap(local, local+1)

The inline path is claimed when ev.Height == localDAHeight and the CAS succeeds. But the CAS can spuriously fail if the catchupLoop concurrently advances localDAHeight — in that case isInline becomes false, the event is dropped, and catchupLoop is expected to pick it up. That's fine. The concern is the inverse: if two subscription events arrive with the same Height (possible with merged namespaces from mergeSubscriptions — two namespaces can emit an event for the same DA block), the first one succeeds the CAS and becomes inline; the second one finds ev.Height != local (since local is already height+1) and becomes non-inline. Since there's only one catchup iteration per DA height, the second event is silently discarded. This is likely intentional (deduplication), but it should be documented. The current comment only mentions the CAS claiming mechanic, not that duplicate-height events from merged namespaces are discarded.

6. subscribe only deduplicates index 0 vs. others, not pairwise (subscriber.go:236)

for i := 1; i < len(s.namespaces); i++ {
    if bytes.Equal(s.namespaces[i], s.namespaces[0]) {
        continue // Same namespace, skip duplicate.
    }

This skips namespaces[i] only if it equals namespaces[0]; it does not check for duplicates among namespaces[1..n]. If namespaces[1] == namespaces[2] (neither equal to namespaces[0]), two subscriptions are opened on the same namespace. The current two-namespace usage (DAFollowerConfig's Namespace and DataNamespace) is probably safe, but a caller passing three or more namespaces would hit this, and the code neither documents the limitation nor guards against it.
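A pairwise deduplication pass would remove the limitation entirely; this is a sketch of the suggestion, not the PR's code:

```go
package main

import "bytes"

// dedupNamespaces returns namespaces with pairwise duplicates removed,
// preserving first-seen order. O(n²) comparisons, which is fine for the
// handful of namespaces a subscriber realistically receives.
func dedupNamespaces(namespaces [][]byte) [][]byte {
	out := make([][]byte, 0, len(namespaces))
	for _, ns := range namespaces {
		dup := false
		for _, seen := range out {
			if bytes.Equal(seen, ns) {
				dup = true
				break
			}
		}
		if !dup {
			out = append(out, ns)
		}
	}
	return out
}
```

Running this once in subscribe before opening subscriptions makes the "no duplicate streams" invariant hold regardless of how many namespaces a caller passes.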


Error Handling

7. UpdateCurrentHeight calls cleanupOldBlocks with context.Background() (async_block_retriever.go:125)

f.cleanupOldBlocks(context.Background(), height)

Using context.Background() here means cache cleanup can't be cancelled even after the retriever is stopped. This is a minor resource waste. Consider threading the retriever's lifecycle context through, or calling cleanup from the subscriber goroutine where the context is already available.

8. fetchAndCacheBlock silently succeeds on unknown status codes (async_block_retriever.go:229–234)

default:
    f.logger.Debug().
        ...
        Msg("failed to retrieve block - will retry")
}
return nil

Unknown status codes log a debug message and return nil (success!), so the height is considered "cached" with no data and the subscriber advances localDAHeight. The correct behaviour is probably to return an error and let the backoff loop retry, or at minimum use a warning-level log. The function's name (fetchAndCacheBlock) and contract (called from the retry loop) suggest failures should propagate.
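The suggested contract, sketched with hypothetical status-code names (the real DA client's codes and their terminal/retryable classification may differ):

```go
package main

import "fmt"

// Hypothetical status codes standing in for the DA client's result codes.
const (
	StatusSuccess = iota
	StatusNotFound
)

// checkRetrieveStatus sketches the suggested contract for fetchAndCacheBlock:
// only statuses known to be terminal return nil; anything unknown is an error
// so the backoff loop retries rather than treating the height as cached.
func checkRetrieveStatus(code int) error {
	switch code {
	case StatusSuccess, StatusNotFound:
		return nil
	default:
		return fmt.Errorf("unexpected DA retrieve status %d: will retry", code)
	}
}
```

Propagating the error keeps the function's name and its caller's retry contract honest: a height only advances once its status is positively understood.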


Performance

9. Priority-height queue grows unboundedly (da_follower.go:188–194)

QueuePriorityHeight inserts into a sorted slice with slices.Insert. The slice is only bounded by the number of distinct P2P hints seen before they're consumed by HandleCatchup. In pathological scenarios (many P2P peers sending far-future hints), this slice could grow without bound. Consider capping the queue at a reasonable maximum (e.g., 1000 entries).
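A bounded variant could look like this; the cap value and function shape are illustrative, not the PR's code:

```go
package main

import "slices"

const maxPriorityHeights = 1000 // suggested cap; the exact value is a tuning choice

// queuePriorityHeight sketches a bounded QueuePriorityHeight: insert in sorted
// position, skip duplicates, and shed the farthest-future hint when full.
func queuePriorityHeight(heights []uint64, h uint64) []uint64 {
	i, found := slices.BinarySearch(heights, h)
	if found {
		return heights // duplicate hint; nothing to do
	}
	heights = slices.Insert(heights, i, h)
	if len(heights) > maxPriorityHeights {
		heights = heights[:maxPriorityHeights] // drop the largest (farthest) hint
	}
	return heights
}
```

Shedding the largest entry is the natural policy here: near-future hints are the ones HandleCatchup will consume soonest, so far-future hints from pathological peers are the cheapest to discard.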

10. Cache cleanup uses a full scan (async_block_retriever.go:272–297)

cleanupOldBlocks queries all keys with prefix /block/ and then filters by height. Since the in-memory datastore (go-datastore/sync) doesn't support range queries, this is O(n) in the total number of cached entries. For a small prefetch window this is fine, but it's worth a comment explaining why the full scan is acceptable and what the expected max cache size is.


Testing

11. subscriber_test.go only tests runCatchup directly — tests are thin for the followLoop / runSubscription paths

There are no tests for:

  • Watchdog timeout triggering a reconnect
  • Merged subscriptions (multiple namespaces)
  • updateHighest being called while runCatchup is in-flight (concurrent access stress)
  • The headReached transition

The TestAsyncBlockRetriever_SubscriptionDrivenCaching test at async_block_retriever_test.go:46 partially covers the follow path, but it relies on timing (time.Sleep(25 * time.Millisecond)) which is inherently flaky in CI. A channel-signalled approach would be more reliable.

12. Test: backoff_on_error asserts the wrong LocalDAHeight (subscriber_test.go:96)

assert.Equal(t, uint64(100), sub.LocalDAHeight(), "should roll back to 100 on future error")

When HandleCatchup(100) returns ErrHeightFromFuture, runCatchup rolls back to 100 and sets headReached = true, then returns. But the test calls cancel() inside the second mock invocation before runCatchup observes the context, so while the comment "should roll back to 100 on future error" is accurate, the assertion doesn't exercise the backoff path; it exercises the future-error path. The test name says backoff_on_error, but what it actually verifies is: after a network failure triggers backoff, the next attempt returns ErrHeightFromFuture and the height remains 100. That is useful coverage, but the naming is misleading.


Minor / Style

13. daFollower.priorityHeights nil-sets on empty (da_follower.go:207–209)

if len(f.priorityHeights) == 0 {
    f.priorityHeights = nil
}

This nil-sets the slice when empty to save memory. While not wrong, it's inconsistent: QueuePriorityHeight uses slices.Insert which may re-allocate. This pattern of nil-ing and re-growing creates small but repeated GC pressure. A minor point but worth considering if the priority queue is expected to be hot.

14. daFollower.HandleCatchup: priority loop break semantics (da_follower.go:149)

break // continue with daHeight

The comment says "continue with daHeight" but the break exits the for-loop, and the code falls through to the sequential fetch. This is correct but reads confusingly — break in a for loop is not idiomatic when you want to "proceed to the next step". The surrounding comment helps but could be restructured with a label or a boolean flag for clarity.

15. Deprecated FetchBlockTimestamp field documented but not gated (subscriber.go:39–40)

The // Deprecated: comment links to an issue but the field is still wired through the entire call graph. It would be helpful to add a TODO comment near the usage in NewAsyncBlockRetriever (where it's passed as true) noting the specific version/issue where it will be removed, consistent with the other deprecation notices.


Documentation

16. SubscriberHandler.HandleCatchup doc comment has stale content (subscriber.go:26–28)

// Returning (false, nil) rolls back localDAHeight without triggering a backoff.

The actual signature is HandleCatchup(ctx context.Context, height uint64) error — there is no bool return value. The doc comment references a prior interface design.


Checklist

Area                    | Status
Architecture / DRY      | ✅ Clear improvement over prior duplication
Concurrency correctness | ⚠️ Start/Stop race (item 4), inline CAS edge case (item 5)
Error handling          | ⚠️ Prefetch returns ErrHeightFromFuture incorrectly (item 2), unknown status silently succeeds (item 8)
Test coverage           | ⚠️ Follow-loop paths untested, timing-sensitive tests
Documentation           | ⚠️ Stale HandleCatchup doc (item 16)
Performance             | ✅ Acceptable for expected scale, minor concerns noted


coderabbitai bot commented Mar 9, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 84cd25c6-68c9-4ef4-abef-395a0925106a


alpe added 6 commits March 10, 2026 11:57
* main:
  fix: persist snapshot once for avoiding badger vlog (#3153)
  build(deps): Bump docker/setup-buildx-action from 3 to 4 (#3148)
  build(deps): Bump the all-go group across 5 directories with 8 updates (#3147)
  build(deps): Bump docker/login-action from 3 to 4 (#3149)
  build(deps): Bump docker/build-push-action from 6 to 7 (#3151)
  build(deps): Bump actions/setup-go from 6.2.0 to 6.3.0 (#3150)

codecov bot commented Mar 11, 2026

Codecov Report

❌ Patch coverage is 63.36898% with 137 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.49%. Comparing base (abb6b71) to head (aca762f).

Files with missing lines                              | Patch % | Lines
block/internal/da/subscriber.go                       | 65.06%  | 48 Missing and 10 partials ⚠️
...quencers/common/forced_inclusion_retriever_mock.go | 0.00%   | 20 Missing ⚠️
block/internal/syncing/da_follower.go                 | 80.88%  | 9 Missing and 4 partials ⚠️
block/internal/da/async_block_retriever.go            | 81.81%  | 8 Missing and 4 partials ⚠️
block/internal/da/client.go                           | 0.00%   | 9 Missing ⚠️
block/internal/syncing/da_retriever.go                | 68.18%  | 4 Missing and 3 partials ⚠️
block/internal/syncing/syncer.go                      | 33.33%  | 4 Missing ⚠️
block/internal/da/forced_inclusion_retriever.go       | 25.00%  | 2 Missing and 1 partial ⚠️
block/internal/common/event.go                        | 0.00%   | 2 Missing ⚠️
block/internal/da/forced_inclusion_tracing.go         | 0.00%   | 2 Missing ⚠️
... and 4 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3146      +/-   ##
==========================================
+ Coverage   60.20%   60.49%   +0.28%     
==========================================
  Files         115      117       +2     
  Lines       11896    11939      +43     
==========================================
+ Hits         7162     7222      +60     
+ Misses       3923     3899      -24     
- Partials      811      818       +7     
Flag     | Coverage Δ
combined | 60.49% <63.36%> (+0.28%) ⬆️

Flags with carried forward coverage won't be shown.


@alpe alpe changed the title [WIP] feat: Subscribe to forced inclusion ns events feat: Subscribe to forced inclusion ns events Mar 11, 2026