7 changes: 6 additions & 1 deletion dspy/streaming/streamify.py
@@ -185,7 +185,12 @@ async def async_streamer(*args, **kwargs):
             else:
                 # We are receiving a chunk from the LM's response stream, delegate it to the listeners to
                 # determine if we should yield a value to the user.
-                for listener in predict_id_to_listener[value.predict_id]:
+                # Reorder listeners so that active (buffering) listeners are processed first
+                # This ensures buffered chunks are flushed in the correct order.
+                listeners = predict_id_to_listener[value.predict_id]
+                listeners.sort(key=lambda x: x.stream_start, reverse=True)
+
+                for listener in listeners:
                     # In some special cases such as Citation API, it is possible that multiple listeners
                     # return values at the same time due to the chunk buffer of the listener.
                     if output := listener.receive(value):
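A quick way to see what the new sort accomplishes: `stream_start` is a boolean, so sorting with `reverse=True` moves listeners that have already started buffering ahead of idle ones, and Python's stable sort preserves the original order within each group. A minimal sketch with a hypothetical `FakeListener` class (not DSPy's actual listener type):

```python
# Hypothetical stand-in for a stream listener; only the stream_start flag matters here.
from dataclasses import dataclass

@dataclass
class FakeListener:
    name: str
    stream_start: bool  # True once the listener has begun buffering its field

listeners = [
    FakeListener("answer", stream_start=False),
    FakeListener("citations", stream_start=True),
    FakeListener("reasoning", stream_start=False),
]

# Same sort as in the diff: active (buffering) listeners come first; ties keep input order.
listeners.sort(key=lambda x: x.stream_start, reverse=True)
print([listener.name for listener in listeners])  # ['citations', 'answer', 'reasoning']
```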
25 changes: 21 additions & 4 deletions dspy/streaming/streaming_listener.py
@@ -97,8 +97,6 @@ def receive(self, chunk: ModelResponseStream):

         try:
             chunk_message = chunk.choices[0].delta.content
-            if chunk_message is None:
-                return
         except Exception:
             return
 
@@ -112,6 +110,20 @@ def receive(self, chunk: ModelResponseStream):
                     is_last_chunk=self.stream_end,
                 )
 
+        # If we receive an empty chunk but streaming has started, flush the buffer.
+        # LiteLLM does not send completely empty chunks (https://github.com/BerriAI/litellm/blob/main/litellm/litellm_core_utils/model_response_utils.py#L10),
+        # so empty content means it has other native fields such as provider_specific_fields.
+        if not chunk_message:

Collaborator:
Is this correct? From the log shared internally, the LM can produce citation chunks in between field chunks, so could that lead to an unintended flush?

@TomeHirata (Collaborator, Author), Oct 3, 2025:
Flush is for finding the end token, right? It is true that native chunks are passed between field chunks, but when a citation chunk arrives, the end token (like [[ ##) shouldn't be in the queue yet, so it's a safe time to flush the tokens for the ongoing string field.

Collaborator:
I am not entirely sure about the LM streaming order, that is, how it interleaves normal text with native features/events, but suppose it looks like this:

  1. text: what
  2. text: the
  3. citation:
  4. text: jesus?
  5. text: [[ ##
  6. citation:
  7. text: completed ## ]]

Then at step 6, [[ ## will be yielded by the flush call.

@TomeHirata (Collaborator, Author):
In that example, does it mean completed ## ]] is supported by the citation at #5? I'm assuming that the LM produces the native response and the string response in the right order. If it makes mistakes, then yes, it won't work well.

+            if self.stream_start:
+                if token := self._get_last_token():
+                    return StreamResponse(
+                        self.predict_name,
+                        self.signature_field_name,
+                        token,
+                        is_last_chunk=False,
+                    )
+            return
+
         if chunk_message and start_identifier in chunk_message:
             # If the cache is hit, the chunk_message could be the full response. When it happens we can
             # directly end the stream listening. In some models like gemini, each stream chunk can be multiple
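To make the behavior debated in the thread above concrete, here is a minimal, self-contained sketch (a hypothetical `TinyListener`, not DSPy's `StreamListener`): recent field tokens are withheld in a queue so an end marker could be suppressed before reaching the user, and an empty-content (native) chunk triggers a flush of whatever is buffered, which stays safe as long as the end marker only arrives once the field is finished.

```python
from queue import Queue

class TinyListener:
    """Hypothetical sketch of the buffer-and-flush idea; not DSPy's implementation."""

    def __init__(self, hold: int = 3):
        self.hold = hold                  # number of most recent tokens to withhold
        self.field_end_queue = Queue()

    def receive(self, token: str) -> str:
        """Buffer the token; yield the oldest one once more than `hold` are held."""
        self.field_end_queue.put(token)
        if self.field_end_queue.qsize() > self.hold:
            return self.field_end_queue.get()
        return ""

    def flush(self) -> str:
        """Join and clear the buffer, e.g. when a native (empty-content) chunk arrives."""
        out = "".join(self.field_end_queue.queue)
        self.field_end_queue = Queue()
        return out

listener = TinyListener()
emitted = []
for tok in ["water ", "boils ", "at ", "100", "°C."]:
    if out := listener.receive(tok):
        emitted.append(out)
# A citation chunk arrives with empty text content: flush the buffered field tokens.
emitted.append(listener.flush())
print("".join(emitted))  # water boils at 100°C.
```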
@@ -192,8 +204,7 @@ def flush(self) -> str:
         are in the buffer because we don't directly yield the tokens received by the stream listener
         with the purpose to not yield the end_identifier tokens, e.g., "[[ ## ... ## ]]" for ChatAdapter.
         """
-        last_tokens = "".join(self.field_end_queue.queue)
-        self.field_end_queue = Queue()
+        last_tokens = self._get_last_token()
         if isinstance(settings.adapter, JSONAdapter):
             match = re.search(r'",|"\s*}', last_tokens)
             if match:
@@ -215,6 +226,12 @@
f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}"
)

def _get_last_token(self) -> str:
last_token = "".join(self.field_end_queue.queue)
self.field_end_queue = Queue()
return last_token


@property
def _output_type(self) -> type | None:
try:
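As a rough sketch of the extracted helper (a hypothetical `BufferSketch` class, not the real `StreamListener`): `_get_last_token` joins everything currently held in `field_end_queue` and swaps in a fresh queue so the same tokens are never yielded twice, after which the existing JSONAdapter branch in `flush()` trims trailing JSON delimiters from the joined string.

```python
import re
from queue import Queue

class BufferSketch:
    """Hypothetical stand-in showing only the buffer-join-and-reset behavior."""

    def __init__(self):
        self.field_end_queue = Queue()

    def _get_last_token(self) -> str:
        last_token = "".join(self.field_end_queue.queue)
        self.field_end_queue = Queue()  # reset so these tokens are never yielded again
        return last_token

buf = BufferSketch()
for tok in ['water boils at 100°C."', ' }']:
    buf.field_end_queue.put(tok)

last_tokens = buf._get_last_token()
# Same trimming idea as the JSONAdapter branch in flush(): drop the trailing '",' or '"}'.
match = re.search(r'",|"\s*}', last_tokens)
if match:
    last_tokens = last_tokens[: match.start()]
print(last_tokens)                    # water boils at 100°C.
print(buf.field_end_queue.qsize())    # 0 (queue was reset)
```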
4 changes: 4 additions & 0 deletions tests/streaming/test_streaming.py
@@ -1004,12 +1004,15 @@ async def citation_stream(*args, **kwargs):
     output = program(documents=docs, question="What temperature does water boil?")
     citation_chunks = []
     answer_chunks = []
+    entire_chunks = []
     final_prediction = None
     async for value in output:
         if isinstance(value, dspy.streaming.StreamResponse) and value.signature_field_name == "citations":
             citation_chunks.append(value)
+            entire_chunks.append(f"[{value.chunk[0].document_index}]")
         elif isinstance(value, dspy.streaming.StreamResponse) and value.signature_field_name == "answer":
             answer_chunks.append(value.chunk)
+            entire_chunks.append(value.chunk)
         elif isinstance(value, dspy.Prediction):
             final_prediction = value
 
@@ -1023,6 +1026,7 @@

     # Verify the answer chunks are correct
     assert "".join(answer_chunks) == "According to the references, water boils at 100°C."
+    assert "".join(entire_chunks) == "According to the references,[0] water boils at 100°C."
 
     # Test that prediction contains the expected fields
     assert final_prediction is not None
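For reference, the new assertion only checks interleaving order: citation chunks are rendered as their document index in brackets and mixed with answer text in arrival order. A tiny standalone illustration with made-up chunk data:

```python
# Made-up chunk sequence mirroring the test's expectation; field names are illustrative.
chunks = [
    ("answer", "According to the references,"),
    ("citations", 0),                     # citation chunk pointing at documents[0]
    ("answer", " water boils at 100°C."),
]

entire_chunks = []
for field, payload in chunks:
    if field == "citations":
        entire_chunks.append(f"[{payload}]")
    else:
        entire_chunks.append(payload)

assert "".join(entire_chunks) == "According to the references,[0] water boils at 100°C."
```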