Skip to content

Commit 65a5aec

Browse files
committed
fix(cli): fail attempt on uncaught exception instead of hanging to maxDuration (TRI-9117)
When a Node EventEmitter (e.g. node-redis) emits an "error" event with no listener attached, Node escalates it to process.on("uncaughtException") in the task worker. The worker reported the error via the UNCAUGHT_EXCEPTION IPC event but did not exit, and the supervisor-side handler in taskRunProcess only logged the message at debug level — leaving the run() promise orphaned until maxDuration fired and producing empty attempts (durationMs=0, costInCents=0). The supervisor now rejects the in-flight attempt with an UncaughtExceptionError and gracefully terminates the worker (preserving the OTEL flush window) on UNCAUGHT_EXCEPTION. The attempt fails fast with TASK_EXECUTION_FAILED, surfacing the original error name, message, and stack trace, and falls under the normal retry policy. This mirrors the existing indexing-side behavior in indexWorkerManifest. Apply the same handling to unhandled promise rejections, which Node already routes through uncaughtException by default.
1 parent 45ec23c commit 65a5aec

3 files changed

Lines changed: 82 additions & 1 deletion

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Fix runs hanging to `MAX_DURATION_EXCEEDED` after an uncaught exception. When a Node `EventEmitter` (e.g. `node-redis`) emits an `"error"` event with no listener attached, Node escalates it to `process.on("uncaughtException")` in the task worker. The worker reported the error via the `UNCAUGHT_EXCEPTION` IPC event but did not exit, and the supervisor-side handler in `taskRunProcess` only logged the message at debug level — leaving the `run()` promise orphaned until `maxDuration` fired and producing empty attempts (`durationMs=0`, `costInCents=0`).
6+
7+
The supervisor now rejects the in-flight attempt with an `UncaughtExceptionError` and gracefully terminates the worker (preserving the OTEL flush window) on `UNCAUGHT_EXCEPTION`. The attempt fails fast with `TASK_EXECUTION_FAILED`, surfacing the original error name, message, and stack trace, and falls under the normal retry policy. This mirrors the existing indexing-side behavior. Apply the same handling to unhandled promise rejections, which Node already routes through `uncaughtException` by default.
8+
9+
Customers should still attach `client.on("error", ...)` listeners to long-lived clients (Redis, Postgres, etc.) and let awaited command rejections drive failure semantics — but a missed listener will no longer silently consume the entire `maxDuration` budget.

packages/cli-v3/src/executions/taskRunProcess.test.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { TaskRunProcess, type TaskRunProcessOptions } from "./taskRunProcess.js";
22
import { describe, it, expect, vi } from "vitest";
3-
import { UnexpectedExitError } from "@trigger.dev/core/v3/errors";
3+
import { UncaughtExceptionError, UnexpectedExitError } from "@trigger.dev/core/v3/errors";
44
import type {
55
TaskRunExecution,
66
TaskRunExecutionPayload,
@@ -118,4 +118,37 @@ describe("TaskRunProcess", () => {
118118
}
119119
});
120120
});
121+
122+
describe("parseExecuteError(UncaughtExceptionError)", () => {
123+
it("surfaces the original error name/message/stack as TASK_EXECUTION_FAILED", () => {
124+
const error = new UncaughtExceptionError(
125+
{
126+
name: "Error",
127+
message: "read ECONNRESET",
128+
stack:
129+
"Error: read ECONNRESET\n at TCP.onStreamRead (node:internal/stream_base_commons:216:20)",
130+
},
131+
"uncaughtException"
132+
);
133+
134+
const result = TaskRunProcess.parseExecuteError(error);
135+
136+
expect(result.type).toBe("INTERNAL_ERROR");
137+
expect(result.code).toBe("TASK_EXECUTION_FAILED");
138+
expect(result.message).toBe("Uncaught uncaughtException: read ECONNRESET");
139+
expect(result.stackTrace).toContain("TCP.onStreamRead");
140+
});
141+
142+
it("preserves origin=unhandledRejection in the surfaced message", () => {
143+
const error = new UncaughtExceptionError(
144+
{ name: "Error", message: "boom" },
145+
"unhandledRejection"
146+
);
147+
148+
const result = TaskRunProcess.parseExecuteError(error);
149+
150+
expect(result.code).toBe("TASK_EXECUTION_FAILED");
151+
expect(result.message).toBe("Uncaught unhandledRejection: boom");
152+
});
153+
});
121154
});

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
MaxDurationExceededError,
3434
UnexpectedExitError,
3535
SuspendedProcessError,
36+
UncaughtExceptionError,
3637
} from "@trigger.dev/core/v3/errors";
3738

3839
export type OnSendDebugLogMessage = InferSocketMessageSchema<
@@ -205,6 +206,18 @@ export class TaskRunProcess {
205206
},
206207
UNCAUGHT_EXCEPTION: async (message) => {
207208
logger.debug("uncaught exception in task run process", { ...message });
209+
210+
// The worker process reports uncaught exceptions and unhandled rejections via this
211+
// event, but does not exit on its own. If we don't terminate the attempt here, run()
212+
// hangs (the awaited promise that triggered the throw is orphaned) until maxDuration
213+
// expires — surfacing as TIMED_OUT/MAX_DURATION_EXCEEDED with empty attempts. Reject
214+
// any pending attempts now and gracefully terminate the worker so OTEL gets a flush
215+
// window before SIGKILL.
216+
this.#rejectPendingAttempts(
217+
new UncaughtExceptionError(message.error, message.origin)
218+
);
219+
220+
await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
208221
},
209222
SEND_DEBUG_LOG: async (message) => {
210223
this.onSendDebugLog.post(message);
@@ -339,6 +352,23 @@ export class TaskRunProcess {
339352
logger.debug("child process error", { error, pid: this.pid });
340353
}
341354

355+
#rejectPendingAttempts(error: Error) {
356+
for (const [id, status] of this._attemptStatuses.entries()) {
357+
if (status !== "PENDING") {
358+
continue;
359+
}
360+
361+
this._attemptStatuses.set(id, "REJECTED");
362+
363+
const attemptPromise = this._attemptPromises.get(id);
364+
if (!attemptPromise) {
365+
continue;
366+
}
367+
368+
attemptPromise.rejecter(error);
369+
}
370+
}
371+
342372
async #handleExit(code: number | null, signal: NodeJS.Signals | null) {
343373
logger.debug("handling child exit", { code, signal, pid: this.pid });
344374

@@ -559,6 +589,15 @@ export class TaskRunProcess {
559589
};
560590
}
561591

592+
if (error instanceof UncaughtExceptionError) {
593+
return {
594+
type: "INTERNAL_ERROR",
595+
code: TaskRunErrorCodes.TASK_EXECUTION_FAILED,
596+
message: `Uncaught ${error.origin}: ${error.originalError.message}`,
597+
stackTrace: error.originalError.stack,
598+
};
599+
}
600+
562601
return {
563602
type: "INTERNAL_ERROR",
564603
code: TaskRunErrorCodes.TASK_EXECUTION_FAILED,

0 commit comments

Comments
 (0)