From 28f915c992685c8e308a85e1883f2deb77475fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Wed, 10 Apr 2024 18:29:37 +0800 Subject: [PATCH 01/16] If mss screenshot fails, use the old screenshot method instead (cherry picked from commit 7f6c621a5e87cd3da261e90a684f7a3f3b783c6c) --- airtest/core/win/win.py | 51 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/airtest/core/win/win.py b/airtest/core/win/win.py index eb2c92af6..a0fb72e56 100644 --- a/airtest/core/win/win.py +++ b/airtest/core/win/win.py @@ -24,6 +24,7 @@ from airtest.core.settings import Settings as ST from airtest.utils.snippet import get_absolute_coordinate from airtest.utils.logger import get_logger +from airtest.core.win.screen import screenshot LOGGING = get_logger(__name__) @@ -107,6 +108,40 @@ def shell(self, cmd): """ return subprocess.check_output(cmd, shell=True) + def snapshot_old(self, filename=None, quality=10, max_size=None): + """ + Take a screenshot and save it in ST.LOG_DIR folder + + Args: + filename: name of the file to give to the screenshot, {time}.jpg by default + quality: The image quality, integer in range [1, 99] + max_size: the maximum size of the picture, e.g 1200 + + Returns: + display the screenshot + + """ + if self.handle: + screen = screenshot(filename, self.handle) + else: + screen = screenshot(filename) + if self.app: + rect = self.get_rect() + rect = self._fix_image_rect(rect) + screen = aircv.crop_image(screen, [rect.left, rect.top, rect.right, rect.bottom]) + if not screen.any(): + if self.app: + rect = self.get_rect() + rect = self._fix_image_rect(rect) + screen = aircv.crop_image(screenshot(filename), [rect.left, rect.top, rect.right, rect.bottom]) + if self._focus_rect != (0, 0, 0, 0): + height, width = screen.shape[:2] + rect = (self._focus_rect[0], self._focus_rect[1], width + self._focus_rect[2], height + self._focus_rect[3]) + screen = aircv.crop_image(screen, rect) + if filename: + aircv.imwrite(filename, screen, quality, max_size=max_size) + return screen + def snapshot(self, filename=None, quality=10, max_size=None): """ Take a screenshot and save it in ST.LOG_DIR folder @@ -127,12 +162,16 @@ def snapshot(self, filename=None, quality=10, max_size=None): "height": rect.bottom - rect.top, "monitor": 1} else: monitor = self.screen.monitors[0] - with mss.mss() as sct: - sct_img = sct.grab(monitor) - screen = numpy.array(sct_img, dtype=numpy.uint8)[...,:3] - if filename: - aircv.imwrite(filename, screen, quality, max_size=max_size) - return screen + try: + with mss.mss() as sct: + sct_img = sct.grab(monitor) + screen = numpy.array(sct_img, dtype=numpy.uint8)[...,:3] + if filename: + aircv.imwrite(filename, screen, quality, max_size=max_size) + return screen + except: + # if mss.exception.ScreenShotError: gdi32.GetDIBits() failed. + return self.snapshot_old(filename, quality, max_size) def _fix_image_rect(self, rect): """Fix rect in image.""" From cf6ef2bbfb82f8dc69855507ab54c6895f910152 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 16 Apr 2024 15:28:57 +0800 Subject: [PATCH 02/16] fix: ignore pywintypes.error in set_foreground when connec to windows (cherry picked from commit 4ecbe287785f73c4893164dd590c8b3434ae4420) --- airtest/core/win/win.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airtest/core/win/win.py b/airtest/core/win/win.py index a0fb72e56..5b6b9cf45 100644 --- a/airtest/core/win/win.py +++ b/airtest/core/win/win.py @@ -90,7 +90,12 @@ def connect(self, handle=None, **kwargs): self.app = self._app.connect(**kwargs) self._top_window = self.app.top_window().wrapper_object() if kwargs.get("foreground", True) in (True, "True", "true"): - self.set_foreground() + try: + self.set_foreground() + except pywintypes.error as e: + # pywintypes.error: (0, 'SetForegroundWindow', 'No error message is available') + # If you are not running with administrator privileges, it may fail, but this error can be ignored. + pass def shell(self, cmd): """ From d8ad6fb86057db14c7a42f6aa32f1e2a71a0e94d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Mon, 27 May 2024 15:17:56 +0800 Subject: [PATCH 03/16] The Android interface supports pushing files and folders to a specified path. (cherry picked from commit c4bf16e34ddb195d9a9d4b81282734486a627998) --- airtest/core/android/adb.py | 78 ++++++++++++++++++++++++++++--------- tests/test_adb.py | 22 ++++++++++- 2 files changed, 81 insertions(+), 19 deletions(-) diff --git a/airtest/core/android/adb.py b/airtest/core/android/adb.py index 849d70c78..b66e79d77 100644 --- a/airtest/core/android/adb.py +++ b/airtest/core/android/adb.py @@ -25,6 +25,7 @@ from airtest.utils.snippet import get_std_encoding, reg_cleanup, split_cmd, make_file_executable LOGGING = get_logger(__name__) +TMP_PATH = "/data/local/tmp" # Android's temporary file directory class ADB(object): @@ -470,16 +471,10 @@ def sdk_version(self): def push(self, local, remote): """ - Perform `adb push` command - - Note: - If there is a space (or special symbol) in the file name, it will be forced to add escape characters, - and the new file name will be added with quotation marks and returned as the return value - - 注意:文件名中如果带有空格(或特殊符号),将会被强制增加转义符,并将新的文件名添加引号,作为返回值返回 + Push file or folder to the specified directory to the device Args: - local: local file to be copied to the device + local: local file or folder to be copied to the device remote: destination on the device where the file will be copied Returns: @@ -495,16 +490,63 @@ def push(self, local, remote): "/data/local/tmp/test\ space.txt" >>> adb.shell("rm " + new_name) + >>> adb.push("test_dir", "/sdcard/Android/data/com.test.package/files") + >>> adb.push("test_dir", "/sdcard/Android/data/com.test.package/files/test_dir") + """ - local = decode_path(local) # py2 - if os.path.isfile(local) and os.path.splitext(local)[-1] != os.path.splitext(remote)[-1]: - # If remote is a folder, add the filename and escape - filename = os.path.basename(local) - # Add escape characters for spaces, parentheses, etc. in filenames - filename = re.sub(r"[ \(\)\&]", lambda m: "\\" + m.group(0), filename) - remote = '%s/%s' % (remote, filename) - self.cmd(["push", local, remote], ensure_unicode=False) - return '\"%s\"' % remote + _, ext = os.path.splitext(remote) + if ext: + # The target path is a file + dst_parent = os.path.dirname(remote) + else: + dst_parent = remote + + # If the target file already exists, delete it first to avoid overwrite failure + src_filename = os.path.basename(local) + _, src_ext = os.path.splitext(local) + if src_ext: + dst_path = f"{dst_parent}/{src_filename}" + else: + if src_filename == os.path.basename(remote): + dst_path = remote + else: + dst_path = f"{dst_parent}/{src_filename}" + try: + self.shell(f"rm -r {dst_path}") + except: + pass + + # If the target folder has multiple levels that have never been created, try to create them + try: + self.shell(f"mkdir -p {dst_parent}") + except: + pass + + # Push the file to the tmp directory to avoid permission issues + tmp_path = f"{TMP_PATH}/{src_filename}" + try: + self.cmd(["push", local, tmp_path]) + except: + self.cmd(["push", local, dst_parent]) + else: + try: + if src_ext: + try: + self.shell(f'mv "{tmp_path}" "{remote}"') + except: + self.shell(f'mv "{tmp_path}" "{remote}"') + else: + try: + self.shell(f'cp -frp "{tmp_path}/*" "{remote}"') + except: + self.shell(f'mv "{tmp_path}" "{remote}"') + finally: + try: + if TMP_PATH != dst_parent: + self.shell(f'rm -r "{tmp_path}"') + except: + pass + return dst_path def pull(self, remote, local): """ @@ -921,7 +963,7 @@ def exists_file(self, filepath): """ try: - out = self.shell(["ls", filepath]) + out = self.shell("ls \"%s\"" % filepath) except AdbShellError: return False else: diff --git a/tests/test_adb.py b/tests/test_adb.py index fe6b39ca8..80fee54c2 100644 --- a/tests/test_adb.py +++ b/tests/test_adb.py @@ -124,7 +124,7 @@ def test_push_file(file_path, des_path): print(des_file) self.assertIsNotNone(des_file) self.assertTrue(self.adb.exists_file(des_file)) - self.adb.shell("rm " + des_file) + self.adb.shell("rm -r \"" + des_file + "\"") tmpdir = "/data/local/tmp" test_push_file(IMG, tmpdir) @@ -132,6 +132,7 @@ def test_push_file(file_path, des_path): imgname = os.path.basename(IMG) tmpimgpath = tmpdir + "/" + imgname test_push_file(IMG, tmpimgpath) + test_push_file(IMG, tmpdir) # 测试空格+特殊字符+中文 test_space_img = os.path.join(os.path.dirname(IMG), "space " + imgname) @@ -146,6 +147,25 @@ def test_push_file(file_path, des_path): test_push_file(test_img, tmpdir + "/" + os.path.basename(test_img)) try_remove(test_img) + # 测试非临时目录(部分高版本手机有权限问题,不允许直接push) + dst_path = "/sdcard/Android/data/com.netease.nie.yosemite/files" + test_push_file(IMG, dst_path) + test_img = os.path.join(os.path.dirname(IMG), imgname + "中文 (1)") + shutil.copy(IMG, test_img) + test_push_file(test_img, dst_path) + + # 推送文件夹 /test push 到 目标路径 + os.makedirs("test push", exist_ok=True) + shutil.copy(IMG, "test push/" + imgname) + test_push_file("test push", dst_path) + shutil.rmtree("test push") + + # 推送文件夹 /test push 到 目标路径/test push + os.makedirs("test push", exist_ok=True) + shutil.copy(IMG, "test push/" + imgname) + test_push_file("test push", dst_path + "/test") + shutil.rmtree("test push") + def test_pull(self): tmpdir = "/data/local/tmp" imgname = os.path.basename(IMG) From 1119e2d8e7bc83de2c05b4d5ee429fbc5ffa8839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Fri, 31 May 2024 17:27:46 +0800 Subject: [PATCH 04/16] fix exists file error (cherry picked from commit b639798657fe5e23e267c356570e4a4c5f3b7d0d) --- airtest/core/android/adb.py | 2 +- tests/test_adb.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airtest/core/android/adb.py b/airtest/core/android/adb.py index b66e79d77..72f4c574f 100644 --- a/airtest/core/android/adb.py +++ b/airtest/core/android/adb.py @@ -964,7 +964,7 @@ def exists_file(self, filepath): """ try: out = self.shell("ls \"%s\"" % filepath) - except AdbShellError: + except (AdbShellError, AdbError): return False else: return not ("No such file or directory" in out) diff --git a/tests/test_adb.py b/tests/test_adb.py index 80fee54c2..826270f26 100644 --- a/tests/test_adb.py +++ b/tests/test_adb.py @@ -158,6 +158,7 @@ def test_push_file(file_path, des_path): os.makedirs("test push", exist_ok=True) shutil.copy(IMG, "test push/" + imgname) test_push_file("test push", dst_path) + test_push_file("test push", tmpdir) shutil.rmtree("test push") # 推送文件夹 /test push 到 目标路径/test push From 70b1264bca7863060d5ce213f583c074ce0f0fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Wed, 19 Jun 2024 15:14:27 +0800 Subject: [PATCH 05/16] add: Add interface for push and pull, support both iOS and Android. (cherry picked from commit ea33f4c4baab0c9dffa584f2571e23fa18ffc11b) --- airtest/core/android/adb.py | 4 +++- airtest/core/android/android.py | 35 +++++++++++++++++++++++++++++++++ airtest/core/api.py | 35 +++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/airtest/core/android/adb.py b/airtest/core/android/adb.py index 72f4c574f..f0cc006e2 100644 --- a/airtest/core/android/adb.py +++ b/airtest/core/android/adb.py @@ -548,7 +548,7 @@ def push(self, local, remote): pass return dst_path - def pull(self, remote, local): + def pull(self, remote, local=""): """ Perform `adb pull` command @@ -563,6 +563,8 @@ def pull(self, remote, local): Returns: None """ + if not local: + local = os.path.basename(remote) local = decode_path(local) # py2 if PY3: # If python3, use Path to force / convert to \ diff --git a/airtest/core/android/android.py b/airtest/core/android/android.py index 300284141..b7ad41aba 100644 --- a/airtest/core/android/android.py +++ b/airtest/core/android/android.py @@ -1030,6 +1030,41 @@ def set_clipboard(self, text): """ self.yosemite_ext.set_clipboard(text) + def push(self, local, remote): + """ + Push file to the device + + Args: + local: local file or folder to be copied to the device + remote: destination on the device where the file will be copied + + Returns: + The file path saved in the phone may be enclosed in quotation marks, eg. '"test\ file.txt"' + + Examples: + >>> dev = connect_device("android:///") + >>> dev.push("test.txt", "/sdcard/test.txt") + + """ + return self.adb.push(local, remote) + + def pull(self, remote, local=""): + """ + Pull file from the device + + Args: + remote: remote file to be downloaded from the device + local: local destination where the file will be downloaded from the device, if not specified, the current directory is used + + Returns: + None + + Examples: + >>> dev = connect_device("android:///") + >>> dev.pull("/sdcard/test.txt", "rename.txt") + """ + return self.adb.pull(remote, local=local) + def _register_rotation_watcher(self): """ Register callbacks for Android and minicap when rotation of screen has changed diff --git a/airtest/core/api.py b/airtest/core/api.py index a2e4a8bca..0f0be053d 100644 --- a/airtest/core/api.py +++ b/airtest/core/api.py @@ -724,6 +724,41 @@ def paste(*args, **kwargs): G.DEVICE.paste(*args, **kwargs) +@logwrap +def push(local, remote, *args, **kwargs): + """ + Push file from local to remote + + :param local: local file path + :param remote: remote file path + :return: filename of the pushed file + :platforms: Android, iOS + :Example: + + >>> connect_device("android:///") + >>> push(r"D:\demo\test.text", "/data/local/tmp/test.text") + + """ + return G.DEVICE.push(local, remote, *args, **kwargs) + + +@logwrap +def pull(remote, local, *args, **kwargs): + """ + Pull file from remote to local + + :param remote: remote file path + :param local: local file path + :return: filename of the pulled file + :platforms: Android, iOS + :Example: + + >>> connect_device("android:///") + >>> pull("/data/local/tmp/test.text", r"D:\demo\test.text") + + """ + return G.DEVICE.pull(remote, local, *args, **kwargs) + """ Assertions: see airtest/core/assertions.py """ From 8c2b44f173d0534e2a2601c8264f19763f87f64b Mon Sep 17 00:00:00 2001 From: ZSCharlie <> Date: Thu, 20 Jun 2024 14:51:29 +0800 Subject: [PATCH 06/16] =?UTF-8?q?=E5=A2=9E=E5=8A=A0IOS=20push=20pull=20ls?= =?UTF-8?q?=20rm=20mkdir=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (cherry picked from commit 5c30f0302051b9428a040ecf39d40ce990e449f4) --- airtest/core/ios/ios.py | 313 ++++++++++++++++++++++++++++++++++++- airtest/utils/logwraper.py | 6 + tests/test_ios.py | 110 ++++++++++++- 3 files changed, 420 insertions(+), 9 deletions(-) diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index 7e69ee854..deb29838a 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -11,6 +11,7 @@ import base64 import inspect import logging +import pathlib import traceback from logzero import setup_logger from functools import wraps @@ -31,7 +32,7 @@ from airtest.core.settings import Settings as ST from airtest.aircv.screen_recorder import ScreenRecorder, resize_by_max, get_max_size from airtest.core.error import LocalDeviceError, AirtestError - +from airtest.core.helper import logwrap LOGGING = get_logger(__name__) @@ -57,7 +58,7 @@ def wrapper(self, *args, **kwargs): except: time.sleep(0.5) continue - raise + raise AirtestError("Failed to re-acquire session.") return wrapper @@ -96,6 +97,18 @@ def decorator_wrapper(cls): return cls return decorator_wrapper +def format_file_list(file_list): + formatted_list = [] + for file in file_list: + file_info = { + 'type': 'Directory' if file[0] == 'd' else 'File', + 'size': file[1], + 'last_modified': file[2].strftime('%Y-%m-%d %H:%M:%S'), + 'name': file[3] + } + formatted_list.append(file_info) + + return formatted_list @add_decorator_to_methods(decorator_pairing_dialog) class TIDevice: @@ -270,7 +283,187 @@ def ps_wda(udid): @staticmethod def xctest(udid, wda_bundle_id): return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, logger=setup_logger(level=logging.INFO)) + + @staticmethod + def push(udid, local_path, device_path, bundle_id=None, timeout=None): + """ + Pushes a file or a directory from the local machine to the iOS device. + Args: + udid (str): The UDID of the iOS device. + device_path (str): The directory path on the iOS device where the file or directory will be pushed. + local_path (str): The local path of the file or directory to be pushed. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pushed to the app's sandbox container. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + """ + try: + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if os.path.isfile(local_path): + file_name = os.path.basename(local_path) + device_path = os.path.join(device_path, file_name) + device_path = device_path.replace("\\", "/") + with open(local_path, "rb") as f: + content = f.read() + sync.push_content(device_path, content) + elif os.path.isdir(local_path): + device_path = os.path.join(device_path, os.path.basename(local_path)) + device_path = device_path.replace("\\", "/") + sync.mkdir(device_path) + for root, dirs, files in os.walk(local_path): + for directory in dirs: + dir_path = os.path.join(root, directory) + relative_dir_path = os.path.relpath(dir_path, local_path) + device_dir_path = os.path.join(device_path, relative_dir_path) + device_dir_path = device_dir_path.replace("\\", "/") + sync.mkdir(device_dir_path) + for file_name in files: + file_path = os.path.join(root, file_name) + relative_path = os.path.relpath(file_path, local_path) + device_file_path = os.path.join(device_path, relative_path) + device_file_path = device_file_path.replace("\\", "/") + with open(file_path, "rb") as f: + content = f.read() + sync.push_content(device_file_path, content) + print(f"pushed {local_path} to {device_path}") + except Exception as e: + raise AirtestError(f"Failed to push {local_path} to {device_path}.") + + @staticmethod + def pull(udid, device_path, local_path, bundle_id=None, timeout=None): + """ + Pulls a file or directory from the iOS device to the local machine. + + Args: + udid (str): The UDID of the iOS device. + device_path (str): The path of the file or directory on the iOS device. + Remote devices can only be file paths. + local_path (str): The destination path on the local machine. + Remote devices can only be file paths. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pulled from the app's sandbox. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + """ + def _is_dir(remote_path): + remote_path = remote_path.rstrip("\\/") + remote_path_dir, remote_path_base = os.path.split(remote_path) + ret = TIDevice.ls(udid, remote_path_dir, bundle_id) + + for i in ret: + if i['name'].rstrip('/') == remote_path_base: + return i['type'].lower() == 'directory' + return False + + try: + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if _is_dir(device_path): + os.makedirs(local_path, exist_ok=True) + + src = pathlib.Path(device_path) + dst = pathlib.Path(local_path) + if dst.is_dir() and src.name and sync.stat(src).is_dir(): + dst = dst.joinpath(src.name) + + sync.pull(src, dst) + print("pulled", src, "->", dst) + except Exception as e: + raise AirtestError(f"Failed to pull {device_path} to {local_path}.") + + @staticmethod + def rm(udid, remote_path, bundle_id=None, is_dir=False): + """ + Removes a file or directory from the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path of the file or directory on the iOS device. + bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be removed from the app's sandbox. Defaults to None. + is_dir (bool, optional): Indicates whether the path is a directory. Defaults to False. + """ + def _remove_folder(udid, folder_path, bundle_id): + folder_path = folder_path.replace("\\", "/") + for file_info in TIDevice.ls(udid, folder_path, bundle_id): + if file_info['type'] == 'Directory': + _remove_folder(udid, os.path.join(folder_path, file_info['name']), bundle_id) + else: + sync.remove(os.path.join(folder_path, file_info['name'])) + sync.remove(folder_path) + + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + if is_dir: + if not remote_path.endswith("/"): + remote_path += "/" + _remove_folder(udid, remote_path, bundle_id) + else: + sync.remove(remote_path) + status = sync.remove(remote_path) + if status == 0: + print("removed", remote_path) + else: + raise AirtestError(f"<{status.name} {status.value}> Failed to remove {remote_path}") + + @staticmethod + def ls(udid, remote_path, bundle_id=None): + """ + List files and directories in the specified path on the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path on the iOS device. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Returns: + list: A list of files and directories in the specified path. + """ + try: + file_list = [] + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + for file_info in sync.listdir_info(remote_path): + filename = file_info.st_name + if file_info.is_dir(): + filename = filename + "/" + file_list.append(['d' if file_info.is_dir() else '-', file_info.st_size, file_info.st_mtime, filename]) + file_list = format_file_list(file_list) + return file_list + except Exception as e: + raise AirtestError(f"Failed to list files and directories in {remote_path}.") + + @staticmethod + def mkdir(udid, remote_path, bundle_id=None): + """ + Create a directory on the iOS device. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path of the directory to be created on the iOS device. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + """ + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + status = sync.mkdir(remote_path) + if int(status) == 0: + print("created", remote_path) + else: + raise AirtestError(f"<{status.name} {status.value}> Failed to create directory {remote_path}") @add_decorator_to_methods(decorator_retry_session) class IOS(Device): @@ -696,12 +889,17 @@ def _quick_click(self, x, y, duration): x, y = self.driver._percent2pos(x, y) data = {'x': x, 'y': y, 'duration': duration} # 为了兼容改动直接覆盖原生接口的自制版wda。 + try: - return self.driver._session_http.post('/wda/tap', data=data) + self.driver._session_http.post('/wda/deviceTap', data=data) + #如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 except wda.WDARequestError as e: - if e.status == 110: - self.driver.click(x, y, duration) - + try: + return self.driver._session_http.post('/wda/tap', data=data) + except wda.WDARequestError as e: + if e.status == 110: + self.driver.click(x, y, duration) + def double_click(self, pos): x, y = self._transform_xy(pos) self.driver.double_tap(x, y) @@ -767,7 +965,12 @@ def _quick_swipe(self, x1, y1, x2, y2, delay): data = dict(fromX=x1, fromY=y1, toX=x2, toY=y2, delay=delay) # 为了兼容改动直接覆盖原生接口的自制版wda。 try: - return self.driver._session_http.post('/wda/swipe', data=data) + if self.using_ios_tagent: + try: + self.driver._session_http.post('/wda/deviceSwipe', data=data) + #如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 + except wda.WDARequestError as e: + return self.driver._session_http.post('/wda/swipe', data=data) except wda.WDARequestError as e: if e.status == 110: self.driver.swipe(x1, y1, x2, y2) @@ -1315,3 +1518,99 @@ def stop_recording(self,): LOGGING.info("stopping recording") self.recorder.stop() return None + + def push(self, local_path, remote_path, bundle_id=None, timeout=None): + """ + Pushes a file from the local machine to the iOS device. + + Args: + remote_path (str): The path on the iOS device where the file will be saved. + local_path (str): The path of the file on the local machine. + bundle_id (str, optional): The bundle identifier of the app. Defaults to None. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.push(self.udid, local_path, remote_path, bundle_id=bundle_id) + + def pull(self, remote_path, local_path, bundle_id=None, timeout=None): + """ + Pulls a file or directory from the iOS device to the local machine. + + Args: + remote_path (str): The path of the file or directory on the iOS device. + local_path (str): The path where the file or directory will be saved on the local machine. + bundle_id (str, optional): The bundle identifier of the app. Defaults to None. Required for remote devices. + timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.pull(self.udid, remote_path, local_path, bundle_id=bundle_id, timeout=timeout) + + @logwrap + def ls(self, remote_path, bundle_id=None): + """ + List files and directories in the specified remote path on the iOS device. + + Args: + remote_path (str): The remote path to list. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. Required for remote devices. + + Returns: + list: A list of files and directories in the remote path. Each item in the list is a dictionary with the following keys: + - 'type': The type of the item. This can be 'Directory' or 'File'. + - 'size': The size of the item in bytes. + - 'last_modified': The last modification time of the item, in the format 'YYYY-MM-DD HH:MM:SS'. + - 'name': The name of the item, including the path relative to `remote_path`. + + Example: + [ + {'type': 'Directory', 'size': 1024, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_directory/'}, + {'type': 'File', 'size': 2048, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_file.txt'} + ] + + Raises: + LocalDeviceError: If the device is remote. + """ + if not self.is_local_device: + raise LocalDeviceError() + return TIDevice.ls(self.udid, remote_path, bundle_id=bundle_id) + + @logwrap + def rm(self, remote_path, bundle_id=None, is_dir=False): + """ + Remove a file or directory from the iOS device. + + Args: + remote_path (str): The remote path to remove. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + is_dir (bool, optional): True if the remote path is a directory. Defaults to False. + + Raises: + LocalDeviceError: If the device is remote. + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.rm(self.udid, remote_path, bundle_id=bundle_id, is_dir=is_dir) + + @logwrap + def mkdir(self, remote_path, bundle_id=None): + """ + Create a directory on the iOS device. + + Args: + remote_path (str): The remote path to create. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Raises: + LocalDeviceError: If the device is remote. + """ + if not self.is_local_device: + raise LocalDeviceError() + TIDevice.mkdir(self.udid, remote_path, bundle_id=bundle_id) diff --git a/airtest/utils/logwraper.py b/airtest/utils/logwraper.py index 6ee4a3911..5e1d1c7dd 100644 --- a/airtest/utils/logwraper.py +++ b/airtest/utils/logwraper.py @@ -9,6 +9,7 @@ from copy import copy from .logger import get_logger from .snippet import reg_cleanup +from airtest.core.error import LocalDeviceError LOGGING = get_logger(__name__) @@ -125,10 +126,15 @@ def func1(snapshot=True): # The snapshot parameter is popped from the function parameter, # so the function cannot use the parameter name snapshot later snapshot = m.pop('snapshot', False) + m.pop('self', None) # remove self from the call_args + m.pop('cls', None) # remove cls from the call_args fndata = {'name': f.__name__, 'call_args': m, 'start_time': start} logger.running_stack.append(fndata) try: res = f(*args, **kwargs) + except LocalDeviceError: + # 为了进入airtools中的远程方法,同时不让LocalDeviceError在报告中显示为失败步骤 + raise LocalDeviceError except Exception as e: data = {"traceback": traceback.format_exc(), "end_time": time.time()} fndata.update(data) diff --git a/tests/test_ios.py b/tests/test_ios.py index eeebb6120..02e97e104 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -1,5 +1,6 @@ # encoding=utf-8 import os +import shutil import time import unittest import numpy @@ -9,6 +10,7 @@ from .testconf import try_remove import cv2 import warnings +import tempfile warnings.simplefilter("always") text_flag = True # 控制是否运行text接口用例 @@ -17,13 +19,17 @@ PKG_SAFARI = "com.apple.mobilesafari" TEST_IPA_FILE_OR_URL = "" # IPA包体的路径或者url链接,测试安装 TEST_IPA_BUNDLE_ID = "" # IPA安装后app的bundleID,测试卸载 - -class TestIos(unittest.TestCase): +class TestIos(unittest.TestCase): @classmethod def setUpClass(cls): # cls.ios = IOS(addr=DEFAULT_ADDR, cap_method=CAP_METHOD.WDACAP) cls.ios = connect_device("iOS:///http+usbmux://") + cls.TEST_FSYNC_APP = "" # 测试文件推送、同步的app的bundleID + # 获取一个可以用于文件操作的app + app_list = cls.ios.list_app(type="all") + if len(app_list) > 0: + cls.TEST_FSYNC_APP = app_list[0][0] @classmethod def tearDownClass(cls): @@ -317,6 +323,101 @@ def test_set_clipboard(self): self.assertEqual(self.ios.get_clipboard(), text) self.ios.paste() + def test_ls(self): + print("test ls") + print(self.ios.ls("/Documents/", self.TEST_FSYNC_APP)) + + def test_push(self): + def _test_file(file_name): + print(f"test push file {file_name}") + with open(file_name, 'w') as f: + f.write('Test data') + self.ios.push(file_name, "/Documents/", self.TEST_FSYNC_APP, timeout=60) + try_remove(file_name) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(file_name in [item['name'] for item in file_list]) + + def _test_dir(dir_name): + print(f"test push directory {dir_name}") + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + self.ios.push(dir_name, "/Documents/", self.TEST_FSYNC_APP, timeout=60) + try_remove(dir_name) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(f"{dir_name}/" in [item['name'] for item in file_list]) + + _test_file("test_data.txt") + _test_file("测试文件.txt") + _test_dir('test_dir') + _test_dir('测试文件夹') + + def test_pull(self): + def _test_file(file_name): + print(f"test pull file {file_name}") + self.ios.pull(f"/Documents/{file_name}", ".", self.TEST_FSYNC_APP, timeout=60) + self.assertTrue(os.path.exists(file_name)) + try_remove(file_name) + + def _test_dir(dir_name): + print(f"test pull directory {dir_name}") + os.mkdir(dir_name) + self.ios.pull(f"/Documents/{dir_name}", dir_name, self.TEST_FSYNC_APP, timeout=60) + self.assertTrue(os.path.exists(f"{dir_name}/{dir_name}")) + try_remove(dir_name) + + _test_file("test_data.txt") + _test_file("测试文件.txt") + _test_dir('test_dir') + _test_dir('测试文件夹') + + def test_rm(self): + def _test_file(file_name): + print(f"test rm file {file_name}") + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + find_flag = False + for item in file_list: + if item['name'] == file_name: + print(f"find file {file_name}") + find_flag = True + break + if not find_flag: + print(f"not find file {file_name}") + return + self.ios.rm(f"/Documents/{file_name}", self.TEST_FSYNC_APP) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(file_name not in [item['name'] for item in file_list]) + + def _test_dir(dir_name): + print(f"test rm directory {dir_name}") + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + find_flag = False + for item in file_list: + if item['name'] == f"{dir_name}/": + print(f"find dir {dir_name}") + find_flag = True + break + if not find_flag: + print(f"not find dir {dir_name}") + return + self.ios.rm(f"/Documents/{dir_name}", self.TEST_FSYNC_APP, is_dir=True) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(f"{dir_name}/" not in [item['name'] for item in file_list]) + + _test_file("test_data.txt") + _test_file("测试文件.txt") + _test_dir('test_dir') + _test_dir('测试文件夹') + + def test_mkdir(self): + print("test mkdir") + dir_name = "/Documents/test_dir" + self.ios.mkdir(dir_name, self.TEST_FSYNC_APP) + + dirs = self.ios.ls("/Documents", self.TEST_FSYNC_APP) + self.assertTrue(any(d['name'] == 'test_dir/' for d in dirs)) + self.ios.rm(dir_name, self.TEST_FSYNC_APP, is_dir=True) + if __name__ == '__main__': # unittest.main() @@ -339,6 +440,11 @@ def test_set_clipboard(self): suite.addTest(TestIos("test_touch")) suite.addTest(TestIos("test_swipe")) suite.addTest(TestIos("test_double_click")) + suite.addTest(TestIos("test_ls")) + suite.addTest(TestIos("test_push")) + suite.addTest(TestIos("test_pull")) + suite.addTest(TestIos("test_mkdir")) + suite.addTest(TestIos("test_rm")) # 联合接口,顺序测试:解锁屏、应用启动关闭 suite.addTest(TestIos("test_is_locked")) suite.addTest(TestIos("test_lock")) From cd179008ddc553cde8a35ee19bfb46fc0cb37542 Mon Sep 17 00:00:00 2001 From: ZSCharlie <> Date: Thu, 20 Jun 2024 20:33:52 +0800 Subject: [PATCH 07/16] level up to 1.3.5 --- airtest/utils/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airtest/utils/version.py b/airtest/utils/version.py index ce5aeff03..f1250baf1 100644 --- a/airtest/utils/version.py +++ b/airtest/utils/version.py @@ -1,4 +1,4 @@ -__version__ = "1.3.3.1" +__version__ = "1.3.5" import os import sys From 3329f14c90f47340d239c81c8b59b3f6937d6838 Mon Sep 17 00:00:00 2001 From: zhangshun01 Date: Fri, 21 Jun 2024 15:46:06 +0800 Subject: [PATCH 08/16] =?UTF-8?q?iOS=20push=E5=A2=9E=E5=8A=A0=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E8=B7=AF=E5=BE=84=E6=98=AF=E5=90=A6=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E6=94=B9rm=E5=88=A4=E6=96=AD=E8=B7=AF?= =?UTF-8?q?=E5=BE=84=E6=98=AF=E5=90=A6=E4=B8=BA=E6=96=87=E4=BB=B6=E5=A4=B9?= =?UTF-8?q?=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (cherry picked from commit ef5769cbba755e704e443fc899f940b2ce085a4e) --- airtest/core/ios/ios.py | 62 ++++++++++++++++++++++++----------------- tests/test_ios.py | 4 +-- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index deb29838a..2de14afec 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -298,6 +298,9 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): """ try: + if not os.path.exists(local_path): + raise AirtestError(f"Local path {local_path} does not exist.") + if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: @@ -348,23 +351,13 @@ def pull(udid, device_path, local_path, bundle_id=None, timeout=None): timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. """ - def _is_dir(remote_path): - remote_path = remote_path.rstrip("\\/") - remote_path_dir, remote_path_base = os.path.split(remote_path) - ret = TIDevice.ls(udid, remote_path_dir, bundle_id) - - for i in ret: - if i['name'].rstrip('/') == remote_path_base: - return i['type'].lower() == 'directory' - return False - try: if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: sync = BaseDevice(udid, Usbmux()).sync - if _is_dir(device_path): + if TIDevice.is_dir(device_path, udid, bundle_id): os.makedirs(local_path, exist_ok=True) src = pathlib.Path(device_path) @@ -378,7 +371,7 @@ def _is_dir(remote_path): raise AirtestError(f"Failed to pull {device_path} to {local_path}.") @staticmethod - def rm(udid, remote_path, bundle_id=None, is_dir=False): + def rm(udid, remote_path, bundle_id=None): """ Removes a file or directory from the iOS device. @@ -386,33 +379,37 @@ def rm(udid, remote_path, bundle_id=None, is_dir=False): udid (str): The UDID of the iOS device. remote_path (str): The path of the file or directory on the iOS device. bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be removed from the app's sandbox. Defaults to None. - is_dir (bool, optional): Indicates whether the path is a directory. Defaults to False. + """ + def _check_status(status): + if status == 0: + print("removed", remote_path) + else: + raise AirtestError(f"<{status.name} {status.value}> Failed to remove {remote_path}") + def _remove_folder(udid, folder_path, bundle_id): folder_path = folder_path.replace("\\", "/") for file_info in TIDevice.ls(udid, folder_path, bundle_id): if file_info['type'] == 'Directory': _remove_folder(udid, os.path.join(folder_path, file_info['name']), bundle_id) else: - sync.remove(os.path.join(folder_path, file_info['name'])) - sync.remove(folder_path) + status = sync.remove(os.path.join(folder_path, file_info['name'])) + _check_status(status) + status = sync.remove(folder_path) + _check_status(status) if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: sync = BaseDevice(udid, Usbmux()).sync - if is_dir: + if TIDevice.is_dir(remote_path, udid, bundle_id): if not remote_path.endswith("/"): remote_path += "/" _remove_folder(udid, remote_path, bundle_id) else: - sync.remove(remote_path) - status = sync.remove(remote_path) - if status == 0: - print("removed", remote_path) - else: - raise AirtestError(f"<{status.name} {status.value}> Failed to remove {remote_path}") + status = sync.remove(remote_path) + _check_status(status) @staticmethod def ls(udid, remote_path, bundle_id=None): @@ -465,6 +462,22 @@ def mkdir(udid, remote_path, bundle_id=None): else: raise AirtestError(f"<{status.name} {status.value}> Failed to create directory {remote_path}") + @staticmethod + def is_dir(remote_path, udid, bundle_id): + try: + remote_path = remote_path.rstrip("\\/") + remote_path_dir, remote_path_base = os.path.split(remote_path) + if bundle_id: + sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) + else: + sync = BaseDevice(udid, Usbmux()).sync + + for file_info in sync.listdir_info(remote_path_dir): + if file_info.st_name == remote_path_base: + return file_info.is_dir() + except Exception as e: + raise AirtestError(f"Failed to check if {remote_path} is a directory.") + @add_decorator_to_methods(decorator_retry_session) class IOS(Device): """IOS client. @@ -1583,21 +1596,20 @@ def ls(self, remote_path, bundle_id=None): return TIDevice.ls(self.udid, remote_path, bundle_id=bundle_id) @logwrap - def rm(self, remote_path, bundle_id=None, is_dir=False): + def rm(self, remote_path, bundle_id=None): """ Remove a file or directory from the iOS device. Args: remote_path (str): The remote path to remove. bundle_id (str, optional): The bundle ID of the app. Defaults to None. - is_dir (bool, optional): True if the remote path is a directory. Defaults to False. Raises: LocalDeviceError: If the device is remote. """ if not self.is_local_device: raise LocalDeviceError() - TIDevice.rm(self.udid, remote_path, bundle_id=bundle_id, is_dir=is_dir) + TIDevice.rm(self.udid, remote_path, bundle_id=bundle_id) @logwrap def mkdir(self, remote_path, bundle_id=None): diff --git a/tests/test_ios.py b/tests/test_ios.py index 02e97e104..2c2798b67 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -400,7 +400,7 @@ def _test_dir(dir_name): if not find_flag: print(f"not find dir {dir_name}") return - self.ios.rm(f"/Documents/{dir_name}", self.TEST_FSYNC_APP, is_dir=True) + self.ios.rm(f"/Documents/{dir_name}", self.TEST_FSYNC_APP) file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) self.assertTrue(f"{dir_name}/" not in [item['name'] for item in file_list]) @@ -416,7 +416,7 @@ def test_mkdir(self): dirs = self.ios.ls("/Documents", self.TEST_FSYNC_APP) self.assertTrue(any(d['name'] == 'test_dir/' for d in dirs)) - self.ios.rm(dir_name, self.TEST_FSYNC_APP, is_dir=True) + self.ios.rm(dir_name, self.TEST_FSYNC_APP) if __name__ == '__main__': From 7e9e200be65d45ce11f02b6d19b752facc25a258 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 23 Jul 2024 19:29:17 +0800 Subject: [PATCH 09/16] fix: modify wda version (cherry picked from commit ae0ac758d22a9598915c95c12632bc489faa2d95) --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index 4de5ece09..c15d426be 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,8 @@ def parse_requirements(filename): # if py<=3.6 add dataclasses if sys.version_info.major == 3 and sys.version_info.minor <= 6: reqs.append("dataclasses") + reqs.remove("facebook-wda>=1.3.3") + reqs.append("facebook-wda<1.4.9") if is_docker(): reqs.remove("opencv-contrib-python>=4.4.0.46, <=4.6.0.66") reqs.append("opencv-contrib-python-headless==4.5.5.64") From d42f7d0f11521fbeea709d8089d1818b70332eaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 23 Jul 2024 19:30:48 +0800 Subject: [PATCH 10/16] fix version (cherry picked from commit fc6fe9fdbac20f41e4cd62fa2e72a8f75032e43a) --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index c15d426be..4b995b530 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ def parse_requirements(filename): # if py<=3.6 add dataclasses if sys.version_info.major == 3 and sys.version_info.minor <= 6: reqs.append("dataclasses") + if sys.version_info.major == 3 and sys.version_info.minor <= 7: reqs.remove("facebook-wda>=1.3.3") reqs.append("facebook-wda<1.4.9") if is_docker(): From dcf20fdd49a36ab1759638bde4b90928ff3d2e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 23 Jul 2024 19:54:54 +0800 Subject: [PATCH 11/16] fix: py<3.8 use wda failed (cherry picked from commit db3867f91c787d8226a01042790d91b3e086161f) --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4b995b530..52ee3c6a4 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ def parse_requirements(filename): reqs.append("dataclasses") if sys.version_info.major == 3 and sys.version_info.minor <= 7: reqs.remove("facebook-wda>=1.3.3") - reqs.append("facebook-wda<1.4.9") + reqs.append("facebook-wda<1.4.8") if is_docker(): reqs.remove("opencv-contrib-python>=4.4.0.46, <=4.6.0.66") reqs.append("opencv-contrib-python-headless==4.5.5.64") From 58d4f6be96287d8c34398a2e1d24c4b3c6023b0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=A1=BA?= Date: Thu, 27 Jun 2024 14:53:47 +0800 Subject: [PATCH 12/16] merge ios_extend --- airtest/core/ios/ios.py | 91 ++++++++++++++++++++++++++++++-------- tests/test_ios.py | 96 +++++++++++++++++++++++++++++++++++------ 2 files changed, 156 insertions(+), 31 deletions(-) diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index 2de14afec..aa82949ed 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -295,7 +295,19 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): local_path (str): The local path of the file or directory to be pushed. bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pushed to the app's sandbox container. Defaults to None. timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. - + + Examples: + + Push a file to the DCIM directory:: + + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/Pictures/photo.jpg", "/DCIM") + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/Pictures/photo.jpg", "/DCIM/photo.jpg") + + Push a directory to the Documents directory of the Keynote app:: + + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents", "com.apple.Keynote") + >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents/test.key", "com.apple.Keynote") + """ try: if not os.path.exists(local_path): @@ -306,10 +318,19 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): else: sync = BaseDevice(udid, Usbmux()).sync + if device_path.endswith("/") or device_path.endswith("\\"): + device_path = device_path[:-1] + if os.path.isfile(local_path): file_name = os.path.basename(local_path) - device_path = os.path.join(device_path, file_name) + # 如果device_path有后缀则认为是文件,和本地文件名不一样视为需要重命名 + if not os.path.splitext(device_path)[1]: + if os.path.basename(device_path) != file_name: + device_path = os.path.join(device_path, file_name) device_path = device_path.replace("\\", "/") + # Create the directory if it does not exist + sync.mkdir(os.path.dirname(device_path)) + with open(local_path, "rb") as f: content = f.read() sync.push_content(device_path, content) @@ -318,12 +339,14 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): device_path = device_path.replace("\\", "/") sync.mkdir(device_path) for root, dirs, files in os.walk(local_path): + # 创建文件夹 for directory in dirs: dir_path = os.path.join(root, directory) relative_dir_path = os.path.relpath(dir_path, local_path) device_dir_path = os.path.join(device_path, relative_dir_path) device_dir_path = device_dir_path.replace("\\", "/") sync.mkdir(device_dir_path) + # 上传文件 for file_name in files: file_path = os.path.join(root, file_name) relative_path = os.path.relpath(file_path, local_path) @@ -334,7 +357,7 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): sync.push_content(device_file_path, content) print(f"pushed {local_path} to {device_path}") except Exception as e: - raise AirtestError(f"Failed to push {local_path} to {device_path}.") + raise AirtestError(f"Failed to push {local_path} to {device_path}. If push a FILE, please check if there is a DIRECTORY with the same name already exists. If push a DIRECTORY, please check if there is a FILE with the same name already exists, and try again.") @staticmethod def pull(udid, device_path, local_path, bundle_id=None, timeout=None): @@ -357,7 +380,7 @@ def pull(udid, device_path, local_path, bundle_id=None, timeout=None): else: sync = BaseDevice(udid, Usbmux()).sync - if TIDevice.is_dir(device_path, udid, bundle_id): + if TIDevice.is_dir(udid, device_path, bundle_id): os.makedirs(local_path, exist_ok=True) src = pathlib.Path(device_path) @@ -395,15 +418,13 @@ def _remove_folder(udid, folder_path, bundle_id): else: status = sync.remove(os.path.join(folder_path, file_info['name'])) _check_status(status) - status = sync.remove(folder_path) - _check_status(status) if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: sync = BaseDevice(udid, Usbmux()).sync - if TIDevice.is_dir(remote_path, udid, bundle_id): + if TIDevice.is_dir(udid, remote_path, bundle_id): if not remote_path.endswith("/"): remote_path += "/" _remove_folder(udid, remote_path, bundle_id) @@ -430,7 +451,8 @@ def ls(udid, remote_path, bundle_id=None): sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: sync = BaseDevice(udid, Usbmux()).sync - + if remote_path.endswith("/") or remote_path.endswith("\\"): + remote_path = remote_path[:-1] for file_info in sync.listdir_info(remote_path): filename = file_info.st_name if file_info.is_dir(): @@ -463,20 +485,19 @@ def mkdir(udid, remote_path, bundle_id=None): raise AirtestError(f"<{status.name} {status.value}> Failed to create directory {remote_path}") @staticmethod - def is_dir(remote_path, udid, bundle_id): + def is_dir(udid, remote_path, bundle_id): try: remote_path = remote_path.rstrip("\\/") remote_path_dir, remote_path_base = os.path.split(remote_path) - if bundle_id: - sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) - else: - sync = BaseDevice(udid, Usbmux()).sync - - for file_info in sync.listdir_info(remote_path_dir): - if file_info.st_name == remote_path_base: - return file_info.is_dir() + file_info = TIDevice.ls(udid, remote_path_dir, bundle_id) + for info in file_info: + # Remove the trailing slash. + if info['name'].endswith("/"): + info['name'] = info['name'][:-1] + if info['name'] == f"{remote_path_base}": + return info['type'] == 'Directory' except Exception as e: - raise AirtestError(f"Failed to check if {remote_path} is a directory.") + raise AirtestError(f"Failed to check if {remote_path} is a directory. Please check the path exist and try again.") @add_decorator_to_methods(decorator_retry_session) class IOS(Device): @@ -1544,6 +1565,16 @@ def push(self, local_path, remote_path, bundle_id=None, timeout=None): Raises: LocalDeviceError: If the device is remote. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.push("test.png", "/DCIM/") + >>> dev.push("test.png", "/DCIM/test.png") + >>> dev.push("test.png", "/DCIM/test_rename.png") + >>> dev.push("test.key", "/Documents/", "com.apple.Keynote") # Push to the Documents directory of the Keynote app + >>> dev.push("test.key", "/Documents/test.key", "com.apple.Keynote") + """ if not self.is_local_device: raise LocalDeviceError() @@ -1626,3 +1657,27 @@ def mkdir(self, remote_path, bundle_id=None): if not self.is_local_device: raise LocalDeviceError() TIDevice.mkdir(self.udid, remote_path, bundle_id=bundle_id) + + def is_dir(self, remote_path, bundle_id=None): + """ + Check if the specified path on the iOS device is a directory. + + Args: + remote_path (str): The remote path to check. + bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Returns: + bool: True if the path is a directory, False otherwise. + + Exapmles: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> print(dev.is_dir("/DCIM/")) + True + >>> print(dev.is_dir("/Documents/test.key", "com.apple.Keynote")) + False + + """ + if not self.is_local_device: + raise LocalDeviceError() + return TIDevice.is_dir(self.udid, remote_path, bundle_id=bundle_id) diff --git a/tests/test_ios.py b/tests/test_ios.py index 2c2798b67..d85ec551b 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -328,29 +328,99 @@ def test_ls(self): print(self.ios.ls("/Documents/", self.TEST_FSYNC_APP)) def test_push(self): - def _test_file(file_name): - print(f"test push file {file_name}") + def _try_remove_ios(file_name, bundle_id=None): + try: + self.ios.rm(file_name, bundle_id) + file_list = self.ios.ls(os.path.dirname(file_name), bundle_id) + for file in file_list: + if file['name'] == file_name: + raise Exception(f"remove file {file_name} failed") + print(f"file {file_name} not exist now.") + except: + pass + + def _test_file(file_name, dst="/Documents/", bundle_id=self.TEST_FSYNC_APP, target=None): + try_remove(file_name) with open(file_name, 'w') as f: f.write('Test data') - self.ios.push(file_name, "/Documents/", self.TEST_FSYNC_APP, timeout=60) + + # 用来ls和rm的路径,没有将文件改名则默认为file_name + if not target: + tmp_dst = os.path.normpath(dst) + if os.path.basename(tmp_dst) != file_name: + tmp_dst = os.path.join(tmp_dst, file_name) + target = tmp_dst.replace('\\', '/') + + # 清理手机里的文件 + _try_remove_ios(target, bundle_id) + self.ios.push(file_name, dst, bundle_id, timeout=60) + time.sleep(1) + file_list = self.ios.ls(target, bundle_id) + # 验证结果 + self.assertEqual(len(file_list), 1) + self.assertEqual(file_list[0]['name'], os.path.basename(target)) + self.assertEqual(file_list[0]['type'], 'File') + self.ios.rm(target, bundle_id) + time.sleep(1) + + # 清理 try_remove(file_name) - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) - self.assertTrue(file_name in [item['name'] for item in file_list]) - def _test_dir(dir_name): + def _test_dir(dir_name, dst="/Documents/"): print(f"test push directory {dir_name}") + # 用来ls和rm的路径 + tmp_dst = os.path.normpath(dst) + if os.path.basename(tmp_dst) != dir_name: + tmp_dst = os.path.join(tmp_dst, dir_name) + target = tmp_dst.replace('\\', '/') + + # 创建文件夹和文件 + try_remove(dir_name) + _try_remove_ios(target, self.TEST_FSYNC_APP) os.mkdir(dir_name) with open(f'{dir_name}/test_data', 'w') as f: f.write('Test data') - self.ios.push(dir_name, "/Documents/", self.TEST_FSYNC_APP, timeout=60) + + self.ios.push(dir_name, dst, self.TEST_FSYNC_APP, timeout=60) + time.sleep(1) + + dir_list = self.ios.ls(os.path.dirname(target), self.TEST_FSYNC_APP) + print(dir_list) + self.assertTrue(f"{dir_name}/" in [item['name'] for item in dir_list]) + file_list = self.ios.ls(f"{target}/test_data", self.TEST_FSYNC_APP) + self.assertTrue("test_data" in [item['name'] for item in file_list]) + self.ios.rm(target, self.TEST_FSYNC_APP) + time.sleep(1) + try_remove(dir_name) - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) - self.assertTrue(f"{dir_name}/" in [item['name'] for item in file_list]) - _test_file("test_data.txt") - _test_file("测试文件.txt") - _test_dir('test_dir') - _test_dir('测试文件夹') + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test_file("test_data_1.txt", "/Documents/") + _test_file("test_data_2.txt", "/Documents/test_data_2.txt") + _test_file("test_data_3.txt", "/Documents/test_data_3.txt/") + #重命名文件 + _test_file("test_data_4.txt", "/Documents/test_data.txt/", target="/Documents/test_data.txt") + _test_file("test_data.txt", "/Documents") + _test_file("test_1.png", "/DCIM", None) + _test_file("test_2.png", "/DCIM/", None) + _test_file("test_3.png", "/DCIM/test_3.png", None) + _test_file("test_4.png", "/DCIM/test_4.png/", None) + _test_file("test_5.png", "/DCIM/test.png/", None, target="/DCIM/test.png") + _test_file("test.png", "/DCIM/", None) + _test_file("t e s t d a t a.txt", "/Documents") + _test_file("测试文件.txt", "/Documents") + _test_file("测 试 文 件.txt", "/Documents") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt", "/Documents") + _test_file("data", "/Documents") + + _test_dir('test_dir', "/Documents/") + _test_dir('test_dir_1', "/Documents") + _test_dir('t e s t d i r', "/Documents") + _test_dir("(){}[]~'-_@!#$%&+,;=^", "/Documents") + _test_dir('测试文件夹', "/Documents/") + _test_dir('测试文件夹_1', "/Documents") + _test_dir('测 试 文 件 夹', "/Documents") def test_pull(self): def _test_file(file_name): From a9421157d3a2b2c7d8f6b58b84a58b33cfcb333c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E9=A1=BA?= Date: Fri, 9 Aug 2024 18:25:41 +0800 Subject: [PATCH 13/16] fix xctest bug --- airtest/core/ios/ios.py | 117 ++++++++++++++-- tests/test_ios.py | 294 ++++++++++++++++++++++++++++++++-------- 2 files changed, 346 insertions(+), 65 deletions(-) diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index aa82949ed..9f06e5e26 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -71,7 +71,11 @@ def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except MuxError: - LOGGING.error("Device is not yet paired. Triggered the trust dialogue. Please accept and try again." + "(iTunes is required on Windows.) " if sys.platform.startswith("win") else "") + if sys.platform.startswith("win"): + error_msg = "Device is not yet paired. Triggered the trust dialogue. Please accept and try again. (iTunes is required on Windows.) " + else: + error_msg = "Device is not yet paired. Triggered the trust dialogue. Please accept and try again." + LOGGING.error(error_msg) raise return wrapper @@ -282,7 +286,16 @@ def ps_wda(udid): @staticmethod def xctest(udid, wda_bundle_id): - return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, logger=setup_logger(level=logging.INFO)) + try: + return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, logger=setup_logger(level=logging.INFO)) + except Exception as e: + print(f"Failed to run tidevice xctest function for {wda_bundle_id}.Try to run tidevice runwda function for {wda_bundle_id}.") + try: + return BaseDevice(udid, Usbmux()).runwda(fuzzy_bundle_id=wda_bundle_id) + except Exception as e: + print(f"Failed to run tidevice runwda function for {wda_bundle_id}.") + # 先不抛出异常,ios17的兼容未合并进来,ios17设备一定会报错 + # raise AirtestError(f"Failed to start XCTest for {wda_bundle_id}.") @staticmethod def push(udid, local_path, device_path, bundle_id=None, timeout=None): @@ -373,6 +386,18 @@ def pull(udid, device_path, local_path, bundle_id=None, timeout=None): bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pulled from the app's sandbox. Defaults to None. timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. + Examples: + + Pull a file from the DCIM directory:: + + >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures/photo.jpg") + >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures") + + Pull a directory from the Documents directory of the Keynote app:: + + >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") + >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") + """ try: if bundle_id: @@ -403,12 +428,21 @@ def rm(udid, remote_path, bundle_id=None): remote_path (str): The path of the file or directory on the iOS device. bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be removed from the app's sandbox. Defaults to None. + Examples: + Remove a file from the DCIM directory:: + + >>> TIDevice.rm("00008020-001270842E88002E", "/DCIM/photo.jpg") + >>> TIDevice.rm("00008020-001270842E88002E", "/DCIM/photo.jpg", "com.apple.Photos") + + Remove a directory from the Documents directory of the Keynote app:: + + >>> TIDevice.rm("00008020-001270842E88002E", "/Documents", "com.apple.Keynote") """ - def _check_status(status): + def _check_status(status, path): if status == 0: - print("removed", remote_path) + print("removed", path) else: - raise AirtestError(f"<{status.name} {status.value}> Failed to remove {remote_path}") + raise AirtestError(f"<{status.name} {status.value}> Failed to remove {path}") def _remove_folder(udid, folder_path, bundle_id): folder_path = folder_path.replace("\\", "/") @@ -417,7 +451,10 @@ def _remove_folder(udid, folder_path, bundle_id): _remove_folder(udid, os.path.join(folder_path, file_info['name']), bundle_id) else: status = sync.remove(os.path.join(folder_path, file_info['name'])) - _check_status(status) + _check_status(status, os.path.join(folder_path, file_info['name'])) + # remove the folder itself + status = sync.remove(folder_path) + _check_status(status, folder_path) if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) @@ -430,7 +467,7 @@ def _remove_folder(udid, folder_path, bundle_id): _remove_folder(udid, remote_path, bundle_id) else: status = sync.remove(remote_path) - _check_status(status) + _check_status(status, remote_path) @staticmethod def ls(udid, remote_path, bundle_id=None): @@ -472,6 +509,16 @@ def mkdir(udid, remote_path, bundle_id=None): udid (str): The UDID of the iOS device. remote_path (str): The path of the directory to be created on the iOS device. bundle_id (str, optional): The bundle ID of the app. Defaults to None. + + Examples: + Create a directory in the DCIM directory:: + + >>> TIDevice.mkdir("00008020-001270842E88002E", "/DCIM/test") + + Create a directory in the Documents directory of the Keynote app:: + + >>> TIDevice.mkdir("00008020-001270842E88002E", "/Documents/test", "com.apple.Keynote") + """ if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) @@ -486,6 +533,30 @@ def mkdir(udid, remote_path, bundle_id=None): @staticmethod def is_dir(udid, remote_path, bundle_id): + """ + Check if the specified path on the iOS device is a directory. + + Args: + udid (str): The UDID of the iOS device. + remote_path (str): The path on the iOS device. + bundle_id (str): The bundle ID of the app. + + Returns: + bool: True if the path is a directory, False otherwise. + + Examples: + Check if the DCIM directory is a directory:: + + >>> TIDevice.is_dir("00008020-001270842E88002E", "/DCIM") + True + + Check if the Documents directory of the Keynote app is a directory:: + + >>> TIDevice.is_dir("00008020-001270842E88002E", "/Documents", "com.apple.Keynote") + True + >>> TIDevice.is_dir("00008020-001270842E88002E", "/Documents/test.key", "com.apple.Keynote") + False + """ try: remote_path = remote_path.rstrip("\\/") remote_path_dir, remote_path_base = os.path.split(remote_path) @@ -1569,11 +1640,14 @@ def push(self, local_path, remote_path, bundle_id=None, timeout=None): Examples: >>> dev = connect_device("iOS:///http+usbmux://udid") - >>> dev.push("test.png", "/DCIM/") >>> dev.push("test.png", "/DCIM/test.png") >>> dev.push("test.png", "/DCIM/test_rename.png") >>> dev.push("test.key", "/Documents/", "com.apple.Keynote") # Push to the Documents directory of the Keynote app - >>> dev.push("test.key", "/Documents/test.key", "com.apple.Keynote") + >>> dev.push("test.key", "/Documents/test.key", "com.apple.Keynote") + + Push file without suffix cannot be renamed, so the following code will push file to the path considered as a directory + >>> dev.push("test", "/Documents/test", "com.apple.Keynote") # The pushed file will be /Documents/test + >>> dev.push("test", "/Documents/test_rename", "com.apple.Keynote") # The pushed file will be /Documents/test_rename/test """ if not self.is_local_device: @@ -1592,6 +1666,15 @@ def pull(self, remote_path, local_path, bundle_id=None, timeout=None): Raises: LocalDeviceError: If the device is remote. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.pull("/DCIM/test.png", "test.png") + >>> dev.pull("/Documents/test.key", "test.key", "com.apple.Keynote") + >>> dev.pull("/Documents/test.key", "dir/test.key", "com.apple.Keynote") + >>> dev.pull("/Documents/test.key", "test_rename.key", "com.apple.Keynote") + """ if not self.is_local_device: raise LocalDeviceError() @@ -1637,6 +1720,14 @@ def rm(self, remote_path, bundle_id=None): Raises: LocalDeviceError: If the device is remote. + AirtestError: If the file or directory does not exist. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.rm("/Documents/test.key", "com.apple.Keynote") + >>> dev.rm("/Documents/test_dir", "com.apple.Keynote") + """ if not self.is_local_device: raise LocalDeviceError() @@ -1651,8 +1742,12 @@ def mkdir(self, remote_path, bundle_id=None): remote_path (str): The remote path to create. bundle_id (str, optional): The bundle ID of the app. Defaults to None. - Raises: - LocalDeviceError: If the device is remote. + + Examples: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> dev.mkdir("/Documents/test_dir", "com.apple.Keynote") + """ if not self.is_local_device: raise LocalDeviceError() diff --git a/tests/test_ios.py b/tests/test_ios.py index d85ec551b..300cae619 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -1,4 +1,5 @@ # encoding=utf-8 +import hashlib import os import shutil import time @@ -27,9 +28,8 @@ def setUpClass(cls): cls.ios = connect_device("iOS:///http+usbmux://") cls.TEST_FSYNC_APP = "" # 测试文件推送、同步的app的bundleID # 获取一个可以用于文件操作的app - app_list = cls.ios.list_app(type="all") - if len(app_list) > 0: - cls.TEST_FSYNC_APP = app_list[0][0] + cls.TEST_FSYNC_APP = "com.apple.Keynote" + # cls.TEST_FSYNC_APP = "rn.notes.best" @classmethod def tearDownClass(cls): @@ -325,20 +325,36 @@ def test_set_clipboard(self): def test_ls(self): print("test ls") - print(self.ios.ls("/Documents/", self.TEST_FSYNC_APP)) + # ls /DCIM/ + dcim = self.ios.ls("/DCIM/") + print(dcim) + self.assertTrue(isinstance(dcim, list) and len(dcim) > 0) + self.assertTrue(isinstance(dcim[0], dict)) + + # ls app /Documents/ + with open("test_ls_file.txt", 'w') as f: + f.write('Test data') + self.ios.push("test_ls_file.txt", "/Documents/", self.TEST_FSYNC_APP) + file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(isinstance(file_list, list)) + self.assertTrue(len(file_list) > 0) + self.assertTrue(isinstance(file_list[0], dict)) + self._try_remove_ios("/Documents/test_ls_file.txt", self.TEST_FSYNC_APP) + try_remove("test_ls_file.txt") - def test_push(self): - def _try_remove_ios(file_name, bundle_id=None): - try: - self.ios.rm(file_name, bundle_id) - file_list = self.ios.ls(os.path.dirname(file_name), bundle_id) - for file in file_list: - if file['name'] == file_name: - raise Exception(f"remove file {file_name} failed") - print(f"file {file_name} not exist now.") - except: - pass + def _try_remove_ios(self, file_name, bundle_id=None): + try: + self.ios.rm(file_name, bundle_id) + file_list = self.ios.ls(os.path.dirname(file_name), bundle_id) + for file in file_list: + if file['name'] == file_name: + raise Exception(f"remove file {file_name} failed") + print(f"file {file_name} not exist now.") + except: + print(f"not find {file_name}") + pass + def test_push(self): def _test_file(file_name, dst="/Documents/", bundle_id=self.TEST_FSYNC_APP, target=None): try_remove(file_name) with open(file_name, 'w') as f: @@ -352,7 +368,7 @@ def _test_file(file_name, dst="/Documents/", bundle_id=self.TEST_FSYNC_APP, targ target = tmp_dst.replace('\\', '/') # 清理手机里的文件 - _try_remove_ios(target, bundle_id) + self._try_remove_ios(target, bundle_id) self.ios.push(file_name, dst, bundle_id, timeout=60) time.sleep(1) file_list = self.ios.ls(target, bundle_id) @@ -360,7 +376,7 @@ def _test_file(file_name, dst="/Documents/", bundle_id=self.TEST_FSYNC_APP, targ self.assertEqual(len(file_list), 1) self.assertEqual(file_list[0]['name'], os.path.basename(target)) self.assertEqual(file_list[0]['type'], 'File') - self.ios.rm(target, bundle_id) + self._try_remove_ios(target, bundle_id) time.sleep(1) # 清理 @@ -376,7 +392,7 @@ def _test_dir(dir_name, dst="/Documents/"): # 创建文件夹和文件 try_remove(dir_name) - _try_remove_ios(target, self.TEST_FSYNC_APP) + self._try_remove_ios(target, self.TEST_FSYNC_APP) os.mkdir(dir_name) with open(f'{dir_name}/test_data', 'w') as f: f.write('Test data') @@ -389,7 +405,7 @@ def _test_dir(dir_name, dst="/Documents/"): self.assertTrue(f"{dir_name}/" in [item['name'] for item in dir_list]) file_list = self.ios.ls(f"{target}/test_data", self.TEST_FSYNC_APP) self.assertTrue("test_data" in [item['name'] for item in file_list]) - self.ios.rm(target, self.TEST_FSYNC_APP) + self._try_remove_ios(target, self.TEST_FSYNC_APP) time.sleep(1) try_remove(dir_name) @@ -399,7 +415,7 @@ def _test_dir(dir_name, dst="/Documents/"): _test_file("test_data_1.txt", "/Documents/") _test_file("test_data_2.txt", "/Documents/test_data_2.txt") _test_file("test_data_3.txt", "/Documents/test_data_3.txt/") - #重命名文件 + # 重命名文件 _test_file("test_data_4.txt", "/Documents/test_data.txt/", target="/Documents/test_data.txt") _test_file("test_data.txt", "/Documents") _test_file("test_1.png", "/DCIM", None) @@ -423,70 +439,239 @@ def _test_dir(dir_name, dst="/Documents/"): _test_dir('测 试 文 件 夹', "/Documents") def test_pull(self): - def _test_file(file_name): + def _get_file_md5(file_path): + hasher = hashlib.md5() + with open(file_path, 'rb') as f: + while chunk := f.read(8192): + hasher.update(chunk) + return hasher.hexdigest() + + def _get_folder_md5(folder_path): + md5_list = [] + for root, _, files in os.walk(folder_path): + for file in sorted(files): # Sort to maintain order + file_path = os.path.join(root, file) + file_md5 = _get_file_md5(file_path) + md5_list.append(file_md5) + + combined_md5 = hashlib.md5("".join(md5_list).encode()).hexdigest() + return combined_md5 + + def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{file_name}" + # 删除手机和本地存在的文件, + try_remove(file_name) + self._try_remove_ios(target, bundle_id=bundle_id) + + # 创建文件,推送文件 + with open(file_name, 'w') as f: + f.write('Test data') + md5 = _get_file_md5(file_name) + self.ios.push(file_name, f"{folder}/", bundle_id=bundle_id, timeout=60) + try_remove(file_name) + + # 下载文件 print(f"test pull file {file_name}") - self.ios.pull(f"/Documents/{file_name}", ".", self.TEST_FSYNC_APP, timeout=60) + self.ios.pull(target, ".", bundle_id=bundle_id, timeout=60) self.assertTrue(os.path.exists(file_name)) + self.assertEqual(md5, _get_file_md5(file_name)) + + # 下载、重命名文件 + self.ios.pull(target, "rename_file.txt", bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists("rename_file.txt")) + self.assertEqual(md5, _get_file_md5("rename_file.txt")) + + # 带文件夹路径 + os.mkdir("test_dir") + self.ios.pull(target, "test_dir", bundle_id=bundle_id, timeout=60) + self.assertTrue(os.path.exists(f"test_dir/{file_name}")) + self.assertEqual(md5, _get_file_md5(f"test_dir/{file_name}")) + + # 清理 + self._try_remove_ios(target, bundle_id) try_remove(file_name) + try_remove("rename_file.txt") + try_remove("test_dir") + + def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + # 删除手机和本地存在的文件夹,创建文件夹和文件 + try_remove(dir_name) + self._try_remove_ios(target, bundle_id=bundle_id) + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + md5 = _get_folder_md5(dir_name) + self.ios.push(dir_name, f"{folder}/", bundle_id=bundle_id, timeout=60) + time.sleep(1) + try_remove(dir_name) - def _test_dir(dir_name): + # 推送文件夹 print(f"test pull directory {dir_name}") os.mkdir(dir_name) - self.ios.pull(f"/Documents/{dir_name}", dir_name, self.TEST_FSYNC_APP, timeout=60) + self.ios.pull(target, dir_name, bundle_id=bundle_id, timeout=60) self.assertTrue(os.path.exists(f"{dir_name}/{dir_name}")) + self.assertEqual(md5, _get_folder_md5(f"{dir_name}/{dir_name}")) + + # 清理 + self._try_remove_ios(target, bundle_id=bundle_id) + time.sleep(1) try_remove(dir_name) - + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 _test_file("test_data.txt") + _test_file("t e s t _ d a t a.txt") _test_file("测试文件.txt") + _test_file("测 试 文 件.txt") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt") + _test_file("data") + _test_file("data.png", bundle_id=None, folder="/DCIM") + + _test_dir('test_dir') + _test_dir('t e s t _ d i r') _test_dir('测试文件夹') + _test_dir('测试文件夹.txt') + _test_dir('测 试 文 件 夹') + _test_dir("(){}[]~'-_@!#$%&+,;=^") + _test_dir('test_dir_no_bundle', bundle_id=None, folder="/DCIM") def test_rm(self): - def _test_file(file_name): + def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{file_name}" + + # 删除手机和本地存在的文件,创建文件 + self._try_remove_ios(target, bundle_id) + with open(file_name, 'w') as f: + f.write('Test data') + + # 推送文件 + self.ios.push(file_name, f"{folder}/", bundle_id, timeout=60) + time.sleep(1) + try_remove(file_name) + file_list = self.ios.ls(target, bundle_id) + self.assertEqual(len(file_list), 1) + + # 删除文件 print(f"test rm file {file_name}") - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) - find_flag = False + self.ios.rm(target, bundle_id) + file_list = self.ios.ls(folder, bundle_id) for item in file_list: if item['name'] == file_name: - print(f"find file {file_name}") - find_flag = True - break - if not find_flag: - print(f"not find file {file_name}") - return - self.ios.rm(f"/Documents/{file_name}", self.TEST_FSYNC_APP) - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) - self.assertTrue(file_name not in [item['name'] for item in file_list]) - - def _test_dir(dir_name): + raise Exception(f"remove {file_name} failed") + + def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + + # 删除手机和本地存在的文件夹,创建文件夹和文件 + self._try_remove_ios(target, bundle_id) + os.mkdir(dir_name) + with open(f'{dir_name}/test_data', 'w') as f: + f.write('Test data') + + # 推送文件夹 + self.ios.push(dir_name, f"{folder}/", bundle_id, timeout=60) + time.sleep(1) + try_remove(dir_name) + print(f"test rm directory {dir_name}") - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) - find_flag = False + file_list = self.ios.ls(folder, bundle_id) for item in file_list: if item['name'] == f"{dir_name}/": - print(f"find dir {dir_name}") - find_flag = True break - if not find_flag: - print(f"not find dir {dir_name}") - return - self.ios.rm(f"/Documents/{dir_name}", self.TEST_FSYNC_APP) - file_list = self.ios.ls("/Documents/", self.TEST_FSYNC_APP) + else: + raise Exception(f"directory {dir_name} not exist") + + # 删除文件夹 + self.ios.rm(target, bundle_id) + file_list = self.ios.ls(folder, bundle_id) self.assertTrue(f"{dir_name}/" not in [item['name'] for item in file_list]) + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 _test_file("test_data.txt") + _test_file("t e s t _ d a t a.txt") _test_file("测试文件.txt") + _test_file("测 试 文 件.txt") + _test_file("(){}[]~'-_@!#$%&+,;=^.txt") + _test_file("data") + _test_file("data.png", bundle_id=None, folder="/DCIM") + _test_dir('test_dir') + _test_dir('t e s t _ d i r') _test_dir('测试文件夹') + _test_dir('测试文件夹.txt') + _test_dir('测 试 文 件 夹') + _test_dir("(){}[]~'-_@!#$%&+,;=^") + _test_dir('test_dir_no_bundle', bundle_id=None, folder="/DCIM") def test_mkdir(self): - print("test mkdir") - dir_name = "/Documents/test_dir" - self.ios.mkdir(dir_name, self.TEST_FSYNC_APP) + def _test(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): + target = f"{folder}/{dir_name}" + + # 删除目标目录 + self._try_remove_ios(target, bundle_id) + + print("test mkdir") + + # 创建目录 + self.ios.mkdir(target, bundle_id) + + # 获取目标文件夹下的目录列表 + dirs = self.ios.ls(folder, bundle_id) + + # 检查新建的目录是否存在 + self.assertTrue(any(d['name'] == f"{dir_name}/" for d in dirs)) + + # 删除目标目录 + self._try_remove_ios(target, bundle_id) + + # 执行得太快会报错,可能和wda的处理速度有关 + # 如果报错尝试单独执行那些用例 + _test('test_dir') + _test('t e s t _ d i r') + _test('测试文件夹') + _test('测试文件夹.txt') + _test('测 试 文 件 夹') + _test("(){}[]~'-_@!#$%&+,;=^") + _test('test_dir_no_bundle', bundle_id=None, folder="/DCIM") + + def test_is_dir(self): + print("test is_dir") + + def create_and_push_file(local_name, remote_name, bundle_id): + with open(local_name, 'w') as f: + f.write('Test data') + self.ios.push(local_name, remote_name, bundle_id) + try_remove(local_name) + + def create_and_push_dir(local_name, remote_name, bundle_id): + os.makedirs(local_name) + self.ios.push(local_name, remote_name, bundle_id) + try_remove(local_name) - dirs = self.ios.ls("/Documents", self.TEST_FSYNC_APP) - self.assertTrue(any(d['name'] == 'test_dir/' for d in dirs)) - self.ios.rm(dir_name, self.TEST_FSYNC_APP) + # 测试文件 + file_path = "/Documents/test_data.txt" + self._try_remove_ios(file_path, self.TEST_FSYNC_APP) + create_and_push_file("test_data.txt", "/Documents/", self.TEST_FSYNC_APP) + self.assertFalse(self.ios.is_dir(file_path, self.TEST_FSYNC_APP)) + self._try_remove_ios(file_path, self.TEST_FSYNC_APP) + + # 测试文件夹 + dir_path = "/Documents/test_dir" + create_and_push_dir("test_dir", "/Documents/", self.TEST_FSYNC_APP) + self.assertTrue(self.ios.is_dir(dir_path, self.TEST_FSYNC_APP)) + self._try_remove_ios(dir_path, self.TEST_FSYNC_APP) + + # 测试另外一个文件夹 + file_path_dcim = "/DCIM/test.png" + self._try_remove_ios(file_path_dcim, None) + create_and_push_file("test_data.txt", "/DCIM/test.png", None) + self.assertTrue(self.ios.is_dir("/DCIM", None)) + self.assertFalse(self.ios.is_dir(file_path_dcim, None)) + self._try_remove_ios(file_path_dcim, None) + if __name__ == '__main__': @@ -515,6 +700,7 @@ def test_mkdir(self): suite.addTest(TestIos("test_pull")) suite.addTest(TestIos("test_mkdir")) suite.addTest(TestIos("test_rm")) + suite.addTest(TestIos("test_is_dir")) # 联合接口,顺序测试:解锁屏、应用启动关闭 suite.addTest(TestIos("test_is_locked")) suite.addTest(TestIos("test_lock")) From f242fe0b26f02434f706e4f3f89f8138ff97f3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 25 Jun 2024 16:24:18 +0800 Subject: [PATCH 14/16] add ios push/pull comment and testcase --- airtest/core/api.py | 25 +++++- airtest/core/ios/ios.py | 180 +++++++++++++++++++++++++--------------- tests/test_ios.py | 79 +++++++++--------- 3 files changed, 171 insertions(+), 113 deletions(-) diff --git a/airtest/core/api.py b/airtest/core/api.py index 0f0be053d..b38d95727 100644 --- a/airtest/core/api.py +++ b/airtest/core/api.py @@ -735,8 +735,17 @@ def push(local, remote, *args, **kwargs): :platforms: Android, iOS :Example: - >>> connect_device("android:///") - >>> push(r"D:\demo\test.text", "/data/local/tmp/test.text") + Android:: + + >>> connect_device("android:///") + >>> push(r"D:\demo\test.text", "/data/local/tmp/test.text") + + + iOS:: + + >>> connect_device("iOS:///http+usbmux://udid") + >>> push("test.png", "/DCIM/") # push to the DCIM directory + >>> push(r"D:\demo\test.text", "/Documents", bundle_id="com.apple.Keynote") # push to the Documents directory of the Keynote app """ return G.DEVICE.push(local, remote, *args, **kwargs) @@ -753,8 +762,16 @@ def pull(remote, local, *args, **kwargs): :platforms: Android, iOS :Example: - >>> connect_device("android:///") - >>> pull("/data/local/tmp/test.text", r"D:\demo\test.text") + Android:: + + >>> connect_device("android:///") + >>> pull("/data/local/tmp/test.txt", r"D:\demo\test.txt") + + iOS:: + + >>> connect_device("iOS:///http+usbmux://udid") + >>> pull("/DCIM/test.png", r"D:\demo\test.png") + >>> pull("/Documents/test.key", r"D:\demo\test.key", bundle_id="com.apple.Keynote") """ return G.DEVICE.pull(remote, local, *args, **kwargs) diff --git a/airtest/core/ios/ios.py b/airtest/core/ios/ios.py index 9f06e5e26..4bf5fe9b7 100644 --- a/airtest/core/ios/ios.py +++ b/airtest/core/ios/ios.py @@ -46,6 +46,7 @@ def decorator_retry_session(func): 当因为session失效而操作失败时,尝试重新获取session,最多重试3次。 """ + @wraps(func) def wrapper(self, *args, **kwargs): try: @@ -59,6 +60,7 @@ def wrapper(self, *args, **kwargs): time.sleep(0.5) continue raise AirtestError("Failed to re-acquire session.") + return wrapper @@ -66,6 +68,7 @@ def decorator_pairing_dialog(func): """ When the device is not paired, trigger the trust dialogue and try again. """ + @wraps(func) def wrapper(*args, **kwargs): try: @@ -77,6 +80,7 @@ def wrapper(*args, **kwargs): error_msg = "Device is not yet paired. Triggered the trust dialogue. Please accept and try again." LOGGING.error(error_msg) raise + return wrapper @@ -90,6 +94,7 @@ def add_decorator_to_methods(decorator): Returns: - decorator_wrapper: A function that takes a class as input and decorates all the methods of the class by applying the input decorator to each method. """ + def decorator_wrapper(cls): # 获取要装饰的类的所有方法 methods = [attr for attr in dir(cls) if callable(getattr(cls, attr)) and not attr.startswith("_")] @@ -99,8 +104,10 @@ def decorator_wrapper(cls): setattr(cls, method, decorator(getattr(cls, method))) return cls + return decorator_wrapper + def format_file_list(file_list): formatted_list = [] for file in file_list: @@ -111,9 +118,10 @@ def format_file_list(file_list): 'name': file[3] } formatted_list.append(file_info) - + return formatted_list + @add_decorator_to_methods(decorator_pairing_dialog) class TIDevice: """Below staticmethods are provided by Tidevice. @@ -125,11 +133,11 @@ def devices(): Get all available devices connected by USB, return a list of UDIDs. Returns: - list: A list of UDIDs. + list: A list of UDIDs. e.g. ['539c5fffb18f2be0bf7f771d68f7c327fb68d2d9'] """ return Usbmux().device_udid_list() - + @staticmethod def list_app(udid, app_type="user"): """ @@ -175,7 +183,7 @@ def list_wda(udid): if (bundle_id.startswith('com.') and bundle_id.endswith(".xctrunner")) or display_name == "WebDriverAgentRunner-Runner": wda_list.append(bundle_id) return wda_list - + @staticmethod def device_info(udid): """ @@ -208,8 +216,8 @@ def device_info(udid): 'BasebandVersion' """ for attr in ('ProductVersion', 'ProductType', - 'ModelNumber', 'SerialNumber', 'PhoneNumber', - 'TimeZone', 'UniqueDeviceID'): + 'ModelNumber', 'SerialNumber', 'PhoneNumber', + 'TimeZone', 'UniqueDeviceID'): key = attr[0].lower() + attr[1:] if attr in device_info: tmp_dict[key] = device_info[attr] @@ -218,7 +226,7 @@ def device_info(udid): except: tmp_dict["marketName"] = "" return tmp_dict - + @staticmethod def install_app(udid, file_or_url): BaseDevice(udid, Usbmux()).app_install(file_or_url) @@ -263,7 +271,7 @@ def ps(udid): continue ps_list.append({key: p[key] for key in keys}) return ps_list - + @staticmethod def ps_wda(udid): """Get all running WDA on device that meet certain naming rules. @@ -283,20 +291,22 @@ def ps_wda(udid): else: continue return ps_wda_list - + @staticmethod def xctest(udid, wda_bundle_id): try: - return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, logger=setup_logger(level=logging.INFO)) + return BaseDevice(udid, Usbmux()).xctest(fuzzy_bundle_id=wda_bundle_id, + logger=setup_logger(level=logging.INFO)) except Exception as e: - print(f"Failed to run tidevice xctest function for {wda_bundle_id}.Try to run tidevice runwda function for {wda_bundle_id}.") + print( + f"Failed to run tidevice xctest function for {wda_bundle_id}.Try to run tidevice runwda function for {wda_bundle_id}.") try: return BaseDevice(udid, Usbmux()).runwda(fuzzy_bundle_id=wda_bundle_id) except Exception as e: print(f"Failed to run tidevice runwda function for {wda_bundle_id}.") # 先不抛出异常,ios17的兼容未合并进来,ios17设备一定会报错 # raise AirtestError(f"Failed to start XCTest for {wda_bundle_id}.") - + @staticmethod def push(udid, local_path, device_path, bundle_id=None, timeout=None): """ @@ -320,12 +330,11 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents", "com.apple.Keynote") >>> TIDevice.push("00008020-001270842E88002E", "C:/Users/username/test.key", "/Documents/test.key", "com.apple.Keynote") - """ try: if not os.path.exists(local_path): raise AirtestError(f"Local path {local_path} does not exist.") - + if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: @@ -370,7 +379,8 @@ def push(udid, local_path, device_path, bundle_id=None, timeout=None): sync.push_content(device_file_path, content) print(f"pushed {local_path} to {device_path}") except Exception as e: - raise AirtestError(f"Failed to push {local_path} to {device_path}. If push a FILE, please check if there is a DIRECTORY with the same name already exists. If push a DIRECTORY, please check if there is a FILE with the same name already exists, and try again.") + raise AirtestError( + f"Failed to push {local_path} to {device_path}. If push a FILE, please check if there is a DIRECTORY with the same name already exists. If push a DIRECTORY, please check if there is a FILE with the same name already exists, and try again.") @staticmethod def pull(udid, device_path, local_path, bundle_id=None, timeout=None): @@ -380,21 +390,21 @@ def pull(udid, device_path, local_path, bundle_id=None, timeout=None): Args: udid (str): The UDID of the iOS device. device_path (str): The path of the file or directory on the iOS device. - Remote devices can only be file paths. + Remote devices can only be file paths. local_path (str): The destination path on the local machine. - Remote devices can only be file paths. + Remote devices can only be file paths. bundle_id (str, optional): The bundle ID of the app. If provided, the file or directory will be pulled from the app's sandbox. Defaults to None. timeout (int, optional): The timeout in seconds for the remote device operation. Defaults to None. Examples: - + Pull a file from the DCIM directory:: - + >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures/photo.jpg") >>> TIDevice.pull("00008020-001270842E88002E", "/DCIM/photo.jpg", "C:/Users/username/Pictures") - + Pull a directory from the Documents directory of the Keynote app:: - + >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") >>> TIDevice.pull("00008020-001270842E88002E", "/Documents", "C:/Users/username/Documents", "com.apple.Keynote") @@ -407,7 +417,7 @@ def pull(udid, device_path, local_path, bundle_id=None, timeout=None): if TIDevice.is_dir(udid, device_path, bundle_id): os.makedirs(local_path, exist_ok=True) - + src = pathlib.Path(device_path) dst = pathlib.Path(local_path) if dst.is_dir() and src.name and sync.stat(src).is_dir(): @@ -438,12 +448,13 @@ def rm(udid, remote_path, bundle_id=None): >>> TIDevice.rm("00008020-001270842E88002E", "/Documents", "com.apple.Keynote") """ + def _check_status(status, path): if status == 0: print("removed", path) else: raise AirtestError(f"<{status.name} {status.value}> Failed to remove {path}") - + def _remove_folder(udid, folder_path, bundle_id): folder_path = folder_path.replace("\\", "/") for file_info in TIDevice.ls(udid, folder_path, bundle_id): @@ -455,12 +466,12 @@ def _remove_folder(udid, folder_path, bundle_id): # remove the folder itself status = sync.remove(folder_path) _check_status(status, folder_path) - + if bundle_id: sync = BaseDevice(udid, Usbmux()).app_sync(bundle_id) else: sync = BaseDevice(udid, Usbmux()).sync - + if TIDevice.is_dir(udid, remote_path, bundle_id): if not remote_path.endswith("/"): remote_path += "/" @@ -481,6 +492,18 @@ def ls(udid, remote_path, bundle_id=None): Returns: list: A list of files and directories in the specified path. + + Examples: + + List files and directories in the DCIM directory:: + + >>> print(TIDevice.ls("00008020-001270842E88002E", "/DCIM")) + [{'type': 'Directory', 'size': 96, 'last_modified': '2021-12-01 15:30:13', 'name': '100APPLE/'}, {'type': 'Directory', 'size': 96, 'last_modified': '2021-07-20 17:29:01', 'name': '.MISC/'}] + + List files and directories in the Documents directory of the Keynote app:: + + >>> print(TIDevice.ls("00008020-001270842E88002E", "/Documents", "com.apple.Keynote")) + [{'type': 'File', 'size': 302626, 'last_modified': '2024-06-25 11:25:25', 'name': '演示文稿.key'}] """ try: file_list = [] @@ -509,7 +532,7 @@ def mkdir(udid, remote_path, bundle_id=None): udid (str): The UDID of the iOS device. remote_path (str): The path of the directory to be created on the iOS device. bundle_id (str, optional): The bundle ID of the app. Defaults to None. - + Examples: Create a directory in the DCIM directory:: @@ -540,10 +563,10 @@ def is_dir(udid, remote_path, bundle_id): udid (str): The UDID of the iOS device. remote_path (str): The path on the iOS device. bundle_id (str): The bundle ID of the app. - + Returns: bool: True if the path is a directory, False otherwise. - + Examples: Check if the DCIM directory is a directory:: @@ -568,8 +591,10 @@ def is_dir(udid, remote_path, bundle_id): if info['name'] == f"{remote_path_base}": return info['type'] == 'Directory' except Exception as e: - raise AirtestError(f"Failed to check if {remote_path} is a directory. Please check the path exist and try again.") - + raise AirtestError( + f"Failed to check if {remote_path} is a directory. Please check the path exist and try again.") + + @add_decorator_to_methods(decorator_retry_session) class IOS(Device): """IOS client. @@ -581,7 +606,8 @@ class IOS(Device): - ``iproxy $port 8100 $udid`` """ - def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=None, udid=None, name=None, serialno=None, wda_bundle_id=None): + def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=None, udid=None, name=None, + serialno=None, wda_bundle_id=None): super().__init__() # If none or empty, use default addr. @@ -607,12 +633,13 @@ def __init__(self, addr=DEFAULT_ADDR, cap_method=CAP_METHOD.MJPEG, mjpeg_port=No # e.g., connect local device http://127.0.0.1:8100 or http://localhost:8100 or http+usbmux://00008020-001270842E88002E self.udid = udid or name or serialno self._wda_bundle_id = wda_bundle_id - parsed = urlparse(self.addr).netloc.split(":")[0] if ":" in urlparse(self.addr).netloc else urlparse(self.addr).netloc + parsed = urlparse(self.addr).netloc.split(":")[0] if ":" in urlparse(self.addr).netloc else urlparse( + self.addr).netloc if parsed not in ["localhost", "127.0.0.1"] and "." in parsed: # Connect remote device via url. self.is_local_device = False - self.driver = wda.Client(self.addr) - else: + self.driver = wda.Client(self.addr) + else: # Connect local device via url. self.is_local_device = True if parsed in ["localhost", "127.0.0.1"]: @@ -655,25 +682,25 @@ def _get_default_device(self): if device_udid_list: return device_udid_list[0] raise IndexError("iOS devices not found, please connect device first.") - + def _get_default_wda_bundle_id(self): """Get local default device's WDA bundleID when no WDA bundleID. Returns: Local device's WDA bundleID. - """ + """ try: wda_list = TIDevice.list_wda(self.udid) return wda_list[0] except IndexError: raise IndexError("WDA bundleID not found, please install WDA on device.") - + def _get_default_running_wda_bundle_id(self): """Get the bundleID of the WDA that is currently running on local device. Returns: Local device's running WDA bundleID. - """ + """ try: running_wda_list = TIDevice.ps_wda(self.udid) return running_wda_list[0] @@ -685,7 +712,7 @@ def wda_bundle_id(self): if not self._wda_bundle_id and self.is_local_device: self._wda_bundle_id = self._get_default_wda_bundle_id() return self._wda_bundle_id - + @property def ip(self): """Returns the IP address of the host connected to the iOS phone. @@ -798,7 +825,7 @@ def _register_rotation_watcher(self): def window_size(self): """ - Returns: + Returns: Window size (width, height). """ try: @@ -986,7 +1013,7 @@ def _quick_click(self, x, y, duration): """ The method extended from the facebook-wda third-party library. Use modified appium wda to perform quick click. - + Args: x, y (int, float): float(percent), int(coordicate) duration (optional): tap_hold duration @@ -994,17 +1021,17 @@ def _quick_click(self, x, y, duration): x, y = self.driver._percent2pos(x, y) data = {'x': x, 'y': y, 'duration': duration} # 为了兼容改动直接覆盖原生接口的自制版wda。 - + try: self.driver._session_http.post('/wda/deviceTap', data=data) - #如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 + # 如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 except wda.WDARequestError as e: try: return self.driver._session_http.post('/wda/tap', data=data) except wda.WDARequestError as e: if e.status == 110: self.driver.click(x, y, duration) - + def double_click(self, pos): x, y = self._transform_xy(pos) self.driver.double_tap(x, y) @@ -1030,6 +1057,7 @@ def swipe(self, fpos, tpos, duration=0, delay=None, *args, **kwargs): fx, fy = int(fx * self.touch_factor), int(fy * self.touch_factor) if not (tx < 1 and ty < 1): tx, ty = int(tx * self.touch_factor), int(ty * self.touch_factor) + # 如果是通过ide来滑动,且安装的是自制版的wda就调用快速滑动接口,其他时候不关心滑动速度就使用原生接口保证滑动精确性。 def ios_tagent_swipe(fpos, tpos, delay=None): # 调用自定义的wda swipe接口需要进行坐标转换。 @@ -1057,7 +1085,7 @@ def _quick_swipe(self, x1, y1, x2, y2, delay): """ The method extended from the facebook-wda third-party library. Use modified appium wda to perform quick swipe. - + Args: x1, y1, x2, y2 (int, float): float(percent), int(coordicate) delay (float): start coordinate to end coordinate duration (seconds) @@ -1073,7 +1101,7 @@ def _quick_swipe(self, x1, y1, x2, y2, delay): if self.using_ios_tagent: try: self.driver._session_http.post('/wda/deviceSwipe', data=data) - #如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 + # 如果找不到接口说明是低版本的wda,低于1.3版本没有此接口 except wda.WDARequestError as e: return self.driver._session_http.post('/wda/swipe', data=data) except wda.WDARequestError as e: @@ -1113,7 +1141,7 @@ def text(self, text, enter=True): if enter: text += '\n' self.driver.send_keys(text) - + def install_app(self, file_or_url, **kwargs): """ curl -X POST $JSON_HEADER \ @@ -1135,11 +1163,11 @@ def install_app(self, file_or_url, **kwargs): if not self.is_local_device: raise LocalDeviceError() return TIDevice.install_app(self.udid, file_or_url) - + def uninstall_app(self, bundle_id): """Uninstall app from the device. - Notes: + Notes: It seems always return True. Args: @@ -1171,10 +1199,10 @@ def start_app(self, bundle_id, *args, **kwargs): raise AirtestError(f"App launch timeout, please check if the app is installed: {bundle_id}") else: return TIDevice.start_app(self.udid, bundle_id) - + def stop_app(self, bundle_id): """ - Note: Both ways of killing the app may fail, nothing responds or just closes the + Note: Both ways of killing the app may fail, nothing responds or just closes the app to the background instead of actually killing it and no error will be reported. """ try: @@ -1235,13 +1263,13 @@ def app_current(self): "bundleId": "com.netease.cloudmusic"} """ return self.driver.app_current() - + def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): """Get clipboard text. - Before calling the WDA interface, you need to ensure that WDA was foreground. + Before calling the WDA interface, you need to ensure that WDA was foreground. If there are multiple WDA on your device, please specify the active WDA by parameter wda_bundle_id. - + Args: wda_bundle_id: The bundle id of the running WDA, if None, will use default WDA bundle id. @@ -1252,7 +1280,7 @@ def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): LocalDeviceError: If the device is remote and the wda_bundle_id parameter is not provided. Notes: - If you want to use this function, you have to set WDA foreground which would switch the + If you want to use this function, you have to set WDA foreground which would switch the current screen of the phone. Then we will try to switch back to the screen before. """ if wda_bundle_id is None: @@ -1281,7 +1309,7 @@ def get_clipboard(self, wda_bundle_id=None, *args, **kwargs): else: LOGGING.warning("we can't switch back to the app before, because can't get bundle id.") return decoded_text - + def set_clipboard(self, content, wda_bundle_id=None, *args, **kwargs): """ Set the clipboard content on the device. @@ -1476,7 +1504,7 @@ def alert_wait(self, time_counter=2): return self.driver.alert.wait(time_counter) def alert_buttons(self): - """Get alert buttons text. + """Get alert buttons text. Notes: Might not work on all devices. @@ -1509,7 +1537,7 @@ def alert_click(self, buttons): return self.driver.alert.click(buttons) def home_interface(self): - """Get True for the device status is on home interface. + """Get True for the device status is on home interface. Reason: Some devices can Horizontal screen on the home interface. @@ -1538,7 +1566,7 @@ def disconnect(self): self.mjpegcap.teardown_stream() if self.rotation_watcher: self.rotation_watcher.teardown() - + def start_recording(self, max_time=1800, output=None, fps=10, snapshot_sleep=0.001, orientation=0, max_size=None, *args, **kwargs): """Start recording the device display. @@ -1588,7 +1616,7 @@ def start_recording(self, max_time=1800, output=None, fps=10, if self.recorder and self.recorder.is_running(): LOGGING.warning("recording is already running, please don't call again") return None - + logdir = "./" if ST.LOG_DIR is not None: logdir = ST.LOG_DIR @@ -1601,10 +1629,11 @@ def start_recording(self, max_time=1800, output=None, fps=10, save_path = os.path.join(logdir, output) max_size = get_max_size(max_size) + def get_frame(): data = self.get_frame_from_stream() frame = aircv.utils.string_2_img(data) - + if max_size is not None: frame = resize_by_max(frame, max_size) return frame @@ -1617,13 +1646,13 @@ def get_frame(): LOGGING.info("start recording screen to {}".format(save_path)) return save_path - def stop_recording(self,): + def stop_recording(self, ): """ Stop recording the device display. Recoding file will be kept in the device. """ LOGGING.info("stopping recording") self.recorder.stop() return None - + def push(self, local_path, remote_path, bundle_id=None, timeout=None): """ Pushes a file from the local machine to the iOS device. @@ -1695,8 +1724,7 @@ def ls(self, remote_path, bundle_id=None): - 'size': The size of the item in bytes. - 'last_modified': The last modification time of the item, in the format 'YYYY-MM-DD HH:MM:SS'. - 'name': The name of the item, including the path relative to `remote_path`. - - Example: + e.g. [ {'type': 'Directory', 'size': 1024, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_directory/'}, {'type': 'File', 'size': 2048, 'last_modified': 'YYYY-MM-DD HH:MM:SS', 'name': 'example_file.txt'} @@ -1704,11 +1732,25 @@ def ls(self, remote_path, bundle_id=None): Raises: LocalDeviceError: If the device is remote. + + Examples: + + List files and directories in the DCIM directory:: + + >>> dev = connect_device("iOS:///http+usbmux://udid") + >>> print(dev.ls("/DCIM/")) + [{'type': 'Directory', 'size': 96, 'last_modified': '2021-12-01 15:30:13', 'name': '100APPLE/'}, {'type': 'Directory', 'size': 96, 'last_modified': '2021-07-20 17:29:01', 'name': '.MISC/'}] + + List files and directories in the Documents directory of the Keynote app:: + + >>> print(dev.ls("/Documents", "com.apple.Keynote")) + [{'type': 'File', 'size': 302626, 'last_modified': '2024-06-25 11:25:25', 'name': 'test.key'}] + """ if not self.is_local_device: raise LocalDeviceError() return TIDevice.ls(self.udid, remote_path, bundle_id=bundle_id) - + @logwrap def rm(self, remote_path, bundle_id=None): """ @@ -1742,9 +1784,9 @@ def mkdir(self, remote_path, bundle_id=None): remote_path (str): The remote path to create. bundle_id (str, optional): The bundle ID of the app. Defaults to None. - + Examples: - + >>> dev = connect_device("iOS:///http+usbmux://udid") >>> dev.mkdir("/Documents/test_dir", "com.apple.Keynote") @@ -1763,7 +1805,7 @@ def is_dir(self, remote_path, bundle_id=None): Returns: bool: True if the path is a directory, False otherwise. - + Exapmles: >>> dev = connect_device("iOS:///http+usbmux://udid") diff --git a/tests/test_ios.py b/tests/test_ios.py index 300cae619..a52deb95e 100644 --- a/tests/test_ios.py +++ b/tests/test_ios.py @@ -12,21 +12,22 @@ import cv2 import warnings import tempfile + warnings.simplefilter("always") -text_flag = True # 控制是否运行text接口用例 +text_flag = True # 控制是否运行text接口用例 skip_alert_flag = False # 控制是否测试alert相关接口用例 DEFAULT_ADDR = "http://localhost:8100/" # iOS设备连接参数 PKG_SAFARI = "com.apple.mobilesafari" -TEST_IPA_FILE_OR_URL = "" # IPA包体的路径或者url链接,测试安装 -TEST_IPA_BUNDLE_ID = "" # IPA安装后app的bundleID,测试卸载 +TEST_IPA_FILE_OR_URL = "" # IPA包体的路径或者url链接,测试安装 +TEST_IPA_BUNDLE_ID = "" # IPA安装后app的bundleID,测试卸载 + class TestIos(unittest.TestCase): @classmethod def setUpClass(cls): - # cls.ios = IOS(addr=DEFAULT_ADDR, cap_method=CAP_METHOD.WDACAP) - cls.ios = connect_device("iOS:///http+usbmux://") - cls.TEST_FSYNC_APP = "" # 测试文件推送、同步的app的bundleID + cls.ios = IOS(addr=DEFAULT_ADDR, cap_method=CAP_METHOD.WDACAP) + cls.TEST_FSYNC_APP = "" # 测试文件推送、同步的app的bundleID # 获取一个可以用于文件操作的app cls.TEST_FSYNC_APP = "com.apple.Keynote" # cls.TEST_FSYNC_APP = "rn.notes.best" @@ -71,7 +72,7 @@ def test_snapshot(self): filename = "./screen.png" if os.path.exists(filename): os.remove(filename) - + screen = self.ios.snapshot(filename=filename) self.assertIsInstance(screen, numpy.ndarray) self.assertTrue(os.path.exists(filename)) @@ -87,11 +88,11 @@ def test_keyevent_home(self): with self.assertRaises(ValueError): self.ios.keyevent("home1") - + def test_keyevent_volume_up(self): print("test_keyevent_volume_up") self.ios.keyevent("voluMeup") - + def test_keyevent_volume_down(self): print("test_keyevent_volume_down") self.ios.keyevent("voluMeDown") @@ -197,7 +198,7 @@ def test_app_current(self): time.sleep(2) self.assertEqual(self.ios.app_current()["bundleId"], PKG_SAFARI) self.ios.stop_app(PKG_SAFARI) - + def test_get_ip_address(self): print("test_get_ip_address") print(self.ios.get_ip_address()) @@ -269,7 +270,7 @@ def test_touch_factor(self): print("display_info:", self.ios.display_info) print("default touch_factor:", self.ios.touch_factor) self.ios.touch((500, 500)) - self.ios.touch_factor = 1/3.3 + self.ios.touch_factor = 1 / 3.3 self.ios.touch((500, 500)) def test_disconnect(self): @@ -278,10 +279,10 @@ def test_disconnect(self): self.ios.get_frame_from_stream() self.ios.disconnect() self.assertEqual(len(self.ios.instruct_helper._port_using_func.keys()), 0) - + def test_record(self): self.ios.start_recording(output="test_10s.mp4") - time.sleep(10+4) + time.sleep(10 + 4) self.ios.stop_recording() time.sleep(2) self.assertEqual(os.path.exists("test_10s.mp4"), True) @@ -290,9 +291,9 @@ def test_record(self): if cap.isOpened(): rate = cap.get(5) frame_num = cap.get(7) - duration = frame_num/rate + duration = frame_num / rate self.assertEqual(duration >= 10, True) - + def test_list_app(self): print("test_list_app") app_list = self.ios.list_app(type="all") @@ -302,7 +303,7 @@ def test_list_app(self): def test_install_app(self): print("test_install_app") self.ios.install_app(TEST_IPA_FILE_OR_URL) - + def test_uninstall_app(self): print("test_uninstall_app") self.ios.uninstall_app(TEST_IPA_BUNDLE_ID) @@ -313,7 +314,7 @@ def test_get_clipboard(self): def test_set_clipboard(self): for i in range(10): - text = "test_set_clipboard"+str(i) + text = "test_set_clipboard" + str(i) self.ios.set_clipboard(text) self.assertEqual(self.ios.get_clipboard(), text) self.ios.paste() @@ -341,7 +342,7 @@ def test_ls(self): self.assertTrue(isinstance(file_list[0], dict)) self._try_remove_ios("/Documents/test_ls_file.txt", self.TEST_FSYNC_APP) try_remove("test_ls_file.txt") - + def _try_remove_ios(self, file_name, bundle_id=None): try: self.ios.rm(file_name, bundle_id) @@ -437,7 +438,7 @@ def _test_dir(dir_name, dst="/Documents/"): _test_dir('测试文件夹', "/Documents/") _test_dir('测试文件夹_1', "/Documents") _test_dir('测 试 文 件 夹', "/Documents") - + def test_pull(self): def _get_file_md5(file_path): hasher = hashlib.md5() @@ -453,7 +454,7 @@ def _get_folder_md5(folder_path): file_path = os.path.join(root, file) file_md5 = _get_file_md5(file_path) md5_list.append(file_md5) - + combined_md5 = hashlib.md5("".join(md5_list).encode()).hexdigest() return combined_md5 @@ -528,7 +529,6 @@ def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): _test_file("data") _test_file("data.png", bundle_id=None, folder="/DCIM") - _test_dir('test_dir') _test_dir('t e s t _ d i r') _test_dir('测试文件夹') @@ -540,7 +540,7 @@ def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): def test_rm(self): def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): target = f"{folder}/{file_name}" - + # 删除手机和本地存在的文件,创建文件 self._try_remove_ios(target, bundle_id) with open(file_name, 'w') as f: @@ -560,10 +560,10 @@ def _test_file(file_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): for item in file_list: if item['name'] == file_name: raise Exception(f"remove {file_name} failed") - + def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): target = f"{folder}/{dir_name}" - + # 删除手机和本地存在的文件夹,创建文件夹和文件 self._try_remove_ios(target, bundle_id) os.mkdir(dir_name) @@ -582,7 +582,7 @@ def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): break else: raise Exception(f"directory {dir_name} not exist") - + # 删除文件夹 self.ios.rm(target, bundle_id) file_list = self.ios.ls(folder, bundle_id) @@ -605,25 +605,25 @@ def _test_dir(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): _test_dir('测 试 文 件 夹') _test_dir("(){}[]~'-_@!#$%&+,;=^") _test_dir('test_dir_no_bundle', bundle_id=None, folder="/DCIM") - + def test_mkdir(self): def _test(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): target = f"{folder}/{dir_name}" - + # 删除目标目录 self._try_remove_ios(target, bundle_id) - + print("test mkdir") - + # 创建目录 self.ios.mkdir(target, bundle_id) - + # 获取目标文件夹下的目录列表 dirs = self.ios.ls(folder, bundle_id) - + # 检查新建的目录是否存在 self.assertTrue(any(d['name'] == f"{dir_name}/" for d in dirs)) - + # 删除目标目录 self._try_remove_ios(target, bundle_id) @@ -639,31 +639,31 @@ def _test(dir_name, bundle_id=self.TEST_FSYNC_APP, folder="/Documents"): def test_is_dir(self): print("test is_dir") - + def create_and_push_file(local_name, remote_name, bundle_id): with open(local_name, 'w') as f: f.write('Test data') self.ios.push(local_name, remote_name, bundle_id) try_remove(local_name) - + def create_and_push_dir(local_name, remote_name, bundle_id): os.makedirs(local_name) self.ios.push(local_name, remote_name, bundle_id) try_remove(local_name) - + # 测试文件 file_path = "/Documents/test_data.txt" self._try_remove_ios(file_path, self.TEST_FSYNC_APP) create_and_push_file("test_data.txt", "/Documents/", self.TEST_FSYNC_APP) self.assertFalse(self.ios.is_dir(file_path, self.TEST_FSYNC_APP)) self._try_remove_ios(file_path, self.TEST_FSYNC_APP) - + # 测试文件夹 dir_path = "/Documents/test_dir" create_and_push_dir("test_dir", "/Documents/", self.TEST_FSYNC_APP) self.assertTrue(self.ios.is_dir(dir_path, self.TEST_FSYNC_APP)) self._try_remove_ios(dir_path, self.TEST_FSYNC_APP) - + # 测试另外一个文件夹 file_path_dcim = "/DCIM/test.png" self._try_remove_ios(file_path_dcim, None) @@ -673,10 +673,9 @@ def create_and_push_dir(local_name, remote_name, bundle_id): self._try_remove_ios(file_path_dcim, None) - if __name__ == '__main__': # unittest.main() - #构造测试集 + # 构造测试集 suite = unittest.TestSuite() # 初始化相关信息 suite.addTest(TestIos("test_session")) @@ -726,6 +725,6 @@ def create_and_push_dir(local_name, remote_name, bundle_id): suite.addTest(TestIos("get_alert_accept")) suite.addTest(TestIos("get_alert_click")) - #执行测试 + # 执行测试 runner = unittest.TextTestRunner() runner.run(suite) From de75c335e7d4773da0007d18edee55a2cfaffd86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 24 Sep 2024 17:26:00 +0800 Subject: [PATCH 15/16] fix: Fixed the issue where a process might remain when screen recording with ffmpeg fails. (cherry picked from commit 30ca25cbd74ddca52cf1a6e7f6471dabf762f32b) --- airtest/aircv/screen_recorder.py | 23 ++++++++++++++++++----- airtest/core/win/win.py | 30 ++++++++++++++++++------------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/airtest/aircv/screen_recorder.py b/airtest/aircv/screen_recorder.py index c9208ad49..c930a651f 100644 --- a/airtest/aircv/screen_recorder.py +++ b/airtest/aircv/screen_recorder.py @@ -9,6 +9,7 @@ import time import numpy as np import subprocess +import traceback RECORDER_ORI = { @@ -102,9 +103,16 @@ def write(self, frame): self.writer.write(frame.astype(np.uint8)) def close(self): - self.writer.close() - self.process.wait() - self.process.terminate() + try: + self.writer.close() + self.process.wait(timeout=5) + except Exception as e: + print(f"Error closing ffmpeg process: {e}") + finally: + try: + self.process.terminate() + except Exception as e: + print(f"Error terminating ffmpeg process: {e}") class ScreenRecorder: @@ -160,6 +168,7 @@ def stop(self): self._stop_flag = True self.t_write.join() self.t_stream.join() + self.writer.close() # Ensure writer is closed def get_frame_loop(self): # 单独一个线程持续截图 @@ -177,7 +186,6 @@ def get_frame_loop(self): raise def write_frame_loop(self): - # 按帧率间隔获取图像写入视频 try: duration = 1.0/self.writer.fps last_time = time.time() @@ -185,7 +193,11 @@ def write_frame_loop(self): while True: if time.time()-last_time >= duration: last_time += duration - self.writer.write(self.tmp_frame) + try: + self.writer.write(self.tmp_frame) + except Exception as e: + print(f"Error writing frame: {e}") + break if self.is_stop(): break time.sleep(0.0001) @@ -194,4 +206,5 @@ def write_frame_loop(self): except Exception as e: print("write thread error", e) self._stop_flag = True + self.writer.close() # Ensure the writer is closed on error raise diff --git a/airtest/core/win/win.py b/airtest/core/win/win.py index 5b6b9cf45..a48d5fdd0 100644 --- a/airtest/core/win/win.py +++ b/airtest/core/win/win.py @@ -28,12 +28,14 @@ LOGGING = get_logger(__name__) + def require_app(func): @wraps(func) def wrapper(inst, *args, **kwargs): if not inst.app: raise RuntimeError("Connect to an application first to use %s" % func.__name__) return func(inst, *args, **kwargs) + return wrapper @@ -170,7 +172,7 @@ def snapshot(self, filename=None, quality=10, max_size=None): try: with mss.mss() as sct: sct_img = sct.grab(monitor) - screen = numpy.array(sct_img, dtype=numpy.uint8)[...,:3] + screen = numpy.array(sct_img, dtype=numpy.uint8)[..., :3] if filename: aircv.imwrite(filename, screen, quality, max_size=max_size) return screen @@ -305,19 +307,19 @@ def touch(self, pos, **kwargs): time.sleep(interval) for i in range(1, steps): - x = int(start_x + (end_x-start_x) * i / steps) - y = int(start_y + (end_y-start_y) * i / steps) + x = int(start_x + (end_x - start_x) * i / steps) + y = int(start_y + (end_y - start_y) * i / steps) self.mouse.move(coords=(x, y)) time.sleep(interval) self.mouse.move(coords=(end_x, end_y)) - for i in range(1, offset+1): - self.mouse.move(coords=(end_x+i, end_y+i)) + for i in range(1, offset + 1): + self.mouse.move(coords=(end_x + i, end_y + i)) time.sleep(0.01) for i in range(offset): - self.mouse.move(coords=(end_x+offset-i, end_y+offset-i)) + self.mouse.move(coords=(end_x + offset - i, end_y + offset - i)) time.sleep(0.01) self.mouse.press(button=button, coords=(end_x, end_y)) @@ -645,7 +647,6 @@ def get_ip_address(self): return None - def start_recording(self, max_time=1800, output=None, fps=10, snapshot_sleep=0.001, orientation=0, max_size=None, *args, **kwargs): """ @@ -695,12 +696,12 @@ def start_recording(self, max_time=1800, output=None, fps=10, if self.recorder.is_running(): LOGGING.warning("recording is already running, please don't call again") return None - + logdir = "./" if not ST.LOG_DIR is None: logdir = ST.LOG_DIR if output is None: - save_path = os.path.join(logdir, "screen_%s.mp4"%(time.strftime("%Y%m%d%H%M%S", time.localtime()))) + save_path = os.path.join(logdir, "screen_%s.mp4" % (time.strftime("%Y%m%d%H%M%S", time.localtime()))) else: if os.path.isabs(output): save_path = output @@ -708,9 +709,14 @@ def start_recording(self, max_time=1800, output=None, fps=10, save_path = os.path.join(logdir, output) max_size = get_max_size(max_size) + def get_frame(): - frame = self.snapshot() - + try: + frame = self.snapshot() + except numpy.core._exceptions._ArrayMemoryError: + self.stop_recording() + raise Exception("memory error!!!!") + if max_size is not None: frame = resize_by_max(frame, max_size) return frame @@ -723,7 +729,7 @@ def get_frame(): LOGGING.info("start recording screen to {}, don't close or resize the app window".format(save_path)) return save_path - def stop_recording(self,): + def stop_recording(self): """ Stop recording the device display. Recoding file will be kept in the device. From 6dd0f9d737aa6053ac0700fb7a2eb031bcef0a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=90=A6?= Date: Tue, 24 Sep 2024 17:26:44 +0800 Subject: [PATCH 16/16] numpy<2.0 (cherry picked from commit cf2e3b4a0d357e2d1b91fe6a4c7911fabd007e2a) --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5f93f4959..99e315da1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ Pillow>=3.4.0 requests>=2.11.1 six>=1.9.0,<=1.16.0 mss==6.1.0 -numpy +numpy<2.0 opencv-contrib-python>=4.4.0.46, <=4.6.0.66 facebook-wda>=1.3.3 pywinauto==0.6.3