forked from REBEL7265/MASTERBOT
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathunzipfile.py
124 lines (110 loc) · 4.65 KB
/
unzipfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
""" command: .unzip
coded by @By_Azade
code rewritten by SnapDragon7410
"""
import asyncio
import os
import time
import zipfile
from telethon import events
from telethon.tl.types import DocumentAttributeAudio, DocumentAttributeVideo
from ULTRA.utils import admin_cmd, humanbytes, progress, time_formatter
import time
from datetime import datetime
from pySmartDL import SmartDL
from hachoir.metadata import extractMetadata
from hachoir.parser import createParser
from zipfile import ZipFile
# Optional thumbnail image; when it exists, its width/height are reused for
# the video attributes attached to uploaded media files.
thumb_image_path = os.path.join(Config.TMP_DOWNLOAD_DIRECTORY, "thumb_image.jpg")
# Working directory that archives are unpacked into. Built with os.path.join
# so the path stays inside the download directory even when
# TMP_DOWNLOAD_DIRECTORY is configured without a trailing separator.
extracted = os.path.join(Config.TMP_DOWNLOAD_DIRECTORY, "extracted/")
# exist_ok avoids the check-then-create race of a separate isdir() test.
os.makedirs(extracted, exist_ok=True)
def _read_metadata(path):
    """Return hachoir metadata for *path*, or None when it cannot be parsed."""
    parser = createParser(path)
    if parser is None:
        return None
    return extractMetadata(parser)


def _build_document_attributes(single_file):
    """Return the document attributes to attach when uploading *single_file*.

    Files that do not end in one of the known media extensions get an empty
    list. Media files get a single ``DocumentAttributeVideo`` whose duration
    comes from the file itself and whose width/height come from the thumbnail
    at ``thumb_image_path`` when that file exists. Any value that cannot be
    read stays at 0.
    """
    if not single_file.endswith((".mp4", ".mp3", ".flac", ".webm")):
        return []
    duration = 0
    width = 0
    height = 0
    # Metadata extraction can fail and yield None; never dereference blindly.
    metadata = _read_metadata(single_file)
    if metadata is not None and metadata.has("duration"):
        duration = metadata.get('duration').seconds
    if os.path.exists(thumb_image_path):
        thumb_metadata = _read_metadata(thumb_image_path)
        if thumb_metadata is not None:
            if thumb_metadata.has("width"):
                width = thumb_metadata.get("width")
            if thumb_metadata.has("height"):
                height = thumb_metadata.get("height")
    return [
        DocumentAttributeVideo(
            duration=duration,
            w=width,
            h=height,
            round_message=False,
            supports_streaming=True
        )
    ]


@borg.on(admin_cmd(pattern="unzip"))
async def _(event):
    """Handle ``.unzip``: unpack the replied-to zip and upload its contents.

    The replied message's media is downloaded into
    ``Config.TMP_DOWNLOAD_DIRECTORY``, extracted into ``extracted``, and every
    file found there is sent back to the chat as a document replying to the
    command message. Files that upload successfully are deleted; files whose
    upload fails are reported in the chat and left on disk. The downloaded
    archive is removed at the end.
    """
    if event.fwd_from:
        return
    mone = await event.edit("Processing ...")
    os.makedirs(Config.TMP_DOWNLOAD_DIRECTORY, exist_ok=True)
    if not event.reply_to_msg_id:
        # Without this the status message would stay on "Processing ..." forever.
        await mone.edit("Reply to a zip file to unzip it.")
        return
    start = datetime.now()
    reply_message = await event.get_reply_message()
    try:
        downloaded_file_name = await borg.download_media(
            reply_message,
            Config.TMP_DOWNLOAD_DIRECTORY,
        )
    except Exception as e:  # pylint:disable=C0103,W0703
        await mone.edit(str(e))
        # Nothing was downloaded, so there is nothing to extract.
        return
    if not downloaded_file_name:
        # NOTE(review): Telethon documents download_media as returning None
        # when the message carries no media -- confirm for the pinned version.
        await mone.edit("The replied message has no downloadable media.")
        return
    ms = (datetime.now() - start).seconds
    await mone.edit("Stored the zip to `{}` in {} seconds.".format(downloaded_file_name, ms))
    try:
        with zipfile.ZipFile(downloaded_file_name, 'r') as zip_ref:
            zip_ref.extractall(extracted)
    except zipfile.BadZipFile as e:
        # The replied file is not a valid zip archive: report it and clean up.
        await mone.edit(str(e))
        os.remove(downloaded_file_name)
        return
    filename = sorted(get_lst_of_files(extracted, []))
    await event.edit("Unzipping now")
    for single_file in filename:
        if not os.path.exists(single_file):
            continue
        # https://stackoverflow.com/a/678242/4723940
        caption_rts = os.path.basename(single_file)
        document_attributes = _build_document_attributes(single_file)
        try:
            await borg.send_file(
                event.chat_id,
                single_file,
                caption=f"UnZipped `{caption_rts}`",
                force_document=True,
                supports_streaming=False,
                allow_cache=False,
                reply_to=event.message.id,
                attributes=document_attributes,
            )
        except Exception as e:  # pylint:disable=C0103,W0703
            await borg.send_message(
                event.chat_id,
                "{} caused `{}`".format(caption_rts, str(e)),
                reply_to=event.message.id
            )
            # Some media fail to upload; keep the file on disk and move on.
            continue
        os.remove(single_file)
    os.remove(downloaded_file_name)
def get_lst_of_files(input_directory, output_lst):
    """Recursively collect the paths of all files under *input_directory*.

    Args:
        input_directory: Directory to scan.
        output_lst: List that the discovered file paths are appended to.

    Returns:
        The same ``output_lst`` object, extended with one joined path per
        non-directory entry found at any depth. Order follows
        ``os.listdir`` and is therefore not guaranteed; sort if needed.
    """
    for file_name in os.listdir(input_directory):
        current_file_name = os.path.join(input_directory, file_name)
        if os.path.isdir(current_file_name):
            # Recurse without returning, so entries listed after this
            # subdirectory are still visited.
            get_lst_of_files(current_file_name, output_lst)
        else:
            output_lst.append(current_file_name)
    return output_lst