diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 176eecc2d4..029221ffbc 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -61,13 +61,14 @@ ) from .integration_state import ( INTEGRATION_JSON, - INTEGRATION_STATE_SCHEMA, + IntegrationStateError as _IntegrationStateError, + IntegrationStateSchemaError as _IntegrationStateSchemaError, dedupe_integration_keys as _dedupe_integration_keys, default_integration_key as _default_integration_key, installed_integration_keys as _installed_integration_keys, integration_setting as _integration_setting, integration_settings as _integration_settings, - normalize_integration_state as _normalize_integration_state, + read_integration_state as _read_integration_state, write_integration_json as _write_integration_json_file, ) from .shared_infra import ( @@ -1926,34 +1927,19 @@ def get_speckit_version() -> str: def _read_integration_json(project_root: Path) -> dict[str, Any]: """Load ``.specify/integration.json``. Returns normalized state when present.""" - path = project_root / INTEGRATION_JSON - if not path.exists(): - return {} try: - data = json.loads(path.read_text(encoding="utf-8")) - except json.JSONDecodeError as exc: - console.print(f"[red]Error:[/red] {path} contains invalid JSON.") - console.print(f"Please fix or delete {INTEGRATION_JSON} and retry.") - console.print(f"[dim]Details:[/dim] {exc}") - raise typer.Exit(1) - except OSError as exc: - console.print(f"[red]Error:[/red] Could not read {path}.") - console.print(f"Please fix file permissions or delete {INTEGRATION_JSON} and retry.") - console.print(f"[dim]Details:[/dim] {exc}") + state = _read_integration_state(project_root) + except _IntegrationStateSchemaError as exc: + console.print(f"[red]Error:[/red] {exc}") + console.print("Please upgrade Spec Kit before modifying integrations.") raise typer.Exit(1) - if not isinstance(data, dict): - console.print(f"[red]Error:[/red] {path} must contain a JSON object, got {type(data).__name__}.") + except _IntegrationStateError as exc: + console.print(f"[red]Error:[/red] {exc}") console.print(f"Please fix or delete {INTEGRATION_JSON} and retry.") raise typer.Exit(1) - schema = data.get("integration_state_schema") - if isinstance(schema, int) and not isinstance(schema, bool) and schema > INTEGRATION_STATE_SCHEMA: - console.print( - f"[red]Error:[/red] {path} uses integration state schema {schema}, " - f"but this CLI only supports schema {INTEGRATION_STATE_SCHEMA}." 
- ) - console.print("Please upgrade Spec Kit before modifying integrations.") - raise typer.Exit(1) - return _normalize_integration_state(data) + if state is None: + return {} + return state def _write_integration_json( diff --git a/src/specify_cli/integration_state.py b/src/specify_cli/integration_state.py index ac892dfbf6..5561f290a8 100644 --- a/src/specify_cli/integration_state.py +++ b/src/specify_cli/integration_state.py @@ -9,6 +9,15 @@ INTEGRATION_JSON = ".specify/integration.json" INTEGRATION_STATE_SCHEMA = 1 +SPECIFY_DIR = ".specify" + + +class IntegrationStateError(Exception): + """Raised when integration.json is invalid or unreadable.""" + + +class IntegrationStateSchemaError(IntegrationStateError): + """Raised when integration.json uses an unsupported schema version.""" def clean_integration_key(key: Any) -> str | None: @@ -159,3 +168,103 @@ def write_integration_json( data["default_integration"] = integration_key dest.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8") + + +def read_integration_state(project_root: Path) -> dict[str, Any] | None: + """Read and normalize ``.specify/integration.json``. + + Returns None if the file does not exist. + + Raises + ------ + IntegrationStateSchemaError + If the file declares a schema version newer than this CLI supports. + IntegrationStateError + If the file cannot be read, parsed, or is not a JSON object. + """ + + path = project_root / INTEGRATION_JSON + if not path.exists(): + return None + if not path.is_file(): + raise IntegrationStateError( + f"{path} exists but is not a regular file." + ) + + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, UnicodeDecodeError) as exc: + raise IntegrationStateError(f"Could not read {path}") from exc + except json.JSONDecodeError as exc: + raise IntegrationStateError(f"{path} contains invalid JSON") from exc + + if not isinstance(data, dict): + raise IntegrationStateError( + f"{path} must contain a JSON object, got {type(data).__name__}." + ) + + schema = data.get("integration_state_schema") + if ( + isinstance(schema, int) + and not isinstance(schema, bool) + and schema > INTEGRATION_STATE_SCHEMA + ): + raise IntegrationStateSchemaError( + f"{path} uses integration state schema {schema}, " + f"but this CLI only supports schema {INTEGRATION_STATE_SCHEMA}." + ) + + return normalize_integration_state(data) + + +def resolve_project_integration(project_root: Path) -> str: + """Return the active integration key for a project. + + Fallback chain: + - ``.specify/integration.json`` (normalized, schema-guarded) + - ``.specify/init-options.json`` (legacy keys: ``integration`` then ``ai``) + - ``"copilot"`` (hard-coded default) + + Notes + ----- + If ``integration.json`` exists but is unreadable/invalid or declares a future + schema version, this function raises an exception instead of silently falling + back. That keeps the engine and CLI consistent. 
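+
+    Examples
+    --------
+    Illustrative only; the paths and values shown are hypothetical::
+
+        resolve_project_integration(Path("/tmp/demo"))
+        # -> "claude"  when only init-options.json contains {"ai": "claude"}
+        # -> "copilot" when no project metadata exists at all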
+ """ + + state = read_integration_state(project_root) # raises on invalid files + if state is not None: + key = default_integration_key(state) + if key and key != "auto": + return key + + init_opts_path = project_root / f"{SPECIFY_DIR}/init-options.json" + integration = _read_legacy_init_options(init_opts_path, "integration", "ai") + if integration is not None: + return integration + + return "copilot" + + +def _read_legacy_init_options(path: Path, *keys: str) -> str | None: + """Read a string value from a legacy init-options.json file.""" + + if not path.is_file(): + return None + + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, UnicodeDecodeError, json.JSONDecodeError): + return None + + if not isinstance(data, dict): + return None + + for key in keys: + value = data.get(key) + if isinstance(value, str): + cleaned = value.strip() + if cleaned and cleaned != "auto": + return cleaned + + return None diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py index d6a73bbeb0..4d6195b1c5 100644 --- a/src/specify_cli/workflows/engine.py +++ b/src/specify_cli/workflows/engine.py @@ -19,6 +19,10 @@ import yaml +from specify_cli.integration_state import ( + resolve_project_integration as _resolve_project_integration, +) + from .base import RunStatus, StepContext, StepResult, StepStatus @@ -82,15 +86,25 @@ def from_string(cls, content: str) -> WorkflowDefinition: # ID format: lowercase alphanumeric with hyphens _ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$") + # Valid step types (matching STEP_REGISTRY keys) def _get_valid_step_types() -> set[str]: """Return valid step types from the registry, with a built-in fallback.""" from . import STEP_REGISTRY + if STEP_REGISTRY: return set(STEP_REGISTRY.keys()) return { - "command", "shell", "prompt", "gate", "if", - "switch", "while", "do-while", "fan-out", "fan-in", + "command", + "shell", + "prompt", + "gate", + "if", + "switch", + "while", + "do-while", + "fan-out", + "fan-in", } @@ -104,8 +118,7 @@ def validate_workflow(definition: WorkflowDefinition) -> list[str]: # -- Schema version --------------------------------------------------- if definition.schema_version not in ("1.0", "1"): errors.append( - f"Unsupported schema_version {definition.schema_version!r}. " - f"Expected '1.0'." + f"Unsupported schema_version {definition.schema_version!r}. Expected '1.0'." ) # -- Top-level fields ------------------------------------------------- @@ -187,9 +200,7 @@ def _validate_steps( # Determine step type step_type = step_config.get("type", "command") if step_type not in _get_valid_step_types(): - errors.append( - f"Step {step_id!r} has invalid type {step_type!r}." - ) + errors.append(f"Step {step_id!r} has invalid type {step_type!r}.") continue # Delegate to step-specific validation @@ -238,7 +249,7 @@ def __init__( project_root: Path | None = None, ) -> None: self.run_id = run_id or str(uuid.uuid4())[:8] - if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_-]*$', self.run_id): + if not re.match(r"^[a-zA-Z0-9][a-zA-Z0-9_-]*$", self.run_id): msg = f"Invalid run_id {self.run_id!r}: must be alphanumeric with hyphens/underscores only." 
raise ValueError(msg) self.workflow_id = workflow_id @@ -248,6 +259,7 @@ def __init__( self.current_step_id: str | None = None self.step_results: dict[str, dict[str, Any]] = {} self.inputs: dict[str, Any] = {} + self.resolved_integration: str | None = None self.created_at = datetime.now(timezone.utc).isoformat() self.updated_at = self.created_at self.log_entries: list[dict[str, Any]] = [] @@ -269,6 +281,7 @@ def save(self) -> None: "current_step_index": self.current_step_index, "current_step_id": self.current_step_id, "step_results": self.step_results, + "resolved_integration": self.resolved_integration, "created_at": self.created_at, "updated_at": self.updated_at, } @@ -300,6 +313,7 @@ def load(cls, run_id: str, project_root: Path) -> RunState: state.current_step_index = state_data.get("current_step_index", 0) state.current_step_id = state_data.get("current_step_id") state.step_results = state_data.get("step_results", {}) + state.resolved_integration = state_data.get("resolved_integration") state.created_at = state_data.get("created_at", "") state.updated_at = state_data.get("updated_at", "") @@ -361,11 +375,7 @@ def load_workflow(self, source: str | Path) -> WorkflowDefinition: # Try as an installed workflow ID installed_path = ( - self.project_root - / ".specify" - / "workflows" - / str(source) - / "workflow.yml" + self.project_root / ".specify" / "workflows" / str(source) / "workflow.yml" ) if installed_path.exists(): return WorkflowDefinition.from_yaml(installed_path) @@ -413,26 +423,32 @@ def execute( run_dir.mkdir(parents=True, exist_ok=True) workflow_copy = run_dir / "workflow.yml" import yaml + with open(workflow_copy, "w", encoding="utf-8") as f: yaml.safe_dump(definition.data, f, sort_keys=False) - # Resolve inputs - resolved_inputs = self._resolve_inputs(definition, inputs or {}) - state.inputs = resolved_inputs - state.status = RunStatus.RUNNING - state.save() + # Resolve inputs, integration, and execute steps. + # All of this is inside the try block so that failures during + # resolution (e.g. corrupted integration.json) are recorded in + # the run state and re-raised for the caller. + try: + resolved_inputs = self._resolve_inputs(definition, inputs or {}) + state.inputs = resolved_inputs + state.resolved_integration = self._resolve_workflow_integration( + definition.default_integration + ) + state.status = RunStatus.RUNNING + state.save() - context = StepContext( - inputs=resolved_inputs, - default_integration=definition.default_integration, - default_model=definition.default_model, - default_options=definition.default_options, - project_root=str(self.project_root), - run_id=state.run_id, - ) + context = StepContext( + inputs=resolved_inputs, + default_integration=state.resolved_integration, + default_model=definition.default_model, + default_options=definition.default_options, + project_root=str(self.project_root), + run_id=state.run_id, + ) - # Execute steps - try: self._execute_steps(definition.steps, context, state, STEP_REGISTRY) except KeyboardInterrupt: state.status = RunStatus.PAUSED @@ -468,30 +484,44 @@ def resume(self, run_id: str) -> RunState: else: definition = self.load_workflow(state.workflow_id) - # Restore context - context = StepContext( - inputs=state.inputs, - steps=state.step_results, - default_integration=definition.default_integration, - default_model=definition.default_model, - default_options=definition.default_options, - project_root=str(self.project_root), - run_id=state.run_id, - ) - from . 
import STEP_REGISTRY - state.status = RunStatus.RUNNING - state.save() + # Restore context and resume steps. All resolution is inside the try + # block so that failures (e.g. corrupted integration.json) are recorded + # in the run state and re-raised for the caller. + try: + # Use the integration resolved when the run started so changes to + # project metadata during a pause do not redirect remaining steps + # to a different agent. For older runs without the persisted field, + # resolve once here and pin it before saving again. + resolved_integration = ( + state.resolved_integration + if state.resolved_integration is not None + else self._resolve_workflow_integration(definition.default_integration) + ) + state.resolved_integration = resolved_integration + context = StepContext( + inputs=state.inputs, + steps=state.step_results, + default_integration=resolved_integration, + default_model=definition.default_model, + default_options=definition.default_options, + project_root=str(self.project_root), + run_id=state.run_id, + ) - # Resume from the current step — re-execute it so gates - # can prompt interactively again. - remaining_steps = definition.steps[state.current_step_index :] - step_offset = state.current_step_index + state.status = RunStatus.RUNNING + state.save() - try: + # Resume from the current step — re-execute it so gates + # can prompt interactively again. + remaining_steps = definition.steps[state.current_step_index :] + step_offset = state.current_step_index self._execute_steps( - remaining_steps, context, state, STEP_REGISTRY, + remaining_steps, + context, + state, + STEP_REGISTRY, step_offset=step_offset, ) except KeyboardInterrupt: @@ -565,8 +595,7 @@ def _execute_steps( or context.default_model, "options": result.output.get("options") or step_config.get("options", {}), - "input": result.output.get("input") - or step_config.get("input", {}), + "input": result.output.get("input") or step_config.get("input", {}), "output": result.output, "status": result.status.value, } @@ -618,7 +647,10 @@ def _execute_steps( # enhancement. 
if result.next_steps: self._execute_steps( - result.next_steps, context, state, registry, + result.next_steps, + context, + state, + registry, step_offset=-1, ) if state.status in ( @@ -644,10 +676,15 @@ def _execute_steps( for ns in result.next_steps: ns_copy = dict(ns) if "id" in ns_copy: - ns_copy["id"] = f"{step_id}:{ns_copy['id']}:{_loop_iter + 1}" + ns_copy["id"] = ( + f"{step_id}:{ns_copy['id']}:{_loop_iter + 1}" + ) iter_steps.append(ns_copy) self._execute_steps( - iter_steps, context, state, registry, + iter_steps, + context, + state, + registry, step_offset=-1, ) if state.status in ( @@ -670,7 +707,10 @@ def _execute_steps( base_id = item_step.get("id", "item") item_step["id"] = f"{step_id}:{base_id}:{item_idx}" self._execute_steps( - [item_step], context, state, registry, + [item_step], + context, + state, + registry, step_offset=-1, ) # Collect per-item result for fan-in @@ -711,20 +751,20 @@ def _resolve_inputs( if not isinstance(input_def, dict): continue if name in provided: - resolved[name] = self._coerce_input( - name, provided[name], input_def - ) + value = provided[name] + if name == "integration" and value == "auto": + value = self._resolve_default(name, value) + resolved[name] = self._coerce_input(name, value, input_def) elif "default" in input_def: - resolved[name] = input_def["default"] + default_value = self._resolve_default(name, input_def["default"]) + resolved[name] = self._coerce_input(name, default_value, input_def) elif input_def.get("required", False): msg = f"Required input {name!r} not provided." raise ValueError(msg) return resolved @staticmethod - def _coerce_input( - name: str, value: Any, input_def: dict[str, Any] - ) -> Any: + def _coerce_input(name: str, value: Any, input_def: dict[str, Any]) -> Any: """Coerce a provided input value to the declared type.""" input_type = input_def.get("type", "string") enum_values = input_def.get("enum") @@ -749,13 +789,28 @@ def _coerce_input( if enum_values is not None and value not in enum_values: msg = ( - f"Input {name!r} value {value!r} not in allowed " - f"values: {enum_values}." + f"Input {name!r} value {value!r} not in allowed values: {enum_values}." 
) raise ValueError(msg) return value + def _load_project_integration(self) -> str: + """Read the active integration key from project metadata.""" + return _resolve_project_integration(self.project_root) + + def _resolve_default(self, name: str, default: Any) -> Any: + """Resolve special default sentinels against project state.""" + if name == "integration" and default == "auto": + return self._load_project_integration() + return default + + def _resolve_workflow_integration(self, integration: str | None) -> str | None: + """Resolve the workflow-level integration sentinel.""" + if integration == "auto": + return self._resolve_default("integration", "auto") + return integration + def list_runs(self) -> list[dict[str, Any]]: """List all workflow runs in the project.""" runs_dir = self.project_root / ".specify" / "workflows" / "runs" diff --git a/tests/integrations/test_integration_state.py b/tests/integrations/test_integration_state.py index 1d6bdb0268..c1f8b301e2 100644 --- a/tests/integrations/test_integration_state.py +++ b/tests/integrations/test_integration_state.py @@ -1,12 +1,19 @@ -"""Tests for integration state normalization helpers.""" +"""Tests for integration state helpers.""" import json +import pytest + from specify_cli.integration_state import ( INTEGRATION_JSON, + INTEGRATION_STATE_SCHEMA, + IntegrationStateError, + IntegrationStateSchemaError, default_integration_key, integration_setting, normalize_integration_state, + read_integration_state, + resolve_project_integration, write_integration_json, ) @@ -84,3 +91,169 @@ def test_write_integration_json_strips_integration_key(tmp_path): assert state["integration"] == "claude" assert state["default_integration"] == "claude" assert state["installed_integrations"] == ["claude"] + + +class TestReadIntegrationState: + """Tests for read_integration_state().""" + + def test_returns_none_when_file_missing(self, tmp_path): + assert read_integration_state(tmp_path) is None + + def test_raises_when_path_is_directory(self, tmp_path): + """integration.json exists as a directory — should raise, not silently return None.""" + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").mkdir() + + with pytest.raises(IntegrationStateError, match="not a regular file"): + read_integration_state(tmp_path) + + def test_returns_normalized_state(self, tmp_path): + data = {"integration": "claude", "version": "0.8.0"} + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps(data), encoding="utf-8" + ) + + result = read_integration_state(tmp_path) + + assert result is not None + assert result["integration"] == "claude" + assert result["default_integration"] == "claude" + + def test_raises_schema_error_on_future_schema(self, tmp_path): + data = {"integration_state_schema": 999} + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps(data), encoding="utf-8" + ) + + with pytest.raises(IntegrationStateSchemaError): + read_integration_state(tmp_path) + + def test_raises_on_invalid_json(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + "not valid json", encoding="utf-8" + ) + + with pytest.raises(IntegrationStateError): + read_integration_state(tmp_path) + + def test_raises_on_non_dict(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps(["a", "list"]), encoding="utf-8" + ) + + with 
pytest.raises(IntegrationStateError): + read_integration_state(tmp_path) + + def test_accepts_current_schema(self, tmp_path): + data = {"integration_state_schema": INTEGRATION_STATE_SCHEMA} + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps(data), encoding="utf-8" + ) + + result = read_integration_state(tmp_path) + + assert result is not None + + def test_uses_installed_integrations_fallback(self, tmp_path): + data = {"installed_integrations": ["gemini", "claude"]} + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps(data), encoding="utf-8" + ) + + result = read_integration_state(tmp_path) + + assert result is not None + assert result["default_integration"] == "gemini" + + +class TestResolveProjectIntegration: + """Tests for resolve_project_integration().""" + + def test_from_integration_json(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps({"integration": "opencode"}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "opencode" + + def test_fallback_to_init_options(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "init-options.json").write_text( + json.dumps({"integration": "claude"}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "claude" + + def test_fallback_to_init_options_ai_key(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "init-options.json").write_text( + json.dumps({"ai": "opencode"}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "opencode" + + def test_fallback_to_copilot(self, tmp_path): + assert resolve_project_integration(tmp_path) == "copilot" + + def test_integration_json_takes_priority(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps({"integration": "gemini"}), encoding="utf-8" + ) + (tmp_path / ".specify" / "init-options.json").write_text( + json.dumps({"integration": "claude"}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "gemini" + + def test_raises_on_invalid_integration_json(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + "not valid json", encoding="utf-8" + ) + + with pytest.raises(IntegrationStateError): + resolve_project_integration(tmp_path) + + def test_uses_default_integration_field(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps( + {"default_integration": "gemini", "integration_state_schema": 1} + ), + encoding="utf-8", + ) + + assert resolve_project_integration(tmp_path) == "gemini" + + def test_raises_on_future_schema(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps({"integration_state_schema": 999}), encoding="utf-8" + ) + + with pytest.raises(IntegrationStateSchemaError): + resolve_project_integration(tmp_path) + + def test_whitespace_only_value_falls_through(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "integration.json").write_text( + json.dumps({"integration": " "}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "copilot" + + def test_auto_value_falls_through(self, tmp_path): + (tmp_path / ".specify").mkdir() + (tmp_path / ".specify" / "init-options.json").write_text( + 
json.dumps({"integration": "auto"}), encoding="utf-8" + ) + + assert resolve_project_integration(tmp_path) == "copilot" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..191a8ad352 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -25,6 +25,7 @@ # Fixtures # --------------------------------------------------------------------------- + @pytest.fixture def temp_dir(): """Create a temporary directory for tests.""" @@ -86,6 +87,7 @@ def sample_workflow_file(project_dir, sample_workflow_yaml): # ===== Step Registry Tests ===== + class TestStepRegistry: """Test STEP_REGISTRY and auto-discovery.""" @@ -98,8 +100,16 @@ def test_all_step_types_registered(self): from specify_cli.workflows import STEP_REGISTRY expected = { - "command", "shell", "prompt", "gate", "if", "switch", - "while", "do-while", "fan-out", "fan-in", + "command", + "shell", + "prompt", + "gate", + "if", + "switch", + "while", + "do-while", + "fan-out", + "fan-in", } assert expected.issubset(set(STEP_REGISTRY.keys())) @@ -128,6 +138,7 @@ def test_register_step_empty_key_raises(self): class EmptyStep(StepBase): type_key = "" + def execute(self, config, context): return StepResult() @@ -137,6 +148,7 @@ def execute(self, config, context): # ===== Base Classes Tests ===== + class TestBaseClasses: """Test StepBase, StepContext, StepResult.""" @@ -194,6 +206,7 @@ def test_run_status_values(self): # ===== Expression Engine Tests ===== + class TestExpressions: """Test sandboxed expression evaluator.""" @@ -208,9 +221,7 @@ def test_step_output_reference(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext - ctx = StepContext( - steps={"specify": {"output": {"file": "spec.md"}}} - ) + ctx = StepContext(steps={"specify": {"output": {"file": "spec.md"}}}) assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md" def test_string_interpolation(self): @@ -233,9 +244,7 @@ def test_comparison_not_equals(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext - ctx = StepContext( - steps={"run-tests": {"output": {"exit_code": 1}}} - ) + ctx = StepContext(steps={"run-tests": {"output": {"exit_code": 1}}}) result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx) assert result is True @@ -243,11 +252,13 @@ def test_numeric_comparison(self): from specify_cli.workflows.expressions import evaluate_expression from specify_cli.workflows.base import StepContext - ctx = StepContext( - steps={"plan": {"output": {"task_count": 7}}} + ctx = StepContext(steps={"plan": {"output": {"task_count": 7}}}) + assert ( + evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True + ) + assert ( + evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False ) - assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True - assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False def test_boolean_and(self): from specify_cli.workflows.expressions import evaluate_expression @@ -268,7 +279,10 @@ def test_filter_default(self): from specify_cli.workflows.base import StepContext ctx = StepContext() - assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback" + assert ( + evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) + == "fallback" + ) def test_filter_join(self): from specify_cli.workflows.expressions import 
evaluate_expression @@ -327,7 +341,9 @@ def test_list_indexing(self): from specify_cli.workflows.base import StepContext ctx = StepContext( - steps={"tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}}} + steps={ + "tasks": {"output": {"task_list": [{"file": "a.md"}, {"file": "b.md"}]}} + } ) result = evaluate_expression("{{ steps.tasks.output.task_list[0].file }}", ctx) assert result == "a.md" @@ -335,11 +351,13 @@ def test_list_indexing(self): # ===== Integration Dispatch Tests ===== + class TestBuildExecArgs: """Test build_exec_args for CLI-based integrations.""" def test_claude_exec_args(self): from specify_cli.integrations.claude import ClaudeIntegration + impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", model="sonnet-4") assert args[0] == "claude" @@ -351,6 +369,7 @@ def test_claude_exec_args(self): def test_gemini_exec_args(self): from specify_cli.integrations.gemini import GeminiIntegration + impl = GeminiIntegration() args = impl.build_exec_args("do stuff", model="gemini-2.5-pro") assert args[0] == "gemini" @@ -360,6 +379,7 @@ def test_gemini_exec_args(self): def test_codex_exec_args(self): from specify_cli.integrations.codex import CodexIntegration + impl = CodexIntegration() args = impl.build_exec_args("do stuff") assert args[0] == "codex" @@ -371,6 +391,7 @@ def test_copilot_exec_args(self, monkeypatch): monkeypatch.delenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", raising=False) monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False) from specify_cli.integrations.copilot import CopilotIntegration + impl = CopilotIntegration() args = impl.build_exec_args("do stuff", model="claude-sonnet-4-20250514") assert args[0] == "copilot" @@ -382,6 +403,7 @@ def test_copilot_new_env_var_disables_yolo(self, monkeypatch): monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "0") monkeypatch.delenv("SPECKIT_ALLOW_ALL_TOOLS", raising=False) from specify_cli.integrations.copilot import CopilotIntegration + impl = CopilotIntegration() args = impl.build_exec_args("do stuff") assert "--yolo" not in args @@ -391,6 +413,7 @@ def test_copilot_deprecated_env_var_still_honoured(self, monkeypatch): monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0") import warnings from specify_cli.integrations.copilot import CopilotIntegration + impl = CopilotIntegration() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") @@ -406,23 +429,27 @@ def test_copilot_new_env_var_takes_precedence(self, monkeypatch): monkeypatch.setenv("SPECKIT_COPILOT_ALLOW_ALL_TOOLS", "1") monkeypatch.setenv("SPECKIT_ALLOW_ALL_TOOLS", "0") from specify_cli.integrations.copilot import CopilotIntegration + impl = CopilotIntegration() args = impl.build_exec_args("do stuff") assert "--yolo" in args def test_ide_only_returns_none(self): from specify_cli.integrations.windsurf import WindsurfIntegration + impl = WindsurfIntegration() assert impl.build_exec_args("test") is None def test_no_model_omits_flag(self): from specify_cli.integrations.claude import ClaudeIntegration + impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", model=None) assert "--model" not in args def test_no_json_omits_flag(self): from specify_cli.integrations.claude import ClaudeIntegration + impl = ClaudeIntegration() args = impl.build_exec_args("do stuff", output_json=False) assert "--output-format" not in args @@ -430,6 +457,7 @@ def test_no_json_omits_flag(self): # ===== Step Type Tests ===== + class TestCommandStep: """Test the command step type.""" @@ -448,7 +476,9 @@ def 
test_execute_basic(self): "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } - with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): + with patch( + "specify_cli.workflows.steps.command.shutil.which", return_value=None + ): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["command"] == "speckit.specify" @@ -525,7 +555,9 @@ def test_dispatch_not_attempted_without_cli(self): "command": "speckit.specify", "input": {"args": "{{ inputs.name }}"}, } - with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): + with patch( + "specify_cli.workflows.steps.command.shutil.which", return_value=None + ): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["dispatched"] is False @@ -554,8 +586,13 @@ def test_dispatch_with_mock_cli(self, tmp_path, monkeypatch): mock_result.stdout = '{"result": "done"}' mock_result.stderr = "" - with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ - patch("subprocess.run", return_value=mock_result) as mock_run: + with ( + patch( + "specify_cli.workflows.steps.command.shutil.which", + return_value="/usr/local/bin/claude", + ), + patch("subprocess.run", return_value=mock_result) as mock_run, + ): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED @@ -591,8 +628,13 @@ def test_dispatch_failure_returns_failed_status(self, tmp_path): mock_result.stdout = "" mock_result.stderr = "API error" - with patch("specify_cli.workflows.steps.command.shutil.which", return_value="/usr/local/bin/claude"), \ - patch("subprocess.run", return_value=mock_result): + with ( + patch( + "specify_cli.workflows.steps.command.shutil.which", + return_value="/usr/local/bin/claude", + ), + patch("subprocess.run", return_value=mock_result), + ): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED @@ -618,7 +660,9 @@ def test_execute_basic(self): "type": "prompt", "prompt": "Review {{ inputs.file }} for security issues", } - with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value=None): + with patch( + "specify_cli.workflows.steps.prompt.shutil.which", return_value=None + ): result = step.execute(config, ctx) assert result.status == StepStatus.FAILED assert result.output["prompt"] == "Review auth.py for security issues" @@ -676,8 +720,13 @@ def test_dispatch_with_mock_cli(self, tmp_path): mock_result.stdout = "Here is the explanation" mock_result.stderr = "" - with patch("specify_cli.workflows.steps.prompt.shutil.which", return_value="/usr/local/bin/claude"), \ - patch("subprocess.run", return_value=mock_result): + with ( + patch( + "specify_cli.workflows.steps.prompt.shutil.which", + return_value="/usr/local/bin/claude", + ), + patch("subprocess.run", return_value=mock_result), + ): result = step.execute(config, ctx) assert result.status == StepStatus.COMPLETED @@ -765,11 +814,13 @@ def test_validate_invalid_on_reject(self): from specify_cli.workflows.steps.gate import GateStep step = GateStep() - errors = step.validate({ - "id": "test", - "message": "Review", - "on_reject": "invalid", - }) + errors = step.validate( + { + "id": "test", + "message": "Review", + "on_reject": "invalid", + } + ) assert any("on_reject" in e for e in errors) @@ -825,9 +876,7 @@ def test_execute_matches_case(self): from specify_cli.workflows.base import StepContext step = SwitchStep() - ctx = StepContext( - steps={"review": {"output": 
{"choice": "approve"}}} - ) + ctx = StepContext(steps={"review": {"output": {"choice": "approve"}}}) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", @@ -846,9 +895,7 @@ def test_execute_falls_to_default(self): from specify_cli.workflows.base import StepContext step = SwitchStep() - ctx = StepContext( - steps={"review": {"output": {"choice": "unknown"}}} - ) + ctx = StepContext(steps={"review": {"output": {"choice": "unknown"}}}) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", @@ -866,9 +913,7 @@ def test_execute_no_default_no_match(self): from specify_cli.workflows.base import StepContext step = SwitchStep() - ctx = StepContext( - steps={"review": {"output": {"choice": "other"}}} - ) + ctx = StepContext(steps={"review": {"output": {"choice": "other"}}}) config = { "id": "route", "expression": "{{ steps.review.output.choice }}", @@ -891,12 +936,14 @@ def test_validate_invalid_cases_and_default(self): from specify_cli.workflows.steps.switch import SwitchStep step = SwitchStep() - errors = step.validate({ - "id": "test", - "expression": "{{ x }}", - "cases": {"a": "not-a-list"}, - "default": "also-bad", - }) + errors = step.validate( + { + "id": "test", + "expression": "{{ x }}", + "cases": {"a": "not-a-list"}, + "default": "also-bad", + } + ) assert any("case 'a' must be a list" in e for e in errors) assert any("'default' must be a list" in e for e in errors) @@ -909,9 +956,7 @@ def test_execute_condition_true(self): from specify_cli.workflows.base import StepContext step = WhileStep() - ctx = StepContext( - steps={"run-tests": {"output": {"exit_code": 1}}} - ) + ctx = StepContext(steps={"run-tests": {"output": {"exit_code": 1}}}) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", @@ -927,9 +972,7 @@ def test_execute_condition_false(self): from specify_cli.workflows.base import StepContext step = WhileStep() - ctx = StepContext( - steps={"run-tests": {"output": {"exit_code": 0}}} - ) + ctx = StepContext(steps={"run-tests": {"output": {"exit_code": 0}}}) config = { "id": "retry", "condition": "{{ steps.run-tests.output.exit_code != 0 }}", @@ -952,7 +995,9 @@ def test_validate_invalid_max_iterations(self): from specify_cli.workflows.steps.while_loop import WhileStep step = WhileStep() - errors = step.validate({"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []}) + errors = step.validate( + {"id": "test", "condition": "{{ true }}", "max_iterations": 0, "steps": []} + ) assert any("must be an integer >= 1" in e for e in errors) @@ -1021,12 +1066,14 @@ def test_validate_steps_not_list(self): from specify_cli.workflows.steps.do_while import DoWhileStep step = DoWhileStep() - errors = step.validate({ - "id": "test", - "condition": "{{ true }}", - "max_iterations": 3, - "steps": "not-a-list", - }) + errors = step.validate( + { + "id": "test", + "condition": "{{ true }}", + "max_iterations": 3, + "steps": "not-a-list", + } + ) assert any("'steps' must be a list" in e for e in errors) @@ -1039,10 +1086,16 @@ def test_execute_with_items(self): step = FanOutStep() ctx = StepContext( - steps={"tasks": {"output": {"task_list": [ - {"file": "a.md"}, - {"file": "b.md"}, - ]}}} + steps={ + "tasks": { + "output": { + "task_list": [ + {"file": "a.md"}, + {"file": "b.md"}, + ] + } + } + } ) config = { "id": "parallel", @@ -1081,11 +1134,13 @@ def test_validate_step_not_mapping(self): from specify_cli.workflows.steps.fan_out import FanOutStep step = FanOutStep() - errors = step.validate({ - "id": 
"test", - "items": "{{ x }}", - "step": "not-a-dict", - }) + errors = step.validate( + { + "id": "test", + "items": "{{ x }}", + "step": "not-a-dict", + } + ) assert any("'step' must be a mapping" in e for e in errors) @@ -1098,9 +1153,7 @@ def test_execute_collects_results(self): step = FanInStep() ctx = StepContext( - steps={ - "parallel": {"output": {"item_count": 2, "status": "done"}} - } + steps={"parallel": {"output": {"item_count": 2, "status": "done"}}} ) config = { "id": "collect", @@ -1163,6 +1216,7 @@ def test_validate_wait_for_not_list(self): # ===== Workflow Definition Tests ===== + class TestWorkflowDefinition: """Test WorkflowDefinition loading and parsing.""" @@ -1199,6 +1253,7 @@ def test_inputs_parsed(self, sample_workflow_yaml): # ===== Workflow Validation Tests ===== + class TestWorkflowValidation: """Test workflow validation.""" @@ -1326,6 +1381,7 @@ def test_invalid_input_type(self): # ===== Workflow Engine Tests ===== + class TestWorkflowEngine: """Test WorkflowEngine execution.""" @@ -1374,7 +1430,9 @@ def test_execute_simple_workflow(self, project_dir): """ definition = WorkflowDefinition.from_string(yaml_str) engine = WorkflowEngine(project_dir) - with patch("specify_cli.workflows.steps.command.shutil.which", return_value=None): + with patch( + "specify_cli.workflows.steps.command.shutil.which", return_value=None + ): state = engine.execute(definition, {"name": "login"}) assert state.status == RunStatus.FAILED @@ -1496,8 +1554,512 @@ def test_execute_missing_required_input(self, project_dir): engine.execute(definition, {}) +class TestIntegrationAutoDetect: + """Tests for integration auto-detection in the workflow engine.""" + + def test_integration_auto_default_uses_project_integration(self, project_dir): + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "auto-test" + name: "Auto Test" + version: "1.0.0" +inputs: + integration: + type: string + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {}) + assert resolved["integration"] == "opencode" + + def test_integration_auto_default_falls_back_to_copilot_when_no_json( + self, project_dir + ): + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "fallback-test" + name: "Fallback Test" + version: "1.0.0" +inputs: + integration: + type: string + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {}) + assert resolved["integration"] == "copilot" + + def test_integration_explicit_input_overrides_auto(self, project_dir): + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "explicit-test" + name: "Explicit Test" + version: "1.0.0" +inputs: + integration: + type: string + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ 
inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {"integration": "claude"}) + assert resolved["integration"] == "claude" + + def test_integration_explicit_auto_input_also_resolves(self, project_dir): + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "gemini"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "explicit-auto-test" + name: "Explicit Auto Test" + version: "1.0.0" +inputs: + integration: + type: string + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {"integration": "auto"}) + assert resolved["integration"] == "gemini" + + def test_integration_auto_raises_on_malformed_integration_json(self, project_dir): + from specify_cli.integration_state import IntegrationStateError + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + "not valid json", encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + with pytest.raises(IntegrationStateError): + engine._load_project_integration() + + def test_integration_auto_raises_on_oserror(self, project_dir): + from unittest.mock import patch + + from specify_cli.integration_state import IntegrationStateError + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + with patch("pathlib.Path.read_text", side_effect=OSError("permission denied")): + with pytest.raises(IntegrationStateError): + engine._load_project_integration() + + def test_integration_auto_ignores_whitespace_only_value(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": " "}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + assert engine._load_project_integration() == "copilot" + + def test_integration_auto_falls_back_to_init_options_json(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "init-options.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + assert engine._load_project_integration() == "claude" + + def test_integration_auto_init_options_ai_key_fallback(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "init-options.json").write_text( + '{"ai": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + assert engine._load_project_integration() == "opencode" + + def test_integration_auto_integration_json_takes_priority(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "gemini"}', encoding="utf-8" + ) + (project_dir / ".specify" / "init-options.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + assert engine._load_project_integration() == "gemini" + + def test_integration_explicit_auto_with_enum_constraint(self, project_dir): + from 
specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "enum-auto-test" + name: "Enum Auto Test" + version: "1.0.0" +inputs: + integration: + type: string + enum: ["claude", "copilot"] + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {"integration": "auto"}) + assert resolved["integration"] == "claude" + + def test_integration_auto_default_validates_against_enum(self, project_dir): + """Auto default resolving to an allowed value passes enum validation.""" + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "enum-default-test" + name: "Enum Default Test" + version: "1.0.0" +inputs: + integration: + type: string + enum: ["claude", "copilot"] + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + resolved = engine._resolve_inputs(definition, {}) + assert resolved["integration"] == "claude" + + def test_integration_auto_default_rejects_value_not_in_enum(self, project_dir): + """Auto default resolving to a value outside the enum raises ValueError.""" + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "enum-reject-test" + name: "Enum Reject Test" + version: "1.0.0" +inputs: + integration: + type: string + enum: ["copilot", "claude"] + default: "auto" +steps: + - id: echo + type: shell + run: "echo {{ inputs.integration }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + with pytest.raises(ValueError, match="not in allowed values"): + engine._resolve_inputs(definition, {}) + + def test_workflow_level_integration_auto_resolves_in_context(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + resolved = engine._resolve_workflow_integration("auto") + assert resolved == "opencode" + + def test_integration_auto_reads_default_integration_field(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"default_integration": "gemini", "integration_state_schema": 1}', + encoding="utf-8", + ) + engine = WorkflowEngine(project_dir) + assert engine._load_project_integration() == "gemini" + + def test_resolved_integration_persisted_in_run_state(self, project_dir): + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import ( + RunState, + WorkflowDefinition, + WorkflowEngine, + ) + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ 
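+# The workflow-level "integration: auto" sentinel below is resolved once at
+# run start and persisted on the run state (see _resolve_workflow_integration).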
+schema_version: "1.0" +workflow: + id: "persist-test" + name: "Persist Test" + version: "1.0.0" + integration: auto +steps: + - id: gate + type: gate + message: "Proceed?" + options: [proceed] +""" + definition = WorkflowDefinition.from_string(yaml_str) + state = engine.execute(definition) + + assert state.status == RunStatus.PAUSED + assert state.resolved_integration == "opencode" + + reloaded = RunState.load(state.run_id, project_dir) + assert reloaded.resolved_integration == "opencode" + + def test_resume_uses_persisted_integration_not_current_project_state( + self, project_dir + ): + from unittest.mock import MagicMock + + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "resume-test" + name: "Resume Test" + version: "1.0.0" + integration: auto +steps: + - id: gate + type: gate + message: "Proceed?" + options: [proceed] +""" + definition = WorkflowDefinition.from_string(yaml_str) + state = engine.execute(definition) + assert state.status == RunStatus.PAUSED + assert state.resolved_integration == "opencode" + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "claude"}', encoding="utf-8" + ) + + spy = MagicMock(wraps=engine._resolve_workflow_integration) + engine._resolve_workflow_integration = spy + + engine.resume(state.run_id) + + spy.assert_not_called() + + def test_resume_pins_legacy_integration_on_first_resume(self, project_dir): + from specify_cli.workflows.engine import ( + RunState, + WorkflowDefinition, + WorkflowEngine, + ) + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "gemini"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "legacy-resume-test" + name: "Legacy Resume Test" + version: "1.0.0" + integration: auto +steps: + - id: gate + type: gate + message: "Proceed?" 
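+    # The gate pauses the run, letting the test rewrite integration.json
+    # before resuming.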
+ options: [proceed] +""" + definition = WorkflowDefinition.from_string(yaml_str) + state = engine.execute(definition) + state.resolved_integration = None + state.save() + + reloaded = RunState.load(state.run_id, project_dir) + assert reloaded.resolved_integration is None + + resumed = engine.resume(state.run_id) + assert resumed.resolved_integration == "gemini" + + def test_integration_auto_raises_on_future_schema(self, project_dir): + from specify_cli.integration_state import IntegrationStateSchemaError + from specify_cli.workflows.engine import WorkflowEngine + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration_state_schema": 999}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + with pytest.raises(IntegrationStateSchemaError): + engine._load_project_integration() + + def test_bundled_speckit_workflow_has_auto_default(self, project_dir): + from specify_cli.workflows.engine import WorkflowDefinition + + repo_root = Path(__file__).parent.parent + workflow_path = repo_root / "workflows" / "speckit" / "workflow.yml" + assert workflow_path.exists() + + definition = WorkflowDefinition.from_yaml(workflow_path) + integration_input = definition.inputs.get("integration", {}) + assert integration_input.get("default") == "auto" + + def test_bundled_speckit_workflow_auto_resolves_for_engine(self, project_dir): + from specify_cli.workflows.engine import WorkflowDefinition, WorkflowEngine + + repo_root = Path(__file__).parent.parent + workflow_path = repo_root / "workflows" / "speckit" / "workflow.yml" + + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + definition = WorkflowDefinition.from_yaml(workflow_path) + resolved = engine._resolve_inputs(definition, {"spec": "demo"}) + assert resolved.get("integration") == "opencode" + + def test_execute_fails_cleanly_on_corrupted_integration_json(self, project_dir): + """execute() records FAILED state when integration.json is corrupted.""" + from specify_cli.integration_state import IntegrationStateError + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import ( + WorkflowDefinition, + WorkflowEngine, + ) + + (project_dir / ".specify" / "integration.json").write_text( + "not valid json", encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "corrupt-test" + name: "Corrupt Test" + version: "1.0.0" + integration: auto +steps: + - id: gate + type: gate + message: "Proceed?" 
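+    # Never reached: execute() raises while resolving the integration,
+    # before any step runs.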
+ options: [proceed] +""" + definition = WorkflowDefinition.from_string(yaml_str) + with pytest.raises(IntegrationStateError): + engine.execute(definition) + + # Verify that a run was created and persisted as FAILED + runs = engine.list_runs() + assert len(runs) == 1 + assert runs[0]["status"] == RunStatus.FAILED.value + + def test_resume_fails_cleanly_on_corrupted_integration_json(self, project_dir): + """resume() records FAILED state when integration.json is corrupted + and no persisted resolved_integration exists.""" + from specify_cli.integration_state import IntegrationStateError + from specify_cli.workflows.base import RunStatus + from specify_cli.workflows.engine import ( + RunState, + WorkflowDefinition, + WorkflowEngine, + ) + + # Start with valid config so execute() succeeds + (project_dir / ".specify" / "integration.json").write_text( + '{"integration": "opencode"}', encoding="utf-8" + ) + engine = WorkflowEngine(project_dir) + yaml_str = """ +schema_version: "1.0" +workflow: + id: "resume-corrupt-test" + name: "Resume Corrupt Test" + version: "1.0.0" + integration: auto +steps: + - id: gate + type: gate + message: "Proceed?" + options: [proceed] +""" + definition = WorkflowDefinition.from_string(yaml_str) + state = engine.execute(definition) + assert state.status == RunStatus.PAUSED + + # Corrupt the file and clear persisted resolved_integration + # to force resume() to re-resolve from the corrupted file + (project_dir / ".specify" / "integration.json").write_text( + "not valid json", encoding="utf-8" + ) + state.resolved_integration = None + state.save() + + with pytest.raises(IntegrationStateError): + engine.resume(state.run_id) + + # Verify that the run was updated to FAILED + reloaded = RunState.load(state.run_id, project_dir) + assert reloaded.status == RunStatus.FAILED + + # ===== State Persistence Tests ===== + class TestRunState: """Test RunState persistence and loading.""" @@ -1585,6 +2147,7 @@ def test_list_after_execution(self, project_dir): # ===== Workflow Registry Tests ===== + class TestWorkflowRegistry: """Test WorkflowRegistry operations.""" @@ -1642,6 +2205,7 @@ def test_persistence(self, project_dir): # ===== Workflow Catalog Tests ===== + class TestWorkflowCatalog: """Test WorkflowCatalog catalog resolution.""" @@ -1657,7 +2221,9 @@ def test_default_catalogs(self, project_dir): def test_env_var_override(self, project_dir, monkeypatch): from specify_cli.workflows.catalog import WorkflowCatalog - monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json") + monkeypatch.setenv( + "SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json" + ) catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() assert len(entries) == 1 @@ -1668,14 +2234,20 @@ def test_project_level_config(self, project_dir): from specify_cli.workflows.catalog import WorkflowCatalog config_path = project_dir / ".specify" / "workflow-catalogs.yml" - config_path.write_text(yaml.dump({ - "catalogs": [{ - "name": "custom", - "url": "https://example.com/wf-catalog.json", - "priority": 1, - "install_allowed": True, - }] - })) + config_path.write_text( + yaml.dump( + { + "catalogs": [ + { + "name": "custom", + "url": "https://example.com/wf-catalog.json", + "priority": 1, + "install_allowed": True, + } + ] + } + ) + ) catalog = WorkflowCatalog(project_dir) entries = catalog.get_active_catalogs() @@ -1683,7 +2255,10 @@ def test_project_level_config(self, project_dir): assert entries[0].name == "custom" def test_validate_url_http_rejected(self, 
project_dir): - from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + from specify_cli.workflows.catalog import ( + WorkflowCatalog, + WorkflowValidationError, + ) catalog = WorkflowCatalog(project_dir) with pytest.raises(WorkflowValidationError, match="HTTPS"): @@ -1709,7 +2284,10 @@ def test_add_catalog(self, project_dir): assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json" def test_add_catalog_duplicate_rejected(self, project_dir): - from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + from specify_cli.workflows.catalog import ( + WorkflowCatalog, + WorkflowValidationError, + ) catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/catalog.json") @@ -1732,7 +2310,10 @@ def test_remove_catalog(self, project_dir): assert len(data["catalogs"]) == 1 def test_remove_catalog_invalid_index(self, project_dir): - from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + from specify_cli.workflows.catalog import ( + WorkflowCatalog, + WorkflowValidationError, + ) catalog = WorkflowCatalog(project_dir) catalog.add_catalog("https://example.com/c1.json") @@ -1752,6 +2333,7 @@ def test_get_catalog_configs(self, project_dir): # ===== Integration Test ===== + class TestWorkflowIntegration: """End-to-end workflow execution tests.""" diff --git a/workflows/speckit/workflow.yml b/workflows/speckit/workflow.yml index bf18451029..7f294de917 100644 --- a/workflows/speckit/workflow.yml +++ b/workflows/speckit/workflow.yml @@ -9,7 +9,7 @@ workflow: requires: speckit_version: ">=0.7.2" integrations: - any: ["copilot", "claude", "gemini"] + any: ["copilot", "claude", "gemini", "opencode"] inputs: spec: @@ -18,8 +18,8 @@ inputs: prompt: "Describe what you want to build" integration: type: string - default: "copilot" - prompt: "Integration to use (e.g. claude, copilot, gemini)" + default: "auto" + prompt: "Integration to use (e.g. claude, copilot, gemini, opencode), or 'auto' to detect from project config" scope: type: string default: "full"