Skip to content

Commit

Permalink
feat: adds PreprocessorProtocol for custom message formats (#147)
Browse files Browse the repository at this point in the history
* feat: adds PreprocessorProtocol for custom message formats

* fix: backwards compat for python 3.8 & drop support for 3.7

* chore: updates changelog

* fix: examples with FileReaderStream
  • Loading branch information
M0r13n authored Jul 28, 2024
1 parent 86fcd62 commit a6ac701
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 77 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ jobs:
strategy:
fail-fast: false
matrix:
python: ['3.7', '3.8', '3.9', '3.10', '3.11']
python: ['3.8', '3.9', '3.10', '3.11', '3.12']
os: ['ubuntu-latest']
steps:
- uses: actions/checkout@master

- name: Setup python
uses: actions/setup-python@v1
with:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ tests/test-reports-*/*
.mypy_cache/
profiling_results
venv/
build/
build/
*.ais
6 changes: 6 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
====================
pyais CHANGELOG
====================
-------------------------------------------------------------------------------
Version 2.7.0 27 Jul 2024
-------------------------------------------------------------------------------
* adds the `PreprocessorProtocol` to support custom message formats
* DROPS support for Python 3.7 (EOL)
* minor correction in the documentation (thanks to @lsamaciel)
-------------------------------------------------------------------------------
Version 2.6.6 26 Jun 2024
-------------------------------------------------------------------------------
Expand Down
47 changes: 40 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ from pyais.stream import FileReaderStream

filename = "sample.ais"

for msg in FileReaderStream(filename):
decoded = msg.decode()
print(decoded)
with FileReaderStream(filename) as stream:
for msg in stream:
decoded = msg.decode()
print(decoded)
```

Decode a stream of messages (e.g. a list or generator)::
Expand Down Expand Up @@ -325,6 +326,37 @@ Such details include information used by the slot allocation algorithm (either S

Refer to [readthedocs](https://pyais.readthedocs.io/en/latest/messages.html#communication-state) for more information.

# Preprocessing

The `PreprocessorProtocol` is designed to provide flexibility in handling different message formats. By implementing this protocol, users can create custom preprocessors that transform input messages into the required NMEA0183 format before further processing.

## Definition

```py
import typing

class PreprocessorProtocol(typing.Protocol):
def process(self, line: bytes) -> bytes:
pass
```

where `process` is defined as:

```py
def process(self, line: bytes) -> bytes:
pass
```

Parameters:
line (bytes): The input line in bytes that needs to be processed.
Returns:
bytes: The processed line in bytes, conforming to the NMEA0183 format.

The `process` method is responsible for transforming the input bytes into a format that adheres to the NMEA0183 standard. This method must be implemented by any class that conforms to the `PreprocessorProtocol`.

The custom preprocessor implementing the PreprocessorProtocol can be passed as an optional keyword argument (default None) to any class that implements the streaming protocol, excluding `IterMessages()`.

See [the preprocess example](./examples/preprocess.py) for an example implementation.

# AIS Filters

Expand Down Expand Up @@ -431,10 +463,11 @@ from pyais.stream import FileReaderStream

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

with AISTracker() as tracker:
for msg in FileReaderStream(str(filename)):
tracker.update(msg)
latest_tracks = tracker.n_latest_tracks(10)
with FileReaderStream(str(filename)) as stream:
with AISTracker() as tracker:
for msg in stream:
tracker.update(msg)
latest_tracks = tracker.n_latest_tracks(10)

# Get the latest 10 tracks
print('latest 10 tracks', ','.join(str(t.mmsi) for t in latest_tracks))
Expand Down
28 changes: 15 additions & 13 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ The following example shows how to read and parse AIS messages from a file::

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

for msg in FileReaderStream(str(filename)):
decoded = msg.decode()
print(decoded)
with FileReaderStream(str(filename)) as stream:
for msg in stream:
decoded = msg.decode()
print(decoded)

Gatehouse wrappers
-------------------
Expand All @@ -88,16 +89,17 @@ Some AIS messages have so-called Gatehouse wrappers::

filename = pathlib.Path(__file__).parent.joinpath('gatehouse.nmea')

for msg in FileReaderStream(str(filename)):
print('*' * 80)
if msg.wrapper_msg is not None: # <= optional gatehouse wrapper
print('Country', msg.wrapper_msg.country)
print('Online', msg.wrapper_msg.online_data)
print('PSS', msg.wrapper_msg.pss)
print('Region', msg.wrapper_msg.region)
print('Timestamp', msg.wrapper_msg.timestamp)
decoded = msg.decode()
print(decoded)
with FileReaderStream(str(filename)) as stream:
for msg in stream:
print('*' * 80)
if msg.wrapper_msg is not None: # <= optional gatehouse wrapper
print('Country', msg.wrapper_msg.country)
print('Online', msg.wrapper_msg.online_data)
print('PSS', msg.wrapper_msg.pss)
print('Region', msg.wrapper_msg.region)
print('Timestamp', msg.wrapper_msg.timestamp)
decoded = msg.decode()
print(decoded)

Livestream
-----------
Expand Down
2 changes: 1 addition & 1 deletion examples/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
decoded_s = decode("!AIVDM,1,1,,B,15NG6V0P01G?cFhE`R2IU?wn28R>,0*05")
assert decoded_b == decoded_s

# Lets say you have some kind of stream of messages. Than you can use `IterMessages` to decode the messages:
# Lets say you have some kind of stream of messages. Then you can use `IterMessages` to decode the messages:
fake_stream = [
b"!AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23",
b"!AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F",
Expand Down
7 changes: 4 additions & 3 deletions examples/file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

for msg in FileReaderStream(str(filename)):
decoded = msg.decode()
print(decoded)
with FileReaderStream(str(filename)) as stream:
for msg in stream:
decoded = msg.decode()
print(decoded)
21 changes: 11 additions & 10 deletions examples/gatehouse_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

filename = pathlib.Path(__file__).parent.joinpath('gatehouse.nmea')

for msg in FileReaderStream(str(filename)):
print('*' * 80)
if msg.wrapper_msg is not None: # <= optional gatehouse wrapper
print('Country', msg.wrapper_msg.country)
print('Online', msg.wrapper_msg.online_data)
print('PSS', msg.wrapper_msg.pss)
print('Region', msg.wrapper_msg.region)
print('Timestamp', msg.wrapper_msg.timestamp)
decoded = msg.decode()
print(decoded)
with FileReaderStream(str(filename)) as stream:
for msg in stream:
print('*' * 80)
if msg.wrapper_msg is not None: # <= optional gatehouse wrapper
print('Country', msg.wrapper_msg.country)
print('Online', msg.wrapper_msg.online_data)
print('PSS', msg.wrapper_msg.pss)
print('Region', msg.wrapper_msg.region)
print('Timestamp', msg.wrapper_msg.timestamp)
decoded = msg.decode()
print(decoded)
44 changes: 44 additions & 0 deletions examples/preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pathlib
import re
import textwrap

from pyais.stream import FileReaderStream, PreprocessorProtocol

filename = pathlib.Path(__file__).parent.joinpath('preprocess.ais')

# Create a sample file
with open(filename, 'w') as fd:
fd.write(textwrap.dedent("""
[2024-07-19 08:45:27.141] !AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23
[2024-07-19 08:45:30.074] !AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F
[2024-07-19 08:45:35.007] !AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B
[2024-07-19 08:45:35.301] !AIVDM,1,1,,B,13eaJF0P00Qd388Eew6aagvH85Ip,0*45
[2024-07-19 08:45:40.021] !AIVDM,1,1,,A,14eGrSPP00ncMJTO5C6aBwvP2D0?,0*7A
[2024-07-19 09:00:00.001] !AIVDO,2,1,,A,8=?eN>0000:C=4B1KTTsgLoUelGetEo0FoWr8jo=?045TNv5Tge6sAUl4MKWo,0*5F
[2024-07-19 09:00:00.002] !AIVDO,2,2,,A,vhOL9NIPln:BsP0=BLOiiCbE7;SKsSJfALeATapHfdm6Tl,2*79
[2024-07-19 08:45:40.074] !AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F
"""))


class Preprocessor(PreprocessorProtocol):
"""A custom preprocessor class that implements the PreprocessorProtocol.
This class implements the parsing of a custom meta data format [2024-07-19 08:45:40.074]."""

def __init__(self) -> None:
self.last_meta = None

def process(self, line: bytes):
nmea_message = re.search(b".* (.*)", line).group(1)
self.last_meta = re.search(b"(.*) .*", line).group(1)
return nmea_message

def get_meta(self):
return self.last_meta


preprocessor = Preprocessor()

with FileReaderStream(str(filename), preprocessor=preprocessor) as stream:
for msg in stream:
decoded = msg.decode()
print(decoded, preprocessor.get_meta())
13 changes: 7 additions & 6 deletions examples/tag_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

for nmea in FileReaderStream(filename):
if nmea.tag_block:
# again: not all messages have a tag block
nmea.tag_block.init()
print(nmea.tag_block.asdict())
with FileReaderStream(filename) as stream:
for nmea in stream:
if nmea.tag_block:
# again: not all messages have a tag block
nmea.tag_block.init()
print(nmea.tag_block.asdict())

decoded = nmea.decode()
decoded = nmea.decode()
9 changes: 5 additions & 4 deletions examples/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

with AISTracker() as tracker:
for msg in FileReaderStream(str(filename)):
tracker.update(msg)
latest_tracks = tracker.n_latest_tracks(10)
with FileReaderStream(str(filename)) as stream:
with AISTracker() as tracker:
for msg in stream:
tracker.update(msg)
latest_tracks = tracker.n_latest_tracks(10)

# Get the latest 10 tracks
print('latest 10 tracks', ','.join(str(t.mmsi) for t in latest_tracks))
Expand Down
13 changes: 7 additions & 6 deletions examples/tracking_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
tracker = AISTracker(ttl_in_seconds=0.01, stream_is_ordered=True)

start = time.time()
for i, msg in enumerate(FileReaderStream(str(filename)), start=1):
try:
tracker.update(msg)
_ = tracker.n_latest_tracks(50)
except UnknownMessageException as e:
print(str(e))
with FileReaderStream(str(filename)) as stream:
for i, msg in enumerate(stream, start=1):
try:
tracker.update(msg)
_ = tracker.n_latest_tracks(50)
except UnknownMessageException as e:
print(str(e))

finish = time.time()

Expand Down
5 changes: 3 additions & 2 deletions pyais/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from pyais.messages import NMEAMessage, ANY_MESSAGE, AISSentence
from pyais.stream import TCPConnection, FileReaderStream, IterMessages, Stream
from pyais.stream import TCPConnection, FileReaderStream, IterMessages, Stream, PreprocessorProtocol
from pyais.encode import encode_dict, encode_msg, ais_to_nmea_0183
from pyais.decode import decode
from pyais.tracker import AISTracker, AISTrack

__license__ = 'MIT'
__version__ = '2.6.6'
__version__ = '2.7.0'
__author__ = 'Leon Morten Richter'

__all__ = (
Expand All @@ -19,6 +19,7 @@
'IterMessages',
'FileReaderStream',
'Stream',
'PreprocessorProtocol',
'decode',
'AISTracker',
'AISTrack',
Expand Down
Loading

0 comments on commit a6ac701

Please sign in to comment.