forked from google/python-adb
-
Notifications
You must be signed in to change notification settings - Fork 6
/
common_mock.py
81 lines (62 loc) · 2.44 KB
/
common_mock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Stubs for tests using common's usb handling."""
import binascii
import string
import threading
from adb import adb_protocol
from adb import usb_exceptions
# Characters that _Dotify emits verbatim; anything else (including all
# whitespace) is replaced by '.' when raw data is rendered.
PRINTABLE_DATA = set(string.printable).difference(string.whitespace)
class Failure(Exception):
    """Raised by MockUsb.BulkWrite when a write does not match the script."""
def _Dotify(data):  # pragma: no cover
    """Render raw I/O data in a readable form for failure messages.

    This code is not exercised unless there's a test case failure.

    Args:
      data: The payload to render (bytes, bytearray or str).

    Returns:
      Whatever adb_protocol._AdbMessageHeader.Unpack(data) returns when it
      succeeds. If it raises InvalidResponseError, a str of the same length as
      data in which every character outside PRINTABLE_DATA is replaced by '.'.
    """
    try:
        return adb_protocol._AdbMessageHeader.Unpack(data)
    except adb_protocol.InvalidResponseError:
        # Iterating bytes/bytearray on Python 3 yields ints, which can never
        # match the str members of PRINTABLE_DATA (every byte would become
        # '.'). Normalize each item to a one-character string first.
        chars = (chr(c) if isinstance(c, int) else c for c in data)
        return ''.join(c if c in PRINTABLE_DATA else '.' for c in chars)
class MockUsb(object):
    """UsbHandle mock that replays a scripted sequence of reads and writes.

    A test queues the traffic it expects with ExpectWrite() / ExpectRead().
    BulkWrite() and BulkRead() then consume that queue in FIFO order, and
    Close() checks that nothing is left over. Every access to the queue is
    made while holding self._lock.
    """

    def __init__(self):
        # Immutable.
        self.timeout_ms = 0
        self.port_path = 'stub'
        # Mutable.
        self._lock = threading.Lock()
        # FIFO of ('read' | 'write', data) tuples still to be consumed.
        self._expected_io = []

    def Close(self):
        """Verify that all scripted I/O was consumed.

        Raises:
          AssertionError: if any expected read or write is still queued; the
            message lists each remaining entry.
        """
        with self._lock:
            if self._expected_io:  # pragma: no cover
                # Raised explicitly instead of via `assert` so the check is
                # not stripped when Python runs with -O.
                raise AssertionError(
                    'Expected I/O not processed:\n' + '\n'.join(
                        '- %s: %s' % (kind, _Dotify(payload))
                        for kind, payload in self._expected_io))

    def BulkWrite(self, data, unused_timeout_ms=None):
        """Consume the next scripted entry, which must be a write of `data`.

        Args:
          data: The payload being written.
          unused_timeout_ms: Ignored.

        Raises:
          Failure: if the script is exhausted, the next entry is not a write,
            or the scripted payload differs from `data`.
        """
        with self._lock:
            if not self._expected_io:
                raise Failure('No more expected I/O')  # pragma: no cover
            if self._expected_io[0][0] != 'write':
                raise Failure(
                    'I/O mismatch:\n- expected read %s\n- got write %s' %
                    (_Dotify(self._expected_io[0][1]),
                     _Dotify(data)))  # pragma: no cover
            expected_data = self._expected_io.pop(0)[1]
            if expected_data != data:
                raise Failure(
                    'Mismatch:\n- expected %s\n - got %s' %
                    (_Dotify(expected_data), _Dotify(data)))  # pragma: no cover

    def BulkRead(self, length,
                 timeout_ms=None):  # pylint: disable=unused-argument
        """Return the payload of the next scripted entry, which must be a read.

        Args:
          length: Maximum number of bytes the caller is willing to receive.
          timeout_ms: Ignored.

        Returns:
          The data queued by the matching ExpectRead() call.

        Raises:
          usb_exceptions.ReadFailedError: if the script is exhausted or the
            next entry is a write.
          ValueError: if the scripted payload is longer than `length`. The
            entry has already been removed from the queue at that point.
        """
        with self._lock:
            if not self._expected_io:
                raise usb_exceptions.ReadFailedError(
                    None, None)  # pragma: no cover
            if self._expected_io[0][0] != 'read':
                # Nothing to read for now.
                raise usb_exceptions.ReadFailedError(
                    None, None)  # pragma: no cover
            data = self._expected_io.pop(0)[1]
            if length < len(data):
                raise ValueError(
                    'Unexpected read length. Read %d bytes, got %d bytes' %
                    (length, len(data)))  # pragma: no cover
            return data

    def ExpectWrite(self, data):
        """Append an expected write of `data` to the script."""
        with self._lock:
            self._expected_io.append(('write', data))

    def ExpectRead(self, data):
        """Append a read that will return `data` to the script."""
        with self._lock:
            self._expected_io.append(('read', data))

    def Timeout(self, timeout_ms):
        """Return `timeout_ms`, or self.timeout_ms when it is None."""
        return timeout_ms if timeout_ms is not None else self.timeout_ms