diff --git a/app/application.py b/app/application.py index dace6b9..2cf8ee1 100644 --- a/app/application.py +++ b/app/application.py @@ -1,3 +1,6 @@ +import asyncio +import random + from faststream import FastStream, Logger from faststream.kafka import KafkaBroker from pydantic import BaseModel, Field @@ -12,7 +15,9 @@ class Greeting(BaseModel): broker = KafkaBroker("localhost:9092") -app = FastStream(broker) +app = FastStream( + broker, title="My service", version="0.1.0", description="My service description" +) to_greetings = broker.publisher("greetings") @@ -20,5 +25,28 @@ class Greeting(BaseModel): @broker.subscriber("names") # type: ignore async def on_names(msg: Name, logger: Logger) -> None: result = f"hello {msg.name}" + logger.info(result) greeting = Greeting(greeting=result) await to_greetings.publish(greeting) + + +@app.after_startup +async def publish_names() -> None: + async def _publish_names() -> None: + while True: + name = random.choice( # nosec + [ + "Ana", + "Mario", + "Pedro", + "João", + "Gustavo", + "Joana", + "Mariana", + "Juliana", + ], + ) + await broker.publish(Name(name=name), topic="names") + await asyncio.sleep(2) + + asyncio.create_task(_publish_names())