From 816de54f3649de2ddf4aeadc646929814e0279b6 Mon Sep 17 00:00:00 2001
From: Leon Morten Richter
Date: Sat, 21 Sep 2024 13:49:40 +0200
Subject: [PATCH] feat: adds TagBlockQueue (#152)
* feat: adds TagBlockQueue
* fix: unsupported type hints on Python <3.10
* chore: docs
---
README.md | 46 +++++++++------
examples/grouping.py | 14 +++++
examples/sample.ais | 4 ++
pyais/stream.py | 121 ++++++++++++++++++++++++++++++++++------
tests/test_examples.py | 2 +-
tests/test_tag_block.py | 56 ++++++++++++++++++-
6 files changed, 209 insertions(+), 34 deletions(-)
create mode 100644 examples/grouping.py
diff --git a/README.md b/README.md
index 5fa2174..fafb6f7 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,6 @@
![CI](https://github.com/M0r13n/pyais/workflows/CI/badge.svg)
[![Documentation Status](https://readthedocs.org/projects/pyais/badge/?version=latest)](https://pyais.readthedocs.io/en/latest/?badge=latest)
-
@@ -51,7 +50,7 @@ This AIS sentence is known as a "Position Report" message and is used to transmi
- : This field is left blank. This field can contain the sequence number.
- **B**: This field indicates the communication channel being used to transmit the message. In this case, the channel is "B".
- **15MwkT1P37G?fl0EJbR0OwT0@MS**: This field contains the payload of the message, which is encoded using a variant of ASCII known as "Six-bit ASCII". The payload contains information such as the vessel's identification, position, course, and speed.
- 0*4E: This field is a checksum that is used to verify the integrity of the sentence.
+ 0\*4E: This field is a checksum that is used to verify the integrity of the sentence.
**pyais** is a Python modul to encode and decode AIS messages.
@@ -149,7 +148,7 @@ The AIS data is freely available under the [norwegian license for public data](h
Data can be read from a TCP/IP socket and is encoded according to IEC 62320-1:
-- IP: 153.44.253.27
+- IP: 153.44.253.27
- Port: 5631
Refer to the [examples/live_stream.py](./examples/live_stream.py) for a practical example on how to read & decode AIS data from a TCP/IP socket.
@@ -159,8 +158,8 @@ This is useful for debugging or for getting used to pyais.
It is also possible to encode messages.
-| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. |
-| -------------------------------------------------------------------------------------------------------------------------------- |
+| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. |
+| ------------------------------------------------------------------------------------------------------------------------------- |
### Encode data using a dictionary
@@ -169,12 +168,12 @@ You can pass a dict that has a set of key-value pairs:
- use `from pyais.encode import encode_dict` to import `encode_dict` method
- it takes a dictionary of data and some NMEA specific kwargs and returns the NMEA 0183 encoded AIS sentence.
- only keys known to each message are considered
- - other keys are simply omitted
- - you can get list of available keys by looking at pyais/encode.py
- - you can also call `MessageType1.fields()` to get a list of fields programmatically for each message
+ - other keys are simply omitted
+ - you can get list of available keys by looking at pyais/encode.py
+ - you can also call `MessageType1.fields()` to get a list of fields programmatically for each message
- every message needs at least two keyword arguments:
- - `mmsi` the MMSI number to encode
- - `type` or `msg_type` the type of the message to encode (1-27)
+ - `mmsi` the MMSI number to encode
+ - `type` or `msg_type` the type of the message to encode (1-27)
**NOTE:**
This method takes care of splitting large payloads (larger than 60 characters)
@@ -303,6 +302,21 @@ with IterMessages(messages) as s:
print(msg.decode())
```
+## Tag Block Queue (grouping)
+
+Every class that implements the streaming API accepts an optional keyword argument `tbq`, which is set to `None` by default. When tbq is provided, it can be used as a queue for handling NMEA tag blocks. The queue's `get_nowait()` method allows you to retrieve a list of NMEASentence objects, but only when the entire group has been received (i.e., all sentences within the group are complete). It is important to note that this is rudimentary support for tag block groups, as pyais primarily focuses on processing AIS messages and abstracts away NMEA sentences from the user.
+
+```py
+with FileReaderStream('/path/to/file.nmea', tbq=TagBlockQueue()) as stream:
+ tbq = stream.tbq
+
+ for msg in stream:
+ try:
+ print(tbq.get_nowait())
+ except queue.Empty:
+ pass
+```
+
# Gatehouse wrappers
Some AIS messages have so-called Gatehouse wrappers. These encapsulating messages contain extra information, such as time and checksums. Some readers also process these. See some more documentation [here](https://www.iala-aism.org/wiki/iwrap/index.php/GH_AIS_Message_Format).
@@ -348,9 +362,9 @@ def process(self, line: bytes) -> bytes:
```
Parameters:
- line (bytes): The input line in bytes that needs to be processed.
+line (bytes): The input line in bytes that needs to be processed.
Returns:
- bytes: The processed line in bytes, conforming to the NMEA0183 format.
+bytes: The processed line in bytes, conforming to the NMEA0183 format.
The `process` method is responsible for transforming the input bytes into a format that adheres to the NMEA0183 standard. This method must be implemented by any class that conforms to the `PreprocessorProtocol`.
@@ -549,7 +563,7 @@ feature, please create an issue.
During installation, you may encounter problems due to missing header files. The error looks like this:
-````sh
+```sh
...
bitarray/_bitarray.c:13:10: fatal error: Python.h: No such file or directory
@@ -560,13 +574,13 @@ During installation, you may encounter problems due to missing header files. The
...
-````
+```
In order to solve this issue, you need to install header files and static libraries for python dev:
-````sh
+```sh
$ sudo apt install python3-dev
-````
+```
# For developers
diff --git a/examples/grouping.py b/examples/grouping.py
new file mode 100644
index 0000000..12628e9
--- /dev/null
+++ b/examples/grouping.py
@@ -0,0 +1,14 @@
+from pyais.stream import FileReaderStream, TagBlockQueue
+import pathlib
+
+filename = pathlib.Path(__file__).parent.joinpath('sample.ais')
+
+# To track NMEA 4.10 tag block groups a queue is required.
+# This queue buffers NMEA sentences belonging until the group is complete.
+# NOTE: get_nowait will return NMEA sentences - NOT AIS sentences!
+tbq = TagBlockQueue()
+
+with FileReaderStream(str(filename), tbq=tbq) as stream:
+ for msg in stream:
+ while not tbq.empty():
+ print(tbq.get_nowait())
diff --git a/examples/sample.ais b/examples/sample.ais
index 77232f3..3251111 100644
--- a/examples/sample.ais
+++ b/examples/sample.ais
@@ -10,3 +10,7 @@
IamNotAnAisMessage1111
# Tag blocks are also supported
\g:1-2-73874,n:157036,s:r003669945,c:12415440354*A\!AIVDM,1,1,,B,15N4cJ005Jrek0H@9nDW5608EP,013
+\g:1-2-27300,n:636994,s:b003669710,c:1428621738*5F\!SAVDM,2,1,2,B,55Mw@A7J1adAL@?;7WPl58F0U bytes:
pass
+class TagBlockQueue(queue.Queue): # type: ignore
+
+ def __init__(self, maxsize: int = 0) -> None:
+ self.groups: typing.Dict[str, typing.Dict[str, object]] = {}
+ super().__init__(maxsize)
+
+ def put_sentence(self, sentence: NMEASentence) -> None:
+ if not sentence.tag_block:
+ # No NMEA 4.10 tag block
+ # Returns a single line immediately.
+ super().put([sentence,])
+ return
+
+ tb = sentence.tag_block
+ tb.init()
+
+ if not tb.group:
+ # No NMEA 4.10 tag block 'g'.
+ super().put([sentence,])
+ return
+
+ if int(tb.group.sentence_tot) == 1:
+ # Group of a single sentence
+ super().put([sentence,])
+ return
+
+ if int(tb.group.sentence_num) == 1:
+ # The first sentence
+ self.groups[tb.group.group_id] = {'sentence_tot': tb.group.sentence_tot, 'sentences': [sentence,]}
+ return
+
+ if tb.group.group_id not in self.groups:
+ # Unknown group. First sentence of group is missing.
+ return
+
+ self.groups[tb.group.group_id]['sentences'].append(sentence) # type: ignore
+
+ # is_last_sentence = int(tb.group.sentence_num) != int(self.groups[tb.group.group_id]['sentence_tot'])
+ group_is_complete = int(tb.group.sentence_tot) != len(self.groups[tb.group.group_id]['sentences']) # type: ignore
+
+ if group_is_complete:
+ return
+
+ # All sentences belonging to this group were received.
+ super().put(self.groups[tb.group.group_id]['sentences'])
+ del self.groups[tb.group.group_id]
+
+
class AssembleMessages(ABC):
"""
Base class that assembles multiline messages.
Offers a iterator like interface.
"""
- def __init__(self) -> None:
+ def __init__(self, tbq: typing.Optional[TagBlockQueue] = None) -> None:
self.wrapper_msg: typing.Optional[GatehouseSentence] = None
+ self.tbq: typing.Optional[TagBlockQueue] = tbq
def __enter__(self) -> "AssembleMessages":
# Enables use of with statement
@@ -91,6 +141,12 @@ def __insert_wrapper_msg(self, msg: AISSentence) -> AISSentence:
msg.wrapper_msg = wrapper_msg
return msg
+ def __add_to_tbq(self, sentence: NMEASentence) -> None:
+ if not self.tbq:
+ # Tag Block Queue not defined. Do nothing.
+ return
+ self.tbq.put_sentence(sentence)
+
def _assemble_messages(self) -> Generator[NMEAMessage, None, None]:
buffer: typing.Dict[typing.Tuple[int, str], typing.List[typing.Optional[NMEAMessage]]] = {}
messages = self._iter_messages()
@@ -99,6 +155,7 @@ def _assemble_messages(self) -> Generator[NMEAMessage, None, None]:
for line in messages:
try:
sentence = NMEASentenceFactory.produce(line)
+ self.__add_to_tbq(sentence)
if sentence.TYPE == GatehouseSentence.TYPE:
sentence = cast(GatehouseSentence, sentence)
self.__set_last_wrapper_msg(sentence)
@@ -143,8 +200,8 @@ def _iter_messages(self) -> Generator[bytes, None, None]:
class IterMessages(AssembleMessages):
- def __init__(self, messages: Iterable[bytes]):
- super().__init__()
+ def __init__(self, messages: Iterable[bytes], tbq: typing.Optional[TagBlockQueue] = None):
+ super().__init__(tbq=tbq)
# If the user passes a single byte string make it into a list
if isinstance(messages, bytes):
messages = [messages, ]
@@ -176,13 +233,17 @@ def _iter_messages(self) -> Generator[bytes, None, None]:
class Stream(AssembleMessages, Generic[F], ABC):
- def __init__(self, fobj: F, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
+ def __init__(
+ self, fobj: F,
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
"""
Create a new Stream-like object.
@param fobj: A file-like or socket object.
@param preprocessor: An optional preprocessor
"""
- super().__init__()
+ super().__init__(tbq=tbq)
self._fobj: F = fobj
self.preprocessor = preprocessor
@@ -211,8 +272,13 @@ def read(self) -> Generator[bytes, None, None]:
class BinaryIOStream(Stream[BinaryIO]):
"""Read messages from a file-like object"""
- def __init__(self, file: BinaryIO, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
- super().__init__(file, preprocessor=preprocessor)
+ def __init__(
+ self,
+ file: BinaryIO,
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
+ super().__init__(file, preprocessor=preprocessor, tbq=tbq)
def read(self) -> Generator[bytes, None, None]:
yield from self._fobj
@@ -223,7 +289,13 @@ class FileReaderStream(BinaryIOStream):
Read NMEA messages from file
"""
- def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb", preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
+ def __init__(
+ self,
+ filename: typing.Union[str, pathlib.Path],
+ mode: str = "rb",
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
self.filename: typing.Union[str, pathlib.Path] = filename
self.mode: str = mode
# Try to open file
@@ -232,7 +304,7 @@ def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb",
file = cast(BinaryIO, file)
except Exception as e:
raise FileNotFoundError(f"Could not open file {self.filename}") from e
- super().__init__(file, preprocessor=preprocessor)
+ super().__init__(file, preprocessor=preprocessor, tbq=tbq)
class ByteStream(Stream[None]):
@@ -240,9 +312,14 @@ class ByteStream(Stream[None]):
Takes a iterable that contains ais messages as bytes and assembles them.
"""
- def __init__(self, iterable: Iterable[bytes], preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
+ def __init__(
+ self,
+ iterable: Iterable[bytes],
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
self.iterable: Iterable[bytes] = iterable
- super().__init__(None, preprocessor=preprocessor)
+ super().__init__(None, preprocessor=preprocessor, tbq=tbq)
def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
return
@@ -285,10 +362,16 @@ def read(self) -> Generator[bytes, None, None]:
class UDPReceiver(SocketStream):
- def __init__(self, host: str, port: int, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
sock: socket = socket(AF_INET, SOCK_DGRAM)
sock.bind((host, port))
- super().__init__(sock, preprocessor=preprocessor)
+ super().__init__(sock, preprocessor=preprocessor, tbq=tbq)
def recv(self) -> bytes:
return self._fobj.recvfrom(self.BUF_SIZE)[0]
@@ -303,11 +386,17 @@ class TCPConnection(SocketStream):
def recv(self) -> bytes:
return self._fobj.recv(self.BUF_SIZE)
- def __init__(self, host: str, port: int = 80, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
+ def __init__(
+ self,
+ host: str,
+ port: int = 80,
+ preprocessor: typing.Optional[PreprocessorProtocol] = None,
+ tbq: typing.Optional[TagBlockQueue] = None
+ ) -> None:
sock: socket = socket(AF_INET, SOCK_STREAM)
try:
sock.connect((host, port))
except ConnectionRefusedError as e:
sock.close()
raise ConnectionRefusedError(f"Failed to connect to {host}:{port}") from e
- super().__init__(sock, preprocessor=preprocessor)
+ super().__init__(sock, preprocessor=preprocessor, tbq=tbq)
diff --git a/tests/test_examples.py b/tests/test_examples.py
index d4c5ed3..38b9ce3 100644
--- a/tests/test_examples.py
+++ b/tests/test_examples.py
@@ -32,4 +32,4 @@ def test_run_every_file(self):
if csv_file.exists():
csv_file.unlink()
- assert i == 21
+ assert i == 22
diff --git a/tests/test_tag_block.py b/tests/test_tag_block.py
index d6cd84e..892de05 100644
--- a/tests/test_tag_block.py
+++ b/tests/test_tag_block.py
@@ -3,7 +3,57 @@
from pyais.exceptions import TagBlockNotInitializedException, UnknownMessageException
from pyais.messages import AISSentence, NMEASentenceFactory, TagBlock
-from pyais.stream import IterMessages
+from pyais.stream import IterMessages, TagBlockQueue
+
+
+class TagBlockQueueTestCase(unittest.TestCase):
+
+ def test_put_single_w_group(self):
+ tbq = TagBlockQueue()
+
+ raw = b'\\g:1-1-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C'
+ sentence = NMEASentenceFactory.produce(raw)
+ tbq.put_sentence(sentence)
+
+ result = tbq.get_nowait()
+
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0], sentence)
+
+ def test_put_single_wo_group(self):
+ tbq = TagBlockQueue()
+
+ raw = b'!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C'
+ sentence = NMEASentenceFactory.produce(raw)
+ tbq.put_sentence(sentence)
+
+ result = tbq.get_nowait()
+
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0], sentence)
+
+ def test_put_multiple_w_groups(self):
+ RAWS = [
+ b'\\g:1-3-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ b'\\g:3-3-4512,s:FooBar,c:1428451253*50\\!AIVDM,1,3,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ b'\\g:1-3-1234*30\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ b'\\g:2-3-4512*30\\!AIVDM,1,2,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ b'\\g:1-1-1337,s:FooBar,c:1428451253*50\\!AIVDM,1,2,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ b'\\g:1-42-4242,s:FooBar,c:1428451253*50\\!AIVDM,1,1,,A,13nN34?000QFpgRWnQLLSPpF00SO,0*1C',
+ ]
+ tbq = TagBlockQueue()
+
+ for raw in RAWS:
+ tbq.put_sentence(NMEASentenceFactory.produce(raw))
+
+ result = tbq.get_nowait()
+ self.assertEqual(len(result), 3)
+ self.assertEqual(result[0].frag_num, 1)
+ self.assertEqual(result[1].frag_num, 3)
+ self.assertEqual(result[2].frag_num, 2)
+
+ result = tbq.get_nowait()
+ self.assertEqual(len(result), 1)
class TagBlockTestCase(unittest.TestCase):
@@ -187,3 +237,7 @@ def test_that_unknown_tag_blocks_can_exported_as_dicts(self):
'text': None
}
)
+
+
+if __name__ == '__main__':
+ unittest.main()