diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index c0cf04ed7..349255879 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -52,6 +52,9 @@ from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import ( + elixir_security_live_importer as elixir_security_live_importer_v2, +) from vulnerabilities.pipelines.v2_importers import epss_importer_v2 from vulnerabilities.pipelines.v2_importers import fireeye_importer_v2 from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2 @@ -196,3 +199,9 @@ for key, value in IMPORTERS_REGISTRY.items() if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo ] + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py index 2269d0fbc..8b80246bf 100644 --- a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py @@ -58,32 +58,26 @@ def advisories_count(self) -> int: return count def collect_advisories(self) -> Iterable[AdvisoryDataV2]: - try: - base_path = Path(self.vcs_response.dest_dir) - vuln = base_path / "packages" - for file in vuln.glob("**/*.yml"): - yield from self.process_file(file, base_path) - finally: - if self.vcs_response: - self.vcs_response.delete() + base_path = Path(self.vcs_response.dest_dir) + vuln = base_path / "packages" + for file in vuln.glob("**/*.yml"): + relative_path = str(file.relative_to(base_path)).strip("/") + path_segments = str(file).split("/") + # use the last two segments as the advisory ID + advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") + advisory_url = f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" + + yaml_file = load_yaml(str(file)) + yield from self.build_advisory_from_text( + advisory_id=advisory_id, advisory_url=advisory_url, yaml_file=yaml_file + ) def on_failure(self): self.clean_downloads() - def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]: - relative_path = str(file.relative_to(base_path)).strip("/") - path_segments = str(file).split("/") - # use the last two segments as the advisory ID - advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") - advisory_url = ( - f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" - ) - advisory_text = None - with open(str(file)) as f: - advisory_text = f.read() - - yaml_file = load_yaml(str(file)) - + def build_advisory_from_text( + self, advisory_id, advisory_url, yaml_file + ) -> Iterable[AdvisoryDataV2]: summary = yaml_file.get("description") or "" pkg_name = yaml_file.get("package") or "" @@ -138,5 +132,5 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]: affected_packages=affected_packages, url=advisory_url, date_published=date_published, - original_advisory_text=advisory_text or str(yaml_file), + original_advisory_text=str(yaml_file), ) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py new file mode 100644 index 000000000..931ada2b9 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py @@ -0,0 +1,118 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from typing import Iterable + +import requests +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.pipelines.v2_importers.elixir_security_importer import ( + ElixirSecurityImporterPipeline, +) +from vulnerabilities.utils import fetch_yaml + + +class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline): + """ + Elixir Security Advisories Importer Pipeline + + This pipeline imports security advisories for a single elixir PURL. + """ + + pipeline_id = "elixir_security_live_importer_v2" + supported_types = ["hex"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + self.purl = purl + + def advisories_count(self) -> int: + return 0 + + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: + package_name = self.purl.name + try: + directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}" + response = requests.get(directory_url) + + if response.status_code != 200: + self.log(f"No advisories found for {package_name} in Elixir Security Database") + return [] + + yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")] + + for entry in yaml_entries: + # entry["path"] looks like: packages//.yml + file_path = entry["path"] + advisory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}" + advisory_text = fetch_yaml( + advisory_url, headers={"Accept": "application/vnd.github.v3.raw"} + ) + + path_segments = str(file_path).split("/") + # use the last two segments as the advisory ID + advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") + + for advisory in self.build_advisory_from_text( + advisory_id=advisory_id, + yaml_file=advisory_text, + advisory_url=advisory_url, + ): + if self.purl.version and not self.validate_advisory(advisory): + continue + yield advisory + + except Exception as e: + self.log(f"Error fetching advisories for {self.purl}: {str(e)}") + return [] + + def validate_advisory(self, advisory: AdvisoryDataV2) -> bool: + if not self.purl.version: + return True + + for affected_package in advisory.affected_packages: + try: + purl_version = SemverVersion(self.purl.version) + if ( + affected_package.affected_version_range + and purl_version in affected_package.affected_version_range + ) or ( + affected_package.fixed_version_range + and purl_version in affected_package.fixed_version_range + ): + return True + + except Exception as e: + self.log(f"Failed to parse version {self.purl.version}: {str(e)}") + # Since we have a small package file, if we fail to parse the versions, we can just return all of them + return True + return False diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py new file mode 100644 index 000000000..07827568b --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py @@ -0,0 +1,98 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from pathlib import Path +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from packageurl import PackageURL + +from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import ( + ElixirSecurityLiveImporterPipeline, +) + + +@pytest.fixture +def test_data_dir(): + return Path(__file__).parent.parent.parent / "test_data" / "elixir_security" + + +@patch("requests.get") +def test_package_first_mode_with_version_filter(mock_get, test_data_dir): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + advisory_file_path = test_data_dir / "test_file.yml" + advisory_content = advisory_file_path.read_text() + + content_response = MagicMock() + content_response.status_code = 200 + content_response.content = advisory_content + + mock_get.side_effect = [directory_response, content_response] + + # Version affected + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 1 + + # Version not affected + mock_get.side_effect = [directory_response, content_response] + purl = PackageURL(type="hex", name="coherence", version="0.5.2") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_no_advisories(mock_get): + mock_response = MagicMock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + purl = PackageURL(type="hex", name="nonexistent-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_api_error(mock_get): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + content_response = MagicMock() + content_response.status_code = 500 + content_response.content = b"" + + mock_get.side_effect = [directory_response, content_response] + + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +def test_package_first_mode_non_hex_purl(): + purl = PackageURL(type="npm", name="some-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + with pytest.raises(ValueError): + importer.get_purl_inputs() diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index 2e618a920..6c8a0b341 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -77,8 +77,8 @@ def load_toml(path): return toml.load(f) -def fetch_yaml(url): - response = requests.get(url) +def fetch_yaml(url, headers=None): + response = requests.get(url, headers=headers) return saneyaml.load(response.content)