From 81732ac44e4726f651f9682cf4a4fa8fad589133 Mon Sep 17 00:00:00 2001 From: Ric Evans <19216225+ric-evans@users.noreply.github.com> Date: Thu, 3 Feb 2022 16:45:33 -0600 Subject: [PATCH] First PR [minor] (#1) --- .circleci/config.yml | 64 ++++ .github/workflows/python-linters.yml | 29 ++ .github/workflows/semantic-release.yml | 23 ++ .github/workflows/try-setup-install.yml | 38 +++ .gitignore | 3 + README.md | 2 +- examples/nats_examples/nats_jetstream.py | 75 +++++ examples/run-with-wipactel-local.sh | 17 ++ examples/run.sh | 9 + examples/server.py | 48 +++ examples/worker.py | 36 +++ integrate_tests/test_nats.py | 33 ++ mqclient_nats/__init__.py | 20 ++ mqclient_nats/nats.py | 373 +++++++++++++++++++++++ mqclient_nats/py.typed | 0 mqclient_nats/queue.py | 21 ++ mqclient_nats/requirements.txt | 3 + requirements-dev.txt | 7 + resources/docker-nats-js.sh | 2 + resources/local-nats-js.sh | 8 + setup.cfg | 39 +++ setup.py | 26 ++ tox.ini | 15 + 23 files changed, 890 insertions(+), 1 deletion(-) create mode 100644 .circleci/config.yml create mode 100644 .github/workflows/python-linters.yml create mode 100644 .github/workflows/semantic-release.yml create mode 100644 .github/workflows/try-setup-install.yml create mode 100644 examples/nats_examples/nats_jetstream.py create mode 100755 examples/run-with-wipactel-local.sh create mode 100755 examples/run.sh create mode 100644 examples/server.py create mode 100644 examples/worker.py create mode 100644 integrate_tests/test_nats.py create mode 100644 mqclient_nats/__init__.py create mode 100644 mqclient_nats/nats.py create mode 100644 mqclient_nats/py.typed create mode 100644 mqclient_nats/queue.py create mode 100644 mqclient_nats/requirements.txt create mode 100644 requirements-dev.txt create mode 100755 resources/docker-nats-js.sh create mode 100755 resources/local-nats-js.sh create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 tox.ini diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..de701d3 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,64 @@ +version: 2 + +jobs: + + integrate: + docker: + - image: circleci/python:3.8 + steps: + - checkout + - run: | + pip install --user tox + ./resources/local-nats-js.sh & + sleep 15 + tox integrate_tests -vv + + examples: + docker: + - image: circleci/python:3.8 + steps: + - checkout + - run: | + pip install --user tox + tox --notest -vv + . .tox/py/bin/activate + echo "Setting Up NATS Server with JetStream:" + ./resources/local-nats-js.sh & + sleep 15 + ./examples/run.sh + + + nats-examples: + docker: + - image: circleci/python:3.8 + steps: + - checkout + - run: | + pip install --user tox + tox --notest -vv + . .tox/py/bin/activate + echo "Setting Up NATS Server with JetStream:" + ./resources/local-nats-js.sh & + sleep 15 + python ./examples/nats_examples/nats_jetstream.py + + pycycle: + docker: + - image: circleci/python:3.8 + steps: + - checkout + - run: | + pip install --user tox pycycle + tox --notest -vv + . .tox/py/bin/activate + pycycle --here --verbose + + +workflows: + version: 2 + build_and_test: + jobs: + - integrate + - examples + - nats-examples + - pycycle \ No newline at end of file diff --git a/.github/workflows/python-linters.yml b/.github/workflows/python-linters.yml new file mode 100644 index 0000000..a13c8be --- /dev/null +++ b/.github/workflows/python-linters.yml @@ -0,0 +1,29 @@ +# For most recent version see: +# https://github.com/WIPACrepo/wipac-dev-tools/blob/main/.github/workflows/python-linters.yml +# Copy any updates to wipac-dev-tools. + +name: Python Linters + +on: [push] + +jobs: + + flake8: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install flake8 + - run: flake8 . --ignore=E203,E226,E228,E231,E501,W503,W504 + + mypy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - run: pip install mypy + - run: | + ([ -e "requirements-dev.txt" ] && pip install -r requirements-dev.txt) || echo "no dev reqs" + - run: | + ([ -e "setup.py" ] && pip install .) || pip install -r requirements.txt + - run: mypy --install-types --namespace-packages --non-interactive --exclude build/ . diff --git a/.github/workflows/semantic-release.yml b/.github/workflows/semantic-release.yml new file mode 100644 index 0000000..e34bae6 --- /dev/null +++ b/.github/workflows/semantic-release.yml @@ -0,0 +1,23 @@ +name: Semantic Release + +on: + push: + branches: + - main + - master + +jobs: + release: + runs-on: ubuntu-latest + + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Python Semantic Release + uses: relekang/python-semantic-release@master + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + # pypi_token: ${{ secrets.PYPI_TOKEN }} diff --git a/.github/workflows/try-setup-install.yml b/.github/workflows/try-setup-install.yml new file mode 100644 index 0000000..b3061f1 --- /dev/null +++ b/.github/workflows/try-setup-install.yml @@ -0,0 +1,38 @@ +name: Try Setup Install + +on: [push] + +jobs: + + gather-py3-versions: + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.py3_versions.outputs.matrix }} + steps: + - uses: actions/checkout@v2 + - name: Python Versions + id: py3_versions + run: | + minmin_maxmax=$(grep -P "\(\(\d+, ?\d+\), ?\(\d+, ?\d+\)\)" -oh setup.py | sed 's/[^0-9]/ /g') + IFS=', ' read -r -a array <<< "$minmin_maxmax" + min_thru_max_series=$(for i in `seq ${array[1]} ${array[3]}`; do printf "'3.$i',"; done | rev | cut -c 2- | rev) + echo ::set-output name=matrix::{\"py3_versions\":[$(echo $min_thru_max_series)]}\" + echo $min_thru_max_series + + pip-install: + needs: gather-py3-versions + runs-on: ubuntu-latest + + strategy: + max-parallel: 4 + fail-fast: false + matrix: ${{ fromJSON(needs.gather-py3-versions.outputs.matrix) }} + + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.py3_versions }} + - run: | + pip install --upgrade pip wheel setuptools + pip install --editable . \ No newline at end of file diff --git a/.gitignore b/.gitignore index b6e4761..afe19aa 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,6 @@ dmypy.json # Pyre type checker .pyre/ + +env*/ +nats-server* \ No newline at end of file diff --git a/README.md b/README.md index 14e7eed..1a8ee3d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # MQClient-NATS -A NATS Message Queue Client API (Supporting the MQClient Interface) +A NATS Message Queue Client API with JetStream (Supporting the MQClient Interface) diff --git a/examples/nats_examples/nats_jetstream.py b/examples/nats_examples/nats_jetstream.py new file mode 100644 index 0000000..569ce9a --- /dev/null +++ b/examples/nats_examples/nats_jetstream.py @@ -0,0 +1,75 @@ +"""From https://github.com/nats-io/nats.py/blob/main/examples/jetstream.py.""" + +import asyncio + +import nats # type: ignore[import] + +# from nats.errors import TimeoutError + + +async def main(): + nc = await nats.connect("localhost") + + # Create JetStream context. + js = nc.jetstream() + + # Persist messages on 'foo's subject. + await js.add_stream(name="sample-stream", subjects=["foo"]) + + for i in range(0, 10): + ack = await js.publish("foo", f"hello world: {i}".encode()) + print(ack) + + # Create pull based consumer on 'foo'. + psub = await js.pull_subscribe("foo", "psub") + + # Fetch and ack messagess from consumer. + for i in range(0, 10): + msgs = await psub.fetch(1) + for msg in msgs: + print(msg) + + # Create single ephemeral push based subscriber. + sub = await js.subscribe("foo") + msg = await sub.next_msg() + await msg.ack() + + # Create single push based subscriber that is durable across restarts. + sub = await js.subscribe("foo", durable="myapp") + msg = await sub.next_msg() + await msg.ack() + + # Create deliver group that will be have load balanced messages. + async def qsub_a(msg): + print("QSUB A:", msg) + await msg.ack() + + async def qsub_b(msg): + print("QSUB B:", msg) + await msg.ack() + + await js.subscribe("foo", "workers", cb=qsub_a) + await js.subscribe("foo", "workers", cb=qsub_b) + + for i in range(0, 10): + ack = await js.publish("foo", f"hello world: {i}".encode()) + print("\t", ack) + + # Create ordered consumer with flow control and heartbeats + # that auto resumes on failures. + osub = await js.subscribe("foo", ordered_consumer=True) + data = bytearray() + + while True: + try: + msg = await osub.next_msg() + data.extend(msg.data) + except nats.errors.TimeoutError: + break + print("All data in stream:", len(data)) + + await nc.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/run-with-wipactel-local.sh b/examples/run-with-wipactel-local.sh new file mode 100755 index 0000000..bbec64c --- /dev/null +++ b/examples/run-with-wipactel-local.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [[ `basename "$PWD"` != "MQClient-NATS" && $PWD != "/home/circleci/project" ]] ; then + echo "ERROR: Run from 'MQClient-NATS/' (not '$PWD')" + exit 1 +fi + +export WIPACTEL_EXPORT_STDOUT=${WIPACTEL_EXPORT_STDOUT:="TRUE"} +export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318/v1/traces" +export WIPACTEL_SERVICE_NAME_PREFIX=mqclient-nats + +pip install tox +tox --notest -vv +. .tox/py/bin/activate +./resources/gcp-install.sh + +`dirname "$0"`/run.sh diff --git a/examples/run.sh b/examples/run.sh new file mode 100755 index 0000000..7916d2c --- /dev/null +++ b/examples/run.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +if [[ `basename "$PWD"` != "MQClient-NATS" && $PWD != "/home/circleci/project" ]] ; then + echo "ERROR: Run from 'MQClient-NATS/' (not '$PWD')" + exit 1 +fi + +python examples/worker.py & +python examples/server.py \ No newline at end of file diff --git a/examples/server.py b/examples/server.py new file mode 100644 index 0000000..65ceaf6 --- /dev/null +++ b/examples/server.py @@ -0,0 +1,48 @@ +"""A server sends work out on one queue, and receives results on another.""" + +import argparse +import asyncio +import logging +import typing + +import coloredlogs # type: ignore[import] +from mqclient_nats import Queue + + +async def server(work_queue: Queue, result_queue: Queue) -> None: + """Demo example server.""" + async with work_queue.open_pub() as p: + for i in range(100): + await p.send({"id": i, "cmd": f'echo "{i}"'}) + + results = {} + result_queue.timeout = 5 + async with result_queue.open_sub() as stream: + async for data in stream: + assert isinstance(data, dict) + results[typing.cast(int, data["id"])] = typing.cast(str, data["out"]) + + print(results) + assert len(results) == 100 + for i in results: + assert results[i].strip() == str(i) + + +if __name__ == "__main__": + coloredlogs.install(level=logging.DEBUG) + + parser = argparse.ArgumentParser(description="Worker") + parser.add_argument("--address", default="localhost", help="queue address") + parser.add_argument("--work-queue", default="queue1", help="work queue") + parser.add_argument("--result-queue", default="queue2", help="result queue") + parser.add_argument( + "--prefetch", type=int, default=10, help="result queue prefetch" + ) + args = parser.parse_args() + + workq = Queue(address=args.address, name=args.work_queue) + resultq = Queue( + address=args.address, name=args.result_queue, prefetch=args.prefetch + ) + + asyncio.get_event_loop().run_until_complete(server(workq, resultq)) diff --git a/examples/worker.py b/examples/worker.py new file mode 100644 index 0000000..2024664 --- /dev/null +++ b/examples/worker.py @@ -0,0 +1,36 @@ +"""A worker processes messages from one queue, and sends results on a second +queue.""" + +import argparse +import asyncio +import logging +import subprocess + +import coloredlogs # type: ignore[import] +from mqclient_nats import Queue + + +async def worker(recv_queue: Queue, send_queue: Queue) -> None: + """Demo example worker.""" + async with recv_queue.open_sub() as stream, send_queue.open_pub() as p: + async for data in stream: + cmd = data["cmd"] + out = subprocess.check_output(cmd, shell=True) + data["out"] = out.decode("utf-8") + await p.send(data) + + +if __name__ == "__main__": + coloredlogs.install(level=logging.DEBUG) + + parser = argparse.ArgumentParser(description="Worker") + parser.add_argument("--address", default="localhost", help="queue address") + parser.add_argument("--in-queue", default="queue1", help="input queue") + parser.add_argument("--out-queue", default="queue2", help="output queue") + parser.add_argument("--prefetch", type=int, default=10, help="input queue prefetch") + args = parser.parse_args() + + inq = Queue(address=args.address, name=args.in_queue, prefetch=args.prefetch) + outq = Queue(address=args.address, name=args.out_queue) + + asyncio.get_event_loop().run_until_complete(worker(inq, outq)) diff --git a/integrate_tests/test_nats.py b/integrate_tests/test_nats.py new file mode 100644 index 0000000..83ef927 --- /dev/null +++ b/integrate_tests/test_nats.py @@ -0,0 +1,33 @@ +"""Run integration tests for NATS backend.""" + +import asyncio +import logging + +import pytest +from mqclient.abstract_backend_tests import integrate_backend_interface, integrate_queue +from mqclient.abstract_backend_tests.utils import ( # pytest.fixture # noqa: F401 # pylint: disable=W0611 + queue_name, +) +from mqclient_nats.nats import Backend + +logging.getLogger().setLevel(logging.DEBUG) +logging.getLogger("flake8").setLevel(logging.WARNING) + + +@pytest.fixture(scope="module") +def event_loop(): # type: ignore[no-untyped-def] + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +class TestNATSQueue(integrate_queue.PubSubQueue): + """Run PubSubQueue integration tests with NATS backend.""" + + backend = Backend() + + +class TestNATSBackend(integrate_backend_interface.PubSubBackendInterface): + """Run PubSubBackendInterface integration tests with NATS backend.""" + + backend = Backend() diff --git a/mqclient_nats/__init__.py b/mqclient_nats/__init__.py new file mode 100644 index 0000000..d0efc05 --- /dev/null +++ b/mqclient_nats/__init__.py @@ -0,0 +1,20 @@ +"""Public init.""" + +from .queue import Queue + +__all__ = ["Queue"] + +# version is a human-readable version number. + +# version_info is a four-tuple for programmatic comparison. The first +# three numbers are the components of the version number. The fourth +# is zero for an official release, positive for a development branch, +# or negative for a release candidate or beta (after the base version +# number has been incremented) +__version__ = "0.1.0" +version_info = ( + int(__version__.split(".")[0]), + int(__version__.split(".")[1]), + int(__version__.split(".")[2]), + 0, +) diff --git a/mqclient_nats/nats.py b/mqclient_nats/nats.py new file mode 100644 index 0000000..37da190 --- /dev/null +++ b/mqclient_nats/nats.py @@ -0,0 +1,373 @@ +"""Back-end using NATS.""" + + +import logging +import math +import time +from functools import partial +from typing import ( + Any, + AsyncGenerator, + Awaitable, + Callable, + List, + Optional, + TypeVar, + cast, +) + +from mqclient import backend_interface, log_msgs +from mqclient.backend_interface import ( + RETRY_DELAY, + TIMEOUT_MILLIS_DEFAULT, + TRY_ATTEMPTS, + ClosingFailedExcpetion, + Message, + Pub, + RawQueue, + Sub, +) + +import nats # type: ignore[import] + +T = TypeVar("T") # the callable/awaitable return type + + +async def _anext(gen: AsyncGenerator[Any, Any], default: Any) -> Any: + """Provide the functionality of python 3.10's `anext()`. + + https://docs.python.org/3/library/functions.html#anext + """ + try: + return await gen.__anext__() + except StopAsyncIteration: + return default + + +async def try_call(self: "NATS", func: Callable[..., Awaitable[T]]) -> T: + """Call `func` with auto-retries.""" + i = 0 + while True: + if i > 0: + logging.debug( + f"{log_msgs.TRYCALL_CONNECTION_ERROR_TRY_AGAIN} (attempt #{i+1})..." + ) + + try: + return await func() + except nats.errors.TimeoutError: + raise + except Exception as e: # pylint:disable=broad-except + logging.debug(f"[try_call()] Encountered exception: '{e}'") + if i == TRY_ATTEMPTS - 1: + logging.debug(log_msgs.TRYCALL_CONNECTION_ERROR_MAX_RETRIES) + raise + + await self.close() + time.sleep(RETRY_DELAY) + await self.connect() + i += 1 + + +class NATS(RawQueue): + """Base NATS wrapper, using JetStream. + + Extends: + RawQueue + """ + + def __init__(self, endpoint: str, stream_id: str, subject: str) -> None: + super().__init__() + self.endpoint = endpoint + self.subject = subject + self.stream_id = stream_id + + self._nats_client: Optional[nats.aio.client.Client] = None + self.js: Optional[nats.js.JetStreamContext] = None + + logging.debug(f"Stream & Subject: {stream_id}/{self.subject}") + + async def connect(self) -> None: + """Set up connection and channel.""" + await super().connect() + self._nats_client = cast( + nats.aio.client.Client, await nats.connect(self.endpoint) + ) + # Create JetStream context + self.js = cast( + nats.js.JetStreamContext, + self._nats_client.jetstream(timeout=TIMEOUT_MILLIS_DEFAULT // 1000), + ) + await self.js.add_stream(name=self.stream_id, subjects=[self.subject]) + + async def close(self) -> None: + """Close connection.""" + await super().close() + if not self._nats_client: + raise ClosingFailedExcpetion("No connection to close.") + await self._nats_client.close() + + +class NATSPub(NATS, Pub): + """Wrapper around PublisherClient, using JetStream. + + Extends: + NATS + Pub + """ + + def __init__(self, endpoint: str, stream_id: str, subject: str): + logging.debug(f"{log_msgs.INIT_PUB} ({endpoint}; {stream_id}; {subject})") + super().__init__(endpoint, stream_id, subject) + # NATS is pub-centric, so no extra instance needed + + async def connect(self) -> None: + """Set up pub, then create topic and any subscriptions indicated.""" + logging.debug(log_msgs.CONNECTING_PUB) + await super().connect() + logging.debug(log_msgs.CONNECTED_PUB) + + async def close(self) -> None: + """Close pub (no-op).""" + logging.debug(log_msgs.CLOSING_PUB) + await super().close() + logging.debug(log_msgs.CLOSED_PUB) + + async def send_message(self, msg: bytes) -> None: + """Send a message (publish).""" + logging.debug(log_msgs.SENDING_MESSAGE) + if not self.js: + raise RuntimeError("JetStream is not connected") + + ack: nats.js.api.PubAck = await try_call( + self, partial(self.js.publish, self.subject, msg) + ) + logging.debug(f"Sent Message w/ Ack: {ack}") + logging.debug(log_msgs.SENT_MESSAGE) + + +class NATSSub(NATS, Sub): + """Wrapper around queue with prefetch-queue, using JetStream. + + Extends: + NATS + Sub + """ + + def __init__(self, endpoint: str, stream_id: str, subject: str): + logging.debug(f"{log_msgs.INIT_SUB} ({endpoint}; {stream_id}; {subject})") + super().__init__(endpoint, stream_id, subject) + self._subscription: Optional[nats.js.JetStreamContext.PullSubscription] = None + self.prefetch = 1 + + async def connect(self) -> None: + """Set up sub (pull subscription).""" + logging.debug(log_msgs.CONNECTING_SUB) + await super().connect() + if not self.js: + raise RuntimeError("JetStream is not connected.") + + self._subscription = cast( + nats.js.JetStreamContext.PullSubscription, + await self.js.pull_subscribe(self.subject, "psub"), + ) + logging.debug(log_msgs.CONNECTED_SUB) + + async def close(self) -> None: + """Close sub.""" + logging.debug(log_msgs.CLOSING_SUB) + if not self._subscription: + raise ClosingFailedExcpetion("No sub to close.") + await super().close() + logging.debug(log_msgs.CLOSED_SUB) + + @staticmethod + def _to_message( # type: ignore[override] # noqa: F821 # pylint: disable=W0221 + msg: nats.aio.msg.Msg, # pylint: disable=no-member + ) -> Optional[Message]: + """Transform NATS-Message to Message type.""" + return Message(cast(str, msg.reply), cast(bytes, msg.data)) + + def _from_message(self, msg: Message) -> nats.aio.msg.Msg: + """Transform Message instance to NATS-Message. + + Assumes the message came from this NATSSub instance. + """ + return nats.aio.msg.Msg( + _client=self._nats_client, + subject=self.subject, + reply=msg.msg_id, + data=msg.data, + sid=0, # default + headers=None, # default + ) + + async def _get_messages( + self, timeout_millis: Optional[int], num_messages: int + ) -> List[Message]: + """Get n messages. + + The subscriber pulls a specific number of messages. The actual + number of messages pulled may be smaller than `num_messages`. + """ + if not self._subscription: + raise RuntimeError("Subscriber is not connected") + + if not timeout_millis: + timeout_millis = TIMEOUT_MILLIS_DEFAULT + + try: + nats_msgs: List[nats.aio.msg.Msg] = await try_call( + self, + partial( + self._subscription.fetch, + num_messages, + int(math.ceil(timeout_millis / 1000)), + ), + ) + except nats.errors.TimeoutError: + return [] + + msgs = [] + for recvd in nats_msgs: + msg = self._to_message(recvd) + if msg: + msgs.append(msg) + return msgs + + async def get_message( + self, timeout_millis: Optional[int] = TIMEOUT_MILLIS_DEFAULT + ) -> Optional[Message]: + """Get a message.""" + logging.debug(log_msgs.GETMSG_RECEIVE_MESSAGE) + if not self._subscription: + raise RuntimeError("Subscriber is not connected.") + + try: + msg = (await self._get_messages(timeout_millis, 1))[0] + logging.debug(f"{log_msgs.GETMSG_RECEIVED_MESSAGE} ({msg}).") + return msg + except IndexError: + logging.debug(log_msgs.GETMSG_NO_MESSAGE) + return None + + async def _gen_messages( + self, timeout_millis: Optional[int], num_messages: int + ) -> AsyncGenerator[Message, None]: + """Continuously generate messages until there are no more.""" + if not self._subscription: + raise RuntimeError("Subscriber is not connected.") + + while True: + msgs = await self._get_messages(timeout_millis, num_messages) + if not msgs: + return + for msg in msgs: + yield msg + + async def ack_message(self, msg: Message) -> None: + """Ack a message from the queue.""" + logging.debug(log_msgs.ACKING_MESSAGE) + if not self._subscription: + raise RuntimeError("subscriber is not connected") + + # Acknowledges the received messages so they will not be sent again. + await try_call(self, partial(self._from_message(msg).ack)) + logging.debug(f"{log_msgs.ACKED_MESSAGE} ({msg.msg_id!r}).") + + async def reject_message(self, msg: Message) -> None: + """Reject (nack) a message from the queue.""" + logging.debug(log_msgs.NACKING_MESSAGE) + if not self._subscription: + raise RuntimeError("subscriber is not connected") + + await try_call(self, partial(self._from_message(msg).nak)) # yes, it's "nak" + logging.debug(f"{log_msgs.NACKED_MESSAGE} ({msg.msg_id!r}).") + + async def message_generator( + self, timeout: int = 60, propagate_error: bool = True + ) -> AsyncGenerator[Optional[Message], None]: + """Yield Messages. + + Generate messages with variable timeout. + Yield `None` on `throw()`. + + Keyword Arguments: + timeout {int} -- timeout in seconds for inactivity (default: {60}) + propagate_error {bool} -- should errors from downstream code kill the generator? (default: {True}) + """ + logging.debug(log_msgs.MSGGEN_ENTERED) + if not self._subscription: + raise RuntimeError("subscriber is not connected") + + msg = None + try: + gen = self._gen_messages(timeout * 1000, self.prefetch) + while True: + # get message + logging.debug(log_msgs.MSGGEN_GET_NEW_MESSAGE) + msg = await _anext(gen, None) + if msg is None: + logging.info(log_msgs.MSGGEN_NO_MESSAGE_LOOK_BACK_IN_QUEUE) + break + + # yield message to consumer + try: + logging.debug(f"{log_msgs.MSGGEN_YIELDING_MESSAGE} [{msg}]") + yield msg + # consumer throws Exception... + except Exception as e: # pylint: disable=W0703 + logging.debug(log_msgs.MSGGEN_DOWNSTREAM_ERROR) + if propagate_error: + logging.debug(log_msgs.MSGGEN_PROPAGATING_ERROR) + raise + logging.warning( + f"{log_msgs.MSGGEN_EXCEPTED_DOWNSTREAM_ERROR} {e}.", + exc_info=True, + ) + yield None # hand back to consumer + # consumer requests again, aka next() + else: + pass + + # generator exit (explicit close(), or break in consumer's loop) + except GeneratorExit: + logging.debug(log_msgs.MSGGEN_GENERATOR_EXITING) + logging.debug(log_msgs.MSGGEN_GENERATOR_EXITED) + + +class Backend(backend_interface.Backend): + """NATS Pub-Sub Backend Factory. + + Extends: + Backend + """ + + @staticmethod + async def create_pub_queue( + address: str, name: str, auth_token: str = "" + ) -> NATSPub: + """Create a publishing queue. + + # NOTE - `auth_token` is not used currently + """ + q = NATSPub( # pylint: disable=invalid-name + address, name + "-stream", name + "-subject" + ) + await q.connect() + return q + + @staticmethod + async def create_sub_queue( + address: str, name: str, prefetch: int = 1, auth_token: str = "" + ) -> NATSSub: + """Create a subscription queue. + + # NOTE - `auth_token` is not used currently + """ + q = NATSSub( # pylint: disable=invalid-name + address, name + "-stream", name + "-subject" + ) + q.prefetch = prefetch + await q.connect() + return q diff --git a/mqclient_nats/py.typed b/mqclient_nats/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/mqclient_nats/queue.py b/mqclient_nats/queue.py new file mode 100644 index 0000000..d4a8401 --- /dev/null +++ b/mqclient_nats/queue.py @@ -0,0 +1,21 @@ +"""Queue class encapsulating a pub-sub messaging system with NATS.""" + + +from typing import Any, cast + +import mqclient + +from . import nats + + +class Queue(mqclient.queue.Queue): + __doc__ = mqclient.queue.Queue.__doc__ + + def __init__(self, *args: Any, **kargs: Any) -> None: + super().__init__( + cast( + mqclient.backend_interface.Backend, nats.Backend + ), # mypy is very picky + *args, + **kargs + ) diff --git a/mqclient_nats/requirements.txt b/mqclient_nats/requirements.txt new file mode 100644 index 0000000..7668e1f --- /dev/null +++ b/mqclient_nats/requirements.txt @@ -0,0 +1,3 @@ +git+https://github.com/WIPACrepo/MQClient@v0.3.0#egg=mqclient +nats-py==2.0.0 +nkeys==0.1.0 \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..c9ac561 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,7 @@ +asyncstdlib +mypy +flake8 +pytest +pytest-asyncio +pytest-mock +wheel diff --git a/resources/docker-nats-js.sh b/resources/docker-nats-js.sh new file mode 100755 index 0000000..1649255 --- /dev/null +++ b/resources/docker-nats-js.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker run --network host -p 4222:4222 nats -js \ No newline at end of file diff --git a/resources/local-nats-js.sh b/resources/local-nats-js.sh new file mode 100755 index 0000000..6e2a1ef --- /dev/null +++ b/resources/local-nats-js.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +# Download from source b/c snap's version is too old (doesn't have jetstream support) +curl -L https://github.com/nats-io/nats-server/releases/download/v2.6.6/nats-server-v2.6.6-linux-amd64.zip -o nats-server.zip + +unzip nats-server.zip -d nats-server + +nats-server/nats-server-v2.6.6-linux-amd64/nats-server -js \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..075623a --- /dev/null +++ b/setup.cfg @@ -0,0 +1,39 @@ +[coverage:run] +branch = True +parallel = True + +[coverage:report] +# Regexes for lines to exclude from consideration +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain about missing debug-only code: + def __repr__ + if self\.debug + + # Don't complain if tests don't hit defensive assertion code: + raise AssertionError + raise NotImplementedError + + # Don't complain if non-runnable code isn't run: + if 0: + if __name__ == .__main__.: +omit = *__init__* +ignore_errors = True + +[coverage:html] +directory = htmlcov + +[flake8] +ignore=E226,E261,E302,E305,E501,W503,W504 + +[semantic_release] +version_variable = mqclient_nats/__init__.py:__version__ +upload_to_pypi = False +patch_without_tag = True +major_on_zero = False +commit_parser = semantic_release.history.tag_parser +minor_tag = [minor] +fix_tag = [fix] +branch = main diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..5faa1df --- /dev/null +++ b/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +"""Setup.""" + +import os +import subprocess + +from setuptools import setup # type: ignore[import] + +subprocess.run( + "pip install git+https://github.com/WIPACrepo/wipac-dev-tools.git".split(), + check=True, +) +from wipac_dev_tools import SetupShop # noqa: E402 # pylint: disable=C0413 + +shop = SetupShop( + "mqclient_nats", + os.path.abspath(os.path.dirname(__file__)), + ((3, 7), (3, 9)), + "Message Queue Client API with NATS", +) + +setup( + url="https://github.com/WIPACrepo/MQClient-NATS", + package_data={shop.name: ["py.typed", "requirements.txt"]}, + **shop.get_kwargs(), +) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..4752a1c --- /dev/null +++ b/tox.ini @@ -0,0 +1,15 @@ +[tox] +envlist = py + +[testenv] +deps = -r mqclient_nats/requirements.txt + -r requirements-dev.txt +commands = pytest -vvv {posargs} +setenv = + PUBSUB_EMULATOR_HOST = localhost:8085 + +[pytest] +addopts = -W error +asyncio_mode = strict +log_format = %(asctime)s.%(msecs)03d %(levelname)s %(message)s +log_date_format = %Y-%m-%d %H:%M:%S \ No newline at end of file