-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpenpencil.py
140 lines (126 loc) · 4.05 KB
/
penpencil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import asyncio
import base64
import urllib.parse
import httpx
import re
# Developer @Its_Vj_p
async def get_pssh_kid(mpd_url: str, headers: dict = None, cookies: dict = None):
    """
    Download an MPD manifest and pull out its first PSSH and default KID.

    The download-and-parse cycle is attempted up to three times. A failure in
    either step is printed and the manifest is fetched again.

    mpd_url: URL of the MPD manifest
    headers: Request headers if needed (defaults to none)
    cookies: Request cookies if needed (defaults to none)

    Returns a (pssh, kid) tuple of strings; the KID has its dashes removed.
    A value that could not be extracted in any attempt is an empty string.
    """
    # Build fresh dicts per call rather than sharing mutable default arguments.
    headers = {} if headers is None else headers
    cookies = {} if cookies is None else cookies

    pssh = ""
    kid = ""
    for _ in range(3):
        try:
            async with httpx.AsyncClient() as client:
                res = await client.get(mpd_url, headers=headers, cookies=cookies)
                mpd_res = res.text
        except Exception as e:
            # Best-effort retry boundary: report the problem and try again.
            print("Error fetching MPD:", e)
            continue

        # Non-greedy capture: with several <cenc:pssh> elements on one line a
        # greedy ".*" would run from the first opening tag to the last
        # closing tag and return a corrupted value.
        pssh_match = re.search(r"<cenc:pssh>(.*?)</cenc:pssh>", mpd_res)
        if pssh_match is None:
            print("Error extracting PSSH or KID:", "no <cenc:pssh> element found")
            continue
        # Assigned before the KID lookup so a PSSH is still returned when the
        # manifest carries no default_KID attribute.
        pssh = pssh_match.group(1)

        # Stop at the closing quote so the capture cannot spill into
        # whatever follows the attribute value.
        kid_match = re.search(r'default_KID="([^"\s]+)"', mpd_res)
        if kid_match is None:
            print("Error extracting PSSH or KID:", "no default_KID attribute found")
            continue
        kid = kid_match.group(1).replace("-", "")
        break
    return pssh, kid
class Penpencil:
    """
    Helpers for requesting a key from the Penpencil OTP endpoint.

    A key id is XOR-masked with the bearer token and base64-encoded to form
    the request key; the OTP in the response is unmasked the same way.
    Every method is a classmethod, so the class is used without instances.
    """

    # Endpoint prefix; the masked key id is appended as the "key" value.
    otp_url = "https://api.penpencil.xyz/v1/videos/get-otp?key="
    # Placeholder value - presumably to be replaced with a real bearer token.
    penpencil_bearer = 'Your_token'
    # Headers sent with every OTP request.
    # NOTE: the authorization value is formatted once, when the class body
    # runs; reassigning penpencil_bearer afterwards does not update it.
    headers = {
        "Host": "api.penpencil.xyz",
        "content-type": "application/json",
        "authorization": f"Bearer {penpencil_bearer}",
        "client-version": "11",
        "user-agent": "Mozilla/5.0 (Linux; Android 10; PACM00) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.98 Mobile Safari/537.36",
        "client-type": "MOBILE",
        "accept-encoding": "gzip",
    }

    @classmethod
    def _xor_with_bearer(cls, codes):
        """XOR each integer in codes with the bearer character at its wrapped position."""
        token = cls.penpencil_bearer
        # The wrap length is len(token) - 1, so the token's last character is
        # never used. Kept exactly as it was: changing it would change every
        # produced key. NOTE(review): presumably mirrors the remote scheme -
        # confirm before touching.
        span = len(token) - 1
        return [code ^ ord(token[pos % span]) for pos, code in enumerate(codes)]

    @classmethod
    def get_otp_key(cls, kid: str):
        """
        Build the request key for a key id.

        kid: Key id string

        Returns the XOR-masked key id, base64-encoded, as a str.
        """
        masked = bytes(cls._xor_with_bearer(ord(ch) for ch in kid))
        return base64.b64encode(masked).decode("utf-8")

    @classmethod
    def get_key(cls, otp: str):
        """
        Recover the key from an OTP value.

        otp: Base64-encoded OTP string

        Returns the decoded bytes, XOR-unmasked, joined into a str.
        """
        raw = base64.b64decode(otp)
        return "".join(chr(code) for code in cls._xor_with_bearer(raw))

    @classmethod
    async def get_keys(cls, kid: str):
        """
        Request the OTP for a key id and turn it into a "kid:key" entry.

        The request is attempted up to three times; errors are printed and
        retried.

        kid: Key id string

        Returns a one-element list ["<kid>:<key>"], or an empty list when the
        key id is empty or no attempt succeeds.
        """
        if not kid:
            # Nothing to look up; skip three requests made with an empty key.
            return []

        otp_key = cls.get_otp_key(kid)
        for _ in range(3):
            try:
                async with httpx.AsyncClient(headers=cls.headers) as client:
                    resp = await client.get(f"{cls.otp_url}{otp_key}")
                    otp_dict = resp.json()
            except Exception as e:
                # Best-effort retry boundary: report the problem and try again.
                print("Error fetching OTP:", e)
                continue
            try:
                otp = otp_dict["data"]["otp"]
                return [f"{kid}:{cls.get_key(otp)}"]
            except Exception as e:
                print("Error extracting key:", e)
        return []

    @classmethod
    async def get_mpd_title(cls, url: str):
        """Return the MPD URL for url; currently the input is returned unchanged."""
        return url

    @classmethod
    async def get_mpd_keys_title(cls, url: str, keys: list = None):
        """
        Resolve the MPD URL for url and obtain its keys.

        url: URL to resolve
        keys: Already-known keys; when non-empty they are returned as given
              and no lookup is made

        Returns a (mpd_url, keys) tuple on every path, so callers can always
        unpack two values.
        """
        # Fresh list per call rather than a shared mutable default argument.
        if keys is None:
            keys = []

        mpd_url = await cls.get_mpd_title(url)
        if not keys and mpd_url:
            pssh, kid = await get_pssh_kid(mpd_url)
            print("PSSH:", pssh)
            print("KID:", kid)
            keys = await cls.get_keys(kid)
            print("Keys:", keys)
        return mpd_url, keys
async def main():
    """Prompt for an MPD URL, look up its keys, and print both."""
    entered = input("Developer @Its_Vj_p \nEnter the MPD URL: ")
    result = await Penpencil.get_mpd_keys_title(entered)
    mpd_url, keys = result
    for label, value in (("MPD URL:", mpd_url), ("Keys:", keys)):
        print(label, value)
# Run the interactive prompt only when this file is executed as a script, so
# that importing the module does not block on stdin.
if __name__ == "__main__":
    asyncio.run(main())