Skip to content

Commit

Permalink
feat: adds TagBlockQueue (#152)
Browse files Browse the repository at this point in the history
* feat: adds TagBlockQueue

* fix: unsupported type hints on Python <3.10

* chore: docs
  • Loading branch information
M0r13n authored Sep 21, 2024
1 parent a75aa87 commit 816de54
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 34 deletions.
46 changes: 30 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
![CI](https://github.com/M0r13n/pyais/workflows/CI/badge.svg)
[![Documentation Status](https://readthedocs.org/projects/pyais/badge/?version=latest)](https://pyais.readthedocs.io/en/latest/?badge=latest)


</p>
</div>

Expand Down Expand Up @@ -51,7 +50,7 @@ This AIS sentence is known as a "Position Report" message and is used to transmi
- : This field is left blank. This field can contain the sequence number.
- **B**: This field indicates the communication channel being used to transmit the message. In this case, the channel is "B".
- **15MwkT1P37G?fl0EJbR0OwT0@MS**: This field contains the payload of the message, which is encoded using a variant of ASCII known as "Six-bit ASCII". The payload contains information such as the vessel's identification, position, course, and speed.
0*4E: This field is a checksum that is used to verify the integrity of the sentence.
0\*4E: This field is a checksum that is used to verify the integrity of the sentence.

**pyais** is a Python modul to encode and decode AIS messages.

Expand Down Expand Up @@ -149,7 +148,7 @@ The AIS data is freely available under the [norwegian license for public data](h

Data can be read from a TCP/IP socket and is encoded according to IEC 62320-1:

- IP: 153.44.253.27
- IP: 153.44.253.27
- Port: 5631

Refer to the [examples/live_stream.py](./examples/live_stream.py) for a practical example on how to read & decode AIS data from a TCP/IP socket.
Expand All @@ -159,8 +158,8 @@ This is useful for debugging or for getting used to pyais.

It is also possible to encode messages.

| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. |
| -------------------------------------------------------------------------------------------------------------------------------- |
| :exclamation: Every message needs at least a single keyword argument: `mmsi`. All other fields have most likely default values. |
| ------------------------------------------------------------------------------------------------------------------------------- |

### Encode data using a dictionary

Expand All @@ -169,12 +168,12 @@ You can pass a dict that has a set of key-value pairs:
- use `from pyais.encode import encode_dict` to import `encode_dict` method
- it takes a dictionary of data and some NMEA specific kwargs and returns the NMEA 0183 encoded AIS sentence.
- only keys known to each message are considered
- other keys are simply omitted
- you can get list of available keys by looking at pyais/encode.py
- you can also call `MessageType1.fields()` to get a list of fields programmatically for each message
- other keys are simply omitted
- you can get list of available keys by looking at pyais/encode.py
- you can also call `MessageType1.fields()` to get a list of fields programmatically for each message
- every message needs at least two keyword arguments:
- `mmsi` the MMSI number to encode
- `type` or `msg_type` the type of the message to encode (1-27)
- `mmsi` the MMSI number to encode
- `type` or `msg_type` the type of the message to encode (1-27)

**NOTE:**
This method takes care of splitting large payloads (larger than 60 characters)
Expand Down Expand Up @@ -303,6 +302,21 @@ with IterMessages(messages) as s:
print(msg.decode())
```

## Tag Block Queue (grouping)

Every class that implements the streaming API accepts an optional keyword argument `tbq`, which is set to `None` by default. When tbq is provided, it can be used as a queue for handling NMEA tag blocks. The queue's `get_nowait()` method allows you to retrieve a list of NMEASentence objects, but only when the entire group has been received (i.e., all sentences within the group are complete). It is important to note that this is rudimentary support for tag block groups, as pyais primarily focuses on processing AIS messages and abstracts away NMEA sentences from the user.

```py
with FileReaderStream('/path/to/file.nmea', tbq=TagBlockQueue()) as stream:
tbq = stream.tbq

for msg in stream:
try:
print(tbq.get_nowait())
except queue.Empty:
pass
```

# Gatehouse wrappers

Some AIS messages have so-called Gatehouse wrappers. These encapsulating messages contain extra information, such as time and checksums. Some readers also process these. See some more documentation [here](https://www.iala-aism.org/wiki/iwrap/index.php/GH_AIS_Message_Format).
Expand Down Expand Up @@ -348,9 +362,9 @@ def process(self, line: bytes) -> bytes:
```

Parameters:
line (bytes): The input line in bytes that needs to be processed.
line (bytes): The input line in bytes that needs to be processed.
Returns:
bytes: The processed line in bytes, conforming to the NMEA0183 format.
bytes: The processed line in bytes, conforming to the NMEA0183 format.

The `process` method is responsible for transforming the input bytes into a format that adheres to the NMEA0183 standard. This method must be implemented by any class that conforms to the `PreprocessorProtocol`.

Expand Down Expand Up @@ -549,7 +563,7 @@ feature, please create an issue.

During installation, you may encounter problems due to missing header files. The error looks like this:

````sh
```sh
...

bitarray/_bitarray.c:13:10: fatal error: Python.h: No such file or directory
Expand All @@ -560,13 +574,13 @@ During installation, you may encounter problems due to missing header files. The

...

````
```

In order to solve this issue, you need to install header files and static libraries for python dev:

````sh
```sh
$ sudo apt install python3-dev
````
```

# For developers

Expand Down
14 changes: 14 additions & 0 deletions examples/grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pyais.stream import FileReaderStream, TagBlockQueue
import pathlib

filename = pathlib.Path(__file__).parent.joinpath('sample.ais')

# To track NMEA 4.10 tag block groups a queue is required.
# This queue buffers NMEA sentences belonging until the group is complete.
# NOTE: get_nowait will return NMEA sentences - NOT AIS sentences!
tbq = TagBlockQueue()

with FileReaderStream(str(filename), tbq=tbq) as stream:
for msg in stream:
while not tbq.empty():
print(tbq.get_nowait())
4 changes: 4 additions & 0 deletions examples/sample.ais
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@
IamNotAnAisMessage1111
# Tag blocks are also supported
\g:1-2-73874,n:157036,s:r003669945,c:12415440354*A\!AIVDM,1,1,,B,15N4cJ005Jrek0H@9nDW5608EP,013
\g:1-2-27300,n:636994,s:b003669710,c:1428621738*5F\!SAVDM,2,1,2,B,55Mw@A7J1adAL@?;7WPl58F0U<h4pB222222220t1PN5553fN4g?`4iSp5Rc,0*26
\g:2-2-27300,n:636995*15\!SAVDM,2,2,2,B,iP`88888880,2*5E
\g:1-2-73874*A\!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F
\g:2-2-73874,n:157037*A\!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B
121 changes: 105 additions & 16 deletions pyais/stream.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import typing
import pathlib
import queue
from abc import ABC, abstractmethod
from socket import AF_INET, SOCK_DGRAM, SOCK_STREAM, socket
from typing import BinaryIO, Generator, Generic, Iterable, List, TypeVar, cast

from pyais.exceptions import InvalidNMEAMessageException, NonPrintableCharacterException, UnknownMessageException
from pyais.messages import AISSentence, GatehouseSentence, NMEAMessage, NMEASentenceFactory
from pyais.messages import AISSentence, GatehouseSentence, NMEAMessage, NMEASentence, NMEASentenceFactory

T = TypeVar("T")
F = TypeVar("F", BinaryIO, socket, None)
Expand Down Expand Up @@ -54,14 +55,63 @@ def process(self, line: bytes) -> bytes:
pass


class TagBlockQueue(queue.Queue): # type: ignore

def __init__(self, maxsize: int = 0) -> None:
self.groups: typing.Dict[str, typing.Dict[str, object]] = {}
super().__init__(maxsize)

def put_sentence(self, sentence: NMEASentence) -> None:
if not sentence.tag_block:
# No NMEA 4.10 tag block
# Returns a single line immediately.
super().put([sentence,])
return

tb = sentence.tag_block
tb.init()

if not tb.group:
# No NMEA 4.10 tag block 'g'.
super().put([sentence,])
return

if int(tb.group.sentence_tot) == 1:
# Group of a single sentence
super().put([sentence,])
return

if int(tb.group.sentence_num) == 1:
# The first sentence
self.groups[tb.group.group_id] = {'sentence_tot': tb.group.sentence_tot, 'sentences': [sentence,]}
return

if tb.group.group_id not in self.groups:
# Unknown group. First sentence of group is missing.
return

self.groups[tb.group.group_id]['sentences'].append(sentence) # type: ignore

# is_last_sentence = int(tb.group.sentence_num) != int(self.groups[tb.group.group_id]['sentence_tot'])
group_is_complete = int(tb.group.sentence_tot) != len(self.groups[tb.group.group_id]['sentences']) # type: ignore

if group_is_complete:
return

# All sentences belonging to this group were received.
super().put(self.groups[tb.group.group_id]['sentences'])
del self.groups[tb.group.group_id]


class AssembleMessages(ABC):
"""
Base class that assembles multiline messages.
Offers a iterator like interface.
"""

def __init__(self) -> None:
def __init__(self, tbq: typing.Optional[TagBlockQueue] = None) -> None:
self.wrapper_msg: typing.Optional[GatehouseSentence] = None
self.tbq: typing.Optional[TagBlockQueue] = tbq

def __enter__(self) -> "AssembleMessages":
# Enables use of with statement
Expand Down Expand Up @@ -91,6 +141,12 @@ def __insert_wrapper_msg(self, msg: AISSentence) -> AISSentence:
msg.wrapper_msg = wrapper_msg
return msg

def __add_to_tbq(self, sentence: NMEASentence) -> None:
if not self.tbq:
# Tag Block Queue not defined. Do nothing.
return
self.tbq.put_sentence(sentence)

def _assemble_messages(self) -> Generator[NMEAMessage, None, None]:
buffer: typing.Dict[typing.Tuple[int, str], typing.List[typing.Optional[NMEAMessage]]] = {}
messages = self._iter_messages()
Expand All @@ -99,6 +155,7 @@ def _assemble_messages(self) -> Generator[NMEAMessage, None, None]:
for line in messages:
try:
sentence = NMEASentenceFactory.produce(line)
self.__add_to_tbq(sentence)
if sentence.TYPE == GatehouseSentence.TYPE:
sentence = cast(GatehouseSentence, sentence)
self.__set_last_wrapper_msg(sentence)
Expand Down Expand Up @@ -143,8 +200,8 @@ def _iter_messages(self) -> Generator[bytes, None, None]:

class IterMessages(AssembleMessages):

def __init__(self, messages: Iterable[bytes]):
super().__init__()
def __init__(self, messages: Iterable[bytes], tbq: typing.Optional[TagBlockQueue] = None):
super().__init__(tbq=tbq)
# If the user passes a single byte string make it into a list
if isinstance(messages, bytes):
messages = [messages, ]
Expand Down Expand Up @@ -176,13 +233,17 @@ def _iter_messages(self) -> Generator[bytes, None, None]:

class Stream(AssembleMessages, Generic[F], ABC):

def __init__(self, fobj: F, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
def __init__(
self, fobj: F,
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
"""
Create a new Stream-like object.
@param fobj: A file-like or socket object.
@param preprocessor: An optional preprocessor
"""
super().__init__()
super().__init__(tbq=tbq)
self._fobj: F = fobj
self.preprocessor = preprocessor

Expand Down Expand Up @@ -211,8 +272,13 @@ def read(self) -> Generator[bytes, None, None]:
class BinaryIOStream(Stream[BinaryIO]):
"""Read messages from a file-like object"""

def __init__(self, file: BinaryIO, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
super().__init__(file, preprocessor=preprocessor)
def __init__(
self,
file: BinaryIO,
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
super().__init__(file, preprocessor=preprocessor, tbq=tbq)

def read(self) -> Generator[bytes, None, None]:
yield from self._fobj
Expand All @@ -223,7 +289,13 @@ class FileReaderStream(BinaryIOStream):
Read NMEA messages from file
"""

def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb", preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
def __init__(
self,
filename: typing.Union[str, pathlib.Path],
mode: str = "rb",
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
self.filename: typing.Union[str, pathlib.Path] = filename
self.mode: str = mode
# Try to open file
Expand All @@ -232,17 +304,22 @@ def __init__(self, filename: typing.Union[str, pathlib.Path], mode: str = "rb",
file = cast(BinaryIO, file)
except Exception as e:
raise FileNotFoundError(f"Could not open file {self.filename}") from e
super().__init__(file, preprocessor=preprocessor)
super().__init__(file, preprocessor=preprocessor, tbq=tbq)


class ByteStream(Stream[None]):
"""
Takes a iterable that contains ais messages as bytes and assembles them.
"""

def __init__(self, iterable: Iterable[bytes], preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
def __init__(
self,
iterable: Iterable[bytes],
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
self.iterable: Iterable[bytes] = iterable
super().__init__(None, preprocessor=preprocessor)
super().__init__(None, preprocessor=preprocessor, tbq=tbq)

def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
return
Expand Down Expand Up @@ -285,10 +362,16 @@ def read(self) -> Generator[bytes, None, None]:

class UDPReceiver(SocketStream):

def __init__(self, host: str, port: int, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
def __init__(
self,
host: str,
port: int,
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
sock: socket = socket(AF_INET, SOCK_DGRAM)
sock.bind((host, port))
super().__init__(sock, preprocessor=preprocessor)
super().__init__(sock, preprocessor=preprocessor, tbq=tbq)

def recv(self) -> bytes:
return self._fobj.recvfrom(self.BUF_SIZE)[0]
Expand All @@ -303,11 +386,17 @@ class TCPConnection(SocketStream):
def recv(self) -> bytes:
return self._fobj.recv(self.BUF_SIZE)

def __init__(self, host: str, port: int = 80, preprocessor: typing.Optional[PreprocessorProtocol] = None) -> None:
def __init__(
self,
host: str,
port: int = 80,
preprocessor: typing.Optional[PreprocessorProtocol] = None,
tbq: typing.Optional[TagBlockQueue] = None
) -> None:
sock: socket = socket(AF_INET, SOCK_STREAM)
try:
sock.connect((host, port))
except ConnectionRefusedError as e:
sock.close()
raise ConnectionRefusedError(f"Failed to connect to {host}:{port}") from e
super().__init__(sock, preprocessor=preprocessor)
super().__init__(sock, preprocessor=preprocessor, tbq=tbq)
2 changes: 1 addition & 1 deletion tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ def test_run_every_file(self):
if csv_file.exists():
csv_file.unlink()

assert i == 21
assert i == 22
Loading

0 comments on commit 816de54

Please sign in to comment.