Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-blocking-waitpoint-race-condition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix a race condition in the waitpoint system where a run could be blocked by a completed waitpoint but never be resumed because of a PostgreSQL MVCC issue. This was most likely to occur when creating a waitpoint via `wait.forToken()` at the same moment as completing the token with `wait.completeToken()`. Other types of waitpoints (timed, child runs) were not affected.
39 changes: 33 additions & 6 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,22 @@ export class WaitpointSystem {

/**
* Prevents a run from continuing until the waitpoint is completed.
*
* This method uses two separate SQL statements intentionally:
*
* 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and
* _WaitpointRunConnections rows (historical connections).
*
* 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING.
*
* These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation:
* each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between
* the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using
* a separate SELECT, we get a fresh snapshot that reflects the latest committed state.
*
* The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted
* by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT
* DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking.
*/
async blockRunWithWaitpoint({
runId,
Expand Down Expand Up @@ -399,8 +415,10 @@ export class WaitpointSystem {
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);

//block the run with the waitpoints, returning how many waitpoints are pending
const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
// Insert the blocking connections and the historical run connections.
// We use a CTE to do both inserts atomically. Data-modifying CTEs are
// always executed regardless of whether they're referenced in the outer query.
await prisma.$queryRaw`
WITH inserted AS (
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
SELECT
Expand All @@ -425,12 +443,21 @@ export class WaitpointSystem {
WHERE w.id IN (${Prisma.join($waitpoints)})
ON CONFLICT DO NOTHING
)
SELECT COUNT(*) FROM inserted`;

// Check if the run is actually blocked using a separate query.
// This MUST be a separate statement from the CTE above because in READ COMMITTED
// isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
// it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
// won't see it. This fresh query gets a new snapshot that reflects the latest commits.
const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
SELECT COUNT(*) as pending_count
FROM inserted i
JOIN "Waitpoint" w ON w.id = i."waitpointId"
WHERE w.status = 'PENDING';`;
FROM "Waitpoint"
WHERE id IN (${Prisma.join($waitpoints)})
AND status = 'PENDING'
`;

const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;

let newStatus: TaskRunExecutionStatus = "SUSPENDED";
if (
Expand Down
Loading