Skip to content

feat: merge Broker(context=...) and FastStream(context=...) at broker-level #2693

@nectarindev

Description

@nectarindev

I recently migrated my project to faststream 0.6. I was very interested in how I could add my dependencies to the context. Prior to version 0.6, I did something like this:

from faststream.annotations import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.utils.context import context

broker = KafkaBroker()


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
    context: ContextRepo,
):
    print("dependency: ", context.get("dependency"))  # 42


async def lifespan(*args, **kwargs):
    context.set_global("dependency", 42)

    await broker.start()
    try:
        yield
    finally:
        await broker.stop()

I launched the broker as part of my application in lifespan without using the FastStream class.

For version 0.6, I saw examples where it was suggested to pass the context to FastStream, but that solution did not suit me. I discovered that the broker also accepts context, and that solves my problem:

broker = KafkaBroker(context=ContextRepo({"dependency": 42}))

...

async def lifespan(*args, **kwargs):
    await broker.start()
    try:
        yield
    finally:
        await broker.stop()

But I also discovered that if I create a FastStream instance, its context will be used, even though I didn't use it to start the broker.

from fastapi import FastAPI
from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(context=ContextRepo({"broker_dependency": 2}))
app = FastStream(broker, context=ContextRepo({"application_dependency": 1}))


@broker.subscriber("my_topic", group_id="my_group")
async def handle(
    context: ContextRepo,
):
    print("broker_dependency: ", context.get("broker_dependency"))            # None
    print("application_dependency: ", context.get("application_dependency"))  # 1


async def lifespan(*args, **kwargs):
    await broker.start()
    try:
        yield
    finally:
        await broker.stop()


asgi = FastAPI(lifespan=lifespan)

I'm not sure that's normal behavior. It would make much more sense if only the broker's dependency were available.


Running FastStream 0.6.3 with CPython 3.12.4 on Linux

Metadata

Metadata

Assignees

No one assigned

    Labels

    CoreIssues related to core FastStream functionality and affects to all brokersenhancementNew feature or requestgood first issueGood for newcomers

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions