Skip to content

Commit

Permalink
0.2.0: refactor producer logic from decorator to class. (#36)
Browse files Browse the repository at this point in the history
### Breaking Changes

- rewrite heizer producer to be a class instead of decorator. There is no clear benefit to create
  producer as decorator
- updated class names

### New Features:

- support retry in consumer
- add helpers to list, list topics
- add consumer signal
- write consumer status log file for health checking
- support async produce in producer

### Improvements

- add more logs
  • Loading branch information
claudezss authored Aug 6, 2023
1 parent e466a1c commit 112d235
Show file tree
Hide file tree
Showing 25 changed files with 954 additions and 443 deletions.
31 changes: 0 additions & 31 deletions .github/workflows/deploy-latest.yml

This file was deleted.

15 changes: 9 additions & 6 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
run: |
python -m pytest -s tests --cov=heizer --cov-report=xml --junit-xml=report.xml
test_build_doc:
build_doc_and_publish_on_main:
runs-on: ubuntu-latest
services:
zookeeper:
Expand Down Expand Up @@ -116,13 +116,16 @@ jobs:
sudo apt-get install -y zip gzip tar
python -m pip install --upgrade pip
pip install -e .[all,doc]
sphinx-multiversion ./docs/source ./doc/
zip -r doc.zip doc/
echo ${{ github.ref_name }} > heizer/VERSION
python docs/create_versions_file.py
sphinx-build ./docs/source "./public/${{ github.ref_name }}"
mv ./docs/versions.json ./public/versions.json
zip -r public.zip ./public/
- name: Upload Artifact
uses: actions/upload-artifact@v3
with:
name: doc_zip
path: doc.zip
name: public_zip
path: public.zip
retention-days: 5
- name: deploy
if: github.event_name == 'push' && (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/'))
Expand All @@ -131,7 +134,7 @@ jobs:
host: ${{ secrets.SERVER_IP }}
username: "root"
password: ${{ secrets.SERVER_PASS }}
source: "doc/"
source: "public/"
target: "/var/www/html/docs/"
strip_components: 1
overwrite: true
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,5 @@ cython_debug/
# Editors
.idea
.vscode
.fleet
public
40 changes: 40 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Main/Dev


### Features

### Improvements

### Fixes

---

# Releases

## 0.2.0

### Breaking Changes

- rewrite heizer producer to be a class instead of decorator. There is no clear benefit to create
producer as decorator
- updated class names

### New Features:

- support retry in consumer
- add helpers to list, list topics
- add consumer signal
- write consumer status log file for health checking
- support async produce in producer

### Improvements

- add more logs

---

## 0.1.5

### Improvements

- improve consumer
21 changes: 21 additions & 0 deletions docs/create_versions_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import json
import pathlib

import requests


def create():
versions = [{"name": "main", "url": "https://heizer.claudezss.com/docs/main"}]

rsp = requests.get("https://api.github.com/repos/claudezss/heizer/releases?per_page=100")
d = rsp.json()
for item in d:
release_name = item["name"]
versions.append({"name": release_name, "url": f"https://heizer.claudezss.com/docs/{release_name}/"})

with open(pathlib.Path(__file__).parent / "versions.json", "+w") as f:
json.dump(versions, f, indent=4)


if __name__ == "__main__":
create()
37 changes: 19 additions & 18 deletions docs/source/_templates/versions.html
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
{%- if current_version %}
<div class="rst-versions" data-toggle="rst-versions" role="note" aria-label="versions">
<script src="_static/jquery.js"></script>
<script>
fetch('./../versions.json')
.then(response => response.json())
.then(data => {
var myDl = document.getElementById('versions');
data.map(function (item) {
var dd = document.createElement('dd');
var a = document.createElement('a')
a.textContent = item.name;
a.href = item.url;
dd.appendChild(a);
myDl.appendChild(dd);
})
});
</script>
<span class="rst-current-version" data-toggle="rst-current-version">

<span class="fa fa-book"> Other Versions</span>
v: {{ current_version.name }}
v: {{ release }}
<span class="fa fa-caret-down"></span>
</span>
<div class="rst-other-versions">
{%- if versions.tags %}
<dl>
<div class="rst-other-versions" >
<dl id="versions">
<dt>Tags</dt>
{%- for item in versions.tags %}
<dd><a href="{{ item.url }}">{{ item.name }}</a></dd>
{%- endfor %}
</dl>
{%- endif %}
{%- if versions.branches %}
<dl>
<dt>Branches</dt>
{%- for item in versions.branches %}
<dd><a href="{{ item.url }}">{{ item.name }}</a></dd>
{%- endfor %}
</dl>
{%- endif %}
</div>
</div>
{%- endif %}
6 changes: 5 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
# import sys

# sys.path.insert(0, os.path.abspath("../.."))
from pathlib import Path

root_folder = Path(__file__).parent.parent.parent

project = "heizer"
copyright = "2023, Yan Zhang"
author = "Yan Zhang"
release = "main"
release = open(root_folder / "heizer" / "VERSION").read()

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand All @@ -32,6 +35,7 @@
"_templates",
]

html_context = {"release": release}

autodoc_typehints = "description"

Expand Down
97 changes: 56 additions & 41 deletions docs/source/tutorial/consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,60 +8,68 @@ Basic Producer and Consumer

.. ipython:: python
from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
from heizer import Topic, consumer, Producer, Message, ProducerConfig, ConsumerConfig, create_new_topics
import json
import uuid
import asyncio
producer_config = HeizerConfig(
{
"bootstrap.servers": "localhost:9092",
}
)
producer_config = ProducerConfig(bootstrap_servers="localhost:9092")
consumer_config = HeizerConfig(
{
"bootstrap.servers": "localhost:9092",
"group.id": "default",
"auto.offset.reset": "earliest",
}
)
consumer_config = ConsumerConfig(bootstrap_servers="localhost:9092", group_id="default")
2. Create the topic
2. Create the topic with 2 partitions

.. ipython:: python
topics = [HeizerTopic(name="my.topic1.consumer.example")]
topics = [Topic(name=f"my.topic1.consumer.example.{uuid.uuid4()}", num_partitions=2)]
create_new_topics(config=producer_config, topics=topics)
3. Create producer

.. ipython:: python
@producer(
topics=topics,
config=producer_config,
key_alias="key",
headers_alias="headers",
)
def produce_data(status: str, result: str):
return {
"status": status,
"result": result,
"key": "my_key",
"headers": {"my_header": "my_header_value"},
}
pd = Producer(config=producer_config)
4. Publish messages
4. Publish messages synchronously to partition 0

.. ipython:: python
produce_data("start", "1")
produce_data("loading", "2")
for status, val in [("start", "1"), ("loading", "2"), ("success", "3"), ("postprocess", "4")]:
pd.produce(
topic=topics[0],
key="my_key",
value={"status": status, "result": val},
headers={"k": "v"},
partition=0,
auto_flush=False
)
pd.flush()
produce_data("success", "3")
5. Publish messages asynchronously to partition 1 ( it's faster than sync produce in most cases)

produce_data("postprocess", "4")
.. ipython:: python
5. Create consumer
jobs = []
async def produce():
for status, val in [("start", "1"), ("loading", "2"), ("success", "3"), ("postprocess", "4")]:
jobs.append(
asyncio.ensure_future(
pd.async_produce(
topic=topics[0],
key="my_key",
value={"status": status, "result": val},
headers={"k": "v"},
partition=1,
auto_flush=False
)
)
)
await asyncio.gather(*jobs)
pd.flush()
asyncio.run(produce())
6. Create consumer

.. ipython:: python
Expand All @@ -70,7 +78,8 @@ Basic Producer and Consumer
# `status` is `success` in msg
# If there is no stopper func, consumer will keep running forever
def stopper(msg: HeizerMessage):
def stopper(msg: Message, C: consumer, *arg, **kargs):
print(f"Consumer name: {C.name}")
data = json.loads(msg.value)
if data["status"] == "success":
return True
Expand All @@ -81,12 +90,18 @@ Basic Producer and Consumer
config=consumer_config,
stopper=stopper,
)
def consume_data(message: HeizerMessage):
def consume_data(message: Message, *arg, **kwargs):
data = json.loads(message.value)
print(data)
print(message.key)
print(message.headers)
print(f"message data: {data}")
print(f"message key: {message.key}")
print(f"message headers: {message.headers}")
return data["result"]
result = consume_data()
print("Expected Result:", result)
print("Expected Result (should be 3):", result)
7. More samples:

.. literalinclude :: ./../../../tests/test_consumer.py
:language: python
30 changes: 19 additions & 11 deletions heizer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from heizer._source.admin import create_new_topic, get_admin_client
from heizer._source.consumer import consumer
from heizer._source.message import HeizerMessage
from heizer._source.producer import producer
from heizer._source.topic import HeizerTopic
from heizer.config import HeizerConfig
from heizer._source.admin import create_new_topics, delete_topics, get_admin_client, list_topics
from heizer._source.consumer import ConsumerSignal, consumer
from heizer._source.message import Message
from heizer._source.producer import Producer
from heizer._source.status_manager import read_consumer_status, write_consumer_status
from heizer._source.topic import Topic
from heizer.config import BaseConfig, ConsumerConfig, ProducerConfig

__all__ = [
"consumer",
"producer",
"HeizerConfig",
"HeizerTopic",
"HeizerMessage",
"create_new_topic",
"ConsumerSignal",
"Producer",
"BaseConfig",
"ProducerConfig",
"ConsumerConfig",
"Message",
"Topic",
"create_new_topics",
"get_admin_client",
"write_consumer_status",
"read_consumer_status",
"delete_topics",
"list_topics",
]
Loading

0 comments on commit 112d235

Please sign in to comment.