diff --git a/FFmpegWrapper.py b/FFmpegWrapper.py
deleted file mode 100644
index 75541bc..0000000
--- a/FFmpegWrapper.py
+++ /dev/null
@@ -1,190 +0,0 @@
-"""
-Thanks to Neon22 at:
-https://gist.github.com/Neon22
-"""
-
-from ctypes import (c_int, c_int64, c_uint64,
- c_uint8, c_uint, c_size_t, c_char, c_char_p,
- c_void_p, POINTER, CFUNCTYPE, Structure)
-
-
-AV_NUM_DATA_POINTERS = 8
-
-
-class AVCodecContext(Structure):
- pass
-
-
-class AVRational(Structure):
- pass
-
-
-class AVIOInterruptCB(Structure):
- pass
-
-
-class AVPacket(Structure):
- _fields_ = [
- ('buf', c_void_p),
- ('pts', c_int64),
- ('dts', c_int64),
- ('data', POINTER(c_uint8)),
- ('size', c_int),
- ('stream_index', c_int),
- ('flags', c_int),
- ('side_data', c_void_p),
- ('side_data_elems', c_int),
- ('duration', c_int64),
- ('pos', c_int64),
- ('convergence_duration', c_int64) # Deprecated
- ]
-
-
-class AVFrame(Structure):
- _fields_ = [
- ('data', POINTER(c_uint8) * AV_NUM_DATA_POINTERS),
- ('linesize', c_int * AV_NUM_DATA_POINTERS),
- ('extended_data', POINTER(POINTER(c_uint8))),
- ('width', c_int),
- ('height', c_int),
- ('nb_samples', c_int),
- ('format', c_int),
- ('key_frame', c_int),
- ('pict_type', c_int), # or c_uint8
- ('sample_aspect_ratio', AVRational),
- ('pts', c_int64),
- ('pkt_pts', c_int64), # Deprecated
- ('pkt_dts', c_int64),
- ('coded_picture_number', c_int),
- ('display_picture_number', c_int),
- ('quality', c_int),
- ('opaque', c_void_p),
- ('error', c_uint64 * AV_NUM_DATA_POINTERS), #Deprecated
- ('repeat_pict', c_int),
- ('interlaced_frame', c_int),
- ('top_field_first', c_int),
- ('palette_has_changed', c_int),
- ('reordered_opaque', c_int64),
- ('sample_rate', c_int),
- ('channel_layout', c_uint64),
- ('buf', c_void_p * AV_NUM_DATA_POINTERS),
- ('extended_buf', c_void_p),
- ('nb_extended_buf', c_int),
- ('side_data', c_void_p),
- ('nb_side_data', c_int),
- ('flags', c_int),
- ('color_range', c_int),
- ('color_primaries', c_int),
- ('color_trc', c_int),
- ('colorspace', c_int),
- ('chroma_location', c_int),
- ('best_effort_timestamp', c_int64),
- ('pkt_pos', c_int64),
- ('pkt_duration', c_int64),
- #!
- ('metadata', c_void_p),
- ('decode_error_flags', c_int),
- ('channels', c_int),
- ('pkt_size', c_int),
- ('qscale_table', POINTER(c_int)), #Deprecated or c_unit8#!
- ('qstride', c_int), #Deprecated
- ('qscale_type', c_int), #Deprecated
- ('qp_table_buf', c_void_p), #Deprecated
- ('hw_frames_ctx', c_void_p),
- ('opaque_ref', c_void_p),
- #!('private_ref', POINTER(AVBufferRef)),
- #!('width', c_int), # video frames only
- #(!'height', c_int), # video frames only
- #!('crop_top', c_size_t), # video frames only
- #!('crop_bottom', c_size_t), # video frames only
- #!('crop_left', c_size_t), # video frames only
- #!('crop_right', c_size_t) # video frames only
- ]
-
-
-class AVFormatContext(Structure):
- pass
-
-
-AVFormatContext._fields_ = [
- ('av_class', c_void_p),
- ('iformat', c_void_p),
- ('oformat', c_void_p),
- ('priv_data', c_void_p),
- ('pb', c_void_p),
- ('ctx_flags', c_int),
- ('nb_streams', c_uint),
- ('streams', c_void_p),
- ('filename', c_char*1024), # Deprecated
- ('url', c_char_p),
- ('start_time', c_int64),
- ('duration', c_int64),
- ('bit_rate', c_int64),
- ('packet_size', c_uint),
- ('max_delay', c_int),
- ('flags', c_int),
- ('probesize', c_int64),
- ('max_analyze_duration', c_int64),
- ('key', POINTER(c_uint8)),
- ('keylen', c_int),
- ('nb_programs', c_uint),
- ('programs', c_void_p),
- ('video_codec_id', c_int),
- ('audio_codec_id', c_int),
- ('subtitle_codec_id', c_int),
- ('max_index_size', c_uint),
- ('max_picture_buffer', c_uint),
- ('nb_chapters', c_uint),
- ('chapters', c_void_p),
- ('metadata', c_void_p),
- ('start_time_realtime', c_int64),
- ('fps_probe_size', c_int),
- ('error_recognition', c_int),
- ('interrupt_callback', AVIOInterruptCB),
- ('debug', c_int),
- ('max_interleave_delta', c_int64),
- ('strict_std_compliance', c_int),
- ('event_flags', c_int),
- ('max_ts_probe', c_int),
- ('avoid_negative_ts', c_int),
- ('ts_id', c_int),
- ('audio_preload', c_int),
- ('max_chunk_duration', c_int),
- ('max_chunk_size', c_int),
- ('use_wallclock_as_timestamps', c_int),
- ('avio_flags', c_int),
- ('duration_estimation_method', c_uint), #c_uint8
- ('skip_initial_bytes', c_int64),
- ('correct_ts_overflow', c_uint),
- ('seek2any', c_int),
- ('flush_packets', c_int),
- ('probe_score', c_int),
- ('format_probesize', c_int),
- ('codec_whitelist', c_char_p),
- ('format_whitelist', c_char_p),
- ('internal', c_void_p),
- ('io_repositioned', c_int),
- ('video_codec', c_void_p),
- ('audio_codec', c_void_p),
- ('subtitle_codec', c_void_p),
- ('data_codec', c_void_p),
- ('metadata_header_padding', c_int),
- ('opaque', c_void_p),
- ('control_message_cb', CFUNCTYPE(c_int,
- POINTER(AVFormatContext), c_int, c_void_p,
- c_size_t)),
- ('output_ts_offset', c_int64),
- ('dump_separator', POINTER(c_uint8)),
- ('data_codec_id', c_int),
- # ! one more in here?
- ('protocol_whitelist', c_char_p),
- ('io_open', CFUNCTYPE(c_int, POINTER(AVFormatContext),
- c_void_p, c_char_p, c_int,
- c_void_p)),
- ('io_close', CFUNCTYPE(None, POINTER(AVFormatContext), c_void_p)),
- ('protocol_blacklist', c_char_p),
- ('max_streams', c_int)
- ]
-
-
-read_packet_func = CFUNCTYPE(c_int, c_void_p, POINTER(c_uint8), c_int)
diff --git a/NaiveScrcpyClient.py b/NaiveScrcpyClient.py
deleted file mode 100644
index 2b5dfd2..0000000
--- a/NaiveScrcpyClient.py
+++ /dev/null
@@ -1,350 +0,0 @@
-# coding=utf-8
-
-import numpy as np
-import cv2
-import os
-import socket
-import struct
-import time
-import traceback
-import subprocess
-import ctypes
-from FFmpegWrapper import AVFormatContext, AVCodecContext, AVPacket, AVFrame, read_packet_func
-from threading import Thread
-from collections import deque
-
-
-class ScrcpyDecoder:
- def __init__(self, _config):
- self.buff_size = _config.get('buff_size', 0x100000)
- self.lib_path = _config.get('lib_path', 'lib')
- self.img_queue = deque(maxlen=int(_config.get('deque_length', 5)))
-
- # TCP
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.port = _config.get('adb_port', 61550)
-
- # pointers
- self.frame_ptr = 0
- self.codec_ctx_ptr = 0
- self.format_ctx_ptr = 0
-
- # thread and flag
- self.decode_thread = None
- self.should_run = False
-
- @staticmethod
- def ff_err_tag(cd):
- mk_tag = 0
- for i in reversed(cd):
- mk_tag = mk_tag << 8
- mk_tag |= (ord(i) & 0xff)
- result = -int(mk_tag)
- return result
-
- def start_decoder(self):
- self.should_run = True
- if self.decode_thread is None:
- self.decode_thread = Thread(target=self._run_decoder)
- self.decode_thread.start()
-
- def _run_decoder(self):
- try:
- self.sock.settimeout(0.5)
- self.sock.connect(("127.0.0.1", self.port))
- except:
- traceback.print_exc()
- return 1
- self._receive_info()
- lib_file_list = os.listdir(self.lib_path)
-
- def get_lib_full_path(keyword):
- for i in lib_file_list:
- if keyword in i:
- return os.path.join(self.lib_path, i)
- print("Could not find runtime %s at %s" % (keyword, self.lib_path))
- return None
-
- avutil_lib = get_lib_full_path("avutil")
- swresample_lib = get_lib_full_path("swresample")
- avcodec_lib = get_lib_full_path("avcodec")
- avformat_lib = get_lib_full_path("avformat")
-
- if None in [avutil_lib, swresample_lib, avcodec_lib, avformat_lib]:
- return -2
-
- lib_avutil = ctypes.CDLL(avutil_lib)
- lib_swresample = ctypes.CDLL(swresample_lib)
- lib_avcodec = ctypes.CDLL(avcodec_lib)
- lib_avformat = ctypes.CDLL(avformat_lib)
-
- lib_avformat.av_register_all()
-
- def clean_decoder():
- if self.frame_ptr:
- # print("Free frame")
- lib_avutil.av_free(self.frame_ptr)
- self.frame_ptr = 0
- if self.codec_ctx_ptr:
- # print("Free avcodec")
- lib_avcodec.avcodec_close(self.codec_ctx_ptr)
- self.codec_ctx_ptr = 0
- if self.format_ctx_ptr:
- # print("Free avformat")
- lib_avformat.avformat_close_input(ctypes.byref(self.format_ctx_ptr))
- self.format_ctx_ptr = 0
- self.sock.close()
-
- find_decoder = lib_avcodec.avcodec_find_decoder_by_name
- find_decoder.restype = ctypes.POINTER(AVCodecContext)
- decoder_list = [b'h264_mmal', b'h264']
- for decoder in decoder_list:
- codec_ptr = find_decoder(ctypes.c_char_p(decoder))
- if codec_ptr:
- print("Found %s decoder" % decoder.decode('utf8'))
- break
- else:
- print("H.264 decoder not found")
- return 1
-
- alloc_context = lib_avcodec.avcodec_alloc_context3
- alloc_context.restype = ctypes.POINTER(AVCodecContext)
- self.codec_ctx_ptr = alloc_context(codec_ptr)
- if not self.codec_ctx_ptr:
- print("Could not allocate decoder context")
- clean_decoder()
- return 2
-
- ret = lib_avcodec.avcodec_open2(self.codec_ctx_ptr, codec_ptr, None)
- if ret < 0:
- print("Could not open H.264 decoder")
- clean_decoder()
- return 3
-
- format_alloc_context = lib_avformat.avformat_alloc_context
- format_alloc_context.restype = ctypes.POINTER(AVFormatContext)
- self.format_ctx_ptr = format_alloc_context()
- if not self.format_ctx_ptr:
- print("Could not allocate format context")
- clean_decoder()
- return 4
-
- av_malloc = lib_avutil.av_malloc
- av_malloc.restype = ctypes.POINTER(ctypes.c_ubyte)
- buffer_ptr = av_malloc(self.buff_size)
- if not buffer_ptr:
- print("Could not allocate buffer")
- clean_decoder()
- return 5
-
- def read_packet_wrapper(_, buff, c_size):
- try:
- s, data = self.receive_data(c_size)
- if s == 0:
- return self.ff_err_tag('EOF ')
- else:
- ctypes.memmove(buff, data, s)
- return s
- except:
- traceback.print_exc()
- return self.ff_err_tag('EOF ')
-
- read_packet_ptr = read_packet_func(read_packet_wrapper)
- av_alloc_ctx = lib_avformat.avio_alloc_context
- av_alloc_ctx.restype = ctypes.c_void_p
- avio_ctx_ptr = av_alloc_ctx(buffer_ptr, self.buff_size, 0, None, read_packet_ptr, None, None)
- if not avio_ctx_ptr:
- print("Could not allocate avio context")
- clean_decoder()
- return 6
- self.format_ctx_ptr.contents.pb = avio_ctx_ptr
- open_input = lib_avformat.avformat_open_input
- ret = open_input(ctypes.byref(self.format_ctx_ptr), None, None, None)
- if ret < 0:
- print("Could not open video stream")
- clean_decoder()
- return 7
- alloc_frame = lib_avutil.av_frame_alloc
- alloc_frame.restype = ctypes.POINTER(AVFrame)
- self.frame_ptr = alloc_frame()
- packet = AVPacket()
- lib_avcodec.av_init_packet(ctypes.byref(packet))
-
- while self.should_run:
- if not lib_avformat.av_read_frame(self.format_ctx_ptr, ctypes.byref(packet)):
- ret = lib_avcodec.avcodec_send_packet(self.codec_ctx_ptr, ctypes.byref(packet))
- if ret < 0:
- print("Could not send video packet: %d" % ret)
- break
- ret = lib_avcodec.avcodec_receive_frame(self.codec_ctx_ptr, self.frame_ptr)
- if not ret:
- self.push_frame(self.frame_ptr)
- else:
- print("Could not receive video frame: %d" % ret)
- lib_avcodec.av_packet_unref(ctypes.byref(packet))
- else:
- print("Could not read packet, quit.")
- self.should_run = False
- clean_decoder()
- return 0
-
- def close_decoder(self):
- self.should_run = False
- if self.decode_thread:
- self.decode_thread.join()
- self.decode_thread = None
-
- def push_frame(self, frame_ptr):
- frame = frame_ptr.contents
- w = frame.width
- h = frame.height
- img_yuv = np.zeros((h + h // 2, w, 1), dtype=np.uint8)
- img_yuv[:h, :] = np.ctypeslib.as_array(frame.data[0], shape=(h, frame.linesize[0], 1))[:, :w]
- img_u = np.ctypeslib.as_array(frame.data[1], shape=(h // 2, frame.linesize[1], 1))[:, :w // 2]
- img_v = np.ctypeslib.as_array(frame.data[2], shape=(h // 2, frame.linesize[2], 1))[:, :w // 2]
- img_yuv[h:h + h // 4, : w // 2] = img_u[::2, :]
- img_yuv[h + h // 4:, : w // 2] = img_v[::2, :]
- img_yuv[h:h + h // 4, w // 2:] = img_u[1::2, :]
- img_yuv[h + h // 4:, w // 2:] = img_v[1::2, :]
- img = cv2.cvtColor(img_yuv, cv2.COLOR_YUV2BGR_I420)
- self.img_queue.append(img)
-
- def get_next_frame(self, latest_image=False):
- if not self.img_queue:
- return None
- else:
- img = self.img_queue.popleft()
- if latest_image:
- while self.img_queue:
- img = self.img_queue.popleft()
- return img
-
- def receive_data(self, c_size):
- while self.should_run:
- try:
- data = self.sock.recv(c_size)
- return len(data), data
- except socket.timeout:
- continue
- except:
- return 0, []
-
- def send_data(self, data):
- return self.sock.send(data)
-
- def _receive_info(self):
- dummy_byte = self.sock.recv(1)
- if not len(dummy_byte):
- raise ConnectionError("Did not receive Dummy Byte!")
- else:
- print("Connected!")
-
- device_name = self.sock.recv(64)
- device_name = device_name.decode("utf-8")
- if not len(device_name):
- raise ConnectionError("Did not receive Device Name!")
- print("Device Name: " + device_name)
-
- res = self.sock.recv(4)
- frame_width, frame_height = struct.unpack(">HH", res)
- print("WxH: " + str(frame_width) + "x" + str(frame_height))
-
-
-class NaiveScrcpyClient:
- def __init__(self, _config):
- self.config = _config
- self.lib_path = self.config.get('lib_path', 'lib')
- self.adb = self.config.get('adb_path', 'adb')
- self.port = int(self.config.get('adb_port', 61550))
-
- self.adb_sub_process = None
- self.decoder = None
- self.img_cache = None
- self.landscape = False
-
- self._connect_and_forward_scrcpy()
-
-
- def _connect_and_forward_scrcpy(self):
- try:
- print("Upload JAR...")
- adb_push = subprocess.Popen(
- [self.adb, 'push',
- 'scrcpy-server.jar',
- '/data/local/tmp/'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=self.lib_path)
- adb_push_comm = ''.join([x.decode("utf-8") for x in adb_push.communicate() if x is not None])
-
- if "error" in adb_push_comm:
- print("Is your device/emulator visible to ADB?")
- raise Exception(adb_push_comm)
-
- subprocess.call(
- [self.adb, 'forward',
- 'tcp:%d' % self.port, 'localabstract:scrcpy'])
-
- '''
- ADB Shell is Blocking, don't wait up for it
- Args for the server are as follows:
- maxSize (integer, multiple of 8) 0
- bitRate (integer)
- tunnelForward (optional, bool) use "adb forward" instead of "adb tunnel"
- crop (optional, string) "width:height:x:y"
- sendFrameMeta (optional, bool)
-
- '''
- print("Run JAR")
- self.adb_sub_process = subprocess.Popen(
- [self.adb, 'shell',
- 'CLASSPATH=/data/local/tmp/scrcpy-server.jar',
- 'app_process', '/', 'com.genymobile.scrcpy.Server',
- str(int(self.config.get('max_size', 1280))),
- str(int(self.config.get('bit_rate', 2 ** 30))),
- "true",
- str(self.config.get('crop', '-')),
- "false"],
- )
- time.sleep(2)
- except FileNotFoundError:
- raise FileNotFoundError("Could not find ADB")
-
- def _disable_forward(self):
- subprocess.call(
- [self.adb, 'forward', '--remove',
- 'tcp:%d' % self.port])
-
- def start_loop(self):
- if self.decoder:
- return 2
- self.decoder = ScrcpyDecoder(self.config)
- try:
- self.decoder.start_decoder()
- return 0
- except ConnectionError:
- traceback.print_exc()
- self.stop_loop()
- return 1
-
- def stop_loop(self):
- if self.decoder:
- self.decoder.close_decoder()
- self.decoder = None
- if self.adb_sub_process:
- subprocess.Popen(
- [self.adb, 'shell',
- '`pkill app_process`',
- ],
- stdout=subprocess.PIPE,
- ).wait()
- self.adb_sub_process.wait()
- self.adb_sub_process = None
- self._disable_forward()
-
- def get_screen_frame(self):
- img = self.decoder.get_next_frame(True)
- if img is not None:
- self.landscape = img.shape[0] < img.shape[1]
- self.img_cache = img.copy()
- return self.img_cache
diff --git a/README.md b/README.md
index 7a1b7cf..c35105b 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
# 食物语自动挂机脚本
## 简介
-食物语自动挂机脚本, 主要用于客潮和活动小游戏
+食物语自动挂机脚本, 主要用于客潮~~和活动小游戏~~(太难做砍掉了绝对不是因为我懒啊啊啊啊啊啊)
有计划加入食物语做菜优化, 引用[此项目](https://github.com/ic30rs/swy_profit)
@@ -13,18 +13,21 @@ PSS: 那个活动的跑酷恶心死我了啊啊啊啊
## 食用方式
详见程序内指引
-注意若使用ADB请先确保电脑已安装ADB, 并已连接至手机
+**注意:** 若使用ADB(Scrcpy模式), 请确保系统环境变量中有ADB, 并且ADB已连接至手机
-混合模式还需将scrcpy的服务端和FFmpeg的动态链接库放至libs目录下, 详见[这里](https://github.com/LostXine/naive-scrcpy-client#to-start)
+## 改良菜谱
+目前本项目仅能在Windows上运行, 但经过简单修改就能在mac和linux上运行(~~不过我懒~~
-*嫌麻烦的话直接下载scrcpy放到libs目录就好啦 (逃*
+另外本挂机脚本实际上提供了一个框架, 经过简单修改应该也能用于其他游戏, 甚至是用于训练人工智能玩游戏(逃
+
+本项目欢迎各位空桑少主贡献代码
## 食材
- opencv-python
-- pywin32
-- PyAutoGUI
+- PyAV
+- pywin32(仅限Windows)
+- PyAutoGUI(Windows上不需要)
- pure-python-adb
-- naive-scrcpy-client(本项目自带)
- pyinstaller(如需要打包)
详见requirements.txt
@@ -32,8 +35,28 @@ PSS: 那个活动的跑酷恶心死我了啊啊啊啊
## 烹饪日记
2020/10/27 项目开始开发
-## 厨师们
-详见contributor.md
+2020/11/3 实现ADB模式
+
+2020/11/5 实现Windows原生模式
+
+2020/12/5 客潮挂机从模板匹配改为识别圆
+
+2021/2/1 大规模重构代码, 增加挂机任务类, 增加使用Scrcpy的模式(未完成), 更新版本号至V1.2
+
+2021/2/21 使用装饰器注册挂机任务, 完成Scrcpy模式, 更新版本号至V1.3
+
+## 厨师
+详见CONTRIBUTOR.md
+
+## 特别致谢
+所有第三方库的开发者
+
+Scrcpy功能参考自:
+- [py-scrcpy](https://github.com/Allong12/py-scrcpy)
+- [naive-scrcpy-client](https://github.com/LostXine/naive-scrcpy-client)
+- [android_autoclicker](https://github.com/JKookaburra/android_autoclicker)
+
+感谢以上项目的开发者
## 版权信息
本项目以MIT协议开源
diff --git a/data/kechao_title_part.png b/data/kechao_title_part.png
new file mode 100644
index 0000000..6d6d519
Binary files /dev/null and b/data/kechao_title_part.png differ
diff --git a/libs/scrcpy-server.jar b/libs/scrcpy-server.jar
index 82e080f..ab38830 100644
Binary files a/libs/scrcpy-server.jar and b/libs/scrcpy-server.jar differ
diff --git a/main.py b/main.py
index b504eb1..2b52635 100644
--- a/main.py
+++ b/main.py
@@ -7,7 +7,7 @@
import atexit
import cv2
from player import Player, PlayerADB, PlayerScrcpy, PlayerTest
-from task import Phases, Results, tasks, registerTasks, getImageCache
+from task import Phases, Results, getTasks
WINDOW_NAME = "Preview Window"
FPS = 5
@@ -16,21 +16,20 @@
task = None
def main():
- registerTasks()
setnodpi()
settitle("欢迎使用食物语挂机脚本")
print('''=============================================
-食物语挂机脚本 V1.2 作者: WC
+食物语挂机脚本 V1.3 作者: WC
本脚本仅供个人代肝使用, 严禁用于商业用途
使用本脚本造成的一切法律纠纷由使用者自行承担
项目地址: https://github.com/DawningW/swy-bot
-欢迎提交问题或是直接PR
+欢迎提交问题或者直接PR
=============================================''')
while True:
print('''>>>----------< 主 菜 单 >----------<<<
1. 原生模式(需先启动安卓虚拟机并打开食物语)
2. ADB模式(需手机连接电脑开启调试模式并打开食物语)
-3. 混合模式(使用scrcpy快速获取手机截屏并用ADB实现模拟点击)
+3. 混合模式(使用scrcpy快速获取手机截屏并模拟点击)(*推荐*)
4. 调试模式(将读取程序目录下的test.png并进行图像识别)
8. 线性规划做菜计算器
9. 用默认浏览器打开食物语wiki
@@ -64,15 +63,15 @@ def main():
def select():
while True:
print(">>>----------< 挂 机 菜 单 >----------<<<")
- for i in range(len(tasks)):
- print("{}. {}({})".format(i + 1, tasks[i].name, tasks[i].description))
+ for i in range(len(getTasks())):
+ print("{}. {}({})".format(i + 1, getTasks()[i].name, getTasks()[i].description))
print("PS: 输入其他数字退出")
try:
num = int(input("请输入序号: "))
global task
- task = tasks[num - 1]
- task.init()
+ task = getTasks()[num - 1]()
run()
+ task = None
except (ValueError, IndexError):
break
return
@@ -87,16 +86,17 @@ def run():
while not canceled:
times += 1
print("第 {} 次运行脚本: {}".format(times, task.name))
+ origin = time.perf_counter()
phase = Phases.BEGIN
while True:
- t = time.time()
+ t = time.perf_counter()
result = None
if phase == Phases.BEGIN:
- result = task.begin(player)
+ result = task.begin(player, t - origin)
elif phase == Phases.RUNNING:
- result = task.run(player)
+ result = task.run(player, t - origin)
elif phase == Phases.END:
- result = task.end(player)
+ result = task.end(player, t - origin)
else:
print("无效的阶段, 请向作者报告这个问题")
if result is None or result == Results.FAIL:
@@ -106,14 +106,15 @@ def run():
value = phase.value + 1
if value > Phases.END.value: break
else: phase = Phases(value)
- showimage(getImageCache())
- wait = 1 / FPS - (time.time() - t)
+ if task.getImageCache() is not None:
+ showimage(task.getImageCache())
+ wait = 1 / FPS - (time.perf_counter() - t)
if wait < 0:
print("严重滞后, 处理时间超出 {} ms, 发生了什么呢?".format(-int(wait * 1000)))
wait = 0
time.sleep(wait)
cv2.destroyAllWindows()
- settitle("当前挂机脚本已运行完毕 - 准备就绪")
+ settitle("食物语挂机脚本已运行完毕 - 准备就绪")
print("挂机脚本已运行完毕")
return
@@ -132,7 +133,7 @@ def showimage(image, wait = 1):
@atexit.register
def onexit():
settitle("食物语挂机脚本已结束")
- if player != None: player.end()
+ if player is not None: player.end()
print('''
=============================================
食物语挂机脚本已停止运行, 感谢您的使用, 再见!
diff --git a/player.py b/player.py
index f00bc20..0cc4276 100644
--- a/player.py
+++ b/player.py
@@ -6,10 +6,10 @@
import numpy
import cv2
import pyautogui
-import WindowsUtils as utils
+import windows
from ppadb.client import Client as ADBClient
-from NaiveScrcpyClient import NaiveScrcpyClient as ScrcpyClient
-from ImageUtils import readimage, writeimage
+from scrcpy import ScrcpyClient
+from utils import readimage, writeimage
class PlayerBase(object):
"""模拟玩家操作的基类"""
@@ -96,12 +96,12 @@ class Player(PlayerBase):
def init(self):
super().init()
- windows = [("TXGuiFoundation", "腾讯手游助手【极速傲引擎-7.1】"), ("StartupDui", "多屏协同"), ("SDL_app", None)]
+ windowsList = [("TXGuiFoundation", "腾讯手游助手【极速傲引擎-7.1】"), ("StartupDui", "多屏协同"), ("SDL_app", None)]
# 腾讯手游助手后台点击可用, 并且开放ADB端口5555, 然而获取截图时失败
# 华为多屏协同疑似直接获取光标位置, 而非从消息里读取, 所以需要激活才行, 无法后台挂机
- # Scrcpy后台挂机可用(已经提供对scrcpy的原生支持, 建议使用原生模式)
- for (classname, windowname) in windows:
- self.window = utils.findwindow(None, classname, windowname)
+ # Scrcpy后台挂机可用(已经提供对Scrcpy的原生支持, 建议使用Scrcpy模式)
+ for (classname, windowname) in windowsList:
+ self.window = windows.findwindow(None, classname, windowname)
if self.window != 0: break
if self.window == 0:
print("无法自动获取游戏窗口, 请手动获取(可以用VS的SPY++工具获取)")
@@ -109,7 +109,7 @@ def init(self):
windowname = input("请输入窗口标题: ")
if classname == "": classname = None
if windowname == "": windowname = None
- self.window = utils.findwindow(None, classname, windowname)
+ self.window = windows.findwindow(None, classname, windowname)
if (self.window == 0):
print("错误: 无法获取窗口句柄")
return False
@@ -118,8 +118,8 @@ def init(self):
print("若通过这种方式无法选中子窗口, 请直接在截图窗口按任意键退出并手动输入子窗口句柄")
hwnds = [self.window, self.child]
title = "Click a point to select child window"
- width, height = utils.getsize(self.window)
- buffer = utils.screenshot(self.window)
+ width, height = windows.getsize(self.window)
+ buffer = windows.screenshot(self.window)
image = numpy.frombuffer(buffer, dtype = "uint8")
image.shape = (height, width, 4)
cv2.cvtColor(image, cv2.COLOR_BGRA2RGB)
@@ -132,14 +132,14 @@ def init(self):
if self.child == 0:
print("遍历获取子窗口尚未编写, 请直接输入子窗口类名")
classname = input("请输入子窗口类名: ")
- if classname != '': self.child = utils.findwindow(self.window, classname, None)
+ if classname != '': self.child = windows.findwindow(self.window, classname, None)
if self.child == 0:
print("还是失败的话请直接输入句柄吧...")
str = input("请输入子窗口句柄(16进制): ")
if str == '': self.child = self.window
else: self.child = int(str, 16)
print("已成功获取子窗口句柄: {}".format(hex(self.child)))
- self.width, self.height = utils.getsize(self.child)
+ self.width, self.height = windows.getsize(self.child)
print("已获得模拟器窗口大小: {} X {}".format(self.width, self.height))
self.calcFactor()
print("已计算缩放因子: {}".format(self.factor))
@@ -149,20 +149,20 @@ def init(self):
def calcFactor(self):
# TODO DPI适配
# 算了我写不出来, 那就别适配了= =
- # dpi = utils.getdpi(self.window)
+ # dpi = windows.getdpi(self.window)
# self.width = int(self.width * dpi['x'] / 96)
# self.height = int(self.height * dpi['y'] / 96)
super().calcFactor()
return
def screenshotraw(self):
- buffer = utils.screenshot(self.child)
+ buffer = windows.screenshot(self.child)
image = numpy.frombuffer(buffer, dtype = "uint8")
image.shape = (self.height, self.width, 4)
return image
def clickraw(self, x, y):
- utils.click(self.child, int(x), int(y))
+ windows.click(self.child, int(x), int(y), True)
return
class PlayerADB(PlayerBase):
@@ -212,33 +212,37 @@ def clickraw(self, x, y):
self.device.input_tap(int(x), int(y))
return
-class PlayerScrcpy(PlayerADB):
- """使用scrcpy+ADB"""
+class PlayerScrcpy(PlayerBase):
+ """使用Scrcpy获取截屏并模拟点击"""
def init(self):
super().init()
- config = {
- "max_size": 640,
- "bit_rate": 2 ** 30,
- "crop": "-",
- "adb_path": "adb",
- "adb_port": 61550,
- "lib_path": "libs",
- "buff_size": 0x10000,
- "deque_length": 5
- }
- self.scrcpyClient = ScrcpyClient(config)
- if self.scrcpyClient.start_loop() == 0:
- return True
- else:
+ self.client = ScrcpyClient()
+ if not self.client.start():
+ print("连接失败")
return False
+ print("已成功连接至设备 {}".format(self.client.device_name))
+ self.width, self.height = self.client.resolution
+ print("已获得设备屏幕尺寸: {} X {}".format(self.width, self.height))
+ self.calcFactor()
+ print("已计算缩放因子: {}".format(self.factor))
+ return True
def end(self):
- self.scrcpyClient.stop_loop()
+ self.client.stop()
super().end()
+ return
def screenshotraw(self):
- return self.scrcpyClient.get_screen_frame()
+ image = self.client.get_next_frame(True)
+ if image is not None:
+ image = cv2.cvtColor(image, cv2.COLOR_RGB2RGBA)
+ self.lastimage = image
+ return self.lastimage
+
+ def clickraw(self, x, y):
+ self.client.tap(x, y)
+ return
class PlayerTest(PlayerBase):
"""图像识别测试"""
@@ -267,6 +271,6 @@ def clickraw(self, x, y):
def onclicked(event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN:
- param[1] = utils.getwindow(param[0], x, y)
+ param[1] = windows.getwindow(param[0], x, y)
print("已点击 X: {} Y: {} 窗口句柄: {}".format(x, y, hex(param[1])))
return
diff --git a/scrcpy.py b/scrcpy.py
new file mode 100644
index 0000000..1c56979
--- /dev/null
+++ b/scrcpy.py
@@ -0,0 +1,241 @@
+# coding=utf-8
+
+import sys
+import os
+import time
+import subprocess
+import socket
+import struct
+from queue import Queue
+from threading import Thread
+import av
+
+ACTION_UP = b'\x01'
+ACTION_DOWN = b'\x00'
+ACTION_MOVE = b'\x02'
+
+class ScrcpyClient(object):
+ """Scrcpy的客户端"""
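+    # Minimal usage sketch (assumes ADB is available and a device is connected):
+    #   client = ScrcpyClient()
+    #   if client.start():
+    #       frame = client.get_next_frame(True)  # latest decoded frame as a BGR ndarray, or None
+    #       client.tap(100, 200)
+    #       client.stop()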
+ is_running = False
+ device_name = None
+ resolution = None
+ adb_sub_process = None
+ video_socket = None
+ control_socket = None
+ decode_thread = None
+ codec = None
+ video_data_queue = None
+
+ def __init__(self, max_size=0, bit_rate=8000000, max_fps=0, crop='-',
+ libs_path='libs', adb_path=r'adb', ip='127.0.0.1', port=27199,
+ queue_length=5):
+        """
+        :param max_size: limit on the larger dimension of the frames broadcast
+                         from the Android server, 0 means no limit
+        :param bit_rate: video bit rate in bits per second
+        :param max_fps: maximum frame rate, 0 means no limit
+        :param crop: crop the device screen to "width:height:x:y", '-' means no crop
+        :param libs_path: path to the directory containing 'scrcpy-server.jar'
+        :param adb_path: path to the ADB executable
+        :param ip: scrcpy server IP
+        :param port: scrcpy server port
+        :param queue_length: maximum number of decoded frames kept in the frame queue
+        """
+ print("Init Scrcpy client")
+ self.max_size = max_size
+ self.bit_rate = bit_rate
+ self.max_fps = max_fps
+ self.crop = crop
+ self.libs_path = libs_path
+ self.adb_path = adb_path
+ self.ip = ip
+ self.port = port
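+        # Bounded frame queue; loop() drops the oldest frame when it is full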
+ self.video_data_queue = Queue(queue_length)
+ return
+
+ def connect_and_forward_scrcpy(self):
+ try:
+ print("Upload JAR...")
+ adb_push = subprocess.Popen(
+ [self.adb_path, 'push', 'scrcpy-server.jar', '/data/local/tmp/'],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.libs_path)
+ adb_push_comm = ''.join([x.decode("utf-8") for x in adb_push.communicate() if x is not None])
+ if "error" in adb_push_comm:
+ print("Is your device/emulator visible to ADB?")
+ raise Exception(adb_push_comm)
+
+ print("Run JAR")
+ '''
+            The adb shell call blocks, so do not wait for it.
+            The server arguments are commented inline below.
+ Also see: https://github.com/Genymobile/scrcpy/blob/master/app/src/server.c#L256
+ '''
+ self.adb_sub_process = subprocess.Popen(
+ [self.adb_path, 'shell',
+ 'CLASSPATH=/data/local/tmp/scrcpy-server.jar',
+ 'app_process', '/', 'com.genymobile.scrcpy.Server',
+ '1.17', # scrcpy version
+ 'info', # log level
+ str(self.max_size), # (integer, multiple of 8) max width
+ str(self.bit_rate), # (integer) bit rate
+ str(self.max_fps), # (integer) max fps
+ '-1', # lock_video_orientation_string
+ 'true', # (bool) tunnel forward: use "adb forward" instead of "adb tunnel"
+ str(self.crop), # (string) crop: "width:height:x:y"
+ 'false', # (bool) send frame meta: packet boundaries + timestamp
+ 'true', # control
+ '0', # display_id_string
+ 'false', # show_touches
+ 'true', # stay_awake
+ '-', # codec_options
+ '-' # encoder_name
+ ], stdin=subprocess.PIPE, cwd=self.libs_path
+ )
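+            # Give the server a moment to start before forwarding the port and connecting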
+ time.sleep(1)
+
+ print("Forward port")
+ subprocess.call([self.adb_path, 'forward', 'tcp:%d' % self.port, 'localabstract:scrcpy'])
+ time.sleep(1)
+ except FileNotFoundError:
+ raise FileNotFoundError("Could not find ADB at path: " + self.adb_path)
+ return
+
+ def disable_forward(self):
+ subprocess.call([self.adb_path, 'forward', '--remove', 'tcp:%d' % self.port])
+
+ def connect(self):
+ print("Connecting to video socket")
+ self.video_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.video_socket.connect((self.ip, self.port))
+
+ dummy_byte = self.video_socket.recv(1)
+ if not len(dummy_byte):
+ raise ConnectionError("Did not receive Dummy Byte!")
+ else:
+ print("Connected successfully!")
+
+ print("Connecting to control socket")
+ self.control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.control_socket.connect((self.ip, self.port))
+
+ self.device_name = self.video_socket.recv(64).decode("utf-8")
+ if not len(self.device_name):
+ raise ConnectionError("Did not receive Device Name!")
+ print("Device Name: " + self.device_name)
+
+ self.resolution = struct.unpack(">HH", self.video_socket.recv(4))
+ print("Screen resolution: %dX%d" % (self.resolution[0], self.resolution[1]))
+
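+        # Non-blocking socket so loop() keeps checking is_running instead of hanging on recv()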
+ self.video_socket.setblocking(False)
+ return
+
+ def start(self):
+ if self.is_running: return False
+ self.is_running = True
+ print("Start scrcpy client")
+ try:
+ self.connect_and_forward_scrcpy()
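+            # Create a PyAV H.264 decoder context ('r' = decode mode)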
+ self.codec = av.codec.CodecContext.create('h264', 'r')
+ self.connect()
+ if self.decode_thread is None:
+ self.decode_thread = Thread(target=self.loop, daemon=True)
+ self.decode_thread.start()
+ except:
+ self.stop()
+ raise
+ return True
+
+ def loop(self):
+ """
+        Read raw H.264 data from the video socket, parse it into packets, decode
+        each packet into frames, convert each frame to a numpy array and put it
+        into the frame queue. Run this in a separate thread: it loops until the
+        client is stopped.
+ """
+ while self.is_running:
+ packets = []
+ try:
+ raw_h264 = self.video_socket.recv(0x10000)
+ if not raw_h264: continue
+ packets = self.codec.parse(raw_h264)
+ if not packets: continue
+            except socket.error:
+ continue
+ for packet in packets:
+ frames = self.codec.decode(packet)
+ for frame in frames:
+ if self.video_data_queue.full():
+ self.video_data_queue.get()
+ self.video_data_queue.put(frame.to_ndarray(format="bgr24"))
+ return
+
+ def get_next_frame(self, latest_image=False):
+ if self.video_data_queue and not self.video_data_queue.empty():
+ image = self.video_data_queue.get()
+ if latest_image:
+ while not self.video_data_queue.empty():
+ image = self.video_data_queue.get()
+ return image
+ return None
+
+ def build_touch_message(self, x, y, action):
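+        # Inject-touch-event control message for the scrcpy server:
+        # 1-byte message type (2), 1-byte action, 8-byte pointer id (-1),
+        # 32-bit x/y position, 16-bit screen width/height,
+        # 16-bit pressure and 32-bit button state, all big-endian.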
+ b = bytearray(b'\x02')
+ b += action
+ b += b'\xff\xff\xff\xff\xff\xff\xff\xff'
+ b += struct.pack('>I', int(x))
+ b += struct.pack('>I', int(y))
+ b += struct.pack('>h', int(self.resolution[0]))
+ b += struct.pack('>h', int(self.resolution[1]))
+ b += b'\xff\xff' # Pressure
+ b += b'\x00\x00\x00\x01' # Event button primary
+ return bytes(b)
+
+ def tap(self, x, y):
+ self.control_socket.send(self.build_touch_message(x, y, ACTION_DOWN))
+ self.control_socket.send(self.build_touch_message(x, y, ACTION_UP))
+
+ def swipe(self, start_x, start_y, end_x, end_y, move_step_length=5, move_steps_delay=0.005):
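+        # Press down at the start point, move towards the end point in fixed-size
+        # steps (one MOVE event per step, sleeping in between), then release.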
+        self.control_socket.send(self.build_touch_message(start_x, start_y, ACTION_DOWN))
+ next_x = start_x
+ next_y = start_y
+ if end_x > self.resolution[0]:
+ end_x = self.resolution[0]
+ if end_y > self.resolution[1]:
+ end_y = self.resolution[1]
+ decrease_x = True if start_x > end_x else False
+ decrease_y = True if start_y > end_y else False
+ while True:
+ if decrease_x:
+ next_x -= move_step_length
+ if next_x < end_x:
+ next_x = end_x
+ else:
+ next_x += move_step_length
+ if next_x > end_x:
+ next_x = end_x
+ if decrease_y:
+ next_y -= move_step_length
+ if next_y < end_y:
+ next_y = end_y
+ else:
+ next_y += move_step_length
+ if next_y > end_y:
+ next_y = end_y
+            self.control_socket.send(self.build_touch_message(next_x, next_y, ACTION_MOVE))
+ if next_x == end_x and next_y == end_y:
+                self.control_socket.send(self.build_touch_message(next_x, next_y, ACTION_UP))
+ break
+ time.sleep(move_steps_delay)
+
+ def stop(self):
+ if not self.is_running: return
+ self.is_running = False
+ print("Stop scrcpy client")
+ if self.decode_thread is not None:
+ self.decode_thread.join()
+ self.video_data_queue = None
+ if self.video_socket is not None:
+ self.video_socket.close()
+ self.video_socket = None
+ if self.control_socket is not None:
+ self.control_socket.close()
+ self.control_socket = None
+ self.disable_forward()
+ return
diff --git a/swy-bot.pyproj b/swy-bot.pyproj
index 0eb7c99..797197a 100644
--- a/swy-bot.pyproj
+++ b/swy-bot.pyproj
diff --git a/task.py b/task.py
index e858d38..56032c5 100644
--- a/task.py
+++ b/task.py
@@ -6,118 +6,127 @@
import math
import numpy
import cv2
-from ImageUtils import readimage
+from utils import readimage
class Phases(IntEnum):
+ """任务执行的阶段"""
BEGIN = 0
RUNNING = 1
END = 2
class Results(Enum):
+ """任务执行的结果"""
PASS = auto()
SUCCESS = auto()
FAIL = auto()
tasks = []
-image = None
+
+def getTasks():
+ """获取任务列表"""
+ return tasks
+
+def registerTask(task, name, desc):
+ """注册任务(已弃用,请使用装饰器注册任务)"""
+ task.name = name
+ task.description = desc
+ tasks.append(task)
+ return
+
+def Task(name, desc):
+ """用于自动注册任务的装饰器"""
+ def decorator(cls):
+ registerTask(cls, name, desc)
+ return cls
+ return decorator
class TaskBase(object):
"""自动挂机任务的基类"""
name = ""
description = ""
- def __init__(self, name, desc):
- self.name = name
- self.description = desc
+ def __init__(self):
+ """初始化"""
+ self.image = None
return
- def init(self):
- return
-
- def begin(self, player):
+ def begin(self, player, t):
"""开始任务"""
return Results.FAIL
- def run(self, player):
+ def run(self, player, t):
"""执行任务"""
- return Results.PASS
+ return Results.FAIL
- def end(self, player):
+ def end(self, player, t):
"""结束任务"""
- return Results.PASS
+ return Results.FAIL
+ def getImageCache(self):
+ """获取用于预览的图像,如果不想显示请返回None"""
+ return self.image
+
+@Task("自动客潮", "请将界面停留在餐厅")
class TaskKeChao(TaskBase):
"""客潮自动化"""
- def __init__(self, name, desc):
- super().__init__(name, desc)
+ def __init__(self):
+ super().__init__()
self.templateButton = readimage("kechao_btn")
self.templateDish = readimage("kechao_dish_part")
- self.templateTitle = readimage("kechao_title")
- return
+ self.templateTitle = readimage("kechao_title_part")
- def init(self):
- self.watchdog = time.time()
+ self.lastTime = 0
self.dialog = False
self.pointCache = []
return
- def begin(self, player):
+ def begin(self, player, t):
"""需要玩家位于餐厅界面"""
- global image
- image = player.screenshot()
+ self.image = player.screenshot()
if not self.dialog:
- points = findcircle(image, 25)
+ points = findcircle(self.image, 25)
for x, y in points:
- if x > (image.shape[1] * 0.9) and y < (image.shape[0] * 0.2):
+ if x > (self.image.shape[1] * 0.9) and y < (self.image.shape[0] * 0.2):
player.clickaround(x, y)
self.dialog = True
else:
- points = findtemplate(image, self.templateButton)
+ points = findtemplate(self.image, self.templateButton)
for x, y in points:
player.clickaround(x, y)
time.sleep(1)
return Results.SUCCESS
return Results.PASS
- def run(self, player):
+ def run(self, player, t):
"""客潮挂机中"""
- global image
- image = player.screenshot()
- points = findcircle(image, 25)
+ self.image = player.screenshot()
+ points = findcircle(self.image, 25)
points2 = []
for x, y in points:
- if x > (image.shape[1] * 0.9): continue
- if y > (image.shape[0] * 0.8): return Results.SUCCESS
- cv2.circle(image, (x, y), 25, (0, 0, 255), 3)
+ if x > (self.image.shape[1] * 0.9): continue
+ if y > (self.image.shape[0] * 0.8): return Results.SUCCESS
+ cv2.circle(self.image, (x, y), 25, (0, 0, 255), 3)
points2.append((x, y))
if len(points2) > 0:
x, y = random.choice(points2)
player.clickaround(x, y)
return Results.PASS
- def end(self, player):
+ def end(self, player, t):
"""结束客潮"""
- global image
- image = player.screenshot()
- points = findtemplate(image, self.templateTitle)
+ self.image = player.screenshot()
+ points = findtemplate(self.image, self.templateTitle)
for x, y in points:
+ time.sleep(2)
player.clickaround(x, y)
- time.sleep(1)
+ time.sleep(2)
return Results.SUCCESS
return Results.PASS
+@Task("自动小游戏", "尚未编写")
class TaskMiniGames(TaskBase):
"""活动小游戏 算了放弃了 毁灭吧赶紧的"""
-def registerTasks():
- tasks.clear()
- tasks.append(TaskKeChao("自动客潮", "请将界面停留在餐厅"))
- tasks.append(TaskMiniGames("自动小游戏", "尚未编写"))
- return
-
-def getImageCache():
- return image
-
def findtemplate(image, template, threshold = 0.75, outline = False):
theight, twidth = template.shape[:2]
result = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
diff --git a/ImageUtils.py b/utils.py
similarity index 100%
rename from ImageUtils.py
rename to utils.py
diff --git a/WindowsUtils.py b/windows.py
similarity index 100%
rename from WindowsUtils.py
rename to windows.py