-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclasses.py
369 lines (303 loc) · 11.2 KB
/
classes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
import atexit
import csv
import string
import unicodedata
from pathlib import Path
from datetime import datetime, timedelta
import time
def clean_filename(filename, char_limit=255):
    """
    Converts a string to a valid windows compliant filename

    Spaces become underscores, characters are folded to ASCII where a
    compatibility decomposition exists (others are dropped), and anything
    outside a conservative whitelist (letters, digits and ``-_.() ``) is
    removed. Results longer than ``char_limit`` are truncated and a warning
    is printed, because truncated names may collide.

    :param filename: the text to convert
    :param char_limit: maximum length of the returned name (default 255)
    :return: the cleaned, possibly truncated, filename
    """
    # replace spaces
    filename = filename.replace(" ", "_")

    # keep only valid ascii chars: NFKD splits accented letters into a base
    # letter plus combining marks, and the ASCII encode drops the marks
    ascii_only = (
        unicodedata.normalize("NFKD", filename).encode("ASCII", "ignore").decode()
    )

    # keep only whitelisted chars (a set gives O(1) membership per character)
    whitelist = frozenset("-_.() " + string.ascii_letters + string.digits)
    cleaned_filename = "".join(c for c in ascii_only if c in whitelist)

    if len(cleaned_filename) > char_limit:
        print(
            "\nWarning:\n"
            "Filename truncated because it was over {}. "
            "Filename's may no longer be unique.\n".format(char_limit)
        )
    return cleaned_filename[:char_limit]
class CSVLogger:
    """
    Helper class to manage logging to a CSV

    Each call to :meth:`log` appends one row built from a dictionary's values.
    The dictionary's keys are written first as a header row when the target
    file does not exist yet or is empty. By default the file is opened and
    closed around every row; with ``persistent_file=True`` the handle stays
    open until :meth:`close` is called.
    """

    __file = None  # Open file handle, or None while the file is closed
    __writer = None  # csv.writer bound to the open handle, or None
    header_placed = False  # True once the file is known to hold a header row

    def __init__(self, path, persistent_file=False):
        """
        :param path: location of the CSV file (``str`` or ``pathlib.Path``)
        :param persistent_file: keep the file open between writes when true
        """
        self.path = Path(path)
        self.persistent_file = persistent_file  # File connection kept open when true
        # An existing but empty file still needs its header row.
        self.header_placed = self.path.exists() and self.path.stat().st_size > 0

    @property
    def _file(self):
        """
        Returns the open _file object, opening it in append mode on first use
        :return: the open file handle
        """
        if self.__file is None:
            self.__file = open(self.path, "a+", newline="")
        return self.__file

    @property
    def _writer(self):
        """
        Returns the file writer, creating it on first use and reusing it after
        :return: a ``csv.writer`` bound to the open file
        """
        if self.__writer is None:
            self.__writer = csv.writer(
                self._file, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL
            )
        return self.__writer

    def close(self):
        """
        Closes the underlying file if it is open; safe to call repeatedly
        :return:
        """
        if self.__file is not None:
            self.__file.close()
        # Reset both so the next write reopens the file and rebuilds the writer
        self.__file = None
        self.__writer = None

    def _write(self, data):
        """
        Handles writing to the file, and closing the connection if needed
        :param data: iterable of cell values forming one row
        :return:
        """
        try:
            self._writer.writerow(data)
        finally:
            # Release the handle even when the write fails
            if not self.persistent_file:
                self.close()

    def log(self, log_data):
        """
        Logging function
        Creates the file and places header if file not yet created
        :param log_data: mapping of column name to value for one row
        :return:
        """
        if not self.header_placed:
            self._write(log_data.keys())
            self.header_placed = True
        self._write(log_data.values())
class NamingProject:
    """
    Base project class, tracking project details and location

    Attributes set by the constructor:
    ``project_name`` (the name as given), ``project_stem`` (the name passed
    through ``clean_filename``) and ``project_dir`` (a ``Path``).
    """

    def __init__(self, project_name, project_dir=None):
        """
        :param project_name: human readable project name
        :param project_dir: folder for the project's files; when omitted or
            empty, a folder named after the project stem inside the current
            working directory is used
        """
        self.project_name = project_name
        self.project_stem = clean_filename(self.project_name)
        if project_dir:
            self.project_dir = Path(project_dir)
        else:
            self.project_dir = Path.cwd().joinpath(self.project_stem)

        # Create the project folder if it does not exist; parents=True lets a
        # nested project_dir be created even when intermediate folders are missing
        self.project_dir.mkdir(parents=True, exist_ok=True)
class SkuLogger:
    """
    Track a current SKU for logging

    Every SKU assigned to ``current_sku`` is timestamped; ``log`` appends the
    timestamp, SKU and filename stem to ``<project_stem>_sku_log.csv`` inside
    the project folder. ``as_time_interval`` reads that log back as a list of
    start/end intervals.
    """

    project = None  # Project object
    __current_sku = None
    current_file_stem = None  # What the _file should be named
    current_timestamp = None
    skus_logged = 0  # count of SKUs set this session (the end marker counts too)
    log_file_type = "csv"
    end_flag = "***END***"
    end_strings = [
        "",
        "break",
        "end",
        "exit",
        end_flag,
    ]  # Strings that will end the sku_logger
    end = False  # Indicates if the script should end
    # str(datetime) drops the fractional part when microseconds are zero, so
    # logged timestamps can legitimately appear in either form.
    _timestamp_formats = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

    def __init__(self, project):
        """
        :param project: object exposing ``project_dir`` (a Path) and ``project_stem``
        """
        self.project = project
        self.log_file_path = self.project.project_dir.joinpath(
            f"{self.project.project_stem}_sku_log.{self.log_file_type}"
        )
        self.logger = CSVLogger(self.log_file_path)

        # Register a method to run when script is ended
        # Causes this obj to persist through the end of the script
        atexit.register(self.cleanup)

    def cleanup(self):
        """
        Cleanup method to insert ending flag in log on exit

        Does nothing when the session already ended through an end string or
        when no SKU was set. Failures are reported, not raised.
        :return:
        """
        try:
            if not self.end and self.skus_logged > 0:
                self.current_sku = self.end_flag
                self.log()
                print(f"Updated log with end flag: '{self.end_flag}'.")
        except Exception:
            print(
                "An exception occurred when attempting to insert the ending tag in the log _file.\n"
                "The renaming script will assume the end-time for your last SKU is the time you run the rename script, "
                "or the next product in the log if one exists."
            )

    def log(self):
        """
        Write the current SKU info to the log _file

        Also prints a summary line; the file stem is shown only when it
        differs from the SKU.
        :return:
        """
        print_log = f"{self.skus_logged} [{self.current_timestamp}] {self.current_sku}"
        if self.current_file_stem != self.current_sku:
            print_log += f" ({self.current_file_stem})"
        print(print_log)

        self.logger.log(
            {
                "timestamp": self.current_timestamp,
                "sku": self.current_sku,
                "file_stem": self.current_file_stem,
            }
        )

    @classmethod
    def _parse_timestamp(cls, text):
        """
        Parse a logged timestamp, with or without fractional seconds
        :param text: timestamp string as stored in the log
        :return: the parsed ``datetime``
        :raises ValueError: when the text matches none of the known formats
        """
        for fmt in cls._timestamp_formats:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised timestamp in SKU log: {text!r}")

    @property
    def as_time_interval(self):
        """
        Converts SKU log data into a list of dicts with start and end times

        Each dict has the keys ``sku``, ``start_time``, ``end_time`` and
        ``stem``. A record ends when the next logged row starts; the final
        row ends "now". Rows holding the end flag only close the previous
        record and are not returned.
        :return: list of interval dicts; empty when the log file is missing
        """
        if not self.log_file_path.exists():
            print(f"There's no _file at '{self.log_file_path}'")
            return []

        with open(self.log_file_path, "r", newline="") as file:
            # Ignore blank or truncated rows instead of failing on them
            rows = [row for row in csv.reader(file) if len(row) >= 3]

        records = rows[1:]  # Skip header
        last_index = len(records) - 1
        intervals = []
        for index, row in enumerate(records):
            sku = row[1]
            if sku == self.end_flag:
                continue  # Don't return ending flags
            if index < last_index:
                end_time = self._parse_timestamp(records[index + 1][0])
            else:
                end_time = datetime.now()
            intervals.append(
                {
                    "sku": sku,
                    "start_time": self._parse_timestamp(row[0]),
                    "end_time": end_time,
                    "stem": row[2],
                }
            )
        return intervals

    @property
    def current_sku(self):
        """
        Returns the current SKU
        :return:
        """
        return self.__current_sku

    @current_sku.setter
    def current_sku(self, sku):
        """
        Sets the current SKU value and the windows compliant current_file_stem value

        Also stamps ``current_timestamp``, updates ``end`` and increments
        ``skus_logged``.
        :param sku: the SKU text, or one of ``end_strings`` (any letter case)
        :return:
        """
        self.current_timestamp = datetime.now()
        # Compare case-insensitively on both sides; lowercasing only the input
        # would prevent the upper-case end flag from ever matching itself.
        end_tokens = {token.lower() for token in self.end_strings}
        self.end = sku.lower() in end_tokens  # register exit when flag passed
        self.__current_sku = self.end_flag if self.end else sku
        self.current_file_stem = clean_filename(self.__current_sku)
        self.skus_logged += 1

    def run(self):
        """
        Start accepting SKU inputs

        Loops until an end string is entered. On ending, the end flag is
        written to the log so the last SKU gets a real end time; ``cleanup``
        skips its own insertion once ``end`` is set, so it is written here.
        :return:
        """
        while True:
            self.current_sku = input("Current SKU: ")
            if self.end:
                # skus_logged already counts the end marker itself, so > 1
                # means at least one earlier entry exists to be closed.
                if self.skus_logged > 1:
                    self.log()
                break
            self.log()
class FileRename:
    """
    Class for handling the renaming of files

    Each file's last-modified time (plus an optional offset) is matched
    against the intervals recorded in the project's SKU log; matched files
    are renamed to the SKU's filename stem and every rename is recorded in
    ``<project_stem>_rename_log.csv`` inside the project folder.
    """

    project = None  # NamingProject obj
    files_dir = None  # Location of the files to rename
    recursive = False  # Look for files recursively to rename
    sku_logger = None
    log_file_path = None
    log_file_type = "csv"
    logger = None  # CSVLogger obj
    files = None  # Files to rename

    def __init__(self, project, files_dir, recursive=False, offset=None):
        """
        :param project: object exposing ``project_dir`` (a Path) and ``project_stem``
        :param files_dir: folder holding the files to rename
        :param recursive: also collect files from sub-folders when true
        :param offset: seconds added to each file's modified time before
            matching; a falsy value disables the adjustment
        """
        self.project = project
        self.files_dir = Path(files_dir)
        self.recursive = recursive
        self.sku_logger = SkuLogger(project)
        self.offset = timedelta(seconds=float(offset)) if offset else offset
        self.log_file_path = self.project.project_dir.joinpath(
            f"{self.project.project_stem}_rename_log.{self.log_file_type}"
        )
        self.logger = CSVLogger(self.log_file_path)

        # Only regular files are renamed in either mode; sub-folders are skipped
        if self.recursive:
            candidates = self.files_dir.glob("**/*")
        else:
            candidates = self.files_dir.iterdir()
        self.files = [f for f in candidates if f.is_file()]

    def get_sku_by_timestamp(self, timestamp, intervals=None):
        """
        Locates the sku based on the files last modified timestamp
        :param timestamp: ``datetime`` to look up
        :param intervals: pre-loaded interval dicts to search; when omitted
            they are read from the SKU logger
        :return: the first interval dict whose start/end (inclusive) contains
            the timestamp, or ``None`` when nothing matches
        """
        if intervals is None:
            # Tolerate a logger that yields None when no log exists
            intervals = self.sku_logger.as_time_interval or []
        for sku_record in intervals:
            if sku_record["start_time"] <= timestamp <= sku_record["end_time"]:
                return sku_record
        return None

    def run(self):
        """
        Rename every collected file that falls inside a logged SKU interval

        Files with no matching interval are left untouched. When the target
        name is already taken by a collected or previously assigned path, a
        numbered suffix `` (0)``, `` (1)``, ... is appended.
        :return:
        """
        # Paths already taken in the dir; a set keeps collision checks cheap
        existing_files = set(self.files)
        # Load the SKU log once rather than once per file
        intervals = self.sku_logger.as_time_interval or []

        for file in self.files:
            # Locate record
            file_timestamp = datetime.fromtimestamp(file.lstat().st_mtime)
            if self.offset:
                file_timestamp += self.offset
            sku_record = self.get_sku_by_timestamp(file_timestamp, intervals)
            if not sku_record:
                continue

            # assemble new file name
            stem = sku_record["stem"]
            rename_path = file.with_name(stem + file.suffix)

            # Add a number to the filename if the file already exists
            count = 0
            while rename_path in existing_files:
                rename_path = file.with_name(f"{stem} ({count}){file.suffix}")
                count += 1
            existing_files.add(rename_path)

            self.logger.log(
                {
                    "timestamp": datetime.now(),
                    "sku": sku_record["sku"],
                    "original_path": file,
                    "renamed_path": rename_path,
                }
            )
            file.rename(rename_path)
            print(f"Renamed: {file.name} >>> {rename_path.name}")
class Clock:
    """
    Clock class to display the machine time

    Constructing an instance starts an endless loop that reprints the current
    machine time on the same console line, so the constructor does not return
    on its own.
    """

    def __init__(self, sleep=0.04):
        """
        :param sleep: seconds to pause between refreshes
        """
        prompt = "Photograph the following value with your camera: "
        while True:
            # The carriage return moves back to the line start so the next
            # print overwrites this one
            print(prompt, datetime.now(), end="\r")
            time.sleep(sleep)
class OffsetCalculator:
    """
    Calculate the offset between machine time and a cameras time

    The offset is the supplied timestamp minus the image file's last-modified
    time, stored as a ``timedelta`` in ``offset``.
    """

    # str(datetime) omits the fraction when microseconds are zero, and typed
    # values may leave it out, so both forms are accepted.
    _timestamp_formats = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

    def __init__(self, image_path, timestamp):
        """
        :param image_path: path of the image file whose modified time is read
        :param timestamp: a ``datetime``, or a string formatted as
            ``YYYY-MM-DD HH:MM:SS`` with optional ``.ffffff`` fraction
        :raises ValueError: when a string timestamp matches neither format
        """
        self.image_path = Path(image_path)
        self.file_timestamp = datetime.fromtimestamp(self.image_path.lstat().st_mtime)
        self.input_timestamp = self._parse_timestamp(timestamp)
        self.offset = self.input_timestamp - self.file_timestamp

    @classmethod
    def _parse_timestamp(cls, timestamp):
        """
        Turn the supplied timestamp into a ``datetime``
        :param timestamp: ``datetime`` instance or formatted string
        :return: the parsed ``datetime``
        """
        if isinstance(timestamp, datetime):
            return timestamp
        text = timestamp.strip()  # Tolerate stray whitespace around typed input
        for fmt in cls._timestamp_formats:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        raise ValueError(
            f"Timestamp {timestamp!r} does not match 'YYYY-MM-DD HH:MM:SS[.ffffff]'"
        )

    @property
    def as_seconds(self):
        """
        Returns the offset expressed in seconds
        :return: the offset as a float number of seconds
        """
        return self.offset.total_seconds()