From 45695a5f0344eec7a9bfc8c1fffd82ae56959969 Mon Sep 17 00:00:00 2001 From: Guide Date: Thu, 27 Nov 2025 04:15:58 +0300 Subject: [PATCH] Update mqtt.py --- examples/mqtt.py | 197 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 135 insertions(+), 62 deletions(-) diff --git a/examples/mqtt.py b/examples/mqtt.py index db5c20c1..3b6a1829 100644 --- a/examples/mqtt.py +++ b/examples/mqtt.py @@ -1,89 +1,126 @@ -""" Server metrics upload. +""" +Server metrics upload gateway. + +This module connects to an MQTT broker, subscribes to all topics, collects +the received data into a nested dictionary structure, and periodically +uploads the collected metrics as aggregate messages to the Aleph.im network. """ # -*- coding: utf-8 -*- import asyncio -from typing import Dict - -import aiomqtt import click +import logging +from typing import Dict, Any, Optional +# Dependencies for MQTT, Aleph.im client, and authentication. +import aiomqtt from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.conf import settings +# Configure basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + -def get_input_data(value): +def get_input_data(value: bytes) -> Any: + """ + Attempts to convert raw MQTT payload bytes into appropriate Python types (bool, float, or string). + + :param value: The raw payload as bytes. + :return: The converted value. + """ if value == b"true": return True elif value == b"false": return False try: - v = float(value) - return v + # Try converting to a float (common for metric values) + return float(value) except ValueError: + # Fallback to a string if float conversion fails return value.decode("utf-8") -async def send_metrics(account, metrics): - async with AuthenticatedAlephHttpClient( - account=account, api_server=settings.API_HOST - ) as session: - return session.create_aggregate( - key="metrics", content=metrics, channel="SYSINFO" - ) - +# --- MQTT Callback Handlers --- -def on_disconnect(client, userdata, rc): +def on_disconnect(client: aiomqtt.Client, userdata: Dict[str, Any], rc: int): + """Callback triggered on MQTT disconnection.""" if rc != 0: - print("Unexpected MQTT disconnection. Will auto-reconnect") - + logger.warning(f"Unexpected MQTT disconnection (RC={rc}). Will attempt auto-reconnect.") + + # aiomqtt (paho-based) often requires manual reconnection handling for persistent connections. + # The main gateway loop will also attempt reconnect if no messages are received. client.reconnect() -# The callback for when the client receives a CONNACK response from the server. -def on_connect(client, userdata, flags, rc): - print("Connected with result code " + str(rc)) +def on_connect(client: aiomqtt.Client, userdata: Dict[str, Any], flags: Dict[str, Any], rc: int): + """Callback triggered on successful MQTT connection.""" + logger.info(f"Connected to MQTT Broker with result code {rc}") - # Subscribing in on_connect() means that if we lose the connection and - # reconnect then subscriptions will be renewed. + # Subscribing to all topics upon successful connection. client.subscribe("/#") -# The callback for when a PUBLISH message is received from the server. -def on_message(client, userdata, msg): +def on_message(client: aiomqtt.Client, userdata: Dict[str, Any], msg: aiomqtt.MQTTMessage): + """Callback for when a PUBLISH message is received from the server.""" userdata["received"] = True - state = userdata["state"] + state: Dict = userdata["state"] + + # Split the topic path (e.g., 'system/cpu/load') parts = msg.topic.strip("/").split("/") curp = state + + # Recursively build the nested dictionary structure from topic parts. for part in parts[:-1]: - if not isinstance(curp.get(part, None), dict): + # Ensure the current part is a dictionary before traversing or initializing. + if not isinstance(curp.get(part), dict): curp[part] = {} curp = curp[part] + # Assign the final metric value to the innermost key. curp[parts[-1]] = get_input_data(msg.payload) - print(parts, msg.payload) + logger.debug(f"Received: {parts}, Payload: {msg.payload}") + +# --- Core Gateway Logic --- async def gateway( - loop, - host="api1.aleph.im", - port=1883, - ca_cert=None, - pkey=None, - keepalive=10, - transport="tcp", - auth=None, + host: str = "api1.aleph.im", + port: int = 1883, + ca_cert: Optional[str] = None, + pkey: Optional[str] = None, + keepalive: int = 10, + transport: str = "tcp", + auth: Optional[Dict[str, str]] = None, ): - if pkey is None: - pkey = get_fallback_private_key() - - account = ETHAccount(private_key=pkey) - state: Dict = dict() + """ + Main asynchronous loop for connecting to MQTT and sending metrics to Aleph.im. + + This process involves: MQTT Broker -> Python Gateway -> Aleph.im Aggregate Store. + + + :param host: MQTT Broker host. + :param port: MQTT Broker port. + :param ca_cert: Path to the CA Certificate file for TLS. + :param pkey: Account private key for Aleph.im authentication. + :param keepalive: MQTT keepalive interval. + :param transport: MQTT transport type ('tcp' or 'websockets'). + :param auth: Dictionary containing 'username' and 'password' for MQTT basic auth. + """ + # 1. Aleph.im Account Setup + # Use the provided key or fall back to a default location (e.g., device.key). + private_key = pkey if pkey else get_fallback_private_key() + account = ETHAccount(private_key=private_key) + + # 2. Shared State for MQTT Callbacks and Upload Loop + state: Dict[str, Any] = dict() userdata = {"account": account, "state": state, "received": False} - client = aiomqtt.Client(loop, userdata=userdata, transport=transport) + + # 3. MQTT Client Initialization + client = aiomqtt.Client(loop=asyncio.get_event_loop(), userdata=userdata, transport=transport) client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message @@ -93,52 +130,88 @@ async def gateway( if auth is not None: client.username_pw_set(**auth) + # Start the non-blocking MQTT loop in the background. asyncio.ensure_future(client.loop_forever()) + # Connect to the broker and start receiving data. await client.connect(host, port, keepalive) + logger.info("MQTT client started and connected.") + + # 4. Main Upload Loop while True: + # Wait for the upload interval (10 seconds) await asyncio.sleep(10) + + # Check if any messages were received in the last interval. + # If not, attempt reconnect to handle silent connection drops not caught by on_disconnect. if not userdata["received"]: + logger.warning("No MQTT messages received recently. Attempting reconnect.") await client.reconnect() + continue + # If data was received, proceed to upload it. + userdata["received"] = False + + # Use an authenticated Aleph.im session for uploading aggregate messages. async with AuthenticatedAlephHttpClient( account=account, api_server=settings.API_HOST ) as session: - for key, value in state.items(): - message, status = await session.create_aggregate( - key=key, content=value, channel="IOT_TEST" - ) - print("sent", message.item_hash) - userdata["received"] = False - + # Create a temporary copy of the state and clear the main state + # immediately to allow the MQTT thread to collect new metrics + # while the upload is ongoing (minimizing lock contention risk, + # though locks aren't strictly necessary in single-threaded asyncio). + metrics_to_send = state.copy() + state.clear() + + for key, content in metrics_to_send.items(): + try: + # Upload each top-level key/value pair as a separate aggregate. + message, status = await session.create_aggregate( + key=key, content=content, channel="SYSINFO" # Changed channel to SYSINFO for clarity + ) + logger.info(f"Uploaded aggregate '{key}'. Hash: {message.item_hash}") + except Exception as e: + logger.error(f"Failed to upload aggregate '{key}' to Aleph.im: {e}") + # Re-insert failed data into state to retry later, though this + # risks overwriting newer data if the next message arrives quickly. + state[key] = content + + +# --- Command Line Interface --- @click.command() -@click.option("--host", default="localhost", help="MQTT Broker host") -@click.option("--port", default=1883, help="MQTT Broker port") -@click.option("--user", default=None, help="MQTT Auth username") -@click.option("--password", default=None, help="MQTT Auth password") -@click.option("--use-tls", is_flag=True) -@click.option("--ca-cert", default=None, help="CA Cert path") +@click.option("--host", default="localhost", help="MQTT Broker host.") +@click.option("--port", default=1883, type=int, help="MQTT Broker port.") +@click.option("--user", default=None, help="MQTT Auth username.") +@click.option("--password", default=None, help="MQTT Auth password.") +@click.option("--use-tls", is_flag=True, help="Use TLS for connection.") +@click.option("--ca-cert", default=None, help="CA Cert path for TLS validation.") @click.option( "--pkey", default=None, - help="Account private key (optionnal, will default to device.key file)", + help="Account private key (optional, defaults to device.key file).", ) -def main(host, port, user, password, use_tls=False, ca_cert=None, pkey=None): +def main(host: str, port: int, user: str, password: str, use_tls: bool, ca_cert: Optional[str], pkey: Optional[str]): + """ + Runs the MQTT Gateway to Aleph.im metrics aggregator. + """ loop = asyncio.get_event_loop() auth = None if user is not None: auth = {"username": user, "password": password} - if use_tls: - if ca_cert is None: + # Automatically determine CA cert if TLS is used but no path is provided. + if use_tls and ca_cert is None: + try: import certifi - ca_cert = certifi.where() - print(ca_cert) + logger.info(f"Using default CA Cert path from certifi: {ca_cert}") + except ImportError: + logger.error("TLS requested but 'certifi' is not installed and 'ca-cert' path not provided.") + return loop.run_until_complete( - gateway(loop, host, port, ca_cert=ca_cert, pkey=pkey, auth=auth) + gateway(host=host, port=port, ca_cert=ca_cert, pkey=pkey, auth=auth) )