Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions packetraven/connections/serial.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from abc import ABC
from datetime import datetime

from kiss import KISS
from serial import Serial

from packetraven.connections.base import (
Expand All @@ -9,17 +11,22 @@
TimeIntervalError,
)
from packetraven.packets import APRSPacket
from packetraven.packets.parsing import InvalidPacketError


class SerialTNC(APRSPacketSource):
def __init__(self, serial_port: str = None, callsigns: [str] = None):
class SerialAPRSConnection(APRSPacketSource, ABC):
def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None):
"""
Connect to TNC over given serial port.
Connect to a given serial port.

:param serial_port: port name
:param callsigns: list of callsigns to return from source
:param baudrate: baud rate of serial communication
"""

if baudrate is None:
baudrate = 9600

if serial_port is None or serial_port == '' or serial_port == 'auto':
try:
serial_port = next_open_serial_port()
Expand All @@ -28,8 +35,19 @@ def __init__(self, serial_port: str = None, callsigns: [str] = None):
else:
serial_port = serial_port.strip('"')

self.serial_connection = Serial(serial_port, baudrate=9600, timeout=1)
super().__init__(self.serial_connection.port, callsigns)
super().__init__(serial_port, callsigns)

self.__baudrate = baudrate

@property
def baudrate(self) -> int:
return self.__baudrate


class SerialTNC(SerialAPRSConnection):
def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None):
super().__init__(serial_port, callsigns, baudrate)
self.serial_connection = Serial(port=self.location, baudrate=self.baudrate, timeout=1)
self.__last_access_time = None

@property
Expand Down Expand Up @@ -57,3 +75,43 @@ def close(self):

def __repr__(self):
return f'{self.__class__.__name__}({repr(self.location)}, {repr(self.callsigns)})'


class SerialKISS(APRSPacketSource):
def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None):
super().__init__(serial_port, callsigns, baudrate)
self.serial_connection = KISS(port=self.location, speed=self.baudrate, timeout=1)
self.__last_access_time = None

@property
def packets(self) -> [APRSPacket]:
if self.__last_access_time is not None and self.interval is not None:
interval = datetime.now() - self.__last_access_time
if interval < self.interval:
raise TimeIntervalError(
f'interval {interval} less than minimum interval {self.interval}'
)

packets = []

def add_frames(frame: str):
try:
packet = APRSPacket.from_frame(frame)
if self.callsigns is not None:
if packet.from_callsign in self.callsigns:
if packet not in packets:
packets.append(packet)
except InvalidPacketError:
pass

self.serial_connection.start()
self.serial_connection.read(callback=add_frames, readmode=False)

self.__last_access_time = datetime.now()
return packets

def close(self):
self.serial_connection.stop()

def __repr__(self):
return f'{self.__class__.__name__}({repr(self.location)}, {repr(self.callsigns)})'