Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
from vulnerabilities.pipelines.v2_importers import (
elixir_security_importer as elixir_security_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import (
elixir_security_live_importer as elixir_security_live_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import epss_importer_v2
from vulnerabilities.pipelines.v2_importers import fireeye_importer_v2
from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2
Expand Down Expand Up @@ -196,3 +199,9 @@
for key, value in IMPORTERS_REGISTRY.items()
if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo
]

LIVE_IMPORTERS_REGISTRY = create_registry(
[
elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline,
]
)
40 changes: 17 additions & 23 deletions vulnerabilities/pipelines/v2_importers/elixir_security_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,26 @@ def advisories_count(self) -> int:
return count

def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
try:
base_path = Path(self.vcs_response.dest_dir)
vuln = base_path / "packages"
for file in vuln.glob("**/*.yml"):
yield from self.process_file(file, base_path)
finally:
if self.vcs_response:
self.vcs_response.delete()
base_path = Path(self.vcs_response.dest_dir)
vuln = base_path / "packages"
for file in vuln.glob("**/*.yml"):
relative_path = str(file.relative_to(base_path)).strip("/")
path_segments = str(file).split("/")
# use the last two segments as the advisory ID
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
advisory_url = f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"

yaml_file = load_yaml(str(file))
yield from self.build_advisory_from_text(
advisory_id=advisory_id, advisory_url=advisory_url, yaml_file=yaml_file
)

def on_failure(self):
self.clean_downloads()

def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]:
relative_path = str(file.relative_to(base_path)).strip("/")
path_segments = str(file).split("/")
# use the last two segments as the advisory ID
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
advisory_url = (
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
)
advisory_text = None
with open(str(file)) as f:
advisory_text = f.read()

yaml_file = load_yaml(str(file))

def build_advisory_from_text(
self, advisory_id, advisory_url, yaml_file
) -> Iterable[AdvisoryDataV2]:
summary = yaml_file.get("description") or ""
pkg_name = yaml_file.get("package") or ""

Expand Down Expand Up @@ -138,5 +132,5 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]:
affected_packages=affected_packages,
url=advisory_url,
date_published=date_published,
original_advisory_text=advisory_text or str(yaml_file),
original_advisory_text=str(yaml_file),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from typing import Iterable

import requests
from packageurl import PackageURL
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.pipelines.v2_importers.elixir_security_importer import (
ElixirSecurityImporterPipeline,
)
from vulnerabilities.utils import fetch_yaml


class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline):
"""
Elixir Security Advisories Importer Pipeline

This pipeline imports security advisories for a single elixir PURL.
"""

pipeline_id = "elixir_security_live_importer_v2"
supported_types = ["hex"]

@classmethod
def steps(cls):
return (
cls.get_purl_inputs,
cls.collect_and_store_advisories,
)

def get_purl_inputs(self):
purl = self.inputs["purl"]
if not purl:
raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline")

if isinstance(purl, str):
purl = PackageURL.from_string(purl)

if not isinstance(purl, PackageURL):
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

if purl.type not in self.supported_types:
raise ValueError(
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
)

self.purl = purl

def advisories_count(self) -> int:
return 0

def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
package_name = self.purl.name
try:
directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}"
response = requests.get(directory_url)

if response.status_code != 200:
self.log(f"No advisories found for {package_name} in Elixir Security Database")
return []

yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")]

for entry in yaml_entries:
# entry["path"] looks like: packages/<pkg>/<file>.yml
file_path = entry["path"]
advisory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}"
advisory_text = fetch_yaml(
advisory_url, headers={"Accept": "application/vnd.github.v3.raw"}
)

path_segments = str(file_path).split("/")
# use the last two segments as the advisory ID
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")

for advisory in self.build_advisory_from_text(
advisory_id=advisory_id,
yaml_file=advisory_text,
advisory_url=advisory_url,
):
if self.purl.version and not self.validate_advisory(advisory):
continue
yield advisory

except Exception as e:
self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
return []

def validate_advisory(self, advisory: AdvisoryDataV2) -> bool:
if not self.purl.version:
return True

for affected_package in advisory.affected_packages:
try:
purl_version = SemverVersion(self.purl.version)
if (
affected_package.affected_version_range
and purl_version in affected_package.affected_version_range
) or (
affected_package.fixed_version_range
and purl_version in affected_package.fixed_version_range
):
return True

except Exception as e:
self.log(f"Failed to parse version {self.purl.version}: {str(e)}")
# Since we have a small package file, if we fail to parse the versions, we can just return all of them
return True
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest
from packageurl import PackageURL

from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import (
ElixirSecurityLiveImporterPipeline,
)


@pytest.fixture
def test_data_dir():
return Path(__file__).parent.parent.parent / "test_data" / "elixir_security"


@patch("requests.get")
def test_package_first_mode_with_version_filter(mock_get, test_data_dir):
directory_response = MagicMock()
directory_response.status_code = 200
directory_response.json.return_value = [
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
]

advisory_file_path = test_data_dir / "test_file.yml"
advisory_content = advisory_file_path.read_text()

content_response = MagicMock()
content_response.status_code = 200
content_response.content = advisory_content

mock_get.side_effect = [directory_response, content_response]

# Version affected
purl = PackageURL(type="hex", name="coherence", version="0.5.1")
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
importer.get_purl_inputs()
advisories = list(importer.collect_advisories())
assert len(advisories) == 1

# Version not affected
mock_get.side_effect = [directory_response, content_response]
purl = PackageURL(type="hex", name="coherence", version="0.5.2")
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
importer.get_purl_inputs()
advisories = list(importer.collect_advisories())
assert len(advisories) == 0


@patch("requests.get")
def test_package_first_mode_no_advisories(mock_get):
mock_response = MagicMock()
mock_response.status_code = 404
mock_get.return_value = mock_response

purl = PackageURL(type="hex", name="nonexistent-package")
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
importer.get_purl_inputs()
advisories = list(importer.collect_advisories())
assert len(advisories) == 0


@patch("requests.get")
def test_package_first_mode_api_error(mock_get):
directory_response = MagicMock()
directory_response.status_code = 200
directory_response.json.return_value = [
{"name": "test_file.yml", "path": "packages/coherence/test_file.yml"}
]

content_response = MagicMock()
content_response.status_code = 500
content_response.content = b""

mock_get.side_effect = [directory_response, content_response]

purl = PackageURL(type="hex", name="coherence", version="0.5.1")
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
importer.get_purl_inputs()
advisories = list(importer.collect_advisories())
assert len(advisories) == 0


def test_package_first_mode_non_hex_purl():
purl = PackageURL(type="npm", name="some-package")
importer = ElixirSecurityLiveImporterPipeline(purl=purl)
with pytest.raises(ValueError):
importer.get_purl_inputs()
4 changes: 2 additions & 2 deletions vulnerabilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def load_toml(path):
return toml.load(f)


def fetch_yaml(url):
response = requests.get(url)
def fetch_yaml(url, headers=None):
response = requests.get(url, headers=headers)
return saneyaml.load(response.content)


Expand Down