diff --git a/.changeset/modern-boxes-watch.md b/.changeset/modern-boxes-watch.md new file mode 100644 index 00000000000..e9539e2105b --- /dev/null +++ b/.changeset/modern-boxes-watch.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Add a PAYLOAD_TOO_LARGE error code so that batch trigger items whose payloads exceed the maximum payload size fail gracefully instead of aborting the batch diff --git a/.server-changes/graceful-oversized-batch-items.md b/.server-changes/graceful-oversized-batch-items.md new file mode 100644 index 00000000000..980dd33e537 --- /dev/null +++ b/.server-changes/graceful-oversized-batch-items.md @@ -0,0 +1,10 @@ +--- +area: webapp +type: fix +--- + +Gracefully handle oversized batch items instead of aborting the stream. + +When an NDJSON batch item exceeds the maximum size, the parser now emits an error marker instead of throwing, allowing the batch to seal normally. The oversized item becomes a pre-failed run with a `PAYLOAD_TOO_LARGE` error code, while other items in the batch process successfully. This prevents `batchTriggerAndWait` from seeing connection errors and retrying with exponential backoff. + +Also fixes a bug where the NDJSON parser did not consume the remainder of an oversized line split across multiple chunks, which caused "Invalid JSON" errors on subsequent lines. diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 59e717f7cd8..a85d8b20dd2 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -509,8 +509,7 @@ export class SpanPresenter extends BasePresenter { taskIdentifier: true, spanId: true, createdAt: true, - number: true, - taskVersion: true, + status: true, }, where: { parentSpanId: spanId, diff --git a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts index 8307f34afce..b3ed1c22422 100644 --- a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts +++ b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts @@ -99,11 +99,8 @@ export async function action({ request, params }: ActionFunctionArgs) { if (error instanceof ServiceValidationError) { return json({ error: error.message }, { status: 422 }); } else if (error instanceof Error) { - // Check for stream parsing errors - if ( - error.message.includes("Invalid JSON") || - error.message.includes("exceeds maximum size") - ) { + // Check for stream parsing errors (e.g.
invalid JSON) + if (error.message.includes("Invalid JSON")) { return json({ error: error.message }, { status: 400 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index ae8bdaa7077..a78c95d6036 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -58,6 +58,7 @@ import { RunTimeline, RunTimelineEvent, SpanTimeline } from "~/components/run/Ru import { PacketDisplay } from "~/components/runs/v3/PacketDisplay"; import { RunIcon } from "~/components/runs/v3/RunIcon"; import { RunTag } from "~/components/runs/v3/RunTag"; +import { TruncatedCopyableValue } from "~/components/primitives/TruncatedCopyableValue"; import { SpanEvents } from "~/components/runs/v3/SpanEvents"; import { SpanTitle } from "~/components/runs/v3/SpanTitle"; import { TaskRunAttemptStatusCombo } from "~/components/runs/v3/TaskRunAttemptStatus"; @@ -133,9 +134,10 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { name: error.name, message: error.message, stack: error.stack, - cause: error.cause instanceof Error - ? { name: error.cause.name, message: error.cause.message } - : error.cause, + cause: + error.cause instanceof Error + ? { name: error.cause.name, message: error.cause.message } + : error.cause, } : error, }); @@ -1003,7 +1005,7 @@ function RunBody({ )} -
+ {run.friendlyId !== runParam && ( Message {span.message} [removed block, element markup lost in capture: {span.triggeredRuns.length > 0 && ( ... )} rendering a "Triggered runs" table in RunBody with header cells Run # / Task / Version / Created at and one row per entry of span.triggeredRuns, showing run.number, run.taskIdentifier, and run.taskVersion ?? "–", each row linking to v3RunSpanPath(organization, project, environment, { friendlyId: run.friendlyId }, { spanId: run.spanId })] {span.events.length > 0 && } {span.properties !== undefined ? ( @@ -1268,6 +1228,48 @@ function SpanEntity({ span }: { span: Span }) { showOpenInModal /> ) : null} + {span.triggeredRuns.length > 0 && ( [added block, element markup lost in capture: a "Runs" table with header cells ID / Task / Status / Created and one row per entry of span.triggeredRuns, showing run.taskIdentifier (the ID cell presumably renders via the newly imported TruncatedCopyableValue), each row linking through the same v3RunSpanPath call] + )}
); } diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 8206760f469..859dfe2e6b9 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -14,6 +14,14 @@ export type StreamBatchItemsServiceOptions = { maxItemBytes: number; }; +export type OversizedItemMarker = { + __batchItemError: "OVERSIZED"; + index: number; + task: string; + actualSize: number; + maxSize: number; +}; + export type StreamBatchItemsServiceConstructorOptions = { prisma?: PrismaClientOrTransaction; engine?: RunEngine; @@ -110,6 +118,41 @@ export class StreamBatchItemsService extends WithRunEngine { // Process items from the stream for await (const rawItem of itemsIterator) { + // Check for oversized item markers from the NDJSON parser + if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) { + const marker = rawItem as OversizedItemMarker; + const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1; + + const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`; + + // Enqueue with __error metadata - processItemCallback will detect this + // and use TriggerFailedTaskService to create a pre-failed run + const batchItem: BatchItem = { + task: marker.task, + payload: "{}", + payloadType: "application/json", + options: { + __error: errorMessage, + __errorCode: "PAYLOAD_TOO_LARGE", + }, + }; + + const result = await this._engine.enqueueBatchItem( + batchId, + environment.id, + itemIndex, + batchItem + ); + + if (result.enqueued) { + itemsAccepted++; + } else { + itemsDeduplicated++; + } + lastIndex = itemIndex; + continue; + } + // Parse and validate the item const parseResult = BatchItemNDJSONSchema.safeParse(rawItem); if (!parseResult.success) { @@ -168,6 +211,34 @@ export class StreamBatchItemsService extends WithRunEngine { // Validate we received the expected number of items if (enqueuedCount !== batch.runCount) { + // The batch queue consumers may have already processed all items and + // cleaned up the Redis keys before we got here (especially likely when + // items include pre-failed runs that complete instantly). Check if the + // batch was already sealed/completed in Postgres. + const currentBatch = await this._prisma.batchTaskRun.findUnique({ + where: { id: batchId }, + select: { sealed: true, status: true }, + }); + + if (currentBatch?.sealed) { + logger.info("Batch already sealed before count check (fast completion)", { + batchId: batchFriendlyId, + itemsAccepted, + itemsDeduplicated, + enqueuedCount, + expectedCount: batch.runCount, + batchStatus: currentBatch.status, + }); + + return { + id: batchFriendlyId, + itemsAccepted, + itemsDeduplicated, + sealed: true, + runCount: batch.runCount, + }; + } + logger.warn("Batch item count mismatch", { batchId: batchFriendlyId, expected: batch.runCount, @@ -281,6 +352,121 @@ export class StreamBatchItemsService extends WithRunEngine { } } +/** + * Extract `index` and `task` from raw JSON bytes without decoding the full line. + * Scans at most 512 bytes, tracking JSON nesting depth to only match top-level keys. 
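+ * + * @example (illustrative only, not part of the change) + * // extractIndexAndTask(new TextEncoder().encode('{"index":3,"task":"email","payload":"..."}')) + * // => { index: 3, task: "email" }; the payload string is never decoded.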
+ */ +export function extractIndexAndTask(bytes: Uint8Array): { index: number; task: string } { + let index = -1; + let task = "unknown"; + let depth = 0; + let foundIndex = false; + let foundTask = false; + const limit = Math.min(bytes.byteLength, 512); + + const QUOTE = 0x22; // " + const COLON = 0x3a; // : + const LBRACE = 0x7b; // { + const RBRACE = 0x7d; // } + const LBRACKET = 0x5b; // [ + const RBRACKET = 0x5d; // ] + const BACKSLASH = 0x5c; // \ + + // Byte patterns for "index" and "task" (without quotes) + const INDEX_BYTES = [0x69, 0x6e, 0x64, 0x65, 0x78]; // index + const TASK_BYTES = [0x74, 0x61, 0x73, 0x6b]; // task + + let i = 0; + while (i < limit && !(foundIndex && foundTask)) { + const b = bytes[i]; + + if (b === LBRACE || b === LBRACKET) { + depth++; + i++; + continue; + } + if (b === RBRACE || b === RBRACKET) { + depth--; + i++; + continue; + } + + // Only match keys at depth 1 (top-level object) + if (b === QUOTE && depth === 1) { + // Read the key inside quotes + const keyStart = i + 1; + let keyEnd = keyStart; + while (keyEnd < limit && bytes[keyEnd] !== QUOTE) { + if (bytes[keyEnd] === BACKSLASH) keyEnd++; // skip escaped char + keyEnd++; + } + + const keyLen = keyEnd - keyStart; + + // Check if this key matches "index" or "task" + const isIndex = + !foundIndex && + keyLen === INDEX_BYTES.length && + INDEX_BYTES.every((b, j) => bytes[keyStart + j] === b); + const isTask = + !foundTask && + keyLen === TASK_BYTES.length && + TASK_BYTES.every((b, j) => bytes[keyStart + j] === b); + + if (isIndex || isTask) { + // Skip past closing quote and find colon + let pos = keyEnd + 1; + while (pos < limit && bytes[pos] !== COLON) pos++; + pos++; // skip colon + // Skip whitespace + while (pos < limit && (bytes[pos] === 0x20 || bytes[pos] === 0x09)) pos++; + + if (isIndex) { + // Parse digits + let num = 0; + let hasDigit = false; + while (pos < limit && bytes[pos] >= 0x30 && bytes[pos] <= 0x39) { + num = num * 10 + (bytes[pos] - 0x30); + hasDigit = true; + pos++; + } + if (hasDigit) { + index = num; + foundIndex = true; + } + } else { + // Parse quoted string value + if (pos < limit && bytes[pos] === QUOTE) { + const valStart = pos + 1; + let valEnd = valStart; + while (valEnd < limit && bytes[valEnd] !== QUOTE) { + if (bytes[valEnd] === BACKSLASH) valEnd++; + valEnd++; + } + // Decode just this slice + try { + task = new TextDecoder("utf-8", { fatal: true }).decode( + bytes.slice(valStart, valEnd) + ); + foundTask = true; + } catch { + // Leave as "unknown" + } + } + } + } + + // Skip past the key's closing quote + i = keyEnd + 1; + continue; + } + + i++; + } + + return { index, task }; +} + /** * Create an NDJSON parser transform stream. * @@ -305,6 +491,9 @@ export function createNdjsonParserStream( let chunks: Uint8Array[] = []; let totalBytes = 0; let lineNumber = 0; + // When an oversized incomplete line is detected (Case 2), we must discard + // all remaining bytes of that line until the next newline delimiter. + let skipUntilNewline = false; const NEWLINE_BYTE = 0x0a; // '\n' @@ -398,6 +587,24 @@ export function createNdjsonParserStream( return new TransformStream({ transform(chunk, controller) { + // If we're skipping the remainder of an oversized line, scan for the + // next newline in this chunk and discard everything before it. 
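+ // Illustrative walk-through (assumed values, not from the source): with + // maxItemBytes = 8, chunk 1 = '{"a":"xxxxxxxxxxxx' overflows with no newline, + // so Case 2 below emits a marker and sets skipUntilNewline; when chunk 2 = + // 'xx\n{"b":1}\n' arrives, this branch drops 'xx' and '{"b":1}' parses normally.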
+ if (skipUntilNewline) { + const nlPos = chunk.indexOf(NEWLINE_BYTE); + if (nlPos === -1) { + // Entire chunk is still part of the oversized line — discard it + return; + } + // Found the newline — keep everything after it + skipUntilNewline = false; + const remaining = chunk.slice(nlPos + 1); + if (remaining.byteLength === 0) { + return; + } + // Replace chunk with the remainder and fall through to normal processing + chunk = remaining; + } + // Append chunk to buffer chunks.push(chunk); totalBytes += chunk.byteLength; @@ -407,11 +614,19 @@ export function createNdjsonParserStream( while ((newlineIndex = findNewlineIndex()) !== -1) { // Check size limit BEFORE extracting/decoding (bytes up to newline) if (newlineIndex > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (actual: ${newlineIndex})` - ); + // Case 1: Complete line exceeds limit - emit marker instead of throwing + const lineBytes = extractLine(newlineIndex); + const extracted = extractIndexAndTask(lineBytes); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: newlineIndex, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + lineNumber++; + continue; } const lineBytes = extractLine(newlineIndex); @@ -421,11 +636,23 @@ export function createNdjsonParserStream( // Check if the remaining buffer (incomplete line) exceeds the limit // This prevents OOM from a single huge line without newlines if (totalBytes > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (buffered: ${totalBytes}, no newline found)` - ); + // Case 2: Incomplete line exceeds limit - emit marker instead of throwing + const extracted = extractIndexAndTask(concatenateChunks()); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: totalBytes, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + lineNumber++; + // Clear buffer and skip remaining bytes of this oversized line + // until the next newline delimiter is found in a subsequent chunk + chunks = []; + totalBytes = 0; + skipUntilNewline = true; + return; } }, @@ -441,11 +668,17 @@ export function createNdjsonParserStream( // Check size limit before processing final line if (totalBytes > maxItemBytes) { - throw new Error( - `Item at line ${ - lineNumber + 1 - } exceeds maximum size of ${maxItemBytes} bytes (actual: ${totalBytes})` - ); + // Case 3: Flush with oversized remaining - emit marker instead of throwing + const extracted = extractIndexAndTask(concatenateChunks()); + const marker: OversizedItemMarker = { + __batchItemError: "OVERSIZED", + index: extracted.index, + task: extracted.task, + actualSize: totalBytes, + maxSize: maxItemBytes, + }; + controller.enqueue(marker); + return; } const finalBytes = concatenateChunks(); diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index f0cf449d36a..46fe5eaa796 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -669,6 +669,46 @@ export function setupBatchQueueCallbacks() { engine, }); + // Check for pre-marked error items (e.g. oversized payloads) + const itemError = item.options?.__error as string | undefined; + if (itemError) { + const errorCode = (item.options?.__errorCode as string) ?? 
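// fall back to a generic code when the item carries no explicit __errorCode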
"ITEM_ERROR"; + + let environment: AuthenticatedEnvironment | undefined; + try { + environment = (await findEnvironmentById(meta.environmentId)) ?? undefined; + } catch { + // Best-effort environment lookup + } + + if (environment) { + const failedRunId = await triggerFailedTaskService.call({ + taskId: item.task, + environment, + payload: item.payload ?? "{}", + payloadType: item.payloadType as string, + errorMessage: itemError, + errorCode: errorCode as TaskRunErrorCodes, + parentRunId: meta.parentRunId, + resumeParentOnCompletion: meta.resumeParentOnCompletion, + batch: { id: batchId, index: itemIndex }, + traceContext: meta.traceContext as Record | undefined, + spanParentAsLink: meta.spanParentAsLink, + }); + + if (failedRunId) { + span.setAttribute("batch.result.pre_failed", true); + span.setAttribute("batch.result.run_id", failedRunId); + span.end(); + return { success: true as const, runId: failedRunId }; + } + } + + // Fallback if TriggerFailedTaskService or environment lookup fails + span.end(); + return { success: false as const, error: itemError, errorCode }; + } + let environment: AuthenticatedEnvironment | undefined; try { environment = (await findEnvironmentById(meta.environmentId)) ?? undefined; diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 9e3b3aafe8b..2dee8668762 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -24,6 +24,8 @@ import { StreamBatchItemsService, createNdjsonParserStream, streamToAsyncIterable, + extractIndexAndTask, + type OversizedItemMarker, } from "../../app/runEngine/services/streamBatchItems.server"; import { ServiceValidationError } from "../../app/v3/services/baseService.server"; @@ -641,6 +643,25 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([{ id: 1 }, { id: 2 }]); }); + it("should handle escaped newlines in JSON string values", async () => { + // JSON.stringify escapes newlines as \n (two chars: backslash + n), + // so they don't break NDJSON line boundaries. This is the normal case + // when the SDK serializes payloads containing newlines. 
+ const item1 = JSON.stringify({ payload: "line1\nline2\nline3" }); + const item2 = JSON.stringify({ payload: "no newlines" }); + const ndjson = item1 + "\n" + item2 + "\n"; + const encoder = new TextEncoder(); + const stream = chunksToStream([encoder.encode(ndjson)]); + + const parser = createNdjsonParserStream(1024); + const results = await collectStream(stream.pipeThrough(parser)); + + expect(results).toEqual([ + { payload: "line1\nline2\nline3" }, + { payload: "no newlines" }, + ]); + }); + it("should skip empty lines", async () => { const ndjson = '{"a":1}\n\n{"b":2}\n \n{"c":3}\n'; const encoder = new TextEncoder(); @@ -705,33 +726,76 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([{ greeting: "こんにちは" }]); }); - it("should reject lines exceeding maxItemBytes", async () => { + it("should emit OversizedItemMarker for lines exceeding maxItemBytes", async () => { const maxBytes = 50; - // Create a line that exceeds the limit - const largeJson = JSON.stringify({ data: "x".repeat(100) }) + "\n"; + // Create a line that exceeds the limit with index and task fields + const largeJson = JSON.stringify({ index: 3, task: "my-task", data: "x".repeat(100) }) + "\n"; const encoder = new TextEncoder(); const stream = chunksToStream([encoder.encode(largeJson)]); const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); - await expect(collectStream(stream.pipeThrough(parser))).rejects.toThrow(/exceeds maximum size/); + expect(results).toHaveLength(1); + const marker = results[0] as OversizedItemMarker; + expect(marker.__batchItemError).toBe("OVERSIZED"); + expect(marker.index).toBe(3); + expect(marker.task).toBe("my-task"); + expect(marker.maxSize).toBe(maxBytes); + expect(marker.actualSize).toBeGreaterThan(maxBytes); }); - it("should reject unbounded accumulation without newlines", async () => { + it("should emit OversizedItemMarker for unbounded accumulation without newlines", async () => { const maxBytes = 50; // Send data without any newlines that exceeds the buffer limit const encoder = new TextEncoder(); const chunks = [ - encoder.encode('{"start":"'), + encoder.encode('{"index":7,"task":"big-task","start":"'), encoder.encode("x".repeat(60)), // This will push buffer over 50 bytes ]; const stream = chunksToStream(chunks); const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); - await expect(collectStream(stream.pipeThrough(parser))).rejects.toThrow( - /exceeds maximum size.*no newline found/ - ); + expect(results).toHaveLength(1); + const marker = results[0] as OversizedItemMarker; + expect(marker.__batchItemError).toBe("OVERSIZED"); + expect(marker.index).toBe(7); + expect(marker.task).toBe("big-task"); + expect(marker.maxSize).toBe(maxBytes); + }); + + it("should skip remaining bytes of oversized line arriving in subsequent chunks", async () => { + const maxBytes = 50; + const encoder = new TextEncoder(); + // Simulate a normal item, then an oversized item split across many chunks, + // then another normal item after the newline. 
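+ // This exercises Case 2: the buffer exceeds maxItemBytes before any newline + // arrives, so the marker is emitted early and the rest of the line is skipped.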
+ // The oversized line is: {"index":1,"task":"t","data":"xxxx...120 x's...xxxx"}\n + const normalItem1 = '{"index":0,"task":"t","x":1}\n'; + const oversizedStart = '{"index":1,"task":"t","data":"'; + const oversizedMiddle = "x".repeat(120); // way over 50 bytes + const oversizedEnd = '"}\n'; + const normalItem2 = '{"index":2,"task":"t","x":2}\n'; + + // Send as separate chunks to trigger Case 2 (no newline, buffer > limit) + const chunks = [ + encoder.encode(normalItem1 + oversizedStart), + encoder.encode(oversizedMiddle.slice(0, 60)), + encoder.encode(oversizedMiddle.slice(60)), + encoder.encode(oversizedEnd + normalItem2), + ]; + const stream = chunksToStream(chunks); + + const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); + + // Should get: normal item 1, oversized marker, normal item 2 + expect(results).toHaveLength(3); + expect(results[0]).toEqual({ index: 0, task: "t", x: 1 }); + expect((results[1] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); + expect((results[1] as OversizedItemMarker).index).toBe(1); + expect(results[2]).toEqual({ index: 2, task: "t", x: 2 }); }); it("should check byte size before decoding to prevent OOM", async () => { @@ -756,10 +820,12 @@ describe("createNdjsonParserStream", () => { const results1 = await collectStream(stream1.pipeThrough(parser1)); expect(results1).toHaveLength(1); - // Large one should fail + // Large one should emit an OversizedItemMarker const stream2 = chunksToStream([largeBytes]); const parser2 = createNdjsonParserStream(maxBytes); - await expect(collectStream(stream2.pipeThrough(parser2))).rejects.toThrow(/exceeds maximum/); + const results2 = await collectStream(stream2.pipeThrough(parser2)); + expect(results2).toHaveLength(1); + expect((results2[0] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); }); it("should handle final line in flush without trailing newline", async () => { @@ -837,6 +903,28 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([]); }); + it("should pass normal items and emit markers for oversized items in the same stream", async () => { + const maxBytes = 50; + const encoder = new TextEncoder(); + // Normal item, then oversized item, then another normal item + const normalItem1 = '{"index":0,"task":"t","x":1}\n'; + const oversizedItem = JSON.stringify({ index: 1, task: "t", data: "x".repeat(100) }) + "\n"; + const normalItem2 = '{"index":2,"task":"t","x":2}\n'; + const stream = chunksToStream([encoder.encode(normalItem1 + oversizedItem + normalItem2)]); + + const parser = createNdjsonParserStream(maxBytes); + const results = await collectStream(stream.pipeThrough(parser)); + + expect(results).toHaveLength(3); + // First: normal parsed object + expect(results[0]).toEqual({ index: 0, task: "t", x: 1 }); + // Second: oversized marker + expect((results[1] as OversizedItemMarker).__batchItemError).toBe("OVERSIZED"); + expect((results[1] as OversizedItemMarker).index).toBe(1); + // Third: normal parsed object + expect(results[2]).toEqual({ index: 2, task: "t", x: 2 }); + }); + it("should handle stream with only whitespace", async () => { const encoder = new TextEncoder(); const stream = chunksToStream([encoder.encode(" \n\n \n")]); @@ -847,3 +935,34 @@ describe("createNdjsonParserStream", () => { expect(results).toEqual([]); }); }); + +describe("extractIndexAndTask", () => { + const encoder = new TextEncoder(); + + it("should extract index and task from JSON bytes", () => { + const bytes = 
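// only the top-level "index" and "task" keys are read; "data" is never decoded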
encoder.encode('{"index":42,"task":"my-task","data":"x"}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(42); + expect(result.task).toBe("my-task"); + }); + + it("should return defaults for empty or malformed bytes", () => { + const result = extractIndexAndTask(new Uint8Array(0)); + expect(result.index).toBe(-1); + expect(result.task).toBe("unknown"); + }); + + it("should handle keys in any order", () => { + const bytes = encoder.encode('{"task":"other-task","data":"y","index":99}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(99); + expect(result.task).toBe("other-task"); + }); + + it("should not match nested keys", () => { + const bytes = encoder.encode('{"nested":{"index":999,"task":"inner"},"index":5,"task":"outer"}'); + const result = extractIndexAndTask(bytes); + expect(result.index).toBe(5); + expect(result.task).toBe("outer"); + }); +}); diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 98bdacc052e..312bf4772f7 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -848,7 +848,11 @@ export class BatchQueue { "BatchQueue.serializePayload", async (innerSpan) => { const str = - typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload); + item.payload === undefined || item.payload === null + ? "{}" + : typeof item.payload === "string" + ? item.payload + : JSON.stringify(item.payload); innerSpan?.setAttribute("batch.payloadSize", str.length); return str; } @@ -912,7 +916,11 @@ export class BatchQueue { "BatchQueue.serializePayload", async (innerSpan) => { const str = - typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload); + item.payload === undefined || item.payload === null + ? "{}" + : typeof item.payload === "string" + ? 
item.payload + : JSON.stringify(item.payload); innerSpan?.setAttribute("batch.payloadSize", str.length); return str; } diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 373f9daa14f..772282debd1 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -61,6 +61,7 @@ export function runStatusFromError( case "TASK_PROCESS_SIGTERM": case "TASK_DID_CONCURRENT_WAIT": case "BATCH_ITEM_COULD_NOT_TRIGGER": + case "PAYLOAD_TOO_LARGE": case "UNSPECIFIED_ERROR": return "SYSTEM_FAILURE"; default: diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 91483251318..87fff767d7b 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -308,6 +308,7 @@ export function shouldRetryError(error: TaskRunError): boolean { case "TASK_HAS_N0_EXECUTION_SNAPSHOT": case "TASK_RUN_DEQUEUED_MAX_RETRIES": case "BATCH_ITEM_COULD_NOT_TRIGGER": + case "PAYLOAD_TOO_LARGE": case "UNSPECIFIED_ERROR": return false; diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index d489a59390e..f3757208335 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -188,6 +188,7 @@ export const TaskRunInternalError = z.object({ "TASK_DID_CONCURRENT_WAIT", "RECURSIVE_WAIT_DEADLOCK", "BATCH_ITEM_COULD_NOT_TRIGGER", + "PAYLOAD_TOO_LARGE", "UNSPECIFIED_ERROR", ]), message: z.string().optional(), diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts index 6bbdf946120..b6a3f79e74e 100644 --- a/references/hello-world/src/trigger/batches.ts +++ b/references/hello-world/src/trigger/batches.ts @@ -999,6 +999,45 @@ export const largePayloadTask = task({ }, }); +// ============================================================================ +// Oversized Payload Graceful Handling +// ============================================================================ + +/** + * Test: Batch with oversized item should complete gracefully + * + * Sends 2 items: one normal, one oversized (~3.2MB). + * The oversized item should result in a pre-failed run (ok: false) + * while the normal item processes successfully (ok: true). + */ +export const batchSealFailureOversizedPayload = task({ + id: "batch-seal-failure-oversized", + maxDuration: 60, + retry: { + maxAttempts: 1, + }, + run: async () => { + const results = await fixedLengthTask.batchTriggerAndWait([ + { payload: { waitSeconds: 1, output: "normal" } }, + { payload: { waitSeconds: 1, output: "x".repeat(3_200_000) } }, // ~3.2MB oversized + ]); + + const normal = results.runs[0]; + const oversized = results.runs[1]; + + logger.info("Batch results", { + normalOk: normal?.ok, + oversizedOk: oversized?.ok, + }); + + return { + normalOk: normal?.ok === true, + oversizedOk: oversized?.ok === false, + oversizedError: !oversized?.ok ? oversized?.error : undefined, + }; + }, +}); + type Payload = { waitSeconds: number; error?: string;