Replies: 1 comment 1 reply
-
Hi Vaarlion, thanks for opening this discussion. Let me have a look. :) You have a good starting point with your class Mqtt_server():
def __init__(self, server, port, username, password, client_id, topic, tls) -> None:
self.server = server
self.port = port
self.username = username
self.password = password
self.client_id = client_id
self.topic = topic
self.logger = logging.getLogger(__name__)
self.logger.level = logger_level
self.tls_params = self.setup_tls(tls)
def setup_tls(self, enabled):
if enabled:
tls_params = aiomqtt.TLSParameters(
ca_certs=None,
certfile=None,
keyfile=None,
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
ciphers=None,
)
else:
tls_params = None
return tls_params
async def _maintain_connection(self) -> None:
reconnect_interval = 5
while True:
try:
self.logger.info("Connecting to {} MQTT server".format(self.server))
await self.client.connect()
except aiomqtt.MqttError as error:
self.logger.error(f'MQTT error "{error}". Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
async def send_update(self, device_name, device_data) -> None:
retry_interval = 5
data_sent = False
max_retry = 5
while not data_sent and max_retry > 0:
try:
await self.client.publish(
"{}/{}".format(self.topic, device_name),
payload = device_data
)
data_sent = True
except aiomqtt.MqttError as error:
max_retry -= 1
self.logger.error(f'MQTT error "{error}". Rerying {max_retry} still in {retry_interval} seconds.')
await asyncio.sleep(retry_interval)
async def __aenter__(self):
self.client = aiomqtt.Client(
hostname = self.server,
port = self.port,
username = self.username,
password = self.password,
client_id = self.client_id,
tls_params = self.tls_params,
keepalive = 60,
)
self.client.__aenter__()
self._reconnect_task = asyncio.create_task(self._reconnect_task())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.logger.info("Stopping MQTT")
self._reconnect_task.cancel()
await self._reconnect_task
await self.client.__aexit__(exc_type, exc_value, traceback)
class Taks1():
...
class Task2():
...
async def main():
tasks = []
mqtt_server = Mqtt_server(...)
try:
async with mqtt_server:
task1 = Task1(..., mqtt_server.send_update)
task2 = Task2(..., mqtt_server.send_update)
task3 = Task1(..., mqtt_server.send_update)
tasks.append(asyncio.create_task(task1.start()))
tasks.append(asyncio.create_task(task2.start()))
tasks.append(asyncio.create_task(task3.start()))
await asyncio.gather(*tasks)
except Exception as error:
logger.error(f'General error "{error}". Closing process')
finally:
await taks1.stop()
await taks2.stop()
await taks3.stop() Hope that makes sense. Basically, I just moved There are some edge cases here and there that may cause some trouble but it's a good start. Best regards, |
Beta Was this translation helpful? Give feedback.
1 reply
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Hi !
I've start working with asyncio-mqtt for a side project of mine, and i wanted async to have different task managing different data source in my code.
Because of this, and before finding https://sbtinstruments.github.io/asyncio-mqtt/sharing-the-connection.html#why-can-t-i-connect-disconnect-manually, i structured it this way (simplified, and WIP that don't work)
Given this, with the obvious goal of multiple task running in //, i feel like i can't use what's in the docs and the
async with aiomqtt.Client(...) as client:
format, so I'm looking for suggestion :DBeta Was this translation helpful? Give feedback.
All reactions