|
7 | 7 | # See https://aboutcode.org for more information about nexB OSS projects. |
8 | 8 | # |
9 | 9 | import logging |
10 | | -import os |
11 | 10 | from pathlib import Path |
12 | 11 | from typing import Iterable |
13 | 12 |
|
14 | 13 | import saneyaml |
| 14 | +from fetchcode.vcs import fetch_via_vcs |
15 | 15 |
|
16 | 16 | from vulnerabilities.importer import AdvisoryData |
17 | | -from vulnerabilities.importer import Importer |
18 | 17 | from vulnerabilities.importers.osv import parse_advisory_data |
| 18 | +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline |
19 | 19 | from vulnerabilities.utils import get_advisory_url |
20 | 20 |
|
21 | | -logger = logging.getLogger(__name__) |
| 21 | +module_logger = logging.getLogger(__name__) |
22 | 22 |
|
23 | 23 |
|
class PyPaImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """Collect advisories from PyPA GitHub repository."""

    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
    repo_url = "git+https://github.com/pypa/advisory-database"
    importer_name = "Pypa Importer"

    @classmethod
    def steps(cls):
        """Return the pipeline steps as a tuple, in the order they are listed."""
        # NOTE(review): ``collect_and_store_advisories`` and
        # ``import_new_advisories`` are not defined in this class; presumably
        # they are inherited from the base pipeline -- confirm.
        return (
            cls.clone,
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
            cls.clean_downloads,
        )

    def clone(self):
        """Fetch ``repo_url`` and store the response on ``self.vcs_response``."""
        self.log(f"Cloning `{self.repo_url}`")
        self.vcs_response = fetch_via_vcs(self.repo_url)

    def advisories_count(self):
        """Return the number of ``*.yaml`` files found recursively under ``vulns``."""
        vulns_directory = Path(self.vcs_response.dest_dir) / "vulns"
        return sum(1 for _ in vulns_directory.rglob("*.yaml"))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield the result of ``parse_advisory_data`` for each ``*.yaml`` file
        found recursively under the ``vulns`` directory of the fetched
        repository. ``clone`` must have run first, since this reads
        ``self.vcs_response.dest_dir``.
        """
        base_directory = Path(self.vcs_response.dest_dir)
        vulns_directory = base_directory / "vulns"

        # Deliberately no assignment to ``self.advisories_count`` here: an
        # instance attribute of that name would shadow the
        # ``advisories_count()`` method and make later calls to it fail with
        # "'int' object is not callable".
        for advisory in vulns_directory.rglob("*.yaml"):
            advisory_url = get_advisory_url(
                file=advisory,
                base_path=base_directory,
                url="https://github.com/pypa/advisory-database/blob/main/",
            )
            advisory_dict = saneyaml.load(advisory.read_text())
            yield parse_advisory_data(
                raw_data=advisory_dict,
                supported_ecosystems=["pypi"],
                advisory_url=advisory_url,
            )

    def clean_downloads(self):
        """Delete the fetched repository if ``clone`` stored a response."""
        # ``vcs_response`` is only assigned in ``clone``; read it defensively
        # so cleanup does not raise AttributeError when the clone never
        # completed.
        vcs_response = getattr(self, "vcs_response", None)
        if vcs_response:
            self.log("Removing cloned repository")
            vcs_response.delete()
0 commit comments