refactor(submitter): concurrent submitter#3287
Conversation
📝 WalkthroughWalkthroughExtends the cache manager with in-flight range reset methods and refactors pending tracking to use claims/gaps; converts DA submission to asynchronous, callback-driven submission with centralized retry/backoff and lifecycle Changes
Sequence DiagramssequenceDiagram
participant Submitter as Submitter
participant Cache as Cache (pending manager)
participant DASubmitter as DA Submitter
participant DA as DA Layer
Submitter->>Cache: GetPendingHeaders()
Cache->>Cache: select contiguous unclaimed range\nregister in-flight claim
Cache-->>Submitter: pending items & range
Submitter->>DASubmitter: SubmitHeaders(batch, onSuccess, onError)
DASubmitter-->>Submitter: return nil (async)
DASubmitter->>DASubmitter: spawn goroutine -> submitWithRetry
DASubmitter->>DA: submitToDA(batch)
alt Success
DA-->>DASubmitter: success
DASubmitter->>Cache: update post-submit state
DASubmitter->>Submitter: call onSuccess()
else Retryable failure
DA-->>DASubmitter: error
DASubmitter->>DASubmitter: backoff & retry
else Non-retryable / terminal
DA-->>DASubmitter: error
DASubmitter->>Submitter: call onError(error)
Submitter->>Cache: ResetInFlightHeaderRange(start,end)
end
sequenceDiagram
participant SubmitLoop as Submit loop
participant DASubmitter as DA Submitter
participant Cache as pendingBase
SubmitLoop->>Cache: GetPendingData()
Cache->>Cache: choose first unclaimed contiguous range\nregister claim, remove overlapping gaps
Cache-->>SubmitLoop: pending items + range
SubmitLoop->>DASubmitter: SubmitData(batch, onSuccess, onError)
DASubmitter-->>SubmitLoop: nil
DASubmitter->>DASubmitter: submitWithRetry -> onSuccess/onError callbacks
alt onError called
SubmitLoop->>Cache: ResetInFlightDataRange(start,end)
Cache->>Cache: remove claim, add failing portion as gap
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
|
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
|
|
Claude finished @julienrbrt's task in 3m 19s —— View job Code Review:
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3287 +/- ##
==========================================
+ Coverage 62.45% 62.52% +0.06%
==========================================
Files 122 122
Lines 13047 13078 +31
==========================================
+ Hits 8149 8177 +28
+ Misses 4012 4011 -1
- Partials 886 890 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
block/internal/submitting/da_submitter_tracing.go (1)
33-61:⚠️ Potential issue | 🟠 MajorThe span now ends before the async submission does.
inner.SubmitHeaders/SubmitDatanow return right after scheduling background work, sodefer span.End()closes the span before retries/callbacks run. Async failures routed throughonSubmitErrornever reach the span, and the recorded latency is only the enqueue time. Wrap the callbacks and end/annotate the span from the actual completion path instead.Also applies to: 64-92
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/submitting/da_submitter_tracing.go` around lines 33 - 61, The current tracedDASubmitter starts a span in SubmitHeaders (and similarly in SubmitData) but defers span.End(), which closes the span before async work completes; instead remove the immediate defer span.End() and wrap the onSubmitSuccess and onSubmitError callbacks with closures that record errors/status on the span and call span.End() when the async completion path runs; i.e., in tracedDASubmitter.SubmitHeaders and SubmitData, create wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, ""); onSubmitSuccess(); span.End() } and wrappedOnError := func(err error){ span.RecordError(err); span.SetStatus(codes.Error, err.Error()); onSubmitError(err); span.End() } (or equivalent wrappers matching original signatures) and pass those to t.inner.SubmitHeaders/SubmitData so the span lifetime and annotations reflect actual completion.block/internal/cache/pending_base.go (1)
80-96:⚠️ Potential issue | 🟠 MajorTake
lastHeightand the in-flight ranges under one synchronization boundary.
getPending()readslastHeightbefore cloninginFlightClaims/gaps, whilesetLastSubmittedHeight()updateslastHeightand trims those slices independently. With the new concurrent submitter, thelastHeight=old+claims already trimmedinterleaving can makefindAvailableRange()hand out heights that were just acknowledged, causing duplicate DA submissions.As per coding guidelines "Be careful with concurrent access to shared state".
Also applies to: 175-189
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/cache/pending_base.go` around lines 80 - 96, getPending() currently reads pb.lastHeight (pb.lastHeight.Load()) outside the pb.inFlightMu critical section and then clones pb.inFlightClaims and pb.gaps, which allows an interleaving with setLastSubmittedHeight() that trims those slices and updates lastHeight causing findAvailableRange() to return already-acknowledged heights; fix by moving the read of pb.lastHeight inside the same pb.inFlightMu.Lock()/Unlock() block where you clone inFlightClaims and gaps so lastHeight and the in-flight ranges are read atomically, and apply the same locking discipline to setLastSubmittedHeight() (acquire pb.inFlightMu while trimming inFlightClaims/gaps and updating pb.lastHeight) to prevent races when findAvailableRange, getPending, and setLastSubmittedHeight interact.
🧹 Nitpick comments (3)
block/internal/cache/pending_data.go (1)
88-90: Document the new exported reset helper.
ResetInFlightDataRangeis public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.💡 Suggested fix
+// ResetInFlightDataRange clears the in-flight data claim state for heights in [start, end]. func (pd *PendingData) ResetInFlightDataRange(start, end uint64) {As per coding guidelines, Document exported types and functions in Go code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/cache/pending_data.go` around lines 88 - 90, Add a Go doc comment for the exported method ResetInFlightDataRange on type PendingData: describe what the method does, its parameters (start, end uint64) and any important behavior or side-effects (it delegates to pd.base.resetInFlightRange to reset the in-flight data range). Place the comment immediately above the func declaration for ResetInFlightDataRange so it satisfies Go exported identifier documentation guidelines.block/internal/cache/pending_headers.go (1)
83-85: Document the new exported reset helper.
ResetInFlightHeaderRangeis public, but it has no doc comment. That violates the repo's Go guideline for exported identifiers and will likely fail linting.💡 Suggested fix
+// ResetInFlightHeaderRange clears the in-flight header claim state for heights in [start, end]. func (ph *PendingHeaders) ResetInFlightHeaderRange(start, end uint64) {As per coding guidelines, Document exported types and functions in Go code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/cache/pending_headers.go` around lines 83 - 85, Add a Go doc comment for the exported method PendingHeaders.ResetInFlightHeaderRange describing its purpose and behavior: explain that it resets the in-flight header range tracked by the PendingHeaders instance between the inclusive start and end uint64 indices, mention any side effects (it delegates to ph.base.resetInFlightRange), and document the parameters (start, end) and any expectations (e.g., inclusive bounds or preconditions). Place the comment immediately above the ResetInFlightHeaderRange method.block/internal/submitting/submitter_test.go (1)
424-440: Exercise the new callbacks in the fake submitter.This test double now accepts
onSubmitSuccess/onSubmitError, but it still discards them. That means the loop test only verifies enqueueing, not the success/error lifecycle that now updates timestamps and resets in-flight cache state.Consider invoking the callbacks when non-nil or adding a focused test that covers that contract.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/submitting/submitter_test.go` around lines 424 - 440, The fakeDASubmitter currently drops the provided callbacks in SubmitHeaders and SubmitData; update these methods (SubmitHeaders and SubmitData on fakeDASubmitter) to call the supplied on-success and on-error callbacks when they are non-nil so the test exercises the full success/error lifecycle (e.g., invoke the success callback when you want the fake to simulate success, or invoke the error callback with a test error to simulate failure), while preserving the existing signaling to chHdr/chData; alternatively add a focused test that uses a fake submitter which invokes those callbacks to assert timestamps and in-flight cache resets.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@block/internal/submitting/da_submitter_integration_test.go`:
- Around line 101-110: The test currently calls daSubmitter.Close() only at the
end, risking resource leaks if earlier assertions fail; after creating the async
submitter (the daSubmitter variable), ensure cleanup is guaranteed by
registering a deferred close or t.Cleanup call—e.g., immediately after
daSubmitter is constructed call defer daSubmitter.Close() or t.Cleanup(func(){
daSubmitter.Close() }) so the Close() method on daSubmitter always runs even if
the test fails early.
In `@block/internal/submitting/da_submitter_test.go`:
- Around line 216-218: The test currently calls submitter.Close() after
assertions which can leak the submitter's async worker if an assertion fails;
change the teardown to run immediately after setup by invoking defer
submitter.Close() (or t.Cleanup(func(){ submitter.Close() })) right after the
submitter is created so Close() always runs even on test failures — update the
tests that call submitter.SubmitHeaders(...) and later submitter.Close() (e.g.,
the cases around SubmitHeaders and the other similar test) to use deferred
cleanup instead.
In `@block/internal/submitting/da_submitter.go`:
- Around line 388-398: The datalayer success branch uses res.SubmittedCount
directly which can be 0 or >len(marshaled) and cause infinite loops or panics;
in the datypes.StatusSuccess case (around res.SubmittedCount handling) validate
that submitted := int(res.SubmittedCount) is >0 and <= len(marshaled) before
calling onSuccess or advancing the window (marshaled = marshaled[submitted:]);
if submitted==0 treat as a reject/error (update rs.Attempt or return/log and do
not spin) and if submitted>len(marshaled) treat as malformed input (log/error
and reject) so only a validated count is passed to onSuccess and used to slice
marshaled.
In `@block/internal/submitting/submitter.go`:
- Around line 236-250: The code enqueues a batch as in-flight via
GetPendingHeaders/GetPendingData but if s.daSubmitter.SubmitHeaders or
SubmitData returns an immediate error the in-flight claim is never released;
update the error path in submitter.go around s.daSubmitter.SubmitHeaders and the
analogous SubmitData call so that before logging or returning on synchronous
error you call s.cache.ResetInFlightHeaderRange(headers[0].Height(),
headers[len(headers)-1].Height()) (and for data use the corresponding
ResetInFlightDataRange with the first/last data heights), then proceed to
log/handle the error (including the existing ErrOversizedItem handling) so the
claimed heights are retried.
---
Outside diff comments:
In `@block/internal/cache/pending_base.go`:
- Around line 80-96: getPending() currently reads pb.lastHeight
(pb.lastHeight.Load()) outside the pb.inFlightMu critical section and then
clones pb.inFlightClaims and pb.gaps, which allows an interleaving with
setLastSubmittedHeight() that trims those slices and updates lastHeight causing
findAvailableRange() to return already-acknowledged heights; fix by moving the
read of pb.lastHeight inside the same pb.inFlightMu.Lock()/Unlock() block where
you clone inFlightClaims and gaps so lastHeight and the in-flight ranges are
read atomically, and apply the same locking discipline to
setLastSubmittedHeight() (acquire pb.inFlightMu while trimming
inFlightClaims/gaps and updating pb.lastHeight) to prevent races when
findAvailableRange, getPending, and setLastSubmittedHeight interact.
In `@block/internal/submitting/da_submitter_tracing.go`:
- Around line 33-61: The current tracedDASubmitter starts a span in
SubmitHeaders (and similarly in SubmitData) but defers span.End(), which closes
the span before async work completes; instead remove the immediate defer
span.End() and wrap the onSubmitSuccess and onSubmitError callbacks with
closures that record errors/status on the span and call span.End() when the
async completion path runs; i.e., in tracedDASubmitter.SubmitHeaders and
SubmitData, create wrappedOnSuccess := func(){ span.SetStatus(codes.Ok, "");
onSubmitSuccess(); span.End() } and wrappedOnError := func(err error){
span.RecordError(err); span.SetStatus(codes.Error, err.Error());
onSubmitError(err); span.End() } (or equivalent wrappers matching original
signatures) and pass those to t.inner.SubmitHeaders/SubmitData so the span
lifetime and annotations reflect actual completion.
---
Nitpick comments:
In `@block/internal/cache/pending_data.go`:
- Around line 88-90: Add a Go doc comment for the exported method
ResetInFlightDataRange on type PendingData: describe what the method does, its
parameters (start, end uint64) and any important behavior or side-effects (it
delegates to pd.base.resetInFlightRange to reset the in-flight data range).
Place the comment immediately above the func declaration for
ResetInFlightDataRange so it satisfies Go exported identifier documentation
guidelines.
In `@block/internal/cache/pending_headers.go`:
- Around line 83-85: Add a Go doc comment for the exported method
PendingHeaders.ResetInFlightHeaderRange describing its purpose and behavior:
explain that it resets the in-flight header range tracked by the PendingHeaders
instance between the inclusive start and end uint64 indices, mention any side
effects (it delegates to ph.base.resetInFlightRange), and document the
parameters (start, end) and any expectations (e.g., inclusive bounds or
preconditions). Place the comment immediately above the ResetInFlightHeaderRange
method.
In `@block/internal/submitting/submitter_test.go`:
- Around line 424-440: The fakeDASubmitter currently drops the provided
callbacks in SubmitHeaders and SubmitData; update these methods (SubmitHeaders
and SubmitData on fakeDASubmitter) to call the supplied on-success and on-error
callbacks when they are non-nil so the test exercises the full success/error
lifecycle (e.g., invoke the success callback when you want the fake to simulate
success, or invoke the error callback with a test error to simulate failure),
while preserving the existing signaling to chHdr/chData; alternatively add a
focused test that uses a fake submitter which invokes those callbacks to assert
timestamps and in-flight cache resets.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 996c9001-1856-439f-b61e-aa2ea4669b4e
📒 Files selected for processing (16)
block/internal/cache/manager.goblock/internal/cache/manager_test.goblock/internal/cache/pending_base.goblock/internal/cache/pending_base_test.goblock/internal/cache/pending_data.goblock/internal/cache/pending_data_test.goblock/internal/cache/pending_headers.goblock/internal/cache/pending_headers_test.goblock/internal/submitting/da_submitter.goblock/internal/submitting/da_submitter_integration_test.goblock/internal/submitting/da_submitter_mocks_test.goblock/internal/submitting/da_submitter_test.goblock/internal/submitting/da_submitter_tracing.goblock/internal/submitting/da_submitter_tracing_test.goblock/internal/submitting/submitter.goblock/internal/submitting/submitter_test.go
| // Submit headers and data - cache returns both items and marshalled bytes | ||
| headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background()) | ||
| require.NoError(t, err) | ||
| require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n)) | ||
| require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil)) | ||
|
|
||
| dataList, marshalledData, err := cm.GetPendingData(context.Background()) | ||
| require.NoError(t, err) | ||
| require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) | ||
| require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil)) | ||
|
|
||
| daSubmitter.Close() |
There was a problem hiding this comment.
Defer Close() so cleanup always runs.
Calling Close() only at the end of the successful path can leak the async submitter if an earlier assertion fails. Move it into a defer or t.Cleanup immediately after construction.
💡 Suggested fix
daSubmitter := NewDASubmitter(client, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop(), noopDAHintAppender{}, noopDAHintAppender{})
+defer daSubmitter.Close()
@@
- daSubmitter.Close()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Submit headers and data - cache returns both items and marshalled bytes | |
| headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background()) | |
| require.NoError(t, err) | |
| require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n)) | |
| require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil)) | |
| dataList, marshalledData, err := cm.GetPendingData(context.Background()) | |
| require.NoError(t, err) | |
| require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen)) | |
| require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil)) | |
| daSubmitter.Close() | |
| // Submit headers and data - cache returns both items and marshalled bytes | |
| headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background()) | |
| require.NoError(t, err) | |
| require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n, nil, nil)) | |
| dataList, marshalledData, err := cm.GetPendingData(context.Background()) | |
| require.NoError(t, err) | |
| require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen, nil, nil)) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/submitting/da_submitter_integration_test.go` around lines 101
- 110, The test currently calls daSubmitter.Close() only at the end, risking
resource leaks if earlier assertions fail; after creating the async submitter
(the daSubmitter variable), ensure cleanup is guaranteed by registering a
deferred close or t.Cleanup call—e.g., immediately after daSubmitter is
constructed call defer daSubmitter.Close() or t.Cleanup(func(){
daSubmitter.Close() }) so the Close() method on daSubmitter always runs even if
the test fails early.
| err = submitter.SubmitHeaders(ctx, headers, marshalledHeaders, cm, signer, nil, nil) | ||
| require.NoError(t, err) | ||
| submitter.Close() |
There was a problem hiding this comment.
Defer Close() in the success-path tests.
Both tests close the submitter only after the assertions. If one of those assertions fails, teardown is skipped and the async worker can leak into later tests. Use defer or t.Cleanup right after setup instead.
💡 Suggested fix
submitter, st, cm, mockDA, gen := setupDASubmitterTest(t)
+defer submitter.Close()
@@
- submitter.Close()Also applies to: 331-333
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/submitting/da_submitter_test.go` around lines 216 - 218, The
test currently calls submitter.Close() after assertions which can leak the
submitter's async worker if an assertion fails; change the teardown to run
immediately after setup by invoking defer submitter.Close() (or
t.Cleanup(func(){ submitter.Close() })) right after the submitter is created so
Close() always runs even on test failures — update the tests that call
submitter.SubmitHeaders(...) and later submitter.Close() (e.g., the cases around
SubmitHeaders and the other similar test) to use deferred cleanup instead.
| case datypes.StatusSuccess: | ||
| submitted := int(res.SubmittedCount) | ||
| if onSuccess != nil { | ||
| onSuccess(submitted, res.Height) | ||
| } | ||
| s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer") | ||
| if submitted == len(marshaled) { | ||
| return | ||
| } | ||
| // partial success: advance window | ||
| marshaled = marshaled[submitted:] |
There was a problem hiding this comment.
Validate SubmittedCount before using it.
res.SubmittedCount comes from the DA client. A value of 0 leaves marshaled unchanged and rs.Attempt unmodified, so this loop spins forever; a value larger than the batch size panics here and in the outer headers[:submittedCount] / signedDataList[:submittedCount] callbacks. Reject out-of-range counts before calling onSuccess.
Suggested fix
case datypes.StatusSuccess:
submitted := int(res.SubmittedCount)
+ if submitted <= 0 || submitted > len(marshaled) {
+ err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled))
+ s.recordFailure(common.DASubmitterFailureReasonUnknown)
+ s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count")
+ if onError != nil {
+ onError(err)
+ }
+ return
+ }
if onSuccess != nil {
onSuccess(submitted, res.Height)
}
s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer")As per coding guidelines "Validate all inputs from external sources in Go code".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| case datypes.StatusSuccess: | |
| submitted := int(res.SubmittedCount) | |
| if onSuccess != nil { | |
| onSuccess(submitted, res.Height) | |
| } | |
| s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer") | |
| if submitted == len(marshaled) { | |
| return | |
| } | |
| // partial success: advance window | |
| marshaled = marshaled[submitted:] | |
| case datypes.StatusSuccess: | |
| submitted := int(res.SubmittedCount) | |
| if submitted <= 0 || submitted > len(marshaled) { | |
| err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled)) | |
| s.recordFailure(common.DASubmitterFailureReasonUnknown) | |
| s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count") | |
| if onError != nil { | |
| onError(err) | |
| } | |
| return | |
| } | |
| if onSuccess != nil { | |
| onSuccess(submitted, res.Height) | |
| } | |
| s.logger.Info().Str("itemType", itemType).Int("count", submitted).Msg("successfully submitted items to DA layer") | |
| if submitted == len(marshaled) { | |
| return | |
| } | |
| // partial success: advance window | |
| marshaled = marshaled[submitted:] |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@block/internal/submitting/da_submitter.go` around lines 388 - 398, The
datalayer success branch uses res.SubmittedCount directly which can be 0 or
>len(marshaled) and cause infinite loops or panics; in the datypes.StatusSuccess
case (around res.SubmittedCount handling) validate that submitted :=
int(res.SubmittedCount) is >0 and <= len(marshaled) before calling onSuccess or
advancing the window (marshaled = marshaled[submitted:]); if submitted==0 treat
as a reject/error (update rs.Attempt or return/log and do not spin) and if
submitted>len(marshaled) treat as malformed input (log/error and reject) so only
a validated count is passed to onSuccess and used to slice marshaled.
There was a problem hiding this comment.
♻️ Duplicate comments (1)
block/internal/submitting/da_submitter.go (1)
384-395:⚠️ Potential issue | 🔴 CriticalValidate
SubmittedCountbefore using it.The
res.SubmittedCountfrom the DA client is used without validation. Ifsubmitted == 0, the loop continues with unchangedmarshaledand reset backoff, potentially causing an infinite loop. Ifsubmitted > len(marshaled), the slice operation at line 394 will panic.🛡️ Proposed fix to validate SubmittedCount
case datypes.StatusSuccess: submitted := int(res.SubmittedCount) + if submitted <= 0 || submitted > len(marshaled) { + s.recordFailure(common.DASubmitterFailureReasonUnknown) + err := fmt.Errorf("invalid submitted count %d for batch size %d", submitted, len(marshaled)) + s.logger.Error().Err(err).Str("itemType", itemType).Msg("DA layer returned invalid submitted count") + if onError != nil { + onError(err) + } + return + } if onSuccess != nil { onSuccess(submitted, res.Height) }As per coding guidelines: "Validate all inputs from external sources in Go code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/submitting/da_submitter.go` around lines 384 - 395, The code uses res.SubmittedCount directly which can be 0 or >len(marshaled); validate it before slicing and advancing. In the datypes.StatusSuccess branch (symbols: res.SubmittedCount, marshaled, onSuccess, rs.Next, reasonSuccess, pol) ensure submitted := int(res.SubmittedCount) is checked: if submitted <= 0 treat as no progress (do not reset backoff — call rs.Fail or return to avoid infinite loop), if submitted > len(marshaled) cap it to len(marshaled) before calling onSuccess and slicing; only advance marshaled and call rs.Next when a positive, bounded submitted value was applied.
🧹 Nitpick comments (2)
block/internal/submitting/submitter.go (1)
249-260: Minor: Avoid logging whenerris nil.When
onErroris called withnil(on context cancellation or empty batch fromsubmitWithRetry), this logs an error with no actual error. Consider guarding the log statement.♻️ Proposed improvement
onError := func(err error) { if len(headers) > 0 { s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height()) } if errors.Is(err, common.ErrOversizedItem) { s.logger.Error().Err(err). Msg("CRITICAL: Header exceeds DA blob size limit - halting to prevent live lock") s.sendCriticalError(fmt.Errorf("unrecoverable DA submission error: %w", err)) return } - s.logger.Error().Err(err).Msg("failed to submit headers") + if err != nil { + s.logger.Error().Err(err).Msg("failed to submit headers") + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/submitting/submitter.go` around lines 249 - 260, The onError closure should not log when err is nil: after calling s.cache.ResetInFlightHeaderRange(headers[0].Height(), headers[len(headers)-1].Height()) (if headers present), add an early guard if err == nil { return } so you skip the error handling below; keep the oversized-item check (errors.Is(err, common.ErrOversizedItem)), s.logger.Error().Err(err).Msg("failed to submit headers"), and s.sendCriticalError(...) as-is for non-nil errors.block/internal/submitting/da_submitter.go (1)
326-331: Consider documenting theonError(nil)contract.Calling
onError(nil)for empty batches and context cancellations signals completion without error, but the callback name suggests an error occurred. This works correctly with the caller'serrors.Is(err, common.ErrOversizedItem)check, but the semantics could be clearer.Consider renaming to
onComplete(error)or documenting thatnilindicates graceful termination without retry failure.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@block/internal/submitting/da_submitter.go` around lines 326 - 331, The callback parameter onError in da_submitter.go is used to signal both actual errors and graceful completion by calling onError(nil) for empty batches/cancellations; update the code to make this contract explicit by either renaming the parameter from onError to onComplete (and update all call sites and related tests) or by adding a clear doc comment on the onError parameter/signature (and any public type that exposes it) stating that a nil argument indicates graceful termination/no-retry, while a non-nil error indicates a retryable/failure condition; ensure you update comments and any place that inspects the value (e.g., callers that use errors.Is(err, common.ErrOversizedItem)) to reflect the new name or documented behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@block/internal/submitting/da_submitter.go`:
- Around line 384-395: The code uses res.SubmittedCount directly which can be 0
or >len(marshaled); validate it before slicing and advancing. In the
datypes.StatusSuccess branch (symbols: res.SubmittedCount, marshaled, onSuccess,
rs.Next, reasonSuccess, pol) ensure submitted := int(res.SubmittedCount) is
checked: if submitted <= 0 treat as no progress (do not reset backoff — call
rs.Fail or return to avoid infinite loop), if submitted > len(marshaled) cap it
to len(marshaled) before calling onSuccess and slicing; only advance marshaled
and call rs.Next when a positive, bounded submitted value was applied.
---
Nitpick comments:
In `@block/internal/submitting/da_submitter.go`:
- Around line 326-331: The callback parameter onError in da_submitter.go is used
to signal both actual errors and graceful completion by calling onError(nil) for
empty batches/cancellations; update the code to make this contract explicit by
either renaming the parameter from onError to onComplete (and update all call
sites and related tests) or by adding a clear doc comment on the onError
parameter/signature (and any public type that exposes it) stating that a nil
argument indicates graceful termination/no-retry, while a non-nil error
indicates a retryable/failure condition; ensure you update comments and any
place that inspects the value (e.g., callers that use errors.Is(err,
common.ErrOversizedItem)) to reflect the new name or documented behavior.
In `@block/internal/submitting/submitter.go`:
- Around line 249-260: The onError closure should not log when err is nil: after
calling s.cache.ResetInFlightHeaderRange(headers[0].Height(),
headers[len(headers)-1].Height()) (if headers present), add an early guard if
err == nil { return } so you skip the error handling below; keep the
oversized-item check (errors.Is(err, common.ErrOversizedItem)),
s.logger.Error().Err(err).Msg("failed to submit headers"), and
s.sendCriticalError(...) as-is for non-nil errors.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bbad2e4d-1de5-4835-bdcd-e5d352bbf063
📒 Files selected for processing (3)
block/internal/cache/pending_base.goblock/internal/submitting/da_submitter.goblock/internal/submitting/submitter.go
✅ Files skipped from review due to trivial changes (1)
- block/internal/cache/pending_base.go
Overview
Attempt to improve submitter by doing concurrent sends. We don't need to wait for the answer and we would submit.
Useful when the throughput of blobs needs to be high.
Related to #3244 as Fiber takes time to return, so it is necessary there. This PR is mainly to investigate if we can generalize that improvement to mainline evnode.
Summary by CodeRabbit
New Features
Refactor
Bug Fixes