Skip to content

Commit 2f0b9ee

Browse files
committed
2 parents 799e1ed + 7513099 commit 2f0b9ee

File tree

4 files changed

+108
-2
lines changed

4 files changed

+108
-2
lines changed

.github/dependabot.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,8 @@ updates:
1313
commit-message:
1414
prefix: ":arrow_up:"
1515
open-pull-requests-limit: 5
16+
17+
- package-ecosystem: "github-actions"
18+
directory: "/"
19+
schedule:
20+
interval: "monthly"

.github/workflows/build.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ jobs:
4646
fastapi \
4747
tenacity \
4848
portend \
49-
testcontainers
49+
testcontainers \
50+
async-timeout
5051
5152
- name: Run tests
5253
run: |
5354
make test-unit
5455
5556
- name: Upload coverage to Codecov
56-
uses: codecov/codecov-action@v4
57+
uses: codecov/codecov-action@v5
5758
if: contains(env.USING_COVERAGE, matrix.python-version)
5859
with:
5960
token: ${{ secrets.CODECOV_TOKEN }}

README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,39 @@ async def endless(req: Request):
6969
return EventSourceResponse(event_publisher())
7070
```
7171

72+
# Thread Safety with SQLAlchemy Sessions and Similar Objects
73+
74+
The streaming portion of this package is accomplished via anyio TaskGroups. Care
75+
needs to be taken to avoid passing objects that are not thread-safe to generators
76+
you use to yield streaming data.
77+
78+
For example, if you are using SQLAlchemy, you should not use/pass an `AsyncSession`
79+
object to your generator:
80+
81+
```python
82+
# ❌ This can result in "The garbage collector is trying to clean up non-checked-in connection..." errors
83+
async def bad_route():
84+
async with AsyncSession() as session:
85+
async def generator():
86+
async for row in session.execute(select(User)):
87+
yield dict(data=row)
88+
89+
return EventSourceResponse(generator)
90+
```
91+
92+
Instead, ensure you create sessions within the generator itself
93+
94+
```python
95+
# ✅ This is safe
96+
async def good_route():
97+
async def generator():
98+
async with AsyncSession() as session:
99+
async for row in session.execute(select(User)):
100+
yield dict(data=row)
101+
102+
return EventSourceResponse(generator)
103+
```
104+
72105
## Special use cases
73106
### Customize Ping
74107
By default, the server sends a ping every 15 seconds. You can customize this by:
@@ -112,6 +145,26 @@ Async generators can expose tricky error and cleanup behavior especially when th
112145
Example [`no_async_generators.py`](https://github.com/sysid/sse-starlette/pull/56#issue-1704495339) shows an alternative implementation
113146
that does not rely on async generators but instead uses memory channels (`examples/no_async_generators.py`).
114147

148+
### Using pytest to test SSE Endpoints
149+
When testing more than a single SSE endpoint via pytest, one may encounter the following error: `RuntimeError: <asyncio.locks.Event object at 0xxxx [unset]> is bound to a different event loop`.
150+
151+
A workaround to fix this error is to use the following fixture on all tests that use SSE:
152+
153+
```python
154+
@pytest.fixture
155+
def reset_sse_starlette_appstatus_event():
156+
"""
157+
Fixture that resets the appstatus event in the sse_starlette app.
158+
159+
Should be used on any test that uses sse_starlette to stream events.
160+
"""
161+
# See https://github.com/sysid/sse-starlette/issues/59
162+
from sse_starlette.sse import AppStatus
163+
164+
AppStatus.should_exit_event = None
165+
```
166+
167+
For details, see [Issue#59](https://github.com/sysid/sse-starlette/issues/59#issuecomment-1961678665).
115168

116169
## Development, Contributing
117170
1. install pdm: `pip install pdm`
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import typing as T
2+
3+
import sqlalchemy as sa
4+
from fastapi import Depends, FastAPI
5+
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
6+
7+
from sse_starlette.sse import EventSourceResponse
8+
9+
# Database
10+
db_bind = create_async_engine("sqlite+aiosqlite://:memory:")
11+
AsyncSessionLocal = async_sessionmaker(bind=db_bind, expire_on_commit=False)
12+
13+
14+
async def async_db_session():
15+
async with AsyncSessionLocal() as session:
16+
yield session
17+
18+
19+
AsyncDbSessionDependency = T.Annotated[AsyncSession, Depends(async_db_session)]
20+
21+
TODOS_CTE_SQL = """
22+
WITH todo AS (
23+
SELECT 1 AS id, 'Task 1' AS title, 'Description 1' AS description, 0 AS completed
24+
UNION ALL
25+
SELECT 2, 'Task 2', 'Description 2', 1
26+
UNION ALL
27+
SELECT 3, 'Task 3', 'Description 3', 0
28+
)
29+
30+
SELECT * FROM todo
31+
"""
32+
33+
# App
34+
app = FastAPI()
35+
36+
37+
@app.route("/things")
38+
async def things(db_session: AsyncDbSessionDependency):
39+
# Safe to use db_session here to do auth or something else.
40+
async def thing_streamer():
41+
# Do *NOT* reuse db_session here within the AsyncGenerator, create a
42+
# new session instead.
43+
async with AsyncSessionLocal() as session:
44+
async for row in session.execute(sa.text(TODOS_CTE_SQL)):
45+
yield {"data": dict(row)}
46+
47+
return EventSourceResponse(thing_streamer)

0 commit comments

Comments
 (0)