Skip to content

Commit

Permalink
Prevent decode function from accesing data that is out of bounds (#41)
Browse files Browse the repository at this point in the history
* Prevent decode function from accesing data that is out of bounds

* Make variable length messages actually check the payloads length

* Add some more unittests

* Add changelog entry
  • Loading branch information
M0r13n authored Dec 5, 2021
1 parent 5527830 commit da2d434
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 50 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
====================
pyais CHANGELOG
====================
-------------------------------------------------------------------------------
Version 1.6.3 5 Dec 2021
-------------------------------------------------------------------------------

* Correctly handles variable length messages (https://github.com/M0r13n/pyais/issues/40)
- prior versions were missing required length checks
- therefore some messages were not correctly decoded
- this affects Type 7, 15, 16, 20, 22, 24, 25 and 26

-------------------------------------------------------------------------------
Version 1.6.2 2 May 2021
-------------------------------------------------------------------------------
Expand Down
130 changes: 92 additions & 38 deletions pyais/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
NavAid
)
from pyais.exceptions import UnknownMessageException, MissingMultipartMessageException, TooManyMessagesException
from pyais.util import get_int, encode_bin_as_ascii6, get_mmsi
from pyais.util import get_int, encode_bin_as_ascii6, get_mmsi, binary_data


def decode_msg_1(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
Expand Down Expand Up @@ -128,7 +128,7 @@ def decode_msg_6(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'retransmit': bit_arr[70],
'dac': get_int_from_data(72, 82),
'fid': get_int_from_data(82, 88),
'data': bit_arr[88:].to01()
'data': binary_data(bit_arr, 88)
}


Expand All @@ -138,19 +138,39 @@ def decode_msg_7(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_type_7_binary_acknowledge
"""
get_int_from_data = partial(get_int, bit_arr)
return {
length = len(bit_arr)
# Total length varies between 72 and 168 bits depending on the number of addressed ships
# Each address requires 32 bit
data = {
'type': get_int_from_data(0, 6),
'repeat': get_int_from_data(6, 8),
'mmsi': get_mmsi(bit_arr, 8, 38),
'mmsi1': get_mmsi(bit_arr, 40, 70),
'mmsiseq1': get_int_from_data(70, 72),
'mmsi2': get_mmsi(bit_arr, 72, 102),
'mmsiseq2': get_int_from_data(102, 104),
'mmsi3': get_mmsi(bit_arr, 104, 134),
'mmsiseq3': get_int_from_data(134, 136),
'mmsi4': get_mmsi(bit_arr, 136, 166),
'mmsiseq4': get_int_from_data(166, 168)
'mmsi2': None,
'mmsiseq2': None,
'mmsi3': None,
'mmsiseq3': None,
'mmsi4': None,
'mmsiseq4': None,
}
if 72 < length <= 104:
data['mmsi2'] = get_mmsi(bit_arr, 72, 102)
data['mmsiseq2'] = get_int_from_data(102, 104)
elif 104 < length <= 136:
data['mmsi2'] = get_mmsi(bit_arr, 72, 102)
data['mmsiseq2'] = get_int_from_data(102, 104)
data['mmsi3'] = get_mmsi(bit_arr, 104, 134)
data['mmsiseq3'] = get_int_from_data(134, 136)
if 136 < length:
data['mmsi2'] = get_mmsi(bit_arr, 72, 102)
data['mmsiseq2'] = get_int_from_data(102, 104)
data['mmsi3'] = get_mmsi(bit_arr, 104, 134)
data['mmsiseq3'] = get_int_from_data(134, 136)
data['mmsi4'] = get_mmsi(bit_arr, 136, 166)
data['mmsiseq4'] = get_int_from_data(166, 168)

return data


def decode_msg_8(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
Expand All @@ -165,7 +185,7 @@ def decode_msg_8(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'mmsi': get_mmsi(bit_arr, 8, 38),
'dac': get_int_from_data(40, 50),
'fid': get_int_from_data(50, 56),
'data': bit_arr[56:].to01()
'data': binary_data(bit_arr, 56)
}


Expand Down Expand Up @@ -259,7 +279,7 @@ def decode_msg_15(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_type_15_interrogation
"""
get_int_from_data = partial(get_int, bit_arr)
return {
data = {
'type': get_int_from_data(0, 6),
'repeat': get_int_from_data(6, 8),
'mmsi': get_mmsi(bit_arr, 8, 38),
Expand All @@ -268,30 +288,46 @@ def decode_msg_15(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'offset1_1': get_int_from_data(76, 88),
'type1_2': get_int_from_data(90, 96),
'offset1_2': get_int_from_data(96, 108),
'mmsi2': get_mmsi(bit_arr, 110, 140),
'type2_1': get_int_from_data(140, 146),
'offset2_1': get_int_from_data(146, 157),
'mmsi2': None,
'type2_1': None,
'offset2_1': None,
}

if len(bit_arr) > 88:
# TODO (richter): there are more edge cases
data['mmsi2'] = get_mmsi(bit_arr, 110, 140)
data['type2_1'] = get_int_from_data(140, 146)
data['offset2_1'] = get_int_from_data(146, 157)

return data


def decode_msg_16(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
"""
Assignment Mode Command
Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_type_16_assignment_mode_command
Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_type_16_assignment_mode_command
"""
get_int_from_data = partial(get_int, bit_arr)
return {
data = {
'type': get_int_from_data(0, 6),
'repeat': get_int_from_data(6, 8),
'mmsi': get_mmsi(bit_arr, 8, 38),
'mmsi1': get_mmsi(bit_arr, 40, 70),
'offset1': get_int_from_data(70, 82),
'increment1': get_int_from_data(82, 92),
'mmsi2': get_mmsi(bit_arr, 92, 122),
'offset2': get_int_from_data(122, 134),
'increment2': get_int_from_data(134, 144)
'mmsi2': None,
'offset2': None,
'increment2': None
}

if len(data) > 96:
# If the message is 96 bits long it should be interpreted as a channel assignment for two stations
data['mmsi2'] = get_mmsi(bit_arr, 92, 122)
data['offset2'] = get_int_from_data(122, 134)
data['increment2'] = get_int_from_data(134, 144)

return data


def decode_msg_17(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
"""
Expand All @@ -305,7 +341,7 @@ def decode_msg_17(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'mmsi': get_mmsi(bit_arr, 8, 38),
'lon': get_int_from_data(40, 58, signed=True),
'lat': get_int_from_data(58, 75, signed=True),
'data': get_int_from_data(80, 816)
'data': binary_data(bit_arr, 80)
}


Expand Down Expand Up @@ -375,7 +411,7 @@ def decode_msg_20(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
Src: https://gpsd.gitlab.io/gpsd/AIVDM.html#_type_20_data_link_management_message
"""
get_int_from_data = partial(get_int, bit_arr)
return {
data = {
'type': get_int_from_data(0, 6),
'repeat': get_int_from_data(8, 8),
'mmsi': get_mmsi(bit_arr, 8, 38),
Expand All @@ -385,22 +421,41 @@ def decode_msg_20(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'timeout1': get_int_from_data(56, 59),
'increment1': get_int_from_data(59, 70),

'offset2': get_int_from_data(70, 82),
'number2': get_int_from_data(82, 86),
'timeout2': get_int_from_data(86, 89),
'increment2': get_int_from_data(89, 100),
'offset2': None,
'number2': None,
'timeout2': None,
'increment2': None,

'offset3': get_int_from_data(100, 112),
'number3': get_int_from_data(112, 116),
'timeout3': get_int_from_data(116, 119),
'increment3': get_int_from_data(110, 130),
'offset3': None,
'number3': None,
'timeout3': None,
'increment3': None,

'offset4': get_int_from_data(130, 142),
'number4': get_int_from_data(142, 146),
'timeout4': get_int_from_data(146, 149),
'increment4': get_int_from_data(149, 160),
'offset4': None,
'number4': None,
'timeout4': None,
'increment4': None,
}

length = len(bit_arr)
if 100 <= length:
data['offset2'] = get_int_from_data(70, 82)
data['number2'] = get_int_from_data(82, 86)
data['timeout2'] = get_int_from_data(86, 89)
data['increment2'] = get_int_from_data(89, 100)
if 130 <= length:
data['offset3'] = get_int_from_data(100, 112)
data['number3'] = get_int_from_data(112, 116)
data['timeout3'] = get_int_from_data(116, 119)
data['increment3'] = get_int_from_data(119, 130)
if 160 <= length:
data['offset4'] = get_int_from_data(130, 142)
data['number4'] = get_int_from_data(142, 146)
data['timeout4'] = get_int_from_data(146, 149)
data['increment4'] = get_int_from_data(149, 160)

return data


def decode_msg_21(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -446,7 +501,6 @@ def decode_msg_22(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
'type': get_int_from_data(0, 6),
'repeat': get_int_from_data(8, 8),
'mmsi': get_mmsi(bit_arr, 8, 38),

'channel_a': get_int_from_data(40, 52),
'channel_b': get_int_from_data(52, 64),
'txrx': get_int_from_data(64, 68),
Expand Down Expand Up @@ -570,11 +624,11 @@ def decode_msg_25(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
if data['structured']:
d = {
'app_id': get_int_from_data(lo_ix, hi_ix),
'data': bit_arr[hi_ix:].to01()
'data': binary_data(bit_arr, hi_ix)
}
else:
d = {
'data': bit_arr[lo_ix:].to01()
'data': binary_data(bit_arr, lo_ix)
}
data.update(d)
return data
Expand Down Expand Up @@ -615,11 +669,11 @@ def decode_msg_26(bit_arr: bitarray.bitarray) -> Dict[str, Any]:
if data['structured']:
d = {
'app_id': get_int_from_data(lo_ix, hi_ix),
'data': bit_arr[hi_ix:radio_status_offset].to01()
'data': binary_data(bit_arr, hi_ix, radio_status_offset)
}
else:
d = {
'data': bit_arr[lo_ix:radio_status_offset].to01()
'data': binary_data(bit_arr, lo_ix, radio_status_offset)
}

data.update(d)
Expand Down
31 changes: 29 additions & 2 deletions pyais/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import OrderedDict
from functools import partial, reduce
from operator import xor
from typing import Any, Generator, Hashable, TYPE_CHECKING, Callable
from typing import Any, Generator, Hashable, TYPE_CHECKING, Callable, Optional

from bitarray import bitarray

Expand Down Expand Up @@ -88,21 +88,48 @@ def get_int(data: bitarray, ix_low: int, ix_high: int, signed: bool = False) ->
:param ix_low: the lower index of the sub-array
:param ix_high: the upper index of the sub-array
:param signed: True if the value should be interpreted as a signed integer
:return: a normal integer (int)
:return: The integer value of the sub-array data[ix_low:ix_high]
"""
shift: int = (8 - ((ix_high - ix_low) % 8)) % 8
data = data[ix_low:ix_high]
i: int = from_bytes_signed(data) if signed else from_bytes(data)
return i >> shift


def binary_data(data: bitarray, ix_low: int, ix_high: Optional[int] = None) -> Optional[str]:
"""
Get a sub_array of a bitarray as bitstring.
:param data: some bitarray
:param ix_low: the lower index of the sub-array
:param ix_high: the upper index of the sub-array
:return: The integer value of the sub-array data[ix_low:ix_high]
"""
length = len(data)
if ix_high is None:
ix_high = length
if ix_low >= length or ix_high > length:
# Indices out of bounds
return None

return data[ix_low:ix_high].to01()


def get_mmsi(data: bitarray, ix_low: int, ix_high: int) -> str:
"""
A Maritime Mobile Service Identity (MMSI) is a series of nine digits.
Every digit is required and therefore we can NOT use a int.
See: issue #6
"""

mmsi_int: int = get_int(data, ix_low, ix_high)
if len(data) < ix_high:
# Remove padding from MMSIs shorter than 30 bits
mask = 0x3fffffff
d = ix_high - len(data)
mask ^= int(d * "1", 2)
mmsi_int &= mask

return str(mmsi_int).zfill(9)


Expand Down
Loading

0 comments on commit da2d434

Please sign in to comment.