# komga_cover_extractor.py
import argparse
import hashlib
import io
import os
import shutil
import string
import subprocess
import sys
import tempfile
import threading
import traceback
import time
import urllib.request
import xml.etree.ElementTree as ET
import zipfile
from base64 import b64encode
from datetime import datetime
from difflib import SequenceMatcher
from functools import lru_cache
from posixpath import join
from urllib.parse import urlparse
import cProfile
import cv2
import filetype
import numpy as np
import py7zr
import rarfile
import regex as re  # the third-party regex module is used in place of the standard re
import requests
import scandir
from bs4 import BeautifulSoup
from discord_webhook import DiscordEmbed, DiscordWebhook
from lxml import etree
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from titlecase import titlecase
from unidecode import unidecode
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from settings import *
# Get all the variables in settings.py
import settings as settings_file
# Version of the script
script_version = (2, 5, 26)
script_version_text = "v{}.{}.{}".format(*script_version)
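# EX: "v2.5.26"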
# Paths = existing library
# Download_folders = newly acquired manga/novels
paths = []
download_folders = []
# paths within paths that were passed in with a defined path_type
# EX: "volume" or "chapter"
paths_with_types = []
# download folders within download_folders that were passed in with a defined path_type
download_folders_with_types = []
# global folder_accessor
folder_accessor = None
# To compress the extracted images
compress_image_option = False
# Default image compression value.
# Pass in via cli
image_quality = 40
# Stat-related variables
image_count = 0
errors = []
items_changed = []
# A discord webhook url used to send messages to discord about the changes made.
# Pass in via cli
discord_webhook_url = []
# Two webhooks specific to the bookwalker check.
# One is used for released books, the other is used for upcoming books.
# Intended to be sent to two separate channels.
# FIRST WEBHOOK = released books
# SECOND WEBHOOK = upcoming books
bookwalker_webhook_urls = []
# Checks the library against bookwalker for new releases.
bookwalker_check = False
# All the release groups stored in release_groups.txt
# Used when renaming files where it has a matching group.
release_groups = []
# All the publishers stored in publishers.txt
# Used when renaming files where it has a matching publisher.
publishers = []
# skipped files that don't have a release group
skipped_release_group_files = []
# skipped files that don't have a publisher
skipped_publisher_files = []
# A quick and dirty fix to prevent non-processed files from
# being moved over to the existing library. Will be removed in the future.
processed_files = []
# Any files moved to the existing library. Used for triggering a library scan in komga.
moved_files = []
# The script's root directory
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Where logs are written to.
LOGS_DIR = os.path.join(ROOT_DIR, "logs")
# Where the addon scripts are located.
ADDONS_DIR = os.path.join(ROOT_DIR, "addons")
# Docker Status
in_docker = False
# Check if the instance is running in docker.
# If the ROOT_DIR is /app, then it's running in docker.
if ROOT_DIR == "/app":
in_docker = True
script_version_text += " • Docker"
# The path location of the blank_white.jpg in the root of the script directory.
blank_white_image_path = (
os.path.join(ROOT_DIR, "blank_white.jpg")
if os.path.isfile(os.path.join(ROOT_DIR, "blank_white.jpg"))
else None
)
blank_black_image_path = (
os.path.join(ROOT_DIR, "blank_black.png")
if os.path.isfile(os.path.join(ROOT_DIR, "blank_black.png"))
else None
)
# Cached paths from the user's existing library. Read from cached_paths.txt
cached_paths = []
cached_paths_path = os.path.join(LOGS_DIR, "cached_paths.txt")
# Cached identifier results, aka successful matches via series_id or isbn
cached_identifier_results = []
# watchdog toggle
watchdog_toggle = False
# 7zip extensions
seven_zip_extensions = [".7z"]
# Zip extensions
zip_extensions = [
".zip",
".cbz",
".epub",
]
rar_extensions = [".rar", ".cbr"]
# Accepted file extensions for novels
novel_extensions = [".epub"]
# Accepted file extensions for manga
manga_extensions = [x for x in zip_extensions if x not in novel_extensions]
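# EX: [".zip", ".cbz"] with the default zip_extensions above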
# All the accepted file extensions
file_extensions = novel_extensions + manga_extensions
# All the accepted convertable file extensions for convert_to_cbz(),
# and the watchdog handler.
convertable_file_extensions = seven_zip_extensions + rar_extensions
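# EX: [".7z", ".rar", ".cbr"] with the defaults above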
# All the accepted image extensions
image_extensions = {".jpg", ".jpeg", ".png", ".tbn", ".webp"}
# Type of file formats for manga and novels
file_formats = ["chapter", "volume"]
# stores our folder path modification times
# used for skipping folders that haven't been modified
# when running extract_covers() with watchdog enabled
root_modification_times = {}
# Stores all the new series paths for series that were added to an existing library
moved_folders = []
# Profiles the execution - for dev use
profile_code = ""
# get all of the non-callable variables
settings = [
var
for var in dir(settings_file)
if not callable(getattr(settings_file, var)) and not var.startswith("__")
]
# Libraries to be scanned after files have been moved over.
libraries_to_scan = []
# Library Type class
class LibraryType:
def __init__(
self, name, extensions, must_contain, must_not_contain, match_percentage=90
):
self.name = name
self.extensions = extensions
self.must_contain = must_contain
self.must_not_contain = must_not_contain
self.match_percentage = match_percentage
# Convert the object to a string representation
def __str__(self):
return f"LibraryType(name={self.name}, extensions={self.extensions}, must_contain={self.must_contain}, must_not_contain={self.must_not_contain}, match_percentage={self.match_percentage})"
# The Library Entertainment types
library_types = [
LibraryType(
"manga", # name
manga_extensions, # extensions
[r"\(Digital\)"], # must_contain
[
r"Webtoon",
r"^(?=.*Digital)((?=.*Compilation)|(?=.*danke-repack))",
], # must_not_contain
1, # match_percentage - for classifying a group
),
LibraryType(
"light novel", # name
novel_extensions, # extensions
[
r"\[[^\]]*(Lucaz|Stick|Oak|Yen (Press|On)|J-Novel|Seven Seas|Vertical|One Peace Books|Cross Infinite|Sol Press|Hanashi Media|Kodansha|Tentai Books|SB Creative|Hobby Japan|Impress Corporation|KADOKAWA|Viz Media)[^\]]*\]|(faratnis)"
], # must_contain
[], # must_not_contain
),
LibraryType(
"digital_comps", # name
manga_extensions, # extensions
[r"^(?=.*Digital)((?=.*Compilation)|(?=.*danke-repack))"], # must_contain
[], # must_not_contain
),
]
# The Translation Status source types for a library
translation_source_types = ["official", "fan", "raw"]
# The Library languages
source_languages = [
"english",
"japanese",
"chinese",
"korean",
]
# Volume Regex Keywords to be used throughout the script
# ORDER IS IMPORTANT: if a single-character volume keyword is checked first,
# it can break the cleaning of various bits of input.
volume_keywords = [
"LN",
"Light Novels?",
"Novels?",
"Books?",
"Volumes?",
"Vols?",
"Discs?",
"Tomo",
"Tome",
"Von",
"V",
"第",
"T",
]
# Chapter Regex Keywords used throughout the script
chapter_keywords = [
"Chapters?",
"Chaps?",
"Chs?",
"Cs?",
"D",
]
# Keywords to be avoided in a chapter regex.
# Helps avoid picking the wrong chapter number
# when no chapter keyword was used before it.
exclusion_keywords = [
r"(\s)Part(\s)",
r"(\s)Episode(\s)",
r"(\s)Season(\s)",
r"(\s)Arc(\s)",
r"(\s)Prologue(\s)",
r"(\s)Epilogue(\s)",
r"(\s)Omake(\s)",
r"(\s)Extra(\s)",
r"(\s)- Special(\s)",
r"(\s)Side Story(\s)",
# r"(\s)S(\s)",
r"(\s)Act(\s)",
r"(\s)Special Episode(\s)",
r"(\s)Ep(\s)",
r"(\s)- Version(\s)",
r"(\s)Ver(\s)",
r"(\s)PT\.",
r"(\s)PT(\s)",
r",",
r"(\s)×",
r"\d\s*-\s*",
r"\bNo.",
r"\bNo.(\s)",
r"\bBonus(\s)",
r"(\]|\}|\)) -",
r"\bZom(\s)",
r"Tail -",
]
subtitle_exclusion_keywords = [r"-(\s)", r"-", r"-\s[A-Za-z]+\s"]
# Volume Regex Keywords to be used throughout the script
volume_regex_keywords = "(?<![A-Za-z])" + "|(?<![A-Za-z])".join(volume_keywords)
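# EX: "(?<![A-Za-z])LN|(?<![A-Za-z])Light Novels?|..." - each keyword
# carries a lookbehind so it can't match in the middle of a word.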
# Exclusion keywords joined by just |
exclusion_keywords_joined = "|".join(exclusion_keywords)
# Subtitle exclusion keywords joined by just |
subtitle_exclusion_keywords_joined = "|".join(subtitle_exclusion_keywords)
# Put the exclusion_keywords_joined inside of (?<!%s)
exclusion_keywords_regex = r"(?<!%s)" % exclusion_keywords_joined
# Put the subtitle_exclusion_keywords_joined inside of (?<!%s)
subtitle_exclusion_keywords_regex = r"(?<!%s)" % subtitle_exclusion_keywords_joined
# Chapter Regex Keywords to be used throughout the script
chapter_regex_keywords = r"(?<![A-Za-z])" + (r"|(?<![A-Za-z])").join(chapter_keywords)
### EXTENSION REGEX ###
# File extensions regex to be used throughout the script
file_extensions_regex = "|".join(file_extensions).replace(".", r"\.")
# Manga extensions regex to be used throughout the script
manga_extensions_regex = "|".join(manga_extensions).replace(".", r"\.")
# Novel extensions regex to be used throughout the script
novel_extensions_regex = "|".join(novel_extensions).replace(".", r"\.")
# Image extensions regex to be used throughout the script
image_extensions_regex = "|".join(image_extensions).replace(".", r"\.")
# REMINDER: ORDER IS IMPORTANT. Top to bottom is the order they will be checked in.
# Once a match is found, it will stop checking the rest.
# IMPORTANT: Any reordering or swapping of these regexes requires a matching change
# in the full_chapter_match_attempt_allowed alternative logic!
chapter_searches = [
r"\b\s-\s*(#)?(\d+)([-_.]\d+)*(x\d+)?\s*-\s",
r"\b(?<![\[\(\{])(%s)(\.)?\s*(\d+)([-_.]\d+)*(x\d+)?\b(?<!\s(\d+)([-_.]\d+)*(x\d+)?\s.*)"
% chapter_regex_keywords,
r"(?<![A-Za-z]|%s)(?<![\[\(\{])(((%s)([-_. ]+)?(\d+)([-_.]\d+)*(x\d+)?)|\s+(\d+)(\.\d+)?(x\d+((\.\d+)+)?)?(\s+|#\d+|%s))"
% (exclusion_keywords_joined, chapter_regex_keywords, manga_extensions_regex),
r"((?<!^)\b(\.)?\s*(%s)(\d+)([-_.]\d+)*((x|#)(\d+)([-_.]\d+)*)*\b)((\s+-|:)\s+).*?(?=\s*[\(\[\{](\d{4}|Digital)[\)\]\}])"
% exclusion_keywords_regex,
r"(\b(%s)?(\.)?\s*((%s)(\d{1,2})|\d{3,})([-_.]\d+)*(x\d+)?(#\d+([-_.]\d+)*)?\b)\s*((\[[^\]]*\]|\([^\)]*\)|\{[^}]*\})|((?<!\w(\s))|(?<!\w))(%s)(?!\w))"
% (chapter_regex_keywords, exclusion_keywords_regex, file_extensions_regex),
r"^((#)?(\d+)([-_.]\d+)*((x|#)(\d+)([-_.]\d+)*)*)$",
]
# pre-compile the chapter_searches
chapter_search_patterns_comp = [
re.compile(pattern, flags=re.IGNORECASE) for pattern in chapter_searches
]
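# EX: the first pattern matches a dash-delimited chapter number,
# e.g. "Series Name - 005 - Chapter Title.cbz" -> " - 005 - " (illustrative file name)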
# Used in check_for_existing_series() when sending
# a bulk amount of chapter release notifications to discord after the function is done,
# also allows them to be sent in number order.
messages_to_send = []
# Used to store multiple embeds to be sent in one message
grouped_notifications = []
# Discord's maximum amount of embeds that can be sent in one message
discord_embed_limit = 10
# The time to wait before performing the next action in
# the watchdog event handler.
sleep_timer = 10
# The time to wait before scraping another bookwalker page in
# the bookwalker_check feature.
sleep_timer_bk = 2
# The zero-fill values for chapter and volume numbers when renaming.
## VOLUME
zfill_volume_int_value = 2  # 01
zfill_volume_float_value = 4  # 01.0
## CHAPTER
zfill_chapter_int_value = 3  # 001
zfill_chapter_float_value = 5  # 001.0
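# EX: str(1).zfill(zfill_chapter_int_value) -> "001"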
# The Discord colors used for the embeds
purple_color = 7615723 # Starting Execution Notification
red_color = 16711680 # Removing File Notification
grey_color = 8421504 # Renaming, Reorganizing, Moving, Series Matching, and Bookwalker Release Notification
yellow_color = 16776960 # Not Upgradeable Notification
green_color = 65280 # Upgradeable and New Release Notification
preorder_blue_color = 5919485 # Bookwalker Preorder Notification
# The similarity score required for a publisher to be considered a match
publisher_similarity_score = 0.9
# Used to store the files and their associated dirs that have been marked as fully transferred
# When using watchdog, this is used to prevent the script from
# trying to process the same file multiple times.
transferred_files = []
transferred_dirs = []
# The logo url for usage in the bookwalker_check discord output
bookwalker_logo_url = "https://play-lh.googleusercontent.com/a7jUyjTxWrl_Kl1FkUSv2FHsSu3Swucpem2UIFDRbA1fmt5ywKBf-gcwe6_zalOqIR7V=w240-h480-rw"
# An alternative matching method that uses the image similarity between covers.
match_through_image_similarity = True
# The required score for two cover images to be considered a match
required_image_similarity_score = 0.9
# Used when moving the cover between locations.
series_cover_file_names = ["cover", "poster"]
# The required similarity score between the detected cover and the blank image to be considered a match.
# If the similarity score is equal to or greater than this value, the cover will be ignored as
# it is most likely a blank cover.
blank_cover_required_similarity_score = 0.9
# Prompts the user when deleting a lower-ranking duplicate volume when running
# check_for_duplicate_volumes()
manual_delete = False
# The required file type matching percentage between
# the download folder and the existing folder
#
# EX: 90% of the folder's files must have an extension in manga_extensions or novel_extensions
required_matching_percentage = 90
# The similarity score requirement when matching any bracketed release group
# within a file name. Used when rebuilding the file name in reorganize_and_rename.
release_group_similarity_score = 0.8
# searches for and copies an existing volume cover from a volume library over to the chapter library
copy_existing_volume_covers_toggle = False
# The percentage of words in the array of words,
# parsed from a shortened series_name to be kept
# for both series_names being compared.
# EX: 0.7 = 70%
short_word_filter_percentage = 0.7
# The amount of time to sleep before checking again if all the files are fully transferred.
# Slower network response times may require a higher value.
watchdog_discover_new_files_check_interval = 5
# The time to sleep between file size checks when determining if a file is fully transferred.
# Slower network response times may require a higher value.
watchdog_file_transferred_check_interval = 1
# The libraries on the user's komga server.
# Used for sending scan requests after files have been moved over.
komga_libraries = []
# Will move new series that couldn't be matched to the library to the appropriate library.
# requires: '--watchdog "True"' and check_for_existing_series_toggle = True
move_new_series_to_library_toggle = False
# Moves any series with a non-matching library type to the appropriate library
# requires: library_types
move_series_to_correct_library_toggle = False
# Used in get_extra_from_group()
publishers_joined = ""
release_groups_joined = ""
# Outputs the covers as WebP format
# instead of jpg format.
output_covers_as_webp = False
series_cover_path = ""
# Folder Class
class Folder:
def __init__(self, root, dirs, basename, folder_name, files):
self.root = root
self.dirs = dirs
self.basename = basename
self.folder_name = folder_name
self.files = files
# to string
def __str__(self):
return f"Folder(root={self.root}, dirs={self.dirs}, basename={self.basename}, folder_name={self.folder_name}, files={self.files})"
def __repr__(self):
return str(self)
# File Class
class File:
def __init__(
self,
name,
extensionless_name,
basename,
extension,
root,
path,
extensionless_path,
volume_number,
file_type,
header_extension,
):
self.name = name
self.extensionless_name = extensionless_name
self.basename = basename
self.extension = extension
self.root = root
self.path = path
self.extensionless_path = extensionless_path
self.volume_number = volume_number
self.file_type = file_type
self.header_extension = header_extension
class Publisher:
def __init__(self, from_meta, from_name):
self.from_meta = from_meta
self.from_name = from_name
# to string
def __str__(self):
return f"Publisher(from_meta={self.from_meta}, from_name={self.from_name})"
def __repr__(self):
return str(self)
# Volume Class
class Volume:
def __init__(
self,
file_type,
series_name,
shortened_series_name,
volume_year,
volume_number,
volume_part,
index_number,
release_group,
name,
extensionless_name,
basename,
extension,
root,
path,
extensionless_path,
extras,
publisher,
is_premium,
subtitle,
header_extension,
multi_volume=None,
is_one_shot=None,
):
self.file_type = file_type
self.series_name = series_name
self.shortened_series_name = shortened_series_name
self.volume_year = volume_year
self.volume_number = volume_number
self.volume_part = volume_part
self.index_number = index_number
self.release_group = release_group
self.name = name
self.extensionless_name = extensionless_name
self.basename = basename
self.extension = extension
self.root = root
self.path = path
self.extensionless_path = extensionless_path
self.extras = extras
self.publisher = publisher
self.is_premium = is_premium
self.subtitle = subtitle
self.header_extension = header_extension
self.multi_volume = multi_volume
self.is_one_shot = is_one_shot
# Custom sorting key function, sort by index_number
def get_sort_key(index_number):
if isinstance(index_number, list):
return min(index_number)
else:
return index_number
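# EX: get_sort_key([4, 4.5]) -> 4 ; get_sort_key(12) -> 12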
# Sorts the volumes by the index number if they're all numbers,
# otherwise it sorts the volumes alphabetically by the file name.
def sort_volumes(volumes):
if any(isinstance(item.index_number, str) for item in volumes):
# sort alphabetically by the file name
return sorted(volumes, key=lambda x: x.name)
else:
# sort by the index number
return sorted(volumes, key=lambda x: get_sort_key(x.index_number))
# Path Class
class Path:
def __init__(
self,
path,
path_formats=file_formats,
path_extensions=file_extensions,
library_types=library_types,
translation_source_types=translation_source_types,
source_languages=source_languages,
):
self.path = path
self.path_formats = path_formats
self.path_extensions = path_extensions
self.library_types = library_types
self.translation_source_types = translation_source_types
self.source_languages = source_languages
# to string
def __str__(self):
return f"Path(path={self.path}, path_formats={self.path_formats}, path_extensions={self.path_extensions}, library_types={self.library_types}, translation_source_types={self.translation_source_types}, source_languages={self.source_languages})"
def __repr__(self):
return str(self)
# Watches the download directory for any changes.
class Watcher:
def __init__(self):
self.observers = []
self.lock = threading.Lock()
def run(self):
event_handler = Handler(self.lock)
for folder in download_folders:
observer = Observer()
self.observers.append(observer)
observer.schedule(event_handler, folder, recursive=True)
observer.start()
try:
while True:
time.sleep(sleep_timer)
except Exception as e:
print(f"ERROR in Watcher.run(): {e}")
for observer in self.observers:
observer.stop()
print("Observer Stopped")
for observer in self.observers:
observer.join()
print("Observer Joined")
# Handles our embed object along with any associated file
class Embed:
def __init__(self, embed, file=None):
self.embed = embed
self.file = file
# Our array of file extensions and how many files have that extension
file_counters = {x: 0 for x in file_extensions}
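# EX: {".epub": 0, ".zip": 0, ".cbz": 0} with the default file_extensions above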
# Sends a message, prints it, and writes it to a file.
def send_message(
message,
discord=True,
error=False,
log=log_to_file,
error_file_name="errors.txt",
changes_file_name="changes.txt",
):
print(message)
if discord:
send_discord_message(message)
if error:
errors.append(message)
if log:
write_to_file(error_file_name, message)
else:
items_changed.append(message)
if log:
write_to_file(changes_file_name, message)
# Determines the file's library type
def get_library_type(files, required_match_percentage=None):
# avoid a ZeroDivisionError below when no files are given
if not files:
return None
for library_type in library_types:
match_count = 0
for file in files:
extension = get_file_extension(file)
if (
extension in library_type.extensions
and all(
re.search(regex, file, re.IGNORECASE)
for regex in library_type.must_contain
)
and all(
not re.search(regex, file, re.IGNORECASE)
for regex in library_type.must_not_contain
)
):
match_count += 1
match_percentage = required_match_percentage or library_type.match_percentage
if match_count / len(files) * 100 >= match_percentage:
return library_type
return None
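# EX: get_library_type(["Series v01 (Digital) (Group).cbz"]) would return the
# "manga" LibraryType above (illustrative file name)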
# Checks if the file is fully transferred by checking the file size
def is_file_transferred(file_path):
# Check if the file path exists and is a file
if not os.path.isfile(file_path):
return False
try:
# Get the file size before waiting
before_file_size = os.path.getsize(file_path)
# Wait for the configured check interval
time.sleep(watchdog_file_transferred_check_interval)
# Get the file size again after waiting
after_file_size = os.path.getsize(file_path)
# If the size changed during the wait, the file is still being transferred
if (
before_file_size is not None
and after_file_size is not None
and before_file_size != after_file_size
):
return False
# Otherwise, the size is stable; the transfer is complete
return True
except Exception as e:
send_message(f"ERROR in is_file_transferred(): {e}")
return False
# Gets the file's size
def get_file_size(file_path):
# Check if the file path exists and is a file
if os.path.isfile(file_path):
# Get the file information using os.stat()
file_info = os.stat(file_path)
# Return the file size using the st_size attribute of file_info
return file_info.st_size
else:
# If the file path does not exist or is not a file, return None
return None
# Recursively gets all the folders in a directory
def get_all_folders_recursively_in_dir(dir_path):
results = []
for root, dirs, files in scandir.walk(dir_path):
if root in download_folders + paths:
continue
folder_info = {"root": root, "dirs": dirs, "files": files}
results.append(folder_info)
return results
# Recursively gets all the files in a directory
def get_all_files_in_directory(dir_path):
results = []
for root, dirs, files in scandir.walk(dir_path):
files = remove_hidden_files(files)
files = remove_unaccepted_file_types(files, root, file_extensions)
results.extend(files)
return results
# Recursively gets all the files in a directory for watchdog
def get_all_files_recursively_in_dir_watchdog(dir_path):
results = []
for root, dirs, files in scandir.walk(dir_path):
files = remove_hidden_files(files)
for file in files:
file_path = os.path.join(root, file)
if file_path not in results:
extension = get_file_extension(file_path)
if extension not in image_extensions:
results.append(file_path)
elif not compress_image_option and (
download_folders and dir_path in paths
):
results.append(file_path)
return results
# Generates a folder object for a given root
def create_folder_obj(root, dirs=None, files=None):
return Folder(
root,
dirs if dirs is not None else [],
os.path.basename(os.path.dirname(root)),
os.path.basename(root),
get_all_files_recursively_in_dir_watchdog(root) if files is None else files,
)
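# EX: create_folder_obj("/downloads/Series Name") -> basename="downloads",
# folder_name="Series Name" (illustrative path)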
class Handler(FileSystemEventHandler):
def __init__(self, lock):
self.lock = lock
def on_created(self, event):
with self.lock:
start_time = time.time()
global grouped_notifications
try:
global transferred_files, transferred_dirs
extension = get_file_extension(event.src_path)
base_name = os.path.basename(event.src_path)
is_hidden = base_name.startswith(".")
is_valid_file = os.path.isfile(event.src_path)
in_file_extensions = extension in file_extensions
if event.event_type != "created":
return None
if not is_valid_file or extension in image_extensions or is_hidden:
return None
print(f"\n\tEvent Type: {event.event_type}")
print(f"\tEvent Src Path: {event.src_path}")
# if no extension was found, return None
if not extension:
print("\t\t -No extension found, skipped.")
return None
# if the event is a directory, return None
if event.is_directory:
print("\t\t -Is a directory, skipped.")
return None
# if the file is already in transferred_files,
# then it has already been processed, so return None
elif transferred_files and event.src_path in transferred_files:
print("\t\t -Already processed, skipped.")
return None
# check if the extension is not in our accepted file extensions
elif not in_file_extensions:
# if we don't have delete_unacceptable_files_toggle enabled, return None
# if delete_unacceptable_files_toggle, we let it past so it can purge it with delete_unacceptable_files()
if not delete_unacceptable_files_toggle:
print(
"\t\t -Not in file extensions and delete_unacceptable_files_toggle is not enabled, skipped."
)
return None
elif (
(delete_unacceptable_files_toggle or convert_to_cbz_toggle)
and (
extension not in unacceptable_keywords
and "\\" + extension not in unacceptable_keywords
)
and not (
convert_to_cbz_toggle
and extension in convertable_file_extensions
)
):
print("\t\t -Not in file extensions, skipped.")
return None
# Finally, if all checks passed and the file was just created, we can process it.
# Take any action here when a file is first created.
send_message("\nStarting Execution (WATCHDOG)", discord=False)
embed = handle_fields(
DiscordEmbed(
title="Starting Execution (WATCHDOG)",
color=purple_color,
),
[
{
"name": "File Found",
"value": f"```{event.src_path}```",
"inline": False,
}
],
)
send_discord_message(
None,
[Embed(embed, None)],
)
print(f"\n\tFile Found: {event.src_path}\n")
if not os.path.isfile(event.src_path):
return None
# Get a list of all files in the root directory and its subdirectories.
files = [
file
for folder in download_folders
for file in get_all_files_recursively_in_dir_watchdog(folder)
]
# Check if all files in the root directory and its subdirectories are fully transferred.
while True:
all_files_transferred = True
print(f"\nTotal files: {len(files)}")
for file in files:
print(
f"\t[{files.index(file) + 1}/{len(files)}] {os.path.basename(file)}"
)
if file in transferred_files:
print("\t\t-already transferred")
continue
is_transferred = is_file_transferred(file)
if is_transferred:
print("\t\t-fully transferred")
transferred_files.append(file)
dir_path = os.path.dirname(file)
if dir_path not in download_folders + transferred_dirs:
transferred_dirs.append(dir_path)
elif not os.path.isfile(file):
print("\t\t-file no longer exists")
all_files_transferred = False
files.remove(file)
break
else:
print("\t\t-still transferreing...")
all_files_transferred = False
break
if all_files_transferred:
time.sleep(watchdog_discover_new_files_check_interval)
# The current list of files in the root directory and its subdirectories.
new_files = [
file
for folder in download_folders
for file in get_all_files_recursively_in_dir_watchdog(
folder
)
]
# If any new files started transferring while we were checking the current files,
# then we have more files to check.
if files != new_files:
all_files_transferred = False
if len(new_files) > len(files):
print(
f"\tNew transfers: +{len(new_files) - len(files)}"
)
files = new_files
elif len(new_files) < len(files):
break
elif files == new_files:
break
time.sleep(watchdog_discover_new_files_check_interval)
# Proceed with the next steps here.
print("\nAll files are transferred.")
# Make sure all items are a folder object
transferred_dirs = [
create_folder_obj(x) if not isinstance(x, Folder) else x