From 68f27bb35d67b55b5d05fb118d30ec1b7a4fbc56 Mon Sep 17 00:00:00 2001 From: Bernhard Kaindl Date: Mon, 13 May 2024 12:00:00 +0200 Subject: [PATCH] cpiofile: Add unittest opening a symlink as fileobj Signed-off-by: Bernhard Kaindl --- .vscode/ltex.dictionary.en-US.txt | 3 ++ tests/test_cpiofile.py | 80 +++++++++++++++++++++---------- xcp/cpiofile.py | 9 ++-- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/.vscode/ltex.dictionary.en-US.txt b/.vscode/ltex.dictionary.en-US.txt index 9aa9c687..f92c872c 100644 --- a/.vscode/ltex.dictionary.en-US.txt +++ b/.vscode/ltex.dictionary.en-US.txt @@ -2,11 +2,14 @@ codecov covcombine coverallsapp cpio +cpiofile +cpioinfo euxv ibft ifrename kname lastboot +linkname logbuf MACPCI nektos diff --git a/tests/test_cpiofile.py b/tests/test_cpiofile.py index 3f7ea231..cd36a2f2 100644 --- a/tests/test_cpiofile.py +++ b/tests/test_cpiofile.py @@ -14,7 +14,7 @@ import pytest from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem -from xcp.cpiofile import CpioFile +from xcp.cpiofile import CpioFile, StreamError binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff" @@ -48,45 +48,72 @@ def test_cpiofile_modes(fs): if comp == "xz" and filetype == ":": continue # streaming xz is not implemented (supported only as file) check_archive_mode(filetype + comp, fs) + if filetype == "|": + check_archive_mode(filetype + comp, fs, filename="archive." + comp) -def check_archive_mode(archive_mode, fs): - # type: (str, FakeFilesystem) -> None +def create_cpio_archive(fs, archive_mode, filename=None): + # type: (FakeFilesystem, str, str | None) -> io.BytesIO | None """ - Test CpioFile in the given archive mode with verification of the archive contents. + Create a CpioFile archive with files and directories from a FakeFilesystem. + :param fs: `FakeFilesystem` fixture representing a simulated file system for testing :param archive_mode: The archive mode is a string parameter that specifies the mode in which the CpioFile object should be opened. - :param fs: `FakeFilesystem` fixture representing a simulated file system for testing + :param filename: The name of the file to create the cpio archive """ - # Step 1: Create and populate a cpio archive in a BytesIO buffer - bytesio = io.BytesIO() - archive = CpioFile.open(fileobj=bytesio, mode="w" + archive_mode) - pyfakefs_populate_archive(archive, fs) + cpiofile = None if filename else io.BytesIO() + fs.reset() + cpio = CpioFile.open(name=filename, fileobj=cpiofile, mode="w" + archive_mode) + pyfakefs_populate_archive(cpio, fs) if archive_mode == "|gz": - archive.list(verbose=True) - archive.close() + cpio.list(verbose=True) + cpio.close() + if not cpiofile: + cpio_data = FakeFileOpen(fs)(filename, "rb").read() + fs.reset() + fs.create_file(filename, contents=cast(str, cpio_data)) + return None + fs.reset() + cpiofile.seek(0) + return cpiofile + +def check_archive_mode(archive_mode, fs, filename=None): + # type: (str, FakeFilesystem, str | None) -> None + """ + Test CpioFile in the given archive mode with verification of the archive contents. + + :param archive_mode: The archive mode is a string parameter that specifies the mode + in which the CpioFile object should be opened. + :param fs: `FakeFilesystem` fixture representing a simulated file system for testing + """ # Step 2: Extract the archive in a clean filesystem and verify the extracted contents - fs.reset() - bytesio.seek(0) - archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode) + cpiofile = create_cpio_archive(fs, archive_mode, filename) + archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode) archive.extractall() pyfakefs_verify_filesystem(fs) - assert archive.getnames() == ["dirname", "dirname/filename", "dir2/symlink"] + assert archive.getnames() == ["dirname", "dirname/filename", "symlink", "dir2/file_2"] dirs = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isdir()] files = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isreg()] symlinks = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.issym()] assert dirs == ["dirname"] - assert files == ["dirname/filename"] - assert symlinks == ["dir2/symlink"] - assert archive.getmember(symlinks[0]).linkname == "symlink_target" + assert files == ["dirname/filename", "dir2/file_2"] + assert symlinks == ["symlink"] + assert archive.getmember(symlinks[0]).linkname == "dirname/filename" + + # Test extracting a symlink to a file object: + if archive_mode.startswith("|"): # Non-seekable streams raise StreamError + with pytest.raises(StreamError): + archive.extractfile("symlink") + else: # Expect a seekable fileobj for this test (not a stream) to work: + fileobj = archive.extractfile("symlink") + assert fileobj and fileobj.read() == binary_data archive.close() # Step 3: Extract the archive a second time using another method - fs.reset() - bytesio.seek(0) - archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode) + cpiofile = create_cpio_archive(fs, archive_mode, filename) + archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode) if archive_mode[0] != "|": for cpioinfo in archive: archive.extract(cpioinfo) @@ -94,7 +121,6 @@ def check_archive_mode(archive_mode, fs): if archive_mode == "|xz": archive.list(verbose=True) archive.close() - bytesio.close() def pyfakefs_populate_archive(archive, fs): @@ -105,11 +131,12 @@ def pyfakefs_populate_archive(archive, fs): :param archive: Instance of the CpioFile class to create a new cpio archive :param fs: `FakeFilesystem` fixture representing a simulated file system for testing """ - fs.reset() fs.create_file("dirname/filename", contents=cast(str, binary_data)) archive.add("dirname", recursive=True) - fs.create_symlink("directory/symlink", "symlink_target") + fs.create_file("directory/file_2", contents=cast(str, binary_data)) + fs.create_symlink("symlink", "dirname/filename") + archive.add("symlink") # Test special code path of archive.add(".", ...): os.chdir("directory") @@ -129,7 +156,10 @@ def pyfakefs_verify_filesystem(fs): :param fs: `FakeFilesystem` fixture representing a simulated file system for testing """ - assert fs.islink("dir2/symlink") + assert fs.islink("symlink") assert fs.isfile("dirname/filename") + assert fs.isfile("dir2/file_2") with FakeFileOpen(fs)("dirname/filename", "rb") as contents: assert contents.read() == binary_data + with FakeFileOpen(fs)("dir2/file_2", "rb") as contents: + assert contents.read() == binary_data diff --git a/xcp/cpiofile.py b/xcp/cpiofile.py index 3eeb209d..995da94c 100644 --- a/xcp/cpiofile.py +++ b/xcp/cpiofile.py @@ -1463,7 +1463,7 @@ def extract(self, member, path=""): self._dbg(1, "cpiofile: %s" % e) def extractfile(self, member): - # type:(CpioInfo) -> ExFileObject | None + # type:(CpioInfo | str | bytes) -> ExFileObject | None """Extract a member from the archive as a file object. `member' may be a filename or a CpioInfo object. If `member' is a regular file, a file-like object is returned. If `member' is a link, a file-like @@ -1489,11 +1489,8 @@ def extractfile(self, member): # A small but ugly workaround for the case that someone tries # to extract a symlink as a file-object from a non-seekable # stream of cpio blocks. - raise StreamError("cannot extract symlink as file object") - else: - # A symlink's file object is its target's file object. - return self.extractfile(self._getmember(cpioinfo.linkname, - cpioinfo)) # type: ignore + raise StreamError("Need a seekable stream to open() a symlink target!") + return self.extractfile(cpioinfo.linkname) else: # If there's no data associated with the member (directory, chrdev, # blkdev, etc.), return None instead of a file object.