diff --git a/README.md b/README.md index 79e7ed8..ad48b3c 100644 --- a/README.md +++ b/README.md @@ -285,7 +285,7 @@ Example: Operator playbook with workflows, exit codes, and env alignment: [`docs/JAVA-CODEBASE-RAG-CLI.md`](./docs/JAVA-CODEBASE-RAG-CLI.md). -Run `java-codebase-rag --help` to list grouped subcommands (lifecycle / introspection / analysis). Output mode is automatic: JSON when piped, pretty text in a TTY. Module entrypoint: `python -m java_codebase_rag.cli`. +Run `java-codebase-rag --help` to list grouped subcommands (lifecycle / introspection / analysis). Output mode is automatic: JSON when piped, pretty text in a TTY. Module entrypoint: `python -m java_codebase_rag.cli`. Lifecycle commands (`init`, `increment`, `reprocess`, `erase`) stream subprocess progress to **stderr** (including any child stdout the tool relays); **`--quiet`** suppresses that human channel; **stdout** remains the machine-readable contract (JSON or pprint). Shared flags on all subcommands: `--source-root`, `--index-dir`, `--embedding-model`, `--embedding-device` (each optional; see the CLI guide for precedence). 
diff --git a/build_ast_graph.py b/build_ast_graph.py index a7689c2..4437344 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -31,6 +31,7 @@ import os import re import sys +import threading import time from collections import defaultdict from dataclasses import asdict, dataclass, field, replace @@ -67,6 +68,55 @@ log = logging.getLogger(__name__) +_VERBOSE_STDERR_LOCK = threading.Lock() + +_PASS1_START = "[pass1] starting · parsing Java files under source root" +_PASS2_START = "[pass2] starting · emitting EXTENDS / IMPLEMENTS / DECLARES rows" +_PASS3_START = "[pass3] starting · call resolution (outgoing calls per site)" +_PASS4_START = "[pass4] starting · route and EXPOSES extraction" +_PASS5_START = "[pass5] starting · imperative HTTP_CALLS / ASYNC_CALLS edges" +_PASS6_START = "[pass6] starting · cross-service call-edge matching" +_WRITE_START = "[write] starting · writing Kuzu graph to disk" + + +def _verbose_stderr_line(content: str) -> None: + with _VERBOSE_STDERR_LOCK: + print(content, file=sys.stderr, flush=True) + + +class _VerbosePassHeartbeats: + """Emit ``[tag] running … Ns elapsed`` every 5s on stderr while in scope (verbose only).""" + + def __init__(self, tag: str, *, verbose: bool) -> None: + self._tag = tag + self._verbose = verbose + self._thr: threading.Thread | None = None + self._stop: threading.Event | None = None + + def __enter__(self) -> None: + if not self._verbose: + return None + self._stop = threading.Event() + stop = self._stop + tag = self._tag + + def worker() -> None: + t0 = time.monotonic() + while not stop.wait(timeout=5.0): + elapsed = int(time.monotonic() - t0) + _verbose_stderr_line(f"{tag} running … {elapsed}s elapsed") + + self._thr = threading.Thread(target=worker, name=f"hb-{tag}", daemon=True) + self._thr.start() + return None + + def __exit__(self, exc_type, exc, tb) -> bool: + if self._thr is not None and self._stop is not None: + self._stop.set() + self._thr.join(timeout=2.0) + return False + + _JAVA_LANG_SIMPLE 
= frozenset({ "Object", "String", "Integer", "Long", "Short", "Byte", "Boolean", "Double", "Float", "Character", "Number", "Void", "Class", "Enum", "Record", @@ -362,51 +412,64 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, ignore = LayeredIgnore(root) t0 = time.time() n_files = 0 - for p in iter_java_source_files(root, ignore=ignore): - n_files += 1 - try: - content = p.read_bytes() - except OSError: - tables.skipped_files += 1 - continue - if not content.strip(): - continue + if verbose: + _verbose_stderr_line(_PASS1_START) + slow_sec = 0.0 + raw_slow = os.environ.get("JAVA_CODEBASE_RAG_TEST_GRAPH_SLOW_SEC", "").strip() + if raw_slow: try: - rel = p.resolve().relative_to(root.resolve()).as_posix() + slow_sec = float(raw_slow) except ValueError: - rel = p.as_posix() - try: - ast = parse_java(content, filename=rel, verbose=verbose) - except Exception: - tables.parse_errors += 1 - continue - if ast.parse_error: - tables.parse_errors += 1 - # Still index what tree-sitter gave us; robust to syntax errors. 
- module = module_for_path(str(p), root) - microservice = microservice_for_path(str(p), root) - asts[rel] = ast - - # file node - file_id = symbol_id("file", rel, rel, 0) - tables.files[rel] = file_id - - # package node (created lazily; nodes deduped by id) - if ast.package and ast.package not in tables.packages: - tables.packages[ast.package] = symbol_id("package", ast.package, "", 0) - - for t in ast.top_level_types: - _register_type( - tables, t, file_path=rel, - module=module, microservice=microservice, outer_fqn=None, - ) + slow_sec = 0.0 + with _VerbosePassHeartbeats("[pass1]", verbose=verbose): + if verbose and slow_sec > 0: + time.sleep(slow_sec) + for p in iter_java_source_files(root, ignore=ignore): + n_files += 1 + try: + content = p.read_bytes() + except OSError: + tables.skipped_files += 1 + continue + if not content.strip(): + continue + try: + rel = p.resolve().relative_to(root.resolve()).as_posix() + except ValueError: + rel = p.as_posix() + try: + ast = parse_java(content, filename=rel, verbose=verbose) + except Exception: + tables.parse_errors += 1 + continue + if ast.parse_error: + tables.parse_errors += 1 + # Still index what tree-sitter gave us; robust to syntax errors. 
+ module = module_for_path(str(p), root) + microservice = microservice_for_path(str(p), root) + asts[rel] = ast + + # file node + file_id = symbol_id("file", rel, rel, 0) + tables.files[rel] = file_id + + # package node (created lazily; nodes deduped by id) + if ast.package and ast.package not in tables.packages: + tables.packages[ast.package] = symbol_id("package", ast.package, "", 0) + + for t in ast.top_level_types: + _register_type( + tables, t, file_path=rel, + module=module, microservice=microservice, outer_fqn=None, + ) if verbose: elapsed = time.time() - t0 - print(f"[pass1] parsed {n_files} files in {elapsed:.2f}s: " - f"{len(tables.types)} types, {len(tables.members)} members, " - f"{tables.parse_errors} parse errors, {tables.skipped_files} skipped", - file=sys.stderr) + _verbose_stderr_line( + f"[pass1] parsed {n_files} files in {elapsed:.2f}s: " + f"{len(tables.types)} types, {len(tables.members)} members, " + f"{tables.parse_errors} parse errors, {tables.skipped_files} skipped", + ) return asts @@ -639,19 +702,23 @@ def pass2_edges(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b seen_ext: set[tuple[str, str]] = set() seen_impl: set[tuple[str, str]] = set() seen_inj: set[tuple[str, str, str, str]] = set() - for fqn, entry in tables.types.items(): - ast = asts.get(entry.file_path) - if ast is None: - continue - _emit_extends_implements(entry, ast, tables, seen_ext=seen_ext, seen_impl=seen_impl) - _emit_injects(entry, ast, tables, seen=seen_inj) + if verbose: + _verbose_stderr_line(_PASS2_START) + with _VerbosePassHeartbeats("[pass2]", verbose=verbose): + for fqn, entry in tables.types.items(): + ast = asts.get(entry.file_path) + if ast is None: + continue + _emit_extends_implements(entry, ast, tables, seen_ext=seen_ext, seen_impl=seen_impl) + _emit_injects(entry, ast, tables, seen=seen_inj) if verbose: elapsed = time.time() - t0 - print(f"[pass2] emitted {len(tables.extends_rows)} EXTENDS, " - f"{len(tables.implements_rows)} IMPLEMENTS, 
" - f"{len(tables.injects_rows)} INJECTS, " - f"{len(tables.phantoms)} phantoms in {elapsed:.2f}s", - file=sys.stderr) + _verbose_stderr_line( + f"[pass2] emitted {len(tables.extends_rows)} EXTENDS, " + f"{len(tables.implements_rows)} IMPLEMENTS, " + f"{len(tables.injects_rows)} INJECTS, " + f"{len(tables.phantoms)} phantoms in {elapsed:.2f}s", + ) # ---------- pass 3: call graph ---------- @@ -1216,13 +1283,16 @@ def _process_file_calls( def pass3_calls(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: bool) -> None: + if verbose: + _verbose_stderr_line(_PASS3_START) _build_member_indexes(tables) stats = CallResolutionStats() - for rel_path, file_ast in asts.items(): - try: - _process_file_calls(file_ast, rel_path, tables, stats) - except Exception as e: - log.error("Call extraction failed for %s: %s", rel_path, e) + with _VerbosePassHeartbeats("[pass3]", verbose=verbose): + for rel_path, file_ast in asts.items(): + try: + _process_file_calls(file_ast, rel_path, tables, stats) + except Exception as e: + log.error("Call extraction failed for %s: %s", rel_path, e) pct_chained = 100.0 * stats.phantom_chained / max(1, stats.total) pct_callee_unres = 100.0 * stats.callee_unresolved / max(1, stats.total) pct_phantom_recv = 100.0 * stats.phantom_other / max(1, stats.total) @@ -1236,7 +1306,7 @@ def pass3_calls(tables: GraphTables, asts: dict[str, JavaFileAst], *, verbose: b ) log.info(msg) if verbose: - print(f"[pass3] {msg}", file=sys.stderr) + _verbose_stderr_line(f"[pass3] {msg}") _PATH_VAR_SEG = re.compile(r"^\{([^:{}]+)(?::([^}]*))?\}$") # whole path segment @@ -1342,118 +1412,121 @@ def pass4_routes( prs = str(source_root) tables.cross_service_resolution = _load_config_cross_service_resolution(prs) meta_chain = collect_annotation_meta_chain(prs) + if verbose: + _verbose_stderr_line(_PASS4_START) + with _VerbosePassHeartbeats("[pass4]", verbose=verbose): - for ast in asts.values(): - stats.routes_skipped_unresolved += ast.routes_skipped_unresolved + for 
ast in asts.values(): + stats.routes_skipped_unresolved += ast.routes_skipped_unresolved - routes_by_id: dict[str, RouteRow] = {} - exposes_seen: set[tuple[str, str]] = set() + routes_by_id: dict[str, RouteRow] = {} + exposes_seen: set[tuple[str, str]] = set() - http_kinds = frozenset({"http_endpoint", "http_consumer"}) + http_kinds = frozenset({"http_endpoint", "http_consumer"}) - for member in sorted(tables.members, key=lambda m: m.node_id): - if member.decl.is_constructor: - continue - ast = asts.get(member.file_path) - if ast is None: - continue - type_decl = tables.types[member.parent_fqn].decl - final_routes = resolve_routes_for_method( - method_decl=member.decl, - enclosing_type=type_decl, - overrides=overrides, - meta_chain=meta_chain, - builtin_routes=member.decl.routes, - ) - if not final_routes: - continue - for decl in final_routes: - path_template, path_regex = ("", "") - if decl.kind in http_kinds: - if decl.resolved and decl.resolution_strategy in ( - "annotation", - "codebase_route", - ): - path_template, path_regex = _normalize_path(decl.path) - else: - path_template, path_regex = "", "" - rid = _route_id( - decl.framework, - decl.kind, - decl.http_method, - path_template, - decl.path, - decl.topic, - decl.broker, - member.microservice, + for member in sorted(tables.members, key=lambda m: m.node_id): + if member.decl.is_constructor: + continue + ast = asts.get(member.file_path) + if ast is None: + continue + type_decl = tables.types[member.parent_fqn].decl + final_routes = resolve_routes_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_routes=member.decl.routes, ) - layer = decl.route_source_layer - if rid not in routes_by_id: - routes_by_id[rid] = RouteRow( - id=rid, - kind=decl.kind, - framework=decl.framework, - method=decl.http_method, - path=decl.path, - path_template=path_template, - path_regex=path_regex, - topic=decl.topic, - broker=decl.broker, - 
feign_name=decl.feign_name, - feign_url=decl.feign_url, - microservice=member.microservice, - module=member.module, - filename=decl.filename, - start_line=decl.start_line, - end_line=decl.end_line, - resolved=decl.resolved, - source_layer=layer, - ) - else: - prev = routes_by_id[rid] - if _ROUTE_LAYER_RANK.get(layer, 0) > _ROUTE_LAYER_RANK.get( - prev.source_layer, - 0, - ): - routes_by_id[rid] = replace(prev, source_layer=layer) - ek = (member.node_id, rid) - if ek not in exposes_seen: - route_kind = routes_by_id[rid].kind - if route_kind == "http_consumer": - stats.exposes_suppressed_feign += 1 - continue - exposes_seen.add(ek) - tables.exposes_rows.append( - ExposesRow( - symbol_id=member.node_id, - route_id=rid, - confidence=decl.confidence, - strategy=decl.resolution_strategy, - ), + if not final_routes: + continue + for decl in final_routes: + path_template, path_regex = ("", "") + if decl.kind in http_kinds: + if decl.resolved and decl.resolution_strategy in ( + "annotation", + "codebase_route", + ): + path_template, path_regex = _normalize_path(decl.path) + else: + path_template, path_regex = "", "" + rid = _route_id( + decl.framework, + decl.kind, + decl.http_method, + path_template, + decl.path, + decl.topic, + decl.broker, + member.microservice, ) + layer = decl.route_source_layer + if rid not in routes_by_id: + routes_by_id[rid] = RouteRow( + id=rid, + kind=decl.kind, + framework=decl.framework, + method=decl.http_method, + path=decl.path, + path_template=path_template, + path_regex=path_regex, + topic=decl.topic, + broker=decl.broker, + feign_name=decl.feign_name, + feign_url=decl.feign_url, + microservice=member.microservice, + module=member.module, + filename=decl.filename, + start_line=decl.start_line, + end_line=decl.end_line, + resolved=decl.resolved, + source_layer=layer, + ) + else: + prev = routes_by_id[rid] + if _ROUTE_LAYER_RANK.get(layer, 0) > _ROUTE_LAYER_RANK.get( + prev.source_layer, + 0, + ): + routes_by_id[rid] = replace(prev, 
source_layer=layer) + ek = (member.node_id, rid) + if ek not in exposes_seen: + route_kind = routes_by_id[rid].kind + if route_kind == "http_consumer": + stats.exposes_suppressed_feign += 1 + continue + exposes_seen.add(ek) + tables.exposes_rows.append( + ExposesRow( + symbol_id=member.node_id, + route_id=rid, + confidence=decl.confidence, + strategy=decl.resolution_strategy, + ), + ) - tables.routes_rows = sorted(routes_by_id.values(), key=lambda r: r.id) + tables.routes_rows = sorted(routes_by_id.values(), key=lambda r: r.id) - for row in tables.routes_rows: - stats.by_framework[row.framework] += 1 - stats.by_kind[row.kind] += 1 - - n_routes = len(tables.routes_rows) - if n_routes: - stats.routes_resolved_pct = 100.0 * sum( - 1 for r in tables.routes_rows if r.resolved - ) / n_routes - stats.routes_from_brownfield_pct = 100.0 * sum( - 1 for r in tables.routes_rows if r.source_layer != "builtin" - ) / n_routes - else: - stats.routes_resolved_pct = 100.0 - stats.routes_from_brownfield_pct = 0.0 + for row in tables.routes_rows: + stats.by_framework[row.framework] += 1 + stats.by_kind[row.kind] += 1 - by_layer: dict[str, int] = defaultdict(int) - for row in tables.routes_rows: - by_layer[row.source_layer] += 1 - stats.routes_by_layer = dict(sorted(by_layer.items())) + n_routes = len(tables.routes_rows) + if n_routes: + stats.routes_resolved_pct = 100.0 * sum( + 1 for r in tables.routes_rows if r.resolved + ) / n_routes + stats.routes_from_brownfield_pct = 100.0 * sum( + 1 for r in tables.routes_rows if r.source_layer != "builtin" + ) / n_routes + else: + stats.routes_resolved_pct = 100.0 + stats.routes_from_brownfield_pct = 0.0 + + by_layer: dict[str, int] = defaultdict(int) + for row in tables.routes_rows: + by_layer[row.source_layer] += 1 + stats.routes_by_layer = dict(sorted(by_layer.items())) msg = ( f"Route extraction: emitted={n_routes}, exposes={len(tables.exposes_rows)}, " @@ -1465,7 +1538,7 @@ def pass4_routes( ) log.info(msg) if verbose: - print(f"[pass4] 
{msg}", file=sys.stderr) + _verbose_stderr_line(f"[pass4] {msg}") def pass5_imperative_edges( @@ -1514,96 +1587,142 @@ def _phantom_async_route_id(call: OutgoingCallDecl) -> str: uniq = hashlib.sha1(f"{call.filename}:{call.start_line}:{call.raw_topic}".encode()).hexdigest()[:12] return f"r:phantom:{uniq}" - for member in sorted(tables.members, key=lambda x: x.node_id): - if member.decl.is_constructor: - continue - type_decl = tables.types[member.parent_fqn].decl - final_http_calls = resolve_http_client_for_method( - method_decl=member.decl, - enclosing_type=type_decl, - overrides=overrides, - meta_chain=meta_chain, - builtin_calls=member.decl.outgoing_calls, - ) - final_async_calls = resolve_async_producer_for_method( - method_decl=member.decl, - enclosing_type=type_decl, - overrides=overrides, - meta_chain=meta_chain, - builtin_calls=member.decl.outgoing_calls, - ) - micro_factor = _micro_factor(member) - for call in final_http_calls + final_async_calls: - if call.channel == "http": - client_path = (call.path_template_call or "").strip() - client_method = (call.method_call or "").strip().upper() - # Keep normalized path fields on Client now so LC3 filter semantics - # (`path_prefix`) can use persisted columns without extra transforms. 
- client_path_template = "" - client_path_regex = "" - if client_path: - client_path_template, client_path_regex = _normalize_path(client_path) - cid = _client_id( - microservice=member.microservice, - member_fqn=call.method_fqn, - client_kind=call.client_kind, - path=client_path, - method=client_method, - ) - if cid not in client_seen: - client_seen.add(cid) - tables.client_rows.append( - ClientRow( - id=cid, - client_kind=call.client_kind, - target_service=call.feign_target_name, - path=client_path, - path_template=client_path_template, - path_regex=client_path_regex, - method=client_method, - member_fqn=call.method_fqn, - member_id=member.node_id, - microservice=member.microservice, - module=member.module, - filename=call.filename, - start_line=call.start_line, - end_line=call.end_line, - resolved=call.resolved, - source_layer=_client_source_layer(call.resolution_strategy), - ), + if verbose: + _verbose_stderr_line(_PASS5_START) + with _VerbosePassHeartbeats("[pass5]", verbose=verbose): + for member in sorted(tables.members, key=lambda x: x.node_id): + if member.decl.is_constructor: + continue + type_decl = tables.types[member.parent_fqn].decl + final_http_calls = resolve_http_client_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_calls=member.decl.outgoing_calls, + ) + final_async_calls = resolve_async_producer_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_calls=member.decl.outgoing_calls, + ) + micro_factor = _micro_factor(member) + for call in final_http_calls + final_async_calls: + if call.channel == "http": + client_path = (call.path_template_call or "").strip() + client_method = (call.method_call or "").strip().upper() + # Keep normalized path fields on Client now so LC3 filter semantics + # (`path_prefix`) can use persisted columns without extra transforms. 
+ client_path_template = "" + client_path_regex = "" + if client_path: + client_path_template, client_path_regex = _normalize_path(client_path) + cid = _client_id( + microservice=member.microservice, + member_fqn=call.method_fqn, + client_kind=call.client_kind, + path=client_path, + method=client_method, ) - dkey = (member.node_id, cid) - if dkey not in declares_client_seen: - declares_client_seen.add(dkey) - tables.declares_client_rows.append( - DeclaresClientRow( + if cid not in client_seen: + client_seen.add(cid) + tables.client_rows.append( + ClientRow( + id=cid, + client_kind=call.client_kind, + target_service=call.feign_target_name, + path=client_path, + path_template=client_path_template, + path_regex=client_path_regex, + method=client_method, + member_fqn=call.method_fqn, + member_id=member.node_id, + microservice=member.microservice, + module=member.module, + filename=call.filename, + start_line=call.start_line, + end_line=call.end_line, + resolved=call.resolved, + source_layer=_client_source_layer(call.resolution_strategy), + ), + ) + dkey = (member.node_id, cid) + if dkey not in declares_client_seen: + declares_client_seen.add(dkey) + tables.declares_client_rows.append( + DeclaresClientRow( + symbol_id=member.node_id, + client_id=cid, + confidence=call.confidence_base, + strategy=call.resolution_strategy, + ), + ) + rid = "" + strategy = call.resolution_strategy + if call.client_kind == "feign_method": + exposing = next((e for e in tables.exposes_rows if e.symbol_id == member.node_id), None) + if exposing is not None: + rid = exposing.route_id + if not rid: + rid = _phantom_http_route_id(call) + _append_route( + RouteRow( + id=rid, + kind="http_endpoint", + framework="", + method=call.method_call, + path=call.path_template_call, + path_template=call.path_template_call, + path_regex="", + topic="", + broker="", + feign_name=call.feign_target_name, + feign_url=call.feign_target_url, + microservice="", + module="", + filename=call.filename, + 
start_line=call.start_line, + end_line=call.end_line, + resolved=False, + source_layer="builtin", + ) + ) + key = (member.node_id, rid) + if key in http_seen: + continue + http_seen.add(key) + conf = call.confidence_base * 0.3 * micro_factor + tables.http_call_rows.append( + HttpCallRow( symbol_id=member.node_id, - client_id=cid, - confidence=call.confidence_base, - strategy=call.resolution_strategy, - ), + route_id=rid, + confidence=conf, + strategy=strategy, + method_call=call.method_call, + raw_uri=call.raw_uri, + match="unresolved", + ) ) - rid = "" - strategy = call.resolution_strategy - if call.client_kind == "feign_method": - exposing = next((e for e in tables.exposes_rows if e.symbol_id == member.node_id), None) - if exposing is not None: - rid = exposing.route_id - if not rid: - rid = _phantom_http_route_id(call) + tables.call_edge_stats.http_calls_total += 1 + tables.call_edge_stats.http_calls_by_client_kind[call.client_kind] += 1 + tables.call_edge_stats.http_calls_by_strategy[strategy] += 1 + elif call.channel == "async": + rid = _phantom_async_route_id(call) _append_route( RouteRow( id=rid, - kind="http_endpoint", + kind="kafka_topic", framework="", - method=call.method_call, - path=call.path_template_call, - path_template=call.path_template_call, + method="", + path="", + path_template="", path_regex="", - topic="", - broker="", - feign_name=call.feign_target_name, - feign_url=call.feign_target_url, + topic=call.topic_call, + broker=call.broker_call, + feign_name="", + feign_url="", microservice="", module="", filename=call.filename, @@ -1613,118 +1732,74 @@ def _phantom_async_route_id(call: OutgoingCallDecl) -> str: source_layer="builtin", ) ) - key = (member.node_id, rid) - if key in http_seen: - continue - http_seen.add(key) - conf = call.confidence_base * 0.3 * micro_factor - tables.http_call_rows.append( - HttpCallRow( - symbol_id=member.node_id, - route_id=rid, - confidence=conf, - strategy=strategy, - method_call=call.method_call, - 
raw_uri=call.raw_uri, - match="unresolved", - ) - ) - tables.call_edge_stats.http_calls_total += 1 - tables.call_edge_stats.http_calls_by_client_kind[call.client_kind] += 1 - tables.call_edge_stats.http_calls_by_strategy[strategy] += 1 - elif call.channel == "async": - rid = _phantom_async_route_id(call) - _append_route( - RouteRow( - id=rid, - kind="kafka_topic", - framework="", - method="", - path="", - path_template="", - path_regex="", - topic=call.topic_call, - broker=call.broker_call, - feign_name="", - feign_url="", - microservice="", - module="", - filename=call.filename, - start_line=call.start_line, - end_line=call.end_line, - resolved=False, - source_layer="builtin", - ) - ) - key = (member.node_id, rid) - if key in async_seen: - continue - async_seen.add(key) - conf = call.confidence_base * 0.3 * micro_factor - strategy = call.resolution_strategy - tables.async_call_rows.append( - AsyncCallRow( - symbol_id=member.node_id, - route_id=rid, - confidence=conf, - strategy=strategy, - direction="producer", - raw_topic=call.raw_topic, - match="unresolved", + key = (member.node_id, rid) + if key in async_seen: + continue + async_seen.add(key) + conf = call.confidence_base * 0.3 * micro_factor + strategy = call.resolution_strategy + tables.async_call_rows.append( + AsyncCallRow( + symbol_id=member.node_id, + route_id=rid, + confidence=conf, + strategy=strategy, + direction="producer", + raw_topic=call.raw_topic, + match="unresolved", + ) ) - ) - tables.call_edge_stats.async_calls_total += 1 - tables.call_edge_stats.async_calls_by_client_kind[call.client_kind] += 1 - tables.call_edge_stats.async_calls_by_strategy[strategy] += 1 - - tables.routes_rows = sorted(route_rows, key=lambda r: r.id) - tables.client_rows = sorted(tables.client_rows, key=lambda c: c.id) - tables.declares_client_rows = sorted( - tables.declares_client_rows, - key=lambda e: (e.symbol_id, e.client_id), - ) - tables.client_stats.clients_total = len(tables.client_rows) - 
tables.client_stats.declares_client_total = len(tables.declares_client_rows) - tables.client_stats.clients_by_kind = defaultdict(int) - for row in tables.client_rows: - tables.client_stats.clients_by_kind[row.client_kind] += 1 - brownfield_strategies = frozenset( - ( - "layer_b_ann", - "layer_a_meta", - "layer_c_source", - "layer_b_fqn", - "codebase_client", - "codebase_producer", - ), - ) - if tables.call_edge_stats.http_calls_total: - n_http = sum( - v for k, v in tables.call_edge_stats.http_calls_by_strategy.items() - if k in brownfield_strategies - ) - tables.call_edge_stats.http_clients_from_brownfield_pct = ( - 100.0 * float(n_http) / float(tables.call_edge_stats.http_calls_total) + tables.call_edge_stats.async_calls_total += 1 + tables.call_edge_stats.async_calls_by_client_kind[call.client_kind] += 1 + tables.call_edge_stats.async_calls_by_strategy[strategy] += 1 + + tables.routes_rows = sorted(route_rows, key=lambda r: r.id) + tables.client_rows = sorted(tables.client_rows, key=lambda c: c.id) + tables.declares_client_rows = sorted( + tables.declares_client_rows, + key=lambda e: (e.symbol_id, e.client_id), ) - if tables.call_edge_stats.async_calls_total: - n_async = sum( - v for k, v in tables.call_edge_stats.async_calls_by_strategy.items() - if k in brownfield_strategies - ) - tables.call_edge_stats.async_producers_from_brownfield_pct = ( - 100.0 * float(n_async) / float(tables.call_edge_stats.async_calls_total) + tables.client_stats.clients_total = len(tables.client_rows) + tables.client_stats.declares_client_total = len(tables.declares_client_rows) + tables.client_stats.clients_by_kind = defaultdict(int) + for row in tables.client_rows: + tables.client_stats.clients_by_kind[row.client_kind] += 1 + brownfield_strategies = frozenset( + ( + "layer_b_ann", + "layer_a_meta", + "layer_c_source", + "layer_b_fqn", + "codebase_client", + "codebase_producer", + ), ) + if tables.call_edge_stats.http_calls_total: + n_http = sum( + v for k, v in 
tables.call_edge_stats.http_calls_by_strategy.items() + if k in brownfield_strategies + ) + tables.call_edge_stats.http_clients_from_brownfield_pct = ( + 100.0 * float(n_http) / float(tables.call_edge_stats.http_calls_total) + ) + if tables.call_edge_stats.async_calls_total: + n_async = sum( + v for k, v in tables.call_edge_stats.async_calls_by_strategy.items() + if k in brownfield_strategies + ) + tables.call_edge_stats.async_producers_from_brownfield_pct = ( + 100.0 * float(n_async) / float(tables.call_edge_stats.async_calls_total) + ) if verbose: http_client = dict(sorted(tables.call_edge_stats.http_calls_by_client_kind.items())) async_client = dict(sorted(tables.call_edge_stats.async_calls_by_client_kind.items())) http_strategy = dict(sorted(tables.call_edge_stats.http_calls_by_strategy.items())) async_strategy = dict(sorted(tables.call_edge_stats.async_calls_by_strategy.items())) - print( + _verbose_stderr_line( f"[pass5] HTTP_CALLS: {len(tables.http_call_rows)} edges, " f"ASYNC_CALLS: {len(tables.async_call_rows)} edges; " f"http_by_client_kind={http_client}, async_by_client_kind={async_client}, " f"http_by_strategy={http_strategy}, async_by_strategy={async_strategy}", - file=sys.stderr, ) @@ -1865,166 +1940,167 @@ def pass6_match_edges( def _micro_factor(member: MemberEntry | None) -> float: return 1.0 if (member and member.microservice) else 0.85 - for row in tables.http_call_rows: - if row.match != "unresolved": - continue - member = member_by_id.get(row.symbol_id) - base = row.confidence / max(1e-9, (0.3 * _micro_factor(member))) - src_route = route_by_id.get(row.route_id) - if src_route is None and member is not None: - # Recover feign caller hints from persisted caller-side Client declarations. 
- for client in client_hints_by_member.get(member.node_id, ()): - if client.client_kind != "feign_method": - continue - path_template, path_regex = _normalize_path(client.path) - src_route = RouteRow( - id="", - kind="http_consumer", - framework="feign", - method=client.method, - path=client.path, - path_template=path_template, - path_regex=path_regex, - topic="", - broker="", - feign_name=client.target_service, - # `Client` stores service-name hints, not feign URL; matcher keys off feign_name. - feign_url="", - microservice=member.microservice, - module=member.module, - filename=client.filename, - start_line=client.start_line, - end_line=client.end_line, - resolved=client.resolved, - source_layer=client.source_layer, + if verbose: + _verbose_stderr_line(_PASS6_START) + with _VerbosePassHeartbeats("[pass6]", verbose=verbose): + for row in tables.http_call_rows: + if row.match != "unresolved": + continue + member = member_by_id.get(row.symbol_id) + base = row.confidence / max(1e-9, (0.3 * _micro_factor(member))) + src_route = route_by_id.get(row.route_id) + if src_route is None and member is not None: + # Recover feign caller hints from persisted caller-side Client declarations. + for client in client_hints_by_member.get(member.node_id, ()): + if client.client_kind != "feign_method": + continue + path_template, path_regex = _normalize_path(client.path) + src_route = RouteRow( + id="", + kind="http_consumer", + framework="feign", + method=client.method, + path=client.path, + path_template=path_template, + path_regex=path_regex, + topic="", + broker="", + feign_name=client.target_service, + # `Client` stores service-name hints, not feign URL; matcher keys off feign_name. 
+ feign_url="", + microservice=member.microservice, + module=member.module, + filename=client.filename, + start_line=client.start_line, + end_line=client.end_line, + resolved=client.resolved, + source_layer=client.source_layer, + ) + break + # Feign caller hints are synthesized as transient `http_consumer` routes in pass6; + # synthetic phantoms from imperative clients are `http_endpoint` even when `feign_name` is populated from + # `@CodebaseHttpClient.targetService` / YAML hints — those must path-match like RestTemplate. + _feign_like = ( + src_route is not None + and src_route.kind == "http_consumer" + and bool(src_route.feign_name) + ) + call = OutgoingCallDecl( + method_fqn=f"{member.parent_fqn}#{member.decl.signature}" if member else "", + method_sig=member.decl.signature if member else "", + client_kind="feign_method" if _feign_like else "rest_template", + channel="http", + feign_target_name=src_route.feign_name if src_route else "", + feign_target_url=src_route.feign_url if src_route else "", + path_template_call=src_route.path_template if src_route else "", + method_call=row.method_call, + topic_call="", + broker_call="", + raw_uri=row.raw_uri, + raw_topic="", + resolution_strategy=row.strategy, + confidence_base=base, + resolved=(row.strategy != "unresolved"), + filename=member.file_path if member else "", + start_line=member.decl.start_line if member else 0, + end_line=member.decl.end_line if member else 0, + ) + outcome, candidates = _match_call_edge(call, all_routes, member.microservice if member else "") + if ( + brownfield_only + and outcome == "cross_service" + and not _is_brownfield_sourced(row.strategy, candidates) + ): + outcome = "unresolved" + candidates = [] + suppressed_auto_cross_count += 1 + if len(suppressed_auto_cross_http) < 5: + suppressed_auto_cross_http.append(call.method_fqn) + if outcome in VALID_HTTP_CALL_MATCHES: + row.match = outcome + if outcome in ("cross_service", "intra_service") and len(candidates) == 1: + row.route_id = 
candidates[0].id + row.confidence = call.confidence_base * match_factor[row.match] * _micro_factor(member) + tables.call_edge_stats.http_calls_match_breakdown[row.match] += 1 + if row.match == "cross_service": + tables.call_edge_stats.cross_service_calls_total += 1 + + for row in tables.async_call_rows: + if row.match != "unresolved": + continue + member = member_by_id.get(row.symbol_id) + base = row.confidence / max(1e-9, (0.3 * _micro_factor(member))) + src_route = route_by_id.get(row.route_id) + call = OutgoingCallDecl( + method_fqn=f"{member.parent_fqn}#{member.decl.signature}" if member else "", + method_sig=member.decl.signature if member else "", + client_kind="kafka_send", + channel="async", + feign_target_name="", + feign_target_url="", + path_template_call="", + method_call="", + topic_call=src_route.topic if src_route else "", + broker_call=src_route.broker if src_route else "", + raw_uri="", + raw_topic=row.raw_topic, + resolution_strategy=row.strategy, + confidence_base=base, + resolved=(row.strategy != "unresolved"), + filename=member.file_path if member else "", + start_line=member.decl.start_line if member else 0, + end_line=member.decl.end_line if member else 0, + ) + outcome, candidates = _match_call_edge(call, all_routes, member.microservice if member else "") + if ( + brownfield_only + and outcome == "cross_service" + and not _is_brownfield_sourced(row.strategy, candidates) + ): + outcome = "unresolved" + candidates = [] + suppressed_auto_cross_count += 1 + if len(suppressed_auto_cross_async) < 5: + suppressed_auto_cross_async.append(call.method_fqn) + if outcome in VALID_HTTP_CALL_MATCHES: + row.match = outcome + if outcome in ("cross_service", "intra_service") and len(candidates) == 1: + row.route_id = candidates[0].id + row.confidence = call.confidence_base * match_factor[row.match] * _micro_factor(member) + tables.call_edge_stats.async_calls_match_breakdown[row.match] += 1 + if row.match == "cross_service": + 
tables.call_edge_stats.cross_service_calls_total += 1 + + inbound_route_ids = {r.route_id for r in tables.http_call_rows} | {r.route_id for r in tables.async_call_rows} + tables.routes_rows = sorted( + [ + r for r in tables.routes_rows + if not ( + (r.microservice == "") + and (r.framework == "") + and (not r.resolved) + and (r.id not in inbound_route_ids) ) - break - # Feign caller hints are synthesized as transient `http_consumer` routes in pass6; - # synthetic phantoms from imperative clients are `http_endpoint` even when `feign_name` is populated from - # `@CodebaseHttpClient.targetService` / YAML hints — those must path-match like RestTemplate. - _feign_like = ( - src_route is not None - and src_route.kind == "http_consumer" - and bool(src_route.feign_name) - ) - call = OutgoingCallDecl( - method_fqn=f"{member.parent_fqn}#{member.decl.signature}" if member else "", - method_sig=member.decl.signature if member else "", - client_kind="feign_method" if _feign_like else "rest_template", - channel="http", - feign_target_name=src_route.feign_name if src_route else "", - feign_target_url=src_route.feign_url if src_route else "", - path_template_call=src_route.path_template if src_route else "", - method_call=row.method_call, - topic_call="", - broker_call="", - raw_uri=row.raw_uri, - raw_topic="", - resolution_strategy=row.strategy, - confidence_base=base, - resolved=(row.strategy != "unresolved"), - filename=member.file_path if member else "", - start_line=member.decl.start_line if member else 0, - end_line=member.decl.end_line if member else 0, - ) - outcome, candidates = _match_call_edge(call, all_routes, member.microservice if member else "") - if ( - brownfield_only - and outcome == "cross_service" - and not _is_brownfield_sourced(row.strategy, candidates) - ): - outcome = "unresolved" - candidates = [] - suppressed_auto_cross_count += 1 - if len(suppressed_auto_cross_http) < 5: - suppressed_auto_cross_http.append(call.method_fqn) - if outcome in 
VALID_HTTP_CALL_MATCHES: - row.match = outcome - if outcome in ("cross_service", "intra_service") and len(candidates) == 1: - row.route_id = candidates[0].id - row.confidence = call.confidence_base * match_factor[row.match] * _micro_factor(member) - tables.call_edge_stats.http_calls_match_breakdown[row.match] += 1 - if row.match == "cross_service": - tables.call_edge_stats.cross_service_calls_total += 1 - - for row in tables.async_call_rows: - if row.match != "unresolved": - continue - member = member_by_id.get(row.symbol_id) - base = row.confidence / max(1e-9, (0.3 * _micro_factor(member))) - src_route = route_by_id.get(row.route_id) - call = OutgoingCallDecl( - method_fqn=f"{member.parent_fqn}#{member.decl.signature}" if member else "", - method_sig=member.decl.signature if member else "", - client_kind="kafka_send", - channel="async", - feign_target_name="", - feign_target_url="", - path_template_call="", - method_call="", - topic_call=src_route.topic if src_route else "", - broker_call=src_route.broker if src_route else "", - raw_uri="", - raw_topic=row.raw_topic, - resolution_strategy=row.strategy, - confidence_base=base, - resolved=(row.strategy != "unresolved"), - filename=member.file_path if member else "", - start_line=member.decl.start_line if member else 0, - end_line=member.decl.end_line if member else 0, + ], + key=lambda r: r.id, ) - outcome, candidates = _match_call_edge(call, all_routes, member.microservice if member else "") - if ( - brownfield_only - and outcome == "cross_service" - and not _is_brownfield_sourced(row.strategy, candidates) - ): - outcome = "unresolved" - candidates = [] - suppressed_auto_cross_count += 1 - if len(suppressed_auto_cross_async) < 5: - suppressed_auto_cross_async.append(call.method_fqn) - if outcome in VALID_HTTP_CALL_MATCHES: - row.match = outcome - if outcome in ("cross_service", "intra_service") and len(candidates) == 1: - row.route_id = candidates[0].id - row.confidence = call.confidence_base * 
match_factor[row.match] * _micro_factor(member) - tables.call_edge_stats.async_calls_match_breakdown[row.match] += 1 - if row.match == "cross_service": - tables.call_edge_stats.cross_service_calls_total += 1 - - inbound_route_ids = {r.route_id for r in tables.http_call_rows} | {r.route_id for r in tables.async_call_rows} - tables.routes_rows = sorted( - [ - r for r in tables.routes_rows - if not ( - (r.microservice == "") - and (r.framework == "") - and (not r.resolved) - and (r.id not in inbound_route_ids) - ) - ], - key=lambda r: r.id, - ) if verbose: if brownfield_only: n_bf = tables.call_edge_stats.cross_service_calls_total first_http = ", ".join(suppressed_auto_cross_http) first_async = ", ".join(suppressed_auto_cross_async) - print( + _verbose_stderr_line( f"[pass6] cross_service_resolution=brownfield_only:\n" f" {n_bf} cross_service edges from brownfield layers,\n" f" {suppressed_auto_cross_count} auto-cross-service candidates suppressed -> unresolved\n" f" (first 5 http: {first_http})\n" f" (first 5 async: {first_async})", - file=sys.stderr, ) - print( + _verbose_stderr_line( f"[pass6] http_match={dict(sorted(tables.call_edge_stats.http_calls_match_breakdown.items()))}, " f"async_match={dict(sorted(tables.call_edge_stats.async_calls_match_breakdown.items()))}, " f"cross_service_calls_total={tables.call_edge_stats.cross_service_calls_total}", - file=sys.stderr, ) @@ -2548,31 +2624,34 @@ def write_kuzu( meta_chain = collect_annotation_meta_chain( str(source_root.resolve()), ) - db_path.parent.mkdir(parents=True, exist_ok=True) - db = kuzu.Database(str(db_path)) - conn = kuzu.Connection(db) - _drop_all(conn) - _create_schema(conn) - t0 = time.time() - _write_nodes( - conn, - tables, - project_root=source_root, - meta_chain=meta_chain, - ) if verbose: - print(f"[write] nodes written in {time.time() - t0:.2f}s", file=sys.stderr) - _populate_declares_rows(tables) - t1 = time.time() - _write_edges(conn, tables) - if verbose: - print(f"[write] edges written in 
{time.time() - t1:.2f}s", file=sys.stderr) - t2 = time.time() - _write_routes_and_exposes(conn, tables) - if verbose: - print(f"[write] routes/exposes written in {time.time() - t2:.2f}s", file=sys.stderr) - _write_meta(conn, tables, source_root) - conn.close() + _verbose_stderr_line(_WRITE_START) + with _VerbosePassHeartbeats("[write]", verbose=verbose): + db_path.parent.mkdir(parents=True, exist_ok=True) + db = kuzu.Database(str(db_path)) + conn = kuzu.Connection(db) + _drop_all(conn) + _create_schema(conn) + t0 = time.time() + _write_nodes( + conn, + tables, + project_root=source_root, + meta_chain=meta_chain, + ) + if verbose: + _verbose_stderr_line(f"[write] nodes written in {time.time() - t0:.2f}s") + _populate_declares_rows(tables) + t1 = time.time() + _write_edges(conn, tables) + if verbose: + _verbose_stderr_line(f"[write] edges written in {time.time() - t1:.2f}s") + t2 = time.time() + _write_routes_and_exposes(conn, tables) + if verbose: + _verbose_stderr_line(f"[write] routes/exposes written in {time.time() - t2:.2f}s") + _write_meta(conn, tables, source_root) + conn.close() # ---------- CLI ---------- @@ -2615,7 +2694,7 @@ def main() -> int: pass6_match_edges(tables, verbose=args.verbose) write_kuzu(kuzu_path, tables, source_root=root, verbose=args.verbose) if args.verbose: - print(f"[done] kuzu at {kuzu_path}", file=sys.stderr) + _verbose_stderr_line(f"[done] kuzu at {kuzu_path}") return 0 diff --git a/docs/AGENT-GUIDE.md b/docs/AGENT-GUIDE.md index 04a1987..9ded241 100644 --- a/docs/AGENT-GUIDE.md +++ b/docs/AGENT-GUIDE.md @@ -33,7 +33,7 @@ This MCP indexes Java enterprise projects into two stores: **MCP surface (navigation only):** `search`, `find`, `describe`, `neighbors`. -**Operator / diagnostics (not MCP):** use the **`java-codebase-rag`** CLI — lifecycle (`init`, `increment`, `reprocess`, `erase`) plus `meta`, `tables`, `diagnose-ignore`, `analyze-pr`. Rebuilds are slow; the coding agent should not pretend it can reindex via MCP. 
+**Operator / diagnostics (not MCP):** use the **`java-codebase-rag`** CLI — lifecycle (`init`, `increment`, `reprocess`, `erase`) plus `meta`, `tables`, `diagnose-ignore`, `analyze-pr`. Rebuilds are slow; the coding agent should not pretend it can reindex via MCP. For lifecycle commands, subprocess progress is written to **stderr** (use **`--quiet`** to suppress it); **stdout** is only the structured result payload. **Use this MCP when** you need whole-codebase context: who calls what, what handles a route, what a method invokes, where clients point, or fuzzy “where is concept X” entry points. diff --git a/docs/JAVA-CODEBASE-RAG-CLI.md b/docs/JAVA-CODEBASE-RAG-CLI.md index 524724e..f73cabc 100644 --- a/docs/JAVA-CODEBASE-RAG-CLI.md +++ b/docs/JAVA-CODEBASE-RAG-CLI.md @@ -21,6 +21,7 @@ If `java-codebase-rag` is missing, run the module entrypoint: - **TTY:** human-readable `pprint` of the payload on stdout (except **successful selective `reprocess`** with `--vectors-only` / `--graph-only`, which prints `Rebuilt:` / `Skipped:` lines instead of dumping the full dict). - **Piped / non-TTY:** **single JSON object** per invocation on stdout (no trailing noise). Use this in scripts and CI. +- **Lifecycle stderr:** `init`, `increment`, `reprocess`, and `erase` stream subprocess progress (and relayed child stdout) to **stderr**; pass **`--quiet`** to suppress that stream. **stdout** stays the JSON/pprint payload only. 
Example: diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index a4082a8..a311b8a 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -9,8 +9,9 @@ import pprint import shutil import sys +import time from pathlib import Path -from typing import Any +from typing import Any, Callable from java_codebase_rag.config import ( ResolvedOperatorConfig, @@ -94,6 +95,60 @@ def _emit_reprocess_outcome(payload: dict[str, Any], *, selective_tty_mode: str _emit(payload) +_PIPELINE_SEP = "\u00b7" + + +def _pipeline_header(subcommand: str, cfg: ResolvedOperatorConfig) -> None: + root = cfg.source_root.resolve() + idx = cfg.index_dir.resolve() + print( + f"java-codebase-rag {subcommand} {_PIPELINE_SEP} source={root} {_PIPELINE_SEP} index={idx}", + file=sys.stderr, + flush=True, + ) + + +def _pipeline_footer(subcommand: str, started: float, exit_code: int) -> None: + elapsed = time.perf_counter() - started + print( + f"java-codebase-rag {subcommand} {_PIPELINE_SEP} finished in {elapsed:.2f}s (exit={exit_code})", + file=sys.stderr, + flush=True, + ) + + +def _run_with_pipeline_progress( + subcommand: str, + cfg: ResolvedOperatorConfig, + *, + quiet: bool, + work: Callable[[], int], +) -> int: + if quiet: + return int(work()) + _pipeline_header(subcommand, cfg) + t0 = time.perf_counter() + code = 0 + try: + code = int(work()) + return code + except BaseException as exc: + # Keep footer aligned with process outcome (main maps unhandled Exception -> exit 2). 
+ if isinstance(exc, SystemExit): + c = exc.code + if isinstance(c, int): + code = c + elif c in (None, False): + code = 0 + else: + code = 1 + elif code == 0: + code = 2 + raise + finally: + _pipeline_footer(subcommand, t0, code) + + def _jsonable(value: Any) -> Any: if hasattr(value, "model_dump"): return value.model_dump() @@ -168,43 +223,47 @@ def _cmd_init(args: argparse.Namespace) -> int: ) return 2 cfg.index_dir.mkdir(parents=True, exist_ok=True) - env = cfg.subprocess_env() - coco = run_cocoindex_update( - env, - full_reprocess=False, - quiet=bool(args.quiet), - lance_project_root=None if args.quiet else cfg.source_root, - ) - if coco.returncode != 0: - _emit( - { - "success": False, - "exit_code": coco.returncode, - "stdout": clip(coco.stdout, 8000), - "stderr": clip(coco.stderr, 8000), - "message": f"cocoindex exit {coco.returncode}", - } + + def work() -> int: + env = cfg.subprocess_env() + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=bool(args.quiet), + lance_project_root=None if args.quiet else cfg.source_root, ) - return 1 - g = run_build_ast_graph( - source_root=cfg.source_root, - kuzu_path=cfg.kuzu_path, - verbose=not args.quiet, - env=env, - ) - if g.returncode != 0: - _emit( - { - "success": False, - "exit_code": g.returncode, - "stdout": clip(g.stdout, 4000), - "stderr": clip(g.stderr, 4000), - "message": f"graph builder exit {g.returncode}", - } + if coco.returncode != 0: + _emit( + { + "success": False, + "exit_code": coco.returncode, + "stdout": clip(coco.stdout, 8000), + "stderr": clip(coco.stderr, 8000), + "message": f"cocoindex exit {coco.returncode}", + } + ) + return 1 + g = run_build_ast_graph( + source_root=cfg.source_root, + kuzu_path=cfg.kuzu_path, + verbose=not args.quiet, + env=env, ) - return 1 - _emit({"success": True, "message": "init completed"}) - return 0 + if g.returncode != 0: + _emit( + { + "success": False, + "exit_code": g.returncode, + "stdout": clip(g.stdout, 4000), + "stderr": clip(g.stderr, 
4000), + "message": f"graph builder exit {g.returncode}", + } + ) + return 1 + _emit({"success": True, "message": "init completed"}) + return 0 + + return _run_with_pipeline_progress("init", cfg, quiet=bool(args.quiet), work=work) def _cmd_increment(args: argparse.Namespace) -> int: @@ -212,113 +271,121 @@ def _cmd_increment(args: argparse.Namespace) -> int: _startup_hints(cfg) cfg.apply_to_os_environ() _emit_increment_kuzu_warning() - env = cfg.subprocess_env() - coco = run_cocoindex_update( - env, - full_reprocess=False, - quiet=bool(args.quiet), - lance_project_root=None if args.quiet else cfg.source_root, - ) - if coco.returncode != 0: - _emit( - { - "success": False, - "exit_code": coco.returncode, - "stdout": clip(coco.stdout, 8000), - "stderr": clip(coco.stderr, 8000), - "message": f"cocoindex exit {coco.returncode}", - } + + def work() -> int: + env = cfg.subprocess_env() + coco = run_cocoindex_update( + env, + full_reprocess=False, + quiet=bool(args.quiet), + lance_project_root=None if args.quiet else cfg.source_root, ) - return 1 - _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) - return 0 + if coco.returncode != 0: + _emit( + { + "success": False, + "exit_code": coco.returncode, + "stdout": clip(coco.stdout, 8000), + "stderr": clip(coco.stderr, 8000), + "message": f"cocoindex exit {coco.returncode}", + } + ) + return 1 + _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) + return 0 + + return _run_with_pipeline_progress("increment", cfg, quiet=bool(args.quiet), work=work) def _cmd_reprocess(args: argparse.Namespace) -> int: cfg = _resolved_from_ns(args) _startup_hints(cfg) cfg.apply_to_os_environ() - env = cfg.subprocess_env() - vectors_only = bool(getattr(args, "vectors_only", False)) - graph_only = bool(getattr(args, "graph_only", False)) - - if vectors_only: - coco = run_cocoindex_update(env, full_reprocess=True, quiet=bool(args.quiet)) - if 
_is_cocoindex_preflight_blocker(coco): - payload: dict[str, Any] = { - "success": False, - "exit_code": None, + + def work() -> int: + env = cfg.subprocess_env() + vectors_only = bool(getattr(args, "vectors_only", False)) + graph_only = bool(getattr(args, "graph_only", False)) + + if vectors_only: + coco = run_cocoindex_update(env, full_reprocess=True, quiet=bool(args.quiet)) + if _is_cocoindex_preflight_blocker(coco): + payload: dict[str, Any] = { + "success": False, + "exit_code": None, + "stdout": clip(coco.stdout, 8000), + "stderr": clip(coco.stderr, 8000), + "message": coco.stderr.strip() or f"cocoindex setup exit {coco.returncode}", + "graph_exit_code": None, + "graph_stdout": "", + "graph_stderr": "", + "phases_run": [], + } + _emit_reprocess_outcome(payload) + return _reprocess_exit_code(payload) + ok = coco.returncode == 0 + payload = { + "success": ok, + "exit_code": coco.returncode, "stdout": clip(coco.stdout, 8000), "stderr": clip(coco.stderr, 8000), - "message": coco.stderr.strip() or f"cocoindex setup exit {coco.returncode}", + "message": None if ok else f"cocoindex exit {coco.returncode}", "graph_exit_code": None, "graph_stdout": "", "graph_stderr": "", - "phases_run": [], + "phases_run": ["vectors"], } - _emit_reprocess_outcome(payload) + if ok: + print(_REPROCESS_DRIFT_VECTORS_ONLY, file=sys.stderr) + _emit_reprocess_outcome(payload, selective_tty_mode="vectors" if ok else None) return _reprocess_exit_code(payload) - ok = coco.returncode == 0 - payload = { - "success": ok, - "exit_code": coco.returncode, - "stdout": clip(coco.stdout, 8000), - "stderr": clip(coco.stderr, 8000), - "message": None if ok else f"cocoindex exit {coco.returncode}", - "graph_exit_code": None, - "graph_stdout": "", - "graph_stderr": "", - "phases_run": ["vectors"], - } - if ok: - print(_REPROCESS_DRIFT_VECTORS_ONLY, file=sys.stderr) - _emit_reprocess_outcome(payload, selective_tty_mode="vectors" if ok else None) - return _reprocess_exit_code(payload) - if graph_only: - g = 
run_build_ast_graph( - source_root=cfg.source_root, - kuzu_path=cfg.kuzu_path, - verbose=not args.quiet, - env=env, - ) - if _is_graph_preflight_blocker(g): + if graph_only: + g = run_build_ast_graph( + source_root=cfg.source_root, + kuzu_path=cfg.kuzu_path, + verbose=not args.quiet, + env=env, + ) + if _is_graph_preflight_blocker(g): + payload = { + "success": False, + "exit_code": None, + "stdout": "", + "stderr": "", + "message": g.stderr.strip() or f"graph builder setup exit {g.returncode}", + "graph_exit_code": None, + "graph_stdout": clip(g.stdout, 4000), + "graph_stderr": clip(g.stderr, 4000), + "phases_run": [], + } + _emit_reprocess_outcome(payload) + return _reprocess_exit_code(payload) + ok = g.returncode == 0 payload = { - "success": False, + "success": ok, "exit_code": None, "stdout": "", "stderr": "", - "message": g.stderr.strip() or f"graph builder setup exit {g.returncode}", - "graph_exit_code": None, + "message": None if ok else f"graph builder exit {g.returncode}", + "graph_exit_code": g.returncode, "graph_stdout": clip(g.stdout, 4000), "graph_stderr": clip(g.stderr, 4000), - "phases_run": [], + "phases_run": ["graph"], } - _emit_reprocess_outcome(payload) + if ok: + print(_reprocess_drift_graph_only_line(cfg.index_dir), file=sys.stderr) + _emit_reprocess_outcome(payload, selective_tty_mode="graph" if ok else None) return _reprocess_exit_code(payload) - ok = g.returncode == 0 - payload = { - "success": ok, - "exit_code": None, - "stdout": "", - "stderr": "", - "message": None if ok else f"graph builder exit {g.returncode}", - "graph_exit_code": g.returncode, - "graph_stdout": clip(g.stdout, 4000), - "graph_stderr": clip(g.stderr, 4000), - "phases_run": ["graph"], - } - if ok: - print(_reprocess_drift_graph_only_line(cfg.index_dir), file=sys.stderr) - _emit_reprocess_outcome(payload, selective_tty_mode="graph" if ok else None) - return _reprocess_exit_code(payload) - import server # lazy: pulls sentence_transformers/torch/lancedb/kuzu + import 
server # lazy: pulls sentence_transformers/torch/lancedb/kuzu + + result = asyncio.run(server.run_refresh_pipeline(quiet=bool(args.quiet))) + payload = result.model_dump() + _emit_reprocess_outcome(payload) + return _reprocess_exit_code(payload) - result = asyncio.run(server.run_refresh_pipeline(quiet=bool(args.quiet))) - payload = result.model_dump() - _emit_reprocess_outcome(payload) - return _reprocess_exit_code(payload) + return _run_with_pipeline_progress("reprocess", cfg, quiet=bool(args.quiet), work=work) def _cmd_erase(args: argparse.Namespace) -> int: @@ -350,37 +417,41 @@ def _cmd_erase(args: argparse.Namespace) -> int: if ans not in ("y", "yes"): print("Aborted.", file=sys.stderr) return 2 - env = cfg.subprocess_env() - drop = run_cocoindex_drop(env, quiet=False) - if drop.returncode == 127: - print( - "java-codebase-rag erase: cocoindex CLI not found next to this Python; " - "skipped `cocoindex drop` — cocoindex.db (if any) was not removed by CocoIndex.", - file=sys.stderr, - ) - elif drop.returncode != 0: - print(clip(drop.stderr, 4000), file=sys.stderr) - if cfg.kuzu_path.exists(): - shutil.rmtree(cfg.kuzu_path, ignore_errors=True) - if cfg.cocoindex_db.exists(): - try: - cfg.cocoindex_db.unlink() - except OSError: - pass - if cfg.index_dir.is_dir(): - try: - import lancedb - db = lancedb.connect(str(cfg.index_dir.resolve())) - for name in list(db.table_names()): - try: - db.drop_table(name) - except Exception as exc: - print(f"warning: failed to drop Lance table {name!r}: {exc}", file=sys.stderr) - except Exception: - pass - _emit({"success": True, "message": "erase completed"}) - return 0 + def work() -> int: + env = cfg.subprocess_env() + drop = run_cocoindex_drop(env, quiet=bool(args.quiet)) + if drop.returncode == 127: + print( + "java-codebase-rag erase: cocoindex CLI not found next to this Python; " + "skipped `cocoindex drop` — cocoindex.db (if any) was not removed by CocoIndex.", + file=sys.stderr, + ) + elif drop.returncode != 0: + 
print(clip(drop.stderr, 4000), file=sys.stderr) + if cfg.kuzu_path.exists(): + shutil.rmtree(cfg.kuzu_path, ignore_errors=True) + if cfg.cocoindex_db.exists(): + try: + cfg.cocoindex_db.unlink() + except OSError: + pass + if cfg.index_dir.is_dir(): + try: + import lancedb + + db = lancedb.connect(str(cfg.index_dir.resolve())) + for name in list(db.table_names()): + try: + db.drop_table(name) + except Exception as exc: + print(f"warning: failed to drop Lance table {name!r}: {exc}", file=sys.stderr) + except Exception: + pass + _emit({"success": True, "message": "erase completed"}) + return 0 + + return _run_with_pipeline_progress("erase", cfg, quiet=bool(args.quiet), work=work) def _cmd_meta(args: argparse.Namespace) -> int: @@ -470,6 +541,8 @@ def _cmd_analyze_pr(args: argparse.Namespace) -> int: def build_parser() -> argparse.ArgumentParser: description = ( "java-codebase-rag — graph-native code intelligence for Java microservices.\n\n" + "Lifecycle commands stream subprocess progress to stderr (including relayed child stdout); " + "--quiet suppresses that stream; stdout remains the machine-readable payload.\n\n" "Lifecycle (manage the index):\n" " init Create a fresh index from a Java repository.\n" " increment Pick up changes since the last index update (Lance only).\n" @@ -500,7 +573,11 @@ def build_parser() -> argparse.ArgumentParser: ), ) _add_index_embedding_flags(init) - init.add_argument("--quiet", action="store_true") + init.add_argument( + "--quiet", + action="store_true", + help="Suppress stderr progress relay; stdout payload unchanged.", + ) init.set_defaults(handler=_cmd_init) increment = subparsers.add_parser( @@ -509,7 +586,11 @@ def build_parser() -> argparse.ArgumentParser: description="Runs cocoindex catch-up (no full reprocess). 
Does not rebuild Kuzu; see stderr warning.", ) _add_index_embedding_flags(increment) - increment.add_argument("--quiet", action="store_true") + increment.add_argument( + "--quiet", + action="store_true", + help="Suppress stderr progress relay; stdout payload unchanged.", + ) increment.set_defaults(handler=_cmd_increment) reprocess = subparsers.add_parser( @@ -521,7 +602,11 @@ def build_parser() -> argparse.ArgumentParser: ), ) _add_index_embedding_flags(reprocess) - reprocess.add_argument("--quiet", action="store_true") + reprocess.add_argument( + "--quiet", + action="store_true", + help="Suppress stderr progress relay; stdout payload unchanged.", + ) _rex = reprocess.add_mutually_exclusive_group() _rex.add_argument( "--vectors-only", @@ -542,6 +627,11 @@ def build_parser() -> argparse.ArgumentParser: ) _add_index_embedding_flags(erase) erase.add_argument("--yes", action="store_true", help="Confirm destructive deletion (required in CI)") + erase.add_argument( + "--quiet", + action="store_true", + help="Suppress stderr progress relay; stdout payload unchanged.", + ) erase.set_defaults(handler=_cmd_erase) meta = subparsers.add_parser("meta", help="Print graph meta and embedding resolution.") diff --git a/tests/README.md b/tests/README.md index a93a2c6..23b3c15 100644 --- a/tests/README.md +++ b/tests/README.md @@ -62,12 +62,18 @@ The session-scoped fixtures in `conftest.py` materialize Kuzu (and, where needed The heavier end-to-end test that runs `cocoindex` + a real LanceDB index is gated behind `JAVA_CODEBASE_RAG_RUN_HEAVY=1` because it downloads the embedding model on first run and indexes the corpus from scratch (~minute on a -warm cache, several minutes cold). +warm cache, several minutes cold). The same gate applies to full +`java-codebase-rag` lifecycle subprocess checks in +`tests/test_cli_progress_stdout_invariant.py` and the cocoindex portion of +`tests/test_cli_quiet_parity.py` so default `pytest tests` stays fast when +`cocoindex` is installed. 
```bash JAVA_CODEBASE_RAG_RUN_HEAVY=1 .venv/bin/pytest tests -v ``` +**`JAVA_CODEBASE_RAG_TEST_GRAPH_SLOW_SEC`:** optional float read by `build_ast_graph.py` in pass1 only. When set (e.g. `6`), pass1 sleeps that many seconds under `--verbose` so tests can assert heartbeat lines. Leave unset for normal `pytest` runs. + --- ## ⚠️ Note for future contributors — DO NOT OVERFIT THE MCP TO THIS CORPUS diff --git a/tests/test_cli_progress_stdout_invariant.py b/tests/test_cli_progress_stdout_invariant.py index 941f313..0a2996c 100644 --- a/tests/test_cli_progress_stdout_invariant.py +++ b/tests/test_cli_progress_stdout_invariant.py @@ -2,6 +2,7 @@ import asyncio import io +import json import os import shutil import subprocess @@ -114,6 +115,7 @@ async def communicate(self) -> tuple[bytes, bytes]: assert "idx_out" in out.stdout +@pytest.mark.skipif(os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1", reason="cocoindex lifecycle; set JAVA_CODEBASE_RAG_RUN_HEAVY=1") @pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") def test_cli_lifecycle_stdout_invariant_init(corpus_root: Path, tmp_path: Path) -> None: baseline = (_FIXTURE_DIR / "init_quiet_success.stdout.txt").read_text(encoding="utf-8") @@ -180,3 +182,94 @@ async def fake_refresh(*, quiet: bool = False) -> RefreshIndexOutput: ) assert rc == 0 assert buf.getvalue() == baseline + + +def test_cli_lifecycle_stdout_invariant_erase_quiet(tmp_path: Path) -> None: + idx = tmp_path / "idx_so" + idx.mkdir() + env = os.environ.copy() + env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(idx) + env["JAVA_CODEBASE_RAG_SOURCE_ROOT"] = str(tmp_path) + proc = _run_cli( + ["erase", "--source-root", str(tmp_path), "--index-dir", str(idx), "--yes", "--quiet"], + env=env, + ) + assert proc.returncode == 0 + assert proc.stdout.strip() == '{"message": "erase completed", "success": true}' + + +@pytest.mark.skipif(os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1", reason="cocoindex 
lifecycle; set JAVA_CODEBASE_RAG_RUN_HEAVY=1") +@pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") +def test_cli_lifecycle_stdout_invariant_init_increment_reprocess_when_cocoindex( + tmp_path: Path, + corpus_root: Path, +) -> None: + idx = tmp_path / "idx_inv" + env = os.environ.copy() + env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(idx) + env["JAVA_CODEBASE_RAG_SOURCE_ROOT"] = str(corpus_root.resolve()) + + r_pre = _run_cli( + ["erase", "--source-root", str(corpus_root), "--index-dir", str(idx), "--yes", "--quiet"], + env=env, + ) + assert r_pre.returncode == 0, r_pre.stderr + + r_init = _run_cli( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + env=env, + ) + assert r_init.returncode == 0, r_init.stderr + r_init.stdout + assert r_init.stdout.strip() == '{"message": "init completed", "success": true}' + + r_inc = _run_cli( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + env=env, + ) + assert r_inc.returncode == 0, r_inc.stderr + r_inc.stdout + inc_payload = json.loads(r_inc.stdout) + assert inc_payload == { + "success": True, + "message": "increment completed (Lance only; graph may be stale — see stderr)", + } + + r_rep = _run_cli( + ["reprocess", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + env=env, + ) + assert r_rep.returncode == 0, r_rep.stderr + r_rep.stdout + rep_payload = json.loads(r_rep.stdout) + assert rep_payload.get("success") is True + assert isinstance(rep_payload.get("stdout"), str) + assert isinstance(rep_payload.get("graph_stderr"), str) + + +def test_pipeline_footer_reflects_exception_before_propagate(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + from java_codebase_rag.config import resolve_operator_config + + import java_codebase_rag.cli as cli_mod + + cfg = resolve_operator_config(source_root=tmp_path, cli_index_dir=str(tmp_path / "ix_footer")) + codes: list[int] = [] + + def 
"""PR-3: quiet stderr parity + graph builder pass start / heartbeat + pipeline banner."""

from __future__ import annotations

import os
import re
import shutil
import subprocess
import sys
from pathlib import Path

import pytest

from java_codebase_rag import cli as cli_mod

REPO = Path(__file__).resolve().parent.parent
BUILDER = REPO / "build_ast_graph.py"
FIXTURE_ROOT = REPO / "tests" / "fixtures" / "call_graph_smoke"
# Literal copy of the builder's pass-1 start banner; kept as a string here so the
# test pins the exact text that reaches stderr.
_PASS1_START = "[pass1] starting · parsing Java files under source root"
_PASS1_HEARTBEAT = "[pass1] running …"
_PASS1_SUMMARY = "[pass1] parsed"


def _cocoindex_available() -> bool:
    """Return True when a file named ``cocoindex`` sits next to the running interpreter."""
    return (Path(sys.executable).parent / "cocoindex").is_file()


def _assert_quiet_stderr_no_progress_markers(stderr: str) -> None:
    """Assert that *stderr* contains none of the progress markers listed below.

    Every assertion reports the full stderr text so a failure shows what leaked.
    """
    forbidden = (
        "[lance]",  # NOTE(review): presumably an index-writer progress prefix — confirm
        "] starting ·",  # pass start banner, e.g. ``[pass1] starting · …``
        "] running …",  # periodic heartbeat line
        " · source=",  # pipeline header banner
    )
    for marker in forbidden:
        assert marker not in stderr, stderr


def _run_builder_verbose(kuzu: Path, env: dict[str, str]) -> subprocess.CompletedProcess[str]:
    """Run ``build_ast_graph.py --verbose`` over the smoke fixture, capturing output as text."""
    return subprocess.run(
        [
            sys.executable,
            str(BUILDER),
            "--source-root",
            str(FIXTURE_ROOT),
            "--kuzu-path",
            str(kuzu),
            "--verbose",
        ],
        cwd=str(REPO),
        env=env,
        capture_output=True,
        text=True,
        check=False,
    )


def _run_cli(exe: str, args: list[str], env: dict[str, str]) -> subprocess.CompletedProcess[str]:
    """Run the ``java-codebase-rag`` executable with *args*, capturing output as text."""
    return subprocess.run(
        [exe, *args],
        capture_output=True,
        text=True,
        env=env,
        check=False,
    )


def _index_env(index_dir: Path, source_root: Path) -> dict[str, str]:
    """Return a copy of the current environment with index dir and source root set."""
    env = os.environ.copy()
    env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(index_dir)
    env["JAVA_CODEBASE_RAG_SOURCE_ROOT"] = str(source_root)
    return env


def test_pass_heartbeat_fires_when_pass_slowed(tmp_path: Path) -> None:
    """A heartbeat line must appear on stderr before the pass-1 summary line."""
    env = os.environ.copy()
    # NOTE(review): presumably makes the builder stall for 6 s so that at least one
    # 5 s heartbeat tick fires — confirm against the builder's handling of this var.
    env["JAVA_CODEBASE_RAG_TEST_GRAPH_SLOW_SEC"] = "6"
    proc = _run_builder_verbose(tmp_path / "g.kuzu", env)
    assert proc.returncode == 0, proc.stderr + proc.stdout
    err = proc.stderr
    assert _PASS1_HEARTBEAT in err
    assert _PASS1_SUMMARY in err, err
    hb_pos = err.index(_PASS1_HEARTBEAT)
    summary_pos = err.index(_PASS1_SUMMARY)
    assert hb_pos < summary_pos


def test_pass_start_before_pass_body(tmp_path: Path) -> None:
    """The pass-1 start banner must be present and precede the pass-1 summary line."""
    proc = _run_builder_verbose(tmp_path / "g2.kuzu", os.environ.copy())
    assert proc.returncode == 0, proc.stderr + proc.stdout
    err = proc.stderr
    # Check presence explicitly: ``str.find`` yields -1 for a missing banner, and
    # -1 compares less than any real offset, which would let the ordering check
    # pass even though the banner was never printed.
    assert _PASS1_START in err, err
    assert _PASS1_SUMMARY in err, err
    assert err.index(_PASS1_START) < err.index(_PASS1_SUMMARY)


def test_pipeline_header_footer_present(tmp_path: Path) -> None:
    """Non-quiet ``erase`` prints the pipeline header and footer banners on stderr."""
    exe = shutil.which("java-codebase-rag")
    if exe is None:
        pytest.skip("java-codebase-rag entrypoint not on PATH")
    idx = tmp_path / "idx_pf"
    idx.mkdir()
    proc = _run_cli(
        exe,
        ["erase", "--source-root", str(tmp_path), "--index-dir", str(idx), "--yes"],
        _index_env(idx, tmp_path),
    )
    assert proc.returncode == 0, proc.stderr + proc.stdout
    err = proc.stderr
    assert re.search(r"java-codebase-rag erase · source=.* · index=", err)
    assert re.search(r"java-codebase-rag erase · finished in [0-9]+\.[0-9]{2}s \(exit=0\)", err)


def test_cli_quiet_stderr_baseline_per_subcommand(tmp_path: Path, corpus_root: Path) -> None:
    """With ``--quiet``, lifecycle subcommands keep progress markers off stderr.

    ``erase`` is always checked. ``init`` / ``increment`` / ``reprocess`` run only
    when ``JAVA_CODEBASE_RAG_RUN_HEAVY=1`` and a ``cocoindex`` file is found next
    to the interpreter; otherwise the rest of the test is skipped.
    """
    exe = shutil.which("java-codebase-rag")
    if exe is None:
        pytest.skip("java-codebase-rag entrypoint not on PATH")

    idx_erase = tmp_path / "idx_qe"
    idx_erase.mkdir()
    r_erase = _run_cli(
        exe,
        ["erase", "--source-root", str(tmp_path), "--index-dir", str(idx_erase), "--yes", "--quiet"],
        _index_env(idx_erase, tmp_path),
    )
    assert r_erase.returncode == 0, r_erase.stderr + r_erase.stdout
    _assert_quiet_stderr_no_progress_markers(r_erase.stderr)

    if os.environ.get("JAVA_CODEBASE_RAG_RUN_HEAVY", "").strip() != "1":
        pytest.skip("cocoindex init/increment/reprocess quiet checks; set JAVA_CODEBASE_RAG_RUN_HEAVY=1")
    if not _cocoindex_available():
        pytest.skip("cocoindex CLI missing — skip init/increment/reprocess quiet checks")

    idx_life = tmp_path / "idx_lf"
    env_life = _index_env(idx_life, corpus_root.resolve())
    shared_args = ["--source-root", str(corpus_root), "--index-dir", str(idx_life)]

    # Reset step before the lifecycle sequence; its exit status is not asserted.
    _run_cli(exe, ["erase", *shared_args, "--yes", "--quiet"], env_life)

    r_init = _run_cli(exe, ["init", *shared_args, "--quiet"], env_life)
    assert r_init.returncode == 0, r_init.stderr + r_init.stdout
    _assert_quiet_stderr_no_progress_markers(r_init.stderr)

    r_inc = _run_cli(exe, ["increment", *shared_args, "--quiet"], env_life)
    assert r_inc.returncode == 0, r_inc.stderr + r_inc.stdout
    _assert_quiet_stderr_no_progress_markers(r_inc.stderr)
    # Under --quiet, stderr must be exactly the increment warning lines.
    assert r_inc.stderr == "\n".join(cli_mod._INCREMENT_WARNING_LINES) + "\n"

    r_rep = _run_cli(exe, ["reprocess", *shared_args, "--quiet"], env_life)
    assert r_rep.returncode == 0, r_rep.stderr + r_rep.stdout
    _assert_quiet_stderr_no_progress_markers(r_rep.stderr)