
feat(server): Gracefully handle oversized batch items instead of aborting the stream #3137

Open
ericallam wants to merge 4 commits into `main` from `feature/tri-7510-graceful-handling-of-large-batch-item-payloads`

Conversation

@ericallam ericallam (Member) commented Feb 26, 2026

Gracefully handle oversized batch items instead of aborting the stream.

When an NDJSON batch item exceeds the maximum size, the parser now emits an error marker instead of throwing, allowing the batch to seal normally. The oversized item becomes a pre-failed run with the `PAYLOAD_TOO_LARGE` error code, while the other items in the batch are processed normally. This prevents `batchTriggerAndWait` from seeing a connection error and retrying with exponential backoff.

Also fixes the NDJSON parser not consuming the remainder of an oversized line split across multiple chunks, which caused "Invalid JSON" errors on subsequent lines.
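The chunk-spanning fix can be sketched as follows. This is a hypothetical, simplified model, not the actual trigger.dev parser: the class name `NdjsonLineParser` and the `ParsedLine` shape are invented for illustration. The key idea is a "skip mode" that, once a partial line exceeds the limit, discards every subsequent chunk of that line up to its newline instead of letting the leftover bytes corrupt the next line.

```typescript
// Minimal sketch of an NDJSON line reader that emits an "oversized" marker
// instead of throwing, and discards the remainder of an oversized line even
// when that line spans multiple chunks.
type ParsedLine =
  | { kind: "item"; value: unknown }
  | { kind: "oversized"; actualSize: number; maxSize: number };

class NdjsonLineParser {
  private buffer = "";
  private skippingOversizedLine = false;
  private skippedBytes = 0;

  constructor(private readonly maxLineSize: number) {}

  /** Feed one chunk; returns the lines completed by this chunk. */
  push(chunk: string): ParsedLine[] {
    const out: ParsedLine[] = [];
    let data = chunk;

    // If a previous chunk started an oversized line, keep consuming it until
    // we see its terminating newline, instead of parsing garbage as JSON.
    if (this.skippingOversizedLine) {
      const end = data.indexOf("\n");
      if (end === -1) {
        this.skippedBytes += data.length;
        return out; // still inside the oversized line
      }
      this.skippedBytes += end;
      out.push({ kind: "oversized", actualSize: this.skippedBytes, maxSize: this.maxLineSize });
      this.skippingOversizedLine = false;
      this.skippedBytes = 0;
      data = data.slice(end + 1);
    }

    this.buffer += data;
    let newline: number;
    while ((newline = this.buffer.indexOf("\n")) !== -1) {
      const line = this.buffer.slice(0, newline);
      this.buffer = this.buffer.slice(newline + 1);
      if (line.length > this.maxLineSize) {
        out.push({ kind: "oversized", actualSize: line.length, maxSize: this.maxLineSize });
      } else if (line.trim().length > 0) {
        out.push({ kind: "item", value: JSON.parse(line) });
      }
    }

    // A partial line already over the limit: switch to skip mode so later
    // chunks of this line are discarded rather than breaking the next line.
    if (this.buffer.length > this.maxLineSize) {
      this.skippingOversizedLine = true;
      this.skippedBytes = this.buffer.length;
      this.buffer = "";
    }
    return out;
  }
}
```

A caller draining `push()` sees one oversized marker per bad line and intact items for the rest, which is what lets the batch seal normally.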

@changeset-bot changeset-bot bot commented Feb 26, 2026

🦋 Changeset detected

Latest commit: 3a656b7

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 28 packages
| Name | Type |
| ---- | ---- |
| @trigger.dev/sdk | Patch |
| @trigger.dev/python | Patch |
| @internal/sdk-compat-tests | Patch |
| d3-chat | Patch |
| references-d3-openai-agents | Patch |
| references-nextjs-realtime | Patch |
| references-realtime-hooks-test | Patch |
| references-realtime-streams | Patch |
| references-telemetry | Patch |
| @trigger.dev/build | Patch |
| @trigger.dev/core | Patch |
| @trigger.dev/react-hooks | Patch |
| @trigger.dev/redis-worker | Patch |
| @trigger.dev/rsc | Patch |
| @trigger.dev/schema-to-json | Patch |
| @trigger.dev/database | Patch |
| @trigger.dev/otlp-importer | Patch |
| trigger.dev | Patch |
| @internal/cache | Patch |
| @internal/clickhouse | Patch |
| @internal/redis | Patch |
| @internal/replication | Patch |
| @internal/run-engine | Patch |
| @internal/schedule-engine | Patch |
| @internal/testcontainers | Patch |
| @internal/tracing | Patch |
| @internal/tsql | Patch |
| @internal/zod-worker | Patch |


@coderabbitai coderabbitai bot (Contributor) commented Feb 26, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c4ca399 and 3a656b7.

⛔ Files ignored due to path filters (1)
  • ailogger-output.log is excluded by !**/*.log
📒 Files selected for processing (1)
  • internal-packages/run-engine/src/batch-queue/index.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal-packages/run-engine/src/batch-queue/index.ts
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: sdk-compat / Deno Runtime
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: sdk-compat / Bun Runtime
  • GitHub Check: typecheck / typecheck

Walkthrough

The PR adds graceful handling for oversized NDJSON batch items by emitting an OversizedItemMarker token instead of aborting the stream and by skipping the remainder of oversized lines across chunks. It introduces OversizedItemMarker and extractIndexAndTask, updates the batch item processing to create pre-failed runs with PAYLOAD_TOO_LARGE (including TriggerFailedTaskService integration and a normalizePayload helper), adjusts error-code mappings and schemas to include PAYLOAD_TOO_LARGE, tweaks serialization for null/undefined payloads, and updates UI and presenter code to change how triggered runs are displayed.
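As a rough model of the pre-failed-run flow the walkthrough describes: items whose options carry `__error` metadata are routed to a failed result instead of being triggered. The names `processItemCallback` and `TriggerFailedTaskService` come from the PR itself, but the types and the `processItem` function below are illustrative only, not the actual implementation.

```typescript
// Illustrative sketch: batch item processing that detects the __error
// metadata the parser attaches to oversized items and produces a
// pre-failed result instead of triggering the task.
type BatchItemOptions = { __error?: string; __errorCode?: string } & Record<string, unknown>;

interface BatchItem {
  task: string;
  payload: string;
  payloadType: string;
  options?: BatchItemOptions;
}

type RunResult =
  | { status: "TRIGGERED"; task: string }
  | { status: "FAILED"; task: string; errorCode: string; errorMessage: string };

function processItem(item: BatchItem): RunResult {
  // Oversized items enqueued by the parser carry __error metadata; create a
  // pre-failed run for them rather than attempting to trigger the task.
  if (item.options?.__error) {
    return {
      status: "FAILED",
      task: item.task,
      errorCode: item.options.__errorCode ?? "PAYLOAD_TOO_LARGE",
      errorMessage: item.options.__error,
    };
  }
  return { status: "TRIGGERED", task: item.task };
}
```

The point of the design is that a too-large item still occupies its slot in the batch (as a failed run) instead of tearing down the whole stream.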

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
| ---- | ---- | ---- | ---- |
| Description check | ⚠️ Warning | The description covers the core change but lacks several required template sections: no issue reference, an incomplete checklist, no testing steps, no screenshots section, and missing changelog details. | Add an issue reference (Closes #), complete all checklist items, document the testing steps performed, and ensure the changelog section is present with a summary of changes. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (1 passed)
| Check name | Status | Explanation |
| ---- | ---- | ---- |
| Title check | ✅ Passed | The title accurately describes the main change: graceful handling of oversized batch items using error markers instead of stream abortion. |


@ericallam ericallam changed the title from "feature/tri-7510-graceful-handling-of-large-batch-item-payloads" to "feat(server): Gracefully handle oversized batch items instead of aborting the stream" on Feb 26, 2026
devin-ai-integration[bot] — This comment was marked as resolved.

coderabbitai[bot] — This comment was marked as resolved.

@ericallam ericallam marked this pull request as ready for review February 26, 2026 22:42

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 1 new potential issue.

View 5 additional findings in Devin Review.


Comment on lines +124 to +144

```ts
const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

// Enqueue with __error metadata - processItemCallback will detect this
// and use TriggerFailedTaskService to create a pre-failed run
const batchItem: BatchItem = {
  task: marker.task,
  payload: "{}",
  payloadType: "application/json",
  options: {
    __error: errorMessage,
    __errorCode: "PAYLOAD_TOO_LARGE",
  },
};

const result = await this._engine.enqueueBatchItem(
  batchId,
  environment.id,
  itemIndex,
  batchItem
```

🟡 Missing index bounds validation for oversized batch items

When an oversized item marker is processed, the itemIndex is used to enqueue the batch item without the bounds check that exists for normal items.

Root Cause

For normal items, line 168 validates item.index >= batch.runCount before enqueueing. For oversized items at line 124, itemIndex is derived from the best-effort extractIndexAndTask function (which scans raw bytes with a 512-byte limit and imperfect depth tracking) or falls back to lastIndex + 1, but no bounds check against batch.runCount is performed before calling this._engine.enqueueBatchItem().

If extractIndexAndTask returns an incorrect index (e.g., due to depth-tracking confusion from {/} characters inside nested string values at streamBatchItems.server.ts:383-391, or the 512-byte scan limit being hit before finding the index field), or if the oversized item genuinely had an out-of-range index, the enqueue would proceed with an invalid index.

Impact: An out-of-bounds index in the batch completion tracker could cause the batch to never reach the expected runCount, potentially preventing the batch from being finalized/sealed properly.
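The failure mode described here can be modeled with a small sketch. This is not the PR's `extractIndexAndTask` implementation, only an illustration of why a bounded prefix scan is best-effort: the scan limit and the field-matching approach below are assumptions made for the example.

```typescript
// Illustrative model of a best-effort index extraction over a bounded prefix
// of a raw oversized NDJSON line. The finding above mentions a 512-byte scan
// limit; the regex approach here is invented for illustration and shares the
// same weakness (it can miss the field, or match "index" inside a string).
const SCAN_LIMIT = 512;

function extractIndex(rawLine: string): number {
  const prefix = rawLine.slice(0, SCAN_LIMIT);
  const match = prefix.match(/"index"\s*:\s*(\d+)/);
  // -1 signals "not found"; the caller falls back to lastIndex + 1.
  return match ? Number(match[1]) : -1;
}
```

Because the extracted index can be wrong or missing, validating it against `batch.runCount` before enqueueing (as the suggested change below does for the same reason normal items are checked) closes the gap.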

Suggested change

```diff
 const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;
+
+// Validate index is within expected range (same check as normal items)
+if (itemIndex >= batch.runCount) {
+  throw new ServiceValidationError(
+    `Item index ${itemIndex} exceeds batch runCount ${batch.runCount}`
+  );
+}

 const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

 // Enqueue with __error metadata - processItemCallback will detect this
 // and use TriggerFailedTaskService to create a pre-failed run
 const batchItem: BatchItem = {
   task: marker.task,
   payload: "{}",
   payloadType: "application/json",
   options: {
     __error: errorMessage,
     __errorCode: "PAYLOAD_TOO_LARGE",
   },
 };

 const result = await this._engine.enqueueBatchItem(
   batchId,
   environment.id,
   itemIndex,
   batchItem
 );
```
