Skip to content
This repository has been archived by the owner on Jun 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #171 from yawwwwwn/master
Browse files Browse the repository at this point in the history
优化调整
  • Loading branch information
Dawnnnnnn authored Dec 21, 2018
2 parents 8f2892f + 5a6ef1c commit 8b4ab70
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 309 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
*.xml
log.txt
__pycache__/
.idea/
164 changes: 42 additions & 122 deletions MultiRoom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,127 +4,47 @@
from printer import Printer


class MultiRoom:

async def asmr_area(self):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=1&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
checklen = len(json_response['data'])
asmr_area_room = json_response['data'][random.randint(0, checklen-1)]['roomid']
state = await bilibili().check_room_state(asmr_area_room)
if state == 1:
return [asmr_area_room, "娱乐分区"]
else:
Printer().printer("检测到房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取 [娱乐分区] 房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)

async def game_area(self):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=2&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
checklen = len(json_response['data'])
game_area_room = json_response['data'][random.randint(0, checklen-1)]['roomid']
state = await bilibili().check_room_state(game_area_room)
if state == 1:
return [game_area_room, "游戏分区"]
else:
Printer().printer("检测到房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取 [游戏分区] 房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)

async def mobile_area(self):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=3&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
checklen = len(json_response['data'])
mobile_area_room = json_response['data'][random.randint(0, checklen-1)]['roomid']
state = await bilibili().check_room_state(mobile_area_room)
if state == 1:
return [mobile_area_room, "手游分区"]
else:
Printer().printer("检测到房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取 [手游分区] 房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)

async def draw_area(self):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=4&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
checklen = len(json_response['data'])
draw_area_room = json_response['data'][random.randint(0, checklen-1)]['roomid']
state = await bilibili().check_room_state(draw_area_room)
if state == 1:
return [draw_area_room, "绘画分区"]
else:
Printer().printer(f"检测到房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取 [绘画分区] 房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)

async def radio_area(self):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=5&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
checklen = len(json_response['data'])
radio_area_room = json_response['data'][random.randint(0, checklen-1)]['roomid']
state = await bilibili().check_room_state(radio_area_room)
if state == 1:
return [radio_area_room, "电台分区"]
else:
Printer().printer(f"检测到房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取 [电台分区] 房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)

async def check_state(self, area, roomid=None):
if roomid is not None:
response = await bilibili().check_room_info(roomid)
async def area2room(area_id):
while True:
try:
url = "https://api.live.bilibili.com/room/v1/area/getRoomList?platform=web&parent_area_id=" + \
str(area_id) + "&cate_id=0&area_id=0&sort_type=online&page=1&page_size=30"
response = await bilibili().bili_section_get(url)
json_response = await response.json(content_type=None)
live_status = json_response['data']['live_status']
curr_area_name = json_response['data']['parent_area_name']
if live_status == 1 and curr_area_name in area:
Printer().printer(f'[{area}] 房间 {roomid} 直播状态正常', "Info", "green")
return [roomid, area]
elif live_status != 1:
Printer().printer(f"[{area}] 房间 {roomid} 已未直播!将切换监听房间", "Info", "green")
checklen = len(json_response['data'])
rand_num = random.randint(0, checklen-1)
new_area_id = json_response['data'][rand_num]['parent_id']
if not new_area_id == int(area_id):
continue
area_room = json_response['data'][rand_num]['roomid']
state = await bilibili().check_room_state(area_room)
if state == 1:
new_area = str(new_area_id) + json_response['data'][rand_num]['parent_name']
return [area_room, new_area]
else:
# print(type(live_status), live_status, curr_area_name)
Printer().printer(f"[{area}] 房间 {roomid} 已切换分区[{curr_area_name}]!将切换监听房间", "Info", "green")
if area == "娱乐分区":
asmr = await self.asmr_area()
return asmr
elif area == "游戏分区":
game = await self.game_area()
return game
elif area == "手游分区":
mobile = await self.mobile_area()
return mobile
elif area == "绘画分区":
draw = await self.draw_area()
return draw
elif area == "电台分区":
radio = await self.radio_area()
return radio

async def get_all(self):
asmr = await self.asmr_area()
game = await self.game_area()
mobile = await self.mobile_area()
draw = await self.draw_area()
radio = await self.radio_area()
return [asmr, game, mobile, draw, radio]
Printer().printer("检测到获取房间未开播,立即尝试重新获取", "Error", "red")
except Exception as e:
Printer().printer(f"获取房间列表失败,5s后进行下次尝试 {repr(e)}", "Error", "red")
await asyncio.sleep(5)


async def check_state(area, roomid=None):
if roomid is not None:
response = await bilibili().check_room_info(roomid)
json_response = await response.json(content_type=None)
live_status = json_response['data']['live_status']
curr_area_name = json_response['data']['parent_area_name']
if live_status == 1 and curr_area_name in area:
Printer().printer(f'[{area}分区] 房间 {roomid} 直播状态正常', "Info", "green")
return [roomid, area]
elif live_status != 1:
Printer().printer(f"[{area}分区] 房间 {roomid} 已未直播!将切换监听房间", "Info", "green")
else:
# print(type(live_status), live_status, curr_area_name)
Printer().printer(f"[{area}分区] 房间 {roomid} 已切换分区[{curr_area_name}]!将切换监听房间", "Info", "green")

return await area2room(area[0])


async def get_all():
return [await area2room(i+1) for i in range(5)]
75 changes: 52 additions & 23 deletions OnlineHeart.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from bilibili import bilibili
from login import login
import utils
import time
import traceback
import datetime
Expand All @@ -15,6 +16,7 @@ def CurrentTime():


class OnlineHeart:
last_guard_room = 0

async def apppost_heartbeat(self):
await bilibili().apppost_heartbeat()
Expand All @@ -27,55 +29,82 @@ async def heart_gift(self):
await bilibili().heart_gift()

async def guard_lottery(self):
response = await bilibili().guard_list()
json_response = response.json()
for k in range(3):
try:
response = await bilibili().guard_list()
json_response = response.json()
break
except Exception:
continue
else:
Printer().printer("连接舰长服务器失败", "Error", "red")
return
for i in range(0, len(json_response)):
if json_response[i]['Status']:
GuardId = json_response[i]['GuardId']
if GuardId not in had_gotted_guard:
if GuardId not in had_gotted_guard and GuardId != 0:
had_gotted_guard.append(GuardId)
OriginRoomId = json_response[i]['OriginRoomId']
if not OriginRoomId == OnlineHeart.last_guard_room:
result = await utils.check_room_true(OriginRoomId)
if True in result:
Printer().printer(f"检测到房间 {OriginRoomId} 的钓鱼操作", "Warning", "red")
continue
await bilibili().post_watching_history(OriginRoomId)
OnlineHeart.last_guard_room = OriginRoomId
response2 = await bilibili().get_gift_of_captain(OriginRoomId, GuardId)
json_response2 = await response2.json(content_type=None)
if json_response2['code'] == 0:
Printer().printer(f"获取到房间[{OriginRoomId}]编号[{GuardId}]的上船亲密度:{json_response2['data']['message']}",
Printer().printer(f"获取到房间 {OriginRoomId} 编号 {GuardId} 的上船亲密度: {json_response2['data']['message']}",
"Lottery", "cyan")
elif json_response2['code'] == 400 and json_response2['msg'] == "你已经领取过啦":
Printer().printer(
f"房间[{OriginRoomId}]编号[{GuardId}]的上船亲密度已领过",
f"房间 {OriginRoomId} 编号 {GuardId} 的上船亲密度已领过",
"Info", "green")
elif json_response2['code'] == 400 and json_response2['msg'] == "访问被拒绝":
Printer().printer(f"获取房间 {OriginRoomId} 编号 {GuardId} 的上船亲密度: {json_response2['message']}",
"Lottery", "cyan")
print(json_response2)
else:
Printer().printer(
f"房间[{OriginRoomId}]编号[{GuardId}] 的上船亲密度领取出错,{json_response2}",
f"房间 {OriginRoomId} 编号 {GuardId} 的上船亲密度领取出错: {json_response2}",
"Error", "red")
else:
pass


async def draw_lottery(self):
black_list = ["测试", "test", "12345"]
for i in range(212, 300):
black_list = ["123", "1111", "测试", "測試", "测一测", "ce-shi", "test", "T-E-S-T", "lala", # 已经出现
"測一測", "TEST", "Test", "t-e-s-t"] # 合理猜想
last_lottery = 0
for i in range(247, 300):
response = await bilibili().get_lotterylist(i)
json_response = await response.json()
if json_response['code'] == 0:
temp = json_response['data']['title']
for k in black_list:
if k in temp:
Printer().printer(f"检测到疑似钓鱼类测试抽奖,默认不参与,请自行判断抽奖可参与性","Warning","red")
else:
check = len(json_response['data']['typeB'])
for g in range(0, check):
join_end_time = json_response['data']['typeB'][g]['join_end_time']
join_start_time = json_response['data']['typeB'][g]['join_start_time']
ts = CurrentTime()
if int(join_end_time) > int(ts) > int(join_start_time):
last_lottery = 0
title = json_response['data']['title']
check = len(json_response['data']['typeB'])
for g in range(check):
join_end_time = json_response['data']['typeB'][g]['join_end_time']
join_start_time = json_response['data']['typeB'][g]['join_start_time']
status = json_response['data']['typeB'][g]['status']
ts = CurrentTime()
if int(join_end_time) > int(ts) > int(join_start_time) and status == 0:
jp_list = '&'.join([jp['jp_name'] for jp in json_response['data']['typeB'][g]['list']])
for k in black_list:
if k in title or k in jp_list:
Printer().printer(f"检测到 {i} 号疑似钓鱼类测试抽奖『{title}>>>{jp_list}』" + \
",默认不参与,请自行判断抽奖可参与性","Warning","red")
break
else:
response1 = await bilibili().get_gift_of_lottery(i, g)
json_response1 = await response1.json(content_type=None)
Printer().printer(f"参与实物抽奖回显:{json_response1}", "Lottery","cyan")
else:
pass
Printer().printer(f"参与『{title}>>>{jp_list}』抽奖回显: {json_response1}", "Lottery", "cyan")
else:
break
if not last_lottery == 0: # 因为有中途暂时空一个-400的情况
break
else:
last_lottery = json_response['code']

async def run(self):
while 1:
Expand Down
Loading

0 comments on commit 8b4ab70

Please sign in to comment.