This repository was archived by the owner on Jul 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathext2.py
executable file
·635 lines (523 loc) · 20.3 KB
/
ext2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
#! /usr/bin/env python
# encoding=utf8
import struct
import stat
import time
import uuid
import os
import threading as T
__author__ = 'dmytrish'
__version__ = '0.1'
"""
This is a collection of low-level classes for inspecting ext2
internals. To use it effectively you need to understand the ext2
on-disk data layout. The main goal of this module is to be a tool
for manual manipulation of an ext2 filesystem (possibly broken or
nonstandard).
Class ext2fs tries to provide user-friendly and abstract interface to
filesystem manipulation routines.
How to start (see ext2.ext2fs.__doc__ for details):
>>> import ext2
>>> e2 = ext2.ext2fs( '/path/to/ext2/image/or/device' )
Getting information about FS:
>>> print e2.sb # this is the superblock information
>>> print e2.space_bytes(), e2.free_space_bytes()
>>> print e2.sb.uuid, e2.sb.name
List a directory on the FS:
>>> print e2.ls('') # root directory of the FS
>>> print e2.ls('/') # the same
>>> print e2.ls('linux/fs/ext2')
>>> print e2.ls('/boot/grub')
Copy file to and fro:
>>> e2.pull('boot/grub/menu.lst', '.')
>>> e2.push('~/code/py/ext2', '/src/')
(l) License: beerware.
You use this script at your own peril and must not complain about its speed.
"""
# Permission letters for the nine low mode bits, in `ls -l` order.
stat_full_rights = 'rwx' * 3

# File-type bits of a mode (as produced by stat.S_IFMT) mapped to the
# one-character type marker used in listings.
stat_filetype = {
    stat.S_IFREG: '-',
    stat.S_IFDIR: 'd',
    stat.S_IFLNK: 'l',
    stat.S_IFBLK: 'b',
    stat.S_IFCHR: 'c',
    stat.S_IFIFO: 'p',
    stat.S_IFSOCK: 's',
}

# Size in bytes of a native unsigned int, stored on the struct module itself
# so that the rest of the file can refer to it as struct.intsz.
struct.intsz = struct.calcsize('I')
def unpack_struct(fmt, strct, s):
    """Decode the leading bytes of `s` with `fmt` and label the values.

    fmt   -- a struct format string;
    strct -- sequence of field names, paired in order with the values
             produced by `fmt`;
    s     -- raw buffer; only its first struct.calcsize(fmt) bytes are used.

    Returns a dict mapping field name to unpacked value.
    """
    needed = struct.calcsize(fmt)
    values = struct.unpack(fmt, s[:needed])
    return dict(zip(strct, values))
def unpack_int_at(s, index):
    """Return the `index`-th native unsigned int stored in buffer `s`."""
    byte_offset = struct.intsz * index
    (value,) = struct.unpack_from('1I', s, byte_offset)
    return value
def time_format(unix_time):
    """Render a Unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
    local = time.localtime(unix_time)
    return time.strftime('%Y-%m-%d %H:%M:%S', local)
class Ext2Exception(Exception):
    """Error raised by the ext2 classes of this module."""
class E2IO:
    """Lock-protected reader over the image file or device of an ext2 FS.

    Two locks are kept:
      * an internal one that makes each individual read call (seek + read)
        atomic with respect to the shared file position;
      * a public one, taken with lock()/unlock(), for callers that issue a
        sequence of reads relying on the position left by the previous call.

    Both kinds of read release the internal lock even when the underlying
    file operation raises.
    """

    def __init__(self, source):
        """Open `source` (path to an image file or a device) for reading."""
        self.f = open(source, 'rb')
        self._lock = T.Lock()
        self._b_lock = T.Lock()

    def set_blksz(self, blksz):
        """Set the block size in bytes.

        The block size has to be read from the file first, so it is set
        after construction rather than passed to __init__.
        """
        self.blksz = blksz

    def close(self):
        """Close the underlying file."""
        self.f.close()

    def read_block(self, block_num):
        """Return the contents of block number `block_num`."""
        with self._b_lock:
            self._go_to_block(block_num)
            return self.f.read(self.blksz)

    def read(self, count):
        """Read `count` bytes from the current file position."""
        with self._b_lock:
            return self.f.read(count)

    def read_at(self, count, offset=0, whence=os.SEEK_SET):
        """Seek to `offset` (interpreted per `whence`) and read `count` bytes."""
        with self._b_lock:
            self.f.seek(offset, whence)
            return self.f.read(count)

    def lock(self):
        """Take the public lock guarding a multi-call read sequence."""
        self._lock.acquire()

    def unlock(self):
        """Release the public lock taken by lock()."""
        self._lock.release()

    def _go_to_block(self, block_num):
        # Caller is expected to hold self._b_lock.
        self.f.seek(block_num * self.blksz)
class e2dentry:
    """A single directory entry read from disk.

    Attributes set by the constructor:
      d     -- dict of the raw header fields named in d_flds;
      inode -- inode number the entry points to;
      size  -- total on-disk size of the entry (header + padded name);
      name  -- entry name, cut to d_namelen bytes;
      ftype -- stat.S_IF* constant looked up from d_filetype.
    """
    d_fmt = 'IHBB'
    fmt_size = struct.calcsize(d_fmt)
    d_flds = ('d_inode', 'd_entry_size', 'd_namelen', 'd_filetype')
    # Indexed by the on-disk d_filetype value.
    stattype = [0, stat.S_IFREG, stat.S_IFDIR, stat.S_IFCHR,
                stat.S_IFBLK, stat.S_IFIFO, stat.S_IFSOCK, stat.S_IFLNK]

    def __init__(self, io, offset):
        """Read the entry located at absolute byte `offset` through `io`.

        Raises Ext2Exception if the recorded entry size is smaller than the
        entry header or if the file type value is out of range.
        """
        # The name is read from the position left by the header read, so the
        # two reads must not be interleaved with reads from other threads.
        io.lock()
        try:
            byte_array = io.read_at(self.fmt_size, offset)
            self.d = unpack_struct(self.d_fmt, self.d_flds, byte_array)
            self.inode = self.d['d_inode']
            self.size = self.d['d_entry_size']
            if self.size < self.fmt_size:
                # A smaller size would turn into a negative read count,
                # i.e. "read everything up to the end of the file".
                raise Ext2Exception(
                    'Invalid entry size %d for dentry at offset %d' %
                    (self.size, offset))
            raw_name = io.read(self.size - self.fmt_size)
        finally:
            io.unlock()

        self.name = raw_name[:self.d['d_namelen']]
        try:
            self.ftype = self.stattype[self.d['d_filetype']]
        except IndexError:
            raise Ext2Exception(
                'Invalid file type %d for dentry %s' %
                (self.d['d_filetype'], self.name))
class e2directory:
    """The list of entries of one directory.

    self.ent holds an e2dentry for every in-use entry found in the blocks
    of the directory inode, in on-disk order.
    """

    def __init__(self, io, inode):
        """Read all entries of directory `inode` through `io`.

        Raises Ext2Exception if `inode` is not a directory or if an entry
        with an impossible size is met.
        """
        if not inode.is_directory():
            raise Ext2Exception('Not a directory: inode %d' % inode.index)

        self.ent = []
        # Entries are walked block by block: offsets are counted from the
        # start of each block until the whole block has been consumed.
        for block in inode.get_block_list():
            block_start = block * io.blksz
            consumed = 0
            while consumed < io.blksz:
                e = e2dentry(io, block_start + consumed)
                if e.size < e2dentry.fmt_size:
                    # a zero/too small size would never advance the scan
                    raise Ext2Exception(
                        'Invalid entry size %d in directory inode %d'
                        % (e.size, inode.index))
                consumed += e.size
                if e.inode != 0:  # inode 0 marks an unused entry
                    self.ent.append(e)

    def ent_by_name(self, name):
        """Return the first entry called `name`, or None if there is none."""
        for e in self.ent:
            if e.name == name:
                return e
        return None

    def ls(self):
        """Print one 'type<TAB>inode<TAB>name' line per entry."""
        for e in self.ent:
            # '?' for entries whose type is not in the table
            kind = stat_filetype.get(e.ftype, '?')
            print('%s\t%s\t%s' % (kind, e.inode, e.name))
class e2inode:
    """One inode read from disk.

    Attributes set by the constructor:
      index      -- inode number;
      d          -- dict of the raw fields named in i_flds;
      uid, gid, mode, nlink -- copies of the corresponding raw fields;
      n_length   -- file size in bytes (raw field i_size);
      i_size     -- size in bytes of the on-disk inode record
                    (NOT the file size);
      block_list -- absolute numbers of the data blocks; empty for short
                    symlinks, whose target is kept in the pointer fields.
    """
    i_fmt = '2H5I2H3I12I3I'
    i_flds = (
        'i_mode', 'i_uid', 'i_size',
        # time
        'i_atime', 'i_ctime', 'i_mtime', 'i_dtime',
        'i_gid', 'i_links_count', 'i_blocks',
        'i_flags', 'i_osspec1',
        # direct block pointers
        'i_db0', 'i_db1', 'i_db2', 'i_db3', 'i_db4', 'i_db5',
        'i_db6', 'i_db7', 'i_db8', 'i_db9', 'i_db10', 'i_db11',
        # single-, double-, triple- indirect block pointers
        'i_i1b', 'i_i2b', 'i_i3b'
    )
    EXT2_NDIR_BLOCKS = 12
    EXT2_N_BLOCKS = 15

    def __init__(self, ino_num, io, offset, inosz):
        """Read inode #ino_num, `inosz` bytes long, from byte `offset`."""
        self.index = ino_num
        self.i_size = inosz
        byte_array = io.read_at(self.i_size, offset)
        self.d = unpack_struct(self.i_fmt, self.i_flds, byte_array)
        self.uid = self.d['i_uid']
        self.gid = self.d['i_gid']
        self.n_length = self.d['i_size']
        self.mode = self.d['i_mode']
        self.nlink = self.d['i_links_count']
        if self.is_short_link():
            # the pointer fields hold the link text, not block numbers
            self.block_list = []
        else:
            self.block_list = self._build_block_list(io)

    def _build_block_list(self, io):
        """return list of absolute block addresses for the inode

        At every level the scan of a pointer block stops at the first zero
        pointer; a zero top-level pointer ends the whole list.
        """
        ptrs_per_block = io.blksz // struct.intsz

        def pointers(raw_block):
            # non-zero pointers stored in raw_block, up to the first zero
            found = []
            for i in range(ptrs_per_block):
                ptr = unpack_int_at(raw_block, i)
                if ptr == 0:
                    break
                found.append(ptr)
            return found

        def expand(block_num, depth):
            # data blocks reachable from a pointer block `depth` levels
            # above the data (1 = single indirect)
            ptrs = pointers(io.read_block(block_num))
            if depth == 1:
                return ptrs
            data_blocks = []
            for ptr in ptrs:
                data_blocks.extend(expand(ptr, depth - 1))
            return data_blocks

        blocks = []
        for i in range(self.EXT2_NDIR_BLOCKS):
            block_num = self.d['i_db%d' % i]
            if block_num == 0:
                return blocks
            blocks.append(block_num)

        for level, key in enumerate(('i_i1b', 'i_i2b', 'i_i3b')):
            top = self.d[key]
            if top == 0:
                break
            blocks.extend(expand(top, level + 1))
        return blocks

    def get_mode(self):
        """Return the mode as an `ls -l` style string, e.g. 'drwxr-xr-x'.

        An unrecognised file type is shown as '?'.
        """
        rights = ''.join(
            stat_full_rights[i] if self.mode & (1 << (8 - i)) else '-'
            for i in range(9))
        return stat_filetype.get(stat.S_IFMT(self.mode), '?') + rights

    def is_directory(self):
        return stat.S_IFMT(self.mode) == stat.S_IFDIR

    def is_device(self):
        return stat.S_IFMT(self.mode) in (stat.S_IFCHR, stat.S_IFBLK)

    def is_link(self):
        return stat.S_IFMT(self.mode) == stat.S_IFLNK

    def is_short_link(self):
        """True for a symlink whose target is stored inside the inode.

        The target is kept in the block pointer area only when it is
        strictly shorter than that area (the terminating NUL must fit too).
        """
        inline_capacity = struct.intsz * self.EXT2_N_BLOCKS
        return self.is_link() and self.n_length < inline_capacity

    def block_at(self, fileblock):
        ''' absolute block number from relative in-file block number '''
        try:
            return self.block_list[fileblock]
        except IndexError:
            raise Ext2Exception('Invalid file block number %d for inode %d'
                                % (fileblock, self.index))

    def get_block_list(self):
        return self.block_list

    def blocks_as_string(self):
        """ this method is used for reading in-place links

        The pointer fields are packed back to bytes in order, stopping at
        the first all-zero field; NUL bytes at the ends are stripped.
        """
        keys = ['i_db%d' % i for i in range(self.EXT2_NDIR_BLOCKS)]
        keys += ['i_i1b', 'i_i2b', 'i_i3b']
        words = []
        for key in keys:
            b = self.d[key]
            if b == 0:
                break
            words.append(struct.pack('I', b))
        return b''.join(words).strip(b'\0')

    def device_id(self):
        """Return (major, minor) decoded from the first block pointer."""
        dev = self.d['i_db0']
        return (os.major(dev), os.minor(dev))

    def __str__(self):
        res = self.get_mode()
        res += ' %3d' % self.d['i_links_count']
        res += ' %4d:%d\t' % (self.uid, self.gid)
        if self.is_device():  # devices need DevID formatting
            res += ' (%2d,%2d)' % self.device_id()
        else:
            res += ' %10d' % self.n_length
        res += ' %s' % time_format(self.d['i_ctime'])
        return res
class e2group_descriptor:
    """One block group descriptor, read and range-checked on construction.

    start/end delimit the block numbers of the group (end is exclusive);
    block_bitmap, inode_bitmap and inode_table are the block numbers taken
    from the descriptor.
    """
    gd_size = 32
    gd_fmt = '3I3H'
    gd_flds = (
        'bg_block_bitmap', 'bg_inode_bitmap', 'bg_inode_table',
        'bg_free_blocks_count', 'bg_free_inodes_count', 'bg_used_dirs_count'
    )

    def __init__(self, fs, offset, index):
        """Read descriptor number `index` of the table at byte `offset`."""
        self.index = index
        position = offset + index * self.gd_size
        self.d = unpack_struct(self.gd_fmt, self.gd_flds,
                               fs.io.read_at(self.gd_size, position))

        self.block_bitmap = self.d['bg_block_bitmap']
        self.inode_bitmap = self.d['bg_inode_bitmap']
        self.inode_table = self.d['bg_inode_table']

        group_len = fs.sb.blocks_in_grp
        self.start = fs.sb.boot_block + self.index * group_len
        self.end = self.start + group_len
        self.check()

    def check_range(self, x, description):
        """Raise Ext2Exception unless block `x` lies within the group."""
        if self.start <= x < self.end:
            return
        raise Ext2Exception('Bad %s block %d for block_group_desc[%d]'
                            % (description, x, self.index))

    def check(self):
        """Verify that both bitmaps and the inode table are in the group."""
        for block, label in ((self.block_bitmap, 'blockbitmap'),
                             (self.inode_bitmap, 'inodebitmap'),
                             (self.inode_table, 'inodetable')):
            self.check_range(block, label)
class e2superblock:
    """The superblock of an ext2 file system.

    self.d holds the raw fields named in sb_keys; the most used ones are
    also available as attributes (blksz, n_inodes, n_blocks, n_free_inodes,
    n_free_blocks, blocks_in_grp, inodes_in_grp, boot_block, name, uuid).
    """
    file_offset = 1024
    sb_size = 1024
    ext2magic = 0xef53
    root_dir_inode = 2
    sb_fmt = '13I6H4I2HI2H3I16s16s64sI2B'
    sb_keys = (
        's_inodes_count', 's_blocks_count', 's_r_blocks_count',
        's_free_blocks_count', 's_free_inodes_count', 's_first_data_block',
        's_log_block_size', 's_log_frag_size', 's_blocks_per_group',
        's_frags_per_group', 's_inodes_per_group', 's_mtime',
        's_wtime', 's_mnt_count', 's_max_mnt_count',
        's_magic', 's_state', 's_errors',
        's_minor_rev_level', 's_lastcheck', 's_checkinterval',
        's_creator_os', 's_rev_level', 's_def_resuid',
        's_def_resgid', 's_first_ino', 's_inode_size',
        's_block_group_nr', 's_feature_compat', 's_feature_incompat',
        's_feature_ro_compat', 's_uuid', 's_volume_name',
        's_last_mounted', 's_algorithm_usage_bitmap', 's_prealloc_block',
        's_prealloc_dir_blocks'
    )
    # fields holding Unix timestamps, shown as dates by __str__
    time_keys = ('s_mtime', 's_wtime', 's_lastcheck')

    def __init__(self, io):
        """Read the superblock through `io` and tell `io` the block size.

        Raises Ext2Exception if the magic number does not match.
        """
        byte_array = io.read_at(self.sb_size, self.file_offset)
        self.d = unpack_struct(self.sb_fmt, self.sb_keys, byte_array)
        if self.d['s_magic'] != self.ext2magic:
            raise Ext2Exception('Invalid ext2 superblock: the magic is bad')

        self.blksz = 1024 << self.d['s_log_block_size']
        io.set_blksz(self.blksz)

        self.n_inodes = self.d['s_inodes_count']
        self.n_blocks = self.d['s_blocks_count']
        self.n_free_inodes = self.d['s_free_inodes_count']
        self.n_free_blocks = self.d['s_free_blocks_count']
        self.blocks_in_grp = self.d['s_blocks_per_group']
        self.inodes_in_grp = self.d['s_inodes_per_group']
        self.boot_block = self.d['s_first_data_block']
        self.name = str(self.d['s_volume_name']).strip('\0')
        self.uuid = str(uuid.UUID(bytes=self.d['s_uuid']))
        self.check()

    def check(self):
        """Placeholder for consistency checks; does nothing at the moment."""
        pass

    def __str__(self):
        """Return 'field = value' lines, in on-disk field order."""
        lines = []
        # sb_keys rather than self.d: a dict gives no stable field order
        for k in self.sb_keys:
            if k == 's_uuid':
                v = self.uuid
            elif k in self.time_keys:
                v = time_format(self.d[k])
            else:
                v = str(self.d[k])
            lines.append('%s = %s\n' % (k, v))
        return ''.join(lines)

    def block_size(self):
        """Return the block size in bytes."""
        return self.blksz

    def inode_size(self):
        """Return the on-disk inode size: 128 for revision 0, otherwise
        the value recorded in the superblock."""
        if self.d['s_rev_level'] > 0:
            return self.d['s_inode_size']
        return 128
class ext2fs:
    """ an ext2fs object represents a mounted ext2 file system.

    Paths given to the methods are '/'-separated and resolved from the
    root directory; empty components are ignored, so '', '/' and '//' all
    mean the root directory.
    """

    def __init__(self, filename):
        """Open image/device `filename` and read its superblock, block
        group descriptors and root inode."""
        self.io = E2IO(filename)
        self.sb = e2superblock(self.io)
        self._blksz = self.sb.block_size()
        self._indsz = self.sb.inode_size()
        self._bgd = self._blkgrps_read()
        self.root = self._inode(self.sb.root_dir_inode)

    def umount(self):
        """Close the underlying file."""
        self.io.close()

    def _count_blkgrps(self):
        """Number of block groups, rounding a partial last group up."""
        per_group = self.sb.blocks_in_grp
        if per_group <= 0:
            raise Ext2Exception(
                'Invalid blocks-per-group value %d' % per_group)
        # Groups start at the first data block, so the blocks before it
        # must not be counted (otherwise a phantom group may appear).
        grouped_blocks = self.sb.n_blocks - self.sb.boot_block
        return (grouped_blocks + per_group - 1) // per_group

    def _blkgrps_read(self):
        """Read and return the list of all block group descriptors."""
        self._n_blkgrps = self._count_blkgrps()
        # the descriptor table starts in the block after the first data block
        offset = (1 + self.sb.boot_block) * self._blksz
        return [e2group_descriptor(self, offset, i)
                for i in range(self._n_blkgrps)]

    def _inode(self, ino_num):
        """ construct and read e2inode for index #ino_num"""
        if not 1 <= ino_num <= self.sb.n_inodes:
            raise Ext2Exception('Invalid inode number %d' % ino_num)
        # inode numbers are 1-based
        group, group_index = divmod(ino_num - 1, self.sb.inodes_in_grp)
        bg = self._bgd[group]
        offset = bg.inode_table * self._blksz    # go to inode table
        offset += group_index * self._indsz
        return e2inode(ino_num, self.io, offset, self._indsz)

    def _walk(self, pathto):
        """Resolve `pathto` from the root; return (dentry, inode).

        dentry is None when the path has no components (root directory).
        Raises Ext2Exception when a component is not found.
        """
        inode = self.root
        dentry = None
        for fname in [part for part in pathto.split('/') if part]:
            dentry = e2directory(self.io, inode).ent_by_name(fname)
            if dentry is None:
                raise Ext2Exception(
                    'Name lookup failed for "%s" in "%s"' % (fname, pathto))
            inode = self._inode(dentry.inode)
        return dentry, inode

    def _ent_by_path(self, pathto):
        """ return e2dentry for path 'pathto' ('.' of the root for '/') """
        dentry, _ = self._walk(pathto)
        if dentry is None:
            return e2directory(self.io, self.root).ent_by_name('.')
        return dentry

    def _inode_by_path(self, pathto):
        """ return e2inode for path 'pathto' """
        return self._walk(pathto)[1]

    def _dir_by_inode(self, ino_num):
        return e2directory(self.io, self._inode(ino_num))

    def free_space_bytes(self):
        return self.sb.n_free_blocks * self._blksz

    def space_bytes(self):
        return self.sb.n_blocks * self._blksz

    def ls(self, pathname, opts=''):
        """list files in 'pathname' like 'ls -l'
        The second argument controls listing format, options:
        'i' - output inodes, e.g. fs.ls('dir/subdir', 'i')
        """
        def format_dentry(dentry):
            entry_inode = self._inode(dentry.inode)
            if 'i' in opts:
                return '%s %s %s' % (entry_inode, '%8d' % dentry.inode,
                                     dentry.name)
            return '%s %s' % (entry_inode, dentry.name)

        inode = self._inode_by_path(pathname)
        if inode.is_directory():
            for dentry in e2directory(self.io, inode).ent:
                print(format_dentry(dentry))
        else:
            print(inode)

    def pull(self, fspath, to_file):
        """copy file from ext2 image at 'fspath' to external file 'to_file'

        If 'to_file' is an existing directory the file is created inside
        it under its own name. Raises Ext2Exception (and removes the output
        file) if the inode has more blocks than its size accounts for.
        """
        inode = self._inode_by_path(fspath)
        if os.path.isdir(to_file):
            to_file = os.path.join(to_file, fspath.split('/')[-1])

        bytes_written = 0
        try:
            with open(to_file, 'wb') as destination:
                for block in inode.get_block_list():
                    bytes_to_copy = min(inode.n_length - bytes_written,
                                        self._blksz)
                    if bytes_to_copy <= 0:
                        raise Ext2Exception(
                            'Redundant blocks in file %s' % fspath)
                    destination.write(
                        self.io.read_block(block)[:bytes_to_copy])
                    bytes_written += bytes_to_copy
        except Ext2Exception:
            # the output is closed by now; do not leave a bogus copy behind
            os.remove(to_file)
            raise

    def read(self, fspath, offset, bytes_count):
        """Return up to 'bytes_count' bytes of file 'fspath' from 'offset'.

        The range is clipped to the file size; an empty string is returned
        for a non-positive count, a negative offset or an offset at/after
        the end of the file.
        """
        if bytes_count <= 0 or offset < 0:
            return ''
        inode = self._inode_by_path(fspath)
        if offset >= inode.n_length:
            return ''
        end_offset = min(offset + bytes_count, inode.n_length)
        bytes_count = end_offset - offset

        first_fileblock = offset // self._blksz
        last_fileblock = (end_offset - 1) // self._blksz
        contents = self.io.read_block(inode.block_at(first_fileblock))
        for fileblock in range(first_fileblock + 1, last_fileblock + 1):
            contents += self.io.read_block(inode.block_at(fileblock))

        start = offset % self._blksz
        return contents[start:start + bytes_count]

    def readlink(self, path):
        """Return the target stored in the symlink at 'path'."""
        inode = self._inode_by_path(path)
        if inode.is_short_link():
            # in-place link, kept in the block pointer fields of the inode
            return inode.blocks_as_string()
        # otherwise: long link with its own blocks, NUL-terminated
        pieces = []
        for b in inode.get_block_list():
            chunk = self.io.read_block(b)
            pieces.append(chunk.split(b'\0')[0])
            if b'\0' in chunk:
                break
        return b''.join(pieces)

    def push(self, from_file, to_fspath):
        """ write an external file 'from_file' to ext2 path 'to_fspath'

        Not implemented yet: the call does nothing.
        """
        # TODO
        pass
def usage():
    """Print the command-line synopsis of the script to stdout."""
    # Imported here because the module itself imports `sys` only when it is
    # run as a script; this keeps usage() callable after a plain import too.
    import sys
    print('Usage: %s <path/to/ext2/img/or/device> <action>' % sys.argv[0])
    print('<action>s:')
    print(' info')
    print(' ls <path>')
    print(' cp <from/image> <outside/file>')
if '__main__' == __name__:
    # Command-line front end: <image> info | ls <path> | cp <from> <to>
    import sys

    if len(sys.argv) < 3:
        usage()
        sys.exit(-1)

    imgfile = sys.argv[1]
    try:
        e2fs = ext2fs(imgfile)
    except IOError:
        print('No such file: %s' % imgfile)
        sys.exit(-2)
    except Ext2Exception as e:
        print('Cannot use %s as an ext2 file system: %s' % (imgfile, e))
        sys.exit(-3)

    action = sys.argv[2]
    try:
        if action == 'info':
            sb = e2fs.sb
            if sb.d['s_state'] > 1:
                print('State: %d' % sb.d['s_state'])
            print('UUID: %s, Label: "%s"' % (sb.uuid, sb.name))
            print('Total space: %d, free space: %d bytes'
                  % (e2fs.space_bytes(), e2fs.free_space_bytes()))
            print('Block size: %d' % sb.block_size())
            print('Total inodes: %d, free inodes: %d'
                  % (sb.n_inodes, sb.n_free_inodes))
            # s_mtime is the last mount time; s_last_mounted is a path
            # string and cannot be formatted as a time.
            print('Last mounted: %s' % time_format(sb.d['s_mtime']))
            print('Mounted %d times without check, checked every %d time'
                  % (sb.d['s_mnt_count'], sb.d['s_max_mnt_count']))
            # the check interval is a duration, so it is shown as a number
            print('last checked %s, check interval is %d'
                  % (time_format(sb.d['s_lastcheck']),
                     sb.d['s_checkinterval']))
            print('')
        elif action == 'ls':
            if len(sys.argv) < 4:
                usage()
            else:
                e2fs.ls(sys.argv[3])
        elif action == 'cp':
            if len(sys.argv) < 5:
                usage()
            else:
                e2fs.pull(sys.argv[3], sys.argv[4])
        else:
            usage()
    finally:
        # always release the image, whatever the action did
        e2fs.umount()