diff --git a/404.html b/404.html index 4800bae..2b75596 100644 --- a/404.html +++ b/404.html @@ -303,7 +303,7 @@ - Reference - Code API + Introduction @@ -376,6 +376,48 @@ + + + + + + +
  • + + + + + Task Consumer + + + + +
  • + + + + + + + + + + +
  • + + + + + Workers + + + + +
  • + + + + @@ -450,7 +492,7 @@ - Database + Async Database diff --git a/assets/_mkdocstrings.css b/assets/_mkdocstrings.css index 85449ec..b500381 100644 --- a/assets/_mkdocstrings.css +++ b/assets/_mkdocstrings.css @@ -26,20 +26,33 @@ float: right; } +/* Parameter headings must be inline, not blocks. */ +.doc-heading-parameter { + display: inline; +} + +/* Prefer space on the right, not the left of parameter permalinks. */ +.doc-heading-parameter .headerlink { + margin-left: 0 !important; + margin-right: 0.2rem; +} + /* Backward-compatibility: docstring section titles in bold. */ .doc-section-title { font-weight: bold; } /* Symbols in Navigation and ToC. */ -:root, +:root, :host, [data-md-color-scheme="default"] { + --doc-symbol-parameter-fg-color: #df50af; --doc-symbol-attribute-fg-color: #953800; --doc-symbol-function-fg-color: #8250df; --doc-symbol-method-fg-color: #8250df; --doc-symbol-class-fg-color: #0550ae; --doc-symbol-module-fg-color: #5cad0f; + --doc-symbol-parameter-bg-color: #df50af1a; --doc-symbol-attribute-bg-color: #9538001a; --doc-symbol-function-bg-color: #8250df1a; --doc-symbol-method-bg-color: #8250df1a; @@ -48,12 +61,14 @@ } [data-md-color-scheme="slate"] { + --doc-symbol-parameter-fg-color: #ffa8cc; --doc-symbol-attribute-fg-color: #ffa657; --doc-symbol-function-fg-color: #d2a8ff; --doc-symbol-method-fg-color: #d2a8ff; --doc-symbol-class-fg-color: #79c0ff; --doc-symbol-module-fg-color: #baff79; + --doc-symbol-parameter-bg-color: #ffa8cc1a; --doc-symbol-attribute-bg-color: #ffa6571a; --doc-symbol-function-bg-color: #d2a8ff1a; --doc-symbol-method-bg-color: #d2a8ff1a; @@ -68,6 +83,15 @@ code.doc-symbol { font-weight: bold; } +code.doc-symbol-parameter { + color: var(--doc-symbol-parameter-fg-color); + background-color: var(--doc-symbol-parameter-bg-color); +} + +code.doc-symbol-parameter::after { + content: "param"; +} + code.doc-symbol-attribute { color: var(--doc-symbol-attribute-fg-color); background-color: var(--doc-symbol-attribute-bg-color); diff --git a/index.html b/index.html index 6b3b46d..df2c019 100644 --- a/index.html +++ b/index.html @@ -366,7 +366,7 @@ - Reference - Code API + Introduction @@ -439,6 +439,48 @@ + + + + + + +
  • + + + + + Task Consumer + + + + +
  • + + + + + + + + + + +
  • + + + + + Workers + + + + +
  • + + + + @@ -513,7 +555,7 @@ - Database + Async Database @@ -614,7 +656,7 @@

    Home

    Aio Fluid

    -

    Async utilities for backend python services

    +

Async utilities for backend Python services, developed by Quantmind.

    PyPI version Python versions build

    @@ -625,16 +667,16 @@

    Installation
    pip install aio-fluid
     
    -

    To install all the dependencies, you can use the full extra:

    -
    pip install aio-fluid[full]
    +

    To install all the dependencies:

    +
pip install aio-fluid[cli,db,http,log]
     

    this includes the following extra dependencies:

    Development

    You can run the examples via

    diff --git a/objects.inv b/objects.inv index 3b118ea..d928812 100644 Binary files a/objects.inv and b/objects.inv differ diff --git a/reference/index.html b/reference/index.html index 5571f8c..ca25c7c 100644 --- a/reference/index.html +++ b/reference/index.html @@ -22,7 +22,7 @@ - Reference - Code API - Aio Fluid + Introduction - Aio Fluid @@ -76,7 +76,7 @@
    - + Skip to content @@ -112,7 +112,7 @@
    - Reference - Code API + Introduction
    @@ -325,7 +325,7 @@ - Reference - Code API + Introduction @@ -399,6 +399,48 @@ + + + + + + +
  • + + + + + Task Consumer + + + + +
  • + + + + + + + + + + +
  • + + + + + Workers + + + + +
  • + + + + @@ -473,7 +515,7 @@ - Database + Async Database @@ -547,10 +589,8 @@ -

    Reference - Code API

    -

    Here's the reference or code API, the classes, functions, parameters, attributes, and all the Aio Fluid parts you can use in your applications.

    -

    If you want to learn Aio Fluid you are much better off reading the -Api Fluid Tutorials.

    +

    Introduction

    +

    Here's the reference or code API, the classes, functions, parameters, attributes, and all the aio-fluid parts you can use in your applications.

    diff --git a/reference/task_broker/index.html b/reference/task_broker/index.html index 0ed912b..9804776 100644 --- a/reference/task_broker/index.html +++ b/reference/task_broker/index.html @@ -316,7 +316,7 @@ - Reference - Code API + Introduction @@ -597,6 +597,48 @@ + + + + + + +
  • + + + + + Task Consumer + + + + +
  • + + + + + + + + + + +
  • + + + + + Workers + + + + +
  • + + + + @@ -671,7 +713,7 @@ - Database + Async Database @@ -945,6 +987,11 @@

    Bases: ABC

    + + + + +
    Source code in fluid/scheduler/broker.py
    37
    @@ -1026,7 +1073,7 @@ 

    -

    Names of the task queues

    +

    Names of the task queues

    @@ -1051,7 +1098,7 @@

    -

    Queue a task

    +

    Queue a task

    Source code in fluid/scheduler/broker.py @@ -1084,7 +1131,7 @@

    -

    Get a Task run from the task queue

    +

    Get a Task run from the task queue

    Source code in fluid/scheduler/broker.py @@ -1117,7 +1164,7 @@

    -

    Length of task queues

    +

    Length of task queues

    Source code in fluid/scheduler/broker.py @@ -1150,7 +1197,7 @@

    -

    List of TaskInfo objects

    +

    List of TaskInfo objects

    Source code in fluid/scheduler/broker.py @@ -1183,7 +1230,7 @@

    -

    Update a task dynamic parameters

    +

    Update a task dynamic parameters

    Source code in fluid/scheduler/broker.py @@ -1216,7 +1263,7 @@

    -

    Close the broker on shutdown

    +

    Close the broker on shutdown

    Source code in fluid/scheduler/broker.py @@ -1248,7 +1295,7 @@

    -

    Create a lock

    +

    Create a lock

    Source code in fluid/scheduler/broker.py @@ -1418,7 +1465,7 @@

    -

    Enable or disable a registered task

    +

    Enable or disable a registered task

    Source code in fluid/scheduler/broker.py diff --git a/reference/task_manager/index.html b/reference/task_manager/index.html index 12a2ceb..2b51c50 100644 --- a/reference/task_manager/index.html +++ b/reference/task_manager/index.html @@ -316,7 +316,7 @@ - Reference - Code API + Introduction @@ -579,6 +579,48 @@ + + + + + + +
  • + + + + + Task Consumer + + + + +
  • + + + + + + + + + + +
  • + + + + + Workers + + + + +
  • + + + + @@ -653,7 +695,7 @@ - Database + Async Database @@ -907,7 +949,12 @@

    -

    The task manager is the main entry point for managing tasks

    +

    The task manager is the main entry point for managing tasks

    + + + + +
    Source code in fluid/scheduler/consumer.py @@ -1108,7 +1155,7 @@

    -

    Execute a task and wait for it to finish

    +

    Execute a task and wait for it to finish

    Source code in fluid/scheduler/consumer.py @@ -1196,7 +1243,7 @@

    -

    Register a task with the task manager

    +

    Register a task with the task manager

    Only tasks registered can be executed by a task manager

    @@ -1235,7 +1282,7 @@

    -

    Queue a task for execution

    +

    Queue a task for execution

This method fires two events:

    • queue: when the task is about to be queued
    • @@ -1298,7 +1345,7 @@

      -

      Create a TaskRun in init state

      +

      Create a TaskRun in init state

      Source code in fluid/scheduler/consumer.py @@ -1388,7 +1435,7 @@

      -

      Create the task manager command line interface

      +

      Create the task manager command line interface

      Source code in fluid/scheduler/consumer.py diff --git a/reference/task_run/index.html b/reference/task_run/index.html index 1d77d35..f5a05e7 100644 --- a/reference/task_run/index.html +++ b/reference/task_run/index.html @@ -14,7 +14,7 @@ - + @@ -316,7 +316,7 @@ - Reference - Code API + Introduction @@ -651,6 +651,48 @@ + + + + + + +
    • + + + + + Task Consumer + + + + +
    • + + + + + + + + + + +
    • + + + + + Workers + + + + +
    • + + + +

    @@ -725,7 +767,7 @@ - Database + Async Database @@ -1052,7 +1094,12 @@

    Bases: BaseModel

    -

    A TaskRun contains all the data generated by a Task run

    +

    A TaskRun contains all the data generated by a Task run

    + + + + + @@ -1185,7 +1232,7 @@

    -
    task_manager = Field(exclude=True, repr=False)
    +
    task_manager = Field(exclude=True, repr=False)
     
    diff --git a/reference/tast_consumer/index.html b/reference/tast_consumer/index.html new file mode 100644 index 0000000..b2ae10c --- /dev/null +++ b/reference/tast_consumer/index.html @@ -0,0 +1,2773 @@ + + + + + + + + + + + + + + + + + + + + + + + + + Task Consumer - Aio Fluid + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    + +
    + + + + + + +
    + + +
    + +
    + + + + + + +
    +
    + + + +
    +
    +
    + + + + + +
    +
    +
    + + + + + + + +
    +
    + + + + + + + +

    Task Consumer

    +

The task consumer is a TaskManager that is also a Workers; it consumes tasks from the task queue and executes them. It can be imported from fluid.scheduler:

    +
from fluid.scheduler import TaskConsumer
    +
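A minimal usage sketch: create a consumer, start its workers, queue a task by name, and shut down. The task name "say_hi" and the tasks module are illustrative only, and the broker URL is assumed to be configured via the environment.

import asyncio
from fluid.scheduler import TaskConsumer

async def main() -> None:
    consumer = TaskConsumer()  # TaskManagerConfig keyword arguments can be passed here
    # consumer.register_from_module(my_tasks)  # hypothetical module defining Task objects
    await consumer.startup()              # start the queue worker and the task workers
    run = await consumer.queue("say_hi")  # queue a registered task by name
    print(run.id)
    await consumer.shutdown()             # gracefully stop all workers

asyncio.run(main())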
    + + + +
    + + + +

    + fluid.scheduler.TaskConsumer + + +

    +
    TaskConsumer(**config)
    +
    + +
    +

    + Bases: TaskManager, Workers

    + + +

    The Task Consumer is a Task Manager responsible for consuming tasks +from a task queue

    + + + + + + +
    + Source code in fluid/scheduler/consumer.py +
    163
    +164
    +165
    +166
    +167
    +168
    +169
    +170
    +171
    +172
    +173
    +174
    +175
    +176
    +177
    +178
    +179
    def __init__(self, **config: Any) -> None:
    +    super().__init__(**config)
    +    Workers.__init__(self)
    +    self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict)
    +    self._task_to_queue: deque[str | Task] = deque()
    +    self._priority_task_run_queue: deque[TaskRun] = deque()
    +    self._queue_tasks_worker = WorkerFunction(
    +        self._queue_task, name="queue-task-worker"
    +    )
    +    self.add_workers(self._queue_tasks_worker)
    +    for i in range(self.config.max_concurrent_tasks):
    +        worker_name = f"task-worker-{i+1}"
    +        self.add_workers(
    +            WorkerFunction(
    +                partial(self._consume_tasks, worker_name), name=worker_name
    +            )
    +        )
    +
    +
    + + + +
    + + + + + + + +
    + + + +

    + worker_name + + + + property + + +

    +
    worker_name
    +
    + +
    +
    + +
    + +
    + + + +

    + num_workers + + + + property + + +

    +
    num_workers
    +
    + +
    +
    + +
    + +
    + + + +

    + running + + + + property + + +

    +
    running
    +
    + +
    +
    + +
    + +
    + + + +

    + state + + + + instance-attribute + + +

    +
    state = {}
    +
    + +
    +
    + +
    + +
    + + + +

    + config + + + + instance-attribute + + +

    +
    config = TaskManagerConfig(**kwargs)
    +
    + +
    +
    + +
    + +
    + + + +

    + dispatcher + + + + instance-attribute + + +

    +
    dispatcher = TaskDispatcher()
    +
    + +
    +
    + +
    + +
    + + + +

    + broker + + + + instance-attribute + + +

    +
    broker = from_url(broker_url)
    +
    + +
    +
    + +
    + +
    + + + +

    + registry + + + + property + + +

    +
    registry
    +
    + +
    +
    + +
    + +
    + + + +

    + type + + + + property + + +

    +
    type
    +
    + +
    +
    + +
    + +
    + + + +

    + num_concurrent_tasks + + + + property + + +

    +
    num_concurrent_tasks
    +
    + +
    + +

    The number of concurrent_tasks running in the consumer

    +
    + +
    + + + +
    + + +

    + status + + + + async + + +

    +
    status()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    288
    +289
    async def status(self) -> dict:
    +    return await self._workers.status()
    +
    +
    +
    + +
    + +
    + + +

    + gracefully_stop + + +

    +
    gracefully_stop()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    285
    +286
    def gracefully_stop(self) -> None:
    +    self._workers.gracefully_stop()
    +
    +
    +
    + +
    + +
    + + +

    + is_running + + +

    +
    is_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    79
    +80
    def is_running(self) -> bool:
    +    return self._running
    +
    +
    +
    + +
    + +
    + + +

    + is_stopping + + +

    +
    is_stopping()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    275
    +276
    def is_stopping(self) -> bool:
    +    return self._workers.is_stopping()
    +
    +
    +
    + +
    + +
    + + +

    + run + + + + async + + +

    +
    run()
    +
    + +
    + +

    run the workers

    + +
    + Source code in fluid/utils/worker.py +
    414
    +415
    +416
    +417
    +418
    +419
    +420
    +421
    +422
    +423
    +424
    async def run(self) -> None:
    +    """run the workers"""
    +    with self.start_running():
    +        async with self.safe_run():
    +            workers, _ = self._workers.workers_tasks()
    +            self._workers.workers = tuple(workers)
    +            self._workers.tasks = tuple(
    +                self.create_task(worker) for worker in workers
    +            )
    +            await asyncio.gather(*self._workers.tasks)
    +        await self.shutdown()
    +
    +
    +
    + +
    + +
    + + +

    + start_running + + +

    +
    start_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    82
    +83
    +84
    +85
    +86
    +87
    +88
    +89
    +90
    +91
    +92
    @contextmanager
    +def start_running(self) -> Generator:
    +    if self._running:
    +        raise RuntimeError("Worker is already running")
    +    self._running = True
    +    try:
    +        logger.info("%s started running", self.worker_name)
    +        yield
    +    finally:
    +        self._running = False
    +        logger.warning("%s stopped running", self.worker_name)
    +
    +
    +
    + +
    + +
    + + +

    + add_workers + + +

    +
    add_workers(*workers)
    +
    + +
    + +

    add workers to the workers

    + +
    + Source code in fluid/utils/worker.py +
    407
    +408
    +409
    +410
    +411
    +412
    def add_workers(self, *workers: Worker) -> None:
    +    """add workers to the workers"""
    +    workers_, _ = self._workers.workers_tasks()
    +    for worker in workers:
    +        if worker not in workers_:
    +            workers_.append(worker)
    +
    +
    +
    + +
    + +
    + + +

    + wait_for_exit + + + + async + + +

    +
    wait_for_exit()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    426
    +427
    +428
    async def wait_for_exit(self) -> None:
    +    if self._workers_task is not None:
    +        await self._workers_task
    +
    +
    +
    + +
    + +
    + + +

    + create_task + + +

    +
    create_task(worker)
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    291
    +292
    +293
    +294
    def create_task(self, worker: Worker) -> asyncio.Task:
    +    return asyncio.create_task(
    +        self._run_worker(worker), name=f"{self.worker_name}-{worker.worker_name}"
    +    )
    +
    +
    +
    + +
    + +
    + + +

    + on_shutdown + + + + async + + +

    +
    on_shutdown()
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    82
    +83
    async def on_shutdown(self) -> None:
    +    await self.broker.close()
    +
    +
    +
    + +
    + +
    + + +

    + shutdown + + + + async + + +

    +
    shutdown()
    +
    + +
    + +

    shutdown the workers

    + +
    + Source code in fluid/utils/worker.py +
    299
    +300
    +301
    +302
    +303
    +304
    +305
    +306
    +307
    +308
    +309
    +310
    +311
    +312
    +313
    +314
    +315
    +316
    +317
    +318
    +319
    +320
    +321
    +322
    +323
    +324
    +325
    +326
    +327
    +328
    +329
    +330
    +331
    +332
    async def shutdown(self) -> None:
    +    """shutdown the workers"""
    +    if self._has_shutdown:
    +        return
    +    self._has_shutdown = True
    +    logger.warning(
    +        "gracefully stopping %d workers: %s",
    +        self.num_workers,
    +        ", ".join(w.worker_name for w in self._workers.workers),
    +    )
    +    self.gracefully_stop()
    +    try:
    +        async with async_timeout.timeout(self._stopping_grace_period):
    +            await self.wait_for_exit()
    +        await self.on_shutdown()
    +        return
    +    except asyncio.TimeoutError:
    +        logger.warning(
    +            "could not stop workers %s gracefully after %s"
    +            " seconds - force shutdown",
    +            ", ".join(
    +                task.get_name() for task in self._workers.tasks if not task.done()
    +            ),
    +            self._stopping_grace_period,
    +        )
    +    except asyncio.CancelledError:
    +        pass
    +    self._force_shutdown = True
    +    self._workers.cancel()
    +    try:
    +        await self.wait_for_exit()
    +    except asyncio.CancelledError:
    +        pass
    +    await self.on_shutdown()
    +
    +
    +
    + +
    + +
    + + +

    + bail_out + + +

    +
    bail_out(reason, code=1)
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    334
    +335
    def bail_out(self, reason: str, code: int = 1) -> None:
    +    self.gracefully_stop()
    +
    +
    +
    + +
    + +
    + + +

    + safe_run + + + + async + + +

    +
    safe_run()
    +
    + +
    + +

    Context manager to run a worker safely

    + +
    + Source code in fluid/utils/worker.py +
    337
    +338
    +339
    +340
    +341
    +342
    +343
    +344
    +345
    +346
    +347
    +348
    +349
    +350
    +351
    +352
    +353
    +354
    @asynccontextmanager
    +async def safe_run(self) -> AsyncGenerator:
    +    """Context manager to run a worker safely"""
    +    try:
    +        yield
    +    except asyncio.CancelledError:
    +        if self._force_shutdown:
    +            # we are shutting down, this is expected
    +            pass
    +        raise
    +    except Exception as e:
    +        reason = f"unhandled exception while running workers: {e}"
    +        logger.exception(reason)
    +        asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)
    +    else:
    +        # worker finished without error
    +        # make sure we are shutting down
    +        asyncio.get_event_loop().call_soon(self.bail_out, "worker exit", 1)
    +
    +
    +
    + +
    + +
    + + +

    + remove_workers + + +

    +
    remove_workers(*workers)
    +
    + +
    + +

    remove workers from the workers

    + +
    + Source code in fluid/utils/worker.py +
    430
    +431
    +432
    +433
    +434
    +435
    +436
    +437
    def remove_workers(self, *workers: Worker) -> None:
    +    "remove workers from the workers"
    +    workers_, _ = self._workers.workers_tasks()
    +    for worker in workers:
    +        try:
    +            workers_.remove(worker)
    +        except ValueError:
    +            pass
    +
    +
    +
    + +
    + +
    + + +

    + startup + + + + async + + +

    +
    startup()
    +
    + +
    + +

    start the workers

    + +
    + Source code in fluid/utils/worker.py +
    439
    +440
    +441
    +442
    +443
    +444
    +445
    async def startup(self) -> None:
    +    """start the workers"""
    +    if self._workers_task is None:
    +        self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
    +        for args in self._delayed_callbacks:
    +            self._delayed_callback(*args)
    +        self._delayed_callbacks = []
    +
    +
    +
    + +
    + +
    + + +

    + register_callback + + +

    +
    register_callback(
    +    callback, seconds, jitter=0.0, periodic=False
    +)
    +
    + +
    + +

    Register a callback

    +

    The callback can be periodic or not.

    + +
    + Source code in fluid/utils/worker.py +
    447
    +448
    +449
    +450
    +451
    +452
    +453
    +454
    +455
    +456
    +457
    +458
    +459
    +460
    +461
    +462
    +463
    +464
    +465
    +466
    +467
    def register_callback(
    +    self,
    +    callback: Callable[[], None],
    +    seconds: float,
    +    jitter: float = 0.0,
    +    periodic: bool | float = False,
    +) -> None:
    +    """Register a callback
    +
    +    The callback can be periodic or not.
    +    """
    +    if periodic is True:
    +        periodic_float = seconds
    +    elif periodic is False:
    +        periodic_float = 0.0
    +    else:
    +        periodic_float = periodic
    +    if not self.running:
    +        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
    +    else:
    +        self._delayed_callback(callback, seconds, jitter, periodic_float)
    +
    +
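As an illustrative sketch (assuming consumer is a TaskConsumer or any Workers instance), a periodic callback can be registered like this; the printed message is a placeholder:

# schedule a no-argument callback roughly every 30 seconds (periodic=True), with some random jitter
consumer.register_callback(lambda: print("heartbeat"), seconds=30, jitter=5.0, periodic=True)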
    +
    + +
    + +
    + + +

    + enter_async_context + + + + async + + +

    +
    enter_async_context(cm)
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    65
    +66
    async def enter_async_context(self, cm: Any) -> Any:
    +    return await self._stack.enter_async_context(cm)
    +
    +
    +
    + +
    + +
    + + +

    + execute + + + + async + + +

    +
    execute(task, **params)
    +
    + +
    + +

    Execute a task and wait for it to finish

    + +
    + Source code in fluid/scheduler/consumer.py +
    76
    +77
    +78
    +79
    +80
    async def execute(self, task: Task | str, **params: Any) -> TaskRun:
    +    """Execute a task and wait for it to finish"""
    +    task_run = self.create_task_run(task, **params)
    +    await task_run.execute()
    +    return task_run
    +
    +
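For example (the task name and the extra keyword argument are illustrative), a registered task can be run inline, bypassing the queue:

run = await consumer.execute("say_hi", greeting="hello")  # extra keyword arguments become the task params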
    +
    + +
    + +
    + + +

    + execute_sync + + +

    +
    execute_sync(task, **params)
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    85
    +86
    +87
    +88
    def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:
    +    return asyncio.get_event_loop().run_until_complete(
    +        self._execute_and_exit(task, **params)
    +    )
    +
    +
    +
    + +
    + +
    + + +

    + register_task + + +

    +
    register_task(task)
    +
    + +
    + +

    Register a task with the task manager

    +

    Only tasks registered can be executed by a task manager

    + +
    + Source code in fluid/scheduler/consumer.py +
    90
    +91
    +92
    +93
    +94
    +95
    def register_task(self, task: Task) -> None:
    +    """Register a task with the task manager
    +
    +    Only tasks registered can be executed by a task manager
    +    """
    +    self.broker.register_task(task)
    +
    +
    +
    + +
    + +
    + + +

    + queue + + + + async + + +

    +
    queue(task, priority=None, **params)
    +
    + +
    + +

    Queue a task for execution

    +

This method fires two events:

    +
      +
    • queue: when the task is about to be queued
    • +
    • queued: after the task is queued
    • +
    + +
    + Source code in fluid/scheduler/consumer.py +
     97
    + 98
    + 99
    +100
    +101
    +102
    +103
    +104
    +105
    +106
    +107
    +108
    +109
    +110
    +111
    +112
    +113
    +114
    async def queue(
    +    self,
    +    task: str | Task,
    +    priority: TaskPriority | None = None,
    +    **params: Any,
    +) -> TaskRun:
    +    """Queue a task for execution
    +
    +    This methods fires two events:
    +
    +    - queue: when the task is about to be queued
    +    - queued: after the task is queued
    +    """
    +    task_run = self.create_task_run(task, priority=priority, **params)
    +    self.dispatcher.dispatch(task_run)
    +    task_run.set_state(TaskState.queued)
    +    await self.broker.queue_task(task_run)
    +    return task_run
    +
    +
    +
    + +
    + +
    + + +

    + create_task_run + + +

    +
    create_task_run(task, run_id='', priority=None, **params)
    +
    + +
    + +

    Create a TaskRun in init state

    + +
    + Source code in fluid/scheduler/consumer.py +
    116
    +117
    +118
    +119
    +120
    +121
    +122
    +123
    +124
    +125
    +126
    +127
    +128
    +129
    +130
    +131
    +132
    +133
    def create_task_run(
    +    self,
    +    task: str | Task,
    +    run_id: str = "",
    +    priority: TaskPriority | None = None,
    +    **params: Any,
    +) -> TaskRun:
    +    """Create a TaskRun in `init` state"""
    +    if isinstance(task, str):
    +        task = self.broker.task_from_registry(task)
    +    run_id = run_id or self.broker.new_uuid()
    +    return TaskRun(
    +        id=run_id,
    +        task=task,
    +        priority=priority or task.priority,
    +        params=params,
    +        task_manager=self,
    +    )
    +
    +
    +
    + +
    + +
    + + +

    + register_from_module + + +

    +
    register_from_module(module)
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    135
    +136
    +137
    +138
    +139
    +140
    def register_from_module(self, module: Any) -> None:
    +    for name in dir(module):
    +        if name.startswith("_"):
    +            continue
    +        if isinstance(obj := getattr(module, name), Task):
    +            self.register_task(obj)
    +
    +
    +
    + +
    + +
    + + +

    + cli + + +

    +
    cli(**kwargs)
    +
    + +
    + +

    Create the task manager command line interface

    + +
    + Source code in fluid/scheduler/consumer.py +
    142
    +143
    +144
    +145
    +146
    +147
    +148
    +149
    +150
    +151
    def cli(self, **kwargs: Any) -> Any:
    +    """Create the task manager command line interface"""
    +    try:
    +        from fluid.scheduler.cli import TaskManagerCLI
    +    except ImportError:
    +        raise ImportError(
    +            "TaskManagerCLI is not available - "
    +            "install with `pip install aio-fluid[cli]`"
    +        ) from None
    +    return TaskManagerCLI(self, **kwargs)
    +
    +
    +
    + +
    + +
    + + +

    + sync_queue + + +

    +
    sync_queue(task)
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    186
    +187
    def sync_queue(self, task: str | Task) -> None:
    +    self._task_to_queue.appendleft(task)
    +
    +
    +
    + +
    + +
    + + +

    + sync_priority_queue + + +

    +
    sync_priority_queue(task)
    +
    + +
    + +
    + Source code in fluid/scheduler/consumer.py +
    189
    +190
    def sync_priority_queue(self, task: str | Task) -> None:
    +    self._priority_task_run_queue.appendleft(self.create_task_run(task))
    +
    +
    +
    + +
    + +
    + + +

    + num_concurrent_tasks_for + + +

    +
    num_concurrent_tasks_for(task_name)
    +
    + +
    + +

    The number of concurrent tasks for a given task_name

    + +
    + Source code in fluid/scheduler/consumer.py +
    192
    +193
    +194
    def num_concurrent_tasks_for(self, task_name: str) -> int:
    +    """The number of concurrent tasks for a given task_name"""
    +    return len(self._concurrent_tasks[task_name])
    +
    +
    +
    + +
    + +
    + + +

    + queue_and_wait + + + + async + + +

    +
    queue_and_wait(task, *, timeout=2, **params)
    +
    + +
    + +

    Queue a task and wait for it to finish

    + +
    + Source code in fluid/scheduler/consumer.py +
    196
    +197
    +198
    +199
    +200
    +201
    async def queue_and_wait(
    +    self, task: str, *, timeout: int = 2, **params: Any
    +) -> TaskRun:
    +    """Queue a task and wait for it to finish"""
    +    with TaskRunWaiter(self) as waiter:
    +        return await waiter.wait(await self.queue(task, **params), timeout=timeout)
    +
    +
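A hedged example, handy in tests (the task name is illustrative): queue a task and block until its TaskRun has finished or the timeout expires.

run = await consumer.queue_and_wait("say_hi", timeout=5)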
    +
    + +
    + + + +
    + +
    + +
    + + + + + + + + + + + + + +
    +
    + + + +
    + +
    + + + +
    +
    +
    +
    + + + + + + + + + + \ No newline at end of file diff --git a/reference/workers/index.html b/reference/workers/index.html new file mode 100644 index 0000000..57fe5a1 --- /dev/null +++ b/reference/workers/index.html @@ -0,0 +1,3157 @@ + + + + + + + + + + + + + + + + + + + + + + + + + Workers - Aio Fluid + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    + +
    + + + + + + +
    + + +
    + +
    + + + + + + +
    +
    + + + +
    +
    +
    + + + + + +
    +
    +
    + + + +
    +
    +
    + + + +
    +
    +
    + + + +
    +
    + + + + + + + +

    Workers

    +

Workers are the main building blocks for asynchronous programming with aio-fluid. They are responsible for running tasks and managing their lifecycle. +There are several worker classes which can be imported from fluid.utils.worker:

    +
from fluid.utils.worker import StoppingWorker
    +
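A minimal sketch composing the classes documented below; the tick coroutine and the worker names are illustrative, not part of the library.

import asyncio
from fluid.utils.worker import WorkerFunction, Workers

async def tick() -> None:
    print("tick")

async def main() -> None:
    # WorkerFunction calls `tick` in a loop, sleeping `heartbeat` seconds between calls
    workers = Workers(WorkerFunction(tick, heartbeat=1.0, name="ticker"))
    await workers.startup()   # run the workers in a background task
    await asyncio.sleep(3)
    await workers.shutdown()  # gracefully stop and wait for exit

asyncio.run(main())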
    + + + +
    + + + +

    + fluid.utils.worker.Worker + + +

    +
    Worker(name='')
    +
    + +
    +

    + Bases: ABC

    + + +

    The base class of a worker that can be run

    + + + + + + +
    + Source code in fluid/utils/worker.py +
    39
    +40
    def __init__(self, name: str = "") -> None:
    +    self._name: str = name or underscore(type(self).__name__)
    +
    +
    + + + +
    + + + + + + + +
    + + + +

    + worker_name + + + + property + + +

    +
    worker_name
    +
    + +
    +
    + +
    + +
    + + + +

    + num_workers + + + + property + + +

    +
    num_workers
    +
    + +
    +
    + +
    + + + +
    + + +

    + status + + + + abstractmethod + async + + +

    +
    status()
    +
    + +
    + +

    Get the status of the worker.

    + +
    + Source code in fluid/utils/worker.py +
    50
    +51
    +52
    +53
    +54
    @abstractmethod
    +async def status(self) -> dict:
    +    """
    +    Get the status of the worker.
    +    """
    +
    +
    +
    + +
    + +
    + + +

    + gracefully_stop + + + + abstractmethod + + +

    +
    gracefully_stop()
    +
    + +
    + +

    gracefully stop the worker

    + +
    + Source code in fluid/utils/worker.py +
    56
    +57
    +58
    @abstractmethod
    +def gracefully_stop(self) -> None:
    +    "gracefully stop the worker"
    +
    +
    +
    + +
    + +
    + + +

    + is_running + + + + abstractmethod + + +

    +
    is_running()
    +
    + +
    + +

    Is the worker running?

    + +
    + Source code in fluid/utils/worker.py +
    60
    +61
    +62
    @abstractmethod
    +def is_running(self) -> bool:
    +    """Is the worker running?"""
    +
    +
    +
    + +
    + +
    + + +

    + is_stopping + + + + abstractmethod + + +

    +
    is_stopping()
    +
    + +
    + +

    Is the worker stopping?

    + +
    + Source code in fluid/utils/worker.py +
    64
    +65
    +66
    @abstractmethod
    +def is_stopping(self) -> bool:
    +    """Is the worker stopping?"""
    +
    +
    +
    + +
    + +
    + + +

    + run + + + + abstractmethod + async + + +

    +
    run()
    +
    + +
    + +

    run the worker

    + +
    + Source code in fluid/utils/worker.py +
    68
    +69
    +70
    @abstractmethod
    +async def run(self) -> None:
    +    """run the worker"""
    +
    +
    +
    + +
    + + + +
    + +
    + +
    + +
    + + + +

    + fluid.utils.worker.StoppingWorker + + +

    +
    StoppingWorker(name='')
    +
    + +
    +

    + Bases: RunningWorker

    + + +

    A Worker that can be stopped

    + + + + + + +
    + Source code in fluid/utils/worker.py +
     98
    + 99
    +100
    def __init__(self, name: str = "") -> None:
    +    super().__init__(name)
    +    self._stopping: bool = False
    +
    +
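A sketch of a custom worker built on StoppingWorker; the Heartbeat class and its message are illustrative, not part of the library.

import asyncio
from fluid.utils.worker import StoppingWorker

class Heartbeat(StoppingWorker):
    async def run(self) -> None:
        # same pattern as WorkerFunction.run: loop until gracefully_stop() is called
        with self.start_running():
            while not self.is_stopping():
                print("still alive")
                await asyncio.sleep(1)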
    + + + +
    + + + + + + + +
    + + + +

    + worker_name + + + + property + + +

    +
    worker_name
    +
    + +
    +
    + +
    + +
    + + + +

    + num_workers + + + + property + + +

    +
    num_workers
    +
    + +
    +
    + +
    + + + +
    + + +

    + is_running + + +

    +
    is_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    79
    +80
    def is_running(self) -> bool:
    +    return self._running
    +
    +
    +
    + +
    + +
    + + +

    + run + + + + abstractmethod + async + + +

    +
    run()
    +
    + +
    + +

    run the worker

    + +
    + Source code in fluid/utils/worker.py +
    68
    +69
    +70
    @abstractmethod
    +async def run(self) -> None:
    +    """run the worker"""
    +
    +
    +
    + +
    + +
    + + +

    + start_running + + +

    +
    start_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    82
    +83
    +84
    +85
    +86
    +87
    +88
    +89
    +90
    +91
    +92
    @contextmanager
    +def start_running(self) -> Generator:
    +    if self._running:
    +        raise RuntimeError("Worker is already running")
    +    self._running = True
    +    try:
    +        logger.info("%s started running", self.worker_name)
    +        yield
    +    finally:
    +        self._running = False
    +        logger.warning("%s stopped running", self.worker_name)
    +
    +
    +
    + +
    + +
    + + +

    + is_stopping + + +

    +
    is_stopping()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    102
    +103
    def is_stopping(self) -> bool:
    +    return self._stopping
    +
    +
    +
    + +
    + +
    + + +

    + gracefully_stop + + +

    +
    gracefully_stop()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    105
    +106
    def gracefully_stop(self) -> None:
    +    self._stopping = True
    +
    +
    +
    + +
    + +
    + + +

    + status + + + + async + + +

    +
    status()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    108
    +109
    async def status(self) -> dict:
    +    return {"stopping": self.is_stopping(), "running": self.is_running()}
    +
    +
    +
    + +
    + + + +
    + +
    + +
    + +
    + + + +

    + fluid.utils.worker.WorkerFunction + + +

    +
    WorkerFunction(run_function, heartbeat=0, name='')
    +
    + +
    +

    + Bases: StoppingWorker

    + + + + + + + +
    + Source code in fluid/utils/worker.py +
    113
    +114
    +115
    +116
    +117
    +118
    +119
    +120
    +121
    def __init__(
    +    self,
    +    run_function: Callable[[], Awaitable[None]],
    +    heartbeat: float | int = 0,
    +    name: str = "",
    +) -> None:
    +    super().__init__(name=name)
    +    self._run_function = run_function
    +    self._heartbeat = heartbeat
    +
    +
    + + + +
    + + + + + + + +
    + + + +

    + worker_name + + + + property + + +

    +
    worker_name
    +
    + +
    +
    + +
    + +
    + + + +

    + num_workers + + + + property + + +

    +
    num_workers
    +
    + +
    +
    + +
    + + + +
    + + +

    + status + + + + async + + +

    +
    status()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    108
    +109
    async def status(self) -> dict:
    +    return {"stopping": self.is_stopping(), "running": self.is_running()}
    +
    +
    +
    + +
    + +
    + + +

    + gracefully_stop + + +

    +
    gracefully_stop()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    105
    +106
    def gracefully_stop(self) -> None:
    +    self._stopping = True
    +
    +
    +
    + +
    + +
    + + +

    + is_running + + +

    +
    is_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    79
    +80
    def is_running(self) -> bool:
    +    return self._running
    +
    +
    +
    + +
    + +
    + + +

    + is_stopping + + +

    +
    is_stopping()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    102
    +103
    def is_stopping(self) -> bool:
    +    return self._stopping
    +
    +
    +
    + +
    + +
    + + +

    + start_running + + +

    +
    start_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    82
    +83
    +84
    +85
    +86
    +87
    +88
    +89
    +90
    +91
    +92
    @contextmanager
    +def start_running(self) -> Generator:
    +    if self._running:
    +        raise RuntimeError("Worker is already running")
    +    self._running = True
    +    try:
    +        logger.info("%s started running", self.worker_name)
    +        yield
    +    finally:
    +        self._running = False
    +        logger.warning("%s stopped running", self.worker_name)
    +
    +
    +
    + +
    + +
    + + +

    + run + + + + async + + +

    +
    run()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    123
    +124
    +125
    +126
    +127
    async def run(self) -> None:
    +    with self.start_running():
    +        while not self.is_stopping():
    +            await self._run_function()
    +            await asyncio.sleep(self._heartbeat)
    +
    +
    +
    + +
    + + + +
    + +
    + +
    + +
    + + + +

    + fluid.utils.worker.Workers + + +

    +
    Workers(
    +    *workers,
    +    name="",
    +    stopping_grace_period=STOPPING_GRACE_PERIOD
    +)
    +
    + +
    +

    + Bases: MultipleWorkers

    + + +

    A worker managing several workers

    + + + + + + +
    + Source code in fluid/utils/worker.py +
    388
    +389
    +390
    +391
    +392
    +393
    +394
    +395
    +396
    +397
    +398
    +399
    +400
    def __init__(
    +    self,
    +    *workers: Worker,
    +    name: str = "",
    +    stopping_grace_period: int = settings.STOPPING_GRACE_PERIOD,
    +) -> None:
    +    super().__init__(
    +        *workers, name=name, stopping_grace_period=stopping_grace_period
    +    )
    +    self._workers_task: asyncio.Task | None = None
    +    self._delayed_callbacks: list[
    +        tuple[Callable[[], None], float, float, float]
    +    ] = []
    +
    +
    + + + +
    + + + + + + + +
    + + + +

    + worker_name + + + + property + + +

    +
    worker_name
    +
    + +
    +
    + +
    + +
    + + + +

    + num_workers + + + + property + + +

    +
    num_workers
    +
    + +
    +
    + +
    + +
    + + + +

    + running + + + + property + + +

    +
    running
    +
    + +
    +
    + +
    + + + +
    + + +

    + status + + + + async + + +

    +
    status()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    288
    +289
    async def status(self) -> dict:
    +    return await self._workers.status()
    +
    +
    +
    + +
    + +
    + + +

    + gracefully_stop + + +

    +
    gracefully_stop()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    285
    +286
    def gracefully_stop(self) -> None:
    +    self._workers.gracefully_stop()
    +
    +
    +
    + +
    + +
    + + +

    + is_running + + +

    +
    is_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    79
    +80
    def is_running(self) -> bool:
    +    return self._running
    +
    +
    +
    + +
    + +
    + + +

    + is_stopping + + +

    +
    is_stopping()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    275
    +276
    def is_stopping(self) -> bool:
    +    return self._workers.is_stopping()
    +
    +
    +
    + +
    + +
    + + +

    + start_running + + +

    +
    start_running()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    82
    +83
    +84
    +85
    +86
    +87
    +88
    +89
    +90
    +91
    +92
    @contextmanager
    +def start_running(self) -> Generator:
    +    if self._running:
    +        raise RuntimeError("Worker is already running")
    +    self._running = True
    +    try:
    +        logger.info("%s started running", self.worker_name)
    +        yield
    +    finally:
    +        self._running = False
    +        logger.warning("%s stopped running", self.worker_name)
    +
    +
    +
    + +
    + +
    + + +

    + create_task + + +

    +
    create_task(worker)
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    291
    +292
    +293
    +294
    def create_task(self, worker: Worker) -> asyncio.Task:
    +    return asyncio.create_task(
    +        self._run_worker(worker), name=f"{self.worker_name}-{worker.worker_name}"
    +    )
    +
    +
    +
    + +
    + +
    + + +

    + on_shutdown + + + + async + + +

    +
    on_shutdown()
    +
    + +
    + +

    called after the workers are stopped

    + +
    + Source code in fluid/utils/worker.py +
    296
    +297
    async def on_shutdown(self) -> None:
    +    """called after the workers are stopped"""
    +
    +
    +
    + +
    + +
    + + +

    + shutdown + + + + async + + +

    +
    shutdown()
    +
    + +
    + +

    shutdown the workers

    + +
    + Source code in fluid/utils/worker.py +
    299
    +300
    +301
    +302
    +303
    +304
    +305
    +306
    +307
    +308
    +309
    +310
    +311
    +312
    +313
    +314
    +315
    +316
    +317
    +318
    +319
    +320
    +321
    +322
    +323
    +324
    +325
    +326
    +327
    +328
    +329
    +330
    +331
    +332
    async def shutdown(self) -> None:
    +    """shutdown the workers"""
    +    if self._has_shutdown:
    +        return
    +    self._has_shutdown = True
    +    logger.warning(
    +        "gracefully stopping %d workers: %s",
    +        self.num_workers,
    +        ", ".join(w.worker_name for w in self._workers.workers),
    +    )
    +    self.gracefully_stop()
    +    try:
    +        async with async_timeout.timeout(self._stopping_grace_period):
    +            await self.wait_for_exit()
    +        await self.on_shutdown()
    +        return
    +    except asyncio.TimeoutError:
    +        logger.warning(
    +            "could not stop workers %s gracefully after %s"
    +            " seconds - force shutdown",
    +            ", ".join(
    +                task.get_name() for task in self._workers.tasks if not task.done()
    +            ),
    +            self._stopping_grace_period,
    +        )
    +    except asyncio.CancelledError:
    +        pass
    +    self._force_shutdown = True
    +    self._workers.cancel()
    +    try:
    +        await self.wait_for_exit()
    +    except asyncio.CancelledError:
    +        pass
    +    await self.on_shutdown()
    +
    +
    +
    + +
    + +
    + + +

    + bail_out + + +

    +
    bail_out(reason, code=1)
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    334
    +335
    def bail_out(self, reason: str, code: int = 1) -> None:
    +    self.gracefully_stop()
    +
    +
    +
    + +
    + +
    + + +

    + safe_run + + + + async + + +

    +
    safe_run()
    +
    + +
    + +

    Context manager to run a worker safely

    + +
    + Source code in fluid/utils/worker.py +
    337
    +338
    +339
    +340
    +341
    +342
    +343
    +344
    +345
    +346
    +347
    +348
    +349
    +350
    +351
    +352
    +353
    +354
    @asynccontextmanager
    +async def safe_run(self) -> AsyncGenerator:
    +    """Context manager to run a worker safely"""
    +    try:
    +        yield
    +    except asyncio.CancelledError:
    +        if self._force_shutdown:
    +            # we are shutting down, this is expected
    +            pass
    +        raise
    +    except Exception as e:
    +        reason = f"unhandled exception while running workers: {e}"
    +        logger.exception(reason)
    +        asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)
    +    else:
    +        # worker finished without error
    +        # make sure we are shutting down
    +        asyncio.get_event_loop().call_soon(self.bail_out, "worker exit", 1)
    +
    +
    +
    + +
    + +
    + + +

    + add_workers + + +

    +
    add_workers(*workers)
    +
    + +
    + +

    add workers to the workers

    + +
    + Source code in fluid/utils/worker.py +
    407
    +408
    +409
    +410
    +411
    +412
    def add_workers(self, *workers: Worker) -> None:
    +    """add workers to the workers"""
    +    workers_, _ = self._workers.workers_tasks()
    +    for worker in workers:
    +        if worker not in workers_:
    +            workers_.append(worker)
    +
    +
    +
    + +
    + +
    + + +

    + run + + + + async + + +

    +
    run()
    +
    + +
    + +

    run the workers

    + +
    + Source code in fluid/utils/worker.py +
    414
    +415
    +416
    +417
    +418
    +419
    +420
    +421
    +422
    +423
    +424
    async def run(self) -> None:
    +    """run the workers"""
    +    with self.start_running():
    +        async with self.safe_run():
    +            workers, _ = self._workers.workers_tasks()
    +            self._workers.workers = tuple(workers)
    +            self._workers.tasks = tuple(
    +                self.create_task(worker) for worker in workers
    +            )
    +            await asyncio.gather(*self._workers.tasks)
    +        await self.shutdown()
    +
    +
    +
    + +
    + +
    + + +

    + wait_for_exit + + + + async + + +

    +
    wait_for_exit()
    +
    + +
    + +
    + Source code in fluid/utils/worker.py +
    426
    +427
    +428
    async def wait_for_exit(self) -> None:
    +    if self._workers_task is not None:
    +        await self._workers_task
    +
    +
    +
    + +
    + +
    + + +

    + remove_workers + + +

    +
    remove_workers(*workers)
    +
    + +
    + +

    remove workers from the workers

    + +
    + Source code in fluid/utils/worker.py +
    430
    +431
    +432
    +433
    +434
    +435
    +436
    +437
    def remove_workers(self, *workers: Worker) -> None:
    +    "remove workers from the workers"
    +    workers_, _ = self._workers.workers_tasks()
    +    for worker in workers:
    +        try:
    +            workers_.remove(worker)
    +        except ValueError:
    +            pass
    +
    +
    +
    + +
    + +
    + + +

    + startup + + + + async + + +

    +
    startup()
    +
    + +
    + +

    start the workers

    + +
    + Source code in fluid/utils/worker.py +
    439
    +440
    +441
    +442
    +443
    +444
    +445
    async def startup(self) -> None:
    +    """start the workers"""
    +    if self._workers_task is None:
    +        self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)
    +        for args in self._delayed_callbacks:
    +            self._delayed_callback(*args)
    +        self._delayed_callbacks = []
    +
    +
    +
    + +
    + +
    + + +

    + register_callback + + +

    +
    register_callback(
    +    callback, seconds, jitter=0.0, periodic=False
    +)
    +
    + +
    + +

    Register a callback

    +

    The callback can be periodic or not.

    + +
    + Source code in fluid/utils/worker.py +
    447
    +448
    +449
    +450
    +451
    +452
    +453
    +454
    +455
    +456
    +457
    +458
    +459
    +460
    +461
    +462
    +463
    +464
    +465
    +466
    +467
    def register_callback(
    +    self,
    +    callback: Callable[[], None],
    +    seconds: float,
    +    jitter: float = 0.0,
    +    periodic: bool | float = False,
    +) -> None:
    +    """Register a callback
    +
    +    The callback can be periodic or not.
    +    """
    +    if periodic is True:
    +        periodic_float = seconds
    +    elif periodic is False:
    +        periodic_float = 0.0
    +    else:
    +        periodic_float = periodic
    +    if not self.running:
    +        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))
    +    else:
    +        self._delayed_callback(callback, seconds, jitter, periodic_float)
    +
    +
    +
    + +
    + + + +
    + +
    + +
    + + + + + + + + + + + + + +
    +
    + + + +
    + +
    + + + +
    +
    +
    +
    + + + + + + + + + + \ No newline at end of file diff --git a/search/search_index.json b/search/search_index.json index 2ba3735..bf68bf8 100644 --- a/search/search_index.json +++ b/search/search_index.json @@ -1 +1 @@ -{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Home","text":"

    Aio Fluid

    Async utilities for backend python services

    Documentation: https://quantmind.github.io/aio-fluid

    Source Code: https://github.com/quantmind/aio-fluid

    "},{"location":"#installation","title":"Installation","text":"

    This is a simple python package you can install via pip:

    pip install aio-fluid\n

    To install all the dependencies, you can use the full extra:

    pip install aio-fluid[full]\n

    this includes the following extra dependencies:

    • cli for the command line interface
    • db for database support
    • http for http client support
    • log for JSON logging support
    "},{"location":"#development","title":"Development","text":"

    You can run the examples via

    poetry run python -m examples.main\n
    "},{"location":"reference/","title":"Reference - Code API","text":"

    Here's the reference or code API, the classes, functions, parameters, attributes, and all the Aio Fluid parts you can use in your applications.

    If you want to learn Aio Fluid you are much better off reading the Api Fluid Tutorials.

    "},{"location":"reference/task_broker/","title":"Task Broker","text":"

    It can be imported from fluid.scheduler:

    from fastapi.scheduler import TaskBroker\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker","title":"fluid.scheduler.TaskBroker","text":"
    TaskBroker(url)\n

    Bases: ABC

    Source code in fluid/scheduler/broker.py
    def __init__(self, url: URL) -> None:\n    self.url: URL = url\n    self.registry: TaskRegistry = TaskRegistry()\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.url","title":"url instance-attribute","text":"
    url = url\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.registry","title":"registry instance-attribute","text":"
    registry = TaskRegistry()\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.task_queue_names","title":"task_queue_names abstractmethod property","text":"
    task_queue_names\n

    Names of the task queues

    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.queue_task","title":"queue_task abstractmethod async","text":"
    queue_task(task_run)\n

    Queue a task

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def queue_task(self, task_run: TaskRun) -> None:\n    \"\"\"Queue a task\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.get_task_run","title":"get_task_run abstractmethod async","text":"
    get_task_run(task_manager)\n

    Get a Task run from the task queue

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:\n    \"\"\"Get a Task run from the task queue\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.queue_length","title":"queue_length abstractmethod async","text":"
    queue_length()\n

    Length of task queues

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def queue_length(self) -> dict[str, int]:\n    \"\"\"Length of task queues\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.get_tasks_info","title":"get_tasks_info abstractmethod async","text":"
    get_tasks_info(*task_names)\n

    List of TaskInfo objects

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def get_tasks_info(self, *task_names: str) -> list[TaskInfo]:\n    \"\"\"List of TaskInfo objects\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.update_task","title":"update_task abstractmethod async","text":"
    update_task(task, params)\n

    Update a task dynamic parameters

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def update_task(self, task: Task, params: dict[str, Any]) -> TaskInfo:\n    \"\"\"Update a task dynamic parameters\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.close","title":"close abstractmethod async","text":"
    close()\n

    Close the broker on shutdown

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def close(self) -> None:\n    \"\"\"Close the broker on shutdown\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.lock","title":"lock abstractmethod","text":"
    lock(name, timeout=None)\n

    Create a lock

    Source code in fluid/scheduler/broker.py
    @abstractmethod\ndef lock(self, name: str, timeout: float | None = None) -> Lock:\n    \"\"\"Create a lock\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.new_uuid","title":"new_uuid","text":"
    new_uuid()\n
    Source code in fluid/scheduler/broker.py
    def new_uuid(self) -> str:\n    return uuid4().hex\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.filter_tasks","title":"filter_tasks async","text":"
    filter_tasks(scheduled=None, enabled=None)\n
    Source code in fluid/scheduler/broker.py
    async def filter_tasks(\n    self,\n    scheduled: bool | None = None,\n    enabled: bool | None = None,\n) -> list[Task]:\n    task_info = await self.get_tasks_info()\n    task_map = {info.name: info for info in task_info}\n    tasks = []\n    for task in self.registry.values():\n        if scheduled is not None and bool(task.schedule) is not scheduled:\n            continue\n        if enabled is not None and task_map[task.name].enabled is not enabled:\n            continue\n        tasks.append(task)\n    return tasks\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.task_from_registry","title":"task_from_registry","text":"
    task_from_registry(task)\n
    Source code in fluid/scheduler/broker.py
    def task_from_registry(self, task: str | Task) -> Task:\n    if isinstance(task, Task):\n        self.register_task(task)\n        return task\n    else:\n        if task_ := self.registry.get(task):\n            return task_\n        raise UnknownTaskError(task)\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.register_task","title":"register_task","text":"
    register_task(task)\n
    Source code in fluid/scheduler/broker.py
    def register_task(self, task: Task) -> None:\n    self.registry[task.name] = task\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.enable_task","title":"enable_task async","text":"
    enable_task(task_name, enable=True)\n

    Enable or disable a registered task

    Source code in fluid/scheduler/broker.py
    async def enable_task(self, task_name: str, enable: bool = True) -> TaskInfo:\n    \"\"\"Enable or disable a registered task\"\"\"\n    task = self.registry.get(task_name)\n    if not task:\n        raise UnknownTaskError(task_name)\n    return await self.update_task(task, dict(enabled=enable))\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.from_url","title":"from_url classmethod","text":"
    from_url(url='')\n
    Source code in fluid/scheduler/broker.py
    @classmethod\ndef from_url(cls, url: str = \"\") -> TaskBroker:\n    p = URL(url or broker_url_from_env())\n    if factory := _brokers.get(p.scheme):\n        return factory(p)\n    raise RuntimeError(f\"Invalid broker {p}\")\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.register_broker","title":"register_broker classmethod","text":"
    register_broker(name, factory)\n
    Source code in fluid/scheduler/broker.py
    @classmethod\ndef register_broker(cls, name: str, factory: type[TaskBroker]) -> None:\n    _brokers[name] = factory\n
    "},{"location":"reference/task_manager/","title":"Task Manager","text":"

    It can be imported from fluid.scheduler:

    from fastapi.scheduler import TaskManager\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager","title":"fluid.scheduler.TaskManager","text":"
    TaskManager(**kwargs)\n

    The task manager is the main entry point for managing tasks

    Source code in fluid/scheduler/consumer.py
    def __init__(self, **kwargs: Any) -> None:\n    self.state: dict[str, Any] = {}\n    self.config: TaskManagerConfig = TaskManagerConfig(**kwargs)\n    self.dispatcher = TaskDispatcher()\n    self.broker = TaskBroker.from_url(self.config.broker_url)\n    self._stack = AsyncExitStack()\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.state","title":"state instance-attribute","text":"
    state = {}\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.config","title":"config instance-attribute","text":"
    config = TaskManagerConfig(**kwargs)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.dispatcher","title":"dispatcher instance-attribute","text":"
    dispatcher = TaskDispatcher()\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.broker","title":"broker instance-attribute","text":"
    broker = from_url(broker_url)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.registry","title":"registry property","text":"
    registry\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.type","title":"type property","text":"
    type\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.enter_async_context","title":"enter_async_context async","text":"
    enter_async_context(cm)\n
    Source code in fluid/scheduler/consumer.py
    async def enter_async_context(self, cm: Any) -> Any:\n    return await self._stack.enter_async_context(cm)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.execute","title":"execute async","text":"
    execute(task, **params)\n

    Execute a task and wait for it to finish

    Source code in fluid/scheduler/consumer.py
    async def execute(self, task: Task | str, **params: Any) -> TaskRun:\n    \"\"\"Execute a task and wait for it to finish\"\"\"\n    task_run = self.create_task_run(task, **params)\n    await task_run.execute()\n    return task_run\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.on_shutdown","title":"on_shutdown async","text":"
    on_shutdown()\n
    Source code in fluid/scheduler/consumer.py
    async def on_shutdown(self) -> None:\n    await self.broker.close()\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.execute_sync","title":"execute_sync","text":"
    execute_sync(task, **params)\n
    Source code in fluid/scheduler/consumer.py
    def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:\n    return asyncio.get_event_loop().run_until_complete(\n        self._execute_and_exit(task, **params)\n    )\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.register_task","title":"register_task","text":"
    register_task(task)\n

    Register a task with the task manager

    Only tasks registered can be executed by a task manager

    Source code in fluid/scheduler/consumer.py
    def register_task(self, task: Task) -> None:\n    \"\"\"Register a task with the task manager\n\n    Only tasks registered can be executed by a task manager\n    \"\"\"\n    self.broker.register_task(task)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.queue","title":"queue async","text":"
    queue(task, priority=None, **params)\n

    Queue a task for execution

    This method fires two events:

    • queue: when the task is about to be queued
    • queued: after the task is queued
    Source code in fluid/scheduler/consumer.py
    async def queue(\n    self,\n    task: str | Task,\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Queue a task for execution\n\n    This methods fires two events:\n\n    - queue: when the task is about to be queued\n    - queued: after the task is queued\n    \"\"\"\n    task_run = self.create_task_run(task, priority=priority, **params)\n    self.dispatcher.dispatch(task_run)\n    task_run.set_state(TaskState.queued)\n    await self.broker.queue_task(task_run)\n    return task_run\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.create_task_run","title":"create_task_run","text":"
    create_task_run(task, run_id='', priority=None, **params)\n

    Create a TaskRun in init state

    Source code in fluid/scheduler/consumer.py
    def create_task_run(\n    self,\n    task: str | Task,\n    run_id: str = \"\",\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Create a TaskRun in `init` state\"\"\"\n    if isinstance(task, str):\n        task = self.broker.task_from_registry(task)\n    run_id = run_id or self.broker.new_uuid()\n    return TaskRun(\n        id=run_id,\n        task=task,\n        priority=priority or task.priority,\n        params=params,\n        task_manager=self,\n    )\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.register_from_module","title":"register_from_module","text":"
    register_from_module(module)\n
    Source code in fluid/scheduler/consumer.py
    def register_from_module(self, module: Any) -> None:\n    for name in dir(module):\n        if name.startswith(\"_\"):\n            continue\n        if isinstance(obj := getattr(module, name), Task):\n            self.register_task(obj)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.cli","title":"cli","text":"
    cli(**kwargs)\n

    Create the task manager command line interface

    Source code in fluid/scheduler/consumer.py
    def cli(self, **kwargs: Any) -> Any:\n    \"\"\"Create the task manager command line interface\"\"\"\n    try:\n        from fluid.scheduler.cli import TaskManagerCLI\n    except ImportError:\n        raise ImportError(\n            \"TaskManagerCLI is not available - \"\n            \"install with `pip install aio-fluid[cli]`\"\n        ) from None\n    return TaskManagerCLI(self, **kwargs)\n
    "},{"location":"reference/task_run/","title":"Task Run","text":"

    It can be imported from fluid.scheduler:

    from fluid.scheduler import TaskRun\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun","title":"fluid.scheduler.TaskRun","text":"

    Bases: BaseModel

    A TaskRun contains all the data generated by a Task run

    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.id","title":"id instance-attribute","text":"
    id\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.task","title":"task instance-attribute","text":"
    task\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.priority","title":"priority instance-attribute","text":"
    priority\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.params","title":"params instance-attribute","text":"
    params\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.state","title":"state class-attribute instance-attribute","text":"
    state = init\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.task_manager","title":"task_manager class-attribute instance-attribute","text":"
    task_manager = Field(exclude=True, repr=False)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.queued","title":"queued class-attribute instance-attribute","text":"
    queued = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.start","title":"start class-attribute instance-attribute","text":"
    start = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.end","title":"end class-attribute instance-attribute","text":"
    end = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.logger","title":"logger property","text":"
    logger\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.in_queue","title":"in_queue property","text":"
    in_queue\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.duration","title":"duration property","text":"
    duration\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.duration_ms","title":"duration_ms property","text":"
    duration_ms\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.total","title":"total property","text":"
    total\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.name","title":"name property","text":"
    name\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.name_id","title":"name_id property","text":"
    name_id\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.is_done","title":"is_done property","text":"
    is_done\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.is_failure","title":"is_failure property","text":"
    is_failure\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.execute","title":"execute async","text":"
    execute()\n
    Source code in fluid/scheduler/models.py
    async def execute(self) -> None:\n    try:\n        self.set_state(TaskState.running)\n        await self.task.executor(self)\n    except Exception:\n        self.set_state(TaskState.failure)\n        raise\n    else:\n        self.set_state(TaskState.success)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.serialize_task","title":"serialize_task","text":"
    serialize_task(task, _info)\n
    Source code in fluid/scheduler/models.py
    @field_serializer(\"task\")\ndef serialize_task(self, task: Task, _info: Any) -> str:\n    return task.name\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.params_dump_json","title":"params_dump_json","text":"
    params_dump_json()\n
    Source code in fluid/scheduler/models.py
    def params_dump_json(self) -> str:\n    return self.task.params_dump_json(self.params)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.set_state","title":"set_state","text":"
    set_state(state, state_time=None)\n
    Source code in fluid/scheduler/models.py
    def set_state(\n    self,\n    state: TaskState,\n    state_time: datetime | None = None,\n) -> None:\n    if self.state == state:\n        return\n    state_time = state_time or utcnow()\n    match (self.state, state):\n        case (TaskState.init, TaskState.queued):\n            self.queued = state_time\n            self.state = state\n            self._dispatch()\n        case (TaskState.init, _):\n            self.set_state(TaskState.queued, state_time)\n            self.set_state(state, state_time)\n        case (TaskState.queued, TaskState.running):\n            self.start = state_time\n            self.state = state\n            self._dispatch()\n        case (\n            TaskState.queued,\n            TaskState.success\n            | TaskState.aborted\n            | TaskState.rate_limited\n            | TaskState.failure,\n        ):\n            self.set_state(TaskState.running, state_time)\n            self.set_state(state, state_time)\n        case (\n            TaskState.running,\n            TaskState.success\n            | TaskState.aborted\n            | TaskState.rate_limited\n            | TaskState.failure,\n        ):\n            self.end = state_time\n            self.state = state\n            self._dispatch()\n        case _:\n            raise TaskRunError(f\"invalid state transition {self.state} -> {state}\")\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.lock","title":"lock","text":"
    lock(timeout)\n
    Source code in fluid/scheduler/models.py
    def lock(self, timeout: float | None) -> Lock:\n    return self.task_manager.broker.lock(self.name, timeout=timeout)\n
    "},{"location":"tutorials/","title":"Tutorials","text":"

    The step-by-step guides, the how-to's, the recipes, and all the Aio Fluid parts you can use in your applications.

    "},{"location":"tutorials/db/","title":"Database","text":"

    The fluid.db module provides a simple asynchronous interface to interact with postgres databases. It is built on top of the sqlalchemy and asyncpg libraries.
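
    A minimal sketch of the stack underneath fluid.db, using plain sqlalchemy + asyncpg rather than the fluid.db API itself; the connection URL is a placeholder.

    from sqlalchemy import text\nfrom sqlalchemy.ext.asyncio import create_async_engine\n\n# placeholder DSN - point it at your postgres instance\nengine = create_async_engine(\"postgresql+asyncpg://user:password@localhost:5432/mydb\")\n\nasync def ping() -> int:\n    async with engine.connect() as conn:\n        result = await conn.execute(text(\"select 1\"))\n        return result.scalar_one()\n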

    "},{"location":"tutorials/scheduler/","title":"Task Queue","text":"

    This module has a lightweight implementation of a distributed task producer (TaskScheduler) and consumer (TaskConsumer). The middleware for distributing tasks can be configured via the Broker interface. A redis broker is provided for convenience.

    "},{"location":"tutorials/scheduler/#tasks","title":"Tasks","text":"

    Tasks are standard python async functions decorated with the task decorator.

    from fluid.scheduler import task, TaskRun\n\n@task\nasync def say_hi(ctx: TaskRun):\n    return \"Hi!\"\n

    There are two types of tasks implemented:

    • Simple concurrent tasks - they run concurrently with the task consumer - they must be IO-bound tasks (no heavy CPU-bound operations)
      from fluid.scheduler import task, TaskRun\n\n  @task\n  async def fetch_data(ctx: TaskRun):\n      # fetch data\n      data = await http_cli.get(\"https://...\")\n      data_id = await datastore_cli.store(data)\n      # trigger another task\n      await ctx.task_manager.queue(\"heavy_calculation\", data_id=data_id)\n
    • CPU bound tasks - they run on a subprocess
    from fluid.scheduler import task, TaskRun\n\n@task(cpu_bound=True)\nasync def heavy_calculation(ctx: TaskRun):\n    data = await datastore_cli.get(ctx.params[\"data_id\"])\n    # perform some heavy calculation\n    ...\n    # trigger another task\n    await ctx.task_manager.queue(\"fetch_data\")\n

    Both tasks can be periodically scheduled via the schedule keyword argument:

    import asyncio\nfrom datetime import timedelta\nfrom fluid.scheduler import task, TaskContext, every\n\n@task(schedule=every(timedelta(seconds=1)))\nasync def scheduled(context: TaskContext) -> str:\n    await asyncio.sleep(0.1)\n    return \"OK\"\n
    "},{"location":"tutorials/scheduler/#broker","title":"Broker","text":"

    A Task broker needs to implement three abstract methods

      @abstractmethod\n  async def queue_task(self, queued_task: QueuedTask) -> TaskRun:\n      \"\"\"Queue a task\"\"\"\n\n  @abstractmethod\n  async def get_task_run(self) -> Optional[TaskRun]:\n      \"\"\"Get a Task run from the task queue\"\"\"\n\n  @abstractmethod\n  async def queue_length(self) -> Dict[str, int]:\n      \"\"\"Length of task queues\"\"\"\n

    The library ships a Redis broker for convenience.

    from fluid.scheduler import Broker\n\nredis_broker = Broker.from_url(\"redis://localhost:6349\")\n
    "}]} \ No newline at end of file +{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"Home","text":"

    Aio Fluid

    Async utilities for backend python services developed by Quantmind.

    Documentation: https://quantmind.github.io/aio-fluid

    Source Code: https://github.com/quantmind/aio-fluid

    "},{"location":"#installation","title":"Installation","text":"

    This is a simple python package you can install via pip:

    pip install aio-fluid\n

    To install all the dependencies:

    pip install aio-fluid[cli, db, http, log]\n

    this includes the following extra dependencies:

    • cli for the command line interface using click and rich
    • db for database support with asyncpg and sqlalchemy
    • http for http client support with httpx and aiohttp
    • log for JSON logging support with python-json-logger
    "},{"location":"#development","title":"Development","text":"

    You can run the examples via

    poetry run python -m examples.main\n
    "},{"location":"reference/","title":"Introduction","text":"

    Here's the reference or code API, the classes, functions, parameters, attributes, and all the aio-fluid parts you can use in your applications.

    "},{"location":"reference/task_broker/","title":"Task Broker","text":"

    It can be imported from fluid.scheduler:

    from fluid.scheduler import TaskBroker\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker","title":"fluid.scheduler.TaskBroker","text":"
    TaskBroker(url)\n

    Bases: ABC

    Source code in fluid/scheduler/broker.py
    def __init__(self, url: URL) -> None:\n    self.url: URL = url\n    self.registry: TaskRegistry = TaskRegistry()\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.url","title":"url instance-attribute","text":"
    url = url\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.registry","title":"registry instance-attribute","text":"
    registry = TaskRegistry()\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.task_queue_names","title":"task_queue_names abstractmethod property","text":"
    task_queue_names\n

    Names of the task queues

    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.queue_task","title":"queue_task abstractmethod async","text":"
    queue_task(task_run)\n

    Queue a task

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def queue_task(self, task_run: TaskRun) -> None:\n    \"\"\"Queue a task\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.get_task_run","title":"get_task_run abstractmethod async","text":"
    get_task_run(task_manager)\n

    Get a Task run from the task queue

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def get_task_run(self, task_manager: TaskManager) -> TaskRun | None:\n    \"\"\"Get a Task run from the task queue\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.queue_length","title":"queue_length abstractmethod async","text":"
    queue_length()\n

    Length of task queues

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def queue_length(self) -> dict[str, int]:\n    \"\"\"Length of task queues\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.get_tasks_info","title":"get_tasks_info abstractmethod async","text":"
    get_tasks_info(*task_names)\n

    List of TaskInfo objects

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def get_tasks_info(self, *task_names: str) -> list[TaskInfo]:\n    \"\"\"List of TaskInfo objects\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.update_task","title":"update_task abstractmethod async","text":"
    update_task(task, params)\n

    Update a task's dynamic parameters

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def update_task(self, task: Task, params: dict[str, Any]) -> TaskInfo:\n    \"\"\"Update a task dynamic parameters\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.close","title":"close abstractmethod async","text":"
    close()\n

    Close the broker on shutdown

    Source code in fluid/scheduler/broker.py
    @abstractmethod\nasync def close(self) -> None:\n    \"\"\"Close the broker on shutdown\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.lock","title":"lock abstractmethod","text":"
    lock(name, timeout=None)\n

    Create a lock

    Source code in fluid/scheduler/broker.py
    @abstractmethod\ndef lock(self, name: str, timeout: float | None = None) -> Lock:\n    \"\"\"Create a lock\"\"\"\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.new_uuid","title":"new_uuid","text":"
    new_uuid()\n
    Source code in fluid/scheduler/broker.py
    def new_uuid(self) -> str:\n    return uuid4().hex\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.filter_tasks","title":"filter_tasks async","text":"
    filter_tasks(scheduled=None, enabled=None)\n
    Source code in fluid/scheduler/broker.py
    async def filter_tasks(\n    self,\n    scheduled: bool | None = None,\n    enabled: bool | None = None,\n) -> list[Task]:\n    task_info = await self.get_tasks_info()\n    task_map = {info.name: info for info in task_info}\n    tasks = []\n    for task in self.registry.values():\n        if scheduled is not None and bool(task.schedule) is not scheduled:\n            continue\n        if enabled is not None and task_map[task.name].enabled is not enabled:\n            continue\n        tasks.append(task)\n    return tasks\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.task_from_registry","title":"task_from_registry","text":"
    task_from_registry(task)\n
    Source code in fluid/scheduler/broker.py
    def task_from_registry(self, task: str | Task) -> Task:\n    if isinstance(task, Task):\n        self.register_task(task)\n        return task\n    else:\n        if task_ := self.registry.get(task):\n            return task_\n        raise UnknownTaskError(task)\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.register_task","title":"register_task","text":"
    register_task(task)\n
    Source code in fluid/scheduler/broker.py
    def register_task(self, task: Task) -> None:\n    self.registry[task.name] = task\n
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.enable_task","title":"enable_task async","text":"
    enable_task(task_name, enable=True)\n

    Enable or disable a registered task

    Source code in fluid/scheduler/broker.py
    async def enable_task(self, task_name: str, enable: bool = True) -> TaskInfo:\n    \"\"\"Enable or disable a registered task\"\"\"\n    task = self.registry.get(task_name)\n    if not task:\n        raise UnknownTaskError(task_name)\n    return await self.update_task(task, dict(enabled=enable))\n
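
    A usage sketch, assuming broker is a concrete TaskBroker instance and say_hi is the name of a registered task:

    from fluid.scheduler import TaskBroker\n\nasync def toggle(broker: TaskBroker) -> None:\n    # disable, then re-enable, the registered task named say_hi\n    await broker.enable_task(\"say_hi\", enable=False)\n    await broker.enable_task(\"say_hi\")  # enable=True is the default\n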
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.from_url","title":"from_url classmethod","text":"
    from_url(url='')\n
    Source code in fluid/scheduler/broker.py
    @classmethod\ndef from_url(cls, url: str = \"\") -> TaskBroker:\n    p = URL(url or broker_url_from_env())\n    if factory := _brokers.get(p.scheme):\n        return factory(p)\n    raise RuntimeError(f\"Invalid broker {p}\")\n
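
    A sketch of creating a broker from a URL; the redis URL is an assumption about your setup, and an empty URL falls back to the environment:

    from fluid.scheduler import TaskBroker\n\n# the URL scheme selects the registered broker factory (a redis broker is provided)\nbroker = TaskBroker.from_url(\"redis://localhost:6379\")\n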
    "},{"location":"reference/task_broker/#fluid.scheduler.TaskBroker.register_broker","title":"register_broker classmethod","text":"
    register_broker(name, factory)\n
    Source code in fluid/scheduler/broker.py
    @classmethod\ndef register_broker(cls, name: str, factory: type[TaskBroker]) -> None:\n    _brokers[name] = factory\n
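
    A sketch of registering a custom broker factory under a new URL scheme; InMemoryBroker is a hypothetical TaskBroker subclass:

    from fluid.scheduler import TaskBroker\n\nclass InMemoryBroker(TaskBroker):\n    ...  # the abstract methods (queue_task, get_task_run, queue_length, ...) go here\n\n# once the abstract methods are implemented, memory:// URLs resolve to this broker\nTaskBroker.register_broker(\"memory\", InMemoryBroker)\n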
    "},{"location":"reference/task_manager/","title":"Task Manager","text":"

    It can be imported from fluid.scheduler:

    from fluid.scheduler import TaskManager\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager","title":"fluid.scheduler.TaskManager","text":"
    TaskManager(**kwargs)\n

    The task manager is the main entry point for managing tasks

    Source code in fluid/scheduler/consumer.py
    def __init__(self, **kwargs: Any) -> None:\n    self.state: dict[str, Any] = {}\n    self.config: TaskManagerConfig = TaskManagerConfig(**kwargs)\n    self.dispatcher = TaskDispatcher()\n    self.broker = TaskBroker.from_url(self.config.broker_url)\n    self._stack = AsyncExitStack()\n
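
    A construction sketch; passing broker_url as a keyword is an assumption about TaskManagerConfig (it is the field handed to TaskBroker.from_url above), and omitting it falls back to the environment:

    from fluid.scheduler import TaskManager\n\n# broker_url keyword is an assumption about TaskManagerConfig\ntask_manager = TaskManager(broker_url=\"redis://localhost:6379\")\n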
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.state","title":"state instance-attribute","text":"
    state = {}\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.config","title":"config instance-attribute","text":"
    config = TaskManagerConfig(**kwargs)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.dispatcher","title":"dispatcher instance-attribute","text":"
    dispatcher = TaskDispatcher()\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.broker","title":"broker instance-attribute","text":"
    broker = from_url(broker_url)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.registry","title":"registry property","text":"
    registry\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.type","title":"type property","text":"
    type\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.enter_async_context","title":"enter_async_context async","text":"
    enter_async_context(cm)\n
    Source code in fluid/scheduler/consumer.py
    async def enter_async_context(self, cm: Any) -> Any:\n    return await self._stack.enter_async_context(cm)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.execute","title":"execute async","text":"
    execute(task, **params)\n

    Execute a task and wait for it to finish

    Source code in fluid/scheduler/consumer.py
    async def execute(self, task: Task | str, **params: Any) -> TaskRun:\n    \"\"\"Execute a task and wait for it to finish\"\"\"\n    task_run = self.create_task_run(task, **params)\n    await task_run.execute()\n    return task_run\n
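
    A self-contained sketch, assuming the broker URL is available from the environment and that tasks are looked up by their function name:

    import asyncio\n\nfrom fluid.scheduler import TaskManager, TaskRun, task\n\n@task\nasync def say_hi(ctx: TaskRun) -> str:\n    return \"Hi!\"\n\nasync def main() -> None:\n    task_manager = TaskManager()  # broker URL from the environment (assumption)\n    task_manager.register_task(say_hi)\n    run = await task_manager.execute(\"say_hi\")\n    print(run.name, run.state)\n\nasyncio.run(main())\n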
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.on_shutdown","title":"on_shutdown async","text":"
    on_shutdown()\n
    Source code in fluid/scheduler/consumer.py
    async def on_shutdown(self) -> None:\n    await self.broker.close()\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.execute_sync","title":"execute_sync","text":"
    execute_sync(task, **params)\n
    Source code in fluid/scheduler/consumer.py
    def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:\n    return asyncio.get_event_loop().run_until_complete(\n        self._execute_and_exit(task, **params)\n    )\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.register_task","title":"register_task","text":"
    register_task(task)\n

    Register a task with the task manager

    Only tasks registered can be executed by a task manager

    Source code in fluid/scheduler/consumer.py
    def register_task(self, task: Task) -> None:\n    \"\"\"Register a task with the task manager\n\n    Only tasks registered can be executed by a task manager\n    \"\"\"\n    self.broker.register_task(task)\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.queue","title":"queue async","text":"
    queue(task, priority=None, **params)\n

    Queue a task for execution

    This method fires two events:

    • queue: when the task is about to be queued
    • queued: after the task is queued
    Source code in fluid/scheduler/consumer.py
    async def queue(\n    self,\n    task: str | Task,\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Queue a task for execution\n\n    This methods fires two events:\n\n    - queue: when the task is about to be queued\n    - queued: after the task is queued\n    \"\"\"\n    task_run = self.create_task_run(task, priority=priority, **params)\n    self.dispatcher.dispatch(task_run)\n    task_run.set_state(TaskState.queued)\n    await self.broker.queue_task(task_run)\n    return task_run\n
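
    A sketch of queueing rather than executing inline; the run is picked up later by a consumer. The task name say_hi is an assumption about what has been registered:

    from fluid.scheduler import TaskManager\n\nasync def fire_and_forget(task_manager: TaskManager) -> None:\n    # dispatches the queue/queued events and pushes the run onto the broker queue\n    run = await task_manager.queue(\"say_hi\")\n    print(run.id, run.state)  # state is queued\n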
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.create_task_run","title":"create_task_run","text":"
    create_task_run(task, run_id='', priority=None, **params)\n

    Create a TaskRun in init state

    Source code in fluid/scheduler/consumer.py
    def create_task_run(\n    self,\n    task: str | Task,\n    run_id: str = \"\",\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Create a TaskRun in `init` state\"\"\"\n    if isinstance(task, str):\n        task = self.broker.task_from_registry(task)\n    run_id = run_id or self.broker.new_uuid()\n    return TaskRun(\n        id=run_id,\n        task=task,\n        priority=priority or task.priority,\n        params=params,\n        task_manager=self,\n    )\n
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.register_from_module","title":"register_from_module","text":"
    register_from_module(module)\n
    Source code in fluid/scheduler/consumer.py
    def register_from_module(self, module: Any) -> None:\n    for name in dir(module):\n        if name.startswith(\"_\"):\n            continue\n        if isinstance(obj := getattr(module, name), Task):\n            self.register_task(obj)\n
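
    A sketch assuming a hypothetical my_project.tasks module containing @task decorated functions:

    import my_project.tasks  # hypothetical module with @task decorated functions\n\nfrom fluid.scheduler import TaskManager\n\ntask_manager = TaskManager()\ntask_manager.register_from_module(my_project.tasks)\n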
    "},{"location":"reference/task_manager/#fluid.scheduler.TaskManager.cli","title":"cli","text":"
    cli(**kwargs)\n

    Create the task manager command line interface

    Source code in fluid/scheduler/consumer.py
    def cli(self, **kwargs: Any) -> Any:\n    \"\"\"Create the task manager command line interface\"\"\"\n    try:\n        from fluid.scheduler.cli import TaskManagerCLI\n    except ImportError:\n        raise ImportError(\n            \"TaskManagerCLI is not available - \"\n            \"install with `pip install aio-fluid[cli]`\"\n        ) from None\n    return TaskManagerCLI(self, **kwargs)\n
    "},{"location":"reference/task_run/","title":"Task Run","text":"

    It can be imported from fluid.scheduler:

    from fluid.scheduler import TaskRun\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun","title":"fluid.scheduler.TaskRun","text":"

    Bases: BaseModel

    A TaskRun contains all the data generated by a Task run

    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.id","title":"id instance-attribute","text":"
    id\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.task","title":"task instance-attribute","text":"
    task\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.priority","title":"priority instance-attribute","text":"
    priority\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.params","title":"params instance-attribute","text":"
    params\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.state","title":"state class-attribute instance-attribute","text":"
    state = init\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.task_manager","title":"task_manager class-attribute instance-attribute","text":"
    task_manager = Field(exclude=True, repr=False)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.queued","title":"queued class-attribute instance-attribute","text":"
    queued = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.start","title":"start class-attribute instance-attribute","text":"
    start = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.end","title":"end class-attribute instance-attribute","text":"
    end = None\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.logger","title":"logger property","text":"
    logger\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.in_queue","title":"in_queue property","text":"
    in_queue\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.duration","title":"duration property","text":"
    duration\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.duration_ms","title":"duration_ms property","text":"
    duration_ms\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.total","title":"total property","text":"
    total\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.name","title":"name property","text":"
    name\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.name_id","title":"name_id property","text":"
    name_id\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.is_done","title":"is_done property","text":"
    is_done\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.is_failure","title":"is_failure property","text":"
    is_failure\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.execute","title":"execute async","text":"
    execute()\n
    Source code in fluid/scheduler/models.py
    async def execute(self) -> None:\n    try:\n        self.set_state(TaskState.running)\n        await self.task.executor(self)\n    except Exception:\n        self.set_state(TaskState.failure)\n        raise\n    else:\n        self.set_state(TaskState.success)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.serialize_task","title":"serialize_task","text":"
    serialize_task(task, _info)\n
    Source code in fluid/scheduler/models.py
    @field_serializer(\"task\")\ndef serialize_task(self, task: Task, _info: Any) -> str:\n    return task.name\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.params_dump_json","title":"params_dump_json","text":"
    params_dump_json()\n
    Source code in fluid/scheduler/models.py
    def params_dump_json(self) -> str:\n    return self.task.params_dump_json(self.params)\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.set_state","title":"set_state","text":"
    set_state(state, state_time=None)\n
    Source code in fluid/scheduler/models.py
    def set_state(\n    self,\n    state: TaskState,\n    state_time: datetime | None = None,\n) -> None:\n    if self.state == state:\n        return\n    state_time = state_time or utcnow()\n    match (self.state, state):\n        case (TaskState.init, TaskState.queued):\n            self.queued = state_time\n            self.state = state\n            self._dispatch()\n        case (TaskState.init, _):\n            self.set_state(TaskState.queued, state_time)\n            self.set_state(state, state_time)\n        case (TaskState.queued, TaskState.running):\n            self.start = state_time\n            self.state = state\n            self._dispatch()\n        case (\n            TaskState.queued,\n            TaskState.success\n            | TaskState.aborted\n            | TaskState.rate_limited\n            | TaskState.failure,\n        ):\n            self.set_state(TaskState.running, state_time)\n            self.set_state(state, state_time)\n        case (\n            TaskState.running,\n            TaskState.success\n            | TaskState.aborted\n            | TaskState.rate_limited\n            | TaskState.failure,\n        ):\n            self.end = state_time\n            self.state = state\n            self._dispatch()\n        case _:\n            raise TaskRunError(f\"invalid state transition {self.state} -> {state}\")\n
    "},{"location":"reference/task_run/#fluid.scheduler.TaskRun.lock","title":"lock","text":"
    lock(timeout)\n
    Source code in fluid/scheduler/models.py
    def lock(self, timeout: float | None) -> Lock:\n    return self.task_manager.broker.lock(self.name, timeout=timeout)\n
    "},{"location":"reference/tast_consumer/","title":"Task Consumer","text":"

    The task consumer is a TaskManager that is also a Workers: it consumes tasks from the task queue and executes them. It can be imported from fluid.scheduler:

    from fluid.scheduler import TaskConsumer\n
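
    A minimal consumer sketch, assuming the broker URL is available from the environment and re-using the say_hi task from the tutorial:

    import asyncio\n\nfrom fluid.scheduler import TaskConsumer, TaskRun, task\n\n@task\nasync def say_hi(ctx: TaskRun) -> str:\n    return \"Hi!\"\n\nasync def main() -> None:\n    consumer = TaskConsumer()  # broker URL from the environment (assumption)\n    consumer.register_task(say_hi)\n    await consumer.startup()  # start the queue and task workers in the background\n    run = await consumer.queue_and_wait(\"say_hi\", timeout=5)\n    print(run.state)\n    consumer.gracefully_stop()\n    await consumer.wait_for_exit()\n\nasyncio.run(main())\n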
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer","title":"fluid.scheduler.TaskConsumer","text":"
    TaskConsumer(**config)\n

    Bases: TaskManager, Workers

    The Task Consumer is a Task Manager responsible for consuming tasks from a task queue

    Source code in fluid/scheduler/consumer.py
    def __init__(self, **config: Any) -> None:\n    super().__init__(**config)\n    Workers.__init__(self)\n    self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict)\n    self._task_to_queue: deque[str | Task] = deque()\n    self._priority_task_run_queue: deque[TaskRun] = deque()\n    self._queue_tasks_worker = WorkerFunction(\n        self._queue_task, name=\"queue-task-worker\"\n    )\n    self.add_workers(self._queue_tasks_worker)\n    for i in range(self.config.max_concurrent_tasks):\n        worker_name = f\"task-worker-{i+1}\"\n        self.add_workers(\n            WorkerFunction(\n                partial(self._consume_tasks, worker_name), name=worker_name\n            )\n        )\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.worker_name","title":"worker_name property","text":"
    worker_name\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.num_workers","title":"num_workers property","text":"
    num_workers\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.running","title":"running property","text":"
    running\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.state","title":"state instance-attribute","text":"
    state = {}\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.config","title":"config instance-attribute","text":"
    config = TaskManagerConfig(**kwargs)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.dispatcher","title":"dispatcher instance-attribute","text":"
    dispatcher = TaskDispatcher()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.broker","title":"broker instance-attribute","text":"
    broker = from_url(broker_url)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.registry","title":"registry property","text":"
    registry\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.type","title":"type property","text":"
    type\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.num_concurrent_tasks","title":"num_concurrent_tasks property","text":"
    num_concurrent_tasks\n

    The number of concurrent_tasks running in the consumer

    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.status","title":"status async","text":"
    status()\n
    Source code in fluid/utils/worker.py
    async def status(self) -> dict:\n    return await self._workers.status()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.gracefully_stop","title":"gracefully_stop","text":"
    gracefully_stop()\n
    Source code in fluid/utils/worker.py
    def gracefully_stop(self) -> None:\n    self._workers.gracefully_stop()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.is_running","title":"is_running","text":"
    is_running()\n
    Source code in fluid/utils/worker.py
    def is_running(self) -> bool:\n    return self._running\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.is_stopping","title":"is_stopping","text":"
    is_stopping()\n
    Source code in fluid/utils/worker.py
    def is_stopping(self) -> bool:\n    return self._workers.is_stopping()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.run","title":"run async","text":"
    run()\n

    run the workers

    Source code in fluid/utils/worker.py
    async def run(self) -> None:\n    \"\"\"run the workers\"\"\"\n    with self.start_running():\n        async with self.safe_run():\n            workers, _ = self._workers.workers_tasks()\n            self._workers.workers = tuple(workers)\n            self._workers.tasks = tuple(\n                self.create_task(worker) for worker in workers\n            )\n            await asyncio.gather(*self._workers.tasks)\n        await self.shutdown()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.start_running","title":"start_running","text":"
    start_running()\n
    Source code in fluid/utils/worker.py
    @contextmanager\ndef start_running(self) -> Generator:\n    if self._running:\n        raise RuntimeError(\"Worker is already running\")\n    self._running = True\n    try:\n        logger.info(\"%s started running\", self.worker_name)\n        yield\n    finally:\n        self._running = False\n        logger.warning(\"%s stopped running\", self.worker_name)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.add_workers","title":"add_workers","text":"
    add_workers(*workers)\n

    add workers to the workers

    Source code in fluid/utils/worker.py
    def add_workers(self, *workers: Worker) -> None:\n    \"\"\"add workers to the workers\"\"\"\n    workers_, _ = self._workers.workers_tasks()\n    for worker in workers:\n        if worker not in workers_:\n            workers_.append(worker)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.wait_for_exit","title":"wait_for_exit async","text":"
    wait_for_exit()\n
    Source code in fluid/utils/worker.py
    async def wait_for_exit(self) -> None:\n    if self._workers_task is not None:\n        await self._workers_task\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.create_task","title":"create_task","text":"
    create_task(worker)\n
    Source code in fluid/utils/worker.py
    def create_task(self, worker: Worker) -> asyncio.Task:\n    return asyncio.create_task(\n        self._run_worker(worker), name=f\"{self.worker_name}-{worker.worker_name}\"\n    )\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.on_shutdown","title":"on_shutdown async","text":"
    on_shutdown()\n
    Source code in fluid/scheduler/consumer.py
    async def on_shutdown(self) -> None:\n    await self.broker.close()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.shutdown","title":"shutdown async","text":"
    shutdown()\n

    shutdown the workers

    Source code in fluid/utils/worker.py
    async def shutdown(self) -> None:\n    \"\"\"shutdown the workers\"\"\"\n    if self._has_shutdown:\n        return\n    self._has_shutdown = True\n    logger.warning(\n        \"gracefully stopping %d workers: %s\",\n        self.num_workers,\n        \", \".join(w.worker_name for w in self._workers.workers),\n    )\n    self.gracefully_stop()\n    try:\n        async with async_timeout.timeout(self._stopping_grace_period):\n            await self.wait_for_exit()\n        await self.on_shutdown()\n        return\n    except asyncio.TimeoutError:\n        logger.warning(\n            \"could not stop workers %s gracefully after %s\"\n            \" seconds - force shutdown\",\n            \", \".join(\n                task.get_name() for task in self._workers.tasks if not task.done()\n            ),\n            self._stopping_grace_period,\n        )\n    except asyncio.CancelledError:\n        pass\n    self._force_shutdown = True\n    self._workers.cancel()\n    try:\n        await self.wait_for_exit()\n    except asyncio.CancelledError:\n        pass\n    await self.on_shutdown()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.bail_out","title":"bail_out","text":"
    bail_out(reason, code=1)\n
    Source code in fluid/utils/worker.py
    def bail_out(self, reason: str, code: int = 1) -> None:\n    self.gracefully_stop()\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.safe_run","title":"safe_run async","text":"
    safe_run()\n

    Context manager to run a worker safely

    Source code in fluid/utils/worker.py
    @asynccontextmanager\nasync def safe_run(self) -> AsyncGenerator:\n    \"\"\"Context manager to run a worker safely\"\"\"\n    try:\n        yield\n    except asyncio.CancelledError:\n        if self._force_shutdown:\n            # we are shutting down, this is expected\n            pass\n        raise\n    except Exception as e:\n        reason = f\"unhandled exception while running workers: {e}\"\n        logger.exception(reason)\n        asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)\n    else:\n        # worker finished without error\n        # make sure we are shutting down\n        asyncio.get_event_loop().call_soon(self.bail_out, \"worker exit\", 1)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.remove_workers","title":"remove_workers","text":"
    remove_workers(*workers)\n

    remove workers from the workers

    Source code in fluid/utils/worker.py
    def remove_workers(self, *workers: Worker) -> None:\n    \"remove workers from the workers\"\n    workers_, _ = self._workers.workers_tasks()\n    for worker in workers:\n        try:\n            workers_.remove(worker)\n        except ValueError:\n            pass\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.startup","title":"startup async","text":"
    startup()\n

    start the workers

    Source code in fluid/utils/worker.py
    async def startup(self) -> None:\n    \"\"\"start the workers\"\"\"\n    if self._workers_task is None:\n        self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)\n        for args in self._delayed_callbacks:\n            self._delayed_callback(*args)\n        self._delayed_callbacks = []\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.register_callback","title":"register_callback","text":"
    register_callback(\n    callback, seconds, jitter=0.0, periodic=False\n)\n

    Register a callback

    The callback can be periodic or not.

    Source code in fluid/utils/worker.py
    def register_callback(\n    self,\n    callback: Callable[[], None],\n    seconds: float,\n    jitter: float = 0.0,\n    periodic: bool | float = False,\n) -> None:\n    \"\"\"Register a callback\n\n    The callback can be periodic or not.\n    \"\"\"\n    if periodic is True:\n        periodic_float = seconds\n    elif periodic is False:\n        periodic_float = 0.0\n    else:\n        periodic_float = periodic\n    if not self.running:\n        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))\n    else:\n        self._delayed_callback(callback, seconds, jitter, periodic_float)\n
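
    A usage sketch for a periodic callback; the logging callback and the 30 second interval are illustrative:

    import logging\n\nfrom fluid.scheduler import TaskConsumer\n\nlogger = logging.getLogger(\"heartbeat\")\n\ndef add_heartbeat(consumer: TaskConsumer) -> None:\n    # log every 30 seconds, with up to 5 seconds of jitter\n    consumer.register_callback(\n        lambda: logger.info(\"consumer alive\"), seconds=30, jitter=5.0, periodic=True\n    )\n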
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.enter_async_context","title":"enter_async_context async","text":"
    enter_async_context(cm)\n
    Source code in fluid/scheduler/consumer.py
    async def enter_async_context(self, cm: Any) -> Any:\n    return await self._stack.enter_async_context(cm)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.execute","title":"execute async","text":"
    execute(task, **params)\n

    Execute a task and wait for it to finish

    Source code in fluid/scheduler/consumer.py
    async def execute(self, task: Task | str, **params: Any) -> TaskRun:\n    \"\"\"Execute a task and wait for it to finish\"\"\"\n    task_run = self.create_task_run(task, **params)\n    await task_run.execute()\n    return task_run\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.execute_sync","title":"execute_sync","text":"
    execute_sync(task, **params)\n
    Source code in fluid/scheduler/consumer.py
    def execute_sync(self, task: Task | str, **params: Any) -> TaskRun:\n    return asyncio.get_event_loop().run_until_complete(\n        self._execute_and_exit(task, **params)\n    )\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.register_task","title":"register_task","text":"
    register_task(task)\n

    Register a task with the task manager

    Only tasks registered can be executed by a task manager

    Source code in fluid/scheduler/consumer.py
    def register_task(self, task: Task) -> None:\n    \"\"\"Register a task with the task manager\n\n    Only tasks registered can be executed by a task manager\n    \"\"\"\n    self.broker.register_task(task)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.queue","title":"queue async","text":"
    queue(task, priority=None, **params)\n

    Queue a task for execution

    This method fires two events:

    • queue: when the task is about to be queued
    • queued: after the task is queued
    Source code in fluid/scheduler/consumer.py
    async def queue(\n    self,\n    task: str | Task,\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Queue a task for execution\n\n    This methods fires two events:\n\n    - queue: when the task is about to be queued\n    - queued: after the task is queued\n    \"\"\"\n    task_run = self.create_task_run(task, priority=priority, **params)\n    self.dispatcher.dispatch(task_run)\n    task_run.set_state(TaskState.queued)\n    await self.broker.queue_task(task_run)\n    return task_run\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.create_task_run","title":"create_task_run","text":"
    create_task_run(task, run_id='', priority=None, **params)\n

    Create a TaskRun in init state

    Source code in fluid/scheduler/consumer.py
    def create_task_run(\n    self,\n    task: str | Task,\n    run_id: str = \"\",\n    priority: TaskPriority | None = None,\n    **params: Any,\n) -> TaskRun:\n    \"\"\"Create a TaskRun in `init` state\"\"\"\n    if isinstance(task, str):\n        task = self.broker.task_from_registry(task)\n    run_id = run_id or self.broker.new_uuid()\n    return TaskRun(\n        id=run_id,\n        task=task,\n        priority=priority or task.priority,\n        params=params,\n        task_manager=self,\n    )\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.register_from_module","title":"register_from_module","text":"
    register_from_module(module)\n
    Source code in fluid/scheduler/consumer.py
    def register_from_module(self, module: Any) -> None:\n    for name in dir(module):\n        if name.startswith(\"_\"):\n            continue\n        if isinstance(obj := getattr(module, name), Task):\n            self.register_task(obj)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.cli","title":"cli","text":"
    cli(**kwargs)\n

    Create the task manager command line interface

    Source code in fluid/scheduler/consumer.py
    def cli(self, **kwargs: Any) -> Any:\n    \"\"\"Create the task manager command line interface\"\"\"\n    try:\n        from fluid.scheduler.cli import TaskManagerCLI\n    except ImportError:\n        raise ImportError(\n            \"TaskManagerCLI is not available - \"\n            \"install with `pip install aio-fluid[cli]`\"\n        ) from None\n    return TaskManagerCLI(self, **kwargs)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.sync_queue","title":"sync_queue","text":"
    sync_queue(task)\n
    Source code in fluid/scheduler/consumer.py
    def sync_queue(self, task: str | Task) -> None:\n    self._task_to_queue.appendleft(task)\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.sync_priority_queue","title":"sync_priority_queue","text":"
    sync_priority_queue(task)\n
    Source code in fluid/scheduler/consumer.py
    def sync_priority_queue(self, task: str | Task) -> None:\n    self._priority_task_run_queue.appendleft(self.create_task_run(task))\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.num_concurrent_tasks_for","title":"num_concurrent_tasks_for","text":"
    num_concurrent_tasks_for(task_name)\n

    The number of concurrent tasks for a given task_name

    Source code in fluid/scheduler/consumer.py
    def num_concurrent_tasks_for(self, task_name: str) -> int:\n    \"\"\"The number of concurrent tasks for a given task_name\"\"\"\n    return len(self._concurrent_tasks[task_name])\n
    "},{"location":"reference/tast_consumer/#fluid.scheduler.TaskConsumer.queue_and_wait","title":"queue_and_wait async","text":"
    queue_and_wait(task, *, timeout=2, **params)\n

    Queue a task and wait for it to finish

    Source code in fluid/scheduler/consumer.py
    async def queue_and_wait(\n    self, task: str, *, timeout: int = 2, **params: Any\n) -> TaskRun:\n    \"\"\"Queue a task and wait for it to finish\"\"\"\n    with TaskRunWaiter(self) as waiter:\n        return await waiter.wait(await self.queue(task, **params), timeout=timeout)\n
    "},{"location":"reference/workers/","title":"Workers","text":"

    Workers are the main building block for asynchronous programming with aio-fluid. They are responsible for running tasks and managing their lifecycle. There are several worker classes which can be imported from fluid.utils.worker:

    from fluid.utils.worker import StoppingWorker\n
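
    A minimal sketch combining WorkerFunction and Workers from this module; the one second heartbeat and three second run are illustrative:

    import asyncio\n\nfrom fluid.utils.worker import WorkerFunction, Workers\n\nasync def tick() -> None:\n    print(\"tick\")\n\nasync def main() -> None:\n    workers = Workers(WorkerFunction(tick, heartbeat=1, name=\"ticker\"))\n    await workers.startup()  # run the workers in a background task\n    await asyncio.sleep(3)\n    workers.gracefully_stop()\n    await workers.wait_for_exit()\n\nasyncio.run(main())\n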
    "},{"location":"reference/workers/#fluid.utils.worker.Worker","title":"fluid.utils.worker.Worker","text":"
    Worker(name='')\n

    Bases: ABC

    The base class of a worker that can be run

    Source code in fluid/utils/worker.py
    def __init__(self, name: str = \"\") -> None:\n    self._name: str = name or underscore(type(self).__name__)\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.worker_name","title":"worker_name property","text":"
    worker_name\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.num_workers","title":"num_workers property","text":"
    num_workers\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.status","title":"status abstractmethod async","text":"
    status()\n

    Get the status of the worker.

    Source code in fluid/utils/worker.py
    @abstractmethod\nasync def status(self) -> dict:\n    \"\"\"\n    Get the status of the worker.\n    \"\"\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.gracefully_stop","title":"gracefully_stop abstractmethod","text":"
    gracefully_stop()\n

    gracefully stop the worker

    Source code in fluid/utils/worker.py
    @abstractmethod\ndef gracefully_stop(self) -> None:\n    \"gracefully stop the worker\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.is_running","title":"is_running abstractmethod","text":"
    is_running()\n

    Is the worker running?

    Source code in fluid/utils/worker.py
    @abstractmethod\ndef is_running(self) -> bool:\n    \"\"\"Is the worker running?\"\"\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.is_stopping","title":"is_stopping abstractmethod","text":"
    is_stopping()\n

    Is the worker stopping?

    Source code in fluid/utils/worker.py
    @abstractmethod\ndef is_stopping(self) -> bool:\n    \"\"\"Is the worker stopping?\"\"\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.Worker.run","title":"run abstractmethod async","text":"
    run()\n

    run the worker

    Source code in fluid/utils/worker.py
    @abstractmethod\nasync def run(self) -> None:\n    \"\"\"run the worker\"\"\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker","title":"fluid.utils.worker.StoppingWorker","text":"
    StoppingWorker(name='')\n

    Bases: RunningWorker

    A Worker that can be stopped

    Source code in fluid/utils/worker.py
    def __init__(self, name: str = \"\") -> None:\n    super().__init__(name)\n    self._stopping: bool = False\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.worker_name","title":"worker_name property","text":"
    worker_name\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.num_workers","title":"num_workers property","text":"
    num_workers\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.is_running","title":"is_running","text":"
    is_running()\n
    Source code in fluid/utils/worker.py
    def is_running(self) -> bool:\n    return self._running\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.run","title":"run abstractmethod async","text":"
    run()\n

    run the worker

    Source code in fluid/utils/worker.py
    @abstractmethod\nasync def run(self) -> None:\n    \"\"\"run the worker\"\"\"\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.start_running","title":"start_running","text":"
    start_running()\n
    Source code in fluid/utils/worker.py
    @contextmanager\ndef start_running(self) -> Generator:\n    if self._running:\n        raise RuntimeError(\"Worker is already running\")\n    self._running = True\n    try:\n        logger.info(\"%s started running\", self.worker_name)\n        yield\n    finally:\n        self._running = False\n        logger.warning(\"%s stopped running\", self.worker_name)\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.is_stopping","title":"is_stopping","text":"
    is_stopping()\n
    Source code in fluid/utils/worker.py
    def is_stopping(self) -> bool:\n    return self._stopping\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.gracefully_stop","title":"gracefully_stop","text":"
    gracefully_stop()\n
    Source code in fluid/utils/worker.py
    def gracefully_stop(self) -> None:\n    self._stopping = True\n
    "},{"location":"reference/workers/#fluid.utils.worker.StoppingWorker.status","title":"status async","text":"
    status()\n
    Source code in fluid/utils/worker.py
    async def status(self) -> dict:\n    return {\"stopping\": self.is_stopping(), \"running\": self.is_running()}\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction","title":"fluid.utils.worker.WorkerFunction","text":"
    WorkerFunction(run_function, heartbeat=0, name='')\n

    Bases: StoppingWorker

    Source code in fluid/utils/worker.py
    def __init__(\n    self,\n    run_function: Callable[[], Awaitable[None]],\n    heartbeat: float | int = 0,\n    name: str = \"\",\n) -> None:\n    super().__init__(name=name)\n    self._run_function = run_function\n    self._heartbeat = heartbeat\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.worker_name","title":"worker_name property","text":"
    worker_name\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.num_workers","title":"num_workers property","text":"
    num_workers\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.status","title":"status async","text":"
    status()\n
    Source code in fluid/utils/worker.py
    async def status(self) -> dict:\n    return {\"stopping\": self.is_stopping(), \"running\": self.is_running()}\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.gracefully_stop","title":"gracefully_stop","text":"
    gracefully_stop()\n
    Source code in fluid/utils/worker.py
    def gracefully_stop(self) -> None:\n    self._stopping = True\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.is_running","title":"is_running","text":"
    is_running()\n
    Source code in fluid/utils/worker.py
    def is_running(self) -> bool:\n    return self._running\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.is_stopping","title":"is_stopping","text":"
    is_stopping()\n
    Source code in fluid/utils/worker.py
    def is_stopping(self) -> bool:\n    return self._stopping\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.start_running","title":"start_running","text":"
    start_running()\n
    Source code in fluid/utils/worker.py
    @contextmanager\ndef start_running(self) -> Generator:\n    if self._running:\n        raise RuntimeError(\"Worker is already running\")\n    self._running = True\n    try:\n        logger.info(\"%s started running\", self.worker_name)\n        yield\n    finally:\n        self._running = False\n        logger.warning(\"%s stopped running\", self.worker_name)\n
    "},{"location":"reference/workers/#fluid.utils.worker.WorkerFunction.run","title":"run async","text":"
    run()\n
    Source code in fluid/utils/worker.py
    async def run(self) -> None:\n    with self.start_running():\n        while not self.is_stopping():\n            await self._run_function()\n            await asyncio.sleep(self._heartbeat)\n
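In other words, WorkerFunction turns a plain coroutine function into a stoppable worker that calls the function in a loop, sleeping heartbeat seconds between calls. A small usage sketch, where poll_once and the 5-second heartbeat are illustrative:

from fluid.utils.worker import WorkerFunction


async def poll_once() -> None:
    ...  # one unit of work per invocation


poller = WorkerFunction(poll_once, heartbeat=5, name="poller")
# awaiting poller.run() invokes poll_once repeatedly, sleeping 5 seconds
# between calls, until poller.gracefully_stop() is called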
    "},{"location":"reference/workers/#fluid.utils.worker.Workers","title":"fluid.utils.worker.Workers","text":"
    Workers(\n    *workers,\n    name=\"\",\n    stopping_grace_period=STOPPING_GRACE_PERIOD\n)\n

    Bases: MultipleWorkers

    A worker managing several workers

    Source code in fluid/utils/worker.py
    def __init__(\n    self,\n    *workers: Worker,\n    name: str = \"\",\n    stopping_grace_period: int = settings.STOPPING_GRACE_PERIOD,\n) -> None:\n    super().__init__(\n        *workers, name=name, stopping_grace_period=stopping_grace_period\n    )\n    self._workers_task: asyncio.Task | None = None\n    self._delayed_callbacks: list[\n        tuple[Callable[[], None], float, float, float]\n    ] = []\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.worker_name","title":"worker_name property","text":"
    worker_name\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.num_workers","title":"num_workers property","text":"
    num_workers\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.running","title":"running property","text":"
    running\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.status","title":"status async","text":"
    status()\n
    Source code in fluid/utils/worker.py
    async def status(self) -> dict:\n    return await self._workers.status()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.gracefully_stop","title":"gracefully_stop","text":"
    gracefully_stop()\n
    Source code in fluid/utils/worker.py
    def gracefully_stop(self) -> None:\n    self._workers.gracefully_stop()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.is_running","title":"is_running","text":"
    is_running()\n
    Source code in fluid/utils/worker.py
    def is_running(self) -> bool:\n    return self._running\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.is_stopping","title":"is_stopping","text":"
    is_stopping()\n
    Source code in fluid/utils/worker.py
    def is_stopping(self) -> bool:\n    return self._workers.is_stopping()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.start_running","title":"start_running","text":"
    start_running()\n
    Source code in fluid/utils/worker.py
    @contextmanager\ndef start_running(self) -> Generator:\n    if self._running:\n        raise RuntimeError(\"Worker is already running\")\n    self._running = True\n    try:\n        logger.info(\"%s started running\", self.worker_name)\n        yield\n    finally:\n        self._running = False\n        logger.warning(\"%s stopped running\", self.worker_name)\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.create_task","title":"create_task","text":"
    create_task(worker)\n
    Source code in fluid/utils/worker.py
    def create_task(self, worker: Worker) -> asyncio.Task:\n    return asyncio.create_task(\n        self._run_worker(worker), name=f\"{self.worker_name}-{worker.worker_name}\"\n    )\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.on_shutdown","title":"on_shutdown async","text":"
    on_shutdown()\n

    called after the workers are stopped

    Source code in fluid/utils/worker.py
    async def on_shutdown(self) -> None:\n    \"\"\"called after the workers are stopped\"\"\"\n
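Subclasses can override this hook to release resources once all managed workers have stopped. A hedged sketch; MyWorkers and the cleanup coroutine are illustrative, not part of the library:

from fluid.utils.worker import Workers


async def close_connections() -> None:
    ...  # hypothetical cleanup, e.g. closing database or HTTP clients


class MyWorkers(Workers):
    async def on_shutdown(self) -> None:
        await close_connections()  # runs after the managed workers have stopped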
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.shutdown","title":"shutdown async","text":"
    shutdown()\n

    shutdown the workers

    Source code in fluid/utils/worker.py
    async def shutdown(self) -> None:\n    \"\"\"shutdown the workers\"\"\"\n    if self._has_shutdown:\n        return\n    self._has_shutdown = True\n    logger.warning(\n        \"gracefully stopping %d workers: %s\",\n        self.num_workers,\n        \", \".join(w.worker_name for w in self._workers.workers),\n    )\n    self.gracefully_stop()\n    try:\n        async with async_timeout.timeout(self._stopping_grace_period):\n            await self.wait_for_exit()\n        await self.on_shutdown()\n        return\n    except asyncio.TimeoutError:\n        logger.warning(\n            \"could not stop workers %s gracefully after %s\"\n            \" seconds - force shutdown\",\n            \", \".join(\n                task.get_name() for task in self._workers.tasks if not task.done()\n            ),\n            self._stopping_grace_period,\n        )\n    except asyncio.CancelledError:\n        pass\n    self._force_shutdown = True\n    self._workers.cancel()\n    try:\n        await self.wait_for_exit()\n    except asyncio.CancelledError:\n        pass\n    await self.on_shutdown()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.bail_out","title":"bail_out","text":"
    bail_out(reason, code=1)\n
    Source code in fluid/utils/worker.py
    def bail_out(self, reason: str, code: int = 1) -> None:\n    self.gracefully_stop()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.safe_run","title":"safe_run async","text":"
    safe_run()\n

    Context manager to run a worker safely

    Source code in fluid/utils/worker.py
    @asynccontextmanager\nasync def safe_run(self) -> AsyncGenerator:\n    \"\"\"Context manager to run a worker safely\"\"\"\n    try:\n        yield\n    except asyncio.CancelledError:\n        if self._force_shutdown:\n            # we are shutting down, this is expected\n            pass\n        raise\n    except Exception as e:\n        reason = f\"unhandled exception while running workers: {e}\"\n        logger.exception(reason)\n        asyncio.get_event_loop().call_soon(self.bail_out, reason, 2)\n    else:\n        # worker finished without error\n        # make sure we are shutting down\n        asyncio.get_event_loop().call_soon(self.bail_out, \"worker exit\", 1)\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.add_workers","title":"add_workers","text":"
    add_workers(*workers)\n

    add workers to the workers

    Source code in fluid/utils/worker.py
    def add_workers(self, *workers: Worker) -> None:\n    \"\"\"add workers to the workers\"\"\"\n    workers_, _ = self._workers.workers_tasks()\n    for worker in workers:\n        if worker not in workers_:\n            workers_.append(worker)\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.run","title":"run async","text":"
    run()\n

    run the workers

    Source code in fluid/utils/worker.py
    async def run(self) -> None:\n    \"\"\"run the workers\"\"\"\n    with self.start_running():\n        async with self.safe_run():\n            workers, _ = self._workers.workers_tasks()\n            self._workers.workers = tuple(workers)\n            self._workers.tasks = tuple(\n                self.create_task(worker) for worker in workers\n            )\n            await asyncio.gather(*self._workers.tasks)\n        await self.shutdown()\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.wait_for_exit","title":"wait_for_exit async","text":"
    wait_for_exit()\n
    Source code in fluid/utils/worker.py
    async def wait_for_exit(self) -> None:\n    if self._workers_task is not None:\n        await self._workers_task\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.remove_workers","title":"remove_workers","text":"
    remove_workers(*workers)\n

    remove workers from the workers

    Source code in fluid/utils/worker.py
    def remove_workers(self, *workers: Worker) -> None:\n    \"remove workers from the workers\"\n    workers_, _ = self._workers.workers_tasks()\n    for worker in workers:\n        try:\n            workers_.remove(worker)\n        except ValueError:\n            pass\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.startup","title":"startup async","text":"
    startup()\n

    start the workers

    Source code in fluid/utils/worker.py
    async def startup(self) -> None:\n    \"\"\"start the workers\"\"\"\n    if self._workers_task is None:\n        self._workers_task = asyncio.create_task(self.run(), name=self.worker_name)\n        for args in self._delayed_callbacks:\n            self._delayed_callback(*args)\n        self._delayed_callbacks = []\n
    "},{"location":"reference/workers/#fluid.utils.worker.Workers.register_callback","title":"register_callback","text":"
    register_callback(\n    callback, seconds, jitter=0.0, periodic=False\n)\n

    Register a callback

    The callback can be periodic or not.

    Source code in fluid/utils/worker.py
    def register_callback(\n    self,\n    callback: Callable[[], None],\n    seconds: float,\n    jitter: float = 0.0,\n    periodic: bool | float = False,\n) -> None:\n    \"\"\"Register a callback\n\n    The callback can be periodic or not.\n    \"\"\"\n    if periodic is True:\n        periodic_float = seconds\n    elif periodic is False:\n        periodic_float = 0.0\n    else:\n        periodic_float = periodic\n    if not self.running:\n        self._delayed_callbacks.append((callback, seconds, jitter, periodic_float))\n    else:\n        self._delayed_callback(callback, seconds, jitter, periodic_float)\n
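Below is a minimal usage sketch of composing workers into a Workers pool; the ping and sync_state coroutines, the timings, and the callback are illustrative, while the constructor and method signatures follow the reference above: startup() launches the run() loop in a background task, shutdown() stops it within stopping_grace_period seconds, and callbacks registered before startup are scheduled once the pool starts.

import asyncio

from fluid.utils.worker import WorkerFunction, Workers


async def ping() -> None:
    ...  # illustrative unit of work


async def sync_state() -> None:
    ...  # illustrative unit of work


async def main() -> None:
    pool = Workers(
        WorkerFunction(ping, heartbeat=1, name="ping"),
        WorkerFunction(sync_state, heartbeat=30, name="sync"),
        name="app-workers",
        stopping_grace_period=10,
    )
    # runs 5 seconds after startup and every 5 seconds thereafter
    pool.register_callback(lambda: print("heartbeat"), seconds=5, periodic=True)
    await pool.startup()     # launches the workers as a background task
    await asyncio.sleep(60)  # ... application runs ...
    await pool.shutdown()    # graceful stop, forced after the grace period


asyncio.run(main())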
    "},{"location":"tutorials/","title":"Tutorials","text":"

    The step-by-step guides, the how-to's, the recipes, and all the Aio Fluid parts you can use in your applications.

    "},{"location":"tutorials/db/","title":"Async Database","text":"

    The fluid.db module provides a simple asynchronous interface for interacting with PostgreSQL databases. It is built on top of the sqlalchemy and asyncpg libraries.
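The snippet below is not the fluid.db API; it is a minimal sketch of the two libraries it builds on, showing an async engine created with SQLAlchemy's asyncio extension and the asyncpg driver. The connection URL and query are placeholders, and both packages must be installed:

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def main() -> None:
    # placeholder connection URL; asyncpg is selected via the driver suffix
    engine = create_async_engine("postgresql+asyncpg://user:password@localhost/appdb")
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT 1"))
        print(result.scalar_one())
    await engine.dispose()


asyncio.run(main())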

    "},{"location":"tutorials/scheduler/","title":"Task Queue","text":"

    This module provides a lightweight implementation of a distributed task producer (TaskScheduler) and consumer (TaskConsumer). The middleware that distributes tasks is configured via the Broker interface; a Redis broker is provided for convenience.

    "},{"location":"tutorials/scheduler/#tasks","title":"Tasks","text":"

    Tasks are standard Python async functions decorated with the task decorator.

    from fluid.scheduler import task, TaskRun\n\n@task\nasync def say_hi(ctx: TaskRun):\n    return \"Hi!\"\n

    There are two types of tasks implemented:

    • Simple concurrent tasks - they run concurrently with the task consumer - they must be IO-bound tasks (no heavy CPU-bound operations)
      from fluid.scheduler import task, TaskRun\n\n  @task\n  async def fetch_data(ctx: TaskRun):\n      # fetch data\n      data = await http_cli.get(\"https://...\")\n      data_id = await datastore_cli.store(data)\n      # trigger another task\n      ctx.task_manager.queue(\"heavy_calculation\", data_id=data_id)\n
    • CPU-bound tasks - they run in a subprocess
    from fluid.scheduler import task, TaskRun\n\n@task(cpu_bound=True)\nasync def heavy_calculation(ctx: TaskRun):\n    data = await datastore_cli.get(ctx.params[\"data_id\"])\n    # perform some heavy calculation\n    ...\n    # trigger another task\n    ctx.task_manager.queue(\"fetch_data\")\n

    Both types of task can be scheduled periodically via the schedule keyword argument:

    import asyncio\nfrom datetime import timedelta\nfrom fluid.scheduler import task, TaskContext, every\n\n@task(schedule=every(timedelta(seconds=1)))\nasync def scheduled(context: TaskContext) -> str:\n    await asyncio.sleep(0.1)\n    return \"OK\"\n
    "},{"location":"tutorials/scheduler/#broker","title":"Broker","text":"

    A task Broker needs to implement three abstract methods:

      @abstractmethod\n  async def queue_task(self, queued_task: QueuedTask) -> TaskRun:\n      \"\"\"Queue a task\"\"\"\n\n  @abstractmethod\n  async def get_task_run(self) -> Optional[TaskRun]:\n      \"\"\"Get a Task run from the task queue\"\"\"\n\n  @abstractmethod\n  async def queue_length(self) -> Dict[str, int]:\n      \"\"\"Length of task queues\"\"\"\n

    The library ships a Redis broker for convenience.

    from fluid.scheduler import Broker\n\nredis_broker = Broker.from_url(\"redis://localhost:6349\")\n
    "}]} \ No newline at end of file diff --git a/sitemap.xml b/sitemap.xml index 15d8dd6..11932d4 100644 --- a/sitemap.xml +++ b/sitemap.xml @@ -20,6 +20,14 @@ https://aio-fluid.com/reference/task_run/ 2024-10-19 + + https://aio-fluid.com/reference/tast_consumer/ + 2024-10-19 + + + https://aio-fluid.com/reference/workers/ + 2024-10-19 + https://aio-fluid.com/tutorials/ 2024-10-19 diff --git a/sitemap.xml.gz b/sitemap.xml.gz index cd257f5..64cdfed 100644 Binary files a/sitemap.xml.gz and b/sitemap.xml.gz differ diff --git a/tutorials/db/index.html b/tutorials/db/index.html index 25e507d..8f83b8d 100644 --- a/tutorials/db/index.html +++ b/tutorials/db/index.html @@ -22,7 +22,7 @@ - Database - Aio Fluid + Async Database - Aio Fluid @@ -76,7 +76,7 @@
    [The remaining hunks for tutorials/db/index.html, tutorials/index.html and tutorials/scheduler/index.html contain only navigation and title updates already shown above: "Database" renamed to "Async Database", "Reference - Code API" renamed to "Introduction", and new navigation entries for "Task Consumer" and "Workers". The tutorials/db page body repeats the Async Database paragraph verbatim.]