forked from getninjas/unix_ar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
unix_ar.py
373 lines (316 loc) · 12.1 KB
/
unix_ar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
import os
import io
import struct
# Version string of this module.
__version__ = '0.2.1'
# Keep a handle on the builtin open(): this module defines its own
# module-level open() function, which would otherwise shadow it.
_open = open
# Maximum number of bytes read per call when copying member data into or out
# of an archive.
CHUNKSIZE = 4096
def utf8(s):
    """
    Return `s` as bytes, encoding text as UTF-8.

    Filenames may be supplied by the user as unicode, but they are always
    stored as bytes in the archive; bytes input is returned unchanged.
    """
    return s if isinstance(s, bytes) else s.encode('utf-8')
class ArInfo(object):
    """
    Information on a file in an archive.

    This has the filename and all the metadata for a file in an archive.

    It is returned by :meth:`~unix_ar.ArFile.infolist()` and
    :meth:`~unix_ar.ArFile.getinfo()`, and can be passed when adding or
    extracting a file to or from the archive.

    Missing fields will be autocompleted when passed to `ArFile`, but note that
    things like `size` will be respected, allowing you to store or extract only
    part of a file.

    `ArInfo` objects returned by `ArFile` have the offset to the file in the
    archive, allowing to extract the correct one even if multiple files with
    the same name are present; if you change the `name` attribute, the initial
    file will be extracted with the new name (and new metadata).
    """

    def __init__(self, name, size=None, mtime=None, perms=None, uid=None, gid=None):
        self.name = name
        self.size = size
        self.mtime = mtime
        self.perms = perms
        self.uid = uid
        self.gid = gid
        # Position of the member's header in the archive; set by `ArFile`.
        self.offset = None

    def __repr__(self):
        return '{0}({1!r}, size={2!r}, mtime={3!r}, perms={4!r}, uid={5!r}, gid={6!r})'.format(
            self.__class__.__name__, self._name, self.size, self.mtime,
            self.perms, self.uid, self.gid
        )

    @property
    def name(self):
        """Name of the member, always as bytes."""
        return self._name

    @name.setter
    def name(self, value):
        # Accept unicode from the user but store bytes.
        self._name = utf8(value)

    @classmethod
    def frombuffer(cls, buffer):
        """
        Decode the archive header.

        :param buffer: The 60-byte member header.
        :type buffer: bytes
        :rtype: ArInfo
        :raises ValueError: if the header does not end with the file magic,
            or if a numeric field cannot be parsed.
        """
        # Offset Length Field                          Format
        # 0      16     File name                      ASCII
        # 16     12     File modification timestamp    Decimal
        # 28     6      Owner ID                       Decimal
        # 34     6      Group ID                       Decimal
        # 40     8      File mode                      Octal
        # 48     10     File size in bytes             Decimal
        # 58     2      File magic                     0x60 0x0A
        name, mtime, uid, gid, perms, size, magic = struct.unpack('16s12s6s6s8s10s2s', buffer)

        # Check the terminator before parsing anything else, so that a
        # misaligned or corrupt header is reported as such instead of as an
        # obscure int() conversion error.
        if magic != b'\x60\n':
            raise ValueError("Invalid file signature")

        def number(field, base):
            # Blank metadata fields are read as 0 rather than rejected.
            # NOTE(review): GNU ar reportedly leaves these blank for special
            # members such as the `//` name table -- confirm against the
            # format reference.
            field = field.strip()
            return int(field, base) if field else 0

        return cls(
            utf8(name).rstrip(b' '),
            int(size, 10),  # the size is mandatory: it locates the next header
            number(mtime, 10),
            number(perms, 8),
            number(uid, 10),
            number(gid, 10),
        )

    def tobuffer(self):
        """
        Encode as an archive header.

        :rtype: bytes
        :raises ValueError: if a field is still `None`, or if a field is too
            wide for its fixed-size column (for example a name longer than 16
            bytes), which would otherwise produce a corrupt archive.
        """
        fields = (self._name, self.mtime, self.uid, self.gid, self.perms, self.size)
        if any(f is None for f in fields):
            raise ValueError("ArInfo object has None fields")
        # iso-8859-1 maps every byte to exactly one character, so the column
        # widths below are counted in bytes of the stored name.
        header = '{0: <16}{1: <12}{2: <6}{3: <6}{4: <8o}{5: <10}\x60\n'.format(
            self.name.decode('iso-8859-1'), self.mtime, self.uid, self.gid, self.perms, self.size
        ).encode('iso-8859-1')
        # Formatting only pads, it never truncates: an oversized value makes
        # the header longer than the fixed 60 bytes.
        if len(header) != 60:
            raise ValueError("ArInfo field too long for archive header")
        return header

    def updatefromdisk(self, path=None):
        """
        Fill in the missing attributes from an actual file.

        This is called by `ArFile` when adding a file to the archive. If some
        attributes are missing, they have to be provided from the disk.

        If the file doesn't exist, adding will fail. There is currently no
        default values for the attributes, it is thus your responsibility to
        provide them.

        :param path: File to read the attributes from; defaults to the
            member's name.
        :returns: A new object with all attributes set; `self` is left
            untouched.
        :rtype: ArInfo
        """
        attrs = (self._name, self.size, self.mtime, self.perms, self.uid, self.gid)
        # Nothing missing: no need to touch the disk at all.
        if not any(a is None for a in attrs):
            return self.__class__(*attrs)

        name, size, mtime, perms, uid, gid = attrs
        if path is None:
            path = name
        stat = os.stat(path)

        if name is None:
            name = utf8(path)
        if size is None:
            size = stat.st_size
        if mtime is None:
            mtime = int(stat.st_mtime)
        if perms is None:
            perms = stat.st_mode
        if uid is None:
            uid = stat.st_uid
        if gid is None:
            gid = stat.st_gid
        return self.__class__(name, size, mtime, perms, uid, gid)

    def __copy__(self):
        """Return an independent copy, including the archive offset."""
        member = self.__class__(self._name, self.size, self.mtime, self.perms, self.uid, self.gid)
        member.offset = self.offset
        return member
class ArFile(object):
    """
    An UNIX ar archive.

    This object allows you to either read or write an AR archive.

    It can be used as a context manager, in which case the archive (and the
    underlying file) is closed when the block exits.
    """

    def __init__(self, file, mode='r'):
        """
        Create an `ArFile` from an opened file (in 'rb' or 'wb' mode).

        Don't use this constructor, call :func:`unix_ar.open()` instead.

        :param file: Binary file object to read from or write to.
        :param mode: Either 'r' or 'w'.
        :raises ValueError: if `mode` is invalid, or if the file opened for
            reading is not a valid archive.
        """
        self._file = file
        self._mode = mode
        # Only populated in read mode, by _read_entries().
        self._entries = None
        self._name_map = None
        if mode == 'r':
            self._read_entries()
        elif mode == 'w':
            self._file.write(b'!<arch>\n')
        else:
            raise ValueError("mode must be one of 'r' or 'w'")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _read_entries(self):
        """
        Read the global signature and index every member header.

        Fills `_entries` (members in archive order) and `_name_map` (name to
        index in `_entries`; for duplicate names the last member wins).
        """
        if self._file.read(8) != b'!<arch>\n':
            raise ValueError("Invalid archive signature")
        self._entries = []
        self._name_map = {}
        pos = 8
        while True:
            buffer = self._file.read(60)
            if len(buffer) == 0:
                break
            elif len(buffer) == 60:
                member = ArInfo.frombuffer(buffer)
                member.offset = pos
                self._name_map[member.name] = len(self._entries)
                self._entries.append(member)
                # Member data is padded to an even number of bytes.
                skip = member.size
                if skip % 2 != 0:
                    skip += 1
                pos += 60 + skip
                self._file.seek(skip, 1)
                if pos == self._file.tell():
                    continue
            # Reached for a partial header or an unexpected file position.
            raise ValueError("Truncated archive?")

    def _check(self, expected_mode):
        """
        Make sure the archive is open and was opened in `expected_mode`.

        :raises ValueError: if the archive is closed or in the other mode.
        """
        if self._file is None:
            raise ValueError("Attempted to use a closed %s" % self.__class__.__name__)
        if self._mode != expected_mode:
            if self._mode == 'r':
                raise ValueError("Can't change a read-only archive")
            else:
                raise ValueError("Can't read from a write-only archive")

    def add(self, name, arcname=None):
        """
        Add a file to the archive.

        :param name: Path to the file to be added.
        :type name: bytes | unicode
        :param arcname: Name the file will be stored as in the archive, or
            a full :class:`~unix_ar.ArInfo`. If unset, `name` will be used.
        :type arcname: None | bytes | unicode | unix_ar.ArInfo
        """
        self._check('w')
        if arcname is None:
            arcname = ArInfo(name)
        elif not isinstance(arcname, ArInfo):
            arcname = ArInfo(arcname)
        # Missing metadata comes from the file on disk, not from `arcname`.
        arcname = arcname.updatefromdisk(name)
        with _open(name, 'rb') as fp:
            self.addfile(arcname, fp)

    def addfile(self, name, fileobj=None):
        """
        Add a file to the archive from a file object.

        :param name: Name the file will be stored as in the archive, or
            a full :class:`~unix_ar.ArInfo`.
        :type name: bytes | unicode | unix_ar.ArInfo
        :param fileobj: File object to read from. If unset, the file called
            `name` is opened (and closed again) by this method.
        :raises RuntimeError: if fewer bytes than announced could be read.
        """
        self._check('w')
        if not isinstance(name, ArInfo):
            name = ArInfo(name)
        name = name.updatefromdisk()
        # Build the header and open the source before writing anything, so
        # that a failure here leaves the archive untouched.
        header = name.tobuffer()
        if fileobj is None:
            fp = _open(name.name, 'rb')
        else:
            fp = fileobj
        try:
            self._file.write(header)
            for pos in range(0, name.size, CHUNKSIZE):
                chunk = fp.read(min(CHUNKSIZE, name.size - pos))
                if len(chunk) != CHUNKSIZE and len(chunk) != name.size - pos:
                    raise RuntimeError("File changed size?")
                self._file.write(chunk)
            # Pad the data to an even length.
            if name.size % 2 == 1:
                self._file.write(b'\n')
        finally:
            # Only close what this method opened itself, even on error.
            if fileobj is None:
                fp.close()

    def infolist(self):
        """
        Return a list of :class:`~unix_ar.ArInfo` for files in the archive.

        These objects are copy, so feel free to change them before feeding them
        to :meth:`~unix_ar.ArFile.add()` or :meth:`~unix_ar.ArFile.addfile()`.

        :rtype: [unix_ar.ArInfo]
        """
        self._check('r')
        return [entry.__copy__() for entry in self._entries]

    def getinfo(self, member):
        """
        Return an :class:`~unix_ar.ArInfo` for a specific file.

        This object is a copy, so feel free to change it before feeding them to
        :meth:`~unix_ar.ArFile.add()` or :meth:`~unix_ar.ArFile.addfile()`.

        :param member: Either a file name or an incomplete
            :class:`unix_ar.ArInfo` object to search for.
        :type member: bytes | unicode | unix_ar.ArInfo
        :rtype: unix_ar.ArInfo
        :raises KeyError: if no member has that name.
        """
        self._check('r')
        if isinstance(member, ArInfo):
            if member.offset is not None:
                # Re-read the header at the recorded position, so the right
                # member is found even if its name was changed or is shared.
                self._file.seek(member.offset, 0)
                info = ArInfo.frombuffer(self._file.read(60))
                info.offset = member.offset
                return info
            index = self._name_map[member.name]
        else:
            index = self._name_map[utf8(member)]
        return self._entries[index].__copy__()

    def _extract(self, member, path):
        """
        Copy the data of `member` to `path` and return the destination.

        :param member: Member with `offset` and `size` set.
        :param path: Either an object with a `write` method, or a file name
            that gets opened for binary writing.
        :returns: The destination file object, flushed and rewound to 0.
        """
        if hasattr(path, 'write'):
            fp = path
        else:
            # Normalise to bytes so that unicode paths work as well.
            # NOTE(review): the trailing '/' is presumably stripped because
            # some ar variants terminate member names with it -- confirm.
            fp = _open(utf8(path).rstrip(b'/'), 'wb')
        # The data starts right after the 60-byte header.
        self._file.seek(member.offset + 60, 0)
        for pos in range(0, member.size, CHUNKSIZE):
            chunk = self._file.read(min(CHUNKSIZE, member.size - pos))
            fp.write(chunk)
        fp.flush()
        fp.seek(0)
        return fp

    def extract(self, member, path='') -> 'filelike':
        """
        Extract a single file from the archive.

        :param member: Either a file name or an :class:`unix_ar.ArInfo` object
            to extract. A passed `ArInfo` has its missing `offset` and `size`
            filled in, and its `size` capped to the stored size.
        :type member: bytes | unicode | unix_ar.ArInfo
        :param path: Destination path (current directory by default). You can
            also change the `name` attribute on the `ArInfo` you pass this
            method to extract to any file name. An object with a `write`
            method is written to directly.
        :type path: bytes | unicode
        :returns: The file object the data was written to.
        """
        self._check('r')
        actualmember = self.getinfo(member)
        if isinstance(member, ArInfo):
            if member.offset is None:
                member.offset = actualmember.offset
            # An incomplete ArInfo has no size yet; never read past the
            # member's real size either.
            if member.size is None or member.size > actualmember.size:
                member.size = actualmember.size
        else:
            member = actualmember
        if not hasattr(path, 'write'):
            if not path or os.path.isdir(path):
                path = os.path.join(utf8(path), member.name)
        return self._extract(member, path)

    def extractfile(self, member):
        """Not implemented; always raises `NotImplementedError`."""
        self._check('r')
        raise NotImplementedError("extractfile() is not yet implemented")

    def extractall(self, path=''):
        """
        Extract all the files in the archive.

        :param path: Destination path (current directory by default).
        :type path: bytes | unicode
        """
        self._check('r')
        # Iterate on _name_map instead of plain _entries so we don't extract
        # multiple files with the same name, just the last one
        for index in self._name_map.values():
            member = self._entries[index]
            fp = self._extract(member, os.path.join(utf8(path), member.name))
            # Nobody gets to see this handle, so close it right away.
            fp.close()

    def open(self, member) -> io.BytesIO:
        """
        Extract a member into memory.

        :param member: Either a file name or an :class:`unix_ar.ArInfo`.
        :type member: bytes | unicode | unix_ar.ArInfo
        :returns: An in-memory file positioned at the start of the data, with
            a `name` attribute holding the member name without surrounding
            slashes (same type as the name passed in; bytes for an `ArInfo`).
        :rtype: io.BytesIO
        """
        filelike = self.extract(member, path=io.BytesIO())
        if isinstance(member, ArInfo):
            filelike.name = member.name.strip(b'/')
        elif isinstance(member, bytes):
            filelike.name = member.strip(b'/')
        else:
            filelike.name = member.strip('/')
        return filelike

    def close(self):
        """
        Close this archive and the underlying file.

        No method should be called on the object after this. Closing an
        already closed archive does nothing.
        """
        if self._file is not None:
            self._file.close()
            self._file = None
            self._entries = None
            self._name_map = None
def open(file, mode='r'):
    """
    Open an archive file.

    :param file: File name to open, or an already opened binary file object.
    :type file: bytes | unicode
    :param mode: Either 'r' or 'w' ('rb' and 'wb' are accepted as synonyms).
    :rtype: unix_ar.ArFile
    :raises ValueError: if `mode` is invalid, or if the file opened for
        reading is not a valid archive.
    """
    # Validate and normalise the mode before touching the file: `ArFile` only
    # knows 'r' and 'w', and opening with 'wb' truncates the target.
    if mode == 'r' or mode == 'rb':
        armode, omode = 'r', 'rb'
    elif mode == 'w' or mode == 'wb':
        armode, omode = 'w', 'wb'
    else:
        raise ValueError("mode must be one of 'r' or 'w'")

    # Anything file-like is used as is; write-only objects count as well.
    if hasattr(file, 'read') or hasattr(file, 'write'):
        return ArFile(file, armode)

    fp = _open(file, omode)
    try:
        return ArFile(fp, armode)
    except Exception:
        # Don't leak the handle opened here when the archive is rejected.
        fp.close()
        raise