Skip to content

Commit

Permalink
validate acknowledge messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
gluap committed Jan 3, 2024
1 parent d664c70 commit ea373ac
Showing 1 changed file with 68 additions and 12 deletions.
80 changes: 68 additions & 12 deletions pyduofern/duofern_stick.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import logging
import os
import os.path
import random
import tempfile
import threading
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Dict

import serial
import serial.tools.list_ports
Expand Down Expand Up @@ -61,7 +65,8 @@ def hex(stuff):
duoRemotePair = "0D0006010000000000000000000000000000yyyyyy01"


MIN_MESSAGE_INTERVAL_MILLIS = 50
MIN_MESSAGE_INTERVAL_MILLIS = 25
RESEND_SECONDS = (2,4)

def refresh_serial_connection(function):
def new_funtion(*args, **kwargs):
Expand Down Expand Up @@ -119,7 +124,7 @@ def __init__(self, system_code=None, config_file_json=None, duofern_parser=None,

self.pairing = False
self.unpairing = False
self.write_queue = []
self.write_queue = Queue()
if not ephemeral:
self.config['system_code'] = self.system_code
self._dump_config()
Expand Down Expand Up @@ -162,7 +167,7 @@ def _initialize_recording(self):
def _initialize(self, **kwargs): # pragma: no cover
raise NotImplementedError("need to use an implementation of the Duofernstick")

def _simple_write(self, **kwargs): # pragma: no cover
def _simple_write(self, *args, **kwargs): # pragma: no cover
raise NotImplementedError("need to use an implementation of the Duofernstick")

def send(self, msg, **kwargs): # pragma: no cover
Expand All @@ -182,6 +187,9 @@ def process_message(self, message):
recorder.write("received {}\n".format(message))
recorder.flush()
if message[0:2] == '81':
if hasattr(self, "unacknowledged"):
if message[-14:-2] in self.unacknowledged:
del self.unacknowledged[message[-14:-2]]
return
if message[0:4] == '0602':
logger.info("got pairing reply")
Expand Down Expand Up @@ -248,11 +256,6 @@ def set_name(self, id, name):
self._initialize()
self.initialized=1

def handle_write_queue(self):
if len(self.write_queue) > 0:
tosend = self.write_queue.pop()
logger.info("sending {} from write queue, {} msgs left in queue".format(tosend, len(self.write_queue)))
self._simple_write(tosend)

def stop_pair(self):
self.send(duoStopPair)
Expand Down Expand Up @@ -437,6 +440,13 @@ async def handshake(self):
self.initialized = True


@dataclass
class WaitingMessage:
"""Class for keeping track of an item in inventory."""
message: str
next: datetime.datetime
retries: int = 5

class DuofernStickThreaded(DuofernStick, threading.Thread):
def __init__(self, serial_port=None, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -456,6 +466,9 @@ def __init__(self, serial_port=None, *args, **kwargs):
self.running = False
self.last_send = datetime.datetime.now()

self.rewrite_queue = Queue()
self.unacknowledged: Dict[str,WaitingMessage] = {}

def _read_answer(self, some_string): # ReadAnswer
"""read an answer..."""
logger.debug("should read {}".format(some_string))
Expand Down Expand Up @@ -569,6 +582,23 @@ def _simple_write(self, string_to_write): # SimpleWrite
self.serial_connection.open()
self.serial_connection.write(data_to_write)

def handle_write_queue(self):
try:
tosend = self.write_queue.get(block=False, timeout=None)
logger.info("sending {} from write queue, {} msgs left in queue".format(tosend, self.write_queue.qsize()))
self._simple_write(tosend)
self.unacknowledged[tosend[-14:-2]] = WaitingMessage(tosend, datetime.datetime.now()+datetime.timedelta(seconds=random.uniform(*RESEND_SECONDS)))
except Empty:
pass

def handle_rewrite_queue(self):
try:
tosend = self.rewrite_queue.get(block=False, timeout=None)
logger.info("SENDING {} from REwrite queue, {} msgs left in queue".format(tosend, self.rewrite_queue.qsize()))
self._simple_write(tosend)
except Empty:
pass

def command(self, *args, **kwargs):
if self.recording:
with open(self.record_filename, "a") as recorder:
Expand All @@ -579,12 +609,15 @@ def add_serial_and_send(self, msg):
message = msg.replace("zzzzzz", "6f" + self.system_code)
logger.debug("sending {}".format(message))
self.send(message)
logger.debug("added {} to write queue".format(message))

def run(self):
self.running = True
self._initialize()
last_resend_check = datetime.datetime.now()
toggle = False
while self.running:
toggle = not toggle

self.serial_connection.timeout = .05
if not self.serial_connection.isOpen():
self.serial_connection.open()
Expand All @@ -600,8 +633,31 @@ def run(self):
if in_data != duoACK:
self._simple_write(duoACK)
self.serial_connection.timeout = 1
if (len(self.write_queue) > 0) and ((datetime.datetime.now() - self.last_send) >= datetime.timedelta(milliseconds=MIN_MESSAGE_INTERVAL_MILLIS)):
self.handle_write_queue()
if not self.write_queue.empty() or not self.rewrite_queue.empty() and (
(datetime.datetime.now() - self.last_send) >= datetime.timedelta(milliseconds=MIN_MESSAGE_INTERVAL_MILLIS)):
if toggle:
self.handle_write_queue()
else:
self.handle_rewrite_queue()

if datetime.datetime.now() - last_resend_check > datetime.timedelta(seconds=0.1):
self.handle_resends()
last_resend_check = datetime.datetime.now()

def handle_resends(self):
logger.debug(self.unacknowledged)
done = set()
t = datetime.datetime.now()
for k in self.unacknowledged.keys():
if self.unacknowledged[k].retries == 0:
done.add(k)
elif self.unacknowledged[k].next < t:
self.unacknowledged[k].next = t + datetime.timedelta(seconds=random.uniform(*RESEND_SECONDS))
self.unacknowledged[k].retries -= 1
self.rewrite_queue.put(self.unacknowledged[k].message)
for d in done:
del self.unacknowledged[d]


def stop(self):
self.running = False
Expand All @@ -617,6 +673,6 @@ def unpair(self, timeout=10):

def send(self, msg, **kwargs):
logger.debug("sending {}".format(msg))
self.write_queue.append(msg)
self.write_queue.put_nowait(msg)
logger.debug("added {} to write queue".format(msg))
return

0 comments on commit ea373ac

Please sign in to comment.