Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lib/sea/SeaNativeLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,47 @@ export interface SeaNativeExecuteOptions {
queryTags?: Record<string, string>;
}

/**
* Server-side execution status returned by
* `AsyncStatement.status()`. Mirrors the kernel `StatementStatus`
* enum collapsed to its variant name. `'Unknown'` is the
* forward-compat arm for kernel variants the binding doesn't
* recognise.
*
* The generated binding declaration types `status()` as a plain
* `Promise<string>`; this union is the narrowed view exposed through
* `SeaNativeAsyncStatement.status()`.
*/
export type SeaNativeStatementStatus =
| 'Pending'
| 'Running'
| 'Succeeded'
| 'Failed'
| 'Cancelled'
| 'Closed'
| 'Unknown';

/**
* Typed surface for the opaque napi `AsyncResultHandle`. Returned
* by `AsyncStatement.awaitResult()`; same fetch-side surface as
* `SeaNativeStatement` but without `cancel()` / `close()` (the
* parent `AsyncStatement` owns server-side lifecycle).
*/
export interface SeaNativeAsyncResultHandle {
/** Server-issued statement id; matches the parent `AsyncStatement`. */
readonly statementId: string;
/** Pulls the next result batch; resolves to `null` once the stream is exhausted. */
fetchNextBatch(): Promise<SeaArrowBatch | null>;
/** Result schema as a schema-only Arrow IPC payload. */
schema(): Promise<SeaArrowSchema>;
}

/**
* Typed surface for the opaque napi `AsyncStatement`. Returned by
* `Connection.submitStatement(...)`. JS drives polling via
* `status()` or blocks via `awaitResult()`.
*/
export interface SeaNativeAsyncStatement {
/** Server-issued statement id. */
readonly statementId: string;
/** One-shot status check; does not wait for the statement to finish. */
status(): Promise<SeaNativeStatementStatus>;
/**
* Blocks until the server reaches a terminal state, then resolves to a
* handle for fetching the result batches and schema.
*/
awaitResult(): Promise<SeaNativeAsyncResultHandle>;
/** Server-side cancel. */
cancel(): Promise<void>;
/** Explicit close of the statement. */
close(): Promise<void>;
}

/**
* Typed surface for the opaque napi `Connection` handle.
*/
Expand All @@ -113,6 +154,16 @@ export interface SeaNativeConnection {
* (per-statement Spark conf overlay) and `queryTags`.
*/
executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeStatement>;
/**
* Submit a SQL statement asynchronously and return an
* `AsyncStatement` handle. The kernel sends `wait_timeout=0s` so
* the server returns immediately with a `statement_id` while the
* query is still Pending/Running. Drive polling via the returned
* handle's `status()` / `awaitResult()` methods. Drop-cancel
* during `awaitResult()` is handled by the kernel's
* `AwaitResultCancelGuard`.
*/
submitStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeAsyncStatement>;
close(): Promise<void>;
}

Expand Down
57 changes: 56 additions & 1 deletion native/sea/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,58 @@ export interface ArrowBatch {
}
/**
* An Arrow IPC stream payload encoding just the result schema (no
* record-batch messages). Returned by `Statement.schema()` or
* `AsyncResultHandle.schema()`.
*/
export interface ArrowSchema {
ipcBytes: Buffer
}
/**
* Opaque async-statement handle returned by
* `Connection.submitStatement(...)`. The kernel sent
* `wait_timeout=0s`, so the server may still be Pending/Running when
* this handle is constructed; JS drives polling via `status()` or
* blocks via `awaitResult()`. Drop-cancel during `awaitResult()`
* is handled by the kernel's `AwaitResultCancelGuard`.
*/
export declare class AsyncStatement {
/** Server-issued statement id. Cached at construction. */
get statementId(): string
/**
* One-shot status check. Resolves to one of
* `'Pending' | 'Running' | 'Succeeded' | 'Failed' | 'Cancelled' |
* 'Closed' | 'Unknown'`. The declared type is plain `string`, so
* callers that want the union must narrow the value themselves.
* Returns `KernelError(InvalidStatementHandle)` if the statement has
* been explicitly `close()`d.
* NOTE(review): for a `Promise`-returning method this presumably
* surfaces as a rejection — confirm against the binding.
*/
status(): Promise<string>
/**
* Block until the server reaches a terminal state, then return an
* `AsyncResultHandle` that wraps the materialised result stream.
*/
awaitResult(): Promise<AsyncResultHandle>
/** Server-side cancel. */
cancel(): Promise<void>
/** Explicit close. Idempotent. */
close(): Promise<void>
}
/**
* Opaque result-fetch handle returned by
* `AsyncStatement.awaitResult()`. Wraps a kernel `ResultStream`.
* No `cancel()` / `close()` — the parent `AsyncStatement` owns
* server-side lifecycle, so cleanup goes through the parent's
* `close()` rather than through this handle.
*/
export declare class AsyncResultHandle {
/** Server-issued statement id. Matches the parent `AsyncStatement`. */
get statementId(): string
/**
* Pull the next batch of results. Resolves to `null` when the stream
* is exhausted, which is the signal to stop fetching. Byte-identical
* IPC payload to the sync `Statement.fetchNextBatch()` for the same
* query.
*/
fetchNextBatch(): Promise<ArrowBatch | null>
/** Result schema as a schema-only Arrow IPC payload. */
schema(): Promise<ArrowSchema>
}
/**
* Returns the native binding's crate version (`CARGO_PKG_VERSION`).
*
Expand Down Expand Up @@ -131,6 +178,14 @@ export declare class Connection {
* `serializeQueryTags` wire shape).
*/
executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<Statement>
/**
* Submit a SQL statement asynchronously and return an
* `AsyncStatement` handle. The kernel `Statement::submit()` sends
* `wait_timeout=0s`, so the server returns immediately with a
* statement_id while the query is still Pending/Running. Drive
* polling via the returned handle.
*/
submitStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<AsyncStatement>
/**
* Explicit close. Marks the connection wrapper as closed so
* subsequent calls on this `Connection` return `InvalidArg`, then
Expand Down
236 changes: 236 additions & 0 deletions tests/e2e/sea/async-execute-e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Copyright (c) 2026 Databricks, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* End-to-end tests for the SEA submit/await_result async-consumption
* path through the napi binding.
*
* Path under test:
* 1. `binding.openSession(...)` — kernel `Session::open()`
* 2. `connection.submitStatement(sql)` — kernel `Statement::submit()`
* (wait_timeout=0s, server returns Pending/Running with a
* statement_id)
* 3. `asyncStmt.status()` / `.awaitResult()` — kernel `status()` /
* `await_result()`
* 4. `asyncResult.fetchNextBatch()` — kernel `ResultStream::next_batch()`
*
* The kernel's `AwaitResultCancelGuard` covers drop-cancel safety;
* the cancel test exercises explicit cancel by calling
* `asyncStmt.cancel()` right after submit (without a concurrent
* `asyncStmt.awaitResult()`) and asserting that it returns promptly.
*
* Calls the napi binding directly (same pattern as
* `operation-lifecycle-e2e.test.ts`) — the higher-level
* `DBSQLOperation` async-mode integration (matching Thrift's
* `IDBSQLOperation` polling-mode surface) is a follow-on. This test
* proves the kernel → napi → JS shape works end-to-end.
*
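* Skipped when neither the `DATABRICKS_PECOTESTING_*` env vars nor
* their `E2E_*` fallbacks are set, or when the native binary has not
* been built (see the `before()` gate).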
*/

import { expect } from 'chai';
import { tableFromIPC } from 'apache-arrow';
import { existsSync, readdirSync } from 'fs';
import { resolve as resolvePath } from 'path';
import { createRequire } from 'module';

// Module-relative `require`, used by `loadBinding()` to load
// `native/sea/index.js` lazily instead of via a static import.
//
// `createRequire(import.meta.url)` so the require works under both
// CJS and the ESM-reparse path mocha 11+ may use
// (MODULE_TYPELESS_PACKAGE_JSON reparse-as-ESM).
// NOTE(review): `import.meta` is only defined in ES modules — confirm
// the CJS half of this claim holds under the project's transpile setup.
// eslint-disable-next-line @typescript-eslint/naming-convention
const requireFromHere = createRequire(import.meta.url);

/**
* Minimal structural view of the napi binding module: only the entry
* point this suite calls.
*/
interface NativeBinding {
/** Opens a session (kernel `Session::open()`) and resolves to its connection handle. */
openSession(opts: {
hostName: string;
httpPath: string;
token: string;
}): Promise<NativeConnection>;
}

/**
* Subset of the napi `Connection` surface used by this suite.
*/
interface NativeConnection {
/** Submits `sql` asynchronously (kernel `Statement::submit()`) and resolves to the statement handle. */
submitStatement(sql: string): Promise<NativeAsyncStatement>;
/** Closes the connection; each test calls this in its `finally` block. */
close(): Promise<void>;
}

/**
* Test-local view of the napi `AsyncStatement` handle returned by
* `submitStatement`.
*/
interface NativeAsyncStatement {
/** Server-issued statement id; the tests assert it is a non-empty string. */
readonly statementId: string;
/** One-shot status check, typed as a raw string here. */
status(): Promise<string>;
/** Resolves to the result-fetch handle once the statement has finished. */
awaitResult(): Promise<NativeAsyncResultHandle>;
/** Server-side cancel. */
cancel(): Promise<void>;
/** Explicit close; called best-effort during test cleanup. */
close(): Promise<void>;
}

/**
* Test-local view of the napi `AsyncResultHandle` returned by
* `awaitResult()`.
*/
interface NativeAsyncResultHandle {
/** Statement id; asserted to equal the parent statement's id. */
readonly statementId: string;
/** Next batch as Arrow IPC bytes, or `null` when there are no more batches. */
fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>;
/** Result schema as Arrow IPC bytes. */
schema(): Promise<{ ipcBytes: Buffer }>;
}

/**
 * Live-warehouse suite for the napi async-execute surface:
 * `submitStatement` → `status()` / `awaitResult()` → `fetchNextBatch()`,
 * plus explicit `cancel()`.
 *
 * The whole suite is skipped (not failed) when connection credentials
 * are missing or when no native binary for the current platform has
 * been built.
 */
describe('SEA async execute — submit / status / awaitResult / cancel', function suite() {
  this.timeout(180_000);

  // `||` (not `??`) on purpose: an empty-string env var should also fall
  // through to the `E2E_*` fallback.
  const hostName =
    process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME || process.env.E2E_HOST;
  const httpPath =
    process.env.DATABRICKS_PECOTESTING_HTTP_PATH || process.env.E2E_PATH;
  const token =
    process.env.DATABRICKS_PECOTESTING_TOKEN || process.env.E2E_ACCESS_TOKEN;

  /**
   * Reports whether `native/sea` contains a `.node` artifact for the
   * platform/arch this process runs on.
   *
   * Artifacts follow the `index.<platform>-<arch>[-<abi>].node` pattern
   * (e.g. `index.linux-x64-gnu.node`), so matching on the
   * `index.<platform>-<arch>` prefix accepts any ABI suffix instead of
   * hard-coding the linux-x64-gnu file name.
   */
  function hasNativeArtifact(): boolean {
    const nativeDir = resolvePath(process.cwd(), 'native/sea');
    if (!existsSync(nativeDir)) {
      return false;
    }
    const artifactPrefix = `index.${process.platform}-${process.arch}`;
    return readdirSync(nativeDir).some(
      (fileName) => fileName.startsWith(artifactPrefix) && fileName.endsWith('.node'),
    );
  }

  before(function gate() {
    if (!hostName || !httpPath || !token) {
      // eslint-disable-next-line no-invalid-this
      this.skip();
      return;
    }
    // Verify the native artifact exists before any test calls
    // loadBinding(), so a missing build skips with a message instead of
    // crashing with MODULE_NOT_FOUND.
    if (!hasNativeArtifact()) {
      // eslint-disable-next-line no-console
      console.warn(
        `[sea async-execute e2e] skipping: no native binary for ` +
          `${process.platform}-${process.arch} under native/sea. ` +
          `Run \`yarn build:native\` first.`,
      );
      // eslint-disable-next-line no-invalid-this
      this.skip();
    }
  });

  /**
   * Lazy-load the native binding so the test file is requirable in
   * environments where the `.node` artifact isn't built yet — the
   * `before()` gate skips the suite before we touch the binding.
   */
  function loadBinding(): NativeBinding {
    return requireFromHere('../../../native/sea/index.js') as NativeBinding;
  }

  /**
   * Opens a session, submits `sql`, runs `body` against the resulting
   * statement handle, and always cleans up afterwards.
   *
   * Cleanup order mirrors what every test needs: best-effort
   * `asyncStmt.close()` (errors ignored, since a cancelled or failed
   * statement may reject on close), then `connection.close()`.
   *
   * @param sql - Statement text passed to `submitStatement`.
   * @param body - Assertions to run while the statement is open.
   */
  async function withSubmittedStatement(
    sql: string,
    body: (asyncStmt: NativeAsyncStatement) => Promise<void>,
  ): Promise<void> {
    const binding = loadBinding();
    // The `before()` gate guarantees all three values are set.
    const connection = await binding.openSession({
      hostName: hostName as string,
      httpPath: httpPath as string,
      token: token as string,
    });

    let asyncStmt: NativeAsyncStatement | null = null;
    try {
      asyncStmt = await connection.submitStatement(sql);
      await body(asyncStmt);
    } finally {
      if (asyncStmt !== null) {
        try {
          await asyncStmt.close();
        } catch (_) {
          // best-effort cleanup; cancelled statements may surface a close error
        }
      }
      await connection.close();
    }
  }

  /**
   * Pulls batches until `fetchNextBatch()` resolves to `null` and returns
   * the total number of rows decoded from the Arrow IPC payloads.
   */
  async function countRows(result: NativeAsyncResultHandle): Promise<number> {
    let totalRows = 0;
    // Batches must be pulled one at a time, so awaiting inside the loop
    // is intentional.
    // eslint-disable-next-line no-constant-condition
    while (true) {
      // eslint-disable-next-line no-await-in-loop
      const envelope = await result.fetchNextBatch();
      if (envelope === null) {
        return totalRows;
      }
      totalRows += tableFromIPC(envelope.ipcBytes).numRows;
    }
  }

  it('submit returns immediately with a statement_id; awaitResult drains', async () => {
    await withSubmittedStatement('SELECT * FROM range(0, 100)', async (asyncStmt) => {
      expect(asyncStmt).to.be.an('object');
      expect(asyncStmt.statementId).to.be.a('string').and.to.have.length.greaterThan(0);

      // Block on the server-side terminal state. The kernel's
      // internal polling handles backoff and the drop-cancel guard.
      const result = await asyncStmt.awaitResult();
      expect(result.statementId).to.equal(asyncStmt.statementId);

      // Drain the full result and assert row count.
      const totalRows = await countRows(result);
      expect(totalRows).to.equal(100);
    });
  });

  it('status() returns a string variant from the kernel StatementStatus enum', async () => {
    await withSubmittedStatement('SELECT * FROM range(0, 100)', async (asyncStmt) => {
      const status = await asyncStmt.status();
      expect(status).to.be.a('string');
      expect(['Pending', 'Running', 'Succeeded', 'Closed']).to.include(
        status,
        `unexpected status: ${status}`,
      );

      // Wait for the statement to reach a terminal state before cleanup
      // closes it; the result handle itself is not consumed here.
      await asyncStmt.awaitResult();
    });
  });

  it('cancel() against a still-pending async statement completes quickly', async () => {
    // Large enough query that the server will not have finished by
    // the time we issue cancel. `range(0, 100_000_000)` was used in
    // the existing sync cancel test for the same reason.
    await withSubmittedStatement('SELECT * FROM range(0, 100000000)', async (asyncStmt) => {
      expect(asyncStmt.statementId).to.have.length.greaterThan(0);

      const t0 = Date.now();
      await asyncStmt.cancel();
      const elapsed = Date.now() - t0;
      // cancel should not block on completion of the underlying query;
      // it just sends a CancelStatement and returns. Allow a generous
      // budget for wire latency.
      expect(elapsed).to.be.lessThan(2000, `cancel latency ${elapsed}ms`);
    });
  });
});
Loading