from urllib.parse import urlparse, parse_qsl, urlunparse, urlencode
from collections.abc import Callable, Iterable, Mapping
import requests.structures
from Varstorage import Configuration, Constants
from CFSession import cfSession
from requests.exceptions import RequestException
from typing import Any
from pathlib import Path
import threading
import requests
import shutil
import m3u8
import pickle
import time
import os
import re
config = Configuration().load()
# bytes pretty-printing
UNITS_MAPPING = [
(1<<50, ' PB'),
(1<<40, ' TB'),
(1<<30, ' GB'),
(1<<20, ' MB'),
(1<<10, ' KB'),
(1, (' byte', ' bytes')),
]
def validatename(word_orig:str):
"""
Copied from: https://github.com/Kinuseka/nScraper
"""
word = word_orig
forbidden = ['<', '>', ':', '"', "|", "?", "*"]
for char in forbidden:
word = word.replace(char, "")
word = word.replace("\\","_")
word = word.replace("/","_")
return word
def pretty_size(bytes, units=UNITS_MAPPING):
"""Get human-readable file sizes.
simplified version of https://pypi.python.org/pypi/hurry.filesize/
"""
for factor, suffix in units:
if bytes >= factor:
break
amount = int(bytes / factor)
if isinstance(suffix, tuple):
singular, multiple = suffix
if amount == 1:
suffix = singular
else:
suffix = multiple
return str(amount) + suffix
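#Example (hypothetical values): pretty_size(2048) -> '2 KB'; pretty_size(1) -> '1 byte'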
def UrlSearch(prepend):
    "Generate a valid search URL from the given keywords"
    string = str(prepend)
    var = string.replace(" ", "%20")
    return f'{config.get_host}//search.html?keyword={var}'
def UrlFixer(url:str, flair: str):
"Add flair to base url"
return f'{url}{flair}'
def Get_ID(url):
"Get video ID from episode link"
url_parse = urlparse(url)
return url_parse.path.split("/")[-1]
def append_query(url: str, key: str, value: str) -> str:
url = url.rstrip('/')
url_parsed = urlparse(url)
query = dict(parse_qsl(url_parsed.query))
query.update({key: value})
new_url = url_parsed._replace(query=urlencode(query)).geturl()
return new_url
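#Example (hypothetical url): append_query("https://example.com/search.html?keyword=naruto", "page", 2)
#returns "https://example.com/search.html?keyword=naruto&page=2"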
def pagination_link(url, on, max, direction):
    "Return the url of the adjacent page, or None when already at the boundary"
    if direction == "fwd" and on == max:
        return None
    elif direction == "prv" and on == 1:
        return None
    value = 1 if direction == "fwd" else -1
    final_value = on + value
    return append_query(url, "page", final_value)
def pick_quality(m3u8_data, preferred_quality="1080p", force=False):
    "Pick the entry matching preferred_quality; unless force is set, fall back to the best recognized quality"
    qualities = [quality for quality in m3u8_data if quality["quality"] not in ['backup', 'default']]
    qualities.reverse()
    quality_hierarchy = ["1080p", "720p", "480p", "360p", "default", "backup"]
    if preferred_quality == "best":
        return qualities[0]
    for quality in qualities:
        if quality["quality"] == preferred_quality:
            return quality
    if not force:
        #Second attempt: settle for the first recognized quality available
        for quality in qualities:
            if quality["quality"] in quality_hierarchy:
                return quality
    return None
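#Example (hypothetical entries; the shape is inferred from how pick_quality
#indexes its input, a list of dicts with at least a "quality" key):
#    streams = [{"quality": "360p", ...}, {"quality": "720p", ...}, {"quality": "1080p", ...}]
#    pick_quality(streams, "720p")               -> the 720p entry
#    pick_quality(streams, "1440p")              -> the best recognized entry (1080p here)
#    pick_quality(streams, "1440p", force=True)  -> None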
def list_quality(m3u8_data):
return list(reversed([quality['quality'] for quality in m3u8_data if quality["quality"] not in ['backup','default']]))
class RequestsClient():
    """Custom HTTP client for m3u8.load(); uses cfSession to fetch playlists behind Cloudflare"""
    def download(self, uri, timeout=None, headers={}, verify_ssl=True):
        #Signature matches the client interface m3u8 expects; returns (content, final url)
        session = cfSession()
        o = session.get(uri, timeout=timeout, headers=headers)
        return o.text, o.url
class HlsObject():
"""
    An HlsObject manages the m3u8 playlist and downloads the video segments. To initiate the download process, use the .download() method
"""
def __init__(self,
m3u8_url: str,
headers: dict,
file_name: str,
download_location: str,
extension = "mp4",
concurrency: int = 10,
daemon = True,
):
self.url = m3u8_url
self.headers = headers
self.daemon = daemon
self.download_location = download_location
self.file_name = file_name
#Set m3u8 instance
self.playlist = m3u8.load(self.url, http_client=RequestsClient())
self.duration = self.playlist.target_duration
#Set directories
self.cache_location = os.path.join(self.download_location, f".cache.{file_name}")
self.final_location = os.path.join(self.download_location, f"{file_name}.{extension}")
Path(self.download_location).mkdir(parents=True, exist_ok=True)
Path(self.cache_location).mkdir(parents=True, exist_ok=True)
self.semaphore = threading.Semaphore(concurrency)
#Load pickle for possible resume
self._pickled_directory = os.path.join(self.cache_location, "store.datadl")
self._load_pickle()
self.started_download = False
#Threaded variables
self.child_processes: list[Downloader_child] = []
#Parent progress
    def _load_pickle(self) -> bool:
        "Loads existing progress; falls back to a fresh state when none can be read"
        if os.path.isfile(self._pickled_directory):
            with open(self._pickled_directory, "rb") as f:
                try:
                    self.map, self.progress = pickle.load(f)
                    return True
                except (EOFError, pickle.UnpicklingError, MemoryError) as e:
                    print(e)
        #Fresh state: no stored progress, or the stored file was unreadable
        self.map = []
        self.progress = {
            "error": None,
            "progress": 0,
            "errored": 0,
            "file_size": 0
        }
        return False
def _dump_pickle(self):
"Dumps existing progress"
with open(self._pickled_directory, "wb") as f:
pickle.dump((self.map, self.progress), f)
    @property
    def segments(self):
        url = urlparse(self.url)
        path = url.path.split("/")[1:]
        if self._has_valid_url(self.playlist.segments.uri):
            return self.playlist.segments.uri
        #Relative segment URIs: rebuild absolute URLs from the playlist location
        #(assumes the playlist path has at least two leading components)
        return [f"{url.scheme}://{url.netloc}/{path[0]}/{path[1]}/{seg}"
                for seg in self.playlist.segments.uri]
@property
def segment_count(self):
return len(self.segments)
@property
def is_download_done(self):
return self.progress['progress'] == self.segment_count
    def _has_valid_url(self, urls):
        "Check whether the segment URIs are already absolute http(s) URLs"
        pattern = r'^https?://[\w\-]+(\.[\w\-]+)+[/#?]?.*$'
        #Segments of one playlist share the same form, so the first URI decides
        return bool(urls) and re.match(pattern, urls[0]) is not None
    def download(self):
        """Creates the children threads and starts them, beginning the download process"""
        if self.create_children():
            self.started_download = True
            self.start_children()
    def _create_child(self, segment_url, segment_id):
        """Creates a single child thread for one segment"""
        thread = Downloader_child(url=segment_url, file_name=self.file_name, directory=self.cache_location, segment_id=segment_id, semaphore=self.semaphore, headers=self.headers, daemon=self.daemon)
        return thread
    def create_children(self):
        """Creates the children threads, skipping segments already completed in a previous run"""
        for num, segment in enumerate(self.segments):
            if num in self.map: continue
            child = self._create_child(segment_url=segment, segment_id=num)
            self.child_processes.append(child)
        return True
def start_children(self):
"""Starts the generated children"""
for child in self.child_processes:
child.start()
def download_progress(self):
"Blocks main_thread and shows the download progress during the process. Useful for debugging"
if not self.started_download:
print("Download has not started")
return
while True:
self.update_progress()
if self.is_download_done:
break
print(f"Downloaded: {self.progress['progress']}/{self.segment_count}/-{self.progress['errored']} Size: {self.progress['file_size']}", end="\r")
time.sleep(1)
    def update_progress(self):
        """Invoke this to refresh the parent's progress from the children threads"""
        done = []
        errored = []
        file_size = 0
        for child in self.child_processes:
            file_size += child.progress["file_size"]
            if child.progress["done"] and not child.progress["error"]:
                done.append(1)
                #Record finished segments so a resumed run can skip them
                if child.segment_id not in self.map:
                    self.map.append(child.segment_id)
            elif child.progress["error"]:
                errored.append(1)
        self.progress['file_size'] = file_size
        self.progress['progress'] = len(done)
        self.progress['errored'] = len(errored)
    def arrange_files(self):
        """Concatenate the downloaded segment files, in order, into the final output file"""
        if not self.is_download_done:
            return False
        with open(self.final_location, "wb") as f:
            for child in self.child_processes:
                with child.file_open() as file:
                    #Do chunked read to save memory
                    for chunk in iter(lambda: file.read(4096), b""):
                        f.write(chunk)
                child.delete_file()
        return True
def cache_clear(self):
"""Clean temporary files/folder on cache for this particular instance. Once you call this, the parent process is now broken and must be discarded."""
cache_dir = self.cache_location
if not os.path.exists(cache_dir): return
for filename in os.listdir(cache_dir):
# Create absolute path
filepath = os.path.join(cache_dir, filename)
try:
# If it is a file or symlink, remove it
if os.path.isfile(filepath) or os.path.islink(filepath):
os.unlink(filepath)
# If it is a directory, remove it
elif os.path.isdir(filepath):
shutil.rmtree(filepath)
            except Exception as e:
                print(f'[CacheClean] Failed to delete {filepath}. Reason: {e}')
os.rmdir(cache_dir)
    def close(self):
        """Gracefully close downloader and save progress"""
        self.update_progress()
        #Persist map/progress so a later run can resume where this one stopped
        self._dump_pickle()
class Downloader_child(threading.Thread):
    def __init__(self, url, file_name, directory, segment_id, headers, semaphore: threading.Semaphore, **kwargs):
        super().__init__(**kwargs)
        self.url = url
        self.directory = directory
        self.filename = file_name
        self.segment_id = segment_id
        self.final_name = f"{file_name}.{segment_id}"
        self.semaphore = semaphore
        self.progress = {
            "file_name": file_name,
            "error": None,
            "done": False,
            "file_size": 0
        }
        #Case-insensitive so a caller-supplied "user-agent" is not duplicated
        self.headers = requests.structures.CaseInsensitiveDict(headers)
        if not self.headers.get("User-Agent"):
            self.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 OPR/107.0.0.0"
        Path(self.directory).mkdir(parents=True, exist_ok=True)
def file_open(self):
"""Return a Readable stream"""
return open(os.path.join(self.directory, self.final_name), "rb")
def delete_file(self):
"""Destroy contents, once you call this the child process is now broken and must be discarded"""
os.remove(os.path.join(self.directory, self.final_name))
    def run(self):
        self.semaphore.acquire()
        last_exception = None
        try:
            for i in range(5):
                recorded_chunks = 0
                with open(os.path.join(self.directory, self.final_name), "wb") as f:
                    try:
                        response_stream = requests.get(self.url, stream=True, timeout=10, headers=self.headers)
                        #Treat HTTP error statuses as failed attempts instead of saving an error page as the segment
                        response_stream.raise_for_status()
                        for chunk in response_stream.iter_content(chunk_size=4096):
                            f.write(chunk)
                            recorded_chunks += len(chunk)
                            self.progress["file_size"] = recorded_chunks
                        self.progress["done"] = True
                        break
                    except RequestException as e:
                        print(f"thr: {self.segment_id} Attempt {i+1}/5: Error: {e}")
                        #Roll back the byte count of the failed attempt
                        self.progress["file_size"] -= recorded_chunks
                        last_exception = e
            else:
                #All five attempts failed: mark as done so the parent stops waiting, and record the error
                self.progress["done"] = True
                self.progress["error"] = last_exception
        finally:
            #Always release the slot so a permanently failing segment cannot starve the pool
            self.semaphore.release()