Skip to content

Commit

Permalink
Add aio_paho MQTT client backend and example
Browse files Browse the repository at this point in the history
New client backend aio_paho for JSON-RPC over MQTT.

It's dependency library asyncio-paho needs python >= 3.8,
updated in pyproject.toml accodringly.
  • Loading branch information
Bernhard Kaindl committed Aug 26, 2022
1 parent 0f21f4d commit 17acf9c
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Extra requirements

- `aiohttp <https://aiohttp.readthedocs.io>`_
- `aio_pika <https://aio-pika.readthedocs.io>`_
- `asyncio-paho <https://github.com/toreamun/asyncio-paho>`_
- `flask <https://flask.palletsprojects.com>`_
- `jsonschema <https://python-jsonschema.readthedocs.io>`_
- `kombu <https://kombu.readthedocs.io/en/stable/>`_
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Extra requirements

- `aiohttp <https://aiohttp.readthedocs.io>`_
- `aio_pika <https://aio-pika.readthedocs.io>`_
- `asyncio-paho <https://github.com/toreamun/asyncio-paho>`_
- `flask <https://flask.palletsprojects.com>`_
- `jsonschema <https://python-jsonschema.readthedocs.io>`_
- `kombu <https://kombu.readthedocs.io/en/stable/>`_
Expand Down
7 changes: 7 additions & 0 deletions docs/source/pjrpc/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Examples
========


aio_paho client
---------------

.. literalinclude:: ../../../examples/aio_paho_client.py
:language: python


aio_pika client
---------------

Expand Down
43 changes: 43 additions & 0 deletions examples/aio_paho_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio
import logging
from os import environ

import pjrpc
from pjrpc.client.backend import aio_paho


async def main() -> None:
broker = environ.get("MQTT_BROKER")
assert broker
request_topic = environ.get("MQTT_RPC_REQUEST_TOPIC", "")
response_topic = environ.get("MQTT_RPC_RESPONSE_TOPIC", "")
clientid = environ.get("MQTT_CLIENTID", "")
username = environ.get("MQTT_USERNAME", "")
password = environ.get("MQTT_PASSWORD", "")

client = aio_paho.Client(
broker=broker,
request_topic=request_topic,
response_topic=response_topic,
clientid=clientid,
username=username,
password=password,
)
await client.connect(debug=True)

response = await client.send(pjrpc.Request('get_methods', params=None, id=1))
assert response
print(response.result)

result = await client('get_methods')
print(result)

result = await client.proxy.get_methods()
print(result)

await client.notify('schedule_shutdown')


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
140 changes: 140 additions & 0 deletions pjrpc/client/backend/aio_paho.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Based on pjrpc/client/backend/aio_pika.py (but much simpler due to MQTT),
this module implements the JSON-RPC backend for JSON-RPC over MQTT brokers"""
import asyncio
import logging
from typing import Any, List, Optional

import paho.mqtt.client as paho # type: ignore
from asyncio_paho import AsyncioPahoClient # type: ignore

from pjrpc.client import AbstractAsyncClient

debug = logging.getLogger(__package__).debug


class Client(AbstractAsyncClient):
"""
JSON-RPC client based on `asyncio-mqtt <https://github.com/sbtinstruments/asyncio-mqtt/>`_
:param broker: MQTT broker
:param request_topic: MQTT topic for publishing RPC requests
:param response_topic: MQTT topic for receiving RPC responses
:param clientid: MQTT Client Id for connecting to the MQTT broker
:param username: MQTT user name for connecting to the MQTT broker
:param password: MQTT password for connecting to the MQTT broker
:param port: Port number used by the MQTT broker(default: 1884)
:param queue_name: queue name to publish requests to
:param kwargs: parameters to be passed to :py:class:`pjrpc.client.AbstractClient`
"""

def __init__(
self,
broker: str,
request_topic: str,
response_topic: str,
clientid: Optional[str],
username: Optional[str],
password: Optional[str],
port: Optional[int] = 1884,
**kwargs: Any,
):
super().__init__(**kwargs)
self._broker = broker
self._port = port
self._request_topic = request_topic
self._response_topic = response_topic
self._clientid = clientid
self._username = username
self._password = password

async def connect(self, **kwargs: Any) -> None:
"""Opens a connection to the broker.
:param kwargs: parameters to be passed to :py:class:`asyncio_paho.AsyncioPahoClient`
"""
subscribe_result: tuple[int, int] = (-1, -1)
self._subscribed_future: asyncio.Future[str] = asyncio.Future()
self._rpc_futures: List[asyncio.Future[str]] = []
self._debug = kwargs.pop("debug", False)

def on_connect(
client: paho.Client,
userdata: Any,
flags_dict: dict[str, Any],
result: int,
) -> None:
# pylint: disable=unused-argument
nonlocal subscribe_result
if self._debug:
debug(f"aio_paho: Connected, subscribe to: {self._response_topic}")
subscribe_result = client.subscribe(self._response_topic)
assert subscribe_result[0] == paho.MQTT_ERR_SUCCESS
if self._debug:
debug(f"aio_paho: Subscribed to {self._response_topic}")

def on_subscribe(
client: paho.Client,
userdata: Any,
mid: int,
granted_qos: tuple[int, ...],
) -> None:
# pylint: disable=unused-argument
if self._debug:
debug(f"aio_paho: Subscribed to: {self._response_topic}")
nonlocal subscribe_result
assert mid == subscribe_result[1]
self._subscribed_future.set_result("")

def on_message(client: paho.Client, userdt: Any, msg: paho.MQTTMessage) -> None:
# pylint: disable=unused-argument
if self._debug:
debug(f"aio_paho: Received from {msg.topic}: {str(msg.payload)}")
future = self._rpc_futures[-1]
future.set_result(msg.payload.decode())

def on_connect_fail(client: paho.Client, userdata: Any) -> None:
# pylint: disable=unused-argument
debug("aio_paho: Connect failed")

def on_log(client: paho.Client, userdata: Any, level: int, buf: Any) -> None:
# pylint: disable=unused-argument
debug(f"aio_paho: {buf}")

self._client = AsyncioPahoClient(self._clientid or "", **kwargs)
if self._password:
self._client.username_pw_set(self._username, self._password)
self._client.on_connect = on_connect
self._client.on_connect_fail = on_connect_fail
self._client.on_subscribe = on_subscribe
self._client.on_message = on_message
if self._debug:
self._client.on_log = on_log

self._client.connect_async(self._broker)
await self._subscribed_future

async def close(self) -> None:
"""Close the current connection to the MQTT broker and send exceptions."""
await self._client.close()
for future in self._rpc_futures:
if future.done():
continue
future.set_exception(asyncio.CancelledError)

async def _request(
self,
request_text: str,
is_notification: bool = False,
**kwargs: Any,
) -> Optional[str]:
"""Publish an RPC request to the MQTT topic and return the received result"""
if not is_notification:
future: asyncio.Future[str] = asyncio.Future()
self._rpc_futures.append(future)
if self._debug:
debug(f"aio_paho: {self._request_topic}: publish '{request_text}'")
self._client.publish(self._request_topic, request_text.encode())
if is_notification:
return None
received = await future
self._rpc_futures.pop()
return received
13 changes: 9 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ documentation = "https://pjrpc.readthedocs.io/en/latest/"
keywords = [
'json-rpc', 'rpc', 'jsonrpc-client', 'jsonrpc-server',
'requests', 'aiohttp', 'flask', 'httpx', 'aio-pika', 'kombu',
'openapi', 'openrpc', 'starlette', 'django',
'openapi', 'openrpc', 'starlette', 'django', 'mqtt'
]
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand All @@ -26,17 +26,17 @@ classifiers = [
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Application Frameworks",
"Programming Language :: Python",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
]


[tool.poetry.dependencies]
python = "^3.7"
python = "^3.8"
aio-pika = { version = "^8.0", optional = true }
aiofiles = { version = "^0.7", optional = true }
asyncio-paho = { version = "^0.5", optional = true }
aiohttp = { version = "^3.7", optional = true }
django = { version = "^3.0", optional = true }
docstring-parser = { version = "^0.8", optional = true }
Expand All @@ -55,6 +55,7 @@ sphinx = { version = "~=4.5", optional = true}
[tool.poetry.extras]
aio-pika = ['aio-pika']
aiohttp = ['aiohttp']
asyncio-paho = ['asyncio-paho']
django = ['django']
docstring-parser = ['docstring-parser']
flask = ['flask', 'markupsafe']
Expand All @@ -67,7 +68,7 @@ requests = ['requests']
starlette = ['starlette', 'aiofiles']
test = ['docstring-parser', 'flask', 'jsonschema', 'openapi-ui-bundles', 'pydantic', 'werkzeug']
werkzeug = ['werkzeug']
docgen = ['sphinx', 'aiohttp', 'aio-pika', 'flask', 'jsonschema', 'pydantic', 'requests', 'kombu']
docgen = ['sphinx', 'aiohttp', 'aio-pika', 'asyncio-paho', 'flask', 'jsonschema', 'pydantic', 'requests', 'kombu']

[tool.poetry.dev-dependencies]
aioresponses = "^0.7"
Expand Down Expand Up @@ -105,6 +106,10 @@ ignore_missing_imports = true
module = "aio_pika.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "asyncio-paho.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "docstring_parser.*"
ignore_missing_imports = true
Expand Down

0 comments on commit 17acf9c

Please sign in to comment.