feat(server): Gracefully handle oversized batch items instead of aborting the stream #3137
…over multiple chunks instead of in 1 chunk
🦋 Changeset detected. Latest commit: 3a656b7. The changes in this PR will be included in the next version bump. This PR includes changesets to release 28 packages.
Walkthrough: The PR adds graceful handling for oversized NDJSON batch items by emitting an error marker. Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes. Pre-merge checks: ✅ 1 passed, ❌ 2 failed (warnings).
```ts
const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

// Enqueue with __error metadata - processItemCallback will detect this
// and use TriggerFailedTaskService to create a pre-failed run
const batchItem: BatchItem = {
  task: marker.task,
  payload: "{}",
  payloadType: "application/json",
  options: {
    __error: errorMessage,
    __errorCode: "PAYLOAD_TOO_LARGE",
  },
};

const result = await this._engine.enqueueBatchItem(
  batchId,
  environment.id,
  itemIndex,
  batchItem
);
```
🟡 Missing index bounds validation for oversized batch items

When an oversized item marker is processed, `itemIndex` is used to enqueue the batch item without the bounds check that exists for normal items.

Root cause

For normal items, line 168 validates `item.index >= batch.runCount` before enqueueing. For oversized items at line 124, `itemIndex` is derived from the best-effort `extractIndexAndTask` function (which scans raw bytes with a 512-byte limit and imperfect depth tracking) or falls back to `lastIndex + 1`, but no bounds check against `batch.runCount` is performed before calling `this._engine.enqueueBatchItem()`.

If `extractIndexAndTask` returns an incorrect index (e.g. due to depth-tracking confusion from `{`/`}` characters inside nested string values at streamBatchItems.server.ts:383-391, or the 512-byte scan limit being hit before the `index` field is found), or if the oversized item genuinely had an out-of-range index, the enqueue proceeds with an invalid index.

Impact: an out-of-bounds index in the batch completion tracker could leave the batch short of its expected `runCount`, potentially preventing it from being finalized/sealed properly.
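To illustrate the depth-tracking pitfall: a brace counter that ignores JSON string context miscounts any `{` or `}` that appears inside a string value. The two functions below are a hypothetical sketch (not the actual `extractIndexAndTask` code) contrasting a naive counter with a string-aware one:

```typescript
// Naive depth tracker: counts every brace, including ones inside
// JSON string values (the failure mode described above).
function naiveDepth(line: string): number {
  let depth = 0;
  for (const ch of line) {
    if (ch === "{") depth++;
    else if (ch === "}") depth--;
  }
  return depth;
}

// String-aware scanner: tracks quote state and backslash escapes,
// so braces inside string values do not affect the depth.
function stringAwareDepth(line: string): number {
  let depth = 0;
  let inString = false;
  let escaped = false;
  for (const ch of line) {
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === "\\") escaped = true;
      else if (ch === '"') inString = false;
    } else if (ch === '"') inString = true;
    else if (ch === "{") depth++;
    else if (ch === "}") depth--;
  }
  return depth;
}

const line = '{"index": 3, "payload": "a } inside a string"}';
console.log(naiveDepth(line));       // → -1 (the } inside the string is miscounted)
console.log(stringAwareDepth(line)); // → 0 (object opened and closed exactly once)
```

A scanner without the quote/escape state can conclude an object ended early (or never ended), which is one way the extracted index can come back wrong.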
Suggested change:

```diff
 const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

+// Validate index is within expected range (same check as normal items)
+if (itemIndex >= batch.runCount) {
+  throw new ServiceValidationError(
+    `Item index ${itemIndex} exceeds batch runCount ${batch.runCount}`
+  );
+}
+
 const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

 // Enqueue with __error metadata - processItemCallback will detect this
 // and use TriggerFailedTaskService to create a pre-failed run
 const batchItem: BatchItem = {
   task: marker.task,
   payload: "{}",
   payloadType: "application/json",
   options: {
     __error: errorMessage,
     __errorCode: "PAYLOAD_TOO_LARGE",
   },
 };

 const result = await this._engine.enqueueBatchItem(
   batchId,
   environment.id,
   itemIndex,
   batchItem
 );
```
Gracefully handle oversized batch items instead of aborting the stream.

When an NDJSON batch item exceeds the maximum size, the parser now emits an error marker instead of throwing, allowing the batch to seal normally. The oversized item becomes a pre-failed run with the `PAYLOAD_TOO_LARGE` error code, while the other items in the batch process successfully. This prevents `batchTriggerAndWait` from seeing connection errors and retrying with exponential backoff.

Also fixes the NDJSON parser not consuming the remainder of an oversized line split across multiple chunks, which caused "Invalid JSON" errors on subsequent lines.
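The chunk-boundary fix described in the last sentence can be sketched as a small state machine (a hypothetical simplified parser, not the actual trigger.dev implementation, with an artificially small limit): once a partial line exceeds the limit, the parser keeps discarding bytes until the next newline arrives, however many chunks later, and only then emits an oversized marker and resumes normal parsing.

```typescript
const MAX_LINE_BYTES = 32; // artificially small limit for the sketch

type ParseEvent =
  | { kind: "item"; value: unknown }
  | { kind: "oversized"; size: number };

// Parser state persists across feed() calls, so an oversized line split
// over multiple chunks is fully consumed instead of corrupting the next line.
function makeNdjsonParser() {
  let buffer = "";
  let skipping = false; // true while discarding the rest of an oversized line
  let skippedBytes = 0;

  return function feed(chunk: string): ParseEvent[] {
    const events: ParseEvent[] = [];
    buffer += chunk;

    while (true) {
      if (skipping) {
        const nl = buffer.indexOf("\n");
        if (nl === -1) {
          // Still inside the oversized line: discard and wait for more chunks.
          skippedBytes += buffer.length;
          buffer = "";
          break;
        }
        // End of the oversized line found: emit a marker, resume normally.
        skippedBytes += nl;
        events.push({ kind: "oversized", size: skippedBytes });
        buffer = buffer.slice(nl + 1);
        skipping = false;
        skippedBytes = 0;
        continue;
      }

      const nl = buffer.indexOf("\n");
      if (nl === -1) {
        if (buffer.length > MAX_LINE_BYTES) {
          // Line already too big before its newline arrived: start skipping
          // instead of throwing, so later lines still parse.
          skipping = true;
          skippedBytes = buffer.length;
          buffer = "";
        }
        break;
      }

      const line = buffer.slice(0, nl);
      buffer = buffer.slice(nl + 1);
      if (line.length > MAX_LINE_BYTES) {
        events.push({ kind: "oversized", size: line.length });
      } else if (line.length > 0) {
        events.push({ kind: "item", value: JSON.parse(line) });
      }
    }
    return events;
  };
}

// Usage: an oversized line arrives split across two chunks.
const feed = makeNdjsonParser();
const big = JSON.stringify({ index: 0, payload: "x".repeat(100) }); // 124 bytes > 32
console.log(feed(big.slice(0, 50)));
// → [] (still inside the oversized line; nothing emitted yet)
console.log(feed(big.slice(50) + "\n" + '{"index":1}\n'));
// → [ { kind: "oversized", size: 124 }, { kind: "item", value: { index: 1 } } ]
```

Without the `skipping` state, the second chunk's leftover bytes would be glued to the next line and fail `JSON.parse`, which is exactly the "Invalid JSON on subsequent lines" symptom the PR fixes.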
PAYLOAD_TOO_LARGEerror code, while other items in the batch process successfully. This preventsbatchTriggerAndWaitfrom seeing connection errors and retrying with exponential backoff.Also fixes the NDJSON parser not consuming the remainder of an oversized line split across multiple chunks, which caused "Invalid JSON" errors on subsequent lines.