From 00f58b0f65599e55a42a7de143aed1682159ef73 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Wed, 13 May 2026 18:09:40 +0300 Subject: [PATCH] stream lifecycle subprocess output and bracket cocoindex with lance lines Co-authored-by: Cursor --- java_codebase_rag/cli.py | 14 +- java_codebase_rag/cli_progress.py | 52 +++++ java_codebase_rag/pipeline.py | 93 ++++++++- server.py | 79 ++++++-- .../init_quiet_success.stdout.txt | 1 + .../reprocess_quiet_success.stdout.txt | 1 + tests/test_cli_progress_stdout_invariant.py | 182 ++++++++++++++++++ 7 files changed, 390 insertions(+), 32 deletions(-) create mode 100644 java_codebase_rag/cli_progress.py create mode 100644 tests/fixtures/cli_progress_stdout/init_quiet_success.stdout.txt create mode 100644 tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt create mode 100644 tests/test_cli_progress_stdout_invariant.py diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index 9c5dea0..a4082a8 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -169,7 +169,12 @@ def _cmd_init(args: argparse.Namespace) -> int: return 2 cfg.index_dir.mkdir(parents=True, exist_ok=True) env = cfg.subprocess_env() - coco = run_cocoindex_update(env, full_reprocess=False, quiet=bool(args.quiet)) + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=bool(args.quiet), + lance_project_root=None if args.quiet else cfg.source_root, + ) if coco.returncode != 0: _emit( { @@ -208,7 +213,12 @@ def _cmd_increment(args: argparse.Namespace) -> int: cfg.apply_to_os_environ() _emit_increment_kuzu_warning() env = cfg.subprocess_env() - coco = run_cocoindex_update(env, full_reprocess=False, quiet=bool(args.quiet)) + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=bool(args.quiet), + lance_project_root=None if args.quiet else cfg.source_root, + ) if coco.returncode != 0: _emit( { diff --git a/java_codebase_rag/cli_progress.py b/java_codebase_rag/cli_progress.py new file mode 100644 index 0000000..4744852 --- /dev/null +++ b/java_codebase_rag/cli_progress.py @@ -0,0 +1,52 @@ +"""CLI-owned stderr progress lines (shared by server reprocess path and pipeline helpers).""" +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path + + +def emit_lance_cocoindex_start(project_root: Path) -> None: + root = project_root.expanduser().resolve() + print( + f"[lance] running cocoindex update (project_root={root})", + file=sys.stderr, + flush=True, + ) + + +def emit_lance_cocoindex_finish(*, elapsed_s: float, exit_code: int) -> None: + print( + f"[lance] cocoindex update finished in {elapsed_s:.2f}s (exit={exit_code})", + file=sys.stderr, + flush=True, + ) + + +async def accumulate_and_relay_subprocess_streams( + proc: asyncio.subprocess.Process, + *, + relay: bool, +) -> tuple[bytes, bytes]: + """Read stdout and stderr until EOF; optionally copy each chunk verbatim to stderr.""" + stdout = proc.stdout + stderr = proc.stderr + if stdout is None or stderr is None: + raise RuntimeError("subprocess must be created with stdout=PIPE and stderr=PIPE") + + out_buf = bytearray() + err_buf = bytearray() + + async def drain(reader: asyncio.StreamReader, target: bytearray) -> None: + while True: + chunk = await reader.read(65536) + if not chunk: + break + target.extend(chunk) + if relay: + sys.stderr.buffer.write(chunk) + sys.stderr.buffer.flush() + + await asyncio.gather(drain(stdout, out_buf), drain(stderr, err_buf)) + await proc.wait() + return bytes(out_buf), bytes(err_buf) diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index cd8af1a..e2b709b 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -4,8 +4,12 @@ import os import subprocess import sys +import threading +import time from pathlib import Path +from java_codebase_rag.cli_progress import emit_lance_cocoindex_finish, emit_lance_cocoindex_start + COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance" @@ -17,11 +21,48 @@ def cocoindex_bin() -> Path: return Path(sys.executable).parent / "cocoindex" +def _popen_stream_to_stderr( + proc: subprocess.Popen[bytes], +) -> tuple[str, str, int]: + out_buf = bytearray() + err_buf = bytearray() + + def drain_out() -> None: + assert proc.stdout is not None + while True: + chunk = proc.stdout.read(65536) + if not chunk: + break + out_buf.extend(chunk) + sys.stderr.buffer.write(chunk) + sys.stderr.buffer.flush() + + def drain_err() -> None: + assert proc.stderr is not None + while True: + chunk = proc.stderr.read(65536) + if not chunk: + break + err_buf.extend(chunk) + sys.stderr.buffer.write(chunk) + sys.stderr.buffer.flush() + + t_out = threading.Thread(target=drain_out, name="stream-stdout", daemon=True) + t_err = threading.Thread(target=drain_err, name="stream-stderr", daemon=True) + t_out.start() + t_err.start() + t_out.join() + t_err.join() + code = proc.wait() + return out_buf.decode(errors="replace"), err_buf.decode(errors="replace"), code + + def run_cocoindex_update( env: dict[str, str], *, full_reprocess: bool, quiet: bool, + lance_project_root: Path | None = None, ) -> subprocess.CompletedProcess[str]: exe = cocoindex_bin() if not exe.is_file(): @@ -47,13 +88,34 @@ def run_cocoindex_update( cmd.append("-f") if quiet: cmd.append("-q") - return subprocess.run( - cmd, - cwd=str(bd), - env=env, - capture_output=True, - text=True, - ) + return subprocess.run( + cmd, + cwd=str(bd), + env=env, + capture_output=True, + text=True, + ) + + emit_lance = lance_project_root is not None + if emit_lance: + emit_lance_cocoindex_start(lance_project_root) + t0 = time.perf_counter() + code = -1 + out_s, err_s = "", "" + try: + proc = subprocess.Popen( + cmd, + cwd=str(bd), + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + out_s, err_s, code = _popen_stream_to_stderr(proc) + finally: + if emit_lance: + emit_lance_cocoindex_finish(elapsed_s=time.perf_counter() - t0, exit_code=code) + return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) def run_cocoindex_drop(env: dict[str, str], *, quiet: bool) -> subprocess.CompletedProcess[str]: @@ -103,13 +165,24 @@ def run_build_ast_graph( ] if verbose: cmd.append("--verbose") - return subprocess.run( + if not verbose: + return subprocess.run( + cmd, + cwd=str(source_root), + env=env or os.environ.copy(), + capture_output=True, + text=True, + ) + proc = subprocess.Popen( cmd, cwd=str(source_root), env=env or os.environ.copy(), - capture_output=True, - text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, ) + out_s, err_s, code = _popen_stream_to_stderr(proc) + return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) def clip(s: str, n: int) -> str: diff --git a/server.py b/server.py index e4eade7..8650323 100644 --- a/server.py +++ b/server.py @@ -5,11 +5,17 @@ import asyncio import os import sys +import time from pathlib import Path from typing import Any, Literal import mcp_v2 from index_common import SBERT_MODEL +from java_codebase_rag.cli_progress import ( + accumulate_and_relay_subprocess_streams, + emit_lance_cocoindex_finish, + emit_lance_cocoindex_start, +) from java_codebase_rag.config import emit_legacy_env_hints_if_present, resolved_sbert_model_for_process_env from kuzu_queries import KuzuGraph, resolve_kuzu_path from mcp.server.fastmcp import FastMCP @@ -213,25 +219,55 @@ async def run_refresh_pipeline(*, quiet: bool = False) -> RefreshIndexOutput: message=f"java_index_flow_lancedb.py not found under {root} nor {bundle_dir}", phases_run=[], ) - try: - proc = await asyncio.create_subprocess_exec( - str(cocoindex_bin), - "update", - _COCOINDEX_TARGET, - "--full-reprocess", - "-f", - cwd=str(flow_path.parent), - env=_cocoindex_subprocess_env(root), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - out_b, err_b = await proc.communicate() - except Exception as exc: - return RefreshIndexOutput( - success=False, - message=f"spawn failed: {exc!s}", - phases_run=[], - ) + proc: asyncio.subprocess.Process | None = None + out_b, err_b = b"", b"" + if quiet: + try: + proc = await asyncio.create_subprocess_exec( + str(cocoindex_bin), + "update", + _COCOINDEX_TARGET, + "--full-reprocess", + "-f", + cwd=str(flow_path.parent), + env=_cocoindex_subprocess_env(root), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + out_b, err_b = await proc.communicate() + except Exception as exc: + return RefreshIndexOutput( + success=False, + message=f"spawn failed: {exc!s}", + phases_run=[], + ) + else: + emit_lance_cocoindex_start(root) + t0 = time.perf_counter() + code_c = -1 + try: + proc = await asyncio.create_subprocess_exec( + str(cocoindex_bin), + "update", + _COCOINDEX_TARGET, + "--full-reprocess", + "-f", + cwd=str(flow_path.parent), + env=_cocoindex_subprocess_env(root), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + out_b, err_b = await accumulate_and_relay_subprocess_streams(proc, relay=True) + code_c = proc.returncode if proc.returncode is not None else -1 + except Exception as exc: + return RefreshIndexOutput( + success=False, + message=f"spawn failed: {exc!s}", + phases_run=[], + ) + finally: + emit_lance_cocoindex_finish(elapsed_s=time.perf_counter() - t0, exit_code=code_c) + assert proc is not None out = out_b.decode(errors="replace") err = err_b.decode(errors="replace") ok = proc.returncode == 0 @@ -261,7 +297,10 @@ async def run_refresh_pipeline(*, quiet: bool = False) -> RefreshIndexOutput: stderr=asyncio.subprocess.PIPE, ) phases_run = ["vectors", "graph"] - gout_b, gerr_b = await gproc.communicate() + if quiet: + gout_b, gerr_b = await gproc.communicate() + else: + gout_b, gerr_b = await accumulate_and_relay_subprocess_streams(gproc, relay=True) graph_code = gproc.returncode graph_out = gout_b.decode(errors="replace") graph_err = gerr_b.decode(errors="replace") diff --git a/tests/fixtures/cli_progress_stdout/init_quiet_success.stdout.txt b/tests/fixtures/cli_progress_stdout/init_quiet_success.stdout.txt new file mode 100644 index 0000000..ed31511 --- /dev/null +++ b/tests/fixtures/cli_progress_stdout/init_quiet_success.stdout.txt @@ -0,0 +1 @@ +{"message": "init completed", "success": true} diff --git a/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt b/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt new file mode 100644 index 0000000..0bb74d2 --- /dev/null +++ b/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt @@ -0,0 +1 @@ +{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true} diff --git a/tests/test_cli_progress_stdout_invariant.py b/tests/test_cli_progress_stdout_invariant.py new file mode 100644 index 0000000..941f313 --- /dev/null +++ b/tests/test_cli_progress_stdout_invariant.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import asyncio +import io +import os +import shutil +import subprocess +import sys +from contextlib import redirect_stderr, redirect_stdout +from pathlib import Path + +import pytest + +from java_codebase_rag.cli_progress import accumulate_and_relay_subprocess_streams + +_FIXTURE_DIR = Path(__file__).resolve().parent / "fixtures" / "cli_progress_stdout" + + +def _cocoindex_available() -> bool: + return (Path(sys.executable).parent / "cocoindex").is_file() + + +def _run_cli(args: list[str], *, env: dict[str, str]) -> subprocess.CompletedProcess[str]: + exe = shutil.which("java-codebase-rag") + assert exe is not None + return subprocess.run( + [exe, *args], + capture_output=True, + text=True, + env=env, + check=False, + ) + + +async def test_stream_relay_arrives_before_wait(monkeypatch: pytest.MonkeyPatch) -> None: + recorded: list[bytes] = [] + orig_write = sys.stderr.buffer.write + + def capture_write(data: bytes | bytearray) -> int: + recorded.append(bytes(data)) + return orig_write(data) + + monkeypatch.setattr(sys.stderr.buffer, "write", capture_write) + + code = ( + "import sys, time\n" + "sys.stdout.buffer.write(b'EARLY\\n')\n" + "sys.stdout.buffer.flush()\n" + "time.sleep(0.35)\n" + "sys.stdout.buffer.write(b'LATE\\n')\n" + "sys.stdout.buffer.flush()\n" + ) + proc = await asyncio.create_subprocess_exec( + sys.executable, + "-u", + "-c", + code, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + acc_task = asyncio.create_task(accumulate_and_relay_subprocess_streams(proc, relay=True)) + joined = b"" + for _ in range(200): + joined = b"".join(recorded) + if b"EARLY" in joined: + break + await asyncio.sleep(0.02) + assert b"EARLY" in joined, joined + await acc_task + final = b"".join(recorded) + assert b"LATE" in final + + +def test_refresh_pipeline_quiet_stderr_baseline(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + import server + + repo_root = Path(__file__).resolve().parent.parent + idx = tmp_path / "idx_q" + idx.mkdir(parents=True) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(repo_root)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + + real_is_file = Path.is_file + + def is_file_patched(self: Path) -> bool: + try: + if self.resolve() == (Path(sys.executable).parent / "cocoindex").resolve(): + return True + except OSError: + pass + return real_is_file(self) + + monkeypatch.setattr(Path, "is_file", is_file_patched) + + async def fake_create(*_a: object, **_k: object) -> object: + class _Proc: + returncode = 0 + + async def communicate(self) -> tuple[bytes, bytes]: + return b"idx_out", b"idx_err" + + return _Proc() + + monkeypatch.setattr(server.asyncio, "create_subprocess_exec", fake_create) + + buf = io.StringIO() + with redirect_stderr(buf): + out = asyncio.run(server.run_refresh_pipeline(quiet=True)) + err = buf.getvalue() + assert "[lance]" not in err + assert b"idx_out".decode() not in err + assert b"idx_err".decode() not in err + assert out.success is True + assert "idx_out" in out.stdout + + +@pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") +def test_cli_lifecycle_stdout_invariant_init(corpus_root: Path, tmp_path: Path) -> None: + baseline = (_FIXTURE_DIR / "init_quiet_success.stdout.txt").read_text(encoding="utf-8") + idx = tmp_path / "stdout_inv_init" + env = os.environ.copy() + env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(idx) + env["JAVA_CODEBASE_RAG_SOURCE_ROOT"] = str(corpus_root.resolve()) + e0 = _run_cli( + ["erase", "--source-root", str(corpus_root), "--index-dir", str(idx), "--yes"], + env=env, + ) + assert e0.returncode == 0, e0.stderr + proc = _run_cli( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + env=env, + ) + assert proc.returncode == 0, proc.stdout + proc.stderr + assert proc.stdout == baseline + + +def test_cli_lifecycle_stdout_invariant_reprocess( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + import server + from java_codebase_rag import cli as cli_mod + from server import RefreshIndexOutput + + baseline = (_FIXTURE_DIR / "reprocess_quiet_success.stdout.txt").read_text(encoding="utf-8") + + async def fake_refresh(*, quiet: bool = False) -> RefreshIndexOutput: + _ = quiet + return RefreshIndexOutput( + success=True, + exit_code=0, + stdout="", + stderr="", + message=None, + graph_exit_code=0, + graph_stdout="", + graph_stderr="", + phases_run=["vectors", "graph"], + ) + + monkeypatch.setattr(server, "run_refresh_pipeline", fake_refresh) + + repo_root = Path(__file__).resolve().parent.parent + idx = tmp_path / "idx_rep_stdout" + idx.mkdir(parents=True) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(repo_root)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + + buf = io.StringIO() + with redirect_stdout(buf): + rc = cli_mod.main( + [ + "reprocess", + "--source-root", + str(repo_root), + "--index-dir", + str(idx), + "--quiet", + ], + ) + assert rc == 0 + assert buf.getvalue() == baseline