diff --git a/src/bedrock_agentcore/evaluation/__init__.py b/src/bedrock_agentcore/evaluation/__init__.py index 57b0c277..a70c267f 100644 --- a/src/bedrock_agentcore/evaluation/__init__.py +++ b/src/bedrock_agentcore/evaluation/__init__.py @@ -6,6 +6,7 @@ EvaluatorOutput, custom_code_based_evaluator, ) +from bedrock_agentcore.evaluation.dataset_client import DatasetClient from bedrock_agentcore.evaluation.runner.batch.batch_evaluation_models import ( BatchEvaluationResult, BatchEvaluationRunConfig, @@ -21,6 +22,7 @@ BatchEvaluationRunner, ) from bedrock_agentcore.evaluation.runner.dataset_providers import ( + DatasetManagementServiceProvider, DatasetProvider, FileDatasetProvider, ) @@ -77,6 +79,7 @@ "AgentInvokerOutput", "CloudWatchAgentSpanCollector", "Dataset", + "DatasetClient", "DatasetProvider", "EvaluationClient", "EvaluationResult", @@ -86,6 +89,7 @@ "EvaluatorOutput", "EvaluatorResult", "FileDatasetProvider", + "DatasetManagementServiceProvider", "Input", "OnDemandEvaluationDatasetRunner", "ReferenceInputs", diff --git a/src/bedrock_agentcore/evaluation/dataset_client.py b/src/bedrock_agentcore/evaluation/dataset_client.py new file mode 100644 index 00000000..2442f774 --- /dev/null +++ b/src/bedrock_agentcore/evaluation/dataset_client.py @@ -0,0 +1,277 @@ +"""DatasetClient for managing evaluation datasets.""" + +import logging +from typing import Any, Dict, Optional + +import boto3 +from botocore.config import Config + +from bedrock_agentcore._utils.config import WaitConfig +from bedrock_agentcore._utils.polling import wait_until, wait_until_deleted +from bedrock_agentcore._utils.snake_case import accept_snake_case_kwargs, convert_kwargs +from bedrock_agentcore._utils.user_agent import build_user_agent_suffix + +logger = logging.getLogger(__name__) + + +class DatasetClient: + """Client for managing evaluation datasets. + + Provides pass-through access to all dataset management APIs on the + bedrock-agentcore-control service, plus *_and_wait helpers for async operations. + + Example:: + + client = DatasetClient(region_name="us-west-2") + + # Create a dataset and wait for ACTIVE + dataset = client.create_dataset_and_wait( + datasetName="my-dataset", + schemaType="AGENTCORE_EVALUATION_PREDEFINED_V1", + source={"inlineExamples": {"examples": [...]}}, + ) + + # Pass-through to any dataset API + client.list_datasets(maxResults=10) + client.add_dataset_examples(datasetId="ds-123", examples=[...]) + """ + + _ALLOWED_CP_METHODS = { + # Dataset CRUD + "create_dataset", + "get_dataset", + "list_datasets", + "update_dataset", + "delete_dataset", + # Version management + "create_dataset_version", + "list_dataset_versions", + # Examples management + "add_dataset_examples", + "update_dataset_examples", + "delete_dataset_examples", + "list_dataset_examples", + } + + def __init__( + self, + region_name: Optional[str] = None, + integration_source: Optional[str] = None, + boto3_session: Optional[boto3.Session] = None, + ): + """Initialize the DatasetClient. + + Args: + region_name: AWS region. Falls back to boto3 session region or us-west-2. + integration_source: Optional integration framework identifier for telemetry. + boto3_session: Optional boto3 Session. If not provided, a default is created. + """ + session = boto3_session if boto3_session else boto3.Session() + self.region_name = region_name or session.region_name or "us-west-2" + self.integration_source = integration_source + + user_agent_extra = build_user_agent_suffix(integration_source) + client_config = Config(user_agent_extra=user_agent_extra) + + self._cp_client = session.client( + "bedrock-agentcore-control", + region_name=self.region_name, + config=client_config, + ) + + logger.info("Initialized DatasetClient in region %s", self.region_name) + + def __getattr__(self, name: str): + """Dynamically forward allowlisted method calls to the boto3 client.""" + if "_cp_client" not in self.__dict__: + raise AttributeError(name) + + if name in self._ALLOWED_CP_METHODS and hasattr(self._cp_client, name): + method = getattr(self._cp_client, name) + logger.debug("Forwarding method '%s' to _cp_client", name) + return accept_snake_case_kwargs(method) + + raise AttributeError( + f"'{self.__class__.__name__}' object has no attribute '{name}'. " + f"Method not found on control plane client. " + f"Available methods can be found in the boto3 documentation for " + f"'bedrock-agentcore-control' service." + ) + + # *_and_wait methods + # ------------------------------------------------------------------------- + + def create_dataset_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Dict[str, Any]: + """Create a dataset and wait for it to reach ACTIVE status. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the create_dataset API. + + Returns: + Dataset details when ACTIVE. + + Raises: + RuntimeError: If the dataset reaches CREATE_FAILED status. + TimeoutError: If the dataset doesn't become ACTIVE within max_wait. + """ + response = self._cp_client.create_dataset(**convert_kwargs(kwargs)) + dataset_id = response["datasetId"] + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"CREATE_FAILED"}, + wait_config, + ) + + def delete_dataset_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Optional[Dict[str, Any]]: + """Delete a dataset (or a single version) and wait for completion. + + - Full delete (no ``datasetVersion``): polls until ``GetDataset`` + raises ``ResourceNotFoundException``. Fails on ``DELETE_FAILED``. + - Version-specific delete (``datasetVersion`` provided): the dataset + itself is not removed. Polls ``GetDataset`` until status returns to + ``ACTIVE``. Fails on ``UPDATE_FAILED``. Returns the dataset details. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the delete_dataset API. + + Raises: + RuntimeError: On ``DELETE_FAILED`` or ``UPDATE_FAILED``. + TimeoutError: If the operation does not finish within ``max_wait``. + """ + converted = convert_kwargs(kwargs) + response = self._cp_client.delete_dataset(**converted) + dataset_id = response["datasetId"] + + if "datasetVersion" in converted: + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"UPDATE_FAILED"}, + wait_config, + ) + + wait_until_deleted( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + failed={"DELETE_FAILED"}, + wait_config=wait_config, + ) + return None + + def create_dataset_version_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Dict[str, Any]: + """Create a dataset version and wait for the dataset to reach ACTIVE status. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the create_dataset_version API. + + Returns: + Dataset details when ACTIVE. + + Raises: + RuntimeError: If the dataset reaches UPDATE_FAILED status. + TimeoutError: If the dataset doesn't become ACTIVE within max_wait. + """ + response = self._cp_client.create_dataset_version(**convert_kwargs(kwargs)) + dataset_id = response["datasetId"] + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"UPDATE_FAILED"}, + wait_config, + ) + + def add_examples_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Dict[str, Any]: + """Add examples to a dataset and wait for ACTIVE status. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the add_dataset_examples API. + + Returns: + Dataset details when ACTIVE. + + Raises: + RuntimeError: If the dataset reaches UPDATE_FAILED status. + TimeoutError: If the dataset doesn't become ACTIVE within max_wait. + """ + response = self._cp_client.add_dataset_examples(**convert_kwargs(kwargs)) + dataset_id = response["datasetId"] + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"UPDATE_FAILED"}, + wait_config, + ) + + def update_examples_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Dict[str, Any]: + """Update examples in a dataset and wait for ACTIVE status. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the update_dataset_examples API. + + Returns: + Dataset details when ACTIVE. + + Raises: + RuntimeError: If the dataset reaches UPDATE_FAILED status. + TimeoutError: If the dataset doesn't become ACTIVE within max_wait. + """ + response = self._cp_client.update_dataset_examples(**convert_kwargs(kwargs)) + dataset_id = response["datasetId"] + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"UPDATE_FAILED"}, + wait_config, + ) + + def delete_examples_and_wait( + self, + wait_config: Optional[WaitConfig] = None, + **kwargs, + ) -> Dict[str, Any]: + """Delete examples from a dataset and wait for ACTIVE status. + + Args: + wait_config: Optional WaitConfig for polling behavior. + **kwargs: Arguments forwarded to the delete_dataset_examples API. + + Returns: + Dataset details when ACTIVE. + + Raises: + RuntimeError: If the dataset reaches UPDATE_FAILED status. + TimeoutError: If the dataset doesn't become ACTIVE within max_wait. + """ + response = self._cp_client.delete_dataset_examples(**convert_kwargs(kwargs)) + dataset_id = response["datasetId"] + return wait_until( + lambda: self._cp_client.get_dataset(datasetId=dataset_id), + "ACTIVE", + {"UPDATE_FAILED"}, + wait_config, + ) diff --git a/src/bedrock_agentcore/evaluation/runner/dataset_providers.py b/src/bedrock_agentcore/evaluation/runner/dataset_providers.py index f8cb2cb4..081d626e 100644 --- a/src/bedrock_agentcore/evaluation/runner/dataset_providers.py +++ b/src/bedrock_agentcore/evaluation/runner/dataset_providers.py @@ -2,9 +2,52 @@ import json from abc import ABC, abstractmethod -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional -from .dataset_types import ActorProfile, Dataset, PredefinedScenario, Scenario, SimulatedScenario, Turn +import requests + +from bedrock_agentcore.evaluation.dataset_client import DatasetClient + +from .dataset_types import ( + ActorProfile, + Dataset, + PredefinedScenario, + Scenario, + SimulatedScenario, + Turn, +) + +SUPPORTED_SCHEMA_TYPES = { + "AGENTCORE_EVALUATION_PREDEFINED_V1", + "AGENTCORE_EVALUATION_SIMULATED_V1", +} + + +def _parse_scenario(raw: Dict[str, Any]) -> PredefinedScenario | SimulatedScenario: + """Parse a raw dict into a PredefinedScenario or SimulatedScenario.""" + if "turns" in raw: + return PredefinedScenario( + scenario_id=raw["scenario_id"], + turns=[Turn(input=t["input"], expected_response=t.get("expected_response")) for t in raw["turns"]], + expected_trajectory=raw.get("expected_trajectory"), + assertions=raw.get("assertions"), + metadata=raw.get("metadata"), + ) + else: + missing = [k for k in ("scenario_id", "actor_profile", "input") if k not in raw] + if missing: + raise ValueError( + f"Scenario '{raw.get('scenario_id', '?')}' is missing required fields for SimulatedScenario: {missing}" + ) + return SimulatedScenario( + scenario_id=raw["scenario_id"], + scenario_description=raw.get("scenario_description", ""), + actor_profile=ActorProfile(**raw["actor_profile"]), + input=raw["input"], + max_turns=raw.get("max_turns", 10), + assertions=raw.get("assertions"), + metadata=raw.get("metadata"), + ) class DatasetProvider(ABC): @@ -16,49 +59,86 @@ def get_dataset(self) -> Dataset: class FileDatasetProvider(DatasetProvider): - """A dataset provider that loads a Dataset from a JSON file.""" + """A dataset provider that loads a Dataset from a JSON or JSONL file. + + JSON format: {"scenarios": [{...}, {...}]} + JSONL format: one scenario JSON object per line. + Format is selected by file extension (".jsonl" → JSONL, otherwise JSON). + """ def __init__(self, file_path: str): - """Initialize with a path to a JSON dataset file.""" + """Initialize with a path to a JSON or JSONL dataset file.""" self._file_path = file_path def get_dataset(self) -> Dataset: - """Load and return the dataset from the JSON file.""" - with open(self._file_path) as f: - data = json.load(f) - scenarios: List[Scenario] = [self._parse_scenario(s) for s in data["scenarios"]] + """Load and return the dataset from the file.""" + if self._file_path.endswith(".jsonl"): + with open(self._file_path) as f: + raw_examples = [json.loads(line) for line in f if line.strip()] + else: + with open(self._file_path) as f: + raw_examples = json.load(f)["scenarios"] + scenarios: List[Scenario] = [_parse_scenario(s) for s in raw_examples] return Dataset(scenarios=scenarios) - @staticmethod - def _parse_scenario(raw: Dict[str, Any]) -> PredefinedScenario | SimulatedScenario: - if "turns" in raw: - return PredefinedScenario( - scenario_id=raw["scenario_id"], - turns=[FileDatasetProvider._parse_turn(t) for t in raw["turns"]], - expected_trajectory=raw.get("expected_trajectory"), - assertions=raw.get("assertions"), - metadata=raw.get("metadata"), - ) - else: - missing = [k for k in ("scenario_id", "actor_profile", "input") if k not in raw] - if missing: - raise ValueError( - f"Scenario '{raw.get('scenario_id', '?')}' is missing required fields " - f"for SimulatedScenario: {missing}" - ) - return SimulatedScenario( - scenario_id=raw["scenario_id"], - scenario_description=raw.get("scenario_description", ""), - actor_profile=ActorProfile(**raw["actor_profile"]), - input=raw["input"], - max_turns=raw.get("max_turns", 10), - assertions=raw.get("assertions"), - metadata=raw.get("metadata"), + +class DatasetManagementServiceProvider(DatasetProvider): + """A dataset provider that loads a Dataset from the Dataset Management service.""" + + def __init__( + self, + dataset_id: str, + version_id: Optional[str] = None, + client: Optional[DatasetClient] = None, + ): + """Initialize with a dataset ID and optional version. + + Args: + dataset_id: The dataset ID to fetch. + version_id: Optional version ID. If omitted, fetches DRAFT. + client: Optional DatasetClient instance. If not provided, a default is created. + """ + self._dataset_id = dataset_id + self._version_id = version_id + self._client = client if client is not None else DatasetClient() + + def get_dataset(self) -> Dataset: + """Load and return the dataset from the Dataset Management service. + + Fetches the dataset via the presigned download URL returned by GetDataset. + The URL points to a JSONL file where each line is one example. + + Raises: + ValueError: If the dataset has no downloadUrl or has an unsupported schemaType. + RuntimeError: If the dataset content cannot be downloaded. + """ + kwargs: Dict[str, Any] = {"datasetId": self._dataset_id} + if self._version_id: + kwargs["datasetVersion"] = self._version_id + + response = self._client.get_dataset(**kwargs) + + schema_type = response.get("schemaType") + if schema_type and schema_type not in SUPPORTED_SCHEMA_TYPES: + raise ValueError( + f"Dataset schema type '{schema_type}' is not supported by the " + f"evaluation runners. Supported types: {sorted(SUPPORTED_SCHEMA_TYPES)}" ) - @staticmethod - def _parse_turn(raw: Dict[str, Any]) -> Turn: - return Turn( - input=raw["input"], - expected_response=raw.get("expected_response"), - ) + download_url = response.get("downloadUrl") + if not download_url: + raise ValueError(f"Dataset {self._dataset_id} has no downloadUrl. Status: {response.get('status')}") + + try: + r = requests.get(download_url, timeout=60, stream=True) + r.raise_for_status() + except requests.RequestException as e: + raise RuntimeError(f"Couldn't download dataset from S3 bucket: {e}") from e + + all_examples: List[Dict[str, Any]] = [] + for line in r.iter_lines(decode_unicode=False): + if line: + all_examples.append(json.loads(line.decode("utf-8"))) + + scenarios: List[Scenario] = [_parse_scenario(example) for example in all_examples] + return Dataset(scenarios=scenarios) diff --git a/src/bedrock_agentcore/evaluation/runner/dataset_types.py b/src/bedrock_agentcore/evaluation/runner/dataset_types.py index 6d8b6206..4e9f63b6 100644 --- a/src/bedrock_agentcore/evaluation/runner/dataset_types.py +++ b/src/bedrock_agentcore/evaluation/runner/dataset_types.py @@ -37,6 +37,7 @@ class Turn(BaseModel): class Scenario(BaseModel): """Base class for evaluation scenarios.""" + schema_type: str = "" scenario_id: str assertions: Optional[List[str]] = None metadata: Optional[Dict[str, Any]] = None @@ -45,6 +46,7 @@ class Scenario(BaseModel): class PredefinedScenario(Scenario): """A scenario with a predefined conversation flow.""" + schema_type: str = "AGENTCORE_EVALUATION_PREDEFINED_V1" turns: List[Turn] expected_trajectory: Optional[List[str]] = None @@ -72,6 +74,8 @@ class SimulatedScenario(Scenario): Defaults to 10. """ + schema_type: str = "AGENTCORE_EVALUATION_SIMULATED_V1" + model_config = ConfigDict(arbitrary_types_allowed=True) scenario_description: str = "" diff --git a/tests/bedrock_agentcore/evaluation/test_dataset_client.py b/tests/bedrock_agentcore/evaluation/test_dataset_client.py new file mode 100644 index 00000000..c51523c8 --- /dev/null +++ b/tests/bedrock_agentcore/evaluation/test_dataset_client.py @@ -0,0 +1,235 @@ +"""Tests for DatasetClient.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from bedrock_agentcore._utils.config import WaitConfig +from bedrock_agentcore.evaluation.dataset_client import DatasetClient + + +@pytest.fixture +def mock_boto3(): + with patch("bedrock_agentcore.evaluation.dataset_client.boto3") as m: + yield m + + +@pytest.fixture +def client_and_cp(mock_boto3): + """Return a (DatasetClient, mock_cp_client) tuple.""" + mock_session = MagicMock() + mock_boto3.Session.return_value = mock_session + mock_cp = mock_session.client.return_value + client = DatasetClient() + return client, mock_cp + + +class TestDatasetClientInit: + def test_default_region(self, mock_boto3): + mock_session = MagicMock() + mock_session.region_name = "us-east-1" + mock_boto3.Session.return_value = mock_session + + client = DatasetClient() + assert client.region_name == "us-east-1" + mock_session.client.assert_called_once() + + def test_explicit_region(self, mock_boto3): + mock_session = MagicMock() + mock_boto3.Session.return_value = mock_session + + client = DatasetClient(region_name="eu-west-1") + assert client.region_name == "eu-west-1" + + def test_custom_session(self, mock_boto3): + custom_session = MagicMock() + custom_session.region_name = "ap-southeast-1" + + client = DatasetClient(boto3_session=custom_session) + assert client.region_name == "ap-southeast-1" + custom_session.client.assert_called_once() + + +class TestDatasetClientPassthrough: + def test_allowed_method_forwarded(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.list_datasets.return_value = {"datasets": []} + + client.list_datasets(maxResults=10) + mock_cp.list_datasets.assert_called_once_with(maxResults=10) + + def test_snake_case_converted(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.list_datasets.return_value = {"datasets": []} + + client.list_datasets(max_results=5) + mock_cp.list_datasets.assert_called_once_with(maxResults=5) + + def test_non_allowed_method_raises(self, client_and_cp): + client, _ = client_and_cp + with pytest.raises(AttributeError, match="not_a_real_method"): + client.not_a_real_method() + + +class TestDatasetClientCreateAndWait: + def test_create_dataset_and_wait_success(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.create_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.create_dataset_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetName="test", + schemaType="AGENTCORE_EVALUATION_PREDEFINED_V1", + ) + + assert result["status"] == "ACTIVE" + mock_cp.create_dataset.assert_called_once() + mock_cp.get_dataset.assert_called_with(datasetId="ds-123") + + def test_create_dataset_and_wait_failure(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.create_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "CREATE_FAILED", + "statusReasons": "Invalid source", + } + + with pytest.raises(RuntimeError, match="CREATE_FAILED"): + client.create_dataset_and_wait( + wait_config=WaitConfig(max_wait=5, poll_interval=1), + datasetName="test", + ) + + def test_create_dataset_and_wait_timeout(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.create_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "CREATING"} + + with pytest.raises(TimeoutError): + client.create_dataset_and_wait( + wait_config=WaitConfig(max_wait=1, poll_interval=1), + datasetName="test", + ) + + +class TestDatasetClientDeleteAndWait: + def test_delete_dataset_and_wait_success(self, client_and_cp): + from botocore.exceptions import ClientError + + client, mock_cp = client_and_cp + mock_cp.delete_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.side_effect = ClientError( + {"Error": {"Code": "ResourceNotFoundException", "Message": "Not found"}}, + "GetDataset", + ) + + client.delete_dataset_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + ) + + mock_cp.delete_dataset.assert_called_once_with(datasetId="ds-123") + + def test_delete_dataset_version_and_wait_returns_active(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.delete_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.delete_dataset_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + datasetVersion="1", + ) + + assert result is not None + assert result["status"] == "ACTIVE" + mock_cp.delete_dataset.assert_called_once_with(datasetId="ds-123", datasetVersion="1") + + def test_delete_dataset_version_and_wait_failure(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.delete_dataset.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "UPDATE_FAILED", + "statusReasons": "Workflow failed", + } + + with pytest.raises(RuntimeError, match="UPDATE_FAILED"): + client.delete_dataset_and_wait( + wait_config=WaitConfig(max_wait=5, poll_interval=1), + datasetId="ds-123", + datasetVersion="1", + ) + + +class TestDatasetClientVersionAndWait: + def test_create_version_and_wait_success(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.create_dataset_version.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.create_dataset_version_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + ) + + assert result["status"] == "ACTIVE" + + +class TestDatasetClientExamplesAndWait: + def test_add_examples_and_wait_success(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.add_dataset_examples.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.add_examples_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + examples=[{"scenario_id": "s1", "turns": [{"input": "hi"}]}], + ) + + assert result["status"] == "ACTIVE" + + def test_update_examples_and_wait_success(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.update_dataset_examples.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.update_examples_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + examples=[{"exampleId": "e1", "scenario_id": "s1", "turns": [{"input": "hi"}]}], + ) + + assert result["status"] == "ACTIVE" + + def test_delete_examples_and_wait_success(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.delete_dataset_examples.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = {"datasetId": "ds-123", "status": "ACTIVE"} + + result = client.delete_examples_and_wait( + wait_config=WaitConfig(max_wait=10, poll_interval=1), + datasetId="ds-123", + exampleIds=["e1"], + ) + + assert result["status"] == "ACTIVE" + + def test_add_examples_and_wait_failure(self, client_and_cp): + client, mock_cp = client_and_cp + mock_cp.add_dataset_examples.return_value = {"datasetId": "ds-123"} + mock_cp.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "UPDATE_FAILED", + "statusReasons": "Schema validation failed", + } + + with pytest.raises(RuntimeError, match="UPDATE_FAILED"): + client.add_examples_and_wait( + wait_config=WaitConfig(max_wait=5, poll_interval=1), + datasetId="ds-123", + examples=[], + ) diff --git a/tests/bedrock_agentcore/evaluation/test_dataset_parser.py b/tests/bedrock_agentcore/evaluation/test_dataset_parser.py index da605375..5a39c97d 100644 --- a/tests/bedrock_agentcore/evaluation/test_dataset_parser.py +++ b/tests/bedrock_agentcore/evaluation/test_dataset_parser.py @@ -167,3 +167,98 @@ def test_parse_mixed_predefined_and_simulated(self, tmp_path): assert len(dataset.scenarios) == 2 assert isinstance(dataset.scenarios[0], PredefinedScenario) assert isinstance(dataset.scenarios[1], SimulatedScenario) + + +class TestFileDatasetProviderJsonl: + def _write_jsonl_and_load(self, tmp_path, scenarios): + file_path = tmp_path / "dataset.jsonl" + file_path.write_text("\n".join(json.dumps(s) for s in scenarios)) + return FileDatasetProvider(str(file_path)).get_dataset() + + def test_parse_jsonl_predefined(self, tmp_path): + scenarios = [ + { + "scenario_id": "s1", + "turns": [{"input": "Hello", "expected_response": "Hi"}], + }, + { + "scenario_id": "s2", + "turns": [{"input": "What is 2+2?"}], + }, + ] + dataset = self._write_jsonl_and_load(tmp_path, scenarios) + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 2 + assert all(isinstance(s, PredefinedScenario) for s in dataset.scenarios) + assert dataset.scenarios[0].scenario_id == "s1" + assert dataset.scenarios[0].turns[0].expected_response == "Hi" + + def test_parse_jsonl_simulated(self, tmp_path): + scenarios = [ + { + "scenario_id": "sim-1", + "scenario_description": "Customer orders pizza", + "actor_profile": {"context": "ctx", "goal": "Order a pizza"}, + "input": "I'd like a pizza", + "max_turns": 5, + } + ] + dataset = self._write_jsonl_and_load(tmp_path, scenarios) + assert len(dataset.scenarios) == 1 + scenario = dataset.scenarios[0] + assert isinstance(scenario, SimulatedScenario) + assert scenario.actor_profile.goal == "Order a pizza" + assert scenario.max_turns == 5 + + def test_parse_jsonl_mixed(self, tmp_path): + scenarios = [ + {"scenario_id": "pre-1", "turns": [{"input": "hi"}]}, + { + "scenario_id": "sim-1", + "scenario_description": "desc", + "actor_profile": {"context": "ctx", "goal": "goal"}, + "input": "hello", + }, + ] + dataset = self._write_jsonl_and_load(tmp_path, scenarios) + assert len(dataset.scenarios) == 2 + assert isinstance(dataset.scenarios[0], PredefinedScenario) + assert isinstance(dataset.scenarios[1], SimulatedScenario) + + def test_parse_jsonl_skips_blank_lines(self, tmp_path): + file_path = tmp_path / "dataset.jsonl" + file_path.write_text( + '{"scenario_id": "s1", "turns": [{"input": "hi"}]}\n' + "\n" + " \n" + '{"scenario_id": "s2", "turns": [{"input": "bye"}]}\n' + ) + dataset = FileDatasetProvider(str(file_path)).get_dataset() + assert len(dataset.scenarios) == 2 + assert {s.scenario_id for s in dataset.scenarios} == {"s1", "s2"} + + def test_parse_jsonl_empty_file_raises(self, tmp_path): + file_path = tmp_path / "empty.jsonl" + file_path.write_text("") + with pytest.raises(ValueError, match="scenarios must not be empty"): + FileDatasetProvider(str(file_path)).get_dataset() + + def test_parse_jsonl_invalid_line_raises(self, tmp_path): + file_path = tmp_path / "bad.jsonl" + file_path.write_text('{"scenario_id": "s1", "turns": [{"input": "hi"}]}\nnot json\n') + provider = FileDatasetProvider(str(file_path)) + with pytest.raises(json.JSONDecodeError): + provider.get_dataset() + + def test_parse_jsonl_missing_required_field_raises(self, tmp_path): + scenarios = [{"scenario_id": "s1"}] + with pytest.raises(ValueError): + self._write_jsonl_and_load(tmp_path, scenarios) + + def test_jsonl_extension_dispatch(self, tmp_path): + # A .json file with one-line-per-object content should NOT be parsed as JSONL. + file_path = tmp_path / "looks-like-jsonl.json" + file_path.write_text('{"scenario_id": "s1", "turns": [{"input": "hi"}]}\n') + # Without "scenarios" wrapper, JSON parse yields a dict that fails the lookup. + with pytest.raises(KeyError): + FileDatasetProvider(str(file_path)).get_dataset() diff --git a/tests/bedrock_agentcore/evaluation/test_service_dataset_provider.py b/tests/bedrock_agentcore/evaluation/test_service_dataset_provider.py new file mode 100644 index 00000000..3b9c8deb --- /dev/null +++ b/tests/bedrock_agentcore/evaluation/test_service_dataset_provider.py @@ -0,0 +1,188 @@ +"""Tests for DatasetManagementServiceProvider.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from bedrock_agentcore.evaluation.runner.dataset_providers import DatasetManagementServiceProvider +from bedrock_agentcore.evaluation.runner.dataset_types import ( + Dataset, + PredefinedScenario, + SimulatedScenario, +) + +PATCH_REQUESTS = "bedrock_agentcore.evaluation.runner.dataset_providers.requests" + + +def _jsonl(*examples): + """Build a JSONL string from example dicts.""" + import json + + return "\n".join(json.dumps(e) for e in examples) + + +class TestDatasetManagementServiceProvider: + def _run_provider( + self, jsonl_content, dataset_id="ds-123", version_id=None, schema_type="AGENTCORE_EVALUATION_PREDEFINED_V1" + ): + mock_client = MagicMock() + mock_client.get_dataset.return_value = { + "datasetId": dataset_id, + "status": "ACTIVE", + "schemaType": schema_type, + "downloadUrl": "https://example.com/dataset.jsonl", + } + + mock_response = MagicMock() + mock_response.iter_lines.return_value = [line.encode("utf-8") for line in jsonl_content.split("\n") if line] + mock_response.raise_for_status = MagicMock() + + with patch(PATCH_REQUESTS) as mock_requests: + mock_requests.get.return_value = mock_response + provider = DatasetManagementServiceProvider( + dataset_id=dataset_id, version_id=version_id, client=mock_client + ) + return provider.get_dataset(), mock_client, mock_requests + + def test_get_dataset_predefined(self): + content = _jsonl( + { + "exampleId": "e1", + "scenario_id": "s1", + "turns": [{"input": "Hello", "expected_response": "Hi!"}], + "assertions": ["Be polite"], + "expected_trajectory": ["greet"], + }, + { + "exampleId": "e2", + "scenario_id": "s2", + "turns": [{"input": "What is 2+2?"}], + }, + ) + + dataset, mock_client, mock_requests = self._run_provider(content) + + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 2 + + s1 = dataset.scenarios[0] + assert isinstance(s1, PredefinedScenario) + assert s1.scenario_id == "s1" + assert s1.turns[0].input == "Hello" + assert s1.turns[0].expected_response == "Hi!" + assert s1.assertions == ["Be polite"] + assert s1.expected_trajectory == ["greet"] + + s2 = dataset.scenarios[1] + assert isinstance(s2, PredefinedScenario) + assert s2.scenario_id == "s2" + + def test_get_dataset_simulated(self): + content = _jsonl( + { + "exampleId": "e1", + "scenario_id": "sim-1", + "scenario_description": "Frustrated customer", + "actor_profile": { + "traits": {"personality": "impatient"}, + "context": "Waiting 3 days", + "goal": "Get a refund", + }, + "input": "I want my money back!", + "max_turns": 5, + "assertions": ["Agent should empathize"], + }, + ) + + dataset, _, _ = self._run_provider( + content, dataset_id="ds-456", schema_type="AGENTCORE_EVALUATION_SIMULATED_V1" + ) + + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 1 + scenario = dataset.scenarios[0] + assert isinstance(scenario, SimulatedScenario) + assert scenario.scenario_id == "sim-1" + assert scenario.actor_profile.goal == "Get a refund" + assert scenario.max_turns == 5 + assert scenario.assertions == ["Agent should empathize"] + + def test_downloads_from_presigned_url(self): + content = _jsonl({"scenario_id": "s1", "turns": [{"input": "hi"}]}) + + _, mock_client, mock_requests = self._run_provider(content) + + mock_client.get_dataset.assert_called_once_with(datasetId="ds-123") + mock_requests.get.assert_called_once_with("https://example.com/dataset.jsonl", timeout=60, stream=True) + + def test_get_dataset_with_version_id(self): + content = _jsonl({"scenario_id": "s1", "turns": [{"input": "hi"}]}) + + _, mock_client, _ = self._run_provider(content, dataset_id="ds-123", version_id="1") + + mock_client.get_dataset.assert_called_once_with(datasetId="ds-123", datasetVersion="1") + + def test_get_dataset_no_download_url_raises(self): + mock_client = MagicMock() + mock_client.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "CREATING", + "schemaType": "AGENTCORE_EVALUATION_PREDEFINED_V1", + } + + provider = DatasetManagementServiceProvider(dataset_id="ds-123", client=mock_client) + with pytest.raises(ValueError, match="no downloadUrl"): + provider.get_dataset() + + def test_get_dataset_empty_raises(self): + mock_client = MagicMock() + mock_client.get_dataset.return_value = { + "datasetId": "ds-empty", + "status": "ACTIVE", + "schemaType": "AGENTCORE_EVALUATION_PREDEFINED_V1", + "downloadUrl": "https://example.com/dataset.jsonl", + } + + mock_response = MagicMock() + mock_response.iter_lines.return_value = [] + mock_response.raise_for_status = MagicMock() + + with patch(PATCH_REQUESTS) as mock_requests: + mock_requests.get.return_value = mock_response + provider = DatasetManagementServiceProvider(dataset_id="ds-empty", client=mock_client) + with pytest.raises(ValueError, match="scenarios must not be empty"): + provider.get_dataset() + + def test_unsupported_schema_type_raises(self): + mock_client = MagicMock() + mock_client.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "ACTIVE", + "schemaType": "RAGAS_V1", + "downloadUrl": "https://example.com/dataset.jsonl", + } + + provider = DatasetManagementServiceProvider(dataset_id="ds-123", client=mock_client) + with pytest.raises(ValueError, match="not supported by the evaluation runners"): + provider.get_dataset() + + def test_download_failure_raises_runtime_error(self): + import requests as real_requests + + mock_client = MagicMock() + mock_client.get_dataset.return_value = { + "datasetId": "ds-123", + "status": "ACTIVE", + "schemaType": "AGENTCORE_EVALUATION_PREDEFINED_V1", + "downloadUrl": "https://example.com/dataset.jsonl", + } + + mock_response = MagicMock() + mock_response.raise_for_status.side_effect = real_requests.HTTPError("403 Forbidden") + + with patch(PATCH_REQUESTS) as mock_requests: + mock_requests.get.return_value = mock_response + mock_requests.RequestException = real_requests.RequestException + provider = DatasetManagementServiceProvider(dataset_id="ds-123", client=mock_client) + with pytest.raises(RuntimeError, match="Couldn't download dataset"): + provider.get_dataset() diff --git a/tests_integ/evaluation/test_dataset_client.py b/tests_integ/evaluation/test_dataset_client.py new file mode 100644 index 00000000..6fa0f73b --- /dev/null +++ b/tests_integ/evaluation/test_dataset_client.py @@ -0,0 +1,223 @@ +"""Integration tests for DatasetClient. + +Run with: + uv run pytest tests_integ/evaluation/test_dataset_client.py -xvs --log-cli-level=INFO +""" + +import os +import time + +import pytest +from botocore.exceptions import ClientError + +from bedrock_agentcore.evaluation.dataset_client import DatasetClient + + +@pytest.mark.integration +class TestDatasetClientPassthrough: + """Read-only passthrough tests. No resources needed.""" + + @classmethod + def setup_class(cls): + cls.region = os.environ.get("BEDROCK_TEST_REGION", "us-west-2") + cls.client = DatasetClient(region_name=cls.region) + + @pytest.mark.order(1) + def test_list_datasets_passthrough(self): + response = self.client.list_datasets() + assert "datasets" in response + + @pytest.mark.order(2) + def test_list_datasets_snake_case(self): + response = self.client.list_datasets(max_results=5) + assert "datasets" in response + + @pytest.mark.order(3) + def test_get_nonexistent_dataset_raises(self): + with pytest.raises(ClientError) as exc_info: + self.client.get_dataset(datasetId="nonexistent-dataset-id") + assert exc_info.value.response["Error"]["Code"] == "ResourceNotFoundException" + + @pytest.mark.order(4) + def test_non_allowlisted_method_raises(self): + with pytest.raises(AttributeError): + self.client.not_a_real_method() + + +@pytest.mark.integration +class TestDatasetCrud: + """Full CRUD lifecycle tests for datasets.""" + + @classmethod + def setup_class(cls): + cls.region = os.environ.get("BEDROCK_TEST_REGION", "us-west-2") + cls.client = DatasetClient(region_name=cls.region) + cls.test_prefix = f"sdk_integ_{int(time.time())}" + cls.dataset_ids = [] + + @classmethod + def teardown_class(cls): + for did in cls.dataset_ids: + try: + cls.client.delete_dataset(datasetId=did) + except Exception as e: + print(f"Failed to delete dataset {did}: {e}") + + @pytest.mark.order(5) + def test_create_dataset_and_wait_inline(self): + """Create a dataset with inline examples and wait for ACTIVE.""" + dataset = self.client.create_dataset_and_wait( + datasetName=f"{self.test_prefix}_predefined", + schemaType="AGENTCORE_EVALUATION_PREDEFINED_V1", + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "test-scenario-1", + "turns": [{"input": "Hello", "expected_response": "Hi there!"}], + } + ] + } + }, + ) + self.__class__.dataset_ids.append(dataset["datasetId"]) + assert dataset["status"] == "ACTIVE" + + @pytest.mark.order(6) + def test_get_dataset(self): + """Get dataset metadata.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + dataset = self.client.get_dataset(datasetId=self.dataset_ids[0]) + assert dataset["datasetId"] == self.dataset_ids[0] + assert dataset["status"] == "ACTIVE" + + @pytest.mark.order(7) + def test_list_datasets_contains_created(self): + """List datasets includes the one we created.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + response = self.client.list_datasets() + ids = [d["datasetId"] for d in response["datasets"]] + assert self.dataset_ids[0] in ids + + @pytest.mark.order(8) + def test_update_dataset_metadata(self): + """Update dataset description (sync operation).""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + response = self.client.update_dataset( + datasetId=self.dataset_ids[0], + description="updated by integ test", + ) + assert response["datasetId"] == self.dataset_ids[0] + # Verify via get + dataset = self.client.get_dataset(datasetId=self.dataset_ids[0]) + assert dataset.get("description") == "updated by integ test" + + @pytest.mark.order(9) + def test_add_examples_and_wait(self): + """Add examples to dataset and wait for ACTIVE.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + dataset = self.client.add_examples_and_wait( + datasetId=self.dataset_ids[0], + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "test-scenario-2", + "turns": [{"input": "What is 2+2?", "expected_response": "4"}], + } + ] + } + }, + ) + assert dataset["status"] == "ACTIVE" + + @pytest.mark.order(10) + def test_list_examples(self): + """List examples in dataset.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + response = self.client.list_dataset_examples(datasetId=self.dataset_ids[0]) + assert "examples" in response + assert len(response["examples"]) >= 2 + + @pytest.mark.order(11) + def test_create_dataset_version_and_wait(self): + """Create a version from DRAFT and wait for ACTIVE.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + dataset = self.client.create_dataset_version_and_wait( + datasetId=self.dataset_ids[0], + ) + assert dataset["status"] == "ACTIVE" + + @pytest.mark.order(12) + def test_list_dataset_versions(self): + """List versions of the dataset.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + response = self.client.list_dataset_versions(datasetId=self.dataset_ids[0]) + assert "versions" in response + assert len(response["versions"]) >= 1 + + @pytest.mark.order(13) + def test_delete_dataset_version_and_wait(self): + """Version-specific delete returns the dataset to ACTIVE, dataset still exists. + + Requires at least 2 versions. Creates a second version first so the deletion + does not target the last remaining version (which the service rejects). + """ + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + did = self.dataset_ids[0] + + # Make a second version so we have something we can safely delete. + self.client.add_examples_and_wait( + datasetId=did, + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "version-delete-fixture", + "turns": [{"input": "filler", "expected_response": "ok"}], + } + ] + } + }, + ) + self.client.create_dataset_version_and_wait(datasetId=did) + + versions_before = self.client.list_dataset_versions(datasetId=did)["versions"] + assert len(versions_before) >= 2, "need >= 2 versions for version-specific delete" + # Delete the oldest version to keep the most recent one. + target_version = str(min(int(v["datasetVersion"]) for v in versions_before)) + + result = self.client.delete_dataset_and_wait(datasetId=did, datasetVersion=target_version) + + # Version-specific delete returns the dataset (not None) at ACTIVE. + assert result is not None + assert result["status"] == "ACTIVE" + + # Dataset itself is still present. + ds = self.client.get_dataset(datasetId=did) + assert ds["datasetId"] == did + + # Target version is gone. + versions_after = self.client.list_dataset_versions(datasetId=did)["versions"] + remaining = [str(v["datasetVersion"]) for v in versions_after] + assert target_version not in remaining + + @pytest.mark.order(14) + def test_delete_dataset_and_wait(self): + """Full delete (no datasetVersion) removes the dataset.""" + if not self.dataset_ids: + pytest.skip("prerequisite test did not create dataset") + did = self.dataset_ids.pop(0) + result = self.client.delete_dataset_and_wait(datasetId=did) + assert result is None + with pytest.raises(ClientError) as exc_info: + self.client.get_dataset(datasetId=did) + assert exc_info.value.response["Error"]["Code"] == "ResourceNotFoundException" diff --git a/tests_integ/evaluation/test_runners_with_service_dataset.py b/tests_integ/evaluation/test_runners_with_service_dataset.py new file mode 100644 index 00000000..daf4619b --- /dev/null +++ b/tests_integ/evaluation/test_runners_with_service_dataset.py @@ -0,0 +1,147 @@ +"""Integration tests for OnDemandEvaluationDatasetRunner using DatasetManagementServiceProvider. + +These tests verify the full pipeline: + DatasetManagementServiceProvider → Dataset → Runner → Agent Invocation → Evaluation + +Required env vars: + INTEG_AGENT_RUNTIME_ARN: ARN of a deployed, invokable agent runtime + (region is extracted from the ARN automatically) + +Optional env vars: + INTEG_AGENT_LOG_GROUP: CloudWatch log group for the agent's spans + (defaults to /aws/bedrock-agentcore/runtimes/{runtime_id}-DEFAULT) + +Run with: + export INTEG_AGENT_RUNTIME_ARN= + uv run pytest tests_integ/evaluation/test_runners_with_service_dataset.py -xvs +""" + +import json +import os +import time +import uuid + +import boto3 +import pytest + +from bedrock_agentcore.evaluation.dataset_client import DatasetClient +from bedrock_agentcore.evaluation.runner.dataset_providers import DatasetManagementServiceProvider +from bedrock_agentcore.evaluation.runner.invoker_types import AgentInvokerInput, AgentInvokerOutput +from bedrock_agentcore.evaluation.runner.on_demand.config import EvaluationRunConfig, EvaluatorConfig +from bedrock_agentcore.evaluation.runner.on_demand.on_demand_runner import OnDemandEvaluationDatasetRunner + +RUNTIME_ARN = os.environ.get("INTEG_AGENT_RUNTIME_ARN") +REGION = RUNTIME_ARN.split(":")[3] if RUNTIME_ARN else os.environ.get("BEDROCK_TEST_REGION", "us-west-2") + + +def _make_invoker(runtime_arn: str, region: str): + """Create an invoker compatible with the hosted echo agent.""" + dp = boto3.client("bedrock-agentcore", region_name=region) + + def invoker(input: AgentInvokerInput) -> AgentInvokerOutput: + prompt = input.payload if isinstance(input.payload, str) else json.dumps(input.payload) + resp = dp.invoke_agent_runtime( + agentRuntimeArn=runtime_arn, + payload=json.dumps({"prompt": prompt}).encode(), + runtimeSessionId=( + input.session_id if input.session_id and len(input.session_id) >= 33 else str(uuid.uuid4()) + ), + ) + body = resp["response"].read().decode() + result = json.loads(body) if body else {} + return AgentInvokerOutput(agent_output=result.get("result", result)) + + return invoker + + +@pytest.mark.integration +@pytest.mark.skipif(not RUNTIME_ARN, reason="INTEG_AGENT_RUNTIME_ARN not set") +class TestOnDemandRunnerWithServiceDataset: + """OnDemandEvaluationDatasetRunner + DatasetManagementServiceProvider end-to-end.""" + + @classmethod + def setup_class(cls): + cls.region = REGION + cls.runtime_arn = RUNTIME_ARN + cls.runtime_id = cls.runtime_arn.split("/")[-1] # type: ignore[union-attr] + + cls.log_group = os.environ.get( + "INTEG_AGENT_LOG_GROUP", + f"/aws/bedrock-agentcore/runtimes/{cls.runtime_id}-DEFAULT", + ) + + cls.client = DatasetClient(region_name=cls.region) + cls.test_prefix = f"sdk_integ_runner_{int(time.time())}" + cls.dataset_ids = [] + + ds = cls.client.create_dataset_and_wait( + datasetName=f"{cls.test_prefix}_ondemand", + schemaType="AGENTCORE_EVALUATION_PREDEFINED_V1", + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "greeting", + "turns": [{"input": "Hello", "expected_response": "Hi there!"}], + "assertions": ["Agent should respond politely"], + }, + { + "scenario_id": "math", + "turns": [{"input": "What is 2+2?", "expected_response": "4"}], + }, + ] + } + }, + ) + cls.dataset_id = ds["datasetId"] + cls.dataset_ids.append(cls.dataset_id) + + @classmethod + def teardown_class(cls): + for did in cls.dataset_ids: + try: + cls.client.delete_dataset(datasetId=did) + except Exception as e: + print(f"Failed to delete dataset {did}: {e}") + + @pytest.mark.order(1) + def test_on_demand_runner_executes_scenarios(self): + """OnDemandRunner invokes agent for each scenario from DatasetManagementServiceProvider.""" + provider = DatasetManagementServiceProvider( + dataset_id=self.dataset_id, + client=self.client, + ) + dataset = provider.get_dataset() + assert len(dataset.scenarios) == 2 + + runner = OnDemandEvaluationDatasetRunner(region=self.region) + invoker = _make_invoker(self.runtime_arn, self.region) + + from bedrock_agentcore.evaluation.agent_span_collector import CloudWatchAgentSpanCollector + + collector = CloudWatchAgentSpanCollector( + log_group_name=self.log_group, + region=self.region, + ) + + config = EvaluationRunConfig( + evaluator_config=EvaluatorConfig(evaluator_ids=["Builtin.Helpfulness"]), + evaluation_delay_seconds=30, + max_concurrent_scenarios=2, + ) + + result = runner.run( + config=config, + dataset=dataset, + agent_invoker=invoker, + span_collector=collector, + ) + + # Both scenarios should have been executed + assert len(result.scenario_results) == 2 + scenario_ids = {r.scenario_id for r in result.scenario_results} + assert "greeting" in scenario_ids + assert "math" in scenario_ids + # Both should complete (agent invocation succeeded) + for sr in result.scenario_results: + assert sr.status == "COMPLETED", f"Scenario {sr.scenario_id} failed: {sr.error}" diff --git a/tests_integ/evaluation/test_service_dataset_provider.py b/tests_integ/evaluation/test_service_dataset_provider.py new file mode 100644 index 00000000..2999f4d8 --- /dev/null +++ b/tests_integ/evaluation/test_service_dataset_provider.py @@ -0,0 +1,167 @@ +"""Integration tests for DatasetManagementServiceProvider. + +Tests that DatasetManagementServiceProvider can fetch a dataset from the service +and return it as an SDK Dataset object usable by OnDemandEvaluationDatasetRunner. + +Run with: + uv run pytest tests_integ/evaluation/test_service_dataset_provider.py -xvs --log-cli-level=INFO +""" + +import os +import time + +import pytest + +from bedrock_agentcore.evaluation.dataset_client import DatasetClient +from bedrock_agentcore.evaluation.runner.dataset_providers import DatasetManagementServiceProvider +from bedrock_agentcore.evaluation.runner.dataset_types import ( + Dataset, + PredefinedScenario, + SimulatedScenario, +) + + +@pytest.mark.integration +class TestDatasetManagementServiceProvider: + """Tests DatasetManagementServiceProvider with both PREDEFINED and SYNTHETIC schema types.""" + + @classmethod + def setup_class(cls): + cls.region = os.environ.get("BEDROCK_TEST_REGION", "us-west-2") + cls.client = DatasetClient(region_name=cls.region) + cls.test_prefix = f"sdk_integ_provider_{int(time.time())}" + cls.dataset_ids = [] + + # Create a PREDEFINED_TURNS dataset + predefined = cls.client.create_dataset_and_wait( + datasetName=f"{cls.test_prefix}_predefined", + schemaType="AGENTCORE_EVALUATION_PREDEFINED_V1", + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "greeting-scenario", + "turns": [ + {"input": "Hello", "expected_response": "Hi!"}, + {"input": "How are you?", "expected_response": "I'm good!"}, + ], + "assertions": ["Agent should be polite"], + "expected_trajectory": ["greet_user"], + }, + { + "scenario_id": "math-scenario", + "turns": [ + {"input": "What is 2+2?", "expected_response": "4"}, + ], + }, + ] + } + }, + ) + cls.predefined_dataset_id = predefined["datasetId"] + cls.dataset_ids.append(cls.predefined_dataset_id) + + # Create a version for version-specific fetch test + cls.client.create_dataset_version_and_wait(datasetId=cls.predefined_dataset_id) + + # Create a SYNTHETIC dataset + synthetic = cls.client.create_dataset_and_wait( + datasetName=f"{cls.test_prefix}_synthetic", + schemaType="AGENTCORE_EVALUATION_SIMULATED_V1", + source={ + "inlineExamples": { + "examples": [ + { + "scenario_id": "frustrated-customer", + "scenario_description": "Customer wants a refund", + "actor_profile": { + "traits": {"personality": "impatient"}, + "context": "Has been waiting 3 days", + "goal": "Get a refund", + }, + "input": "I want my money back!", + "max_turns": 5, + "assertions": ["Agent should empathize"], + }, + ] + } + }, + ) + cls.synthetic_dataset_id = synthetic["datasetId"] + cls.dataset_ids.append(cls.synthetic_dataset_id) + + @classmethod + def teardown_class(cls): + for did in cls.dataset_ids: + try: + cls.client.delete_dataset(datasetId=did) + except Exception as e: + print(f"Failed to delete dataset {did}: {e}") + + # --- Predefined turns tests --- + + @pytest.mark.order(1) + def test_get_predefined_dataset(self): + """DatasetManagementServiceProvider returns a valid SDK Dataset with PredefinedScenarios.""" + provider = DatasetManagementServiceProvider( + dataset_id=self.predefined_dataset_id, + client=self.client, + ) + dataset = provider.get_dataset() + + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 2 + for scenario in dataset.scenarios: + assert isinstance(scenario, PredefinedScenario) + + @pytest.mark.order(2) + def test_predefined_fields_preserved(self): + """Scenario fields (turns, assertions, trajectory) are preserved.""" + provider = DatasetManagementServiceProvider( + dataset_id=self.predefined_dataset_id, + client=self.client, + ) + dataset = provider.get_dataset() + + greeting = next(s for s in dataset.scenarios if s.scenario_id == "greeting-scenario") + assert len(greeting.turns) == 2 + assert greeting.turns[0].input == "Hello" + assert greeting.turns[0].expected_response == "Hi!" + assert greeting.assertions == ["Agent should be polite"] + assert greeting.expected_trajectory == ["greet_user"] + + @pytest.mark.order(3) + def test_get_predefined_with_version(self): + """DatasetManagementServiceProvider can fetch a specific version.""" + versions_resp = self.client.list_dataset_versions(datasetId=self.predefined_dataset_id) + version_id = str(versions_resp["versions"][0]["datasetVersion"]) + + provider = DatasetManagementServiceProvider( + dataset_id=self.predefined_dataset_id, + version_id=version_id, + client=self.client, + ) + dataset = provider.get_dataset() + + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 2 + + # --- Synthetic tests --- + + @pytest.mark.order(4) + def test_get_synthetic_dataset(self): + """DatasetManagementServiceProvider returns SimulatedScenarios for SYNTHETIC schema.""" + provider = DatasetManagementServiceProvider( + dataset_id=self.synthetic_dataset_id, + client=self.client, + ) + dataset = provider.get_dataset() + + assert isinstance(dataset, Dataset) + assert len(dataset.scenarios) == 1 + scenario = dataset.scenarios[0] + assert isinstance(scenario, SimulatedScenario) + assert scenario.scenario_id == "frustrated-customer" + assert scenario.actor_profile.goal == "Get a refund" + assert scenario.max_turns == 5 + assert scenario.assertions == ["Agent should empathize"]