diff --git a/.gitignore b/.gitignore index 33bd906..04a8815 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,9 @@ *egg-info build venv +.venv dist __pycache__ .idea test-reports +.github \ No newline at end of file diff --git a/README.md b/README.md index 570168d..dbd6b13 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,9 @@ adb --version Import the necessary modules: ```python -from adb-pywrapper import AdbDevice, AdbResult, PullResult +from adb_pywrapper.adb_device import AdbDevice +from adb_pywrapper.adb_result import AdbResult +from adb_pywrapper.pull_result import PullResult ``` ## Listing Connected Devices diff --git a/adb_init.py b/adb_pywrapper/__init__.py similarity index 75% rename from adb_init.py rename to adb_pywrapper/__init__.py index add599f..8b81c7b 100644 --- a/adb_init.py +++ b/adb_pywrapper/__init__.py @@ -1,16 +1,18 @@ import logging +import subprocess from datetime import datetime from os import makedirs, environ from os.path import dirname, abspath, isdir, expanduser from sys import stdout -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) ########################### # ADB INITIALISATION CODE # ########################### -def _get_sdk_root(): + +def get_sdk_root(): """ Used to initialize ANDROID_SDK_ROOT, should not be called, use ANDROID_SDK_ROOT instead! This method looks for the location of your Android SDK. it does so by looking for commonly used environmental @@ -40,7 +42,21 @@ def _get_sdk_root(): return possible_paths[0] -ANDROID_SDK_HOME = _get_sdk_root() +ADB_PATH = None +ANDROID_SDK_HOME = get_sdk_root() +# initialize adb path: first check if adb is installed and available as a command +# if not, use the ANDROID_SDK_HOME detected in __init__ +if 'Android Debug Bridge version' in subprocess.getoutput('adb --version'): + ADB_PATH = 'adb' +else: + ADB_PATH = f'{ANDROID_SDK_HOME}/platform-tools/adb' +if 'Android Debug Bridge version' not in subprocess.getoutput(f'{ADB_PATH} --version'): + logger.warning( + f'Could not locate ADB. expected it available as "{ADB_PATH}" ' + f'but checking the version gives unexpected output: {subprocess.getoutput(f"{ADB_PATH} --version")}. ' + f'We are assuming the command `adb` works for now even though it doesn\'t seem to work...') + ADB_PATH = 'adb' + ##################### # LOGGING INIT CODE # diff --git a/adb_pywrapper.py b/adb_pywrapper/adb_device.py similarity index 74% rename from adb_pywrapper.py rename to adb_pywrapper/adb_device.py index f759fd8..7358eae 100644 --- a/adb_pywrapper.py +++ b/adb_pywrapper/adb_device.py @@ -1,66 +1,13 @@ -import re import subprocess -import time from os import makedirs -from os.path import basename, isfile, exists +from os.path import basename, isfile from subprocess import CompletedProcess from time import sleep from typing import Optional -from uuid import uuid4 - -import adb_init - -ADB_PATH = None - -# initialize adb path: first check if adb is installed and available as a command -# if not, use the ANDROID_SDK_HOME detected in __init__ -if 'Android Debug Bridge version' in subprocess.getoutput('adb --version'): - ADB_PATH = 'adb' -else: - ADB_PATH = f'{adb_init.ANDROID_SDK_HOME}/platform-tools/adb' -if 'Android Debug Bridge version' not in subprocess.getoutput(f'{ADB_PATH} --version'): - adb_init.logger.warning( - f'Could not locate ADB. expected it available as "{ADB_PATH}" ' - f'but checking the version gives unexpected output: {subprocess.getoutput(f"{ADB_PATH} --version")}. ' - f'We are assuming the command `adb` works for now even though it doesn\'t seem to work...') - ADB_PATH = 'adb' - - -class AdbResult: - def __init__(self, completed_adb_process: CompletedProcess): - self.completed_adb_process: CompletedProcess = completed_adb_process - self.stdout: str = completed_adb_process.stdout.decode() - self.stderr: str = completed_adb_process.stderr.decode() - self.success: bool = completed_adb_process.returncode == 0 - - def __str__(self) -> str: - return f'success : "{self.success}", ' \ - f'stdout : "{self.stdout}", ' \ - f'stderr : "{self.stderr}"' - def __repr__(self) -> str: - return self.__str__() - - -class PullResult: - """ - A class to represent the result of an adb pull. it contains three properties: - path: the path on which the pulled file should be available if the pull was successful - completed_adb_process: the result of the completed adb process - success: True if the pull was successful and the file in path exists - """ - - def __init__(self, result_path: str, adb_result: AdbResult): - self.completed_adb_process = adb_result - self.path = result_path - self.success = exists(result_path) - - def __str__(self) -> str: - return f'path : "{self.path}", ' \ - f'completed_adb_process : [{self.completed_adb_process.__str__()}]"' - - def __repr__(self) -> str: - return self.__str__() +from adb_pywrapper import logger, log_error_and_raise_exception, ADB_PATH +from adb_pywrapper.adb_result import AdbResult +from adb_pywrapper.pull_result import PullResult class AdbDevice: @@ -69,7 +16,7 @@ def __init__(self, device: str = None, check_device_exists: bool = True): if device is not None and check_device_exists: connected_devices = AdbDevice.list_devices() if device not in connected_devices: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Cannot create adb connection with device `{device}` ' f'as it cannot be found with `adb devices`: {connected_devices}') self.device_command = '' @@ -112,7 +59,7 @@ def list_devices() -> list[str]: """ result = AdbDevice._adb_command('devices') if not result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, f'Could not get list of available adb devices. ' + log_error_and_raise_exception(logger, f'Could not get list of available adb devices. ' f'ADB output: {result.stdout}{result.stderr}') devices = [line[:line.index('\t')] for line in result.stdout.splitlines() if '\t' in line] return devices @@ -126,13 +73,13 @@ def get_device_status(device_name) -> str: """ result = AdbDevice._adb_command('devices') if not result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, f'Could not get list of available adb devices. ' + log_error_and_raise_exception(logger, f'Could not get list of available adb devices. ' f'ADB output: {result.stdout}{result.stderr}') for line in result.stdout.splitlines(): if line.startswith(device_name): return line.split('\t')[1] - adb_init.log_error_and_raise_exception(adb_init.logger, f'Could not get status from device {device_name}') + log_error_and_raise_exception(logger, f'Could not get status from device {device_name}') def root(self) -> AdbResult: """ @@ -194,7 +141,7 @@ def ls(self, path: str) -> Optional[list[str]]: """ adb_result = self.shell(f'ls {path}') if not adb_result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not get contents of path {path} on device {self.device}. ' f'adb stderr: {adb_result.stderr}') return adb_result.stdout.splitlines() @@ -206,7 +153,7 @@ def installed_packages(self) -> list[str]: """ adb_result = self.shell(f'pm list packages') if not adb_result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not get installed packages on device {self.device}. ' f'adb stderr: {adb_result.stderr}') return [line[line.index(':') + 1:] for line in adb_result.stdout.splitlines() if line.startswith('package:')] @@ -222,7 +169,7 @@ def path_package(self, package_name: str) -> list[str]: """ adb_result = self.shell(f'pm path {package_name}') if not adb_result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not locate package {package_name} on device {self.device}. ' f'adb stderr: {adb_result.stderr}') return [line[line.index(':') + 1:] for line in adb_result.stdout.splitlines() if line.startswith('package:')] @@ -236,7 +183,7 @@ def package_versions(self, package_name: str) -> list[str]: """ adb_result = self.shell(f"dumpsys package {package_name} | grep versionName") if not adb_result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not locate package {package_name} on device {self.device}. ' f'adb stderr: {adb_result.stderr}') result = adb_result.stdout.splitlines() @@ -272,7 +219,7 @@ def pull(self, file_to_pull: str, destination: str) -> PullResult: break sleep(1) if not pull_result.success: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not pull file {file_to_pull} on device {self.device}, ' f'adb output: {pull_result.stdout}{pull_result.stderr}') @@ -293,7 +240,7 @@ def pull_package(self, package_name: str, destination: str) -> list[PullResult]: result = [] files_to_pull = self.path_package(package_name) if len(files_to_pull) == 0: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f'Could not locate any package files for package {package_name} on ' f'device {self.device}. Is it installed on the device?') for file_to_pull in files_to_pull: @@ -357,10 +304,10 @@ def _snapshot_command(self, subcommand: str, snapshot_name: Optional[str] = None """ allowed_subcommands = ["list", "save", "load", "del", "get"] if not subcommand in allowed_subcommands: - adb_init.log_error_and_raise_exception(adb_init.logger, + log_error_and_raise_exception(logger, f"Could not execute snapshot subcommand {subcommand}, should be one of {', '.join(allowed_subcommands)}") if subcommand not in ["list", "get"] and snapshot_name is None: - adb_init.logger.warning(adb_init.logger, f"Snapshot subcommand requires a snapshot_name, None is given.") + logger.warning(logger, f"Snapshot subcommand requires a snapshot_name, None is given.") return self.emulator_emu_avd(f' snapshot {subcommand} {snapshot_name}') def emulator_snapshots_list(self) -> list: @@ -378,7 +325,7 @@ def emulator_snapshot_load(self, snapshot_name: str) -> AdbResult: :param snapshot_name: The name of the snapshot. :return: AdbResult object with stdout, stderr if applicable and success True/False. """ - adb_init.logger.info(f"Loading snapshot: {snapshot_name} for {self.device}...") + logger.info(f"Loading snapshot: {snapshot_name} for {self.device}...") if self._snapshot_exists(snapshot_name): return self._snapshot_command("load", snapshot_name) @@ -394,13 +341,13 @@ def emulator_snapshot_save(self, snapshot_name: str) -> AdbResult: :return: AdbResult object with stdout, stderr if applicable and success True/False. """ if self._snapshot_exists(snapshot_name): - adb_init.logger.error(f'A snapshot with the name {snapshot_name} already exists') + logger.error(f'A snapshot with the name {snapshot_name} already exists') return AdbResult(CompletedProcess(args=[], returncode=1, stdout=b'', stderr=f'A snapshot with the name {snapshot_name} already exists'.encode())) save_state = self._snapshot_command("save", snapshot_name) if save_state.success: - adb_init.logger.info(f"Saved snapshot {snapshot_name} of emulator {self.device}") + logger.info(f"Saved snapshot {snapshot_name} of emulator {self.device}") return save_state def emulator_snapshot_delete(self, delete: list[str] = None) -> AdbResult: @@ -438,62 +385,3 @@ def emulator_snapshot_delete(self, delete: list[str] = None) -> AdbResult: CompletedProcess(args=[], returncode=0, stdout=f'All provided snapshots were deleted ' f'successfully'.encode(), stderr=b'') ) - - -class AdbScreenRecorder: - def __init__(self, device: AdbDevice, bit_rate: str = '8M'): - self.device = device - self._recording_process = None - self._recording_folder = f'/sdcard/{uuid4()}' - self._bit_rate = bit_rate - self.__enter__() - - def __enter__(self): - # create recording folder: - self.device.shell(f'mkdir {self._recording_folder}') - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - # stop recording if still in process - if self.is_recording(): - self._recording_process.kill() - # remove files on device - self.device.shell(f'rm -rf {self._recording_folder}') - - def is_recording(self): - return self._recording_process is not None and self._recording_process.poll() is None - - def start_recording(self): - if not self.is_recording(): - arguments = f'{ADB_PATH} {self.device.device_command}shell'.split(' ') - loop = f'i=1; while true; do screenrecord --bit-rate {self._bit_rate} {self._recording_folder}/$i.mp4; let i=i+1; done' - arguments.append(loop) - adb_init.logger.info(f'executing adb command: {arguments}') - self._recording_process = subprocess.Popen(arguments) - - def _screenrecord_process_active_on_device(self): - return '' != self.device.shell('ps -A | grep screenrecord').stdout - - def stop_recording(self, output_folder: str) -> [str]: - if not self.is_recording(): - adb_init.logger.warning(f"Recording was stopped but the recorder wasn't started") - return None - # Stop background process - adb_init.logger.info('Stopping screen recorder...') - self._recording_process.terminate() - while self._screenrecord_process_active_on_device(): - time.sleep(0.2) - self._recording_process = None - # collect recordings - video_files = [f'{self._recording_folder}/{file_name}' for file_name in - self.device.ls(self._recording_folder)] - adb_init.logger.info(f'Copying video files: {video_files}') - pull_results = self.device.pull_multi(video_files, output_folder) - failures = [pull_result for pull_result in pull_results if not pull_result.success] - if len(failures) > 0: - msg = f"Failed to pull file(s) {[failure.path for failure in failures]}" - adb_init.log_error_and_raise_exception(adb_init.logger, msg) - # clean up recordings on device - for video_file in video_files: - self.device.shell(f'rm -f {video_file}') - return [pull_result.path for pull_result in pull_results] # pulled files diff --git a/adb_pywrapper/adb_result.py b/adb_pywrapper/adb_result.py new file mode 100644 index 0000000..b258b89 --- /dev/null +++ b/adb_pywrapper/adb_result.py @@ -0,0 +1,17 @@ +from subprocess import CompletedProcess + + +class AdbResult: + def __init__(self, completed_adb_process: CompletedProcess): + self.completed_adb_process: CompletedProcess = completed_adb_process + self.stdout: str = completed_adb_process.stdout.decode() + self.stderr: str = completed_adb_process.stderr.decode() + self.success: bool = completed_adb_process.returncode == 0 + + def __str__(self) -> str: + return f'success : "{self.success}", ' \ + f'stdout : "{self.stdout}", ' \ + f'stderr : "{self.stderr}"' + + def __repr__(self) -> str: + return self.__str__() \ No newline at end of file diff --git a/adb_pywrapper/adb_screen_recorder.py b/adb_pywrapper/adb_screen_recorder.py new file mode 100644 index 0000000..3dab1db --- /dev/null +++ b/adb_pywrapper/adb_screen_recorder.py @@ -0,0 +1,65 @@ +import subprocess +from time import sleep +from uuid import uuid4 + +from adb_pywrapper import log_error_and_raise_exception, logger, ADB_PATH +from adb_pywrapper.adb_device import AdbDevice + + +class AdbScreenRecorder: + def __init__(self, device: AdbDevice, bit_rate: str = '8M'): + self.device = device + self._recording_process = None + self._recording_folder = f'/sdcard/{uuid4()}' + self._bit_rate = bit_rate + self.__enter__() + + def __enter__(self): + # create recording folder: + self.device.shell(f'mkdir {self._recording_folder}') + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # stop recording if still in process + if self.is_recording(): + self._recording_process.kill() + # remove files on device + self.device.shell(f'rm -rf {self._recording_folder}') + + def is_recording(self): + return self._recording_process is not None and self._recording_process.poll() is None + + def start_recording(self): + if not self.is_recording(): + arguments = f'{ADB_PATH} {self.device.device_command}shell'.split(' ') + loop = f'i=1; while true; do screenrecord --bit-rate {self._bit_rate} {self._recording_folder}/$i.mp4; let i=i+1; done' + arguments.append(loop) + logger.info(f'executing adb command: {arguments}') + self._recording_process = subprocess.Popen(arguments) + + def _screenrecord_process_active_on_device(self): + return '' != self.device.shell('ps -A | grep screenrecord').stdout + + def stop_recording(self, output_folder: str) -> [str]: + if not self.is_recording(): + logger.warning(f"Recording was stopped but the recorder wasn't started") + return None + # Stop background process + logger.info('Stopping screen recorder...') + self._recording_process.terminate() + while self._screenrecord_process_active_on_device(): + sleep(0.2) + self._recording_process = None + # collect recordings + video_files = [f'{self._recording_folder}/{file_name}' for file_name in + self.device.ls(self._recording_folder)] + logger.info(f'Copying video files: {video_files}') + pull_results = self.device.pull_multi(video_files, output_folder) + failures = [pull_result for pull_result in pull_results if not pull_result.success] + if len(failures) > 0: + msg = f"Failed to pull file(s) {[failure.path for failure in failures]}" + log_error_and_raise_exception(logger, msg) + # clean up recordings on device + for video_file in video_files: + self.device.shell(f'rm -f {video_file}') + return [pull_result.path for pull_result in pull_results] # pulled files \ No newline at end of file diff --git a/adb_pywrapper/pull_result.py b/adb_pywrapper/pull_result.py new file mode 100644 index 0000000..20b6dfa --- /dev/null +++ b/adb_pywrapper/pull_result.py @@ -0,0 +1,24 @@ +import os + +from adb_pywrapper.adb_result import AdbResult + + +class PullResult: + """ + A class to represent the result of an adb pull. it contains three properties: + path: the path on which the pulled file should be available if the pull was successful + completed_adb_process: the result of the completed adb process + success: True if the pull was successful and the file in path exists + """ + + def __init__(self, result_path: str, adb_result: AdbResult): + self.completed_adb_process = adb_result + self.path = result_path + self.success = os.path.exists(result_path) + + def __str__(self) -> str: + return f'path : "{self.path}", ' \ + f'completed_adb_process : [{self.completed_adb_process.__str__()}]"' + + def __repr__(self) -> str: + return self.__str__() \ No newline at end of file diff --git a/setup.py b/setup.py index 25674ce..20a2b5c 100644 --- a/setup.py +++ b/setup.py @@ -1,9 +1,12 @@ -from setuptools import setup +from setuptools import setup, find_packages setup( - name="adb-pywrapper", - version="1.0.0", - description="adb-pywrapper facilitates seamless interaction with Android devices using the Android Debug Bridge (ADB) " + name="adb_pywrapper", + version="1.0.1", + packages=find_packages(), + test_suite="test", + + description="adb_pywrapper facilitates seamless interaction with Android devices using the Android Debug Bridge (ADB) " "directly within Python scripts.", long_description=f"{open('README.md').read()}", long_description_content_type="text/markdown", @@ -11,6 +14,5 @@ author_email="netherlandsforensicinstitute@users.noreply.github.com", url="https://github.com/NetherlandsForensicInstitute/adb-pywrapper", licence="EUPL-1.2", - py_modules=["adb-pywrapper", "adb_init"], - test_suite="test", ) + diff --git a/test/test_adb_pywrapper.py b/test/test_adb_pywrapper.py index ac617c5..661125a 100644 --- a/test/test_adb_pywrapper.py +++ b/test/test_adb_pywrapper.py @@ -1,11 +1,10 @@ import os.path import subprocess import unittest -from unittest.mock import patch, MagicMock, Mock +from unittest.mock import patch, Mock -from parameterized import parameterized - -from adb_pywrapper import AdbResult, AdbDevice +from adb_pywrapper.adb_device import AdbDevice +from adb_pywrapper.adb_result import AdbResult PROCESS = subprocess.run('echo hello', shell=True, capture_output=True) MOCK_SUCCESSFULLY_PULLED_FILE = ['abc.apk', 'bla.jpg'] @@ -321,7 +320,7 @@ def test_get_state(self): self.assertFalse(result.success) self.assertIn('Invalid command', result.stderr) - @patch('adb_pywrapper.AdbDevice._snapshot_command') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_command') def test_emulator_snapshots_list(self, mock_snapshot_command): # Test emulator_snapshots_list function snapshot_list_output = """List of snapshots present on all disks: @@ -336,8 +335,8 @@ def test_emulator_snapshots_list(self, mock_snapshot_command): mock_snapshot_command.assert_called_once_with('list') self.assertEqual(result, ['snap_2023-11-23_13-13-02', 'snap_2023-12-05_12-56-56']) - @patch('adb_pywrapper.AdbDevice._snapshot_exists') - @patch('adb_pywrapper.AdbDevice._snapshot_command') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_exists') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_command') def test_emulator_snapshot_load_existing(self, mock_snapshot_command, mock_snapshot_exists): # Test emulator_snapshot_load function with an existing snapshot mock_snapshot_exists.return_value = True @@ -348,7 +347,7 @@ def test_emulator_snapshot_load_existing(self, mock_snapshot_command, mock_snaps mock_snapshot_exists.assert_called_once_with('snapshot1') mock_snapshot_command.assert_called_once_with('load', 'snapshot1') - @patch('adb_pywrapper.AdbDevice._snapshot_exists') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_exists') def test_emulator_snapshot_load_non_existing(self, mock_snapshot_exists): # Test emulator_snapshot_load function with a non-existing snapshot mock_snapshot_exists.return_value = False @@ -358,8 +357,8 @@ def test_emulator_snapshot_load_non_existing(self, mock_snapshot_exists): mock_snapshot_exists.assert_called_once_with('non_existing_snapshot') self.assertFalse(result.success) - @patch('adb_pywrapper.AdbDevice._snapshot_exists') - @patch('adb_pywrapper.AdbDevice._snapshot_command') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_exists') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_command') def test_emulator_snapshot_save(self, mock_snapshot_command, mock_snapshot_exists): # Test emulator_snapshot_save function mock_snapshot_exists.return_value = False @@ -370,9 +369,9 @@ def test_emulator_snapshot_save(self, mock_snapshot_command, mock_snapshot_exist mock_snapshot_exists.assert_called_once_with('snapshot1') mock_snapshot_command.assert_called_once_with('save', 'snapshot1') - @patch('adb_pywrapper.AdbDevice.emulator_snapshots_list', return_value=['snapshot1', 'snapshot2']) - @patch('adb_pywrapper.AdbDevice._snapshot_exists') - @patch('adb_pywrapper.AdbDevice._snapshot_command') + @patch('adb_pywrapper.adb_device.AdbDevice.emulator_snapshots_list', return_value=['snapshot1', 'snapshot2']) + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_exists') + @patch('adb_pywrapper.adb_device.AdbDevice._snapshot_command') def test_emulator_snapshot_delete(self, mock_snapshot_command, mock_snapshot_exists, mock_emulator_snapshots_list): # Test emulator_snapshot_delete function mock_snapshot_exists.return_value = True # Existing snapshots