-
Notifications
You must be signed in to change notification settings - Fork 0
/
voice_fetch.py
109 lines (93 loc) · 4.24 KB
/
voice_fetch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import asyncio
import os
import pathlib
import subprocess
import clint
import requests
import common
import config
import fandom
import json
import threading
def make_dirs():
    """Ensure the root directory for downloaded voice clips exists."""
    pathlib.Path(config.save_dest_for_downloaded_voice).mkdir(parents=True, exist_ok=True)
def download_task_wrapper(char: str, text: str, url: str):
    """Download one voice clip for *char*, retried via common.request_retry_wrapper.

    The clip is saved as <save_dest>/<char>/<md5(text)>.mp3. An existing file is
    kept unless it starts with ``<?xml`` (a cached error page from the server).

    Raises:
        Exception: on a non-200 HTTP status, so the retry wrapper can retry.
    """
    def download_task():
        out = pathlib.Path(config.save_dest_for_downloaded_voice) / char / f"{common.md5(text)}.mp3"
        # Skip files already downloaded, unless they are a cached XML error page.
        if out.exists() and not out.read_bytes().startswith(b"<?xml"):
            return
        # fake our ua
        r = requests.get(url, allow_redirects=True, stream=False, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'})
        if r.status_code != 200:
            common.log(f"Failed to download {url} for {char} with status code {r.status_code}")
            raise Exception("Failed to download")
        with open(out, 'wb') as file:
            # content-length may be absent; fall back to 0 so int() cannot
            # raise TypeError and the progress bar still renders.
            total = int(r.headers.get('content-length') or 0)
            for chunk in clint.textui.progress.bar(r.iter_content(chunk_size=2391975), expected_size=(total / 1024) + 1):
                if chunk:
                    file.write(chunk)
    return common.request_retry_wrapper(download_task)
def dispatch_download_task(char: str, text: str, url: str):
    """Download a single voice clip synchronously (parallel dispatch is disabled)."""
    # common.run_in_parallel(, ())
    download_task_wrapper(char, text, url)
def fetch_collection(collection: dict[str, list[tuple[str, str]]]):
    """Download every (text, url) voice entry in *collection*.

    Clips for each character are fanned out across at most 8 worker threads;
    all threads for a character are joined before moving to the next one.
    """
    make_dirs()

    def split_list(lst, n):
        # Chunk lst into consecutive pieces of at most n elements each.
        return [lst[i:i + n] for i in range(0, len(lst), n)]

    for char in collection:
        os.makedirs(pathlib.Path(config.save_dest_for_downloaded_voice) / char, exist_ok=True)
        voices = collection[char]
        # Separate into (up to) 8 lists with similar element counts, one thread
        # each. (Previously a fixed chunk size of 8 was passed, which spawned
        # one thread per 8 clips — unbounded thread count for large lists.)
        chunk_size = max(1, -(-len(voices) // 8))  # ceil(len / 8)

        def dispatcher(lst, char=char):  # bind char to guard against late-binding closures
            for text, url in lst:
                dispatch_download_task(char, text, url)

        ths: list[threading.Thread] = [
            threading.Thread(target=dispatcher, args=(part,))
            for part in split_list(voices, chunk_size)
        ]
        for t in ths:
            t.start()
        for t in ths:
            t.join()
def serialize_collection(collection: dict[str, list[tuple[str, str]]]) -> str:
    """Serialize the voice collection into a JSON manifest string.

    Each character maps md5(text) labels to {"text", "url", "dest"} records,
    where dest is the local path the clip is downloaded to.
    """
    result: dict[str, dict] = {}
    for char, voices in collection.items():
        for text, url in voices:
            label = common.md5(text)
            result.setdefault(char, {})[label] = {
                "text": text,
                "url": url,
                "dest": os.path.join(config.save_dest_for_downloaded_voice, char, f"{label}.mp3"),
            }
    # ensure_ascii=False keeps non-ASCII transcripts readable in the manifest
    return json.dumps(result, ensure_ascii=False)
def reduce_collection(collection: dict[str, list[tuple[str, str]]]) -> dict[str, list[tuple[str, str]]]:
    """Cap each character at 300 clips; characters with fewer than 10 fall
    back to the previous manifest, or are dropped if absent there.

    Mutates and returns *collection*.
    """
    former = json.loads(pathlib.Path(config.dataset_manifest_file_dest).read_text())
    short_on_data = []
    for char in collection:
        # Keep at most the first 300 entries per character.
        collection[char] = collection[char][:300]
        if len(collection[char]) < 10:
            short_on_data.append(char)
    for char in short_on_data:
        if former.get(char) is not None:
            # NOTE(review): former[char] comes from the serialized manifest
            # (dict of label -> record), not a list of (text, url) tuples —
            # downstream callers appear to tolerate the mixed shape; confirm.
            common.log(f"Using existing data for {char}, ")
            collection[char] = former[char]
        else:
            common.log(f"Not enough data for {char}, removing from collection")
            del collection[char]
    return collection
def generate_text_list(colab_project_prefix: pathlib.Path = pathlib.Path(config.save_dest_for_downloaded_voice)) -> list[str]:
    """Generate one ``<char>.list`` annotation file per character from the
    dataset manifest.

    Each line has the format ``vocal_path|speaker_name|language|text``.

    Args:
        colab_project_prefix: root directory the per-character list files are
            written under. (Previously this parameter was accepted but ignored
            and the config path was hard-coded; the default is unchanged, so
            existing callers behave identically.)

    Returns:
        The generated list-file contents, one string per character. (The
        original returned None despite the ``list[str]`` annotation.)
    """
    collection = json.loads(pathlib.Path(config.dataset_manifest_file_dest).read_text())
    generated_lists: list[str] = []
    for char in collection:
        generated = ""
        for label in collection[char]:
            entry = collection[char][label]
            # Embedded newlines would break the one-line-per-clip format.
            text = entry['text'].replace('\n', '')
            # vocal_path|speaker_name|language|text
            generated += f"{pathlib.Path(entry['dest']).name}|{char}|{common.extract_character_name(char)[1]}|{text}" + "\n"
        pth: pathlib.Path = colab_project_prefix / char / f"{char}.list"
        pth.write_text(generated)
        generated_lists.append(generated)
    return generated_lists