Skip to content

Commit d767512

Browse files
authored
Feature/595 create vulnerability tracking class (#597)
* Group poetry.lock and pyproject.toml in frozen dataclass * Move PoetryFiles and create poetry_files_from_latest_tag in shared_models. Add get_vulnerabilities_from_latest_tag * Add test for poetry_files_from_latest_tag * Add basic unit tests for get_vulnerabilities and get_vulnerabilities_from_latest_tag * Add ResolvedVulnerabilities to detect if vulnerability has been resolved from previous work to the present * Add coordinates * Move VulnerabilitySource to audit.py for shared usage * Move references_links to Vulnerability * Add vulnerability_id and subsection_for_changelog_summary with tests * Make docstring sentence clearer what the intent is by matching ID and aliases per review * Change structure of vulnerability so clearer delineation between package information and vulnerability
1 parent a666ede commit d767512

File tree

14 files changed

+425
-109
lines changed

14 files changed

+425
-109
lines changed

doc/changes/unreleased.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
* #535: Added more information about Sonar's usage of ``exclusions``
77
* #596: Corrected and added more information regarding ``pyupgrade``
88

9+
## Features
10+
11+
* #595: Created class `ResolvedVulnerabilities` to track resolved vulnerabilities between versions
12+
913
## Refactoring
1014

1115
* #596: Added newline after header in versioned changelog

exasol/toolbox/nox/_lint.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from nox import Session
1212

1313
from exasol.toolbox.nox._shared import python_files
14+
from exasol.toolbox.util.dependencies.shared_models import PoetryFiles
1415
from noxconfig import PROJECT_CONFIG
1516

1617

@@ -140,7 +141,7 @@ def security_lint(session: Session) -> None:
140141
@nox.session(name="lint:dependencies", python=False)
141142
def dependency_check(session: Session) -> None:
142143
"""Checks if only valid sources of dependencies are used"""
143-
content = Path(PROJECT_CONFIG.root, "pyproject.toml").read_text()
144+
content = Path(PROJECT_CONFIG.root, PoetryFiles.pyproject_toml).read_text()
144145
dependencies = Dependencies.parse(content)
145146
console = rich.console.Console()
146147
if illegal := dependencies.illegal:

exasol/toolbox/nox/_release.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
check_for_config_attribute,
1515
)
1616
from exasol.toolbox.nox.plugin import NoxTasks
17+
from exasol.toolbox.util.dependencies.shared_models import PoetryFiles
1718
from exasol.toolbox.util.git import Git
1819
from exasol.toolbox.util.release.changelog import Changelogs
1920
from exasol.toolbox.util.version import (
@@ -142,7 +143,7 @@ def prepare_release(session: Session) -> None:
142143
return
143144

144145
changed_files += [
145-
PROJECT_CONFIG.root / "pyproject.toml",
146+
PROJECT_CONFIG.root / PoetryFiles.pyproject_toml,
146147
PROJECT_CONFIG.version_file,
147148
]
148149
results = pm.hook.prepare_release_add_files(session=session, config=PROJECT_CONFIG)

exasol/toolbox/tools/security.py

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
from functools import partial
1919
from inspect import cleandoc
2020
from pathlib import Path
21-
from typing import Optional
2221

2322
import typer
2423

24+
from exasol.toolbox.util.dependencies.audit import VulnerabilitySource
25+
2526
stdout = print
2627
stderr = partial(print, file=sys.stderr)
2728

@@ -104,45 +105,14 @@ def from_maven(report: str) -> Iterable[Issue]:
104105
)
105106

106107

107-
class VulnerabilitySource(str, Enum):
108-
CVE = "CVE"
109-
CWE = "CWE"
110-
GHSA = "GHSA"
111-
PYSEC = "PYSEC"
112-
113-
@classmethod
114-
def from_prefix(cls, name: str) -> VulnerabilitySource | None:
115-
for el in cls:
116-
if name.upper().startswith(el.value):
117-
return el
118-
return None
119-
120-
def get_link(self, package: str, vuln_id: str) -> str:
121-
if self == VulnerabilitySource.CWE:
122-
cwe_id = vuln_id.upper().replace(f"{VulnerabilitySource.CWE.value}-", "")
123-
return f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
124-
125-
map_link = {
126-
VulnerabilitySource.CVE: "https://nvd.nist.gov/vuln/detail/{vuln_id}",
127-
VulnerabilitySource.GHSA: "https://github.com/advisories/{vuln_id}",
128-
VulnerabilitySource.PYSEC: "https://github.com/pypa/advisory-database/blob/main/vulns/{package}/{vuln_id}.yaml",
129-
}
130-
return map_link[self].format(package=package, vuln_id=vuln_id)
131-
132-
133-
def identify_pypi_references(
134-
references: list[str], package_name: str
135-
) -> tuple[list[str], list[str], list[str]]:
108+
def identify_pypi_references(references: list[str]) -> tuple[list[str], list[str]]:
136109
refs: dict = {k: [] for k in VulnerabilitySource}
137-
links = []
138110
for reference in references:
139111
if source := VulnerabilitySource.from_prefix(reference.upper()):
140112
refs[source].append(reference)
141-
links.append(source.get_link(package=package_name, vuln_id=reference))
142113
return (
143114
refs[VulnerabilitySource.CVE],
144115
refs[VulnerabilitySource.CWE],
145-
links,
146116
)
147117

148118

@@ -167,6 +137,11 @@ def from_pip_audit(report: str) -> Iterable[Issue]:
167137
"CVE-2025-27516"
168138
],
169139
"description": "An oversight ..."
140+
"coordinates": "jinja2:3.1.5",
141+
"references": [
142+
"https://github.com/advisories/GHSA-cpwx-vrp4-4pq7",
143+
"https://nvd.nist.gov/vuln/detail/CVE-2025-27516"
144+
]
170145
}
171146
]
172147
@@ -178,16 +153,16 @@ def from_pip_audit(report: str) -> Iterable[Issue]:
178153
vulnerabilities = json.loads(report)
179154

180155
for vulnerability in vulnerabilities:
181-
cves, cwes, links = identify_pypi_references(
182-
references=vulnerability["refs"], package_name=vulnerability["name"]
156+
cves, cwes = identify_pypi_references(
157+
references=vulnerability["refs"],
183158
)
184159
if cves:
185160
yield Issue(
186161
cve=sorted(cves)[0],
187162
cwe="None" if not cwes else ", ".join(cwes),
188163
description=vulnerability["description"],
189-
coordinates=f"{vulnerability['name']}:{vulnerability['version']}",
190-
references=tuple(links),
164+
coordinates=vulnerability["coordinates"],
165+
references=tuple(vulnerability["references"]),
191166
)
192167

193168

exasol/toolbox/util/dependencies/audit.py

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,23 @@
44
import subprocess # nosec
55
import tempfile
66
from dataclasses import dataclass
7+
from enum import Enum
8+
from inspect import cleandoc
79
from pathlib import Path
810
from re import search
911
from typing import (
1012
Any,
11-
Union,
1213
)
1314

14-
from pydantic import BaseModel
15+
from pydantic import (
16+
BaseModel,
17+
ConfigDict,
18+
)
1519

16-
from exasol.toolbox.util.dependencies.shared_models import Package
20+
from exasol.toolbox.util.dependencies.shared_models import (
21+
Package,
22+
poetry_files_from_latest_tag,
23+
)
1724

1825
PIP_AUDIT_VULNERABILITY_PATTERN = (
1926
r"^Found \d+ known vulnerabilit\w{1,3} in \d+ package\w?$"
@@ -32,7 +39,36 @@ def __init__(self, subprocess_output: subprocess.CompletedProcess) -> None:
3239
self.stderr = subprocess_output.stderr
3340

3441

35-
class Vulnerability(Package):
42+
class VulnerabilitySource(str, Enum):
43+
CVE = "CVE"
44+
CWE = "CWE"
45+
GHSA = "GHSA"
46+
PYSEC = "PYSEC"
47+
48+
@classmethod
49+
def from_prefix(cls, name: str) -> VulnerabilitySource | None:
50+
for el in cls:
51+
if name.upper().startswith(el.value):
52+
return el
53+
return None
54+
55+
def get_link(self, package: str, vuln_id: str) -> str:
56+
if self == VulnerabilitySource.CWE:
57+
cwe_id = vuln_id.upper().replace(f"{VulnerabilitySource.CWE.value}-", "")
58+
return f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
59+
60+
map_link = {
61+
VulnerabilitySource.CVE: "https://nvd.nist.gov/vuln/detail/{vuln_id}",
62+
VulnerabilitySource.GHSA: "https://github.com/advisories/{vuln_id}",
63+
VulnerabilitySource.PYSEC: "https://github.com/pypa/advisory-database/blob/main/vulns/{package}/{vuln_id}.yaml",
64+
}
65+
return map_link[self].format(package=package, vuln_id=vuln_id)
66+
67+
68+
class Vulnerability(BaseModel):
69+
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
70+
71+
package: Package
3672
id: str
3773
aliases: list[str]
3874
fix_versions: list[str]
@@ -46,23 +82,61 @@ def from_audit_entry(
4682
Create a Vulnerability from a pip-audit vulnerability entry
4783
"""
4884
return cls(
49-
name=package_name,
50-
version=version,
85+
package=Package(name=package_name, version=version),
5186
id=vuln_entry["id"],
5287
aliases=vuln_entry["aliases"],
5388
fix_versions=vuln_entry["fix_versions"],
5489
description=vuln_entry["description"],
5590
)
5691

5792
@property
58-
def security_issue_entry(self) -> dict[str, str | list[str]]:
93+
def references(self) -> list[str]:
94+
return sorted([self.id] + self.aliases)
95+
96+
@property
97+
def reference_links(self) -> tuple[str, ...]:
98+
return tuple(
99+
source.get_link(package=self.package.name, vuln_id=reference)
100+
for reference in self.references
101+
if (source := VulnerabilitySource.from_prefix(reference.upper()))
102+
)
103+
104+
@property
105+
def security_issue_entry(self) -> dict[str, str | list[str] | tuple[str, ...]]:
59106
return {
60-
"name": self.name,
61-
"version": str(self.version),
62-
"refs": [self.id] + self.aliases,
107+
"name": self.package.name,
108+
"version": str(self.package.version),
109+
"refs": self.references,
63110
"description": self.description,
111+
"coordinates": self.package.coordinates,
112+
"references": self.reference_links,
64113
}
65114

115+
@property
116+
def vulnerability_id(self) -> str | None:
117+
"""
118+
Ensure a consistent way of identifying a vulnerability for string generation.
119+
"""
120+
for ref in self.references:
121+
ref_upper = ref.upper()
122+
if ref_upper.startswith(VulnerabilitySource.CVE.value):
123+
return ref
124+
if ref_upper.startswith(VulnerabilitySource.GHSA.value):
125+
return ref
126+
if ref_upper.startswith(VulnerabilitySource.PYSEC.value):
127+
return ref
128+
return self.references[0]
129+
130+
@property
131+
def subsection_for_changelog_summary(self) -> str:
132+
"""
133+
Create a subsection to be included in the Summary section of a versioned changelog.
134+
"""
135+
links_join = "\n* ".join(sorted(self.reference_links))
136+
references_subsection = f"\n#### References:\n\n* {links_join}\n\n "
137+
subsection = f"### {self.vulnerability_id} in {self.package.coordinates}\n\n{self.description}\n{references_subsection}"
138+
return cleandoc(subsection.strip())
139+
66140

67141
def audit_poetry_files(working_directory: Path) -> str:
68142
"""
@@ -141,7 +215,18 @@ def load_from_pip_audit(cls, working_directory: Path) -> Vulnerabilities:
141215
return Vulnerabilities(vulnerabilities=vulnerabilities)
142216

143217
@property
144-
def security_issue_dict(self) -> list[dict[str, str | list[str]]]:
218+
def security_issue_dict(self) -> list[dict[str, str | list[str] | tuple[str, ...]]]:
145219
return [
146220
vulnerability.security_issue_entry for vulnerability in self.vulnerabilities
147221
]
222+
223+
224+
def get_vulnerabilities(working_directory: Path) -> list[Vulnerability]:
225+
return Vulnerabilities.load_from_pip_audit(
226+
working_directory=working_directory
227+
).vulnerabilities
228+
229+
230+
def get_vulnerabilities_from_latest_tag():
231+
with poetry_files_from_latest_tag() as tmp_dir:
232+
return get_vulnerabilities(working_directory=tmp_dir)

exasol/toolbox/util/dependencies/poetry_dependencies.py

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from __future__ import annotations
22

33
import subprocess
4-
import tempfile
54
from collections import OrderedDict
65
from pathlib import Path
7-
from typing import Optional
86

97
import tomlkit
108
from pydantic import (
@@ -16,9 +14,9 @@
1614
from exasol.toolbox.util.dependencies.shared_models import (
1715
NormalizedPackageStr,
1816
Package,
17+
PoetryFiles,
18+
poetry_files_from_latest_tag,
1919
)
20-
from exasol.toolbox.util.git import Git
21-
from noxconfig import PROJECT_CONFIG
2220

2321

2422
class PoetryGroup(BaseModel):
@@ -28,7 +26,6 @@ class PoetryGroup(BaseModel):
2826
toml_section: str | None
2927

3028

31-
PYPROJECT_TOML = "pyproject.toml"
3229
TRANSITIVE_GROUP = PoetryGroup(name="transitive", toml_section=None)
3330

3431

@@ -39,7 +36,7 @@ class PoetryToml(BaseModel):
3936

4037
@classmethod
4138
def load_from_toml(cls, working_directory: Path) -> PoetryToml:
42-
file_path = working_directory / PYPROJECT_TOML
39+
file_path = working_directory / PoetryFiles.pyproject_toml
4340
if not file_path.exists():
4441
raise ValueError(f"File not found: {file_path}")
4542

@@ -165,10 +162,5 @@ def get_dependencies(
165162
def get_dependencies_from_latest_tag() -> (
166163
OrderedDict[str, dict[NormalizedPackageStr, Package]]
167164
):
168-
latest_tag = Git.get_latest_tag()
169-
path = PROJECT_CONFIG.root.relative_to(Git.toplevel())
170-
with tempfile.TemporaryDirectory() as tmpdir_str:
171-
tmpdir = Path(tmpdir_str)
172-
for file in ("poetry.lock", PYPROJECT_TOML):
173-
Git.checkout(latest_tag, path / file, tmpdir / file)
174-
return get_dependencies(working_directory=tmpdir)
165+
with poetry_files_from_latest_tag() as tmp_dir:
166+
return get_dependencies(working_directory=tmp_dir)

0 commit comments

Comments
 (0)