Skip to content

Commit d52c7f6

Browse files
committed
Split Backend into Broker/ResultBackend/EventBackend
1 parent 4a6e1c7 commit d52c7f6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3233
-2766
lines changed

README.md

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,9 @@ import tasks
7777
logger = logging.getLogger("arrlio")
7878
logger.setLevel("INFO")
7979

80-
BACKEND = "arrlio.backends.local"
81-
# BACKEND = "arrlio.backends.rabbitmq"
8280

8381
async def main():
84-
app = arrlio.App(arrlio.Config(backend={"module": BACKEND}))
82+
app = arrlio.App(arrlio.Config())
8583

8684
async with app:
8785
await app.consume_tasks()
@@ -103,7 +101,7 @@ async def main():
103101
ar = await app.send_task(tasks.exception)
104102
logger.info(await ar.get())
105103
except Exception as e:
106-
print(f"\nThis is example exception for {app.backend}:\n")
104+
print(f"\nThis is example exception for {app.result_backend}:\n")
107105
logger.exception(e)
108106
print()
109107

@@ -130,9 +128,6 @@ import tasks
130128
logger = logging.getLogger("arrlio")
131129
logger.setLevel("INFO")
132130

133-
BACKEND = "arrlio.backends.local"
134-
# BACKEND = "arrlio.backends.rabbitmq"
135-
136131

137132
async def main():
138133
graph = arrlio.Graph("My Graph")
@@ -146,7 +141,6 @@ async def main():
146141
# plugins are required
147142
app = arrlio.App(
148143
arrlio.Config(
149-
backend={"module": BACKEND},
150144
plugins=[
151145
{"module": "arrlio.plugins.events"},
152146
{"module": "arrlio.plugins.graphs"},
@@ -178,9 +172,6 @@ import tasks
178172
logger = logging.getLogger("arrlio")
179173
logger.setLevel("INFO")
180174

181-
BACKEND = "arrlio.backends.local"
182-
# BACKEND = "arrlio.backends.rabbitmq"
183-
184175

185176
async def main():
186177
graph = arrlio.Graph("My Graph")
@@ -190,7 +181,6 @@ async def main():
190181

191182
app = arrlio.App(
192183
arrlio.Config(
193-
backend={"module": BACKEND},
194184
plugins=[
195185
{"module": "arrlio.plugins.events"},
196186
{"module": "arrlio.plugins.graphs"},

arrlio/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
import logging
33
import sys
44

5+
56
__version__ = importlib.metadata.version("arrlio")
67

78
logger = logging.getLogger("arrlio")
89

9-
log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-27s lineno:%(lineno)4d -- %(message)s")
10+
log_frmt = logging.Formatter("%(asctime)s %(levelname)-8s %(name)-40s lineno:%(lineno)4d -- %(message)s")
1011
log_hndl = logging.StreamHandler(stream=sys.stderr)
1112
log_hndl.setFormatter(log_frmt)
1213
logger.addHandler(log_hndl)
@@ -17,4 +18,5 @@
1718
from arrlio.models import Event, Graph, Task, TaskInstance, TaskResult # noqa
1819
from arrlio.settings import LOG_LEVEL
1920

21+
2022
logger.setLevel(LOG_LEVEL)

arrlio/abc.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any, AsyncGenerator, Callable, Coroutine
3+
4+
from arrlio.models import Event, Shared, TaskInstance, TaskResult
5+
6+
7+
class AbstractClosable(ABC):
8+
@property
9+
@abstractmethod
10+
def is_closed(self) -> bool:
11+
raise NotImplementedError
12+
13+
@abstractmethod
14+
async def init(self):
15+
raise NotImplementedError
16+
17+
@abstractmethod
18+
async def close(self):
19+
raise NotImplementedError
20+
21+
async def __aenter__(self):
22+
await self.init()
23+
return self
24+
25+
async def __aexit__(self, exc_type, exc, tb):
26+
await self.close()
27+
28+
29+
class AbstractBroker(AbstractClosable, ABC):
30+
@abstractmethod
31+
async def send_task(self, task_instance: TaskInstance, **kwds):
32+
"""Send task to backend."""
33+
raise NotImplementedError
34+
35+
@abstractmethod
36+
async def consume_tasks(self, queues: list[str], callback: Callable[[TaskInstance], Coroutine]):
37+
"""Consume tasks from the queues and invoke `callback` on `arrlio.models.TaskInstance` received."""
38+
raise NotImplementedError
39+
40+
@abstractmethod
41+
async def stop_consume_tasks(self, queues: list[str] | None = None):
42+
"""Stop consuming tasks."""
43+
raise NotImplementedError
44+
45+
46+
class AbstractResultBackend(AbstractClosable, ABC):
47+
@abstractmethod
48+
def make_headers(self, task_instance: TaskInstance) -> dict:
49+
"""Make result backend headers for `arrlio.models.TaskInstance`."""
50+
raise NotImplementedError
51+
52+
@abstractmethod
53+
def make_shared(self, task_instance: TaskInstance) -> Shared:
54+
"""Make result backend shared for `arrio.models.TaskInstance`."""
55+
raise NotImplementedError
56+
57+
@abstractmethod
58+
async def allocate_storage(self, task_instance: TaskInstance):
59+
"""Allocate storage for future task result if needed."""
60+
raise NotImplementedError
61+
62+
@abstractmethod
63+
async def push_task_result(self, task_result: TaskResult, task_instance: TaskInstance):
64+
"""Push `arrlio.models.TaskResult` for `arrlio.models.TaskInstance`."""
65+
raise NotImplementedError
66+
67+
@abstractmethod
68+
async def pop_task_result(self, task_instance: TaskInstance) -> AsyncGenerator[TaskResult, None]:
69+
"""Pop `arrlio.models.TaskResult` for `arrlio.models.TaskInstance`."""
70+
raise NotImplementedError
71+
72+
@abstractmethod
73+
async def close_task(self, task_instance: TaskInstance, idx: tuple[str, int] | None = None):
74+
"""Close task."""
75+
raise NotImplementedError
76+
77+
78+
class AbstractEventBackend(AbstractClosable, ABC):
79+
@abstractmethod
80+
async def send_event(self, event: Event):
81+
"""Send `arrlio.models.Event`."""
82+
raise NotImplementedError
83+
84+
@abstractmethod
85+
async def consume_events(
86+
self,
87+
callback_id: str,
88+
callback: Callable[[Event], Any],
89+
event_types: list[str] | None = None,
90+
):
91+
"""Consume events and invoke `callback` on `arrlio.models.Event` received."""
92+
raise NotImplementedError
93+
94+
@abstractmethod
95+
async def stop_consume_events(self, callback_id: str | None = None):
96+
"""Stop consuming events."""
97+
raise NotImplementedError

arrlio/backends/base.py

Lines changed: 0 additions & 167 deletions
This file was deleted.

arrlio/backends/brokers/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from . import local, rabbitmq

0 commit comments

Comments
 (0)