Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions packages/storage/src/graphdb-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,55 @@ export const ROUND_TRIP_COLUMN_MAP: readonly (readonly [
["content_hash", "contentHash", "string"],
];

// ---------------------------------------------------------------------------
// Transient bulk-load retry
// ---------------------------------------------------------------------------

/**
 * Sleep helper for the bulk-load retry backoff.
 *
 * @param ms - How long to wait, in milliseconds.
 * @returns A promise that settles (with no value) once the timer fires.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    // Hand the resolver straight to the timer; it is invoked with no arguments.
    setTimeout(wake, ms);
  });
}

/**
 * True for the transient lbug WAL→checkpoint rename failure that surfaces
 * under load — e.g. `IO exception: Error renaming file <db>.wal to
 * <db>.wal.checkpoint. ErrorMessage: No such file or directory`. The data is
 * already in the WAL (a reopen recovers it), so this specific failure is
 * safe to retry. Matched on the stable token trio (renaming + .wal +
 * checkpoint) rather than the OS-specific errno suffix, which varies by
 * platform. Every other error returns false and rethrows.
 *
 * The message text is taken from any thrown object that carries a string
 * `message` property — not only `instanceof Error` — so error-like objects
 * (plain objects, errors from another realm) are inspected instead of being
 * stringified to `[object Object]`. Anything else is coerced with `String`.
 *
 * @param err - The caught value; may be anything, including `null`/`undefined`.
 * @returns `true` only when the message names the WAL→checkpoint rename.
 *   Never throws: a value whose message cannot be read is treated as
 *   non-transient so the caller rethrows the original error.
 */
export function isTransientCheckpointError(err: unknown): boolean {
  let msg: string;
  try {
    if (typeof err === "object" && err !== null) {
      const { message } = err as { message?: unknown };
      msg = typeof message === "string" ? message : String(err);
    } else {
      msg = String(err);
    }
  } catch {
    // Null-prototype objects and throwing getters cannot be converted to
    // text. This predicate runs inside a catch handler, so it must not mask
    // the original error with a TypeError of its own.
    return false;
  }
  return /renaming/i.test(msg) && /\.wal\b/i.test(msg) && /checkpoint/i.test(msg);
}

/**
 * Invoke `fn` and, if it rejects with the transient WAL→checkpoint rename
 * error (see {@link isTransientCheckpointError}), try again — up to
 * `maxAttempts` invocations in total.
 *
 * Policy:
 * - a non-transient error is rethrown at once, without any retry;
 * - a transient error on the last permitted attempt is rethrown as-is;
 * - between attempts `backoff(attempt)` is awaited, where `attempt` is the
 *   1-based number of the attempt that just failed. The default waits
 *   `attempt * 25` ms (25ms, 50ms, …) to let the OS settle the WAL file.
 *
 * Kept as a standalone helper so the retry policy can be unit-tested without
 * provoking a native race. Used only by replace-mode-safe bulk-load, which is
 * idempotent (truncate-then-insert).
 *
 * @param fn - The operation to run; called once per attempt.
 * @param maxAttempts - Upper bound on invocations of `fn` (default 3).
 * @param backoff - Awaited after each failed, retryable attempt.
 * @returns Whatever the first successful `fn()` call resolves to.
 */
export async function retryTransientCheckpoint<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  backoff: (attempt: number) => Promise<void> = (attempt) => delay(attempt * 25),
): Promise<T> {
  let attempt = 1;
  for (;;) {
    try {
      return await fn();
    } catch (err) {
      // Give up when the budget is spent or the failure is not the known
      // transient one; either way the original error propagates untouched.
      const exhausted = attempt >= maxAttempts;
      if (exhausted || !isTransientCheckpointError(err)) {
        throw err;
      }
    }
    await backoff(attempt);
    attempt += 1;
  }
}

// ---------------------------------------------------------------------------
// COPY FROM (subquery) bulk insert
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -570,7 +619,25 @@ export class GraphDbStore implements IGraphStore {
// Bulk load
// --------------------------------------------------------------------------

/**
 * Public bulk-load entry point: runs the single-shot load through
 * `retryTransientCheckpoint`, so the transient lbug WAL→checkpoint IO race
 * gets a bounded number of retries.
 *
 * Why: under CPU/IO pressure the native binding's auto-checkpoint can fail
 * to rename `graph.lbug.wal` → `.wal.checkpoint` ("No such file or
 * directory") even though the data is already durably in the WAL. The write
 * otherwise succeeds (a reopen recovers the WAL), so the failure is a flaky
 * teardown artifact, not data loss — but unretried it bubbles to the CLI's
 * top-level catch and fails `analyze` with exit 1. Observed only on loaded
 * CI runners (varies by leg), never on an idle box.
 *
 * Safety: replace-mode bulkLoad is idempotent (truncate-then-insert fully
 * replaces prior contents), so running it again is harmless. Only the
 * transient checkpoint-rename class is retried; every other error rethrows
 * immediately.
 *
 * NOTE(review): the retry wraps the call for every `opts.mode`, not just
 * replace — confirm any other mode is equally safe to re-run.
 */
async bulkLoad(graph: KnowledgeGraph, opts: BulkLoadOptions = {}): Promise<BulkLoadStats> {
  const loadOnce = (): Promise<BulkLoadStats> => this.#bulkLoadOnce(graph, opts);
  return retryTransientCheckpoint(loadOnce);
}

async #bulkLoadOnce(graph: KnowledgeGraph, opts: BulkLoadOptions = {}): Promise<BulkLoadStats> {
const pool = this.requirePool();
const started = performance.now();
const mode = opts.mode ?? "replace";
Expand Down
141 changes: 141 additions & 0 deletions packages/storage/src/transient-checkpoint.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/**
 * Tests for `isTransientCheckpointError` — the matcher that gates the
 * bulk-load retry in {@link GraphDbStore.bulkLoad} — and for
 * `retryTransientCheckpoint`, the retry policy built on that matcher.
 *
 * The lbug native binding can fail the WAL→checkpoint rename under load with
 * an "Error renaming file <db>.wal to <db>.wal.checkpoint" IO exception even
 * though the write is durably in the WAL. That specific failure is safe to
 * retry (replace-mode bulkLoad is idempotent); everything else must rethrow.
 * These tests pin the matcher to the real lbug message, guard against it
 * widening to swallow unrelated failures, and pin the retry policy's attempt
 * count and rethrow behavior.
 */

import assert from "node:assert/strict";
import { test } from "node:test";
import { isTransientCheckpointError, retryTransientCheckpoint } from "./graphdb-adapter.js";

/** Build a fresh instance of the canonical transient WAL→checkpoint rename error. */
function checkpointErr(): Error {
  const message = [
    "IO exception: Error renaming file graph.lbug.wal to graph.lbug.wal.checkpoint. ",
    "ErrorMessage: No such file or directory",
  ].join("");
  return new Error(message);
}

/** Backoff stub that resolves immediately, so the retry tests never sleep. */
async function noBackoff(): Promise<void> {
  // Intentionally empty: resolving at once is the whole point.
}

test("matches the real lbug WAL→checkpoint rename failure", () => {
  // Full-path form of the message, as lbug reports it for an on-disk store.
  const walPath = "/tmp/x/.codehub/graph.lbug.wal";
  const observed = new Error(
    `IO exception: Error renaming file ${walPath} to ${walPath}.checkpoint. ErrorMessage: No such file or directory`,
  );
  assert.strictEqual(isTransientCheckpointError(observed), true);
});

test("matches regardless of the OS-specific errno suffix", () => {
  // Linux/macOS phrase the trailing errno differently; the matcher keys on
  // the stable token trio (renaming + .wal + checkpoint), not the suffix.
  const prefix = "IO exception: Error renaming file graph.lbug.wal to graph.lbug.wal.checkpoint. ";
  const otherErrno = new Error(`${prefix}ErrorMessage: Permission denied`);
  assert.strictEqual(isTransientCheckpointError(otherErrno), true);
});

test("accepts a non-Error thrown value (string)", () => {
  const thrownText = "Error renaming file a.wal to a.wal.checkpoint. boom";
  assert.strictEqual(isTransientCheckpointError(thrownText), true);
});

test("does NOT match an unrelated IO error", () => {
  const diskFull = new Error("IO exception: disk full while writing CodeNode");
  assert.strictEqual(isTransientCheckpointError(diskFull), false);
});

test("does NOT match a generic checkpoint mention without a WAL rename", () => {
  // A CHECKPOINT statement error that isn't the rename race must rethrow.
  const conflict = new Error("CHECKPOINT failed: transaction conflict");
  assert.strictEqual(isTransientCheckpointError(conflict), false);
});

test("does NOT match a query/constraint error", () => {
  const constraint = new Error("Runtime exception: primary key violation on CodeNode.id");
  assert.strictEqual(isTransientCheckpointError(constraint), false);
});

test("does NOT match undefined / null", () => {
  for (const nothing of [undefined, null]) {
    assert.strictEqual(isTransientCheckpointError(nothing), false);
  }
});

// ---------------------------------------------------------------------------
// retryTransientCheckpoint — the retry policy wrapped around bulkLoad
// ---------------------------------------------------------------------------

test("recovers when the transient error clears before maxAttempts", async () => {
  let attempts = 0;
  // Fails with the transient error on attempts 1 and 2, succeeds on the 3rd.
  const flaky = async (): Promise<string> => {
    attempts += 1;
    if (attempts >= 3) return "ok";
    throw checkpointErr();
  };

  const outcome = await retryTransientCheckpoint(flaky, 3, noBackoff);

  assert.strictEqual(outcome, "ok");
  assert.strictEqual(attempts, 3, "should have retried twice then succeeded on the 3rd attempt");
});

test("succeeds on the first attempt without retrying", async () => {
  let attempts = 0;
  const healthy = async (): Promise<number> => {
    attempts += 1;
    return 42;
  };

  const outcome = await retryTransientCheckpoint(healthy, 3, noBackoff);

  assert.strictEqual(outcome, 42);
  assert.strictEqual(attempts, 1, "no retry when the first attempt succeeds");
});

test("rethrows the transient error after exhausting maxAttempts", async () => {
  let attempts = 0;
  // Never recovers: every attempt hits the transient rename failure.
  const alwaysTransient = async (): Promise<never> => {
    attempts += 1;
    throw checkpointErr();
  };

  await assert.rejects(() => retryTransientCheckpoint(alwaysTransient, 3, noBackoff), /renaming/);

  assert.strictEqual(attempts, 3, "should attempt exactly maxAttempts times before giving up");
});

test("rethrows a non-transient error immediately without retrying", async () => {
  let attempts = 0;
  const hardFailure = async (): Promise<never> => {
    attempts += 1;
    throw new Error("primary key violation on CodeNode.id");
  };

  await assert.rejects(() => retryTransientCheckpoint(hardFailure, 3, noBackoff), /primary key/);

  assert.strictEqual(attempts, 1, "a non-transient error must NOT be retried");
});
14 changes: 12 additions & 2 deletions scripts/verify-global-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,21 @@ else
if [ ! -d "$FIXTURE_DIR" ]; then
fail "smoke: fixture directory '$FIXTURE_DIR' missing"
else
if codehub analyze "$FIXTURE_DIR" >/dev/null 2>&1; then
# Capture combined output to a temp log instead of `>/dev/null 2>&1` so a
# non-zero exit is DIAGNOSABLE. Swallowing the output is why analyze-smoke
# failures on some runners read as undebuggable "flakes": the actual
# stderr (the throw that set exit 1) never reached the CI log. On failure
# we echo the tail so the next run shows the real cause.
ANALYZE_LOG=$(mktemp -t verify-global-install-analyze.XXXXXX)
if codehub analyze "$FIXTURE_DIR" >"$ANALYZE_LOG" 2>&1; then
pass "smoke: codehub analyze $FIXTURE_DIR exits 0"
else
fail "smoke: codehub analyze $FIXTURE_DIR exited non-zero"
analyze_rc=$?
fail "smoke: codehub analyze $FIXTURE_DIR exited non-zero (rc=$analyze_rc)"
note "tail of analyze output:"
tail -40 "$ANALYZE_LOG" | sed 's/^/ /' >&2 || true
fi
rm -f "$ANALYZE_LOG"
fi

# ------------------------------------------------------------------ smoke: codehub query 'export default'
Expand Down
Loading