Skip to content

Commit

Permalink
Merge pull request #112 from msukmanowsky/main
Browse files Browse the repository at this point in the history
chore (docs): Add documentation note and example about thread safety
  • Loading branch information
sysid authored Oct 10, 2024
2 parents c2c8bbc + 1d8307f commit ec270e2
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,39 @@ async def endless(req: Request):
return EventSourceResponse(event_publisher())
```

# Thread Safety with SQLAlchemy Sessions and Similar Objects

The streaming portion of this package is accomplished via anyio TaskGroups. Care
needs to be taken to avoid passing objects that are not thread-safe to generators
you use to yield streaming data.

For example, if you are using SQLAlchemy, you should not use/pass an `AsyncSession`
object to your generator:

```python
# ❌ This can result in "The garbage collector is trying to clean up non-checked-in connection..." errors
async def bad_route():
async with AsyncSession() as session:
async def generator():
async for row in session.execute(select(User)):
yield dict(data=row)

return EventSourceResponse(generator)
```

Instead, ensure you create sessions within the generator itself

```python
# ✅ This is safe
async def good_route():
async def generator():
async with AsyncSession() as session:
async for row in session.execute(select(User)):
yield dict(data=row)

return EventSourceResponse(generator)
```

## Special use cases
### Customize Ping
By default, the server sends a ping every 15 seconds. You can customize this by:
Expand Down
47 changes: 47 additions & 0 deletions examples/example_fastapi_sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import typing as T

import sqlalchemy as sa
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from sse_starlette.sse import EventSourceResponse

# Database
db_bind = create_async_engine("sqlite+aiosqlite://:memory:")
AsyncSessionLocal = async_sessionmaker(bind=db_bind, expire_on_commit=False)


async def async_db_session():
async with AsyncSessionLocal() as session:
yield session


AsyncDbSessionDependency = T.Annotated[AsyncSession, Depends(async_db_session)]

TODOS_CTE_SQL = """
WITH todo AS (
SELECT 1 AS id, 'Task 1' AS title, 'Description 1' AS description, 0 AS completed
UNION ALL
SELECT 2, 'Task 2', 'Description 2', 1
UNION ALL
SELECT 3, 'Task 3', 'Description 3', 0
)
SELECT * FROM todo
"""

# App
app = FastAPI()


@app.route("/things")
async def things(db_session: AsyncDbSessionDependency):
# Safe to use db_session here to do auth or something else.
async def thing_streamer():
# Do *NOT* reuse db_session here within the AsyncGenerator, create a
# new session instead.
async with AsyncSessionLocal() as session:
async for row in session.execute(sa.text(TODOS_CTE_SQL)):
yield {"data": dict(row)}

return EventSourceResponse(thing_streamer)

0 comments on commit ec270e2

Please sign in to comment.