-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
105 lines (83 loc) · 3.3 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
import html
import json
import os

import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, Response, status
# Telegram bot token, read from the TOKEN environment variable.
# os.getenv returns None when the variable is unset, in which case the URL
# below is built with the literal text "None" in place of the token.
TOKEN = os.getenv("TOKEN")
# Base URL for Telegram Bot API calls; callers append the method name
# (e.g. "sendMessage", "sendPhoto") to it.
TELEGRAM_URL = f"https://api.telegram.org/bot{TOKEN}/"
# FastAPI application object that the route decorators in this file attach to.
app = FastAPI()
def scrapper(url: str) -> dict:
    """Resolve a music link through songwhip.com and collect its metadata.

    The text in ``url`` is appended to ``https://songwhip.com/`` and fetched
    (following redirects). The page's ``__NEXT_DATA__`` JSON payload is then
    parsed for the first track entry and the artist names.

    Args:
        url: Text appended to the songwhip.com base URL, normally a link to
            a track on some music service.

    Returns:
        On success, a dict with the keys ``spotify``, ``itunes``, ``image``,
        ``track``, ``artists`` (comma-separated names) and ``songwhip`` (the
        final URL after redirects).
        On failure, ``{"error": "Wrong link"}`` when the page could not be
        fetched with a 200 status, or ``{"error": "Can't process this link"}``
        when the page does not contain the expected data.
    """
    try:
        with httpx.Client(follow_redirects=True) as client:
            response = client.get("https://songwhip.com/" + url)
    except httpx.InvalidURL:
        # The user-supplied text cannot even be turned into a URL.
        return {"error": "Wrong link"}
    except httpx.HTTPError:
        # Transport-level failure (timeout, DNS, too many redirects, ...).
        return {"error": "Can't process this link"}

    if response.status_code != 200:
        return {"error": "Wrong link"}

    payload = BeautifulSoup(response.text, 'html.parser').find(id="__NEXT_DATA__")
    if payload is None:
        # find() returns None when the page has no such element.
        return {"error": "Can't process this link"}

    try:
        data = json.loads(payload.get_text())
        state = data['props']['initialReduxState']
        # Only the first track entry of the page state is used.
        dct_links = next(iter(state['tracks'].values()))['value']
        artists = [artist["value"]['name']
                   for artist in state["artists"].values()]
        return {
            "spotify": dct_links['links']['spotify'][0]['link'],
            "itunes": dct_links['links']['itunes'][0]['link'],
            # "yandex": dct_links['links']['yandex'][0]['link'],
            "image": dct_links['image'],
            "track": dct_links['name'],
            "artists": ", ".join(artists),
            "songwhip": str(response.url),
        }
    except (ValueError, KeyError, IndexError, TypeError, AttributeError,
            StopIteration):
        # ValueError covers json.JSONDecodeError; the others cover a payload
        # whose shape differs from what is expected (missing keys, empty
        # track map, empty link lists, null values).
        return {"error": "Can't process this link"}
def validate_url(text: str) -> str:
    """Classify an incoming message text.

    Despite its name, this function only detects the ``/start`` bot command.
    Besides the bare ``/start`` it also recognises the forms Telegram
    delivers in practice: ``/start@BotName`` (commands in group chats) and
    ``/start <payload>`` (deep links).

    Args:
        text: Raw text of the incoming message.

    Returns:
        ``"start"`` when the message is a ``/start`` command, otherwise an
        empty string (meaning: treat the text as a link to look up).
    """
    words = text.split(maxsplit=1)
    if not words:
        # Empty or whitespace-only text is not a command.
        return ''
    # Drop an optional "@BotName" suffix from the command word.
    command = words[0].partition("@")[0]
    return "start" if command == "/start" else ''
def sendMessage(text: str, id: int, disable_web_page_preview: bool = False) -> None:
    """Send a text message to a chat via the Telegram ``sendMessage`` method.

    Args:
        text: Message body.
        id: Target chat id (the name shadows the builtin but is kept because
            it is part of the function's signature).
        disable_web_page_preview: Passed through to Telegram unchanged.
    """
    endpoint = TELEGRAM_URL + "sendMessage"
    query = {
        "chat_id": id,
        "text": text,
        "disable_web_page_preview": disable_web_page_preview,
    }
    # The response is not inspected; the call is fire-and-forget.
    with httpx.Client() as session:
        session.get(endpoint, params=query)
@app.get("/song/{url:path}")
async def song_url(url: str) -> dict:
    """Return the scraped link data for ``url`` as JSON.

    ``scrapper`` performs blocking HTTP I/O, so it is run in the default
    executor instead of directly on the event loop; otherwise every other
    request would stall while the lookup is in flight.

    Args:
        url: Everything after ``/song/`` in the request path.

    Returns:
        The dict produced by ``scrapper`` (link data or an ``error`` entry).
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, scrapper, url)
def _reply_to_message(text: str, chat_id: int) -> None:
    """Answer one text message: greet on /start, otherwise send a track card.

    This function does blocking network I/O (songwhip lookup and Telegram
    calls) and is meant to be run off the event loop.
    """
    verdict = validate_url(text)
    if verdict == 'start':
        welcome_string = (
            "\nJust send me link from your music app!\n"
            "For example: https://open.spotify.com/track/25nU5mxSzlzyOXzeqx4c5j\n"
        )
        sendMessage(welcome_string, chat_id, disable_web_page_preview=True)
        return
    if verdict:
        # Any other non-empty verdict is relayed to the user before the lookup.
        sendMessage(verdict, chat_id)

    links = scrapper(text)
    if links.get("error") is not None:
        sendMessage(links['error'], chat_id)
        return

    def _safe(value: str) -> str:
        # With parse_mode=HTML, Telegram requires '<', '>' and '&' in plain
        # text to be sent as entities; unescaped track names or URLs can make
        # the whole request fail to parse.
        return html.escape(value.strip(), quote=False)

    caption = "\n".join([
        "",
        f"<b>{_safe(links['track'])} — {_safe(links['artists'])}</b>",
        f"<b>Spotify:</b> {_safe(links['spotify'])}",
        f"<b>Apple Music:</b> {_safe(links['itunes'].replace('{country}', 'ru'))}",
        f"<b>Others:</b> {_safe(links['songwhip'])}",
        "",
    ])
    params = {
        "chat_id": chat_id,
        "photo": links["image"],
        "caption": caption,
        "parse_mode": "HTML",
    }
    with httpx.Client() as client:
        client.get(TELEGRAM_URL + "sendPhoto", params=params)


@app.post("/telegram_bot", status_code=status.HTTP_200_OK)
async def webhook(update_data: dict, response: Response):
    """Handle a Telegram webhook update.

    Updates without a ``message`` or without message ``text`` are
    acknowledged with HTTP 202 and otherwise ignored. Text messages are
    answered with either the welcome text (``/start``), an error message, or
    a photo message whose caption lists the track's links.

    Args:
        update_data: Decoded JSON body of the update sent by Telegram.
        response: FastAPI response object, used to override the status code.

    Returns:
        None; the HTTP response has an empty (null) body.
    """
    message = update_data.get('message')
    if message is None or message.get('text') is None:
        response.status_code = status.HTTP_202_ACCEPTED
        return

    # The reply logic is synchronous and network-bound, so run it in the
    # default executor to keep the event loop responsive.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, _reply_to_message, message['text'], message["chat"]["id"])