diff --git a/lib/DBSQLClient.ts b/lib/DBSQLClient.ts index 38d55a54..7c6430bc 100644 --- a/lib/DBSQLClient.ts +++ b/lib/DBSQLClient.ts @@ -1,9 +1,7 @@ import thrift from 'thrift'; -import Int64 from 'node-int64'; import { EventEmitter } from 'events'; import TCLIService from '../thrift/TCLIService'; -import { TProtocolVersion } from '../thrift/TCLIService_types'; import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient'; import IDriver from './contracts/IDriver'; import IClientContext, { ClientConfig } from './contracts/IClientContext'; @@ -14,9 +12,12 @@ import IDBSQLSession from './contracts/IDBSQLSession'; import IAuthentication from './connection/contracts/IAuthentication'; import HttpConnection from './connection/connections/HttpConnection'; import IConnectionOptions from './connection/contracts/IConnectionOptions'; -import Status from './dto/Status'; import HiveDriverError from './errors/HiveDriverError'; -import { buildUserAgentString, definedOrError, serializeQueryTags } from './utils'; +import { buildUserAgentString } from './utils'; +import IBackend from './contracts/IBackend'; +import { InternalConnectionOptions } from './contracts/InternalConnectionOptions'; +import ThriftBackend from './thrift-backend/ThriftBackend'; +import SeaBackend from './sea/SeaBackend'; import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication'; import DatabricksOAuth, { OAuthFlow } from './connection/auth/DatabricksOAuth'; import { @@ -39,19 +40,6 @@ function prependSlash(str: string): string { return str; } -function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) { - if (!catalogName && !schemaName) { - return {}; - } - - return { - initialNamespace: { - catalogName, - schemaName, - }, - }; -} - export type ThriftLibrary = Pick; export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext { @@ -75,6 +63,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I private readonly sessions = new CloseableCollection(); + private backend?: IBackend; + private static getDefaultLogger(): IDBSQLLogger { if (!this.defaultLogger) { this.defaultLogger = new DBSQLLogger(); @@ -248,38 +238,53 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I this.connectionProvider = this.createConnectionProvider(options); - const thriftConnection = await this.connectionProvider.getThriftConnection(); - - thriftConnection.on('error', (error: Error) => { - // Error.stack already contains error type and message, so log stack if available, - // otherwise fall back to just error type + message - this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`); - try { - this.emit('error', error); - } catch (e) { - // EventEmitter will throw unhandled error when emitting 'error' event. - // Since we already logged it few lines above, just suppress this behaviour - } - }); - - thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => { - this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`); - this.emit('reconnecting', params); - }); - - thriftConnection.on('close', () => { - this.logger.log(LogLevel.debug, 'Closing connection.'); - this.emit('close'); - }); + // M0: `useSEA` is consumed via a non-exported internal-options cast so it + // doesn't ship in the public `.d.ts`. Mirrors Python's `kwargs.get("use_sea")` + // pattern (see databricks-sql-python/src/databricks/sql/session.py). + const internalOptions = options as ConnectionOptions & InternalConnectionOptions; + this.backend = internalOptions.useSEA + ? new SeaBackend() + : new ThriftBackend({ + context: this, + onConnectionEvent: (event, payload) => this.forwardConnectionEvent(event, payload), + }); - thriftConnection.on('timeout', () => { - this.logger.log(LogLevel.debug, 'Connection timed out.'); - this.emit('timeout'); - }); + await this.backend.connect(options); return this; } + private forwardConnectionEvent(event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown): void { + switch (event) { + case 'error': { + const error = payload as Error; + this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`); + try { + this.emit('error', error); + } catch (e) { + // EventEmitter throws when 'error' has no listeners; we've already logged it. + } + return; + } + case 'reconnecting': + this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(payload)}`); + this.emit('reconnecting', payload); + return; + case 'close': + this.logger.log(LogLevel.debug, 'Closing connection.'); + this.emit('close'); + return; + case 'timeout': + this.logger.log(LogLevel.debug, 'Connection timed out.'); + this.emit('timeout'); + // Explicit return mirrors the other cases and protects against + // fall-through if a new event is added below. + // eslint-disable-next-line no-useless-return + return; + // no default + } + } + /** * Starts new session * @public @@ -290,44 +295,20 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I * const session = await client.openSession(); */ public async openSession(request: OpenSessionRequest = {}): Promise { - // Prepare session configuration - const configuration = request.configuration ? { ...request.configuration } : {}; - - // Add metric view metadata config if enabled - if (this.config.enableMetricViewMetadata) { - configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true'; - } - - // Serialize queryTags dict and set in configuration; takes precedence over configuration.QUERY_TAGS - if (request.queryTags !== undefined) { - const serialized = serializeQueryTags(request.queryTags); - if (serialized) { - configuration.QUERY_TAGS = serialized; - } else { - delete configuration.QUERY_TAGS; - } + if (!this.backend) { + throw new HiveDriverError('DBSQLClient: not connected'); } - - const response = await this.driver.openSession({ - client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8), - ...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema), - configuration, - canUseMultipleCatalogs: true, - }); - - Status.assert(response.status); - const session = new DBSQLSession({ - handle: definedOrError(response.sessionHandle), - context: this, - serverProtocolVersion: response.serverProtocolVersion, - }); + const sessionBackend = await this.backend.openSession(request); + const session = new DBSQLSession({ backend: sessionBackend, context: this }); this.sessions.add(session); return session; } public async close(): Promise { await this.sessions.closeAll(); + await this.backend?.close(); + this.backend = undefined; this.client = undefined; this.connectionProvider = undefined; this.authProvider = undefined; diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index fe22995d..21b8f0fd 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -1,4 +1,3 @@ -import { stringify, NIL } from 'uuid'; import { Readable } from 'node:stream'; import IOperation, { FetchOptions, @@ -12,91 +11,45 @@ import IOperation, { } from './contracts/IOperation'; import { TGetOperationStatusResp, - TOperationHandle, - TTableSchema, - TSparkDirectResults, TGetResultSetMetadataResp, - TSparkRowSetType, - TCloseOperationResp, - TOperationState, + TTableSchema, } from '../thrift/TCLIService_types'; import Status from './dto/Status'; import { LogLevel } from './contracts/IDBSQLLogger'; import OperationStateError, { OperationStateErrorCode } from './errors/OperationStateError'; -import IResultsProvider from './result/IResultsProvider'; -import RowSetProvider from './result/RowSetProvider'; -import JsonResultHandler from './result/JsonResultHandler'; -import ArrowResultHandler from './result/ArrowResultHandler'; -import CloudFetchResultHandler from './result/CloudFetchResultHandler'; -import ArrowResultConverter from './result/ArrowResultConverter'; -import ResultSlicer from './result/ResultSlicer'; -import { definedOrError } from './utils'; import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator'; -import HiveDriverError from './errors/HiveDriverError'; import IClientContext from './contracts/IClientContext'; +import IOperationBackend from './contracts/IOperationBackend'; +import { ResultMetadata } from './contracts/ResultMetadata'; +import ThriftOperationBackend from './thrift-backend/ThriftOperationBackend'; +import { synthesizeThriftStatus, synthesizeThriftResultSetMetadata } from './utils/thriftWireSynthesis'; interface DBSQLOperationConstructorOptions { - handle: TOperationHandle; - directResults?: TSparkDirectResults; + backend: IOperationBackend; context: IClientContext; } -async function delay(ms?: number): Promise { - return new Promise((resolve) => { - setTimeout(() => { - resolve(); - }, ms); - }); -} - export default class DBSQLOperation implements IOperation { private readonly context: IClientContext; - private readonly operationHandle: TOperationHandle; + private readonly backend: IOperationBackend; public onClose?: () => void; - private readonly _data: RowSetProvider; - - private readonly closeOperation?: TCloseOperationResp; - private closed: boolean = false; private cancelled: boolean = false; - private metadata?: TGetResultSetMetadataResp; - - private metadataPromise?: Promise; - - private state: TOperationState = TOperationState.INITIALIZED_STATE; - - // Once operation is finished or fails - cache status response, because subsequent calls - // to `getOperationStatus()` may fail with irrelevant errors, e.g. HTTP 404 - private operationStatus?: TGetOperationStatusResp; - - private resultHandler?: ResultSlicer; - - constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) { - this.operationHandle = handle; - this.context = context; - - const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation); - - if (directResults?.operationStatus) { - this.processOperationStatusResponse(directResults.operationStatus); - } - - this.metadata = directResults?.resultSetMetadata; - this._data = new RowSetProvider( - this.context, - this.operationHandle, - [directResults?.resultSet], - useOnlyPrefetchedResults, - ); - this.closeOperation = directResults?.closeOperation; + constructor(options: DBSQLOperationConstructorOptions) { + this.context = options.context; + this.backend = options.backend; this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`); } + public get id() { + return this.backend.id; + } + public iterateChunks(options?: IteratorOptions): IOperationChunksIterator { return new OperationChunksIterator(this, options); } @@ -122,11 +75,6 @@ export default class DBSQLOperation implements IOperation { return Readable.from(iterable, options?.streamOptions); } - public get id() { - const operationId = this.operationHandle?.operationId?.guid; - return operationId ? stringify(operationId) : NIL; - } - /** * Fetches all data * @public @@ -141,8 +89,6 @@ export default class DBSQLOperation implements IOperation { const fetchChunkOptions = { ...options, - // Tell slicer to return raw chunks. We're going to process all of them anyway, - // so no need to additionally buffer and slice chunks returned by server disableBuffering: true, }; @@ -168,70 +114,44 @@ export default class DBSQLOperation implements IOperation { public async fetchChunk(options?: FetchOptions): Promise> { await this.failIfClosed(); - if (!this.operationHandle.hasResultSet) { + if (!this.backend.hasResultSet) { return []; } - await this.waitUntilReady(options); - - const resultHandler = await this.getResultHandler(); + await this.waitUntilReadyThroughBackend(options); await this.failIfClosed(); - // All the library code is Promise-based, however, since Promises are microtasks, - // enqueueing a lot of promises may block macrotasks execution for a while. - // Usually, there are no much microtasks scheduled, however, when fetching query - // results (especially CloudFetch ones) it's quite easy to block event loop for - // long enough to break a lot of things. For example, with CloudFetch, after first - // set of files are downloaded and being processed immediately one by one, event - // loop easily gets blocked for enough time to break connection pool. `http.Agent` - // stops receiving socket events, and marks all sockets invalid on the next attempt - // to use them. See these similar issues that helped to debug this particular case - - // https://github.com/nodejs/node/issues/47130 and https://github.com/node-fetch/node-fetch/issues/1735 - // This simple fix allows to clean up a microtasks queue and allow Node to process - // macrotasks as well, allowing the normal operation of other code. Also, this - // fix is added to `fetchChunk` method because, unlike other methods, `fetchChunk` is - // a potential source of issues described above - await new Promise((resolve) => { - setTimeout(resolve, 0); - }); - const defaultMaxRows = this.context.getConfig().fetchChunkDefaultMaxRows; - - const result = resultHandler.fetchNext({ - limit: options?.maxRows ?? defaultMaxRows, - disableBuffering: options?.disableBuffering, - }); + const limit = options?.maxRows ?? defaultMaxRows; + const result = await this.backend.fetchChunk({ limit, disableBuffering: options?.disableBuffering }); await this.failIfClosed(); - this.context - .getLogger() - .log( - LogLevel.debug, - `Fetched chunk of size: ${options?.maxRows ?? defaultMaxRows} from operation with id: ${this.id}`, - ); + this.context.getLogger().log(LogLevel.debug, `Fetched chunk of size: ${limit} from operation with id: ${this.id}`); return result; } /** - * Requests operation status + * Requests operation status. Returns the Thrift wire response for + * back-compat with existing user code. On the Thrift backend the response + * is returned verbatim; on any other backend (e.g. SEA) the response is + * synthesized from the neutral {@link IOperationBackend.status} result, + * with Thrift-only fields (`taskStatus`, `numModifiedRows`, etc.) left + * undefined. + * * @param progress * @throws {StatusError} */ public async status(progress: boolean = false): Promise { await this.failIfClosed(); this.context.getLogger().log(LogLevel.debug, `Fetching status for operation with id: ${this.id}`); - - if (this.operationStatus) { - return this.operationStatus; + if (this.backend instanceof ThriftOperationBackend) { + // Zero-loss path: the Thrift backend has the wire response on hand. + return this.backend.thriftStatusResponse(progress); } - - const driver = await this.context.getDriver(); - const response = await driver.getOperationStatus({ - operationHandle: this.operationHandle, - getProgressUpdate: progress, - }); - - return this.processOperationStatusResponse(response); + // Non-Thrift backend: synthesize the Thrift-shaped response from the + // neutral OperationStatus DTO. + const status = await this.backend.status(progress); + return synthesizeThriftStatus(status); } /** @@ -242,18 +162,8 @@ export default class DBSQLOperation implements IOperation { if (this.closed || this.cancelled) { return Status.success(); } - - this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.id}`); - - const driver = await this.context.getDriver(); - const response = await driver.cancelOperation({ - operationHandle: this.operationHandle, - }); - Status.assert(response.status); + const result = await this.backend.cancel(); this.cancelled = true; - const result = new Status(response.status); - - // Cancelled operation becomes unusable, similarly to being closed this.onClose?.(); return result; } @@ -266,63 +176,66 @@ export default class DBSQLOperation implements IOperation { if (this.closed || this.cancelled) { return Status.success(); } - - this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.id}`); - - const driver = await this.context.getDriver(); - const response = - this.closeOperation ?? - (await driver.closeOperation({ - operationHandle: this.operationHandle, - })); - Status.assert(response.status); + const result = await this.backend.close(); this.closed = true; - const result = new Status(response.status); - this.onClose?.(); return result; } public async finished(options?: FinishedOptions): Promise { await this.failIfClosed(); - await this.waitUntilReady(options); + await this.waitUntilReadyThroughBackend(options); } public async hasMoreRows(): Promise { - // If operation is closed or cancelled - we should not try to get data from it if (this.closed || this.cancelled) { return false; } - // Wait for operation to finish before checking for more rows - // This ensures metadata can be fetched successfully - if (this.operationHandle.hasResultSet) { - await this.waitUntilReady(); + if (this.backend.hasResultSet) { + await this.waitUntilReadyThroughBackend(); } - // If we fetched all the data from server - check if there's anything buffered in result handler - const resultHandler = await this.getResultHandler(); - return resultHandler.hasMore(); + return this.backend.hasMore(); } public async getSchema(options?: GetSchemaOptions): Promise { await this.failIfClosed(); - if (!this.operationHandle.hasResultSet) { + if (!this.backend.hasResultSet) { return null; } - await this.waitUntilReady(options); + await this.waitUntilReadyThroughBackend(options); this.context.getLogger().log(LogLevel.debug, `Fetching schema for operation with id: ${this.id}`); - const metadata = await this.fetchMetadata(); + const metadata = await this.backend.getResultMetadata(); return metadata.schema ?? null; } + public async getResultMetadata(): Promise { + await this.failIfClosed(); + await this.waitUntilReadyThroughBackend(); + return this.backend.getResultMetadata(); + } + + /** + * Fetch result-set metadata as the Thrift wire response. Kept for + * back-compat with existing user code. On the Thrift backend the wire + * response is returned verbatim; on any other backend the response is + * synthesized from the neutral {@link ResultMetadata}, with Thrift-only + * fields (`cacheLookupResult`, `uncompressedBytes`, `compressedBytes`, + * `status`) left undefined / defaulted. + * + * Prefer {@link DBSQLOperation.getResultMetadata} in new code. + */ public async getMetadata(): Promise { await this.failIfClosed(); - await this.waitUntilReady(); - return this.fetchMetadata(); + await this.waitUntilReadyThroughBackend(); + if (this.backend instanceof ThriftOperationBackend) { + return this.backend.thriftResultMetadataResponse(); + } + return synthesizeThriftResultSetMetadata(await this.backend.getResultMetadata()); } private async failIfClosed(): Promise { @@ -334,151 +247,20 @@ export default class DBSQLOperation implements IOperation { } } - private async waitUntilReady(options?: WaitUntilReadyOptions) { - if (this.state === TOperationState.FINISHED_STATE) { - return; - } - - let isReady = false; - - while (!isReady) { - // eslint-disable-next-line no-await-in-loop - const response = await this.status(Boolean(options?.progress)); - - if (options?.callback) { - // eslint-disable-next-line no-await-in-loop - await Promise.resolve(options.callback(response)); - } - - switch (response.operationState) { - // For these states do nothing and continue waiting - case TOperationState.INITIALIZED_STATE: - case TOperationState.PENDING_STATE: - case TOperationState.RUNNING_STATE: - break; - - // Operation is completed, so exit the loop - case TOperationState.FINISHED_STATE: - isReady = true; - break; - - // Operation was cancelled, so set a flag and exit the loop (throw an error) - case TOperationState.CANCELED_STATE: + private async waitUntilReadyThroughBackend(options?: WaitUntilReadyOptions) { + try { + await this.backend.waitUntilReady(options); + } catch (err) { + // Reflect terminal states back into facade flags so subsequent calls + // short-circuit via failIfClosed(). + if (err instanceof OperationStateError) { + if (err.errorCode === OperationStateErrorCode.Canceled) { this.cancelled = true; - throw new OperationStateError(OperationStateErrorCode.Canceled, response); - - // Operation was closed, so set a flag and exit the loop (throw an error) - case TOperationState.CLOSED_STATE: + } else if (err.errorCode === OperationStateErrorCode.Closed) { this.closed = true; - throw new OperationStateError(OperationStateErrorCode.Closed, response); - - // Error states - throw and exit the loop - case TOperationState.ERROR_STATE: - throw new OperationStateError(OperationStateErrorCode.Error, response); - case TOperationState.TIMEDOUT_STATE: - throw new OperationStateError(OperationStateErrorCode.Timeout, response); - case TOperationState.UKNOWN_STATE: - default: - throw new OperationStateError(OperationStateErrorCode.Unknown, response); + } } - - // If not ready yet - make some delay before the next status requests - if (!isReady) { - // eslint-disable-next-line no-await-in-loop - await delay(100); - } - } - } - - private async fetchMetadata() { - // If metadata is already cached, return it immediately - if (this.metadata) { - return this.metadata; + throw err; } - - // If a fetch is already in progress, wait for it to complete - if (this.metadataPromise) { - return this.metadataPromise; - } - - // Start a new fetch and cache the promise to prevent concurrent fetches - this.metadataPromise = (async () => { - const driver = await this.context.getDriver(); - const metadata = await driver.getResultSetMetadata({ - operationHandle: this.operationHandle, - }); - Status.assert(metadata.status); - this.metadata = metadata; - return metadata; - })(); - - try { - return await this.metadataPromise; - } finally { - // Clear the promise once completed (success or failure) - this.metadataPromise = undefined; - } - } - - private async getResultHandler(): Promise> { - const metadata = await this.fetchMetadata(); - const resultFormat = definedOrError(metadata.resultFormat); - - if (!this.resultHandler) { - let resultSource: IResultsProvider> | undefined; - - switch (resultFormat) { - case TSparkRowSetType.COLUMN_BASED_SET: - resultSource = new JsonResultHandler(this.context, this._data, metadata); - break; - case TSparkRowSetType.ARROW_BASED_SET: - resultSource = new ArrowResultConverter( - this.context, - new ArrowResultHandler(this.context, this._data, metadata), - metadata, - ); - break; - case TSparkRowSetType.URL_BASED_SET: - resultSource = new ArrowResultConverter( - this.context, - new CloudFetchResultHandler(this.context, this._data, metadata), - metadata, - ); - break; - // no default - } - - if (resultSource) { - this.resultHandler = new ResultSlicer(this.context, resultSource); - } - } - - if (!this.resultHandler) { - throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`); - } - - return this.resultHandler; - } - - private processOperationStatusResponse(response: TGetOperationStatusResp) { - Status.assert(response.status); - - this.state = response.operationState ?? this.state; - - if (typeof response.hasResultSet === 'boolean') { - this.operationHandle.hasResultSet = response.hasResultSet; - } - - const isInProgress = [ - TOperationState.INITIALIZED_STATE, - TOperationState.PENDING_STATE, - TOperationState.RUNNING_STATE, - ].includes(this.state); - - if (!isInProgress) { - this.operationStatus = response; - } - - return response; } } diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 95715e1b..0e1cc934 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -2,19 +2,7 @@ import * as fs from 'fs'; import * as path from 'path'; import stream from 'node:stream'; import util from 'node:util'; -import { stringify, NIL } from 'uuid'; -import Int64 from 'node-int64'; import fetch, { HeadersInit } from 'node-fetch'; -import { - TSessionHandle, - TStatus, - TOperationHandle, - TSparkDirectResults, - TSparkArrowTypes, - TSparkParameter, - TProtocolVersion, - TExecuteStatementReq, -} from '../thrift/TCLIService_types'; import IDBSQLSession, { ExecuteStatementOptions, TypeInfoRequest, @@ -31,153 +19,44 @@ import IOperation from './contracts/IOperation'; import DBSQLOperation from './DBSQLOperation'; import Status from './dto/Status'; import InfoValue from './dto/InfoValue'; -import { definedOrError, LZ4, ProtocolVersion, serializeQueryTags } from './utils'; import CloseableCollection from './utils/CloseableCollection'; import { LogLevel } from './contracts/IDBSQLLogger'; import HiveDriverError from './errors/HiveDriverError'; import StagingError from './errors/StagingError'; -import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter'; -import ParameterError from './errors/ParameterError'; -import IClientContext, { ClientConfig } from './contracts/IClientContext'; +import IClientContext from './contracts/IClientContext'; +import ISessionBackend from './contracts/ISessionBackend'; +import IOperationBackend from './contracts/IOperationBackend'; // Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14 const pipeline = util.promisify(stream.pipeline); -interface OperationResponseShape { - status: TStatus; - operationHandle?: TOperationHandle; - directResults?: TSparkDirectResults; -} - -export function numberToInt64(value: number | bigint | Int64): Int64 { - if (value instanceof Int64) { - return value; - } - - if (typeof value === 'bigint') { - const buffer = new ArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT); - const view = new DataView(buffer); - view.setBigInt64(0, value, false); // `false` to use big-endian order - return new Int64(Buffer.from(buffer)); - } - - return new Int64(value); -} - -function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undefined, config: ClientConfig) { - if (maxRows === null) { - return {}; - } - - return { - getDirectResults: { - maxRows: numberToInt64(maxRows ?? config.directResultsDefaultMaxRows), - }, - }; -} - -function getArrowOptions( - config: ClientConfig, - serverProtocolVersion: TProtocolVersion | undefined | null, -): { - canReadArrowResult: boolean; - useArrowNativeTypes?: TSparkArrowTypes; -} { - const { arrowEnabled = true, useArrowNativeTypes = true } = config; - - if (!arrowEnabled || !ProtocolVersion.supportsArrowMetadata(serverProtocolVersion)) { - return { - canReadArrowResult: false, - }; - } - - return { - canReadArrowResult: true, - useArrowNativeTypes: { - timestampAsArrow: useArrowNativeTypes, - decimalAsArrow: useArrowNativeTypes, - complexTypesAsArrow: useArrowNativeTypes, - // TODO: currently unsupported by `apache-arrow` (see https://github.com/streamlit/streamlit/issues/4489) - intervalTypesAsArrow: false, - }, - }; -} - -function getQueryParameters( - namedParameters?: Record, - ordinalParameters?: Array, -): Array { - const namedParametersProvided = namedParameters !== undefined && Object.keys(namedParameters).length > 0; - const ordinalParametersProvided = ordinalParameters !== undefined && ordinalParameters.length > 0; - - if (namedParametersProvided && ordinalParametersProvided) { - throw new ParameterError('Driver does not support both ordinal and named parameters.'); - } - - if (!namedParametersProvided && !ordinalParametersProvided) { - return []; - } - - const result: Array = []; - - if (namedParameters !== undefined) { - for (const name of Object.keys(namedParameters)) { - const value = namedParameters[name]; - const param = value instanceof DBSQLParameter ? value : new DBSQLParameter({ value }); - result.push(param.toSparkParameter({ name })); - } - } - - if (ordinalParameters !== undefined) { - for (const value of ordinalParameters) { - const param = value instanceof DBSQLParameter ? value : new DBSQLParameter({ value }); - result.push(param.toSparkParameter()); - } - } - - return result; -} +// Re-export for back-compat with existing imports. +export { numberToInt64 } from './thrift-backend/ThriftSessionBackend'; interface DBSQLSessionConstructorOptions { - handle: TSessionHandle; + backend: ISessionBackend; context: IClientContext; - serverProtocolVersion?: TProtocolVersion; } export default class DBSQLSession implements IDBSQLSession { private readonly context: IClientContext; - private readonly sessionHandle: TSessionHandle; + private readonly backend: ISessionBackend; private isOpen = true; - private serverProtocolVersion?: TProtocolVersion; - public onClose?: () => void; private operations = new CloseableCollection(); - /** - * Helper method to determine if runAsync should be set for metadata operations - * @private - * @returns true if supported by protocol version, undefined otherwise - */ - private getRunAsyncForMetadataOperations(): boolean | undefined { - return ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; - } - - constructor({ handle, context, serverProtocolVersion }: DBSQLSessionConstructorOptions) { - this.sessionHandle = handle; - this.context = context; - // Get the server protocol version from the provided parameter (from TOpenSessionResp) - this.serverProtocolVersion = serverProtocolVersion; + constructor(options: DBSQLSessionConstructorOptions) { + this.context = options.context; + this.backend = options.backend; this.context.getLogger().log(LogLevel.debug, `Session created with id: ${this.id}`); - this.context.getLogger().log(LogLevel.debug, `Server protocol version: ${this.serverProtocolVersion}`); } public get id() { - const sessionId = this.sessionHandle?.sessionId?.guid; - return sessionId ? stringify(sessionId) : NIL; + return this.backend.id; } /** @@ -190,14 +69,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getInfo(infoType: number): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const operationPromise = driver.getInfo({ - sessionHandle: this.sessionHandle, - infoType, - }); - const response = await this.handleResponse(operationPromise); - Status.assert(response.status); - return new InfoValue(response.infoValue); + const result = await this.backend.getInfo(infoType); + await this.failIfClosed(); + return result; } /** @@ -211,48 +85,13 @@ export default class DBSQLSession implements IDBSQLSession { */ public async executeStatement(statement: string, options: ExecuteStatementOptions = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const request = new TExecuteStatementReq({ - sessionHandle: this.sessionHandle, - statement, - queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined, - runAsync: true, - ...getDirectResultsOptions(options.maxRows, clientConfig), - ...getArrowOptions(clientConfig, this.serverProtocolVersion), - }); - - if (ProtocolVersion.supportsParameterizedQueries(this.serverProtocolVersion)) { - request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters); - } - - const serializedQueryTags = serializeQueryTags(options.queryTags); - if (serializedQueryTags !== undefined) { - request.confOverlay = { ...request.confOverlay, query_tags: serializedQueryTags }; - } - - if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) { - request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch; - } - - if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion) && request.canDownloadResult !== true) { - request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4()); - } + const opBackend = await this.backend.executeStatement(statement, options); + await this.failIfClosed(); + const operation = this.wrapOperation(opBackend); - const operationPromise = driver.executeStatement(request); - const response = await this.handleResponse(operationPromise); - const operation = this.createOperation(response); - - // If `stagingAllowedLocalPath` is provided - assume that operation possibly may be a staging operation. - // To know for sure, fetch metadata and check a `isStagingOperation` flag. If it happens that it wasn't - // a staging operation - not a big deal, we just fetched metadata earlier, but operation is still usable - // and user can get data from it. - // If `stagingAllowedLocalPath` is not provided - don't do anything to the operation. In a case of regular - // operation, everything will work as usual. In a case of staging operation, it will be processed like any - // other query - it will be possible to get data from it as usual, or use other operation methods. + // Staging detection: only run when stagingAllowedLocalPath is provided. if (options.stagingAllowedLocalPath !== undefined) { - const metadata = await operation.getMetadata(); + const metadata = await operation.getResultMetadata(); if (metadata.isStagingOperation) { const allowedLocalPath = Array.isArray(options.stagingAllowedLocalPath) ? options.stagingAllowedLocalPath @@ -276,7 +115,6 @@ export default class DBSQLSession implements IDBSQLSession { } const row = rows[0] as StagingResponse; - // For REMOVE operation local file is not available, so no need to validate it if (row.localFile !== undefined) { let allowOperation = false; @@ -328,7 +166,6 @@ export default class DBSQLSession implements IDBSQLSession { } const fileStream = fs.createWriteStream(localFile); - // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly return pipeline(response.body, fileStream); } @@ -337,13 +174,6 @@ export default class DBSQLSession implements IDBSQLSession { const agent = await connectionProvider.getAgent(); const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent }); - // Looks that AWS and Azure have a different behavior of HTTP `DELETE` for non-existing files. - // AWS assumes that - since file already doesn't exist - the goal is achieved, and returns HTTP 200. - // Azure, on the other hand, is somewhat stricter and check if file exists before deleting it. And if - // file doesn't exist - Azure returns HTTP 404. - // - // For us, it's totally okay if file didn't exist before removing. So when we get an HTTP 404 - - // just ignore it and report success. This way we can have a uniform library behavior for all clouds if (!response.ok && response.status !== 404) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } @@ -368,7 +198,6 @@ export default class DBSQLSession implements IDBSQLSession { method: 'PUT', headers: { ...headers, - // This header is required by server 'Content-Length': fileInfo.size.toString(), }, agent, @@ -387,16 +216,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getTypeInfo(request: TypeInfoRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getTypeInfo({ - sessionHandle: this.sessionHandle, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getTypeInfo(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -407,16 +229,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getCatalogs(request: CatalogsRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getCatalogs({ - sessionHandle: this.sessionHandle, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getCatalogs(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -427,18 +242,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getSchemas(request: SchemasRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getSchemas({ - sessionHandle: this.sessionHandle, - catalogName: request.catalogName, - schemaName: request.schemaName, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getSchemas(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -449,20 +255,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getTables(request: TablesRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getTables({ - sessionHandle: this.sessionHandle, - catalogName: request.catalogName, - schemaName: request.schemaName, - tableName: request.tableName, - tableTypes: request.tableTypes, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getTables(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -473,16 +268,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getTableTypes(request: TableTypesRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getTableTypes({ - sessionHandle: this.sessionHandle, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getTableTypes(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -493,20 +281,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getColumns(request: ColumnsRequest = {}): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getColumns({ - sessionHandle: this.sessionHandle, - catalogName: request.catalogName, - schemaName: request.schemaName, - tableName: request.tableName, - columnName: request.columnName, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getColumns(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -517,36 +294,16 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getFunctions(request: FunctionsRequest): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getFunctions({ - sessionHandle: this.sessionHandle, - catalogName: request.catalogName, - schemaName: request.schemaName, - functionName: request.functionName, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getFunctions(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } public async getPrimaryKeys(request: PrimaryKeysRequest): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getPrimaryKeys({ - sessionHandle: this.sessionHandle, - catalogName: request.catalogName, - schemaName: request.schemaName, - tableName: request.tableName, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getPrimaryKeys(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -557,22 +314,9 @@ export default class DBSQLSession implements IDBSQLSession { */ public async getCrossReference(request: CrossReferenceRequest): Promise { await this.failIfClosed(); - const driver = await this.context.getDriver(); - const clientConfig = this.context.getConfig(); - - const operationPromise = driver.getCrossReference({ - sessionHandle: this.sessionHandle, - parentCatalogName: request.parentCatalogName, - parentSchemaName: request.parentSchemaName, - parentTableName: request.parentTableName, - foreignCatalogName: request.foreignCatalogName, - foreignSchemaName: request.foreignSchemaName, - foreignTableName: request.foreignTableName, - runAsync: this.getRunAsyncForMetadataOperations(), - ...getDirectResultsOptions(request.maxRows, clientConfig), - }); - const response = await this.handleResponse(operationPromise); - return this.createOperation(response); + const opBackend = await this.backend.getCrossReference(request); + await this.failIfClosed(); + return this.wrapOperation(opBackend); } /** @@ -585,35 +329,20 @@ export default class DBSQLSession implements IDBSQLSession { return Status.success(); } - // Close owned operations one by one, removing successfully closed ones from the list await this.operations.closeAll(); - const driver = await this.context.getDriver(); - const response = await driver.closeSession({ - sessionHandle: this.sessionHandle, - }); - // check status for being successful - Status.assert(response.status); + const status = await this.backend.close(); - // notify owner connection this.onClose?.(); this.isOpen = false; this.context.getLogger().log(LogLevel.debug, `Session closed with id: ${this.id}`); - return new Status(response.status); + return status; } - private createOperation(response: OperationResponseShape): DBSQLOperation { - Status.assert(response.status); - const handle = definedOrError(response.operationHandle); - const operation = new DBSQLOperation({ - handle, - directResults: response.directResults, - context: this.context, - }); - + private wrapOperation(backend: IOperationBackend): DBSQLOperation { + const operation = new DBSQLOperation({ backend, context: this.context }); this.operations.add(operation); - return operation; } @@ -622,13 +351,4 @@ export default class DBSQLSession implements IDBSQLSession { throw new HiveDriverError('The session was closed or has expired'); } } - - private async handleResponse(requestPromise: Promise): Promise { - // Currently, after being closed sessions remains usable - server will not - // error out when trying to run operations on closed session. So it's - // basically useless to process any errors here - const result = await requestPromise; - await this.failIfClosed(); - return result; - } } diff --git a/lib/contracts/IBackend.ts b/lib/contracts/IBackend.ts new file mode 100644 index 00000000..2e5edd16 --- /dev/null +++ b/lib/contracts/IBackend.ts @@ -0,0 +1,34 @@ +import { ConnectionOptions, OpenSessionRequest } from './IDBSQLClient'; +import ISessionBackend from './ISessionBackend'; + +/** + * Top-level backend dispatch handle. One instance per `DBSQLClient`, + * chosen at `connect()` time based on the `useSEA` flag and never + * re-selected per-call. + */ +export default interface IBackend { + /** + * Establish backend-level state before any session is opened. Implementations + * consume `options` to build backend-specific connection parameters (e.g. the + * SEA backend derives napi-binding `SeaNativeConnectionOptions` from the auth + * + host fields here). Transport-layer connection providers are owned by + * `DBSQLClient` (via `IClientContext`) and exposed to backends through + * constructor injection. + */ + connect(options: ConnectionOptions): Promise; + + /** + * Open a session. Returned `ISessionBackend` is owned by the caller + * and torn down via its own `close()`. + */ + openSession(request: OpenSessionRequest): Promise; + + /** + * Backend-level teardown. Transport-layer cleanup (connection provider, + * thrift client, auth provider) is owned by `DBSQLClient` and runs + * after this returns. Implementations release backend-internal resources + * here, and MUST be safe to call on a partially-initialized backend + * (i.e. after a failed `connect()`). + */ + close(): Promise; +} diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts index 1d0bb9a1..bbeed622 100644 --- a/lib/contracts/IOperation.ts +++ b/lib/contracts/IOperation.ts @@ -1,6 +1,7 @@ import { Readable, ReadableOptions } from 'node:stream'; import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; +import { ResultMetadata } from './ResultMetadata'; export type OperationStatusCallback = (progress: TGetOperationStatusResp) => unknown; @@ -59,7 +60,10 @@ export default interface IOperation { fetchAll(options?: FetchOptions): Promise>; /** - * Request status of operation + * Request status of operation. Returns the Thrift wire response for + * back-compat. New code should prefer {@link IOperation.getResultMetadata} + * for metadata and may consume the neutral `IOperationBackend.status` via + * a typed downcast when implementing alternative backends. * * @param progress */ @@ -90,6 +94,12 @@ export default interface IOperation { */ getSchema(options?: GetSchemaOptions): Promise; + /** + * Fetch result-set metadata in the backend-neutral `ResultMetadata` shape. + * Prefer this over the Thrift-shaped surface for new code. + */ + getResultMetadata(): Promise; + iterateChunks(options?: IteratorOptions): IOperationChunksIterator; iterateRows(options?: IteratorOptions): IOperationRowsIterator; diff --git a/lib/contracts/IOperationBackend.ts b/lib/contracts/IOperationBackend.ts new file mode 100644 index 00000000..4c17020b --- /dev/null +++ b/lib/contracts/IOperationBackend.ts @@ -0,0 +1,55 @@ +import Status from '../dto/Status'; +import { WaitUntilReadyOptions } from './IOperation'; +import { OperationStatus } from './OperationStatus'; +import { ResultMetadata } from './ResultMetadata'; + +/** + * What a `DBSQLOperation` needs from its backend. Returned by + * `ISessionBackend.executeStatement` and the metadata methods. + */ +export default interface IOperationBackend { + /** Operation identifier. */ + readonly id: string; + + /** + * Whether this operation has a result set. Initial value may be derived + * from the create-operation response; implementations MUST refresh it + * from terminal status responses (the Thrift impl updates + * `operationHandle.hasResultSet` inside `processOperationStatusResponse`). + * `readonly` here means external callers cannot reassign the property — + * not that the underlying value is fixed at construction time. + */ + readonly hasResultSet: boolean; + + /** Fetch the next chunk of result rows. */ + fetchChunk(options: { limit: number; disableBuffering?: boolean }): Promise>; + + /** Whether more rows are available beyond what has been fetched. */ + hasMore(): Promise; + + /** + * Poll the backend until the operation reaches a terminal state. + * + * MUST throw `OperationStateError` (with one of `OperationStateErrorCode.{Canceled, + * Closed, Error, Timeout, Unknown}`) on terminal non-success states. The + * `DBSQLOperation` facade depends on `Canceled` and `Closed` codes to mirror + * the operation into its closed/cancelled flags; future implementations must + * use the same error type for the facade to stay in sync. + */ + waitUntilReady(options?: WaitUntilReadyOptions): Promise; + + /** + * Fetch operation status as a neutral `OperationStatus`. Pass `progress: true` + * to request that the backend include a progress payload. + */ + status(progress: boolean): Promise; + + /** Fetch result-set metadata (schema, format, lz4 flag, arrow schema, staging flag). */ + getResultMetadata(): Promise; + + /** Cancel the operation. */ + cancel(): Promise; + + /** Close the operation. Idempotent. */ + close(): Promise; +} diff --git a/lib/contracts/ISessionBackend.ts b/lib/contracts/ISessionBackend.ts new file mode 100644 index 00000000..2404dc68 --- /dev/null +++ b/lib/contracts/ISessionBackend.ts @@ -0,0 +1,60 @@ +import IOperationBackend from './IOperationBackend'; +import { + ExecuteStatementOptions, + TypeInfoRequest, + CatalogsRequest, + SchemasRequest, + TablesRequest, + TableTypesRequest, + ColumnsRequest, + FunctionsRequest, + PrimaryKeysRequest, + CrossReferenceRequest, +} from './IDBSQLSession'; +import Status from '../dto/Status'; +import InfoValue from '../dto/InfoValue'; + +/** + * What a `DBSQLSession` needs from its backend. Returned by + * `IBackend.openSession()`. Lifecycle tied to a single `DBSQLSession`. + */ +export default interface ISessionBackend { + /** Session identifier. */ + readonly id: string; + + /** Returns general information about the data source. */ + getInfo(infoType: number): Promise; + + /** Executes DDL/DML statements. */ + executeStatement(statement: string, options: ExecuteStatementOptions): Promise; + + /** Information about supported data types. */ + getTypeInfo(request: TypeInfoRequest): Promise; + + /** List of catalogs. */ + getCatalogs(request: CatalogsRequest): Promise; + + /** List of schemas. */ + getSchemas(request: SchemasRequest): Promise; + + /** List of tables. */ + getTables(request: TablesRequest): Promise; + + /** List of supported table types. */ + getTableTypes(request: TableTypesRequest): Promise; + + /** Full column information for a table. */ + getColumns(request: ColumnsRequest): Promise; + + /** Information about a function. */ + getFunctions(request: FunctionsRequest): Promise; + + /** Primary keys of a table. */ + getPrimaryKeys(request: PrimaryKeysRequest): Promise; + + /** Foreign-key relationships between two tables. */ + getCrossReference(request: CrossReferenceRequest): Promise; + + /** Close the session. Idempotent. */ + close(): Promise; +} diff --git a/lib/contracts/InternalConnectionOptions.ts b/lib/contracts/InternalConnectionOptions.ts new file mode 100644 index 00000000..a115aa47 --- /dev/null +++ b/lib/contracts/InternalConnectionOptions.ts @@ -0,0 +1,21 @@ +/** + * Internal, non-exported extension of `ConnectionOptions`. Carries M0-only + * flags that should not appear in the published `.d.ts`. + * + * Matches the Python connector pattern: there, `use_sea` is consumed via + * `kwargs.get("use_sea", False)` and is intentionally absent from the typed + * signature (see `databricks-sql-python/src/databricks/sql/session.py`). + * + * Callers cast `ConnectionOptions` to this type *only* at the read site + * inside the driver; user code that wants to set `useSEA` may still do so + * via an untyped object literal — the option is not part of the public + * contract and may be removed without notice. + */ +export interface InternalConnectionOptions { + /** + * Opt-in flag to dispatch through the Statement Execution API (SEA) + * backend instead of the default Thrift backend. Defaults to `false`. + * @internal Not stable; M0 stub only. + */ + useSEA?: boolean; +} diff --git a/lib/contracts/OperationStatus.ts b/lib/contracts/OperationStatus.ts new file mode 100644 index 00000000..7f167aba --- /dev/null +++ b/lib/contracts/OperationStatus.ts @@ -0,0 +1,56 @@ +/** + * Backend-neutral operation state. Mirrors the kernel/pyo3 `StatementStatus` + * and the Python connector's `CommandState`, so a SEA `IOperationBackend` + * implementer can return these without depending on the Thrift wire enum. + * + * Thrift mapping (in `ThriftOperationBackend.adaptOperationStatus`): + * - INITIALIZED_STATE, PENDING_STATE → Pending + * - RUNNING_STATE → Running + * - FINISHED_STATE → Succeeded + * - CANCELED_STATE → Cancelled + * - CLOSED_STATE → Closed + * - ERROR_STATE, TIMEDOUT_STATE → Failed + * - UKNOWN_STATE / anything else → Unknown + */ +export enum OperationState { + Pending = 'Pending', + Running = 'Running', + Succeeded = 'Succeeded', + Failed = 'Failed', + Cancelled = 'Cancelled', + Closed = 'Closed', + Unknown = 'Unknown', +} + +/** + * Neutral status snapshot returned by `IOperationBackend.status()`. Backends + * adapt their wire format at the boundary; callers in `DBSQLOperation` and + * `IOperationBackend.waitUntilReady` switch on `state` alone. + * + * Fields beyond `state` are best-effort and may be undefined depending on + * what the backend exposes. + */ +export interface OperationStatus { + /** Current operation state. */ + state: OperationState; + + /** + * Whether this operation has produced (or is producing) a result set. + * Some backends only know this after the operation reaches a terminal + * state — undefined means "no signal from this backend". + */ + hasResultSet?: boolean; + + /** Human-readable error/display message, if the backend supplied one. */ + errorMessage?: string; + + /** SQL state code (e.g. "42000"), if available. */ + sqlState?: string; + + /** + * Opaque progress payload as returned by the backend when callers pass + * `progress: true`. Treated as untyped by the facade — passed through + * to `WaitUntilReadyOptions.callback` for the consumer to interpret. + */ + progressUpdateResponse?: unknown; +} diff --git a/lib/contracts/ResultMetadata.ts b/lib/contracts/ResultMetadata.ts new file mode 100644 index 00000000..5fc09a79 --- /dev/null +++ b/lib/contracts/ResultMetadata.ts @@ -0,0 +1,39 @@ +import { TTableSchema } from '../../thrift/TCLIService_types'; + +/** + * Backend-neutral result-format taxonomy. Mirrors the three on-wire shapes + * `ThriftOperationBackend` actually dispatches on (`COLUMN_BASED_SET`, + * `ARROW_BASED_SET`, `URL_BASED_SET`); a SEA implementer surfaces the same + * three so result-handling stays format-agnostic. + */ +export enum ResultFormat { + ColumnBased = 'COLUMN_BASED', + ArrowBased = 'ARROW_BASED', + UrlBased = 'URL_BASED', +} + +/** + * Neutral result-set metadata returned by `IOperationBackend.getResultMetadata()`. + * + * `schema` keeps the Thrift `TTableSchema` shape for now because the public + * `DBSQLOperation.getSchema()` and `getMetadata()` already expose it on + * `IOperation`; carrying it across the boundary preserves back-compat. The + * SEA backend will adapt its column descriptors into the same shape until + * the public IOperation surface is migrated in a later PR. + */ +export interface ResultMetadata { + /** Column schema; null if the operation has no result set. */ + schema?: TTableSchema; + + /** Wire format the result handler should dispatch on. */ + resultFormat: ResultFormat; + + /** Whether the result payload is LZ4-compressed. */ + lz4Compressed?: boolean; + + /** Optional Arrow IPC schema bytes (for ARROW_BASED / URL_BASED formats). */ + arrowSchema?: Buffer; + + /** True iff the operation is a staging (PUT/GET/REMOVE) operation. */ + isStagingOperation: boolean; +} diff --git a/lib/sea/SeaBackend.ts b/lib/sea/SeaBackend.ts new file mode 100644 index 00000000..43958679 --- /dev/null +++ b/lib/sea/SeaBackend.ts @@ -0,0 +1,23 @@ +import IBackend from '../contracts/IBackend'; +import ISessionBackend from '../contracts/ISessionBackend'; +import { ConnectionOptions, OpenSessionRequest } from '../contracts/IDBSQLClient'; +import HiveDriverError from '../errors/HiveDriverError'; + +const NOT_IMPLEMENTED = 'SEA backend not implemented yet — wired in sea-napi-binding feature'; + +export default class SeaBackend implements IBackend { + // eslint-disable-next-line @typescript-eslint/no-unused-vars, class-methods-use-this + public async connect(options: ConnectionOptions): Promise { + throw new HiveDriverError(NOT_IMPLEMENTED); + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars, class-methods-use-this + public async openSession(request: OpenSessionRequest): Promise { + throw new HiveDriverError(NOT_IMPLEMENTED); + } + + // No-op so DBSQLClient.close() can finish its state-clearing block after a + // failed useSEA: true connect. Real teardown lands with the M1 SEA impl. + // eslint-disable-next-line @typescript-eslint/no-empty-function, class-methods-use-this + public async close(): Promise {} +} diff --git a/lib/thrift-backend/ThriftBackend.ts b/lib/thrift-backend/ThriftBackend.ts new file mode 100644 index 00000000..5e0e7570 --- /dev/null +++ b/lib/thrift-backend/ThriftBackend.ts @@ -0,0 +1,100 @@ +import Int64 from 'node-int64'; +import IBackend from '../contracts/IBackend'; +import ISessionBackend from '../contracts/ISessionBackend'; +import IClientContext from '../contracts/IClientContext'; +import { OpenSessionRequest } from '../contracts/IDBSQLClient'; +import { TProtocolVersion } from '../../thrift/TCLIService_types'; +import Status from '../dto/Status'; +import { definedOrError, serializeQueryTags } from '../utils'; +import ThriftSessionBackend from './ThriftSessionBackend'; + +function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) { + if (!catalogName && !schemaName) { + return {}; + } + + return { + initialNamespace: { + catalogName, + schemaName, + }, + }; +} + +interface ThriftBackendOptions { + context: IClientContext; + onConnectionEvent: (event: 'error' | 'reconnecting' | 'close' | 'timeout', payload?: unknown) => void; +} + +export default class ThriftBackend implements IBackend { + private readonly context: IClientContext; + + private readonly onConnectionEvent: ThriftBackendOptions['onConnectionEvent']; + + constructor({ context, onConnectionEvent }: ThriftBackendOptions) { + this.context = context; + this.onConnectionEvent = onConnectionEvent; + } + + public async connect(): Promise { + // The connection provider is owned by DBSQLClient (it implements IClientContext). + // We only need to wire the EventEmitter listeners through this backend. + const connectionProvider = await this.context.getConnectionProvider(); + const thriftConnection = await connectionProvider.getThriftConnection(); + + thriftConnection.on('error', (error: Error) => { + this.onConnectionEvent('error', error); + }); + + thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => { + this.onConnectionEvent('reconnecting', params); + }); + + thriftConnection.on('close', () => { + this.onConnectionEvent('close'); + }); + + thriftConnection.on('timeout', () => { + this.onConnectionEvent('timeout'); + }); + } + + public async openSession(request: OpenSessionRequest): Promise { + const driver = await this.context.getDriver(); + const config = this.context.getConfig(); + + const configuration = request.configuration ? { ...request.configuration } : {}; + + if (config.enableMetricViewMetadata) { + configuration['spark.sql.thriftserver.metadata.metricview.enabled'] = 'true'; + } + + if (request.queryTags !== undefined) { + const serialized = serializeQueryTags(request.queryTags); + if (serialized) { + configuration.QUERY_TAGS = serialized; + } else { + delete configuration.QUERY_TAGS; + } + } + + const response = await driver.openSession({ + client_protocol_i64: new Int64(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8), + ...getInitialNamespaceOptions(request.initialCatalog, request.initialSchema), + configuration, + canUseMultipleCatalogs: true, + }); + + Status.assert(response.status); + return new ThriftSessionBackend({ + handle: definedOrError(response.sessionHandle), + context: this.context, + serverProtocolVersion: response.serverProtocolVersion, + }); + } + + public async close(): Promise { + // DBSQLClient owns the connection lifecycle and clears its own state + // (connectionProvider, authProvider, thrift client) after this returns. + } +} diff --git a/lib/thrift-backend/ThriftOperationBackend.ts b/lib/thrift-backend/ThriftOperationBackend.ts new file mode 100644 index 00000000..436d4928 --- /dev/null +++ b/lib/thrift-backend/ThriftOperationBackend.ts @@ -0,0 +1,382 @@ +import { stringify, NIL } from 'uuid'; +import { + TGetOperationStatusResp, + TOperationHandle, + TSparkDirectResults, + TGetResultSetMetadataResp, + TSparkRowSetType, + TCloseOperationResp, + TOperationState, +} from '../../thrift/TCLIService_types'; +import IOperationBackend from '../contracts/IOperationBackend'; +import IClientContext from '../contracts/IClientContext'; +import { WaitUntilReadyOptions } from '../contracts/IOperation'; +import { OperationStatus, OperationState } from '../contracts/OperationStatus'; +import { ResultMetadata, ResultFormat } from '../contracts/ResultMetadata'; +import Status from '../dto/Status'; +import { LogLevel } from '../contracts/IDBSQLLogger'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; +import IResultsProvider from '../result/IResultsProvider'; +import RowSetProvider from '../result/RowSetProvider'; +import JsonResultHandler from '../result/JsonResultHandler'; +import ArrowResultHandler from '../result/ArrowResultHandler'; +import CloudFetchResultHandler from '../result/CloudFetchResultHandler'; +import ArrowResultConverter from '../result/ArrowResultConverter'; +import ResultSlicer from '../result/ResultSlicer'; +import { definedOrError } from '../utils'; +import HiveDriverError from '../errors/HiveDriverError'; + +interface ThriftOperationBackendOptions { + handle: TOperationHandle; + directResults?: TSparkDirectResults; + context: IClientContext; +} + +async function delay(ms?: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +function thriftStateToOperationState(state: TOperationState | undefined | null): OperationState { + switch (state) { + case TOperationState.INITIALIZED_STATE: + case TOperationState.PENDING_STATE: + return OperationState.Pending; + case TOperationState.RUNNING_STATE: + return OperationState.Running; + case TOperationState.FINISHED_STATE: + return OperationState.Succeeded; + case TOperationState.CANCELED_STATE: + return OperationState.Cancelled; + case TOperationState.CLOSED_STATE: + return OperationState.Closed; + case TOperationState.ERROR_STATE: + case TOperationState.TIMEDOUT_STATE: + return OperationState.Failed; + case TOperationState.UKNOWN_STATE: + default: + return OperationState.Unknown; + } +} + +function thriftRowSetTypeToResultFormat(type: TSparkRowSetType): ResultFormat { + switch (type) { + case TSparkRowSetType.COLUMN_BASED_SET: + return ResultFormat.ColumnBased; + case TSparkRowSetType.ARROW_BASED_SET: + return ResultFormat.ArrowBased; + case TSparkRowSetType.URL_BASED_SET: + return ResultFormat.UrlBased; + default: + throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[type]}`); + } +} + +export default class ThriftOperationBackend implements IOperationBackend { + private readonly context: IClientContext; + + private readonly operationHandle: TOperationHandle; + + private readonly _data: RowSetProvider; + + private readonly closeOperation?: TCloseOperationResp; + + private metadata?: TGetResultSetMetadataResp; + + private metadataPromise?: Promise; + + private state: TOperationState = TOperationState.INITIALIZED_STATE; + + private operationStatus?: TGetOperationStatusResp; + + private resultHandler?: ResultSlicer; + + constructor({ handle, directResults, context }: ThriftOperationBackendOptions) { + this.operationHandle = handle; + this.context = context; + + const useOnlyPrefetchedResults = Boolean(directResults?.closeOperation); + + if (directResults?.operationStatus) { + this.processOperationStatusResponse(directResults.operationStatus); + } + + this.metadata = directResults?.resultSetMetadata; + this._data = new RowSetProvider( + this.context, + this.operationHandle, + [directResults?.resultSet], + useOnlyPrefetchedResults, + ); + this.closeOperation = directResults?.closeOperation; + } + + public get id(): string { + const operationId = this.operationHandle?.operationId?.guid; + return operationId ? stringify(operationId) : NIL; + } + + public get hasResultSet(): boolean { + return Boolean(this.operationHandle.hasResultSet); + } + + public async fetchChunk({ + limit, + disableBuffering, + }: { + limit: number; + disableBuffering?: boolean; + }): Promise> { + const resultHandler = await this.getResultHandler(); + + // All the library code is Promise-based, however, since Promises are microtasks, + // enqueueing a lot of promises may block macrotasks execution for a while. + // Usually, there are no much microtasks scheduled, however, when fetching query + // results (especially CloudFetch ones) it's quite easy to block event loop for + // long enough to break a lot of things. For example, with CloudFetch, after first + // set of files are downloaded and being processed immediately one by one, event + // loop easily gets blocked for enough time to break connection pool. `http.Agent` + // stops receiving socket events, and marks all sockets invalid on the next attempt + // to use them. See these similar issues that helped to debug this particular case - + // https://github.com/nodejs/node/issues/47130 and https://github.com/node-fetch/node-fetch/issues/1735 + await new Promise((resolve) => { + setTimeout(resolve, 0); + }); + + return resultHandler.fetchNext({ limit, disableBuffering }); + } + + public async hasMore(): Promise { + const resultHandler = await this.getResultHandler(); + return resultHandler.hasMore(); + } + + public async status(progress: boolean): Promise { + const response = await this.thriftStatusResponse(progress); + return this.adaptOperationStatus(response); + } + + /** + * Thrift-specific accessor that returns the raw `TGetOperationStatusResp`. + * + * Used internally to drive the Thrift state machine + attach the wire + * response to `OperationStateError`. Also called by the public + * `DBSQLOperation.status()` facade (zero-loss fast path) so existing user + * code that reads `taskStatus`, `numModifiedRows`, etc. continues to work + * verbatim against the Thrift backend. + * + * Not declared on `IOperationBackend` — non-Thrift backends do not + * implement it. The facade reaches it via `instanceof ThriftOperationBackend`. + */ + public async thriftStatusResponse(progress: boolean): Promise { + if (this.operationStatus) { + return this.operationStatus; + } + + const driver = await this.context.getDriver(); + const response = await driver.getOperationStatus({ + operationHandle: this.operationHandle, + getProgressUpdate: progress, + }); + + return this.processOperationStatusResponse(response); + } + + public async waitUntilReady(options?: WaitUntilReadyOptions): Promise { + if (this.state === TOperationState.FINISHED_STATE) { + return; + } + + let isReady = false; + + while (!isReady) { + // eslint-disable-next-line no-await-in-loop + const response = await this.thriftStatusResponse(Boolean(options?.progress)); + + if (options?.callback) { + // The public `OperationStatusCallback` is Thrift-shaped; pass the + // wire response verbatim. Non-Thrift backends synthesize via + // `synthesizeThriftStatus` in their own `waitUntilReady` impls. + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback(response)); + } + + switch (response.operationState) { + case TOperationState.INITIALIZED_STATE: + case TOperationState.PENDING_STATE: + case TOperationState.RUNNING_STATE: + break; + + case TOperationState.FINISHED_STATE: + isReady = true; + break; + + case TOperationState.CANCELED_STATE: + throw new OperationStateError(OperationStateErrorCode.Canceled, response); + + case TOperationState.CLOSED_STATE: + throw new OperationStateError(OperationStateErrorCode.Closed, response); + + case TOperationState.ERROR_STATE: + throw new OperationStateError(OperationStateErrorCode.Error, response); + case TOperationState.TIMEDOUT_STATE: + throw new OperationStateError(OperationStateErrorCode.Timeout, response); + case TOperationState.UKNOWN_STATE: + default: + throw new OperationStateError(OperationStateErrorCode.Unknown, response); + } + + if (!isReady) { + // eslint-disable-next-line no-await-in-loop + await delay(100); + } + } + } + + public async getResultMetadata(): Promise { + return this.adaptResultMetadata(await this.thriftResultMetadataResponse()); + } + + /** + * Thrift-specific accessor for the raw `TGetResultSetMetadataResp`. + * + * Used internally by `getResultHandler` (dispatches on Thrift `resultFormat` + * and passes the full Thrift response to the JSON / Arrow / CloudFetch + * result handlers). Also called by the public `DBSQLOperation.getMetadata()` + * facade (zero-loss fast path). + * + * Not declared on `IOperationBackend` — non-Thrift backends do not implement + * it. The facade reaches it via `instanceof ThriftOperationBackend`. + */ + public async thriftResultMetadataResponse(): Promise { + if (this.metadata) { + return this.metadata; + } + + if (this.metadataPromise) { + return this.metadataPromise; + } + + this.metadataPromise = (async () => { + const driver = await this.context.getDriver(); + const metadata = await driver.getResultSetMetadata({ + operationHandle: this.operationHandle, + }); + Status.assert(metadata.status); + this.metadata = metadata; + return metadata; + })(); + + try { + return await this.metadataPromise; + } finally { + this.metadataPromise = undefined; + } + } + + public async cancel(): Promise { + this.context.getLogger().log(LogLevel.debug, `Cancelling operation with id: ${this.id}`); + const driver = await this.context.getDriver(); + const response = await driver.cancelOperation({ + operationHandle: this.operationHandle, + }); + Status.assert(response.status); + return new Status(response.status); + } + + public async close(): Promise { + this.context.getLogger().log(LogLevel.debug, `Closing operation with id: ${this.id}`); + const driver = await this.context.getDriver(); + const response = + this.closeOperation ?? + (await driver.closeOperation({ + operationHandle: this.operationHandle, + })); + Status.assert(response.status); + return new Status(response.status); + } + + private async getResultHandler(): Promise> { + const metadata = await this.thriftResultMetadataResponse(); + const resultFormat = definedOrError(metadata.resultFormat); + + if (!this.resultHandler) { + let resultSource: IResultsProvider> | undefined; + + switch (resultFormat) { + case TSparkRowSetType.COLUMN_BASED_SET: + resultSource = new JsonResultHandler(this.context, this._data, metadata); + break; + case TSparkRowSetType.ARROW_BASED_SET: + resultSource = new ArrowResultConverter( + this.context, + new ArrowResultHandler(this.context, this._data, metadata), + metadata, + ); + break; + case TSparkRowSetType.URL_BASED_SET: + resultSource = new ArrowResultConverter( + this.context, + new CloudFetchResultHandler(this.context, this._data, metadata), + metadata, + ); + break; + // no default + } + + if (resultSource) { + this.resultHandler = new ResultSlicer(this.context, resultSource); + } + } + + if (!this.resultHandler) { + throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`); + } + + return this.resultHandler; + } + + private processOperationStatusResponse(response: TGetOperationStatusResp) { + Status.assert(response.status); + + this.state = response.operationState ?? this.state; + + if (typeof response.hasResultSet === 'boolean') { + this.operationHandle.hasResultSet = response.hasResultSet; + } + + const isInProgress = [ + TOperationState.INITIALIZED_STATE, + TOperationState.PENDING_STATE, + TOperationState.RUNNING_STATE, + ].includes(this.state); + + if (!isInProgress) { + this.operationStatus = response; + } + + return response; + } + + private adaptOperationStatus(response: TGetOperationStatusResp): OperationStatus { + return { + state: thriftStateToOperationState(response.operationState), + hasResultSet: typeof response.hasResultSet === 'boolean' ? response.hasResultSet : undefined, + errorMessage: response.errorMessage ?? response.displayMessage ?? undefined, + sqlState: response.sqlState ?? undefined, + progressUpdateResponse: response.progressUpdateResponse, + }; + } + + // eslint-disable-next-line class-methods-use-this + private adaptResultMetadata(response: TGetResultSetMetadataResp): ResultMetadata { + return { + schema: response.schema, + resultFormat: thriftRowSetTypeToResultFormat(definedOrError(response.resultFormat)), + lz4Compressed: response.lz4Compressed, + arrowSchema: response.arrowSchema, + isStagingOperation: Boolean(response.isStagingOperation), + }; + } +} diff --git a/lib/thrift-backend/ThriftSessionBackend.ts b/lib/thrift-backend/ThriftSessionBackend.ts new file mode 100644 index 00000000..c103ab4f --- /dev/null +++ b/lib/thrift-backend/ThriftSessionBackend.ts @@ -0,0 +1,333 @@ +import { stringify, NIL } from 'uuid'; +import Int64 from 'node-int64'; +import { + TSessionHandle, + TStatus, + TOperationHandle, + TSparkDirectResults, + TSparkArrowTypes, + TSparkParameter, + TProtocolVersion, + TExecuteStatementReq, +} from '../../thrift/TCLIService_types'; +import ISessionBackend from '../contracts/ISessionBackend'; +import IOperationBackend from '../contracts/IOperationBackend'; +import IClientContext, { ClientConfig } from '../contracts/IClientContext'; +import { + ExecuteStatementOptions, + TypeInfoRequest, + CatalogsRequest, + SchemasRequest, + TablesRequest, + TableTypesRequest, + ColumnsRequest, + FunctionsRequest, + PrimaryKeysRequest, + CrossReferenceRequest, +} from '../contracts/IDBSQLSession'; +import Status from '../dto/Status'; +import InfoValue from '../dto/InfoValue'; +import { definedOrError, LZ4, ProtocolVersion, serializeQueryTags } from '../utils'; +import ParameterError from '../errors/ParameterError'; +import { DBSQLParameter, DBSQLParameterValue } from '../DBSQLParameter'; +import { LogLevel } from '../contracts/IDBSQLLogger'; +import ThriftOperationBackend from './ThriftOperationBackend'; + +interface OperationResponseShape { + status: TStatus; + operationHandle?: TOperationHandle; + directResults?: TSparkDirectResults; +} + +export function numberToInt64(value: number | bigint | Int64): Int64 { + if (value instanceof Int64) { + return value; + } + + if (typeof value === 'bigint') { + const buffer = new ArrayBuffer(BigInt64Array.BYTES_PER_ELEMENT); + const view = new DataView(buffer); + view.setBigInt64(0, value, false); // `false` to use big-endian order + return new Int64(Buffer.from(buffer)); + } + + return new Int64(value); +} + +function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undefined, config: ClientConfig) { + if (maxRows === null) { + return {}; + } + + return { + getDirectResults: { + maxRows: numberToInt64(maxRows ?? config.directResultsDefaultMaxRows), + }, + }; +} + +function getArrowOptions( + config: ClientConfig, + serverProtocolVersion: TProtocolVersion | undefined | null, +): { + canReadArrowResult: boolean; + useArrowNativeTypes?: TSparkArrowTypes; +} { + const { arrowEnabled = true, useArrowNativeTypes = true } = config; + + if (!arrowEnabled || !ProtocolVersion.supportsArrowMetadata(serverProtocolVersion)) { + return { + canReadArrowResult: false, + }; + } + + return { + canReadArrowResult: true, + useArrowNativeTypes: { + timestampAsArrow: useArrowNativeTypes, + decimalAsArrow: useArrowNativeTypes, + complexTypesAsArrow: useArrowNativeTypes, + intervalTypesAsArrow: false, + }, + }; +} + +function getQueryParameters( + namedParameters?: Record, + ordinalParameters?: Array, +): Array { + const namedParametersProvided = namedParameters !== undefined && Object.keys(namedParameters).length > 0; + const ordinalParametersProvided = ordinalParameters !== undefined && ordinalParameters.length > 0; + + if (namedParametersProvided && ordinalParametersProvided) { + throw new ParameterError('Driver does not support both ordinal and named parameters.'); + } + + if (!namedParametersProvided && !ordinalParametersProvided) { + return []; + } + + const result: Array = []; + + if (namedParameters !== undefined) { + for (const name of Object.keys(namedParameters)) { + const value = namedParameters[name]; + const param = value instanceof DBSQLParameter ? value : new DBSQLParameter({ value }); + result.push(param.toSparkParameter({ name })); + } + } + + if (ordinalParameters !== undefined) { + for (const value of ordinalParameters) { + const param = value instanceof DBSQLParameter ? value : new DBSQLParameter({ value }); + result.push(param.toSparkParameter()); + } + } + + return result; +} + +interface ThriftSessionBackendOptions { + handle: TSessionHandle; + context: IClientContext; + serverProtocolVersion?: TProtocolVersion; +} + +export default class ThriftSessionBackend implements ISessionBackend { + private readonly context: IClientContext; + + private readonly sessionHandle: TSessionHandle; + + private readonly serverProtocolVersion?: TProtocolVersion; + + constructor({ handle, context, serverProtocolVersion }: ThriftSessionBackendOptions) { + this.sessionHandle = handle; + this.context = context; + this.serverProtocolVersion = serverProtocolVersion; + this.context.getLogger().log(LogLevel.debug, `Server protocol version: ${this.serverProtocolVersion}`); + } + + private getRunAsyncForMetadataOperations(): boolean | undefined { + return ProtocolVersion.supportsAsyncMetadataOperations(this.serverProtocolVersion) ? true : undefined; + } + + public get id(): string { + const sessionId = this.sessionHandle?.sessionId?.guid; + return sessionId ? stringify(sessionId) : NIL; + } + + public async getInfo(infoType: number): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getInfo({ + sessionHandle: this.sessionHandle, + infoType, + }); + Status.assert(response.status); + return new InfoValue(response.infoValue); + } + + public async executeStatement(statement: string, options: ExecuteStatementOptions): Promise { + const driver = await this.context.getDriver(); + const clientConfig = this.context.getConfig(); + + const request = new TExecuteStatementReq({ + sessionHandle: this.sessionHandle, + statement, + queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined, + runAsync: true, + ...getDirectResultsOptions(options.maxRows, clientConfig), + ...getArrowOptions(clientConfig, this.serverProtocolVersion), + }); + + if (ProtocolVersion.supportsParameterizedQueries(this.serverProtocolVersion)) { + request.parameters = getQueryParameters(options.namedParameters, options.ordinalParameters); + } + + const serializedQueryTags = serializeQueryTags(options.queryTags); + if (serializedQueryTags !== undefined) { + request.confOverlay = { ...request.confOverlay, query_tags: serializedQueryTags }; + } + + if (ProtocolVersion.supportsCloudFetch(this.serverProtocolVersion)) { + request.canDownloadResult = options.useCloudFetch ?? clientConfig.useCloudFetch; + } + + if (ProtocolVersion.supportsArrowCompression(this.serverProtocolVersion) && request.canDownloadResult !== true) { + request.canDecompressLZ4Result = (options.useLZ4Compression ?? clientConfig.useLZ4Compression) && Boolean(LZ4()); + } + + const response = await driver.executeStatement(request); + return this.createOperationBackend(response); + } + + public async getTypeInfo(request: TypeInfoRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getTypeInfo({ + sessionHandle: this.sessionHandle, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getCatalogs(request: CatalogsRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getCatalogs({ + sessionHandle: this.sessionHandle, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getSchemas(request: SchemasRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getSchemas({ + sessionHandle: this.sessionHandle, + catalogName: request.catalogName, + schemaName: request.schemaName, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getTables(request: TablesRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getTables({ + sessionHandle: this.sessionHandle, + catalogName: request.catalogName, + schemaName: request.schemaName, + tableName: request.tableName, + tableTypes: request.tableTypes, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getTableTypes(request: TableTypesRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getTableTypes({ + sessionHandle: this.sessionHandle, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getColumns(request: ColumnsRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getColumns({ + sessionHandle: this.sessionHandle, + catalogName: request.catalogName, + schemaName: request.schemaName, + tableName: request.tableName, + columnName: request.columnName, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getFunctions(request: FunctionsRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getFunctions({ + sessionHandle: this.sessionHandle, + catalogName: request.catalogName, + schemaName: request.schemaName, + functionName: request.functionName, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getPrimaryKeys(request: PrimaryKeysRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getPrimaryKeys({ + sessionHandle: this.sessionHandle, + catalogName: request.catalogName, + schemaName: request.schemaName, + tableName: request.tableName, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async getCrossReference(request: CrossReferenceRequest): Promise { + const driver = await this.context.getDriver(); + const response = await driver.getCrossReference({ + sessionHandle: this.sessionHandle, + parentCatalogName: request.parentCatalogName, + parentSchemaName: request.parentSchemaName, + parentTableName: request.parentTableName, + foreignCatalogName: request.foreignCatalogName, + foreignSchemaName: request.foreignSchemaName, + foreignTableName: request.foreignTableName, + runAsync: this.getRunAsyncForMetadataOperations(), + ...getDirectResultsOptions(request.maxRows, this.context.getConfig()), + }); + return this.createOperationBackend(response); + } + + public async close(): Promise { + const driver = await this.context.getDriver(); + const response = await driver.closeSession({ + sessionHandle: this.sessionHandle, + }); + Status.assert(response.status); + return new Status(response.status); + } + + private createOperationBackend(response: OperationResponseShape): IOperationBackend { + Status.assert(response.status); + const handle = definedOrError(response.operationHandle); + return new ThriftOperationBackend({ + handle, + directResults: response.directResults, + context: this.context, + }); + } +} diff --git a/lib/utils/thriftWireSynthesis.ts b/lib/utils/thriftWireSynthesis.ts new file mode 100644 index 00000000..b2f69246 --- /dev/null +++ b/lib/utils/thriftWireSynthesis.ts @@ -0,0 +1,87 @@ +import { + TGetOperationStatusResp, + TGetResultSetMetadataResp, + TOperationState, + TSparkRowSetType, + TStatus, + TStatusCode, +} from '../../thrift/TCLIService_types'; +import { OperationState, OperationStatus } from '../contracts/OperationStatus'; +import { ResultFormat, ResultMetadata } from '../contracts/ResultMetadata'; + +function synthesizeOkStatus(): TStatus { + return { statusCode: TStatusCode.SUCCESS_STATUS } as TStatus; +} + +function operationStateToThrift(state: OperationState): TOperationState { + switch (state) { + case OperationState.Pending: + return TOperationState.PENDING_STATE; + case OperationState.Running: + return TOperationState.RUNNING_STATE; + case OperationState.Succeeded: + return TOperationState.FINISHED_STATE; + case OperationState.Cancelled: + return TOperationState.CANCELED_STATE; + case OperationState.Closed: + return TOperationState.CLOSED_STATE; + case OperationState.Failed: + return TOperationState.ERROR_STATE; + case OperationState.Unknown: + default: + return TOperationState.UKNOWN_STATE; + } +} + +function resultFormatToThrift(format: ResultFormat): TSparkRowSetType { + switch (format) { + case ResultFormat.ColumnBased: + return TSparkRowSetType.COLUMN_BASED_SET; + case ResultFormat.ArrowBased: + return TSparkRowSetType.ARROW_BASED_SET; + case ResultFormat.UrlBased: + return TSparkRowSetType.URL_BASED_SET; + default: + return TSparkRowSetType.COLUMN_BASED_SET; + } +} + +/** + * Synthesize a Thrift `TGetOperationStatusResp` from the neutral + * `OperationStatus` DTO. Used by `DBSQLOperation.status()` when running + * against a non-Thrift backend (e.g. SEA) so the public API stays Thrift-shaped. + * + * Lossy by design: Thrift-only fields not carried by `OperationStatus` + * (`taskStatus`, `numModifiedRows`, `operationStarted`, `operationCompleted`, + * `displayMessage`, `diagnosticInfo`) are left undefined. Consumers that + * read those fields will see `undefined` on non-Thrift backends. + */ +export function synthesizeThriftStatus(status: OperationStatus): TGetOperationStatusResp { + return { + status: synthesizeOkStatus(), + operationState: operationStateToThrift(status.state), + sqlState: status.sqlState, + errorMessage: status.errorMessage, + hasResultSet: status.hasResultSet, + progressUpdateResponse: status.progressUpdateResponse as TGetOperationStatusResp['progressUpdateResponse'], + } as TGetOperationStatusResp; +} + +/** + * Synthesize a Thrift `TGetResultSetMetadataResp` from the neutral + * `ResultMetadata` DTO. Used by `DBSQLOperation.getMetadata()` when running + * against a non-Thrift backend. + * + * Lossy: `cacheLookupResult`, `uncompressedBytes`, `compressedBytes` are left + * undefined; `status` is set to a synthetic OK. + */ +export function synthesizeThriftResultSetMetadata(metadata: ResultMetadata): TGetResultSetMetadataResp { + return { + status: synthesizeOkStatus(), + schema: metadata.schema, + resultFormat: resultFormatToThrift(metadata.resultFormat), + lz4Compressed: metadata.lz4Compressed, + arrowSchema: metadata.arrowSchema, + isStagingOperation: metadata.isStagingOperation, + } as TGetResultSetMetadataResp; +} diff --git a/tests/unit/.stubs/OperationStub.ts b/tests/unit/.stubs/OperationStub.ts index cd827141..1dcac5ca 100644 --- a/tests/unit/.stubs/OperationStub.ts +++ b/tests/unit/.stubs/OperationStub.ts @@ -54,6 +54,10 @@ export default class OperationStub implements IOperation { return Promise.reject(new Error('Not implemented')); } + public async getResultMetadata() { + return Promise.reject(new Error('Not implemented')); + } + public iterateChunks(options?: IteratorOptions): IOperationChunksIterator { return new OperationChunksIterator(this, options); } diff --git a/tests/unit/.stubs/createOperationForTest.ts b/tests/unit/.stubs/createOperationForTest.ts new file mode 100644 index 00000000..563ad016 --- /dev/null +++ b/tests/unit/.stubs/createOperationForTest.ts @@ -0,0 +1,25 @@ +import { TOperationHandle, TSparkDirectResults } from '../../../thrift/TCLIService_types'; +import DBSQLOperation from '../../../lib/DBSQLOperation'; +import ThriftOperationBackend from '../../../lib/thrift-backend/ThriftOperationBackend'; +import IClientContext from '../../../lib/contracts/IClientContext'; + +interface CreateOperationForTestArgs { + handle: TOperationHandle; + directResults?: TSparkDirectResults; + context: IClientContext; +} + +/** + * Test helper that mirrors the pre-PR-378 `new DBSQLOperation({ handle, ... })` + * legacy ctor shape, but routes through the post-PR-378 `{ backend, ... }` + * shape by constructing a `ThriftOperationBackend` explicitly. Keeps the + * facade decoupled from concrete backend imports. + */ +export function createOperationForTest({ + handle, + directResults, + context, +}: CreateOperationForTestArgs): DBSQLOperation { + const backend = new ThriftOperationBackend({ handle, directResults, context }); + return new DBSQLOperation({ backend, context }); +} diff --git a/tests/unit/.stubs/createSessionForTest.ts b/tests/unit/.stubs/createSessionForTest.ts new file mode 100644 index 00000000..145c438e --- /dev/null +++ b/tests/unit/.stubs/createSessionForTest.ts @@ -0,0 +1,21 @@ +import { TSessionHandle, TProtocolVersion } from '../../../thrift/TCLIService_types'; +import DBSQLSession from '../../../lib/DBSQLSession'; +import ThriftSessionBackend from '../../../lib/thrift-backend/ThriftSessionBackend'; +import IClientContext from '../../../lib/contracts/IClientContext'; + +interface CreateSessionForTestArgs { + handle: TSessionHandle; + context: IClientContext; + serverProtocolVersion?: TProtocolVersion; +} + +/** + * Test helper that mirrors the pre-PR-378 `new DBSQLSession({ handle, ... })` + * legacy ctor shape, but routes through the post-PR-378 `{ backend, ... }` + * shape by constructing a `ThriftSessionBackend` explicitly. Keeps the + * facade decoupled from concrete backend imports. + */ +export function createSessionForTest({ handle, context, serverProtocolVersion }: CreateSessionForTestArgs): DBSQLSession { + const backend = new ThriftSessionBackend({ handle, context, serverProtocolVersion }); + return new DBSQLSession({ backend, context }); +} diff --git a/tests/unit/DBSQLClient.test.ts b/tests/unit/DBSQLClient.test.ts index 4c0a3a34..8c3e64ce 100644 --- a/tests/unit/DBSQLClient.test.ts +++ b/tests/unit/DBSQLClient.test.ts @@ -2,6 +2,7 @@ import { expect, AssertionError } from 'chai'; import sinon from 'sinon'; import DBSQLClient, { ThriftLibrary } from '../../lib/DBSQLClient'; import DBSQLSession from '../../lib/DBSQLSession'; +import ThriftBackend from '../../lib/thrift-backend/ThriftBackend'; import PlainHttpAuthentication from '../../lib/connection/auth/PlainHttpAuthentication'; import DatabricksOAuth from '../../lib/connection/auth/DatabricksOAuth'; @@ -25,6 +26,19 @@ const connectOptions = { token: 'dapi********************************', } satisfies ConnectionOptions; +// Test helper: build a DBSQLClient with `getClient` stubbed to return the given +// ThriftClient stub, and pre-seed `client['backend']` with a ThriftBackend. +// Used to avoid 12 copies of the same 4-line setup across the openSession tests. +function makeStubbedClient(thriftClient: ThriftClientStub = new ThriftClientStub()): { + client: DBSQLClient; + thriftClient: ThriftClientStub; +} { + const client = new DBSQLClient(); + sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + client['backend'] = new ThriftBackend({ context: client, onConnectionEvent: () => {} }); + return { client, thriftClient }; +} + describe('DBSQLClient.connect', () => { it('should prepend "/" to path if it is missing', async () => { const client = new DBSQLClient(); @@ -103,18 +117,14 @@ describe('DBSQLClient.connect', () => { describe('DBSQLClient.openSession', () => { it('should successfully open session', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client } = makeStubbedClient(); const session = await client.openSession(); expect(session).instanceOf(DBSQLSession); }); it('should use initial namespace options', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); case1: { const initialCatalog = 'catalog1'; @@ -144,6 +154,7 @@ describe('DBSQLClient.openSession', () => { it('should throw an exception when not connected', async () => { const client = new DBSQLClient(); + client['backend'] = undefined; client['connectionProvider'] = undefined; try { @@ -158,15 +169,13 @@ describe('DBSQLClient.openSession', () => { }); it('should correctly pass server protocol version to session', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); // Test with default protocol version (SPARK_CLI_SERVICE_PROTOCOL_V8) { const session = await client.openSession(); expect(session).instanceOf(DBSQLSession); - expect((session as DBSQLSession)['serverProtocolVersion']).to.equal( + expect(((session as DBSQLSession)['backend'] as any)['serverProtocolVersion']).to.equal( TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V8, ); } @@ -179,16 +188,14 @@ describe('DBSQLClient.openSession', () => { const session = await client.openSession(); expect(session).instanceOf(DBSQLSession); - expect((session as DBSQLSession)['serverProtocolVersion']).to.equal( + expect(((session as DBSQLSession)['backend'] as any)['serverProtocolVersion']).to.equal( TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7, ); } }); it('should pass session configuration to OpenSessionReq', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); const configuration = { QUERY_TAGS: 'team:engineering', ansi_mode: 'true' }; await client.openSession({ configuration }); @@ -196,9 +203,7 @@ describe('DBSQLClient.openSession', () => { }); it('should affect session behavior based on protocol version', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); // With protocol version V6 - should support async metadata operations { @@ -360,6 +365,7 @@ describe('DBSQLClient.close', () => { client['client'] = thriftClient; client['connectionProvider'] = new ConnectionProviderStub(); client['authProvider'] = new AuthProviderStub(); + client['backend'] = new ThriftBackend({ context: client, onConnectionEvent: () => {} }); const session = await client.openSession(); if (!(session instanceof DBSQLSession)) { @@ -583,9 +589,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should inject session parameter when enableMetricViewMetadata is true', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.connect({ ...connectOptions, enableMetricViewMetadata: true }); await client.openSession(); @@ -597,9 +601,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should not inject session parameter when enableMetricViewMetadata is false', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.connect({ ...connectOptions, enableMetricViewMetadata: false }); await client.openSession(); @@ -610,9 +612,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should not inject session parameter when enableMetricViewMetadata is not set', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.connect(connectOptions); await client.openSession(); @@ -623,9 +623,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should preserve user-provided session configuration', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.connect({ ...connectOptions, enableMetricViewMetadata: true }); const userConfig = { QUERY_TAGS: 'team:engineering', ansi_mode: 'true' }; @@ -638,9 +636,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should serialize queryTags dict and set in session configuration', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.openSession({ queryTags: { team: 'data-eng', project: 'etl' }, @@ -652,9 +648,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should let queryTags take precedence over configuration.QUERY_TAGS', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.openSession({ queryTags: { team: 'new-team' }, @@ -668,9 +662,7 @@ describe('DBSQLClient.enableMetricViewMetadata', () => { }); it('should remove QUERY_TAGS from configuration when queryTags is empty', async () => { - const client = new DBSQLClient(); - const thriftClient = new ThriftClientStub(); - sinon.stub(client, 'getClient').returns(Promise.resolve(thriftClient)); + const { client, thriftClient } = makeStubbedClient(); await client.openSession({ queryTags: {}, diff --git a/tests/unit/DBSQLOperation.test.ts b/tests/unit/DBSQLOperation.test.ts index b5f142ba..1e670c46 100644 --- a/tests/unit/DBSQLOperation.test.ts +++ b/tests/unit/DBSQLOperation.test.ts @@ -21,6 +21,7 @@ import CloudFetchResultHandler from '../../lib/result/CloudFetchResultHandler'; import ResultSlicer from '../../lib/result/ResultSlicer'; import ClientContextStub from './.stubs/ClientContextStub'; +import { createOperationForTest } from './.stubs/createOperationForTest'; import { Type } from 'apache-arrow'; function operationHandleStub(overrides: Partial): TOperationHandle { @@ -47,15 +48,15 @@ describe('DBSQLOperation', () => { describe('status', () => { it('should pick up state from operation handle', async () => { const context = new ClientContextStub(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); - expect(operation['state']).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.INITIALIZED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; }); it('should pick up state from directResults', async () => { const context = new ClientContextStub(); - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults: { @@ -67,8 +68,8 @@ describe('DBSQLOperation', () => { }, }); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; }); it('should fetch status and update internal state', async () => { @@ -77,17 +78,17 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: false }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context }); - expect(operation['state']).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.false; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.INITIALIZED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.false; const status = await operation.status(); expect(driver.getOperationStatus.called).to.be.true; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; }); it('should request progress', async () => { @@ -95,7 +96,7 @@ describe('DBSQLOperation', () => { const driver = sinon.spy(context.driver); driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: false }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context }); await operation.status(true); expect(driver.getOperationStatus.called).to.be.true; @@ -108,10 +109,10 @@ describe('DBSQLOperation', () => { const driver = sinon.spy(context.driver); driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: false }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context }); - expect(operation['state']).to.equal(TOperationState.INITIALIZED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.false; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.INITIALIZED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.false; // First call - should fetch data and cache driver.getOperationStatusResp = { @@ -122,8 +123,8 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.callCount).to.equal(1); expect(status1.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; // Second call - should return cached data driver.getOperationStatusResp = { @@ -134,8 +135,8 @@ describe('DBSQLOperation', () => { expect(driver.getOperationStatus.callCount).to.equal(1); expect(status2.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; }); it('should fetch status if directResults status is not finished', async () => { @@ -144,7 +145,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context, directResults: { @@ -156,15 +157,15 @@ describe('DBSQLOperation', () => { }, }); - expect(operation['state']).to.equal(TOperationState.RUNNING_STATE); // from directResults - expect(operation['operationHandle'].hasResultSet).to.be.false; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.RUNNING_STATE); // from directResults + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.false; const status = await operation.status(false); expect(driver.getOperationStatus.called).to.be.true; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.true; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.true; }); it('should not fetch status if directResults status is finished', async () => { @@ -173,7 +174,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.RUNNING_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context, directResults: { @@ -185,21 +186,21 @@ describe('DBSQLOperation', () => { }, }); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); // from directResults - expect(operation['operationHandle'].hasResultSet).to.be.false; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); // from directResults + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.false; const status = await operation.status(false); expect(driver.getOperationStatus.called).to.be.false; expect(status.operationState).to.equal(TOperationState.FINISHED_STATE); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); - expect(operation['operationHandle'].hasResultSet).to.be.false; + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['operationHandle'].hasResultSet).to.be.false; }); it('should throw an error in case of a status error', async () => { const context = new ClientContextStub(); context.driver.getOperationStatusResp.status.statusCode = TStatusCode.ERROR_STATUS; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); try { await operation.status(false); @@ -217,7 +218,7 @@ describe('DBSQLOperation', () => { it('should cancel operation and update state', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -232,7 +233,7 @@ describe('DBSQLOperation', () => { it('should return immediately if already cancelled', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -251,7 +252,7 @@ describe('DBSQLOperation', () => { it('should return immediately if already closed', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -270,7 +271,7 @@ describe('DBSQLOperation', () => { it('should throw an error in case of a status error and keep state', async () => { const context = new ClientContextStub(); context.driver.cancelOperationResp.status.statusCode = TStatusCode.ERROR_STATUS; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -290,7 +291,7 @@ describe('DBSQLOperation', () => { it('should reject all methods once cancelled', async () => { const context = new ClientContextStub(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); await operation.cancel(); expect(operation['cancelled']).to.be.true; @@ -307,7 +308,7 @@ describe('DBSQLOperation', () => { it('should close operation and update state', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -322,7 +323,7 @@ describe('DBSQLOperation', () => { it('should return immediately if already closed', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -341,7 +342,7 @@ describe('DBSQLOperation', () => { it('should return immediately if already cancelled', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -361,7 +362,7 @@ describe('DBSQLOperation', () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults: { @@ -385,7 +386,7 @@ describe('DBSQLOperation', () => { it('should throw an error in case of a status error and keep state', async () => { const context = new ClientContextStub(); context.driver.closeOperationResp.status.statusCode = TStatusCode.ERROR_STATUS; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(operation['cancelled']).to.be.false; expect(operation['closed']).to.be.false; @@ -405,7 +406,7 @@ describe('DBSQLOperation', () => { it('should reject all methods once closed', async () => { const context = new ClientContextStub(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); await operation.close(); expect(operation['closed']).to.be.true; @@ -437,14 +438,14 @@ describe('DBSQLOperation', () => { return getOperationStatusStub.wrappedMethod.apply(context.driver, args); }); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); - expect(operation['state']).to.equal(TOperationState.INITIALIZED_STATE); + expect((operation['backend'] as any)['state']).to.equal(TOperationState.INITIALIZED_STATE); await operation.finished(); expect(getOperationStatusStub.callCount).to.be.equal(attemptsUntilFinished); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); }); }, ); @@ -463,7 +464,7 @@ describe('DBSQLOperation', () => { return getOperationStatusStub.wrappedMethod.apply(context.driver, args); }); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); await operation.finished({ progress: true }); expect(getOperationStatusStub.called).to.be.true; @@ -487,7 +488,7 @@ describe('DBSQLOperation', () => { return getOperationStatusStub.wrappedMethod.apply(context.driver, args); }); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const callback = sinon.stub(); @@ -503,7 +504,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.status.statusCode = TStatusCode.SUCCESS_STATUS; driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults: { @@ -526,7 +527,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.status.statusCode = TStatusCode.ERROR_STATUS; context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); try { await operation.finished(); @@ -551,7 +552,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.status.statusCode = TStatusCode.SUCCESS_STATUS; context.driver.getOperationStatusResp.operationState = operationState; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); try { await operation.finished(); @@ -573,7 +574,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = false; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: false }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context }); const schema = await operation.getSchema(); @@ -597,13 +598,13 @@ describe('DBSQLOperation', () => { context.driver.getResultSetMetadataResp.schema = { columns: [] }; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const schema = await operation.getSchema(); expect(getOperationStatusStub.called).to.be.true; expect(schema).to.deep.equal(context.driver.getResultSetMetadataResp.schema); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); }); it('should request progress', async () => { @@ -620,7 +621,7 @@ describe('DBSQLOperation', () => { return getOperationStatusStub.wrappedMethod.apply(context.driver, args); }); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); await operation.getSchema({ progress: true }); expect(getOperationStatusStub.called).to.be.true; @@ -644,7 +645,7 @@ describe('DBSQLOperation', () => { return getOperationStatusStub.wrappedMethod.apply(context.driver, args); }); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const callback = sinon.stub(); @@ -660,7 +661,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const schema = await operation.getSchema(); expect(schema).to.deep.equal(driver.getResultSetMetadataResp.schema); @@ -673,7 +674,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const schema1 = await operation.getSchema(); expect(schema1).to.deep.equal(context.driver.getResultSetMetadataResp.schema); @@ -710,7 +711,7 @@ describe('DBSQLOperation', () => { }, }, }; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults, @@ -728,7 +729,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.hasResultSet = true; context.driver.getResultSetMetadataResp.status.statusCode = TStatusCode.ERROR_STATUS; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); try { await operation.getSchema(); @@ -751,8 +752,8 @@ describe('DBSQLOperation', () => { driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.COLUMN_BASED_SET; driver.getResultSetMetadata.resetHistory(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); - const resultHandler = await operation['getResultHandler'](); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); + const resultHandler = await (operation['backend'] as any)['getResultHandler'](); expect(driver.getResultSetMetadata.called).to.be.true; expect(resultHandler).to.be.instanceOf(ResultSlicer); expect(resultHandler['source']).to.be.instanceOf(JsonResultHandler); @@ -762,8 +763,8 @@ describe('DBSQLOperation', () => { driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.ARROW_BASED_SET; driver.getResultSetMetadata.resetHistory(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); - const resultHandler = await operation['getResultHandler'](); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); + const resultHandler = await (operation['backend'] as any)['getResultHandler'](); expect(driver.getResultSetMetadata.called).to.be.true; expect(resultHandler).to.be.instanceOf(ResultSlicer); expect(resultHandler['source']).to.be.instanceOf(ArrowResultConverter); @@ -777,8 +778,8 @@ describe('DBSQLOperation', () => { driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.URL_BASED_SET; driver.getResultSetMetadata.resetHistory(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); - const resultHandler = await operation['getResultHandler'](); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); + const resultHandler = await (operation['backend'] as any)['getResultHandler'](); expect(driver.getResultSetMetadata.called).to.be.true; expect(resultHandler).to.be.instanceOf(ResultSlicer); expect(resultHandler['source']).to.be.instanceOf(ArrowResultConverter); @@ -795,7 +796,7 @@ describe('DBSQLOperation', () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: false }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: false }), context }); const results = await operation.fetchChunk({ disableBuffering: true }); @@ -822,13 +823,13 @@ describe('DBSQLOperation', () => { context.driver.fetchResultsResp.hasMoreRows = false; context.driver.fetchResultsResp.results!.columns = []; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const results = await operation.fetchChunk({ disableBuffering: true }); expect(getOperationStatusStub.called).to.be.true; expect(results).to.deep.equal([]); - expect(operation['state']).to.equal(TOperationState.FINISHED_STATE); + expect((operation['backend'] as any)['state']).to.equal(TOperationState.FINISHED_STATE); }); it('should request progress', async () => { @@ -849,7 +850,7 @@ describe('DBSQLOperation', () => { context.driver.fetchResultsResp.hasMoreRows = false; context.driver.fetchResultsResp.results!.columns = []; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); await operation.fetchChunk({ progress: true, disableBuffering: true }); expect(getOperationStatusStub.called).to.be.true; @@ -877,7 +878,7 @@ describe('DBSQLOperation', () => { context.driver.fetchResultsResp.hasMoreRows = false; context.driver.fetchResultsResp.results!.columns = []; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const callback = sinon.stub(); @@ -893,7 +894,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const results = await operation.fetchChunk({ disableBuffering: true }); @@ -907,7 +908,7 @@ describe('DBSQLOperation', () => { const driver = sinon.spy(context.driver); driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults: { @@ -943,7 +944,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context, directResults: { @@ -986,7 +987,7 @@ describe('DBSQLOperation', () => { context.driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.ROW_BASED_SET; context.driver.getResultSetMetadataResp.schema = { columns: [] }; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); try { await operation.fetchChunk({ disableBuffering: true }); @@ -1003,7 +1004,7 @@ describe('DBSQLOperation', () => { describe('fetchAll', () => { it('should fetch data while available and return it all', async () => { const context = new ClientContextStub(); - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); const originalData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]; @@ -1038,13 +1039,13 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = false; context.driver.fetchResultsResp.results = undefined; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.undefined; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.undefined; await operation.fetchChunk({ disableBuffering: true }); expect(await operation.hasMoreRows()).to.be.false; - expect(operation['_data']['hasMoreRowsFlag']).to.be.false; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.false; }); it('should return False if operation was closed', async () => { @@ -1053,7 +1054,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; await operation.fetchChunk({ disableBuffering: true }); @@ -1068,7 +1069,7 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; await operation.fetchChunk({ disableBuffering: true }); @@ -1083,13 +1084,13 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.undefined; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.undefined; await operation.fetchChunk({ disableBuffering: true }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.true; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.true; }); it('should return True if hasMoreRows flag is False but there is actual data', async () => { @@ -1098,13 +1099,13 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = false; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.undefined; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.undefined; await operation.fetchChunk({ disableBuffering: true }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.true; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.true; }); it('should return True if hasMoreRows flag is unset but there is actual data', async () => { @@ -1113,13 +1114,13 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = undefined; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.undefined; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.undefined; await operation.fetchChunk({ disableBuffering: true }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.true; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.true; }); it('should return False if hasMoreRows flag is False and there is no data', async () => { @@ -1129,13 +1130,13 @@ describe('DBSQLOperation', () => { context.driver.getOperationStatusResp.hasResultSet = true; context.driver.fetchResultsResp.hasMoreRows = false; context.driver.fetchResultsResp.results = undefined; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); expect(await operation.hasMoreRows()).to.be.true; - expect(operation['_data']['hasMoreRowsFlag']).to.be.undefined; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.undefined; await operation.fetchChunk({ disableBuffering: true }); expect(await operation.hasMoreRows()).to.be.false; - expect(operation['_data']['hasMoreRowsFlag']).to.be.false; + expect((operation['backend'] as any)['_data']['hasMoreRowsFlag']).to.be.false; }); }); @@ -1147,7 +1148,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.hasResultSet = true; // Create operation without direct results to force metadata fetching - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); // Trigger multiple concurrent metadata fetches const results = await Promise.all([operation.hasMoreRows(), operation.hasMoreRows(), operation.hasMoreRows()]); @@ -1165,7 +1166,7 @@ describe('DBSQLOperation', () => { driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE; driver.getOperationStatusResp.hasResultSet = true; - const operation = new DBSQLOperation({ handle: operationHandleStub({ hasResultSet: true }), context }); + const operation = createOperationForTest({ handle: operationHandleStub({ hasResultSet: true }), context }); // First call should fetch metadata await operation.hasMoreRows(); diff --git a/tests/unit/DBSQLSession.test.ts b/tests/unit/DBSQLSession.test.ts index 0dc79037..51b27133 100644 --- a/tests/unit/DBSQLSession.test.ts +++ b/tests/unit/DBSQLSession.test.ts @@ -7,6 +7,7 @@ import Status from '../../lib/dto/Status'; import DBSQLOperation from '../../lib/DBSQLOperation'; import { TSessionHandle, TProtocolVersion } from '../../thrift/TCLIService_types'; import ClientContextStub from './.stubs/ClientContextStub'; +import { createSessionForTest } from './.stubs/createSessionForTest'; const sessionHandleStub: TSessionHandle = { sessionId: { guid: Buffer.alloc(16), secret: Buffer.alloc(16) }, @@ -50,7 +51,7 @@ describe('DBSQLSession', () => { describe('getInfo', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getInfo(1); expect(result).instanceOf(InfoValue); }); @@ -58,26 +59,26 @@ describe('DBSQLSession', () => { describe('executeStatement', () => { it('should execute statement', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.executeStatement('SELECT * FROM table'); expect(result).instanceOf(DBSQLOperation); }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.executeStatement('SELECT * FROM table', { maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.executeStatement('SELECT * FROM table', { maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); describe('Arrow support', () => { it('should not use Arrow if disabled in options', async () => { - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub({ arrowEnabled: false }), }); @@ -88,7 +89,7 @@ describe('DBSQLSession', () => { it('should apply defaults for Arrow options', async () => { // case 1 { - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub({ arrowEnabled: true }), }); @@ -98,7 +99,7 @@ describe('DBSQLSession', () => { // case 2 { - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub({ arrowEnabled: true, useArrowNativeTypes: false }), }); @@ -133,7 +134,7 @@ describe('DBSQLSession', () => { useLZ4Compression: true, }; - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context, serverProtocolVersion: version, @@ -195,7 +196,7 @@ describe('DBSQLSession', () => { const statement = 'SELECT * FROM table'; // Use V6+ which supports arrow compression - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context, serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, @@ -218,7 +219,7 @@ describe('DBSQLSession', () => { const statement = 'SELECT * FROM table'; // Use V6+ which supports arrow compression - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context, serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V6, @@ -241,7 +242,7 @@ describe('DBSQLSession', () => { const statement = 'SELECT * FROM table'; // Use V5 which does not support arrow compression - const session = new DBSQLSession({ + const session = createSessionForTest({ handle: sessionHandleStub, context, serverProtocolVersion: TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V5, @@ -263,7 +264,7 @@ describe('DBSQLSession', () => { it('should set confOverlay with query_tags when queryTags are provided', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const session = new DBSQLSession({ handle: sessionHandleStub, context }); + const session = createSessionForTest({ handle: sessionHandleStub, context }); await session.executeStatement('SELECT 1', { queryTags: { team: 'eng', app: 'etl' } }); @@ -275,7 +276,7 @@ describe('DBSQLSession', () => { it('should not set confOverlay query_tags when queryTags is not provided', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const session = new DBSQLSession({ handle: sessionHandleStub, context }); + const session = createSessionForTest({ handle: sessionHandleStub, context }); await session.executeStatement('SELECT 1'); @@ -287,7 +288,7 @@ describe('DBSQLSession', () => { it('should not set confOverlay query_tags when queryTags is empty', async () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const session = new DBSQLSession({ handle: sessionHandleStub, context }); + const session = createSessionForTest({ handle: sessionHandleStub, context }); await session.executeStatement('SELECT 1', { queryTags: {} }); @@ -299,19 +300,19 @@ describe('DBSQLSession', () => { describe('getTypeInfo', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTypeInfo(); expect(result).instanceOf(DBSQLOperation); }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTypeInfo({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTypeInfo({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -319,19 +320,19 @@ describe('DBSQLSession', () => { describe('getCatalogs', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCatalogs(); expect(result).instanceOf(DBSQLOperation); }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCatalogs({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCatalogs({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -339,13 +340,13 @@ describe('DBSQLSession', () => { describe('getSchemas', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getSchemas(); expect(result).instanceOf(DBSQLOperation); }); it('should use filters', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getSchemas({ catalogName: 'catalog', schemaName: 'schema', @@ -354,13 +355,13 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getSchemas({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getSchemas({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -368,13 +369,13 @@ describe('DBSQLSession', () => { describe('getTables', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTables(); expect(result).instanceOf(DBSQLOperation); }); it('should use filters', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTables({ catalogName: 'catalog', schemaName: 'default', @@ -385,13 +386,13 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTables({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTables({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -399,19 +400,19 @@ describe('DBSQLSession', () => { describe('getTableTypes', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTableTypes(); expect(result).instanceOf(DBSQLOperation); }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTableTypes({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getTableTypes({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -419,13 +420,13 @@ describe('DBSQLSession', () => { describe('getColumns', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getColumns(); expect(result).instanceOf(DBSQLOperation); }); it('should use filters', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getColumns({ catalogName: 'catalog', schemaName: 'schema', @@ -436,13 +437,13 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getColumns({ maxRows: 10 }); expect(result).instanceOf(DBSQLOperation); }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getColumns({ maxRows: null }); expect(result).instanceOf(DBSQLOperation); }); @@ -450,7 +451,7 @@ describe('DBSQLSession', () => { describe('getFunctions', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getFunctions({ catalogName: 'catalog', schemaName: 'schema', @@ -460,7 +461,7 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getFunctions({ catalogName: 'catalog', schemaName: 'schema', @@ -471,7 +472,7 @@ describe('DBSQLSession', () => { }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getFunctions({ catalogName: 'catalog', schemaName: 'schema', @@ -484,7 +485,7 @@ describe('DBSQLSession', () => { describe('getPrimaryKeys', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getPrimaryKeys({ catalogName: 'catalog', schemaName: 'schema', @@ -494,7 +495,7 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getPrimaryKeys({ catalogName: 'catalog', schemaName: 'schema', @@ -505,7 +506,7 @@ describe('DBSQLSession', () => { }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getPrimaryKeys({ catalogName: 'catalog', schemaName: 'schema', @@ -518,7 +519,7 @@ describe('DBSQLSession', () => { describe('getCrossReference', () => { it('should run operation', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCrossReference({ parentCatalogName: 'parentCatalogName', parentSchemaName: 'parentSchemaName', @@ -531,7 +532,7 @@ describe('DBSQLSession', () => { }); it('should use direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCrossReference({ parentCatalogName: 'parentCatalogName', parentSchemaName: 'parentSchemaName', @@ -545,7 +546,7 @@ describe('DBSQLSession', () => { }); it('should disable direct results', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const result = await session.getCrossReference({ parentCatalogName: 'parentCatalogName', parentSchemaName: 'parentSchemaName', @@ -564,7 +565,7 @@ describe('DBSQLSession', () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const session = new DBSQLSession({ handle: sessionHandleStub, context }); + const session = createSessionForTest({ handle: sessionHandleStub, context }); expect(session['isOpen']).to.be.true; const result = await session.close(); @@ -577,7 +578,7 @@ describe('DBSQLSession', () => { const context = new ClientContextStub(); const driver = sinon.spy(context.driver); - const session = new DBSQLSession({ handle: sessionHandleStub, context }); + const session = createSessionForTest({ handle: sessionHandleStub, context }); expect(session['isOpen']).to.be.true; const result = await session.close(); @@ -592,7 +593,7 @@ describe('DBSQLSession', () => { }); it('should close operations that belong to it', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); const operation = await session.executeStatement('SELECT * FROM table'); if (!(operation instanceof DBSQLOperation)) { expect.fail('Assertion error: operation is not a DBSQLOperation'); @@ -614,7 +615,7 @@ describe('DBSQLSession', () => { }); it('should reject all methods once closed', async () => { - const session = new DBSQLSession({ handle: sessionHandleStub, context: new ClientContextStub() }); + const session = createSessionForTest({ handle: sessionHandleStub, context: new ClientContextStub() }); await session.close(); expect(session['isOpen']).to.be.false; diff --git a/tests/unit/sea/SeaBackend.test.ts b/tests/unit/sea/SeaBackend.test.ts new file mode 100644 index 00000000..ff9e45c9 --- /dev/null +++ b/tests/unit/sea/SeaBackend.test.ts @@ -0,0 +1,39 @@ +import { expect, AssertionError } from 'chai'; +import SeaBackend from '../../../lib/sea/SeaBackend'; +import HiveDriverError from '../../../lib/errors/HiveDriverError'; +import { ConnectionOptions, OpenSessionRequest } from '../../../lib/contracts/IDBSQLClient'; + +describe('SeaBackend stub', () => { + it('connect() rejects with HiveDriverError until M1 wires the binding', async () => { + const backend = new SeaBackend(); + try { + await backend.connect({ host: '', path: '', token: '' } as ConnectionOptions); + expect.fail('It should throw an error'); + } catch (error) { + if (error instanceof AssertionError || !(error instanceof Error)) { + throw error; + } + expect(error).to.be.instanceOf(HiveDriverError); + expect(error.message).to.contain('not implemented'); + } + }); + + it('openSession() rejects with HiveDriverError until M1 wires the binding', async () => { + const backend = new SeaBackend(); + try { + await backend.openSession({} as OpenSessionRequest); + expect.fail('It should throw an error'); + } catch (error) { + if (error instanceof AssertionError || !(error instanceof Error)) { + throw error; + } + expect(error).to.be.instanceOf(HiveDriverError); + expect(error.message).to.contain('not implemented'); + } + }); + + it('close() is a no-op so DBSQLClient.close() can finish state-clearing after a failed connect', async () => { + const backend = new SeaBackend(); + await backend.close(); + }); +});