Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions java_codebase_rag/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ def _cmd_init(args: argparse.Namespace) -> int:
return 2
cfg.index_dir.mkdir(parents=True, exist_ok=True)
env = cfg.subprocess_env()
coco = run_cocoindex_update(env, full_reprocess=False, quiet=bool(args.quiet))
coco = run_cocoindex_update(
env,
full_reprocess=False,
quiet=bool(args.quiet),
lance_project_root=None if args.quiet else cfg.source_root,
)
if coco.returncode != 0:
_emit(
{
Expand Down Expand Up @@ -208,7 +213,12 @@ def _cmd_increment(args: argparse.Namespace) -> int:
cfg.apply_to_os_environ()
_emit_increment_kuzu_warning()
env = cfg.subprocess_env()
coco = run_cocoindex_update(env, full_reprocess=False, quiet=bool(args.quiet))
coco = run_cocoindex_update(
env,
full_reprocess=False,
quiet=bool(args.quiet),
lance_project_root=None if args.quiet else cfg.source_root,
)
if coco.returncode != 0:
_emit(
{
Expand Down
52 changes: 52 additions & 0 deletions java_codebase_rag/cli_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""CLI-owned stderr progress lines (shared by server reprocess path and pipeline helpers)."""
from __future__ import annotations

import asyncio
import sys
from pathlib import Path


def emit_lance_cocoindex_start(project_root: Path) -> None:
root = project_root.expanduser().resolve()
print(
f"[lance] running cocoindex update (project_root={root})",
file=sys.stderr,
flush=True,
)


def emit_lance_cocoindex_finish(*, elapsed_s: float, exit_code: int) -> None:
print(
f"[lance] cocoindex update finished in {elapsed_s:.2f}s (exit={exit_code})",
file=sys.stderr,
flush=True,
)


async def accumulate_and_relay_subprocess_streams(
proc: asyncio.subprocess.Process,
*,
relay: bool,
) -> tuple[bytes, bytes]:
"""Read stdout and stderr until EOF; optionally copy each chunk verbatim to stderr."""
stdout = proc.stdout
stderr = proc.stderr
if stdout is None or stderr is None:
raise RuntimeError("subprocess must be created with stdout=PIPE and stderr=PIPE")

out_buf = bytearray()
err_buf = bytearray()

async def drain(reader: asyncio.StreamReader, target: bytearray) -> None:
while True:
chunk = await reader.read(65536)
if not chunk:
break
target.extend(chunk)
if relay:
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()

await asyncio.gather(drain(stdout, out_buf), drain(stderr, err_buf))
await proc.wait()
return bytes(out_buf), bytes(err_buf)
93 changes: 83 additions & 10 deletions java_codebase_rag/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import os
import subprocess
import sys
import threading
import time
from pathlib import Path

from java_codebase_rag.cli_progress import emit_lance_cocoindex_finish, emit_lance_cocoindex_start

COCOINDEX_TARGET = "java_index_flow_lancedb.py:JavaCodeIndexLance"


Expand All @@ -17,11 +21,48 @@ def cocoindex_bin() -> Path:
return Path(sys.executable).parent / "cocoindex"


def _popen_stream_to_stderr(
proc: subprocess.Popen[bytes],
) -> tuple[str, str, int]:
out_buf = bytearray()
err_buf = bytearray()

def drain_out() -> None:
assert proc.stdout is not None
while True:
chunk = proc.stdout.read(65536)
if not chunk:
break
out_buf.extend(chunk)
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()

def drain_err() -> None:
assert proc.stderr is not None
while True:
chunk = proc.stderr.read(65536)
if not chunk:
break
err_buf.extend(chunk)
sys.stderr.buffer.write(chunk)
sys.stderr.buffer.flush()

t_out = threading.Thread(target=drain_out, name="stream-stdout", daemon=True)
t_err = threading.Thread(target=drain_err, name="stream-stderr", daemon=True)
t_out.start()
t_err.start()
t_out.join()
t_err.join()
code = proc.wait()
return out_buf.decode(errors="replace"), err_buf.decode(errors="replace"), code


def run_cocoindex_update(
env: dict[str, str],
*,
full_reprocess: bool,
quiet: bool,
lance_project_root: Path | None = None,
) -> subprocess.CompletedProcess[str]:
exe = cocoindex_bin()
if not exe.is_file():
Expand All @@ -47,13 +88,34 @@ def run_cocoindex_update(
cmd.append("-f")
if quiet:
cmd.append("-q")
return subprocess.run(
cmd,
cwd=str(bd),
env=env,
capture_output=True,
text=True,
)
return subprocess.run(
cmd,
cwd=str(bd),
env=env,
capture_output=True,
text=True,
)

emit_lance = lance_project_root is not None
if emit_lance:
emit_lance_cocoindex_start(lance_project_root)
t0 = time.perf_counter()
code = -1
out_s, err_s = "", ""
try:
proc = subprocess.Popen(
cmd,
cwd=str(bd),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
out_s, err_s, code = _popen_stream_to_stderr(proc)
finally:
if emit_lance:
emit_lance_cocoindex_finish(elapsed_s=time.perf_counter() - t0, exit_code=code)
return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s)


def run_cocoindex_drop(env: dict[str, str], *, quiet: bool) -> subprocess.CompletedProcess[str]:
Expand Down Expand Up @@ -103,13 +165,24 @@ def run_build_ast_graph(
]
if verbose:
cmd.append("--verbose")
return subprocess.run(
if not verbose:
return subprocess.run(
cmd,
cwd=str(source_root),
env=env or os.environ.copy(),
capture_output=True,
text=True,
)
proc = subprocess.Popen(
cmd,
cwd=str(source_root),
env=env or os.environ.copy(),
capture_output=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
)
out_s, err_s, code = _popen_stream_to_stderr(proc)
return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s)


def clip(s: str, n: int) -> str:
Expand Down
79 changes: 59 additions & 20 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
import asyncio
import os
import sys
import time
from pathlib import Path
from typing import Any, Literal

import mcp_v2
from index_common import SBERT_MODEL
from java_codebase_rag.cli_progress import (
accumulate_and_relay_subprocess_streams,
emit_lance_cocoindex_finish,
emit_lance_cocoindex_start,
)
from java_codebase_rag.config import emit_legacy_env_hints_if_present, resolved_sbert_model_for_process_env
from kuzu_queries import KuzuGraph, resolve_kuzu_path
from mcp.server.fastmcp import FastMCP
Expand Down Expand Up @@ -213,25 +219,55 @@ async def run_refresh_pipeline(*, quiet: bool = False) -> RefreshIndexOutput:
message=f"java_index_flow_lancedb.py not found under {root} nor {bundle_dir}",
phases_run=[],
)
try:
proc = await asyncio.create_subprocess_exec(
str(cocoindex_bin),
"update",
_COCOINDEX_TARGET,
"--full-reprocess",
"-f",
cwd=str(flow_path.parent),
env=_cocoindex_subprocess_env(root),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
except Exception as exc:
return RefreshIndexOutput(
success=False,
message=f"spawn failed: {exc!s}",
phases_run=[],
)
proc: asyncio.subprocess.Process | None = None
out_b, err_b = b"", b""
if quiet:
try:
proc = await asyncio.create_subprocess_exec(
str(cocoindex_bin),
"update",
_COCOINDEX_TARGET,
"--full-reprocess",
"-f",
cwd=str(flow_path.parent),
env=_cocoindex_subprocess_env(root),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await proc.communicate()
except Exception as exc:
return RefreshIndexOutput(
success=False,
message=f"spawn failed: {exc!s}",
phases_run=[],
)
else:
emit_lance_cocoindex_start(root)
t0 = time.perf_counter()
code_c = -1
try:
proc = await asyncio.create_subprocess_exec(
str(cocoindex_bin),
"update",
_COCOINDEX_TARGET,
"--full-reprocess",
"-f",
cwd=str(flow_path.parent),
env=_cocoindex_subprocess_env(root),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
out_b, err_b = await accumulate_and_relay_subprocess_streams(proc, relay=True)
code_c = proc.returncode if proc.returncode is not None else -1
except Exception as exc:
return RefreshIndexOutput(
success=False,
message=f"spawn failed: {exc!s}",
phases_run=[],
)
finally:
emit_lance_cocoindex_finish(elapsed_s=time.perf_counter() - t0, exit_code=code_c)
assert proc is not None
out = out_b.decode(errors="replace")
err = err_b.decode(errors="replace")
ok = proc.returncode == 0
Expand Down Expand Up @@ -261,7 +297,10 @@ async def run_refresh_pipeline(*, quiet: bool = False) -> RefreshIndexOutput:
stderr=asyncio.subprocess.PIPE,
)
phases_run = ["vectors", "graph"]
gout_b, gerr_b = await gproc.communicate()
if quiet:
gout_b, gerr_b = await gproc.communicate()
else:
gout_b, gerr_b = await accumulate_and_relay_subprocess_streams(gproc, relay=True)
graph_code = gproc.returncode
graph_out = gout_b.decode(errors="replace")
graph_err = gerr_b.decode(errors="replace")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message": "init completed", "success": true}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true}
Loading
Loading