Skip to content

Commit

Permalink
Merge pull request #467 from TeskaLabs/docs/pubsub
Browse files Browse the repository at this point in the history
Documentation for `PubSub`
  • Loading branch information
mejroslav authored Aug 8, 2023
2 parents 61cd891 + 2fc18f0 commit fcc6594
Show file tree
Hide file tree
Showing 3 changed files with 405 additions and 63 deletions.
227 changes: 164 additions & 63 deletions asab/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,43 @@
import asyncio
import weakref
import functools
import typing


L = logging.getLogger(__name__)


class PubSub(object):
"""
Object for delivering messages across the ASAB application.
A message is a function or coroutine with specific `message_type` that can be published and subscribed at various places in the code.
"""


def __init__(self, app):
self.Subscribers = {}
self.Loop = app.Loop


def subscribe(self, message_type, callback):
def subscribe(self, message_type: str, callback: typing.Callable):
"""
Subscribe a subscriber to the an message type.
It could be even plain function, method or its coroutine variant (then it will be delivered in a dedicated future)
Set `callback` that will be called when `message_type` is received.
Args:
message_type: Message to be subscribed to. It should end with an exclamation mark `"!"`.
callback: Function or coroutine that is called when the message is received. `message_type` is passed as the first argument to the callback.
Examples:
```python
class MyClass:
def __init__(self, app):
app.PubSub.subscribe("Application.tick!", self.on_tick)
def on_tick(self, message_type):
print(message_type)
```
"""

# If subscribe is a bound method, do special treatment
Expand All @@ -36,7 +56,23 @@ def subscribe(self, message_type, callback):

def subscribe_all(self, obj):
"""
Find all @asab.subscribe decorated methods on the obj and do subscription
Find all methods decorated by `@asab.subscribe` on the object and subscribe for them.
Examples:
```python
class MyClass:
def __init__(self, app):
app.PubSub.subscribe_all(self)
@asab.subscribe("Application.tick!")
async def on_tick(self, message_type):
print(message_type)
@asab.subscribe("Application.exit!")
def on_exit(self, message_type):
print(message_type)
```
"""
for member_name in dir(obj):
member = getattr(obj, member_name)
Expand All @@ -47,8 +83,23 @@ def subscribe_all(self, obj):


def unsubscribe(self, message_type, callback):
""" Remove a subscriber of an message type from the set. """
"""
Remove `callback` from the subscribed `message_type`.
When the subscription does not exist, warning is displayed.
Examples:
```python
class MyClass:
def __init__(self, app):
app.PubSub.subscribe("Application.tick!", self.only_once)
def only_once(self, message_type):
print("This message is displayed only once!")
app.PubSub.unsubscribe("Application.tick!", self.only_once)
```
"""
callback_list = self.Subscribers.get(message_type)
if callback_list is None:
L.warning("Message type subscription '{}'' not found.".format(message_type))
Expand Down Expand Up @@ -113,9 +164,33 @@ def _deliver_async(loop, callback, message_type, *args, **kwargs):
callback_list.remove(callback_ref)


def publish(self, message_type, *args, **kwargs):
def publish(self, message_type: str, *args, **kwargs):
"""
Notify subscribers of an `message type`. Including arguments.
Publish the message and notify the subscribers of an `message type`.
`message_type` is passed as the first argument to the subscribed callback.
Args:
message_type: The emitted message.
asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.
Examples:
```python
class MyApplication(asab.Application):
async def initialize(self):
self.Count = 0
self.PubSub.subscribe("Fireworks.started!", self.on_fireworks)
async def main(self):
for i in range(3):
self.Count += 1
self.PubSub.publish("Fireworks.started!", self.Count)
await asyncio.sleep(1)
def on_fireworks(self, message_type, count):
print("boom " * count)
```
"""

asynchronously = kwargs.pop('asynchronously', False)
Expand All @@ -132,44 +207,62 @@ def publish(self, message_type, *args, **kwargs):
L.exception("Error in a PubSub callback", struct_data={'message_type': message_type})


def publish_threadsafe(self, message_type, *args, **kwargs):
def publish_threadsafe(self, message_type: str, *args, **kwargs):
"""
Notify subscribers of an `message type` safely form a different that main thread.
Publish the message and notify the subscribers of an `message type` safely form a different that main thread.
`message_type` is passed as the first argument to the subscribed callback.
Args:
message_type: The emitted message.
asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.
"""
def in_main_thread():
self.publish(message_type, *args, **kwargs)
self.Loop.call_soon_threadsafe(in_main_thread)


async def message(self, message_type):
'''
This method allows to await a specific message from a coroutine.
It is a convenience method for `Subscriber` object.
async def message(self, message_type: str) -> tuple:
"""
Await specific message from a coroutine. It is a convenience method for the `Subscriber` object.
Usage:
```
message_type, args, kwargs = await self.PubSub.message("Library.ready!")
```
Args:
message_type: Message to be awaited.
Returns:
Triple (message_type, args, kwargs).
`message_type`, `args` and `kwargs` are the same as in PubSub callback.
Examples:
'''
```python
message_type, args, kwargs = await self.PubSub.message("Library.ready!")
```
"""
subscriber = Subscriber(self, message_type)
message_type, args, kwargs = await subscriber.message()
return message_type, args, kwargs


class subscribe(object):
"""
Decorator function that simplifies the process of subscription together with `PubSub.subscribe_all()` method.
Examples:
```python
class MyClass(object):
def __init__(self, app):
app.PubSub.subscribe_all(self)
'''
Decorator
@asab.subscribe("Application.tick!")
async def on_tick(self, message_type):
print(message_type)
Usage:
@asab.subscribe("Application.exit!")
def on_exit(self, message_type):
print(message_type)
```
"""

@asab.subscribe("tick")
def on_tick(self, message_type):
print("Service tick")
'''

def __init__(self, message_type):
self.message_type = message_type
Expand All @@ -184,21 +277,31 @@ def __call__(self, f):


class Subscriber(object):
"""
Object for consuming PubSub messages in coroutines.
'''
:any:`Subscriber` object allows to consume PubSub messages in coroutines.
It subscribes for various message types and consumes them.
It works on FIFO basis (First message In, first message Out).
If ``pubsub`` argument is None, the initial subscription is skipped.
It subscribes for various message types and consumes them.
It is built on (first-in, first-out) basis.
If `pubsub` argument is `None`, the initial subscription is skipped.
.. code:: python
Examples:
subscriber = asab.Subscriber(
app.PubSub,
"Application.tick!",
"Application.stop!"
)
'''
The example of the subscriber object usage in async for statement:
```python
async def my_coroutine(self):
# Subscribe for two application events
subscriber = asab.Subscriber(
self.PubSub,
"Application.tick!",
"Application.exit!"
)
async for message_type, args, kwargs in subscriber:
if message_type == "Application.exit!":
break;
print("Tick.")
```
"""

def __init__(self, pubsub=None, *message_types):

Expand All @@ -211,9 +314,9 @@ def __init__(self, pubsub=None, *message_types):


def subscribe(self, pubsub, message_type):
'''
Subscribe for more message types. This method can be called many times with various ``pubsub`` objects.
'''
"""
Subscribe for more message types. This method can be called many times with various `pubsub` objects.
"""
pubsub.subscribe(message_type, self)
self._subscriptions.append((pubsub, message_type))

Expand All @@ -223,28 +326,26 @@ def __call__(self, message_type, *args, **kwargs):


def message(self):
'''
Wait for a message asynchronously.
Returns a three-members tuple ``(message_type, args, kwargs)``.
Example of the `await message()` use:
.. code:: python
async def my_coroutine(app):
# Subscribe for a two application events
subscriber = asab.Subscriber(
app.PubSub,
"Application.tick!",
"Application.exit!"
)
while True:
message_type, args, kwargs = await subscriber.message()
if message_type == "Application.exit!":
break
print("Tick.")
'''
"""
Wait for a message asynchronously and return triple `(message_type, args, kwargs)`.
Examples:
```python
async def my_coroutine(app):
# Subscribe for a two application events
subscriber = asab.Subscriber(
app.PubSub,
"Application.tick!",
"Application.exit!"
)
while True:
message_type, args, kwargs = await subscriber.message()
if message_type == "Application.exit!":
break
print("Tick.")
```
"""
return self._q.get()


Expand Down
Loading

0 comments on commit fcc6594

Please sign in to comment.