Skip to content

Commit

Permalink
feat: add CLI support for AsgiFastStream (#1782)
Browse files Browse the repository at this point in the history
* feat: add CLI support for AsgiFastStream

* tests: ignore signal test at Windows

* Update asgi.md

* chore: bump version

---------

Co-authored-by: sehat1137 <edox1j2n@duck.com>
Co-authored-by: Nikita Pastukhov <diementros@yandex.ru>
Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
  • Loading branch information
4 people committed Sep 23, 2024
1 parent 88e3018 commit a8b6572
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 224 deletions.
31 changes: 31 additions & 0 deletions docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,37 @@ app = AsgiFastStream(

Now, your **AsyncAPI HTML** representation can be found by the `/docs` url.

### FastStream object reusage

You may also use regular `FastStream` application object for similar result

```python linenums="1" hl_lines="2 9"
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.asgi import make_ping_asgi, AsgiResponse

broker = NatsBroker()

async def liveness_ping(scope, receive, send):
return AsgiResponse(b"", status_code=200)


app = FastStream(broker).as_asgi(
asgi_routes=[
("/liveness", liveness_ping),
("/readiness", make_ping_asgi(broker, timeout=5.0)),
],
asyncapi_path="/docs",
)
```

``` tip
For app which use ASGI you may use cli command like for default FastStream app
```shell
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
```

## Other ASGI Compatibility

Moreover, our wrappers can be used as ready-to-use endpoins for other **ASGI** frameworks. This can be very helpful When you are running **FastStream** in the same runtime as any other **ASGI** frameworks.
Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.23"
__version__ = "0.5.24"

SERVICE_NAME = f"faststream-{__version__}"
Empty file.
207 changes: 207 additions & 0 deletions faststream/_internal/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import logging
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
TypeVar,
Union,
)

import anyio
from typing_extensions import ParamSpec

from faststream.asyncapi.proto import AsyncAPIApplication
from faststream.log.logging import logger
from faststream.utils import apply_types, context
from faststream.utils.functions import drop_response_type, fake_context, to_async

P_HookParams = ParamSpec("P_HookParams")
T_HookReturn = TypeVar("T_HookReturn")


if TYPE_CHECKING:
from faststream.asyncapi.schema import (
Contact,
ContactDict,
ExternalDocs,
ExternalDocsDict,
License,
LicenseDict,
Tag,
TagDict,
)
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import (
AnyDict,
AnyHttpUrl,
AsyncFunc,
Lifespan,
LoggerProto,
SettingField,
)


class Application(ABC, AsyncAPIApplication):
def __init__(
self,
broker: Optional["BrokerUsecase[Any, Any]"] = None,
logger: Optional["LoggerProto"] = logger,
lifespan: Optional["Lifespan"] = None,
# AsyncAPI args,
title: str = "FastStream",
version: str = "0.1.0",
description: str = "",
terms_of_service: Optional["AnyHttpUrl"] = None,
license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
external_docs: Optional[
Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
] = None,
identifier: Optional[str] = None,
on_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
on_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
) -> None:
context.set_global("app", self)

self._should_exit = anyio.Event()
self.broker = broker
self.logger = logger
self.context = context

self._on_startup_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in on_startup
]
self._after_startup_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in after_startup
]
self._on_shutdown_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in on_shutdown
]
self._after_shutdown_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in after_shutdown
]

if lifespan is not None:
self.lifespan_context = apply_types(
func=lifespan, wrap_model=drop_response_type
)
else:
self.lifespan_context = fake_context

# AsyncAPI information
self.title = title
self.version = version
self.description = description
self.terms_of_service = terms_of_service
self.license = license
self.contact = contact
self.identifier = identifier
self.asyncapi_tags = tags
self.external_docs = external_docs

@abstractmethod
async def run(
self,
log_level: int,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
) -> None: ...

def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
"""Set already existed App object broker.
Useful then you create/init broker in `on_startup` hook.
"""
self.broker = broker

def on_startup(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running BEFORE broker connected.
This hook also takes an extra CLI options as a kwargs.
"""
self._on_startup_calling.append(apply_types(to_async(func)))
return func

def on_shutdown(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running BEFORE broker disconnected."""
self._on_shutdown_calling.append(apply_types(to_async(func)))
return func

def after_startup(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running AFTER broker connected."""
self._after_startup_calling.append(apply_types(to_async(func)))
return func

def after_shutdown(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running AFTER broker disconnected."""
self._after_shutdown_calling.append(apply_types(to_async(func)))
return func

def exit(self) -> None:
"""Stop application manually."""
self._should_exit.set()

async def start(
self,
**run_extra_options: "SettingField",
) -> None:
"""Executes startup hooks and start broker."""
for func in self._on_startup_calling:
await func(**run_extra_options)

if self.broker is not None:
await self.broker.start()

for func in self._after_startup_calling:
await func()

async def stop(self) -> None:
"""Executes shutdown hooks and stop broker."""
for func in self._on_shutdown_calling:
await func()

if self.broker is not None:
await self.broker.close()

for func in self._after_shutdown_calling:
await func()

async def _startup(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
) -> None:
self._log(log_level, "FastStream app starting...")
await self.start(**(run_extra_options or {}))
self._log(
log_level, "FastStream app started successfully! To exit, press CTRL+C"
)

async def _shutdown(self, log_level: int = logging.INFO) -> None:
self._log(log_level, "FastStream app shutting down...")
await self.stop()
self._log(log_level, "FastStream app shut down gracefully.")

def _log(self, level: int, message: str) -> None:
if self.logger is not None:
self.logger.log(level, message)
Loading

0 comments on commit a8b6572

Please sign in to comment.