-
Notifications
You must be signed in to change notification settings - Fork 4
/
backup.py
226 lines (191 loc) · 8.35 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import os
import shutil
from pyrogram import Client, filters, enums
from pyrogram.types import Message
# noinspection PyUnresolvedReferences
from utils.misc import modules_help, prefix
from utils.scripts import format_exc, restart
from utils.db import db
from utils import config
if config.db_type in ["mongodb", "mongo"]:
import bson
def dump_mongo(collections, path, db_):
    """Dump each named collection to ``<path>/<collection>.bson``.

    For every name in ``collections`` the documents returned by
    ``db_[name].find()`` are BSON-encoded and written back to back into one
    file. ``path`` must already exist; it is not created here.
    """
    for name in collections:
        target = os.path.join(path, f"{name}.bson")
        with open(target, "wb+") as out:
            # One encoded BSON document after another, no separators.
            out.writelines(
                bson.BSON.encode(document) for document in db_[name].find()
            )
def restore_mongo(path, db_):
    """Load every ``*.bson`` dump found in ``path`` back into ``db_``.

    The collection name is the file name with the trailing ``.bson`` removed,
    which mirrors the ``f"{coll}.bson"`` naming used by ``dump_mongo``. Only
    that suffix is stripped, so collection names that themselves contain dots
    (``a.b.bson`` -> ``a.b``) are restored under their full name.

    Files without the ``.bson`` suffix and empty dump files are skipped.
    """
    suffix = ".bson"
    for entry in os.listdir(path):
        if not entry.endswith(suffix):
            continue
        name = entry[: -len(suffix)]
        if not name:
            # A file literally called ".bson" has no collection name.
            continue
        with open(os.path.join(path, entry), "rb") as dump:
            raw = dump.read()
        if not raw:
            # An empty collection dumps to an empty file; nothing to insert.
            continue
        documents = bson.decode_all(raw)
        if documents:
            # insert_many() rejects an empty list, hence the guard.
            # NOTE(review): documents keep their original _id, so restoring
            # into a collection that still holds them will likely raise a
            # duplicate-key error - confirm the intended restore semantics.
            db_[name].insert_many(documents)
@Client.on_message(filters.command(["backup", "back"], prefix) & filters.me)
async def backup(client: Client, message: Message):
    """Back up the database into the local ``backups/`` folder.

    * MongoDB (``config.db_type`` is ``"mongo"``/``"mongodb"``): every
      collection is dumped to ``backups/<collection>.bson`` via
      ``dump_mongo``.
    * Any other ``config.db_type``: the file at ``config.db_name`` is copied
      into ``backups/`` and also sent as a document to the ``"me"`` chat.

    Progress and the final status are reported by editing ``message``; any
    exception is reported the same way through ``format_exc``.
    """
    try:
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("backups", exist_ok=True)
        await message.edit("<b>Backing up database...</b>", parse_mode=enums.ParseMode.HTML)

        if config.db_type in ["mongo", "mongodb"]:
            dump_mongo(db._database.list_collection_names(), "backups/", db._database)
            return await message.edit(
                "<b>Database backed up to:</b> <code>backups/</code> folder",
                parse_mode=enums.ParseMode.HTML
            )

        # Only the file name goes under backups/, so a db_name that contains
        # a directory component still yields a valid destination.
        backup_path = f"backups/{os.path.basename(config.db_name)}"
        shutil.copy(config.db_name, backup_path)
        await client.send_document(
            "me",
            # Show the configured command prefix rather than a literal ".".
            caption="<b>Database backup complete!\nType: </b>"
            f"<code>{prefix}restore</code> in response to this message to restore the database.",
            document=backup_path,
            parse_mode=enums.ParseMode.HTML
        )
        return await message.edit(
            "<b>Database backed up successfully! <code>(Check your favorites)</code></b>",
            parse_mode=enums.ParseMode.HTML
        )
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
@Client.on_message(filters.command(["restore", "res"], prefix) & filters.me)
async def restore(client: Client, message: Message):
    """Restore the database from a backup.

    * MongoDB: every ``*.bson`` file in ``backups/`` is loaded through
      ``restore_mongo`` and the handler returns.
    * Otherwise the command must be a reply to a document whose name ends in
      ``.db``, ``.sqlite`` or ``.sqlite3``. The document is downloaded into
      ``backups/``, copied over ``config.db_name`` and ``restart()`` is
      called.

    Status and errors are reported by editing ``message``.
    """
    try:
        await message.edit("<b>Restoring database...</b>", parse_mode=enums.ParseMode.HTML)

        if config.db_type in ["mongo", "mongodb"]:
            restore_mongo("backups/", db._database)
            return await message.edit(
                "<b>Database restored from:</b> <code>backups/</code> folder",
                parse_mode=enums.ParseMode.HTML
            )

        reply = message.reply_to_message
        if not reply or not reply.document:
            return await message.edit(
                "<b>Reply to a document to restore the database.</b>",
                parse_mode=enums.ParseMode.HTML
            )

        # file_name may be missing, and it is chosen by whoever sent the
        # document: keep only its final component so it cannot point outside
        # backups/.
        file_name = os.path.basename(reply.document.file_name or "")
        if not file_name.casefold().endswith((".db", ".sqlite", ".sqlite3")):
            return await message.edit(
                "<b>Reply to a database file to restore the database.</b>",
                parse_mode=enums.ParseMode.HTML
            )

        target = f"backups/{file_name}"
        # Prefer the path reported by download(); fall back to the requested
        # one if nothing is returned.
        downloaded = await reply.download(target)
        shutil.copy(downloaded or target, config.db_name)

        await message.edit("<b>Database restored successfully!</b>", parse_mode=enums.ParseMode.HTML)
        restart()
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
@Client.on_message(filters.command(["backupmods", "bms"], prefix) & filters.me)
async def backupmods(client: Client, message: Message):
    """Copy every custom module into the ``backups/`` folder.

    For each key of ``modules_help`` that has a matching file
    ``modules/custom_modules/<name>.py``, that file is copied to
    ``backups/<name>.py``. Keys without such a file are skipped.

    Status and errors are reported by editing ``message``.
    """
    try:
        os.makedirs("backups", exist_ok=True)
        await message.edit("<b>Backing up modules...</b>", parse_mode=enums.ParseMode.HTML)

        for mod in modules_help:
            source = f"modules/custom_modules/{mod}.py"
            if os.path.isfile(source):
                # copyfile opens and closes both files itself, so no handle
                # is left dangling.
                shutil.copyfile(source, f"backups/{mod}.py")

        await message.edit(
            text="<b>All modules backed up to:</b> <code>backups/</code> folder",
            parse_mode=enums.ParseMode.HTML
        )
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
@Client.on_message(filters.command(["backupmod", "bm"], prefix) & filters.me)
async def backupmod(client: Client, message: Message):
    """Back up a single custom module and send it as a document.

    The module name is the first argument of the command, cut at its first
    ``.`` (so ``name.py`` becomes ``name``). If
    ``modules/custom_modules/<name>.py`` exists it is copied to
    ``backups/<name>.py`` and that copy is sent as a reply; otherwise a
    "not found" message is shown. A missing argument shows the usage hint.

    Status and errors are reported by editing ``message``.
    """
    try:
        os.makedirs("backups", exist_ok=True)

        try:
            mod = message.text.split(maxsplit=1)[1].split(".")[0]
        except (IndexError, AttributeError):
            # IndexError: no argument given. AttributeError: message.text is
            # None. Both used to be hidden behind a bare ``except:``.
            return await message.edit(
                f"<b>Usage:</b> <code>{prefix}backupmod [module]</code>",
                parse_mode=enums.ParseMode.HTML
            )

        await message.edit("<b>Backing up module...</b>", parse_mode=enums.ParseMode.HTML)

        source = f"modules/custom_modules/{mod}.py"
        backup_path = f"backups/{mod}.py"
        if not os.path.isfile(source):
            return await message.edit(
                f"<b>Module <code>{mod}</code> not found.</b>",
                parse_mode=enums.ParseMode.HTML
            )

        # copyfile finishes and closes the destination before returning, so
        # the document sent below is complete on disk.
        shutil.copyfile(source, backup_path)

        await message.reply_document(
            document=backup_path,
            caption=f"<b>Module <code>{mod}</code> backed up to:</b> <code>backups/</code> folder",
            parse_mode=enums.ParseMode.HTML
        )
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
@Client.on_message(filters.command(["restoremod", "resmod"], prefix) & filters.me)
async def restoremod(client: Client, message: Message):
    """Restore a single custom module from the ``backups/`` folder.

    The module name is the first argument of the command, cut at its first
    ``.``. If ``backups/<name>.py`` exists it is copied to
    ``modules/custom_modules/<name>.py`` and ``restart()`` is called;
    otherwise a "not found" message is shown. A missing argument shows the
    usage hint.

    Status and errors are reported by editing ``message``.
    """
    try:
        os.makedirs("backups", exist_ok=True)

        try:
            mod = message.text.split(maxsplit=1)[1].split(".")[0]
        except (IndexError, AttributeError):
            # IndexError: no argument given. AttributeError: message.text is
            # None. Both used to be hidden behind a bare ``except:``.
            return await message.edit(
                f"<b>Usage:</b> <code>{prefix}restoremod [module]</code>",
                parse_mode=enums.ParseMode.HTML
            )

        await message.edit("<b>Restoring module...</b>", parse_mode=enums.ParseMode.HTML)

        backup_path = f"backups/{mod}.py"
        if not os.path.isfile(backup_path):
            return await message.edit(
                f"<b>Module <code>{mod}</code> not found.</b>",
                parse_mode=enums.ParseMode.HTML
            )

        # The target folder may not exist yet on a fresh installation.
        os.makedirs("modules/custom_modules", exist_ok=True)
        # copyfile writes and closes the destination before returning, so the
        # module is fully on disk by the time restart() is invoked.
        shutil.copyfile(backup_path, f"modules/custom_modules/{mod}.py")

        await message.edit(
            f"<b>Module <code>{mod}</code> restored successfully!</b>",
            parse_mode=enums.ParseMode.HTML
        )
        restart()
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
@Client.on_message(filters.command(["restoremods", "resmods"], prefix) & filters.me)
async def restoremods(client: Client, message: Message):
    """Restore every backed-up module from the ``backups/`` folder.

    Each ``*.py`` file in ``backups/`` is copied to
    ``modules/custom_modules/`` unless a file with the same name exists
    directly in ``modules/``; those are skipped. ``restart()`` is called
    afterwards.

    Status and errors are reported by editing ``message``.
    """
    try:
        os.makedirs("backups", exist_ok=True)
        await message.edit("<b>Restoring modules...</b>", parse_mode=enums.ParseMode.HTML)

        # The target folder may not exist yet on a fresh installation.
        os.makedirs("modules/custom_modules", exist_ok=True)
        for mod in os.listdir("backups/"):
            if not mod.endswith(".py"):
                continue
            if os.path.isfile(f"modules/{mod}"):
                # A module of the same name lives directly in modules/.
                continue
            # copyfile writes and closes the destination before returning, so
            # every module is fully on disk before restart() is invoked.
            shutil.copyfile(f"backups/{mod}", f"modules/custom_modules/{mod}")

        await message.edit(
            text="<b>All modules restored from:</b> <code>backups/</code> folder",
            parse_mode=enums.ParseMode.HTML
        )
        restart()
    except Exception as e:
        await message.edit(format_exc(e), parse_mode=enums.ParseMode.HTML)
# Help entries for this module: command usage -> HTML description.
modules_help["backup"] = {
    "backup": "<b>Backup database</b>",
    "restore [reply]": "<b>Restore database</b>",
    "backupmod [name]": "<b>Backup mod</b>",
    "backupmods": "<b>Backup all mods</b>",
    "resmod [name]": "<b>Restore mod</b>",
    "resmods": "<b>Restore all mods</b>",
}