import argparse
import asyncio
import re
from pathlib import Path
from urllib.parse import urlparse

import aiofiles
import aiohttp
from aiohttp import ClientTimeout
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm, tqdm_asyncio

USER_AGENT = "Mozilla/5.0"
HOST = "www.erome.com"
CHUNK_SIZE = 1024  # bytes read per iteration when streaming a download


def _clean_album_title(title: str, default_title="temp") -> str:
    """Remove illegal characters from the album title"""
    illegal_chars = r'[\\/:*?"<>|]'
    title = re.sub(illegal_chars, "_", title)
    title = title.strip(". ")
    return title if title else default_title
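
# A quick illustration (hypothetical input): _clean_album_title('my/album?')
# returns 'my_album_' -- each reserved character is replaced with "_", and a
# title that ends up empty falls back to default_title.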


def _get_final_download_path(album_title: str) -> Path:
    """Create a directory with the title of the album"""
    final_path = Path("downloads") / album_title
    if not final_path.exists():
        final_path.mkdir(parents=True)
    return final_path
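
# For example (hypothetical title), _get_final_download_path("my_album")
# creates downloads/my_album under the current working directory and
# returns it as a Path.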


async def dump(url: str, max_connections: int, skip_videos: bool, skip_images: bool):
    """Collect album data and download the album"""
    if urlparse(url).hostname != HOST:
        raise ValueError(f"Host must be {HOST}")
    title, urls = await _collect_album_data(
        url=url, skip_videos=skip_videos, skip_images=skip_images
    )
    download_path = _get_final_download_path(album_title=title)
    await _download(
        album=url,
        urls=urls,
        max_connections=max_connections,
        download_path=download_path,
    )


async def _download(
    album: str,
    urls: list[str],
    max_connections: int,
    download_path: Path,
):
    """Download the album"""
    # Cap concurrency: each worker must acquire the semaphore before it
    # starts transferring a file.
    semaphore = asyncio.Semaphore(max_connections)
    async with aiohttp.ClientSession(
        headers={"Referer": album, "User-Agent": USER_AGENT},
        timeout=ClientTimeout(total=None),
    ) as session:
        tasks = [
            _download_file(
                session=session,
                url=url,
                semaphore=semaphore,
                download_path=download_path,
            )
            for url in urls
        ]
        await tqdm_asyncio.gather(
            *tasks,
            colour="MAGENTA",
            desc="Album Progress",
            unit="file",
            leave=True,
        )


async def _download_file(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
    download_path: Path,
):
    """Download the file"""
    async with semaphore:
        async with session.get(url) as r:
            if r.ok:
                file_name = Path(urlparse(url).path).name
                total_size_in_bytes = int(r.headers.get("content-length", 0))
                file_path = Path(download_path, file_name)
                if file_path.exists():
                    # Treat a local file whose size is within 50 bytes of the
                    # reported content-length as already downloaded.
                    existing_file_size = file_path.stat().st_size
                    if abs(existing_file_size - total_size_in_bytes) <= 50:
                        tqdm.write(f"[#] Skipping {url} [already downloaded]")
                        return
                progress_bar = tqdm(
                    desc=f"[+] Downloading {url}",
                    total=total_size_in_bytes,
                    unit="B",
                    unit_scale=True,
                    unit_divisor=CHUNK_SIZE,
                    colour="MAGENTA",
                    leave=False,
                )
                async with aiofiles.open(file_path, "wb") as f:
                    # Stream the response to disk in CHUNK_SIZE pieces so
                    # large files never have to fit in memory.
                    async for chunk in r.content.iter_chunked(CHUNK_SIZE):
                        written_size = await f.write(chunk)
                        progress_bar.update(written_size)
                progress_bar.close()
            else:
                tqdm.write(f"[ERROR] Failed to download {url}")


async def _collect_album_data(
    url: str, skip_videos: bool, skip_images: bool
) -> tuple[str, list[str]]:
    """Collect videos and images from the album"""
    headers = {"User-Agent": USER_AGENT}
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url) as response:
            html_content = await response.text()
            soup = BeautifulSoup(html_content, "html.parser")
            album_title = _clean_album_title(
                soup.find("meta", property="og:title")["content"]
            )
            # Video URLs come from <source> tags; image URLs from the
            # data-src attribute of <img class="img-back"> tags.
            videos = (
                [video_source["src"] for video_source in soup.find_all("source")]
                if not skip_videos
                else []
            )
            images = (
                [
                    image["data-src"]
                    for image in soup.find_all("img", {"class": "img-back"})
                ]
                if not skip_images
                else []
            )
            # Merge both lists and deduplicate through a set.
            album_urls = list({*videos, *images})
            return album_title, album_urls
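
# Shape of the result, with illustrative values only:
#   ("my_album", ["https://.../video.mp4", "https://.../image.jpg"])
# The order of the URLs is arbitrary because the set built above
# deduplicates the combined video and image lists.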


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--url", help="URL to download", type=str, required=True)
    parser.add_argument(
        "-c",
        "--connections",
        help="Maximum number of simultaneous connections",
        type=int,
        default=5,
    )
    parser.add_argument(
        "-sv",
        "--skip-videos",
        action=argparse.BooleanOptionalAction,
        help="Skip downloading videos",
    )
    parser.add_argument(
        "-si",
        "--skip-images",
        action=argparse.BooleanOptionalAction,
        help="Skip downloading images",
    )
    args = parser.parse_args()
    asyncio.run(
        dump(
            url=args.url,
            max_connections=args.connections,
            skip_videos=args.skip_videos,
            skip_images=args.skip_images,
        )
    )
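
# Example invocation (the album URL is a placeholder):
#   python dump.py -u "https://www.erome.com/a/XXXXXXXX" -c 10 --skip-videos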