Skip to content

Commit

Permalink
Refactor DebUpdateReleaseFileAttributes and add unit tests
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
hstct committed Jan 20, 2025
1 parent cb489f6 commit 044e806
Show file tree
Hide file tree
Showing 2 changed files with 388 additions and 110 deletions.
278 changes: 168 additions & 110 deletions pulp_deb/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,133 +351,117 @@ def __init__(self, remote, *args, **kwargs):
super().__init__(*args, **kwargs)
self.remote = remote
self.gpgkey = remote.gpgkey
self.gpg = None
if self.gpgkey:
gnupghome = os.path.join(os.getcwd(), "gpg-home")
os.makedirs(gnupghome)
os.makedirs(gnupghome, exist_ok=True)
self.gpg = gnupg.GPG(gpgbinary="/usr/bin/gpg", gnupghome=gnupghome)
import_res = self.gpg.import_keys(self.gpgkey)
if import_res.count == 0:
log.warning(_("Key import failed."))
pass

async def run(self):
"""
Parse ReleaseFile content units.
Update release content with information obtained from its artifact.
Parse ReleaseFile content units, verify GPG if needed, and update attributes.
"""
dropped_count = 0
async with ProgressReport(
message="Update ReleaseFile units", code="update.release_file"
) as pb:
async for d_content in self.items():
if isinstance(d_content.content, ReleaseFile):
release_file = d_content.content
da_names = {
os.path.basename(da.relative_path): da for da in d_content.d_artifacts
}
if "Release" in da_names:
if "Release.gpg" in da_names:
if self.gpgkey:
with NamedTemporaryFile() as tmp_file:
tmp_file.write(da_names["Release"].artifact.file.read())
tmp_file.flush()
verified = self.gpg.verify_file(
da_names["Release.gpg"].artifact.file, tmp_file.name
)
if verified.valid:
log.info(_("Verification of Release successful."))
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
log.warning(_("Verification of Release failed. Dropping it."))
d_content.d_artifacts.remove(da_names.pop("Release"))
d_content.d_artifacts.remove(da_names.pop("Release.gpg"))
dropped_count += 1
else:
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
if self.gpgkey:
d_content.d_artifacts.remove(da_names["Release"])
else:
release_file_artifact = da_names["Release"].artifact
release_file.relative_path = da_names["Release"].relative_path
else:
if "Release.gpg" in da_names:
# No need to keep the signature without "Release"
d_content.d_artifacts.remove(da_names.pop("Release.gpg"))

if "InRelease" in da_names:
if self.gpgkey:
verified = self.gpg.verify_file(da_names["InRelease"].artifact.file)
if verified.valid:
log.info(_("Verification of InRelease successful."))
release_file_artifact = da_names["InRelease"].artifact
release_file.relative_path = da_names["InRelease"].relative_path
else:
log.warning(_("Verification of InRelease failed. Dropping it."))
d_content.d_artifacts.remove(da_names.pop("InRelease"))
dropped_count += 1
else:
release_file_artifact = da_names["InRelease"].artifact
release_file.relative_path = da_names["InRelease"].relative_path
release_file = d_content.content
if not isinstance(release_file, ReleaseFile):
await self.put(d_content)
continue

if not d_content.d_artifacts:
# No (proper) artifacts left -> distribution not found
release_file_url = urljoin(self.remote.url, release_file.relative_path)
if dropped_count > 0:
raise NoValidSignatureForKey(url=release_file_url)
else:
raise NoReleaseFile(url=release_file_url)

release_file.sha256 = release_file_artifact.sha256
release_file.artifact_set_sha256 = _get_artifact_set_sha256(
d_content, ReleaseFile.SUPPORTED_ARTIFACTS
)
release_file_dict = deb822.Release(release_file_artifact.file)
if "codename" in release_file_dict:
release_file.codename = release_file_dict["Codename"]
if "suite" in release_file_dict:
release_file.suite = release_file_dict["Suite"]

if "components" in release_file_dict:
release_file.components = release_file_dict["Components"]
elif "component" in release_file_dict and settings.PERMISSIVE_SYNC:
release_file.components = release_file_dict["Component"]
elif release_file.distribution[-1] == "/":
message = (
"The Release file for distribution '{}' contains no 'Components' "
"field, but since we are dealing with a flat repo, we can continue "
"regardless."
)
log.warning(_(message).format(release_file.distribution))
# TODO: Consider not setting the field at all (requires migrations).
release_file.components = ""
else:
raise MissingReleaseFileField(release_file.distribution, "Components")

if "architectures" in release_file_dict:
release_file.architectures = release_file_dict["Architectures"]
elif "architecture" in release_file_dict and settings.PERMISSIVE_SYNC:
release_file.architectures = release_file_dict["Architecture"]
elif release_file.distribution[-1] == "/":
message = (
"The Release file for distribution '{}' contains no 'Architectures' "
"field, but since we are dealing with a flat repo, we can extract them "
"from the repos single Package index later."
)
log.warning(_(message).format(release_file.distribution))
release_file.architectures = ""
else:
raise MissingReleaseFileField(release_file.distribution, "Architectures")

log.debug(_("Codename: {}").format(release_file.codename))
log.debug(_("Components: {}").format(release_file.components))
log.debug(_("Architectures: {}").format(release_file.architectures))
await pb.aincrement()
await self.process_release_file_d_content(d_content, pb)
await self.put(d_content)

async def process_release_file_d_content(self, d_content, pb):
    """
    Process a single ReleaseFile declarative content unit.

    Select the main metadata artifact (verifying GPG signatures when a gpgkey is
    configured), record its checksums, parse the Release file fields onto the
    content unit, and increment the progress report.

    Args:
        d_content: Declarative content wrapping a ReleaseFile.
        pb: ProgressReport to increment once the unit is processed.

    Raises:
        NoReleaseFile: If no usable Release/InRelease artifact remains.
        NoValidSignatureForKey: Propagated from verify_gpg_artifacts.
    """
    release_da, release_gpg_da, inrelease_da = _collect_release_artifacts(d_content)

    release_artifact = None
    if self.gpg:
        # Raises NoValidSignatureForKey when nothing verifies against the key.
        release_artifact = self.verify_gpg_artifacts(
            d_content, release_da, release_gpg_da, inrelease_da
        )
    else:
        # No gpgkey configured: pick the main artifact without verification,
        # preferring InRelease over Release.
        if inrelease_da:
            release_artifact = inrelease_da.artifact
            d_content.content.relative_path = inrelease_da.relative_path
        elif release_da:
            release_artifact = release_da.artifact
            d_content.content.relative_path = release_da.relative_path

    # If there isn't a valid release, drop any leftover metadata artifacts
    # before reporting the failure.
    if not release_artifact:
        for da in (release_da, release_gpg_da, inrelease_da):
            if da in d_content.d_artifacts:
                d_content.d_artifacts.remove(da)
        # Use urljoin (not os.path.join) so URL semantics are preserved,
        # consistent with how release file URLs are built elsewhere.
        raise NoReleaseFile(url=urljoin(self.remote.url, d_content.content.relative_path))

    d_content.content.sha256 = release_artifact.sha256
    d_content.content.artifact_set_sha256 = _get_artifact_set_sha256(
        d_content, ReleaseFile.SUPPORTED_ARTIFACTS
    )
    _parse_release_file_attributes(d_content, release_artifact)
    await pb.aincrement()

def verify_gpg_artifacts(self, d_content, release_da, release_gpg_da, inrelease_da):
    """
    Perform GPG verification and return the main verified artifact.

    Tries InRelease (inline signature) first, then Release + Release.gpg
    (detached signature). Artifacts that fail verification are removed from
    d_content.d_artifacts.

    Returns:
        The artifact whose signature verified against the imported key.

    Raises:
        NoValidSignatureForKey: If no artifact could be verified.
    """
    if inrelease_da:
        if self.verify_single_file(inrelease_da.artifact):
            d_content.content.relative_path = inrelease_da.relative_path
            return inrelease_da.artifact
        log.warning(_("Verification of InRelease failed. Removing it."))
        d_content.d_artifacts.remove(inrelease_da)

    if release_da and release_gpg_da:
        if self.verify_detached_signature(release_da.artifact, release_gpg_da.artifact):
            d_content.content.relative_path = release_da.relative_path
            return release_da.artifact
        log.warning(_("Verification of Release + Release.gpg failed. Removing it."))
        d_content.d_artifacts.remove(release_da)
        d_content.d_artifacts.remove(release_gpg_da)
    elif release_da:
        # Fixed typo in the original message ("bu" -> "but").
        log.warning(_("Release found but no signature and gpgkey was provided. Removing it."))
        d_content.d_artifacts.remove(release_da)

    # Use urljoin for URL construction, consistent with the rest of the file.
    raise NoValidSignatureForKey(url=urljoin(self.remote.url, "Release"))

def verify_single_file(self, artifact):
    """
    Check the inline GPG signature of an artifact (e.g. InRelease).

    Returns:
        True when the signature verifies against the imported key,
        False otherwise.
    """
    with artifact.file.open("rb") as signed_fh:
        outcome = self.gpg.verify_file(signed_fh)
    return bool(outcome.valid)

def verify_detached_signature(self, release_artifact, release_gpg_artifact):
    """
    Verify "Release" against its detached signature "Release.gpg".

    Returns:
        True if the signature is valid, False otherwise.
    """
    with NamedTemporaryFile() as plain_copy:
        # gnupg needs the signed data as a real path on disk, so copy it out.
        with release_artifact.file.open("rb") as release_fh:
            plain_copy.write(release_fh.read())
        plain_copy.flush()

        with release_gpg_artifact.file.open("rb") as signature_fh:
            outcome = self.gpg.verify_file(signature_fh, plain_copy.name)
    return bool(outcome.valid)


class DebUpdatePackageIndexAttributes(Stage): # TODO: Needs a new name
"""
Expand Down Expand Up @@ -1322,6 +1306,80 @@ def _save_artifact_blocking(d_artifact):
d_artifact.artifact.touch()


def _collect_release_artifacts(d_content):
"""
Return (release_da, release_gpg_da, inrelease_da) from d_content.d_artifacts.
Looks for items whose filename is exactly "Release", "Release.gpg", or "InRelease".
If not found, return None for that slot.
"""
release_da = None
release_gpg_da = None
inrelease_da = None

da_names = {os.path.basename(da.relative_path): da for da in d_content.d_artifacts}
if "Release" in da_names:
release_da = da_names["Release"]
if "Release.gpg" in da_names:
release_gpg_da = da_names["Release.gpg"]
if "InRelease" in da_names:
inrelease_da = da_names["InRelease"]

return release_da, release_gpg_da, inrelease_da


def _parse_release_file_attributes(d_content, main_artifact):
    """
    Parse main_artifact as a Debian "Release" file and copy its metadata
    (codename, suite, components, architectures) onto d_content.content.

    Missing 'Components'/'Architectures' fields are tolerated for flat repos
    (distribution ending in "/") and, via their singular aliases, when
    PERMISSIVE_SYNC is enabled; otherwise MissingReleaseFileField is raised.
    """
    from debian import deb822

    content = d_content.content
    with main_artifact.file.open("rb") as fh:
        release_dict = deb822.Release(fh.read())

    if "codename" in release_dict:
        content.codename = release_dict["Codename"]
    if "suite" in release_dict:
        content.suite = release_dict["Suite"]

    if "components" in release_dict:
        content.components = release_dict["Components"]
    elif "component" in release_dict and settings.PERMISSIVE_SYNC:
        # Some repos use the singular field name; accept it in permissive mode.
        content.components = release_dict["Component"]
    elif content.distribution[-1] == "/":
        log.warning(
            _(
                "The Release file for distribution '{}' contains no 'Components' "
                "field, but since we are dealing with a flat repo, we can continue "
                "regardless."
            ).format(content.distribution)
        )
        # TODO: Consider not setting the field at all (requires migrations).
        content.components = ""
    else:
        raise MissingReleaseFileField(content.distribution, "Components")

    if "architectures" in release_dict:
        content.architectures = release_dict["Architectures"]
    elif "architecture" in release_dict and settings.PERMISSIVE_SYNC:
        # Singular alias, accepted in permissive mode.
        content.architectures = release_dict["Architecture"]
    elif content.distribution[-1] == "/":
        log.warning(
            _(
                "The Release file for distribution '{}' contains no 'Architectures' "
                "field, but since we are dealing with a flat repo, we can extract them "
                "from the repos single Package index later."
            ).format(content.distribution)
        )
        content.architectures = ""
    else:
        raise MissingReleaseFileField(content.distribution, "Architectures")

    log.debug(f"Codename: {content.codename}")
    log.debug(f"Suite: {content.suite}")
    log.debug(f"Components: {content.components}")
    log.debug(f"Architectures: {content.architectures}")


def _get_artifact_set_sha256(d_content, supported_artifacts):
"""
Get the checksum of checksums for a set of artifacts associated with a multi artifact
Expand Down
Loading

0 comments on commit 044e806

Please sign in to comment.