-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgitCache.py
183 lines (143 loc) · 5.72 KB
/
gitCache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Data for one entry in the git index (.git/index)
import os
import struct
import collections
import hashlib
from helpers import *
"""
Description:
The Cache/Index entry format.
"""
# One record of the git index. The first twelve fields are the fixed-size
# values (timestamps, device/inode, mode, owner ids, size, sha1 digest, flags)
# in the order they are packed/unpacked in this module; `path` is the entry's
# file path.
CacheEntry = collections.namedtuple(
    'CacheEntry',
    'ctime_s ctime_n mtime_s mtime_n dev ino mode uid gid size sha1 flags path',
)
def getCache():
    """
    Description:
        Reads the entries of the cache/index, in the CacheEntry format.
    Parameters:
        None.
    Return:
        cache (list): list of the cache/index entries, in file order.
                      Empty when '.git/index' does not exist.
    Raises:
        Exception: if the file does not start with the b'DIRC' signature.
    """
    # if the index file doesn't exist -> return an empty list
    try:
        cache_data = readFile(os.path.join('.git', 'index'))
    except FileNotFoundError:
        return []
    # header: 4-byte signature, 4-byte version, 4-byte entry count ('!' = network byte order)
    # NOTE: the version field is read but not validated; entries are always
    # parsed with the fixed 62-byte layout used below.
    signature, _version, num_entries = struct.unpack('!4sLL', cache_data[:12])
    if signature != b'DIRC':
        # '%r' with a 1-tuple: a bare trailing '%' is an incomplete format and
        # would raise ValueError instead of reporting the bad signature
        raise Exception('Invalid cache signature %r' % (signature,))
    # the cache list will contain all the entries in the git cache
    cache = []
    # drop the 12-byte header and the trailing 20-byte SHA-1 digest
    cache_entries = cache_data[12:-20]
    i = 0
    # stop after the number of entries announced by the header, so that any
    # data stored after the last entry is not misread as further entries
    while len(cache) < num_entries and i + 62 < len(cache_entries):
        # unpack the fixed-size part of the entry
        fields_end = i + 62
        fields = struct.unpack('!LLLLLLLLLL20sH',
                               cache_entries[i:fields_end])
        # the path is NUL-terminated: find the NUL that ends it
        separator = cache_entries.index(b'\x00', fields_end)
        # get the path of the entry/blob
        path = cache_entries[fields_end:separator]
        # put the entry in the CacheEntry format and collect it
        cache.append(CacheEntry(*(fields + (path.decode(),))))
        # entries are NUL-padded (at least one NUL) to a multiple of 8 bytes
        i += ((62 + len(path) + 8) // 8) * 8
    # return a list of all the entries in the cache/index file.
    return cache
def listFiles(quiet=False):
    """
    Description:
        Collects the paths of all the entries in the cache, optionally printing them.
    Parameters:
        [quiet] (boolean): optional parameter, False by default
                if quiet is False -> print each entry path to the screen
                else -> don't print the paths.
                The 'Git index is empty' message is printed for an empty cache
                regardless of this flag.
    Return:
        files (list) : list of files paths from the cache (empty if the cache is empty).
    """
    entries = getCache()
    # an empty cache is reported, then falls through to return an empty list
    if not entries:
        print('Git index is empty')
    paths = [entry.path for entry in entries]
    # echo the paths only when the caller did not ask for silence
    if not quiet:
        for file_path in paths:
            print(file_path)
    return paths
def getWorkdirState(path = '.'):
    """
    Description:
        Classifies the files found under `path` against the entries of the cache.
        A file present in both is reported as modified when
        generate_object_hash(<file contents>, 'blob') differs from the hex form
        of the sha1 stored in its cache entry.
    Parameters:
        [path] (string): the directory to walk, path = '.' by default.
    Return:
        states (tuple) : tuple of 3 sets of file paths
                - new => files on disk that are not in the cache
                - modified => files in both whose hash differs from the cache
                - deleted => files in the cache that are not on disk
    """
    on_disk = set()
    for folder, subdirs, names in os.walk(path):
        # prune '.git' in place so os.walk never descends into it
        subdirs[:] = [d for d in subdirs if d != '.git']
        for name in names:
            # normalise Windows separators to '/'
            rel_path = os.path.join(folder, name).replace('\\', '/')
            # a path like ./file -> file
            if rel_path.startswith('./'):
                rel_path = rel_path[2:]
            on_disk.add(rel_path)
    # map each cached path to its entry
    indexed = {entry.path: entry for entry in getCache()}
    tracked = set(indexed)
    # on disk but not in the cache
    new = on_disk - tracked
    # in the cache but no longer on disk
    deleted = tracked - on_disk
    # present in both: compare the working-dir hash with the cached sha1
    modified = {
        name for name in (on_disk & tracked)
        if generate_object_hash(readFile(name), 'blob') != indexed[name].sha1.hex()
    }
    return (new, modified, deleted)
def writeCache(entries):
    """
    Description:
        Serialises the entries and writes them to '.git/index'.
        Layout written: a header (b'DIRC', version 2, number of entries),
        then each entry as a 62-byte fixed part followed by its encoded path
        and NUL padding, then the SHA-1 digest of everything before it.
    Parameters:
        entries (list): list of entries in the format of CacheEntry.
    Return:
        None.
    """
    fixed_part = struct.Struct('!LLLLLLLLLL20sH')
    chunks = []
    for item in entries:
        packed_fields = fixed_part.pack(
            item.ctime_s, item.ctime_n, item.mtime_s, item.mtime_n,
            item.dev, item.ino, item.mode, item.uid, item.gid,
            item.size, item.sha1, item.flags,
        )
        unpadded = packed_fields + item.path.encode()
        # round up to the next multiple of 8, always adding at least one NUL
        padded_len = (len(unpadded) // 8 + 1) * 8
        chunks.append(unpadded.ljust(padded_len, b'\x00'))
    payload = struct.pack('!4sLL', b'DIRC', 2, len(entries)) + b''.join(chunks)
    # the digest of the whole payload is appended as the file trailer
    checksum = hashlib.sha1(payload).digest()
    writeFile(os.path.join('.git', 'index'), payload + checksum)