diff --git a/Makefile b/Makefile index e4e2e10..9278ad9 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,12 @@ lint: - python -m isort -c -rc src/ tests/ example/ - python -m black --check src/ tests/ example/ - python -m flake8 --select F --per-file-ignores="__init__.py:F401" src/ tests/ example/ - python -m mypy src/ tests/ example/ + python3 -m isort -c -rc src/ tests/ example/ + python3 -m black --check src/ tests/ example/ + python3 -m flake8 --select F --per-file-ignores="__init__.py:F401" src/ tests/ example/ + python3 -m mypy src/ tests/ example/ format: - python -m isort -rc src/ tests/ example/ - python -m black src/ tests/ example/ + python3 -m isort -rc src/ tests/ example/ + python3 -m black src/ tests/ example/ test: pip install . diff --git a/setup.py b/setup.py index 85a48d0..609ac61 100755 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setuptools.setup( name="enochecker", - version="0.2.0", + version="0.2.1", author="domenukk", author_email="dmaier@sect.tu-berlin.de", description="Library to build checker scripts for EnoEngine A/D CTF Framework in Python", diff --git a/src/enochecker/__init__.py b/src/enochecker/__init__.py index 84a43e4..c06966d 100644 --- a/src/enochecker/__init__.py +++ b/src/enochecker/__init__.py @@ -15,7 +15,6 @@ ensure_bytes, ensure_unicode, ensure_valid_filename, - readline_expect, serve_once, sha256ify, snake_caseify, diff --git a/src/enochecker/utils.py b/src/enochecker/utils.py index c655aea..42eccf7 100644 --- a/src/enochecker/utils.py +++ b/src/enochecker/utils.py @@ -23,7 +23,7 @@ Union, ) -from .results import BrokenServiceException, OfflineException +from .results import BrokenServiceException if TYPE_CHECKING: # pragma: no cover import requests @@ -179,42 +179,42 @@ def debase64ify( return base64.b64decode(s).decode("utf-8") -def readline_expect( - telnet: Union[telnetlib.Telnet, "SimpleSocket"], - expected: Union[str, bytes], - read_until: Union[str, bytes] = b"\n", - timeout: int = 30, -) -> bytes: - """ - Read to newline (or read_until string) and assert the presence of a string in the response. - - Will raise an exception if failed. - - :param telnet: Connected telnet instance (the result of self.telnet(..)) - :param expected: the expected String to search for in the response - :param read_until: Which char to read until. - :param timeout: a timeout - :return: the bytes read - """ - logger = getattr(telnet, "logger", utilslogger) - - if isinstance(expected, str): - expected = expected.encode("utf-8") - if isinstance(read_until, str): - read_until = read_until.encode("utf-8") - - read = telnet.read_until(read_until, timeout) - if read == b"": - err = "Expected {!r} but got nothing/timeout!".format(expected) - logger.error(err, stack_info=True) - telnet.close() - raise OfflineException(err) - if expected not in read: - err = "Expected {!r} but got {!r}".format(expected, read) - logger.error(err, stack_info=True) - telnet.close() - raise BrokenServiceException(err) - return read +# def readline_expect( +# telnet: Union[telnetlib.Telnet, "SimpleSocket"], +# expected: Union[str, bytes], +# read_until: Union[str, bytes] = b"\n", +# timeout: int = 30, +# ) -> bytes: +# """ +# Read to newline (or read_until string) and assert the presence of a string in the response. + +# Will raise an exception if failed. + +# :param telnet: Connected telnet instance (the result of self.telnet(..)) +# :param expected: the expected String to search for in the response +# :param read_until: Which char to read until. +# :param timeout: a timeout +# :return: the bytes read +# """ +# logger = getattr(telnet, "logger", utilslogger) + +# if isinstance(expected, str): +# expected = expected.encode("utf-8") +# if isinstance(read_until, str): +# read_until = read_until.encode("utf-8") + +# read = telnet.read_until(read_until, timeout) +# if read == b"": +# err = "Expected {!r} but got nothing/timeout!".format(expected) +# logger.error(err, stack_info=True) +# telnet.close() +# raise OfflineException(err) +# if expected not in read: +# err = "Expected {!r} but got {!r}".format(expected, read) +# logger.error(err, stack_info=True) +# telnet.close() +# raise BrokenServiceException(err) +# return read def start_daemon(target: Callable[..., Any]) -> threading.Thread: @@ -352,6 +352,7 @@ def readline_expect( expected: Union[str, bytes], read_until: Union[str, bytes] = b"\n", timeout: Optional[int] = None, + exception_message: Optional[str] = None, ) -> bytes: """ Read to newline (or read_until string) and assert the presence of a string in the response. @@ -365,7 +366,29 @@ def readline_expect( """ if timeout is None: timeout = self.current_default_timeout - return readline_expect(self, expected, read_until, timeout) + + expected = ensure_bytes(expected) + read_until = ensure_bytes(read_until) + + read = self.read_until(read_until, timeout) + if read == b"": + err = "Expected {!r} but got nothing/timeout!".format(expected) + self.logger.error(err, stack_info=True) + self.close() + if exception_message: + raise BrokenServiceException(exception_message) + else: + raise BrokenServiceException("Service returned nothing (timeout?).") + + if expected not in read: + err = "Expected {!r} but got {!r}".format(expected, read) + self.logger.error(err, stack_info=True) + self.close() + if exception_message: + raise BrokenServiceException(exception_message) + else: + raise BrokenServiceException("Service returned unexpected response.") + return read def expect( self, diff --git a/tests/test_enochecker.py b/tests/test_enochecker.py index 3446095..948b888 100644 --- a/tests/test_enochecker.py +++ b/tests/test_enochecker.py @@ -18,7 +18,6 @@ ensure_bytes, ensure_unicode, parse_args, - readline_expect, run, serve_once, snake_caseify, @@ -248,7 +247,7 @@ def test_checker_connections(): checker = CheckerExampleImpl(CHECKER_METHODS[0]) t = checker.connect() t.write(b"GET / HTTP/1.0\r\n\r\n") - assert readline_expect(t, "HTTP") + assert t.readline_expect("HTTP") t.close()