-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from secondlife/signal/detect-type
Add support for detecting archives by signature
- Loading branch information
Showing 10 changed files with 126 additions and 42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import multiprocessing | ||
import tarfile | ||
import zipfile | ||
|
||
class ArchiveType:
    """Namespace of string constants naming the archive formats handled here.

    The values are plain strings, so they can be compared directly with
    ``==`` against the results of the detection helpers in this module.
    """

    # gzip-compressed archive
    GZ = "gz"
    # bzip2-compressed archive
    BZ2 = "bz2"
    # zip archive
    ZIP = "zip"
    # zstandard-compressed archive
    ZST = "zst"
|
||
|
||
# File signatures ("magic numbers") used for sniffing archive type.
# https://www.garykessler.net/library/file_sigs.html
# Each key is the byte prefix that a file of the mapped ArchiveType starts with.
_ARCHIVE_MAGIC_NUMBERS = {
    b"\x1f\x8b\x08": ArchiveType.GZ,
    b"\x42\x5a\x68": ArchiveType.BZ2,
    b"\x50\x4b\x03\x04": ArchiveType.ZIP,
    # An empty ZIP archive contains no local file header; it begins directly
    # with the end-of-central-directory record, whose signature is PK\x05\x06.
    b"\x50\x4b\x05\x06": ArchiveType.ZIP,
    b"\x28\xb5\x2f\xfd": ArchiveType.ZST,
}

# Length of the longest signature above, i.e. the number of leading bytes
# that has to be read from a file to be able to match any entry.
_ARCHIVE_MAGIC_NUMBERS_MAX = max(len(magic) for magic in _ARCHIVE_MAGIC_NUMBERS)
|
||
|
||
def _archive_type_from_signature(filename: str):
    """Sniff archive type using file signature.

    Reads the first ``_ARCHIVE_MAGIC_NUMBERS_MAX`` bytes of ``filename`` and
    returns the ArchiveType mapped to the first entry of
    ``_ARCHIVE_MAGIC_NUMBERS`` that those bytes start with, or ``None`` when
    no signature matches.
    """
    with open(filename, "rb") as stream:
        prefix = stream.read(_ARCHIVE_MAGIC_NUMBERS_MAX)

    # Entries are tried in the table's iteration order; the first hit wins.
    matches = (
        kind
        for signature, kind in _ARCHIVE_MAGIC_NUMBERS.items()
        if prefix.startswith(signature)
    )
    return next(matches, None)
|
||
|
||
def _archive_type_from_extension(filename: str):
    """Return the ArchiveType implied by the suffix of ``filename``.

    Recognizes ``.tar.gz``, ``.tar.bz2``, ``.tar.zst`` and ``.zip``
    (case-sensitive). Returns ``None`` for any other name.
    """
    # Checked in this order; the first suffix that matches decides the type.
    suffix_to_type = (
        (".tar.gz", ArchiveType.GZ),
        (".tar.bz2", ArchiveType.BZ2),
        (".tar.zst", ArchiveType.ZST),
        (".zip", ArchiveType.ZIP),
    )
    for suffix, kind in suffix_to_type:
        if filename.endswith(suffix):
            return kind
    return None
|
||
|
||
def detect_archive_type(filename: str):
    """Given a filename, detect its ArchiveType using file extension and signature.

    The extension is consulted first; the file is only opened to inspect its
    signature when the extension yields no (truthy) result. Returns ``None``
    when neither method recognizes the file.
    """
    return _archive_type_from_extension(filename) or _archive_type_from_signature(filename)
|
||
|
||
def open_archive(filename: str) -> tarfile.TarFile | zipfile.ZipFile:
    """Open ``filename`` for reading with the class matching its archive type.

    The type comes from ``detect_archive_type``. ZST archives are opened with
    ``ZstdTarFile`` and ZIP archives with ``zipfile.ZipFile``; every other
    result, including an undetected type, is handed to ``tarfile.open``.
    """
    special_openers = {
        ArchiveType.ZST: ZstdTarFile,
        ArchiveType.ZIP: zipfile.ZipFile,
    }
    detected = detect_archive_type(filename)
    opener = special_openers.get(detected, tarfile.open)
    return opener(filename, "r")
|
||
|
||
class ZstdTarFile(tarfile.TarFile):
    """A ``tarfile.TarFile`` whose underlying file object is a ``pyzstd.ZstdFile``.

    The ``ZstdFile`` is kept on ``self.zstd_file`` and is closed together with
    the tar file.
    """

    def __init__(self, name, mode='r', *, level=4, zstd_dict=None, **kwargs):
        """Open ``name`` through ``ZstdFile`` and initialize the tar layer on it.

        ``level`` is only used when ``mode`` is neither ``'r'`` nor ``'rb'``.
        ``zstd_dict`` is passed to ``ZstdFile``; remaining keyword arguments
        are passed to ``tarfile.TarFile.__init__``.
        """
        # Imported lazily so that pyzstd is only needed when this class is used.
        from pyzstd import CParameter, ZstdFile

        if mode in ('r', 'rb'):
            # Read modes get no compression options.
            compress_options = None
        else:
            compress_options = {
                CParameter.compressionLevel: level,
                CParameter.nbWorkers: multiprocessing.cpu_count(),
                CParameter.checksumFlag: 1,
            }

        self.zstd_file = ZstdFile(
            name,
            mode,
            level_or_option=compress_options,
            zstd_dict=zstd_dict,
        )
        try:
            super().__init__(fileobj=self.zstd_file, mode=mode, **kwargs)
        except BaseException:
            # Do not leak the already-opened ZstdFile if the tar layer fails.
            self.zstd_file.close()
            raise

    def close(self):
        """Close the tar file, then always close the underlying ZstdFile."""
        try:
            super().close()
        finally:
            self.zstd_file.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
import shutil | ||
from os import path | ||
from pathlib import Path | ||
from tests.basetest import temp_dir | ||
|
||
import pytest | ||
from autobuild import archive_utils | ||
|
||
|
||
# Directory holding the sample archives, located next to this test module.
_DATA_DIR = Path(__file__).parent.joinpath("data")

# (archive path, expected ArchiveType) pairs shared by the parametrized tests.
_ARCHIVE_TEST_CASES = tuple(
    (path.join(_DATA_DIR, archive_name), archive_type)
    for archive_name, archive_type in (
        ("archive.tar.bz2", archive_utils.ArchiveType.BZ2),
        ("archive.tar.gz", archive_utils.ArchiveType.GZ),
        ("archive.tar.zst", archive_utils.ArchiveType.ZST),
        ("archive.zip", archive_utils.ArchiveType.ZIP),
    )
)
|
||
|
||
@pytest.mark.parametrize("filename,expected_type", _ARCHIVE_TEST_CASES)
def test_detect_from_extension(filename, expected_type):
    """Each sample archive, under its real name, is detected as the expected type."""
    assert archive_utils.detect_archive_type(filename) == expected_type
|
||
|
||
@pytest.mark.parametrize("filename,expected_type", _ARCHIVE_TEST_CASES)
def test_detect_from_signature(filename, expected_type):
    """Each sample archive is still detected after being copied to a name without extension."""
    with temp_dir() as scratch_dir:
        # Copy under the bare name "archive" so the extension gives no hint.
        extensionless_copy = str(Path(scratch_dir) / "archive")
        shutil.copyfile(filename, extensionless_copy)
        detected = archive_utils.detect_archive_type(extensionless_copy)
        assert detected == expected_type
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters