This repository has been archived by the owner on Sep 29, 2022. It is now read-only.

Commit

First PR [minor] (#1)
ric-evans authored Feb 3, 2022
1 parent 5f64a81 commit 81732ac
Showing 23 changed files with 890 additions and 1 deletion.
64 changes: 64 additions & 0 deletions .circleci/config.yml
@@ -0,0 +1,64 @@
version: 2

jobs:

  integrate:
    docker:
      - image: circleci/python:3.8
    steps:
      - checkout
      - run: |
          pip install --user tox
          ./resources/local-nats-js.sh &
          sleep 15
          tox integrate_tests -vv
  examples:
    docker:
      - image: circleci/python:3.8
    steps:
      - checkout
      - run: |
          pip install --user tox
          tox --notest -vv
          . .tox/py/bin/activate
          echo "Setting Up NATS Server with JetStream:"
          ./resources/local-nats-js.sh &
          sleep 15
          ./examples/run.sh
  nats-examples:
    docker:
      - image: circleci/python:3.8
    steps:
      - checkout
      - run: |
          pip install --user tox
          tox --notest -vv
          . .tox/py/bin/activate
          echo "Setting Up NATS Server with JetStream:"
          ./resources/local-nats-js.sh &
          sleep 15
          python ./examples/nats_examples/nats_jetstream.py
  pycycle:
    docker:
      - image: circleci/python:3.8
    steps:
      - checkout
      - run: |
          pip install --user tox pycycle
          tox --notest -vv
          . .tox/py/bin/activate
          pycycle --here --verbose

workflows:
  version: 2
  build_and_test:
    jobs:
      - integrate
      - examples
      - nats-examples
      - pycycle
29 changes: 29 additions & 0 deletions .github/workflows/python-linters.yml
@@ -0,0 +1,29 @@
# For most recent version see:
# https://github.com/WIPACrepo/wipac-dev-tools/blob/main/.github/workflows/python-linters.yml
# Copy any updates to wipac-dev-tools.

name: Python Linters

on: [push]

jobs:

  flake8:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: pip install flake8
      - run: flake8 . --ignore=E203,E226,E228,E231,E501,W503,W504

  mypy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
      - run: pip install mypy
      - run: |
          ([ -e "requirements-dev.txt" ] && pip install -r requirements-dev.txt) || echo "no dev reqs"
      - run: |
          ([ -e "setup.py" ] && pip install .) || pip install -r requirements.txt
      - run: mypy --install-types --namespace-packages --non-interactive --exclude build/ .
23 changes: 23 additions & 0 deletions .github/workflows/semantic-release.yml
@@ -0,0 +1,23 @@
name: Semantic Release

on:
  push:
    branches:
      - main
      - master

jobs:
  release:
    runs-on: ubuntu-latest

    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0

      - name: Python Semantic Release
        uses: relekang/python-semantic-release@master
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          # pypi_token: ${{ secrets.PYPI_TOKEN }}
38 changes: 38 additions & 0 deletions .github/workflows/try-setup-install.yml
@@ -0,0 +1,38 @@
name: Try Setup Install

on: [push]

jobs:

  gather-py3-versions:
    runs-on: ubuntu-latest
    outputs:
      matrix: ${{ steps.py3_versions.outputs.matrix }}
    steps:
      - uses: actions/checkout@v2
      - name: Python Versions
        id: py3_versions
        run: |
          minmin_maxmax=$(grep -P "\(\(\d+, ?\d+\), ?\(\d+, ?\d+\)\)" -oh setup.py | sed 's/[^0-9]/ /g')
          IFS=', ' read -r -a array <<< "$minmin_maxmax"
          min_thru_max_series=$(for i in `seq ${array[1]} ${array[3]}`; do printf "'3.$i',"; done | rev | cut -c 2- | rev)
          echo ::set-output name=matrix::{\"py3_versions\":[$(echo $min_thru_max_series)]}
          echo $min_thru_max_series

  pip-install:
    needs: gather-py3-versions
    runs-on: ubuntu-latest

    strategy:
      max-parallel: 4
      fail-fast: false
      matrix: ${{ fromJSON(needs.gather-py3-versions.outputs.matrix) }}

    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.py3_versions }}
      - run: |
          pip install --upgrade pip wheel setuptools
          pip install --editable .
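
The gather-py3-versions run block above is dense; the sketch below is a rough Python rendering of what it computes. The setup.py snippet is a hypothetical stand-in for whichever ((min, min), (max, max)) version tuple the real file defines; only the regex and the '3.N' series mirror the workflow step.

import re

# Hypothetical stand-in for the version-bounds tuple pair the workflow greps out of setup.py.
setup_py_text = "py_min_max = ((3, 6), (3, 10))"

# Same pattern as the step's `grep -P`: ((min_major, min_minor), (max_major, max_minor)).
match = re.search(r"\(\((\d+), ?(\d+)\), ?\((\d+), ?(\d+)\)\)", setup_py_text)
assert match is not None

# Like the shell's array[1]/array[3], keep only the two minor versions and build a '3.N' series.
_, minor_min, _, minor_max = (int(g) for g in match.groups())
series = ",".join(f"'3.{i}'" for i in range(minor_min, minor_max + 1))

# This is the JSON string handed to fromJSON() for the pip-install job's matrix.
print('{"py3_versions":[%s]}' % series)  # -> {"py3_versions":['3.6','3.7','3.8','3.9','3.10']}

In the pip-install job, each entry in that list then becomes one matrix value passed to actions/setup-python.
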
3 changes: 3 additions & 0 deletions .gitignore
@@ -127,3 +127,6 @@ dmypy.json

# Pyre type checker
.pyre/

env*/
nats-server*
2 changes: 1 addition & 1 deletion README.md
@@ -1,2 +1,2 @@
# MQClient-NATS
- A NATS Message Queue Client API (Supporting the MQClient Interface)
+ A NATS Message Queue Client API with JetStream (Supporting the MQClient Interface)
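
For orientation, the pub/sub pattern behind that one-line description looks roughly like the following, condensed from the examples/server.py and examples/worker.py files added later in this commit; the address, queue name, and payload are just the defaults those examples use, not a prescribed configuration.

import asyncio

from mqclient_nats import Queue


async def demo() -> None:
    # Publish a few work messages (address/name mirror the examples' defaults).
    queue = Queue(address="localhost", name="queue1")
    async with queue.open_pub() as pub:
        for i in range(3):
            await pub.send({"id": i, "cmd": f'echo "{i}"'})

    # Read them back; `timeout` bounds how long the subscriber waits for the next message.
    queue.timeout = 5
    async with queue.open_sub() as stream:
        async for data in stream:
            print(data)


if __name__ == "__main__":
    asyncio.run(demo())

examples/worker.py plugs into the other side of the same queues, consuming work with open_sub() and publishing results with open_pub().
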
75 changes: 75 additions & 0 deletions examples/nats_examples/nats_jetstream.py
@@ -0,0 +1,75 @@
"""From https://github.com/nats-io/nats.py/blob/main/examples/jetstream.py."""

import asyncio

import nats # type: ignore[import]

# from nats.errors import TimeoutError


async def main():
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull-based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            print(msg)

    # Create single ephemeral push-based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push-based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create deliver group that will have load-balanced messages.
    async def qsub_a(msg):
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        print("QSUB B:", msg)
        await msg.ack()

    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except nats.errors.TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
17 changes: 17 additions & 0 deletions examples/run-with-wipactel-local.sh
@@ -0,0 +1,17 @@
#!/bin/bash

if [[ `basename "$PWD"` != "MQClient-NATS" && $PWD != "/home/circleci/project" ]] ; then
echo "ERROR: Run from 'MQClient-NATS/' (not '$PWD')"
exit 1
fi

export WIPACTEL_EXPORT_STDOUT=${WIPACTEL_EXPORT_STDOUT:="TRUE"}
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4318/v1/traces"
export WIPACTEL_SERVICE_NAME_PREFIX=mqclient-nats

pip install tox
tox --notest -vv
. .tox/py/bin/activate
./resources/gcp-install.sh

`dirname "$0"`/run.sh
9 changes: 9 additions & 0 deletions examples/run.sh
@@ -0,0 +1,9 @@
#!/bin/bash

if [[ `basename "$PWD"` != "MQClient-NATS" && $PWD != "/home/circleci/project" ]] ; then
echo "ERROR: Run from 'MQClient-NATS/' (not '$PWD')"
exit 1
fi

python examples/worker.py &
python examples/server.py
48 changes: 48 additions & 0 deletions examples/server.py
@@ -0,0 +1,48 @@
"""A server sends work out on one queue, and receives results on another."""

import argparse
import asyncio
import logging
import typing

import coloredlogs # type: ignore[import]
from mqclient_nats import Queue


async def server(work_queue: Queue, result_queue: Queue) -> None:
    """Demo example server."""
    async with work_queue.open_pub() as p:
        for i in range(100):
            await p.send({"id": i, "cmd": f'echo "{i}"'})

    results = {}
    result_queue.timeout = 5
    async with result_queue.open_sub() as stream:
        async for data in stream:
            assert isinstance(data, dict)
            results[typing.cast(int, data["id"])] = typing.cast(str, data["out"])

    print(results)
    assert len(results) == 100
    for i in results:
        assert results[i].strip() == str(i)


if __name__ == "__main__":
    coloredlogs.install(level=logging.DEBUG)

    parser = argparse.ArgumentParser(description="Server")
    parser.add_argument("--address", default="localhost", help="queue address")
    parser.add_argument("--work-queue", default="queue1", help="work queue")
    parser.add_argument("--result-queue", default="queue2", help="result queue")
    parser.add_argument(
        "--prefetch", type=int, default=10, help="result queue prefetch"
    )
    args = parser.parse_args()

    workq = Queue(address=args.address, name=args.work_queue)
    resultq = Queue(
        address=args.address, name=args.result_queue, prefetch=args.prefetch
    )

    asyncio.get_event_loop().run_until_complete(server(workq, resultq))
36 changes: 36 additions & 0 deletions examples/worker.py
@@ -0,0 +1,36 @@
"""A worker processes messages from one queue, and sends results on a second
queue."""

import argparse
import asyncio
import logging
import subprocess

import coloredlogs # type: ignore[import]
from mqclient_nats import Queue


async def worker(recv_queue: Queue, send_queue: Queue) -> None:
    """Demo example worker."""
    async with recv_queue.open_sub() as stream, send_queue.open_pub() as p:
        async for data in stream:
            cmd = data["cmd"]
            out = subprocess.check_output(cmd, shell=True)
            data["out"] = out.decode("utf-8")
            await p.send(data)


if __name__ == "__main__":
    coloredlogs.install(level=logging.DEBUG)

    parser = argparse.ArgumentParser(description="Worker")
    parser.add_argument("--address", default="localhost", help="queue address")
    parser.add_argument("--in-queue", default="queue1", help="input queue")
    parser.add_argument("--out-queue", default="queue2", help="output queue")
    parser.add_argument("--prefetch", type=int, default=10, help="input queue prefetch")
    args = parser.parse_args()

    inq = Queue(address=args.address, name=args.in_queue, prefetch=args.prefetch)
    outq = Queue(address=args.address, name=args.out_queue)

    asyncio.get_event_loop().run_until_complete(worker(inq, outq))
33 changes: 33 additions & 0 deletions integrate_tests/test_nats.py
@@ -0,0 +1,33 @@
"""Run integration tests for NATS backend."""

import asyncio
import logging

import pytest
from mqclient.abstract_backend_tests import integrate_backend_interface, integrate_queue
from mqclient.abstract_backend_tests.utils import (  # pytest.fixture # noqa: F401 # pylint: disable=W0611
    queue_name,
)
from mqclient_nats.nats import Backend

logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("flake8").setLevel(logging.WARNING)


@pytest.fixture(scope="module")
def event_loop():  # type: ignore[no-untyped-def]
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


class TestNATSQueue(integrate_queue.PubSubQueue):
    """Run PubSubQueue integration tests with NATS backend."""

    backend = Backend()


class TestNATSBackend(integrate_backend_interface.PubSubBackendInterface):
    """Run PubSubBackendInterface integration tests with NATS backend."""

    backend = Backend()