This repository has been archived by the owner on Jun 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 171
/
run.py
79 lines (64 loc) · 2.08 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import utils
from TCP_monitor import TCP_monitor
from OnlineHeart import OnlineHeart
from LotteryResult import LotteryResult
from Tasks import Tasks
from connect import connect
from rafflehandler import Rafflehandler
import asyncio
from login import login
from printer import Printer
from statistics import Statistics
from bilibili import bilibili
import threading
import biliconsole
from schedule import Schedule
import configloader
import os
# Locate and load the user configuration.
#
# The original code called os.path.realpath('__file__') with the *string*
# '__file__', which resolves against the current working directory rather
# than this script's location, so the config was only found when the program
# was launched from its own folder.  Using the real __file__ anchors the
# lookup to the directory that contains this script.
fileDir = os.path.dirname(os.path.realpath(__file__))
file_user = os.path.join(fileDir, "conf", "user.conf")
# dic_user is indexed below as dic_user[section][option].
dic_user = configloader.load_user(file_user)
# Event loop shared by the login step and the long-running tasks below.
loop = asyncio.get_event_loop()

# Printer receives the 'thoroughly_log' on/off setting from the user config.
printer = Printer(dic_user['thoroughly_log']['on/off'])

# These objects are constructed and the results discarded.
# NOTE(review): presumably they initialise shared (singleton-style) state
# used by the other modules -- confirm against their definitions.
bilibili()
Statistics()
rafflehandler = Rafflehandler()
biliconsole.Biliconsole()

# Worker objects whose awaitables are scheduled on the loop further down.
task = OnlineHeart()
task2 = Tasks()
task3 = LotteryResult()
task4 = connect()

# Run the login step to completion before anything else is started.
# asyncio.wait() no longer accepts bare coroutines (deprecated in Python 3.8,
# TypeError since 3.11), so the coroutine is wrapped in a task bound to our
# loop first.  As before, an exception raised by the login step stays stored
# on the task and does not propagate from run_until_complete().
tasks1 = [
    asyncio.ensure_future(login().login_new(), loop=loop)
]
loop.run_until_complete(asyncio.wait(tasks1))

# The console handler runs in its own thread, alongside the event loop.
console_thread = threading.Thread(target=biliconsole.controler)
console_thread.start()
# Awaitables that are always scheduled; each call below is evaluated now, in
# this order, and the results are handed to the event loop later on.
tasks = [
    task.run(),
    task2.run(),
    biliconsole.Biliconsole().run(),
    task4.create(),
    task3.query(),
    rafflehandler.run(),
]

# Optional monitor-server connection, enabled by the user config.
# ('monitoy_server' is the key name as read from the config -- keep as-is.)
monitor_conf = dic_user['monitoy_server']
if monitor_conf['on/off'] == "1":
    monitor = TCP_monitor()
    task_tcp_conn = monitor.connectServer(
        monitor_conf['host'],
        monitor_conf['port'],
        monitor_conf['key'],
    )
    task_tcp_heart = monitor.HeartbeatLoop()
    tasks += [task_tcp_conn, task_tcp_heart]

# Optional regular-sleep schedule, enabled by the user config.
schedule = Schedule()
sleep_conf = dic_user['regular_sleep']
if sleep_conf['on/off'] == "1":
    tasks.append(schedule.run(sleep_conf['schedule']))
    # NOTE(review): the flag is set on a fresh Schedule() call rather than on
    # `schedule`; presumably Schedule() returns a shared instance -- confirm.
    Schedule().scheduled_sleep = True
# Wrap every awaitable in a Task/Future explicitly bound to our loop, then
# run until the first one raises (or until all of them finish).
tasks = [asyncio.ensure_future(job, loop=loop) for job in tasks]
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION))

# Dump the tasks known to the loop for diagnostics.
# asyncio.Task.all_tasks() was removed in Python 3.9; prefer the module-level
# asyncio.all_tasks() (3.7+, lists only unfinished tasks) and fall back to
# the legacy classmethod on older interpreters.  The loop is passed
# explicitly because it is no longer running at this point.
list_all_tasks = getattr(asyncio, "all_tasks", None) or asyncio.Task.all_tasks
Printer().printer('\n'.join(map(repr, list_all_tasks(loop))), "Info", "green")

# Report the state of each scheduled task using the public Future API
# instead of the private `_state` attribute; the strings match the values
# asyncio uses internally ('PENDING', 'CANCELLED', 'FINISHED').
for fut in tasks:
    if fut.cancelled():
        state = 'CANCELLED'
    elif fut.done():
        state = 'FINISHED'
    else:
        state = 'PENDING'
    Printer().printer(repr(state), "Info", "green")
    if state == 'FINISHED':
        # exception() is only safe to call on a finished, non-cancelled task;
        # it returns None when the task completed without raising.
        Printer().printer(f"Task err: {repr(fut.exception())}", "Error", "red")

loop.close()
# Wait for the console thread started earlier before the process exits.
console_thread.join()