From a6ac7019a146bc198b46388f50597f433bd156b5 Mon Sep 17 00:00:00 2001 From: Leon Morten Richter Date: Sun, 28 Jul 2024 13:04:11 +0200 Subject: [PATCH] feat: adds PreprocessorProtocol for custom message formats (#147) * feat: adds PreprocessorProtocol for custom message formats * fix: backwards compat for python 3.8 & drop support for 3.7 * chore: updates changelog * fix: examples with FileReaderStream --- .github/workflows/main.yml | 4 +- .gitignore | 3 +- CHANGELOG.txt | 6 ++ README.md | 47 +++++++++-- docs/examples.rst | 28 ++++--- examples/decode.py | 2 +- examples/file_stream.py | 7 +- examples/gatehouse_wrappers.py | 21 ++--- examples/preprocess.py | 44 ++++++++++ examples/tag_block.py | 13 +-- examples/tracker.py | 9 +- examples/tracking_perf.py | 13 +-- pyais/__init__.py | 5 +- pyais/stream.py | 66 ++++++++++++--- pyproject.toml | 2 +- pyrightconfig.json | 4 + tests/test_examples.py | 2 +- tests/test_file_stream.py | 16 ++-- tests/test_preprocess.py | 149 +++++++++++++++++++++++++++++++++ 19 files changed, 364 insertions(+), 77 deletions(-) create mode 100644 examples/preprocess.py create mode 100644 pyrightconfig.json create mode 100644 tests/test_preprocess.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 97f5108..47f641b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -9,11 +9,11 @@ jobs: strategy: fail-fast: false matrix: - python: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] os: ['ubuntu-latest'] steps: - uses: actions/checkout@master - + - name: Setup python uses: actions/setup-python@v1 with: diff --git a/.gitignore b/.gitignore index 533528f..47192dc 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,5 @@ tests/test-reports-*/* .mypy_cache/ profiling_results venv/ -build/ \ No newline at end of file +build/ +*.ais diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 27caaac..3607e57 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,6 +1,12 @@ ==================== pyais CHANGELOG ==================== +------------------------------------------------------------------------------- + Version 2.7.0 27 Jul 2024 +------------------------------------------------------------------------------- +* adds the `PreprocessorProtocol` to support custom message formats +* DROPS support for Python 3.7 (EOL) +* minor correction in the documentation (thanks to @lsamaciel) ------------------------------------------------------------------------------- Version 2.6.6 26 Jun 2024 ------------------------------------------------------------------------------- diff --git a/README.md b/README.md index a32e6b9..5fa2174 100644 --- a/README.md +++ b/README.md @@ -118,9 +118,10 @@ from pyais.stream import FileReaderStream filename = "sample.ais" -for msg in FileReaderStream(filename): - decoded = msg.decode() - print(decoded) +with FileReaderStream(filename) as stream: + for msg in stream: + decoded = msg.decode() + print(decoded) ``` Decode a stream of messages (e.g. a list or generator):: @@ -325,6 +326,37 @@ Such details include information used by the slot allocation algorithm (either S Refer to [readthedocs](https://pyais.readthedocs.io/en/latest/messages.html#communication-state) for more information. +# Preprocessing + +The `PreprocessorProtocol` is designed to provide flexibility in handling different message formats. By implementing this protocol, users can create custom preprocessors that transform input messages into the required NMEA0183 format before further processing. + +## Definition + +```py +import typing + +class PreprocessorProtocol(typing.Protocol): + def process(self, line: bytes) -> bytes: + pass +``` + +where `process` is defined as: + +```py +def process(self, line: bytes) -> bytes: + pass +``` + +Parameters: + line (bytes): The input line in bytes that needs to be processed. +Returns: + bytes: The processed line in bytes, conforming to the NMEA0183 format. + +The `process` method is responsible for transforming the input bytes into a format that adheres to the NMEA0183 standard. This method must be implemented by any class that conforms to the `PreprocessorProtocol`. + +The custom preprocessor implementing the PreprocessorProtocol can be passed as an optional keyword argument (default None) to any class that implements the streaming protocol, excluding `IterMessages()`. + +See [the preprocess example](./examples/preprocess.py) for an example implementation. # AIS Filters @@ -431,10 +463,11 @@ from pyais.stream import FileReaderStream filename = pathlib.Path(__file__).parent.joinpath('sample.ais') -with AISTracker() as tracker: - for msg in FileReaderStream(str(filename)): - tracker.update(msg) - latest_tracks = tracker.n_latest_tracks(10) +with FileReaderStream(str(filename)) as stream: + with AISTracker() as tracker: + for msg in stream: + tracker.update(msg) + latest_tracks = tracker.n_latest_tracks(10) # Get the latest 10 tracks print('latest 10 tracks', ','.join(str(t.mmsi) for t in latest_tracks)) diff --git a/docs/examples.rst b/docs/examples.rst index 1778ae6..1f2d664 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -73,9 +73,10 @@ The following example shows how to read and parse AIS messages from a file:: filename = pathlib.Path(__file__).parent.joinpath('sample.ais') - for msg in FileReaderStream(str(filename)): - decoded = msg.decode() - print(decoded) + with FileReaderStream(str(filename)) as stream: + for msg in stream: + decoded = msg.decode() + print(decoded) Gatehouse wrappers ------------------- @@ -88,16 +89,17 @@ Some AIS messages have so-called Gatehouse wrappers:: filename = pathlib.Path(__file__).parent.joinpath('gatehouse.nmea') - for msg in FileReaderStream(str(filename)): - print('*' * 80) - if msg.wrapper_msg is not None: # <= optional gatehouse wrapper - print('Country', msg.wrapper_msg.country) - print('Online', msg.wrapper_msg.online_data) - print('PSS', msg.wrapper_msg.pss) - print('Region', msg.wrapper_msg.region) - print('Timestamp', msg.wrapper_msg.timestamp) - decoded = msg.decode() - print(decoded) + with FileReaderStream(str(filename)) as stream: + for msg in stream: + print('*' * 80) + if msg.wrapper_msg is not None: # <= optional gatehouse wrapper + print('Country', msg.wrapper_msg.country) + print('Online', msg.wrapper_msg.online_data) + print('PSS', msg.wrapper_msg.pss) + print('Region', msg.wrapper_msg.region) + print('Timestamp', msg.wrapper_msg.timestamp) + decoded = msg.decode() + print(decoded) Livestream ----------- diff --git a/examples/decode.py b/examples/decode.py index c0f25f0..a12457e 100644 --- a/examples/decode.py +++ b/examples/decode.py @@ -32,7 +32,7 @@ decoded_s = decode("!AIVDM,1,1,,B,15NG6V0P01G?cFhE`R2IU?wn28R>,0*05") assert decoded_b == decoded_s -# Lets say you have some kind of stream of messages. Than you can use `IterMessages` to decode the messages: +# Lets say you have some kind of stream of messages. Then you can use `IterMessages` to decode the messages: fake_stream = [ b"!AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23", b"!AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F", diff --git a/examples/file_stream.py b/examples/file_stream.py index 0e069b5..03ba486 100644 --- a/examples/file_stream.py +++ b/examples/file_stream.py @@ -13,6 +13,7 @@ filename = pathlib.Path(__file__).parent.joinpath('sample.ais') -for msg in FileReaderStream(str(filename)): - decoded = msg.decode() - print(decoded) +with FileReaderStream(str(filename)) as stream: + for msg in stream: + decoded = msg.decode() + print(decoded) diff --git a/examples/gatehouse_wrappers.py b/examples/gatehouse_wrappers.py index b39c0c4..514251a 100644 --- a/examples/gatehouse_wrappers.py +++ b/examples/gatehouse_wrappers.py @@ -8,13 +8,14 @@ filename = pathlib.Path(__file__).parent.joinpath('gatehouse.nmea') -for msg in FileReaderStream(str(filename)): - print('*' * 80) - if msg.wrapper_msg is not None: # <= optional gatehouse wrapper - print('Country', msg.wrapper_msg.country) - print('Online', msg.wrapper_msg.online_data) - print('PSS', msg.wrapper_msg.pss) - print('Region', msg.wrapper_msg.region) - print('Timestamp', msg.wrapper_msg.timestamp) - decoded = msg.decode() - print(decoded) +with FileReaderStream(str(filename)) as stream: + for msg in stream: + print('*' * 80) + if msg.wrapper_msg is not None: # <= optional gatehouse wrapper + print('Country', msg.wrapper_msg.country) + print('Online', msg.wrapper_msg.online_data) + print('PSS', msg.wrapper_msg.pss) + print('Region', msg.wrapper_msg.region) + print('Timestamp', msg.wrapper_msg.timestamp) + decoded = msg.decode() + print(decoded) diff --git a/examples/preprocess.py b/examples/preprocess.py new file mode 100644 index 0000000..37eda86 --- /dev/null +++ b/examples/preprocess.py @@ -0,0 +1,44 @@ +import pathlib +import re +import textwrap + +from pyais.stream import FileReaderStream, PreprocessorProtocol + +filename = pathlib.Path(__file__).parent.joinpath('preprocess.ais') + +# Create a sample file +with open(filename, 'w') as fd: + fd.write(textwrap.dedent(""" + [2024-07-19 08:45:27.141] !AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23 + [2024-07-19 08:45:30.074] !AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F + [2024-07-19 08:45:35.007] !AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B + [2024-07-19 08:45:35.301] !AIVDM,1,1,,B,13eaJF0P00Qd388Eew6aagvH85Ip,0*45 + [2024-07-19 08:45:40.021] !AIVDM,1,1,,A,14eGrSPP00ncMJTO5C6aBwvP2D0?,0*7A + [2024-07-19 09:00:00.001] !AIVDO,2,1,,A,8=?eN>0000:C=4B1KTTsgLoUelGetEo0FoWr8jo=?045TNv5Tge6sAUl4MKWo,0*5F + [2024-07-19 09:00:00.002] !AIVDO,2,2,,A,vhOL9NIPln:BsP0=BLOiiCbE7;SKsSJfALeATapHfdm6Tl,2*79 + [2024-07-19 08:45:40.074] !AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F + """)) + + +class Preprocessor(PreprocessorProtocol): + """A custom preprocessor class that implements the PreprocessorProtocol. + This class implements the parsing of a custom meta data format [2024-07-19 08:45:40.074].""" + + def __init__(self) -> None: + self.last_meta = None + + def process(self, line: bytes): + nmea_message = re.search(b".* (.*)", line).group(1) + self.last_meta = re.search(b"(.*) .*", line).group(1) + return nmea_message + + def get_meta(self): + return self.last_meta + + +preprocessor = Preprocessor() + +with FileReaderStream(str(filename), preprocessor=preprocessor) as stream: + for msg in stream: + decoded = msg.decode() + print(decoded, preprocessor.get_meta()) diff --git a/examples/tag_block.py b/examples/tag_block.py index c8eb147..9cd1885 100644 --- a/examples/tag_block.py +++ b/examples/tag_block.py @@ -22,10 +22,11 @@ filename = pathlib.Path(__file__).parent.joinpath('sample.ais') -for nmea in FileReaderStream(filename): - if nmea.tag_block: - # again: not all messages have a tag block - nmea.tag_block.init() - print(nmea.tag_block.asdict()) +with FileReaderStream(filename) as stream: + for nmea in stream: + if nmea.tag_block: + # again: not all messages have a tag block + nmea.tag_block.init() + print(nmea.tag_block.asdict()) - decoded = nmea.decode() + decoded = nmea.decode() diff --git a/examples/tracker.py b/examples/tracker.py index 14d74ae..8aaefd7 100644 --- a/examples/tracker.py +++ b/examples/tracker.py @@ -5,10 +5,11 @@ filename = pathlib.Path(__file__).parent.joinpath('sample.ais') -with AISTracker() as tracker: - for msg in FileReaderStream(str(filename)): - tracker.update(msg) - latest_tracks = tracker.n_latest_tracks(10) +with FileReaderStream(str(filename)) as stream: + with AISTracker() as tracker: + for msg in stream: + tracker.update(msg) + latest_tracks = tracker.n_latest_tracks(10) # Get the latest 10 tracks print('latest 10 tracks', ','.join(str(t.mmsi) for t in latest_tracks)) diff --git a/examples/tracking_perf.py b/examples/tracking_perf.py index c06abfb..7f71ee4 100644 --- a/examples/tracking_perf.py +++ b/examples/tracking_perf.py @@ -10,12 +10,13 @@ tracker = AISTracker(ttl_in_seconds=0.01, stream_is_ordered=True) start = time.time() -for i, msg in enumerate(FileReaderStream(str(filename)), start=1): - try: - tracker.update(msg) - _ = tracker.n_latest_tracks(50) - except UnknownMessageException as e: - print(str(e)) +with FileReaderStream(str(filename)) as stream: + for i, msg in enumerate(stream, start=1): + try: + tracker.update(msg) + _ = tracker.n_latest_tracks(50) + except UnknownMessageException as e: + print(str(e)) finish = time.time() diff --git a/pyais/__init__.py b/pyais/__init__.py index c03aa8f..12950aa 100644 --- a/pyais/__init__.py +++ b/pyais/__init__.py @@ -1,11 +1,11 @@ from pyais.messages import NMEAMessage, ANY_MESSAGE, AISSentence -from pyais.stream import TCPConnection, FileReaderStream, IterMessages, Stream +from pyais.stream import TCPConnection, FileReaderStream, IterMessages, Stream, PreprocessorProtocol from pyais.encode import encode_dict, encode_msg, ais_to_nmea_0183 from pyais.decode import decode from pyais.tracker import AISTracker, AISTrack __license__ = 'MIT' -__version__ = '2.6.6' +__version__ = '2.7.0' __author__ = 'Leon Morten Richter' __all__ = ( @@ -19,6 +19,7 @@ 'IterMessages', 'FileReaderStream', 'Stream', + 'PreprocessorProtocol', 'decode', 'AISTracker', 'AISTrack', diff --git a/pyais/stream.py b/pyais/stream.py index 237701b..fd06319 100644 --- a/pyais/stream.py +++ b/pyais/stream.py @@ -1,4 +1,5 @@ import typing +import pathlib from abc import ABC, abstractmethod from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket from typing import BinaryIO, Generator, Generic, Iterable, List, TypeVar, cast @@ -22,6 +23,37 @@ def should_parse(byte_str: bytes) -> bool: return len(byte_str) > 0 and byte_str[0] in (DOLLAR_SIGN, EXCLAMATION_POINT, BACKSLASH) +class PreprocessorProtocol(typing.Protocol): + """ + Protocol for preprocessing lines of bytes into NMEA0183 format. + + This protocol must be implemented to support various message formats in the pyais library. + Implementing classes should provide a `process` method that transforms input lines of bytes + into NMEA0183 compliant messages. + + Methods: + -------- + process(line: bytes) -> bytes: + Processes an input line of bytes and returns it in NMEA0183 format. + """ + + def process(self, line: bytes) -> bytes: + """ + Process an input line of bytes and return it in NMEA0183 format. + + Parameters: + ----------- + line : bytes + The input line of bytes to be processed. + + Returns: + -------- + bytes + The processed line in bytes, conforming to the NMEA0183 format. + """ + pass + + class AssembleMessages(ABC): """ Base class that assembles multiline messages. @@ -144,13 +176,15 @@ def _iter_messages(self) -> Generator[bytes, None, None]: class Stream(AssembleMessages, Generic[F], ABC): - def __init__(self, fobj: F) -> None: + def __init__(self, fobj: F, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: """ Create a new Stream-like object. @param fobj: A file-like or socket object. + @param preprocessor: An optional preprocessor """ super().__init__() self._fobj: F = fobj + self.preprocessor = preprocessor def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None: if self._fobj is not None: @@ -158,7 +192,13 @@ def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None: def _iter_messages(self) -> Generator[bytes, None, None]: # Do not parse lines, that are obviously not NMEA messages - yield from (line for line in self.read() if should_parse(line)) + for line in self.read(): + if len(line) <= 10: + continue + if self.preprocessor is not None: + line = self.preprocessor.process(line) + if should_parse(line): + yield line @abstractmethod def read(self) -> Generator[bytes, None, None]: @@ -168,8 +208,8 @@ def read(self) -> Generator[bytes, None, None]: class BinaryIOStream(Stream[BinaryIO]): """Read messages from a file-like object""" - def __init__(self, file: BinaryIO) -> None: - super().__init__(file) + def __init__(self, file: BinaryIO, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + super().__init__(file, preprocessor=preprocessor) def read(self) -> Generator[bytes, None, None]: yield from self._fobj @@ -180,8 +220,8 @@ class FileReaderStream(BinaryIOStream): Read NMEA messages from file """ - def __init__(self, filename: str, mode: str = "rb") -> None: - self.filename: str = filename + def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb", preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: + self.filename: typing.Union[str, pathlib.Path] = filename self.mode: str = mode # Try to open file try: @@ -189,7 +229,7 @@ def __init__(self, filename: str, mode: str = "rb") -> None: file = cast(BinaryIO, file) except Exception as e: raise FileNotFoundError(f"Could not open file {self.filename}") from e - super().__init__(file) + super().__init__(file, preprocessor=preprocessor) class ByteStream(Stream[None]): @@ -197,9 +237,9 @@ class ByteStream(Stream[None]): Takes a iterable that contains ais messages as bytes and assembles them. """ - def __init__(self, iterable: Iterable[bytes]) -> None: + def __init__(self, iterable: Iterable[bytes], preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: self.iterable: Iterable[bytes] = iterable - super().__init__(None) + super().__init__(None, preprocessor=preprocessor) def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None: return @@ -242,10 +282,10 @@ def read(self) -> Generator[bytes, None, None]: class UDPReceiver(SocketStream): - def __init__(self, host: str, port: int) -> None: + def __init__(self, host: str, port: int, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: sock: socket = socket(AF_INET, SOCK_DGRAM) sock.bind((host, port)) - super().__init__(sock) + super().__init__(sock, preprocessor=preprocessor) def recv(self) -> bytes: return self._fobj.recvfrom(self.BUF_SIZE)[0] @@ -260,11 +300,11 @@ class TCPConnection(SocketStream): def recv(self) -> bytes: return self._fobj.recv(self.BUF_SIZE) - def __init__(self, host: str, port: int = 80) -> None: + def __init__(self, host: str, port: int = 80, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None: sock: socket = socket(AF_INET, SOCK_STREAM) try: sock.connect((host, port)) except ConnectionRefusedError as e: sock.close() raise ConnectionRefusedError(f"Failed to connect to {host}:{port}") from e - super().__init__(sock) + super().__init__(sock, preprocessor=preprocessor) diff --git a/pyproject.toml b/pyproject.toml index 00d984d..393af92 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "pyais" dynamic = ["version"] description = "AIS message decoding" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.8" license = {file = "LICENSE"} keywords = ["AIS", "ship", "decoding", "NMEA", "maritime"] diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..379822c --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,4 @@ +{ + "venvPath": ".", + "venv": "venv" +} \ No newline at end of file diff --git a/tests/test_examples.py b/tests/test_examples.py index 4eef83e..d4c5ed3 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -32,4 +32,4 @@ def test_run_every_file(self): if csv_file.exists(): csv_file.unlink() - assert i == 20 + assert i == 21 diff --git a/tests/test_file_stream.py b/tests/test_file_stream.py index cbe1676..10efdb9 100644 --- a/tests/test_file_stream.py +++ b/tests/test_file_stream.py @@ -158,12 +158,13 @@ def test_large_file(self): par_dir = pathlib.Path(__file__).parent.absolute() large_file = par_dir.joinpath("nmea-sample") errors = 0 - for i, msg in enumerate(FileReaderStream(large_file)): - try: - msg.decode() - except UnknownMessageException: - errors += 1 - continue + with FileReaderStream(large_file) as stream: + for i, msg in enumerate(stream): + try: + msg.decode() + except UnknownMessageException: + errors += 1 + continue print(f"Decoding {i + 1} messages took:", time.time() - start) print("ERRORS", errors) @@ -186,7 +187,8 @@ def test_mixed_content(self): text files, that contain both AIS messages and non AIS messages.""" par_dir = pathlib.Path(__file__).parent.absolute() mixed_content_file = par_dir.joinpath("messages.ais") - self.assertEqual(len(list(iter(FileReaderStream(mixed_content_file)))), 6) + with FileReaderStream(mixed_content_file) as stream: + self.assertEqual(len(list(iter(stream))), 6) def test_timestamp_messages(self): par_dir = pathlib.Path(__file__).parent.absolute() diff --git a/tests/test_preprocess.py b/tests/test_preprocess.py new file mode 100644 index 0000000..000ea0d --- /dev/null +++ b/tests/test_preprocess.py @@ -0,0 +1,149 @@ +import pathlib +import re +import textwrap +import unittest +from unittest.mock import patch + +from pyais.stream import FileReaderStream, PreprocessorProtocol, TCPConnection, UDPReceiver + + +# This serves as an example for a custom format. +# Taken from: https://github.com/M0r13n/pyais/issues/144 +FILE_CONTENT = textwrap.dedent(""" + [2024-07-19 08:45:27.141] !AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23 + [2024-07-19 08:45:30.074] !AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F + [2024-07-19 08:45:35.007] !AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B + [2024-07-19 08:45:35.301] !AIVDM,1,1,,B,13eaJF0P00Qd388Eew6aagvH85Ip,0*45 + [2024-07-19 08:45:40.021] !AIVDM,1,1,,A,14eGrSPP00ncMJTO5C6aBwvP2D0?,0*7A + [2024-07-19 09:00:00.001] !AIVDO,2,1,,A,8=?eN>0000:C=4B1KTTsgLoUelGetEo0FoWr8jo=?045TNv5Tge6sAUl4MKWo,0*5F + [2024-07-19 09:00:00.002] !AIVDO,2,2,,A,vhOL9NIPln:BsP0=BLOiiCbE7;SKsSJfALeATapHfdm6Tl,2*79 + [2024-07-19 08:45:40.074] !AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F +""") + + +class Preprocessor(PreprocessorProtocol): + """Dummy Preprocessor that handles the format defined above""" + + def __init__(self) -> None: + self.last_meta = None + + def process(self, line: bytes): + nmea_message = re.search(b".* (.*)", line).group(1) + self.last_meta = re.search(b"(.*) .*", line).group(1) + return nmea_message + + def get_meta(self): + return self.last_meta + + +class PreprocessFileStreamTestCase(unittest.TestCase): + """Test case for file stream preprocessing.""" + + TEST_READER_FILE = pathlib.Path(__file__).parent.joinpath('preprocess.ais') + + @classmethod + def setUpClass(cls): + # Create a test file to read + with open(cls.TEST_READER_FILE, 'w') as fd: + fd.write(FILE_CONTENT) + + @classmethod + def tearDownClass(cls): + # Remove the test file again + cls.TEST_READER_FILE.unlink(missing_ok=True) + + def test_that_custom_format_can_be_parsed(self): + results, preprocessor = [], Preprocessor() + + with FileReaderStream(self.TEST_READER_FILE, preprocessor=preprocessor) as stream: + for msg in stream: + decoded = msg.decode() + results.append((decoded, preprocessor.get_meta())) + + self.assertEqual(len(results), 7) + self.assertEqual(results[0][0].mmsi, 227006760) + self.assertEqual(results[0][1], b"[2024-07-19 08:45:27.141]") + self.assertEqual(results[1][0].mmsi, 205448890) + self.assertEqual(results[1][1], b"[2024-07-19 08:45:30.074]") + self.assertEqual(results[2][0].mmsi, 786434) + self.assertEqual(results[2][1], b"[2024-07-19 08:45:35.007]") + self.assertEqual(results[3][0].mmsi, 249191000) + self.assertEqual(results[3][1], b"[2024-07-19 08:45:35.301]") + self.assertEqual(results[4][0].mmsi, 316013198) + self.assertEqual(results[4][1], b"[2024-07-19 08:45:40.021]") + self.assertEqual(results[5][0].mmsi, 888888888) + self.assertEqual(results[5][1], b"[2024-07-19 09:00:00.002]") + self.assertEqual(results[6][0].mmsi, 366913120) + self.assertEqual(results[6][1], b"[2024-07-19 08:45:40.074]") + + +class PreprocessUDPTestCase(unittest.TestCase): + """Test case for UDP preprocessing.""" + + @patch('pyais.stream.socket') + def test_that_custom_format_can_be_parsed(self, _): + results, preprocessor = [], Preprocessor() + + with UDPReceiver('0.0.0.0', 1234, preprocessor=preprocessor) as stream: + stream.recv = lambda: FILE_CONTENT.encode() + + for i, msg in enumerate(stream): + decoded = msg.decode() + results.append((decoded, preprocessor.get_meta())) + + if i >= 6: + break + + self.assertEqual(len(results), 7) + self.assertEqual(results[0][0].mmsi, 227006760) + self.assertEqual(results[0][1], b"[2024-07-19 08:45:27.141]") + self.assertEqual(results[1][0].mmsi, 205448890) + self.assertEqual(results[1][1], b"[2024-07-19 08:45:30.074]") + self.assertEqual(results[2][0].mmsi, 786434) + self.assertEqual(results[2][1], b"[2024-07-19 08:45:35.007]") + self.assertEqual(results[3][0].mmsi, 249191000) + self.assertEqual(results[3][1], b"[2024-07-19 08:45:35.301]") + self.assertEqual(results[4][0].mmsi, 316013198) + self.assertEqual(results[4][1], b"[2024-07-19 08:45:40.021]") + self.assertEqual(results[5][0].mmsi, 888888888) + self.assertEqual(results[5][1], b"[2024-07-19 09:00:00.002]") + self.assertEqual(results[6][0].mmsi, 366913120) + self.assertEqual(results[6][1], b"[2024-07-19 08:45:40.074]") + + +class PreprocessTCPTestCase(unittest.TestCase): + """Test case for TCP preprocessing.""" + + @patch('pyais.stream.socket') + def test_that_custom_format_can_be_parsed(self, _): + results, preprocessor = [], Preprocessor() + + with TCPConnection('0.0.0.0', 1234, preprocessor=preprocessor) as stream: + stream.recv = lambda: FILE_CONTENT.encode() + + for i, msg in enumerate(stream): + decoded = msg.decode() + results.append((decoded, preprocessor.get_meta())) + + if i >= 6: + break + + self.assertEqual(len(results), 7) + self.assertEqual(results[0][0].mmsi, 227006760) + self.assertEqual(results[0][1], b"[2024-07-19 08:45:27.141]") + self.assertEqual(results[1][0].mmsi, 205448890) + self.assertEqual(results[1][1], b"[2024-07-19 08:45:30.074]") + self.assertEqual(results[2][0].mmsi, 786434) + self.assertEqual(results[2][1], b"[2024-07-19 08:45:35.007]") + self.assertEqual(results[3][0].mmsi, 249191000) + self.assertEqual(results[3][1], b"[2024-07-19 08:45:35.301]") + self.assertEqual(results[4][0].mmsi, 316013198) + self.assertEqual(results[4][1], b"[2024-07-19 08:45:40.021]") + self.assertEqual(results[5][0].mmsi, 888888888) + self.assertEqual(results[5][1], b"[2024-07-19 09:00:00.002]") + self.assertEqual(results[6][0].mmsi, 366913120) + self.assertEqual(results[6][1], b"[2024-07-19 08:45:40.074]") + + +if __name__ == '__main__': + unittest.main()