diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts
index c1f25f4f..edc9cbcc 100644
--- a/lib/sea/SeaNativeLoader.ts
+++ b/lib/sea/SeaNativeLoader.ts
@@ -100,6 +100,47 @@ export interface SeaNativeExecuteOptions {
   queryTags?: Record<string, string>;
 }
 
+/**
+ * Server-side execution status returned by
+ * `AsyncStatement.status()`. Mirrors the kernel `StatementStatus`
+ * enum collapsed to its variant name. `'Unknown'` is the
+ * forward-compat arm for kernel variants the binding doesn't
+ * recognise.
+ */
+export type SeaNativeStatementStatus =
+  | 'Pending'
+  | 'Running'
+  | 'Succeeded'
+  | 'Failed'
+  | 'Cancelled'
+  | 'Closed'
+  | 'Unknown';
+
+/**
+ * Typed surface for the opaque napi `AsyncResultHandle`. Returned
+ * by `AsyncStatement.awaitResult()`; same fetch-side surface as
+ * `SeaNativeStatement` but without `cancel()` / `close()` (the
+ * parent `AsyncStatement` owns server-side lifecycle).
+ */
+export interface SeaNativeAsyncResultHandle {
+  readonly statementId: string;
+  fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>;
+  schema(): Promise<{ ipcBytes: Buffer }>;
+}
+
+/**
+ * Typed surface for the opaque napi `AsyncStatement`. Returned by
+ * `Connection.submitStatement(...)`. JS drives polling via
+ * `status()` or blocks via `awaitResult()`.
+ */
+export interface SeaNativeAsyncStatement {
+  readonly statementId: string;
+  status(): Promise<SeaNativeStatementStatus>;
+  awaitResult(): Promise<SeaNativeAsyncResultHandle>;
+  cancel(): Promise<void>;
+  close(): Promise<void>;
+}
+
 /**
  * Typed surface for the opaque napi `Connection` handle.
  */
@@ -113,6 +154,16 @@ export interface SeaNativeConnection {
    * (per-statement Spark conf overlay) and `queryTags`.
    */
   executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeStatement>;
+  /**
+   * Submit a SQL statement asynchronously and return an
+   * `AsyncStatement` handle. The kernel sends `wait_timeout=0s` so
+   * the server returns immediately with a `statement_id` while the
+   * query is still Pending/Running. Drive polling via the returned
+   * handle's `status()` / `awaitResult()` methods. Drop-cancel
+   * during `awaitResult()` is handled by the kernel's
+   * `AwaitResultCancelGuard`.
+   */
+  submitStatement(sql: string, options?: SeaNativeExecuteOptions): Promise<SeaNativeAsyncStatement>;
   close(): Promise<void>;
 }
 
diff --git a/native/sea/index.d.ts b/native/sea/index.d.ts
index 2ce8fa34..8d03d658 100644
--- a/native/sea/index.d.ts
+++ b/native/sea/index.d.ts
@@ -96,11 +96,58 @@ export interface ArrowBatch {
 }
 /**
  * An Arrow IPC stream payload encoding just the result schema (no
- * record-batch messages). Returned by `Statement.schema()`.
+ * record-batch messages). Returned by `Statement.schema()` or
+ * `AsyncResultHandle.schema()`.
  */
 export interface ArrowSchema {
   ipcBytes: Buffer
 }
+/**
+ * Opaque async-statement handle returned by
+ * `Connection.submitStatement(...)`. The kernel sent
+ * `wait_timeout=0s`, so the server is still Pending/Running when
+ * this handle is constructed; JS drives polling via `status()` or
+ * blocks via `awaitResult()`. Drop-cancel during `awaitResult()`
+ * is handled by the kernel's `AwaitResultCancelGuard`.
+ */
+export declare class AsyncStatement {
+  /** Server-issued statement id. Cached at construction. */
+  get statementId(): string
+  /**
+   * One-shot status check. Returns
+   * `'Pending' | 'Running' | 'Succeeded' | 'Failed' | 'Cancelled' |
+   * 'Closed' | 'Unknown'`. Returns `KernelError(InvalidStatementHandle)`
+   * if the statement has been explicitly `close()`d.
+   */
+  status(): Promise<string>
+  /**
+   * Block until the server reaches a terminal state, then return an
+   * `AsyncResultHandle` that wraps the materialised result stream.
+   */
+  awaitResult(): Promise<AsyncResultHandle>
+  /** Server-side cancel. */
+  cancel(): Promise<void>
+  /** Explicit close. Idempotent. */
+  close(): Promise<void>
+}
+/**
+ * Opaque result-fetch handle returned by
+ * `AsyncStatement.awaitResult()`. Wraps a kernel `ResultStream`.
+ * No `cancel()` / `close()` — the parent `AsyncStatement` owns
+ * server-side lifecycle.
+ */
+export declare class AsyncResultHandle {
+  /** Server-issued statement id. Matches the parent `AsyncStatement`. */
+  get statementId(): string
+  /**
+   * Pull the next batch of results. Returns `null` when the stream
+   * is exhausted. Byte-identical IPC payload to the sync
+   * `Statement.fetchNextBatch()` for the same query.
+   */
+  fetchNextBatch(): Promise<ArrowBatch | null>
+  /** Result schema as a schema-only Arrow IPC payload. */
+  schema(): Promise<ArrowSchema>
+}
 /**
  * Returns the native binding's crate version (`CARGO_PKG_VERSION`).
  *
@@ -131,6 +178,14 @@ export declare class Connection {
    * `serializeQueryTags` wire shape).
    */
   executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<Statement>
+  /**
+   * Submit a SQL statement asynchronously and return an
+   * `AsyncStatement` handle. The kernel `Statement::submit()` sends
+   * `wait_timeout=0s`, so the server returns immediately with a
+   * statement_id while the query is still Pending/Running. Drive
+   * polling via the returned handle.
+   */
+  submitStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise<AsyncStatement>
   /**
    * Explicit close. Marks the connection wrapper as closed so
    * subsequent calls on this `Connection` return `InvalidArg`, then
diff --git a/tests/e2e/sea/async-execute-e2e.test.ts b/tests/e2e/sea/async-execute-e2e.test.ts
new file mode 100644
index 00000000..d662477c
--- /dev/null
+++ b/tests/e2e/sea/async-execute-e2e.test.ts
@@ -0,0 +1,238 @@
+// Copyright (c) 2026 Databricks, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * End-to-end tests for the SEA submit/await_result async-consumption
+ * path through the napi binding.
+ *
+ * Path under test:
+ *   1. `binding.openSession(...)` — kernel `Session::open()`
+ *   2. `connection.submitStatement(sql)` — kernel `Statement::submit()`
+ *      (wait_timeout=0s, server returns Pending/Running with a
+ *      statement_id)
+ *   3. `asyncStmt.status()` / `.awaitResult()` — kernel `status()` /
+ *      `await_result()`
+ *   4. `asyncResult.fetchNextBatch()` — kernel `ResultStream::next_batch()`
+ *
+ * The kernel's `AwaitResultCancelGuard` covers drop-cancel safety;
+ * the `cancel()` test exercises explicit cancel against a statement
+ * that is still pending and asserts the call returns promptly.
+ *
+ * Calls the napi binding directly (same pattern as
+ * `operation-lifecycle-e2e.test.ts`) — the higher-level
+ * `DBSQLOperation` async-mode integration (matching Thrift's
+ * `IDBSQLOperation` polling-mode surface) is a follow-on. This test
+ * proves the kernel → napi → JS shape works end-to-end.
+ *
+ * Skipped when `DATABRICKS_PECOTESTING_*` env vars are absent.
+ */
+
+import { expect } from 'chai';
+import { tableFromIPC } from 'apache-arrow';
+import { existsSync, readdirSync } from 'fs';
+import { resolve as resolvePath } from 'path';
+import { createRequire } from 'module';
+
+// `createRequire(import.meta.url)` so the require works under both
+// CJS and the ESM-reparse path mocha 11+ may use
+// (MODULE_TYPELESS_PACKAGE_JSON reparse-as-ESM).
+// eslint-disable-next-line @typescript-eslint/naming-convention
+const requireFromHere = createRequire(import.meta.url);
+
+interface NativeBinding {
+  openSession(opts: {
+    hostName: string;
+    httpPath: string;
+    token: string;
+  }): Promise<NativeConnection>;
+}
+
+interface NativeConnection {
+  submitStatement(sql: string): Promise<NativeAsyncStatement>;
+  close(): Promise<void>;
+}
+
+interface NativeAsyncStatement {
+  readonly statementId: string;
+  status(): Promise<string>;
+  awaitResult(): Promise<NativeAsyncResultHandle>;
+  cancel(): Promise<void>;
+  close(): Promise<void>;
+}
+
+interface NativeAsyncResultHandle {
+  readonly statementId: string;
+  fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>;
+  schema(): Promise<{ ipcBytes: Buffer }>;
+}
+
+describe('SEA async execute — submit / status / awaitResult / cancel', function suite() {
+  this.timeout(180_000);
+
+  const hostName =
+    process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME || process.env.E2E_HOST;
+  const httpPath =
+    process.env.DATABRICKS_PECOTESTING_HTTP_PATH || process.env.E2E_PATH;
+  const token =
+    process.env.DATABRICKS_PECOTESTING_TOKEN || process.env.E2E_ACCESS_TOKEN;
+
+  before(function gate() {
+    if (!hostName || !httpPath || !token) {
+      // eslint-disable-next-line no-invalid-this
+      this.skip();
+      return;
+    }
+    // Verify a native artifact exists before any test calls
+    // loadBinding(). Skip-with-message if absent. DA round-1 H1 fixup
+    // (skip-gate must not crash MODULE_NOT_FOUND when build:native not
+    // run). napi-rs names the artifact `index.<platform-triple>.node`,
+    // so match any triple rather than hard-coding linux-x64-gnu —
+    // otherwise the suite silently skips on every other platform.
+    const nativeDir = resolvePath(process.cwd(), 'native/sea');
+    const hasNativeArtifact =
+      existsSync(nativeDir) &&
+      readdirSync(nativeDir).some((entry) => /^index\..+\.node$/.test(entry));
+    if (!hasNativeArtifact) {
+      // eslint-disable-next-line no-console
+      console.warn(
+        `[sea async-execute e2e] skipping: native binary not built. ` +
+          `Run \`yarn build:native\` first.`,
+      );
+      // eslint-disable-next-line no-invalid-this
+      this.skip();
+    }
+  });
+
+  /**
+   * Lazy-load the native binding so the test file is requirable in
+   * environments where the `.node` artifact isn't built yet — the
+   * `before()` gate skips the suite before we touch the binding.
+   */
+  function loadBinding(): NativeBinding {
+    return requireFromHere('../../../native/sea/index.js') as NativeBinding;
+  }
+
+  it('submit returns immediately with a statement_id; awaitResult drains', async () => {
+    const binding = loadBinding();
+    const connection = await binding.openSession({
+      hostName: hostName as string,
+      httpPath: httpPath as string,
+      token: token as string,
+    });
+
+    let asyncStmt: NativeAsyncStatement | null = null;
+    try {
+      asyncStmt = await connection.submitStatement('SELECT * FROM range(0, 100)');
+      expect(asyncStmt).to.be.an('object');
+      expect(asyncStmt.statementId).to.be.a('string').and.to.have.length.greaterThan(0);
+
+      // Block on the server-side terminal state. The kernel's
+      // internal polling handles backoff and the drop-cancel guard.
+      const result = await asyncStmt.awaitResult();
+      expect(result.statementId).to.equal(asyncStmt.statementId);
+
+      // Drain the full result and assert row count.
+      let totalRows = 0;
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
+        // eslint-disable-next-line no-await-in-loop
+        const envelope = await result.fetchNextBatch();
+        if (envelope === null) {
+          break;
+        }
+        const table = tableFromIPC(envelope.ipcBytes);
+        totalRows += table.numRows;
+      }
+      expect(totalRows).to.equal(100);
+    } finally {
+      if (asyncStmt !== null) {
+        try {
+          await asyncStmt.close();
+        } catch (_) {
+          // best-effort cleanup
+        }
+      }
+      await connection.close();
+    }
+  });
+
+  it('status() returns a string variant from the kernel StatementStatus enum', async () => {
+    const binding = loadBinding();
+    const connection = await binding.openSession({
+      hostName: hostName as string,
+      httpPath: httpPath as string,
+      token: token as string,
+    });
+
+    let asyncStmt: NativeAsyncStatement | null = null;
+    try {
+      asyncStmt = await connection.submitStatement('SELECT * FROM range(0, 100)');
+      const status = await asyncStmt.status();
+      expect(status).to.be.a('string');
+      expect(['Pending', 'Running', 'Succeeded', 'Closed']).to.include(
+        status,
+        `unexpected status: ${status}`,
+      );
+
+      // Drain via awaitResult to release server-side resources.
+      await asyncStmt.awaitResult();
+    } finally {
+      if (asyncStmt !== null) {
+        try {
+          await asyncStmt.close();
+        } catch (_) {
+          // best-effort cleanup
+        }
+      }
+      await connection.close();
+    }
+  });
+
+  it('cancel() against a still-pending async statement completes quickly', async () => {
+    const binding = loadBinding();
+    const connection = await binding.openSession({
+      hostName: hostName as string,
+      httpPath: httpPath as string,
+      token: token as string,
+    });
+
+    let asyncStmt: NativeAsyncStatement | null = null;
+    try {
+      // Large enough query that the server will not have finished by
+      // the time we issue cancel. `range(0, 100_000_000)` was used in
+      // the existing sync cancel test for the same reason.
+      asyncStmt = await connection.submitStatement(
+        'SELECT * FROM range(0, 100000000)',
+      );
+      expect(asyncStmt.statementId).to.have.length.greaterThan(0);
+
+      const t0 = Date.now();
+      await asyncStmt.cancel();
+      const elapsed = Date.now() - t0;
+      // cancel should not block on completion of the underlying query;
+      // it just sends a CancelStatement and returns. Allow a generous
+      // budget for wire latency.
+      expect(elapsed).to.be.lessThan(2000, `cancel latency ${elapsed}ms`);
+    } finally {
+      if (asyncStmt !== null) {
+        try {
+          await asyncStmt.close();
+        } catch (_) {
+          // best-effort cleanup; cancelled statements may surface a close error
+        }
+      }
+      await connection.close();
+    }
+  });
+});