Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
from vulnerabilities.pipelines import pypa_importer
from vulnerabilities.pipelines import pysec_importer
from vulnerabilities.pipelines.v2_importers import alpine_linux_importer as alpine_linux_importer_v2
from vulnerabilities.pipelines.v2_importers import (
alpine_security_importer as alpine_security_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import aosp_importer as aosp_importer_v2
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
Expand Down Expand Up @@ -118,6 +121,7 @@
retiredotnet_importer_v2.RetireDotnetImporterPipeline,
ubuntu_osv_importer_v2.UbuntuOSVImporterPipeline,
alpine_linux_importer_v2.AlpineLinuxImporterPipeline,
alpine_security_importer_v2.AlpineSecurityImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
188 changes: 188 additions & 0 deletions vulnerabilities/pipelines/v2_importers/alpine_security_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
from typing import Iterable

import requests
from packageurl import PackageURL
from univers.version_range import AlpineLinuxVersionRange
from univers.versions import InvalidVersion

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import SCORING_SYSTEMS

logger = logging.getLogger(__name__)

ALPINE_SECURITY_ROOT = "https://security.alpinelinux.org/"
BRANCH_URL = "https://security.alpinelinux.org/branch/{branch}"
ADVISORY_HEADERS = {"Accept": "application/ld+json"}

# EOL branches absent from root API index; 3.13-3.16 omitted (return 0 items)
HISTORICAL_BRANCHES = [
"3.22-community",
"3.18-main",
"3.17-main",
"3.12-main",
"3.11-main",
"3.10-main",
]


def get_branches() -> list:
"""Discover active branches from the root API and append HISTORICAL_BRANCHES."""
try:
resp = requests.get(ALPINE_SECURITY_ROOT, headers=ADVISORY_HEADERS, timeout=30)
resp.raise_for_status()
data = resp.json()
# Branch entries have dict values; scalar values indicate non-branch keys.
active = [k for k, v in data.items() if isinstance(v, dict)]
except (requests.RequestException, ValueError) as e:
logger.error("Failed to discover branches from root API: %s", e)
active = []

seen = set(active)
return active + [b for b in HISTORICAL_BRANCHES if b not in seen]


class AlpineSecurityImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Collect Alpine Linux advisories from https://security.alpinelinux.org/."""

pipeline_id = "alpine_security_importer"
spdx_license_expression = "CC-BY-SA-4.0"
license_url = "https://security.alpinelinux.org/"
precedence = 200

@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)

def advisories_count(self) -> int:
count = 0
for branch in get_branches():
url = BRANCH_URL.format(branch=branch)
try:
resp = requests.get(url, headers=ADVISORY_HEADERS, timeout=30)
resp.raise_for_status()
data = resp.json()
except (requests.RequestException, ValueError) as e:
logger.error("Failed to fetch branch %s: %s", branch, e)
continue
count += len(data.get("items") or [])
return count

def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
for branch in get_branches():
url = BRANCH_URL.format(branch=branch)
try:
resp = requests.get(url, headers=ADVISORY_HEADERS, timeout=30)
resp.raise_for_status()
data = resp.json()
except (requests.RequestException, ValueError) as e:
logger.error("Failed to fetch branch %s: %s", branch, e)
continue
for item in data.get("items") or []:
advisory = parse_advisory(item)
if advisory:
yield advisory


def parse_advisory(data: dict):
"""Parse a JSON-LD advisory; return None if the advisory ID is missing."""
cve_url = data.get("id") or ""
cve_id = cve_url.rstrip("/").split("/")[-1]
if not cve_id:
return None

summary = data.get("description") or ""

references = []
for ref in data.get("ref") or []:
ref_url = ref.get("rel") or ""
if ref_url:
references.append(
ReferenceV2(
url=ref_url,
reference_type=ref.get("referenceType") or "",
)
)
for cpe_match in data.get("cpeMatch") or []:
cpe_uri = cpe_match.get("cpeUri") or ""
cpe_id = cpe_match.get("id") or ""
if cpe_uri and cpe_id:
references.append(ReferenceV2(url=cpe_id, reference_id=cpe_uri))

severities = []
cvss3 = data.get("cvss3") or {}
cvss_score = cvss3.get("score")
cvss_vector = cvss3.get("vector") or ""
if cvss_vector and cvss_score:
if cvss_vector.startswith("CVSS:3.1/"):
system = SCORING_SYSTEMS["cvssv3.1"]
else:
system = SCORING_SYSTEMS["cvssv3"]
severities.append(
VulnerabilitySeverity(
system=system,
value=str(cvss_score),
scoring_elements=cvss_vector,
)
)
Comment on lines +129 to +143
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we only have CVSS3 ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I sampled 20+ live advisories across multiple branches and the API only ever returns a cvss3 key at the top level. No cvss2 or cvss4 fields exist in the JSON-LD schema for this endpoint.


affected_packages = []
for state in data.get("state") or []:
if not state.get("fixed"):
continue
pkg_version_url = state.get("packageVersion") or ""
repo = state.get("repo") or ""
parts = pkg_version_url.rstrip("/").split("/")
if len(parts) < 2:
continue
pkg_name = parts[-2]
version = parts[-1]
if not pkg_name or not version:
continue
repo_parts = repo.split("-", 1)
if len(repo_parts) != 2:
continue
version_tag, reponame = repo_parts
distroversion = version_tag if version_tag == "edge" else f"v{version_tag}"
purl = PackageURL(
type="apk",
namespace="alpine",
name=pkg_name,
qualifiers={"distroversion": distroversion, "reponame": reponame},
)
try:
fixed_version_range = AlpineLinuxVersionRange.from_versions([version])
except InvalidVersion:
logger.warning("Cannot parse Alpine version %r in %s", version, cve_id)
continue
affected_packages.append(
AffectedPackageV2(
package=purl,
fixed_version_range=fixed_version_range,
)
)

return AdvisoryDataV2(
advisory_id=cve_id,
aliases=[],
summary=summary,
affected_packages=affected_packages,
references=references,
severities=severities,
url=cve_url,
original_advisory_text=json.dumps(data, indent=2, ensure_ascii=False),
)
117 changes: 117 additions & 0 deletions vulnerabilities/tests/test_alpine_security_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import patch

import requests

from vulnerabilities.pipelines.v2_importers.alpine_security_importer import (
AlpineSecurityImporterPipeline,
)
from vulnerabilities.pipelines.v2_importers.alpine_security_importer import parse_advisory
from vulnerabilities.tests import util_tests
from vulnerabilities.utils import load_json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/alpine_security")


class TestAlpineSecurityImporter(TestCase):
def test_parse_advisory_with_cvss(self):
"""Advisory with CVSS 3.1 score, references, and no fixed versions."""
data = load_json(os.path.join(TEST_DATA, "alpine_security_mock1.json"))
expected = os.path.join(TEST_DATA, "expected_alpine_security_output1.json")
result = parse_advisory(data)
self.assertIsNotNone(result)
util_tests.check_results_against_json(result.to_dict(), expected)

def test_parse_advisory_with_fixed_states(self):
"""Advisory with no CVSS but multiple fixed package versions across branches."""
data = load_json(os.path.join(TEST_DATA, "alpine_security_mock2.json"))
expected = os.path.join(TEST_DATA, "expected_alpine_security_output2.json")
result = parse_advisory(data)
self.assertIsNotNone(result)
util_tests.check_results_against_json(result.to_dict(), expected)

def test_parse_advisory_missing_id_returns_none(self):
"""Advisory with an empty id field must be skipped."""
data = {
"id": "",
"description": "test",
"cvss3": {"score": 0.0, "vector": None},
"ref": [],
"state": [],
}
self.assertIsNone(parse_advisory(data))

def test_parse_advisory_skips_malformed_package_url(self):
"""State entries with a packageVersion URL too short to parse must be skipped."""
data = {
"id": "https://security.alpinelinux.org/vuln/CVE-2099-00001",
"description": "test",
"cvss3": {"score": 0.0, "vector": None},
"ref": [],
"state": [
{
"fixed": True,
"packageVersion": "https://security.alpinelinux.org/srcpkg/",
"repo": "edge-main",
}
],
}
result = parse_advisory(data)
self.assertIsNotNone(result)
self.assertEqual(result.affected_packages, [])

def test_parse_advisory_skips_unfixed_states(self):
"""State entries with fixed=False must not produce affected_packages."""
data = {
"id": "https://security.alpinelinux.org/vuln/CVE-2099-00002",
"description": "test",
"cvss3": {"score": 0.0, "vector": None},
"ref": [],
"state": [
{
"fixed": False,
"packageVersion": "https://security.alpinelinux.org/srcpkg/curl/8.0.0-r0",
"repo": "edge-main",
}
],
}
result = parse_advisory(data)
self.assertIsNotNone(result)
self.assertEqual(result.affected_packages, [])


class TestAlpineSecurityImporterPipeline(TestCase):
@patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches")
@patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get")
def test_collect_advisories_yields_advisory(self, mock_get, mock_branches):
mock_branches.return_value = ["3.19-main"]
data = load_json(os.path.join(TEST_DATA, "alpine_security_mock1.json"))
resp = MagicMock()
resp.json.return_value = {"items": [data]}
resp.raise_for_status.return_value = None
mock_get.return_value = resp
advisories = list(AlpineSecurityImporterPipeline().collect_advisories())
self.assertGreater(len(advisories), 0)

@patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches")
@patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get")
def test_collect_advisories_http_error_logs_and_continues(self, mock_get, mock_branches):
mock_branches.return_value = ["3.19-main"]
mock_get.side_effect = requests.RequestException("timeout")
logger_name = "vulnerabilities.pipelines.v2_importers.alpine_security_importer"
with self.assertLogs(logger_name, level="ERROR") as cm:
advisories = list(AlpineSecurityImporterPipeline().collect_advisories())
self.assertEqual(advisories, [])
self.assertTrue(any("timeout" in msg for msg in cm.output))
Loading
Loading