Skip to content

Commit 291e18e

Browse files
fix(backend/kernel): retain parent Statement for async-submitted commands
The kernel's `Statement.close()` invalidates any executed handle the Statement produced (see `databricks-sql-kernel/src/statement/validity.rs` and `mutable::close`). Today `KernelDatabricksClient.execute_command` closes `stmt` in `finally` regardless of `async_op` — which is correct for the sync path (`stmt.execute()` returns a fully-materialised result before the close) but wrong for the async path: as soon as the user calls `cursor.execute_async(...)` and tries to poll or fetch, the async handle is already invalidated and the next operation raises "executed-statement handle has been invalidated by a re-execute on its parent Statement". This makes the entire async surface (`execute_async` + `is_query_pending`/`get_query_state` + `get_async_execution_result`) effectively non-functional on the kernel backend today.

Fix: when `async_op=True`, retain the parent Statement in a new `_async_statements` dict alongside `_async_handles`, and close it in `close_command`, `close_session`, and `get_execution_result` (which already closes the async handle once the result stream is produced). The sync path is unchanged.

Discovered via the python-comparator audit harness running the `STATEMENT_ASYNC` suite — the kernel side produced 0 HTTP requests because every async cursor operation short-circuited locally on the invalidated handle.

Test:

    conn = sql.connect(..., use_kernel=True)
    cur = conn.cursor()
    cur.execute_async("SELECT 1 AS x")
    while cur.is_query_pending():
        time.sleep(0.2)
    cur.get_async_execution_result()
    assert cur.fetchall() == [(1,)]

Previously raised KernelError("InvalidStatementHandle: …") on `is_query_pending`; now succeeds.

Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent fb55001 commit 291e18e

1 file changed

Lines changed: 71 additions & 10 deletions

File tree

src/databricks/sql/backend/kernel/client.py

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@ def __init__(
129129
# Guarded by ``_async_handles_lock`` so concurrent cursors on the
130130
# same connection don't race on submit / close / close-session.
131131
self._async_handles: Dict[str, Any] = {}
132+
# Parent ``Statement`` objects kept alive alongside async handles.
133+
# On the kernel, ``Statement.close()`` flips the validity flag on
134+
# the produced executed handle (see kernel
135+
# ``statement::mutable::close``), so we cannot close the
136+
# Statement immediately after ``submit()`` as we do for sync
137+
# ``execute()``. Instead retain it here and close it in
138+
# ``close_command`` / ``close_session`` / ``get_execution_result``
139+
# after the async handle has finished its work.
140+
self._async_statements: Dict[str, Any] = {}
132141
# CommandId.guids of async commands that have already been
133142
# closed (via ``close_command`` or ``close_session``). Lets
134143
# ``get_query_state`` report ``CLOSED`` for them rather than
@@ -197,7 +206,9 @@ def close_session(self, session_id: SessionId) -> None:
197206
# server-side CloseStatement before the session goes away.
198207
with self._async_handles_lock:
199208
tracked = list(self._async_handles.items())
209+
tracked_stmts = list(self._async_statements.items())
200210
self._async_handles.clear()
211+
self._async_statements.clear()
201212
for guid, _ in tracked:
202213
self._closed_commands.add(guid)
203214
for _, handle in tracked:
@@ -211,6 +222,16 @@ def close_session(self, session_id: SessionId) -> None:
211222
logger.warning(
212223
"Error closing async handle during session close: %s", exc
213224
)
225+
# Now drop the parent Statements that were keeping those handles
226+
# alive. Same non-fatal close semantics — close errors are not
227+
# actionable at session-close time.
228+
for _, stmt in tracked_stmts:
229+
try:
230+
stmt.close()
231+
except Exception as exc:
232+
logger.warning(
233+
"Error closing async statement during session close: %s", exc
234+
)
214235
try:
215236
self._kernel_session.close()
216237
except Exception as exc:
@@ -249,6 +270,11 @@ def execute_command(
249270
stmt = self._kernel_session.statement()
250271
except Exception as exc:
251272
raise _wrap_kernel_exception("execute_command", exc) from exc
273+
# ``async_op`` keeps ``stmt`` alive (tracked in
274+
# ``_async_statements`` and closed by ``close_command``); the sync
275+
# path drops it in finally. ``close_stmt`` is the post-success
276+
# decision flag — it stays True on sync, flips to False on async.
277+
close_stmt = True
252278
try:
253279
try:
254280
stmt.set_sql(operation)
@@ -262,21 +288,26 @@ def execute_command(
262288
cursor.active_command_id = command_id
263289
with self._async_handles_lock:
264290
self._async_handles[command_id.guid] = async_exec
291+
# Closing the kernel ``Statement`` invalidates the
292+
# async handle (see kernel validity flag). Retain
293+
# the Statement here and close it on
294+
# ``close_command`` / ``close_session``.
295+
self._async_statements[command_id.guid] = stmt
296+
close_stmt = False
265297
return None
266298
executed = stmt.execute()
267299
except Exception as exc:
268300
raise _wrap_kernel_exception("execute_command", exc) from exc
269301
finally:
270-
# ``Statement`` is a lifecycle owner separate from the
271-
# executed handle it produces. Drop it here so the
272-
# parent doesn't keep the handle alive longer than the
273-
# caller expects. Swallow all close errors (including
274-
# PyO3 native exceptions) — a failed stmt.close() is
275-
# not actionable for the caller.
276-
try:
277-
stmt.close()
278-
except Exception:
279-
pass
302+
if close_stmt:
303+
# Sync path: ``Statement`` is a lifecycle owner separate
304+
# from the executed handle. Drop it here so the parent
305+
# doesn't outlive its caller. Swallow close errors —
306+
# they're not actionable.
307+
try:
308+
stmt.close()
309+
except Exception:
310+
pass
280311

281312
command_id = CommandId.from_sea_statement_id(executed.statement_id)
282313
cursor.active_command_id = command_id
@@ -307,17 +338,34 @@ def cancel_command(self, command_id: CommandId) -> None:
307338
def close_command(self, command_id: CommandId) -> None:
    """Close an async-submitted command and its retained parent Statement.

    Removes both the async handle and the parent ``Statement`` tracked
    under ``command_id.guid``. When a handle was tracked, the guid is
    recorded in ``_closed_commands`` so ``get_query_state`` can report
    ``CLOSED`` (not ``SUCCEEDED``) for this command.

    The handle is closed first and the parent Statement second: closing
    the Statement invalidates the executed handle it produced, so the
    Statement must outlive the handle's own close.

    Args:
        command_id: Identifier of the command to close; only its
            ``guid`` is used as the tracking key.

    Raises:
        The exception produced by ``_wrap_kernel_exception`` when
        ``handle.close()`` fails. A failure to close the parent
        Statement is logged and never raised.
    """
    with self._async_handles_lock:
        handle = self._async_handles.pop(command_id.guid, None)
        stmt = self._async_statements.pop(command_id.guid, None)
        if handle is not None:
            # Record the close so ``get_query_state`` can report
            # ``CLOSED`` (not ``SUCCEEDED``) for this command.
            self._closed_commands.add(command_id.guid)
    try:
        if handle is None:
            logger.debug("close_command: no tracked handle for %s", command_id)
            # Fall through to ``finally``: a Statement tracked without
            # its handle (bookkeeping race) must still be dropped.
            return
        try:
            handle.close()
        except Exception as exc:
            raise _wrap_kernel_exception("close_command", exc) from exc
    finally:
        # Single exit path for the parent Statement: runs after the
        # handle close attempt (successful or not) and on the
        # no-handle early return.
        if stmt is not None:
            try:
                stmt.close()
            except Exception as exc:
                # Best-effort, but surfaced in logs for parity with
                # ``close_session`` / ``get_execution_result`` instead
                # of being silently swallowed.
                logger.warning(
                    "Error closing async statement during close_command for %s: %s",
                    command_id,
                    exc,
                )
321369

322370
def get_query_state(self, command_id: CommandId) -> CommandState:
323371
with self._async_handles_lock:
@@ -378,6 +426,7 @@ def get_execution_result(
378426
# it wraps. Drop tracking and fire-and-forget the close.
379427
with self._async_handles_lock:
380428
self._async_handles.pop(command_id.guid, None)
429+
stmt = self._async_statements.pop(command_id.guid, None)
381430
self._closed_commands.add(command_id.guid)
382431
try:
383432
async_exec.close()
@@ -387,6 +436,18 @@ def get_execution_result(
387436
command_id,
388437
exc,
389438
)
439+
# The parent Statement is no longer needed once the async handle
440+
# has produced its ResultStream. Close to release server-side
441+
# tracking; matches the sync path's eager Statement close.
442+
if stmt is not None:
443+
try:
444+
stmt.close()
445+
except Exception as exc:
446+
logger.warning(
447+
"Error closing async statement after await_result for %s: %s",
448+
command_id,
449+
exc,
450+
)
390451
# ``KernelResultSet.__init__`` calls ``arrow_schema()`` which
391452
# can raise — map that to PEP 249 too.
392453
try:

0 commit comments

Comments
 (0)