8
8
import tarfile
9
9
import tempfile
10
10
import zipfile
11
+ from types import TracebackType
11
12
from typing import (
12
13
Any ,
13
14
cast ,
18
19
Optional ,
19
20
overload ,
20
21
Tuple ,
22
+ Type ,
21
23
Union ,
22
24
)
23
25
24
- from typing_extensions import Literal
26
+ from typing_extensions import (
27
+ Literal ,
28
+ Self ,
29
+ )
25
30
26
31
from galaxy .util .path import (
27
32
safe_relpath ,
@@ -167,12 +172,16 @@ def decompress_bytes_to_directory(content: bytes) -> str:
167
172
with tempfile .NamedTemporaryFile (delete = False ) as fp :
168
173
fp .write (content )
169
174
fp .close ()
170
- return CompressedFile (fp .name ).extract (temp_directory )
175
+ with CompressedFile (fp .name ) as cf :
176
+ outdir = cf .extract (temp_directory )
177
+ return outdir
171
178
172
179
173
180
def decompress_path_to_directory (path : str ) -> str :
174
181
temp_directory = tempfile .mkdtemp ()
175
- return CompressedFile (path ).extract (temp_directory )
182
+ with CompressedFile (path ) as cf :
183
+ outdir = cf .extract (temp_directory )
184
+ return outdir
176
185
177
186
178
187
class CompressedFile :
@@ -336,13 +345,24 @@ def isfile(self, member: ArchiveMemberType) -> bool:
336
345
return True
337
346
return False
338
347
339
- def open_tar (self , filepath : StrPath , mode : Literal ["a" , "r" , "w" , "x" ]) -> tarfile .TarFile :
340
- return tarfile .open (filepath , mode , errorlevel = 0 )
348
+ @staticmethod
349
+ def open_tar (file : Union [StrPath , IO [bytes ]], mode : Literal ["a" , "r" , "w" , "x" ] = "r" ) -> tarfile .TarFile :
350
+ if isinstance (file , (str , os .PathLike )):
351
+ tf = tarfile .open (file , mode = mode , errorlevel = 0 )
352
+ else :
353
+ tf = tarfile .open (mode = mode , fileobj = file , errorlevel = 0 )
354
+ # Set a safe default ("data_filter") for the extraction filter if
355
+ # available, reverting to Python 3.11 behavior otherwise, see
356
+ # https://docs.python.org/3/library/tarfile.html#supporting-older-python-versions
357
+ tf .extraction_filter = getattr (tarfile , "data_filter" , (lambda member , path : member ))
358
+ return tf
341
359
342
- def open_zip (self , filepath : StrPath , mode : Literal ["a" , "r" , "w" , "x" ]) -> zipfile .ZipFile :
343
- return zipfile .ZipFile (filepath , mode )
360
+ @staticmethod
361
+ def open_zip (file : Union [StrPath , IO [bytes ]], mode : Literal ["a" , "r" , "w" , "x" ] = "r" ) -> zipfile .ZipFile :
362
+ return zipfile .ZipFile (file , mode )
344
363
345
- def zipfile_ok (self , path_to_archive : StrPath ) -> bool :
364
+ @staticmethod
365
+ def zipfile_ok (path_to_archive : StrPath ) -> bool :
346
366
"""
347
367
This function is a bit pedantic and not functionally necessary. It checks whether there is
348
368
no file pointing outside of the extraction, because ZipFile.extractall() has some potential
@@ -356,6 +376,21 @@ def zipfile_ok(self, path_to_archive: StrPath) -> bool:
356
376
return False
357
377
return True
358
378
379
+ def __enter__ (self ) -> Self :
380
+ return self
381
+
382
+ def __exit__ (
383
+ self ,
384
+ exc_type : Optional [Type [BaseException ]],
385
+ exc_value : Optional [BaseException ],
386
+ traceback : Optional [TracebackType ],
387
+ ) -> bool :
388
+ try :
389
+ self .archive .close ()
390
+ return exc_type is None
391
+ except Exception :
392
+ return False
393
+
359
394
360
395
class FastZipFile (zipfile .ZipFile ):
361
396
"""
0 commit comments