Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cpiofile: Add unittest opening a symlink as fileobj #140

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/ltex.dictionary.en-US.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ codecov
covcombine
coverallsapp
cpio
cpiofile
cpioinfo
euxv
ibft
ifrename
kname
lastboot
linkname
logbuf
MACPCI
nektos
Expand Down
80 changes: 55 additions & 25 deletions tests/test_cpiofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import pytest
from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem

from xcp.cpiofile import CpioFile
from xcp.cpiofile import CpioFile, StreamError

binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff"

Expand Down Expand Up @@ -48,53 +48,79 @@ def test_cpiofile_modes(fs):
if comp == "xz" and filetype == ":":
continue # streaming xz is not implemented (supported only as file)
check_archive_mode(filetype + comp, fs)
if filetype == "|":
check_archive_mode(filetype + comp, fs, filename="archive." + comp)


def check_archive_mode(archive_mode, fs):
# type: (str, FakeFilesystem) -> None
def create_cpio_archive(fs, archive_mode, filename=None):
# type: (FakeFilesystem, str, str | None) -> io.BytesIO | None
"""
Test CpioFile in the given archive mode with verification of the archive contents.
Create a CpioFile archive with files and directories from a FakeFilesystem.

:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
:param archive_mode: The archive mode is a string parameter that specifies the mode
in which the CpioFile object should be opened.
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
:param filename: The name of the file to create the cpio archive
"""
# Step 1: Create and populate a cpio archive in a BytesIO buffer
bytesio = io.BytesIO()
archive = CpioFile.open(fileobj=bytesio, mode="w" + archive_mode)
pyfakefs_populate_archive(archive, fs)
cpiofile = None if filename else io.BytesIO()
fs.reset()
cpio = CpioFile.open(name=filename, fileobj=cpiofile, mode="w" + archive_mode)
pyfakefs_populate_archive(cpio, fs)
if archive_mode == "|gz":
archive.list(verbose=True)
archive.close()
cpio.list(verbose=True)
cpio.close()
if not cpiofile:
cpio_data = FakeFileOpen(fs)(filename, "rb").read()
fs.reset()
fs.create_file(filename, contents=cast(str, cpio_data))
return None
fs.reset()
cpiofile.seek(0)
return cpiofile


def check_archive_mode(archive_mode, fs, filename=None):
# type: (str, FakeFilesystem, str | None) -> None
"""
Test CpioFile in the given archive mode with verification of the archive contents.

:param archive_mode: The archive mode is a string parameter that specifies the mode
in which the CpioFile object should be opened.
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
# Step 2: Extract the archive in a clean filesystem and verify the extracted contents
fs.reset()
bytesio.seek(0)
archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode)
cpiofile = create_cpio_archive(fs, archive_mode, filename)
archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode)
archive.extractall()
pyfakefs_verify_filesystem(fs)
assert archive.getnames() == ["dirname", "dirname/filename", "dir2/symlink"]
assert archive.getnames() == ["dirname", "dirname/filename", "symlink", "dir2/file_2"]
dirs = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isdir()]
files = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isreg()]
symlinks = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.issym()]
assert dirs == ["dirname"]
assert files == ["dirname/filename"]
assert symlinks == ["dir2/symlink"]
assert archive.getmember(symlinks[0]).linkname == "symlink_target"
assert files == ["dirname/filename", "dir2/file_2"]
assert symlinks == ["symlink"]
assert archive.getmember(symlinks[0]).linkname == "dirname/filename"

# Test extracting a symlink to a file object:
if archive_mode.startswith("|"): # Non-seekable streams raise StreamError
with pytest.raises(StreamError):
archive.extractfile("symlink")
else: # Expect a seekable fileobj for this test (not a stream) to work:
fileobj = archive.extractfile("symlink")
assert fileobj and fileobj.read() == binary_data
archive.close()

# Step 3: Extract the archive a second time using another method
fs.reset()
bytesio.seek(0)
archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode)
cpiofile = create_cpio_archive(fs, archive_mode, filename)
archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode)
if archive_mode[0] != "|":
for cpioinfo in archive:
archive.extract(cpioinfo)
pyfakefs_verify_filesystem(fs)
if archive_mode == "|xz":
archive.list(verbose=True)
archive.close()
bytesio.close()


def pyfakefs_populate_archive(archive, fs):
Expand All @@ -105,11 +131,12 @@ def pyfakefs_populate_archive(archive, fs):
:param archive: Instance of the CpioFile class to create a new cpio archive
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
fs.reset()

fs.create_file("dirname/filename", contents=cast(str, binary_data))
archive.add("dirname", recursive=True)
fs.create_symlink("directory/symlink", "symlink_target")
fs.create_file("directory/file_2", contents=cast(str, binary_data))
fs.create_symlink("symlink", "dirname/filename")
archive.add("symlink")

# Test special code path of archive.add(".", ...):
os.chdir("directory")
Expand All @@ -129,7 +156,10 @@ def pyfakefs_verify_filesystem(fs):

:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
assert fs.islink("dir2/symlink")
assert fs.islink("symlink")
assert fs.isfile("dirname/filename")
assert fs.isfile("dir2/file_2")
with FakeFileOpen(fs)("dirname/filename", "rb") as contents:
assert contents.read() == binary_data
with FakeFileOpen(fs)("dir2/file_2", "rb") as contents:
assert contents.read() == binary_data
9 changes: 3 additions & 6 deletions xcp/cpiofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ def extract(self, member, path=""):
self._dbg(1, "cpiofile: %s" % e)

def extractfile(self, member):
# type:(CpioInfo) -> ExFileObject | None
# type:(CpioInfo | str | bytes) -> ExFileObject | None
"""Extract a member from the archive as a file object. `member' may be
a filename or a CpioInfo object. If `member' is a regular file, a
file-like object is returned. If `member' is a link, a file-like
Expand All @@ -1489,11 +1489,8 @@ def extractfile(self, member):
# A small but ugly workaround for the case that someone tries
# to extract a symlink as a file-object from a non-seekable
# stream of cpio blocks.
raise StreamError("cannot extract symlink as file object")
else:
# A symlink's file object is its target's file object.
return self.extractfile(self._getmember(cpioinfo.linkname,
cpioinfo)) # type: ignore
raise StreamError("Need a seekable stream to open() a symlink target!")
return self.extractfile(cpioinfo.linkname)
else:
# If there's no data associated with the member (directory, chrdev,
# blkdev, etc.), return None instead of a file object.
Expand Down
Loading