-
Notifications
You must be signed in to change notification settings - Fork 85
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #384 from PrefectHQ/stream
Support streaming events
- Loading branch information
Showing
15 changed files
with
899 additions
and
462 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
--- | ||
title: Streaming | ||
description: Process agent responses, tool calls and results in real-time through streaming or handlers. | ||
icon: bars-staggered | ||
--- | ||
|
||
import { VersionBadge } from '/snippets/version-badge.mdx' | ||
|
||
|
||
ControlFlow provides two ways to process events during task execution: | ||
- [**Streaming**](#streaming): Iterate over events in real-time using a Python iterator | ||
- [**Handlers**](#handlers): Register callback functions that are called for each event | ||
|
||
Both approaches give you access to the same events - which one you choose depends on how you want to integrate with your application. | ||
|
||
## Streaming | ||
|
||
<VersionBadge version="0.12.0" /> | ||
|
||
When you enable streaming, task execution returns an iterator that yields events as they occur. Each iteration provides a tuple of (event, snapshot, delta) representing what just happened in the workflow: | ||
|
||
```python | ||
import controlflow as cf | ||
|
||
for event, snapshot, delta in cf.run( | ||
"Write a poem about AI", | ||
stream=True, | ||
): | ||
# For complete events, snapshot contains the full content | ||
if event.event == "agent-content": | ||
print(f"Agent wrote: {snapshot}") | ||
|
||
# For delta events, delta contains just what's new | ||
elif event.event == "agent-content-delta": | ||
print(delta, end="", flush=True) | ||
``` | ||
|
||
You can focus on specific events using the `Stream` enum. Here, we return only content updates: | ||
|
||
```python | ||
import controlflow as cf | ||
|
||
# Only stream content updates | ||
for event, snapshot, delta in cf.run( | ||
"Write a poem", | ||
stream=cf.Stream.CONTENT | ||
): | ||
print(delta if delta else snapshot) | ||
``` | ||
|
||
The available stream filters are: | ||
- `Stream.ALL`: All events (equivalent to `stream=True`) | ||
- `Stream.CONTENT`: Agent content and content deltas | ||
- `Stream.TOOLS`: All tool events | ||
- `Stream.COMPLETION_TOOLS`: Completion tool events (like marking a task successful or failed) | ||
- `Stream.AGENT_TOOLS`: Tools used by agents for any purpose other than completing a task | ||
- `Stream.TASK_EVENTS`: Task lifecycle events (starting, completion, failure, etc) | ||
|
||
You can combine filters with the `|` operator: | ||
|
||
```python | ||
# Stream content and tool events | ||
stream = Stream.CONTENT | Stream.TOOLS | ||
``` | ||
|
||
For more complex filtering, set stream=True and filter the events manually, or use a handler. | ||
|
||
## Handlers | ||
<VersionBadge version="0.9.2" /> | ||
|
||
For more complex event processing, or when you want to decouple event handling from your main workflow, use handlers: | ||
|
||
```python | ||
from controlflow.orchestration.handler import Handler | ||
from controlflow.events.events import AgentMessage | ||
|
||
class LoggingHandler(Handler): | ||
def on_agent_message(self, event: AgentMessage): | ||
print(f"Agent {event.agent.name} said: {event.message['content']}") | ||
|
||
def on_tool_result(self, event: ToolResult): | ||
print(f"Tool call result: {event.tool_result.str_result}") | ||
|
||
# Use the handler | ||
cf.run("Write a poem", handlers=[LoggingHandler()]) | ||
``` | ||
|
||
Handlers are especially useful for: | ||
- Adding logging or monitoring | ||
- Collecting metrics | ||
- Updating UI elements | ||
- Processing events asynchronously | ||
|
||
Handlers call their `on_<event-name>` methods for each event type. For a complete list of available methods, see the [Event Details](#event-details) section below. | ||
|
||
|
||
### Async Handlers | ||
|
||
<VersionBadge version="0.11.1" /> | ||
|
||
For asynchronous event processing, use `AsyncHandler`: | ||
|
||
```python | ||
import asyncio | ||
from controlflow.orchestration.handler import AsyncHandler | ||
|
||
class AsyncLoggingHandler(AsyncHandler): | ||
async def on_agent_message(self, event: AgentMessage): | ||
await asyncio.sleep(0.1) # Simulate async operation | ||
print(f"Agent {event.agent.name} said: {event.message['content']}") | ||
|
||
await cf.run_async("Write a poem", handlers=[AsyncLoggingHandler()]) | ||
``` | ||
|
||
## Example: Real-time Content Display | ||
|
||
Here's a complete example showing both approaches to display content in real-time: | ||
|
||
<CodeGroup> | ||
```python Streaming | ||
import controlflow as cf | ||
|
||
for event, snapshot, delta in cf.run( | ||
"Write a story about time travel", | ||
stream=cf.Stream.CONTENT | ||
): | ||
# Print character by character | ||
if delta: | ||
print(delta, end="", flush=True) | ||
``` | ||
|
||
```python Handler | ||
import controlflow as cf | ||
from controlflow.orchestration.handler import Handler | ||
|
||
class ContentHandler(Handler): | ||
def on_agent_content_delta(self, event): | ||
# Print character by character | ||
print(event.content_delta, end="", flush=True) | ||
|
||
cf.run( | ||
"Write a story about time travel", | ||
handlers=[ContentHandler()] | ||
) | ||
``` | ||
</CodeGroup> | ||
|
||
## Event Details | ||
|
||
Now that we've seen how to process events, let's look at the types of events you can receive: | ||
|
||
### Content Events | ||
Content events give you access to what an agent is saying or writing: | ||
|
||
```python | ||
# Complete content | ||
{ | ||
"event": "agent-content", | ||
"agent": agent, # Agent object | ||
"content": "Hello, world!", # The complete content | ||
"agent_message_id": "msg_123" # Optional ID linking to parent message | ||
} | ||
|
||
# Content delta (incremental update) | ||
{ | ||
"event": "agent-content-delta", | ||
"agent": agent, | ||
"content_delta": "Hello", # New content since last update | ||
"content_snapshot": "Hello, world!", # Complete content so far | ||
"agent_message_id": "msg_123" | ||
} | ||
``` | ||
|
||
### Tool Events | ||
Tool events let you observe when agents use tools and get their results: | ||
|
||
```python | ||
# Tool being called | ||
{ | ||
"event": "agent-tool-call", | ||
"agent": agent, | ||
"tool_call": {...}, # The complete tool call info | ||
"tool": tool, # The Tool object being called | ||
"args": {...}, # Arguments passed to the tool | ||
"agent_message_id": "msg_123" | ||
} | ||
|
||
# Tool call delta (incremental update) | ||
{ | ||
"event": "agent-tool-call-delta", | ||
"agent": agent, | ||
"tool_call_delta": {...}, # Changes to the tool call | ||
"tool_call_snapshot": {...}, # Complete tool call info so far | ||
"tool": tool, | ||
"args": {...}, | ||
"agent_message_id": "msg_123" | ||
} | ||
|
||
# Tool result | ||
{ | ||
"event": "tool-result", | ||
"agent": agent, | ||
"tool_result": { | ||
"tool_call": {...}, # The original tool call | ||
"tool": tool, # The Tool object that was called | ||
"result": any, # The raw result value | ||
"str_result": "...", # String representation of result | ||
"is_error": False # Whether the tool call failed | ||
} | ||
} | ||
``` | ||
|
||
### Workflow Events | ||
### Task Events | ||
Events that mark key points in a task's lifecycle: | ||
- `TaskStart`: A task has begun execution | ||
- `TaskSuccess`: A task completed successfully (includes the final result) | ||
- `TaskFailure`: A task failed (includes the error reason) | ||
- `TaskSkipped`: A task was skipped | ||
|
||
### Orchestration Events | ||
Events related to orchestrating the overall workflow: | ||
- `OrchestratorStart`/`End`: Workflow orchestration starting/ending | ||
- `AgentTurnStart`/`End`: An agent's turn starting/ending | ||
- `OrchestratorError`: An error occurred during orchestration | ||
|
||
### Handler Methods | ||
|
||
Each handler can implement methods for different types of events. The method will be called whenever that type of event occurs. Here are all available handler methods: | ||
|
||
| Method | Event Type | Description | | ||
|--------|------------|-------------| | ||
| `on_event(event)` | Any | Called for every event, before any specific handler | | ||
| `on_agent_message(event)` | AgentMessage | Raw LLM output containing both content and tool calls | | ||
| `on_agent_message_delta(event)` | AgentMessageDelta | Incremental updates to raw LLM output | | ||
| `on_agent_content(event)` | AgentContent | Unstructured text output from an agent | | ||
| `on_agent_content_delta(event)` | AgentContentDelta | Incremental updates to agent content | | ||
| `on_agent_tool_call(event)` | AgentToolCall | Tool being called by an agent | | ||
| `on_agent_tool_call_delta(event)` | AgentToolCallDelta | Incremental updates to a tool call | | ||
| `on_tool_result(event)` | ToolResult | Result returned from a tool | | ||
| `on_orchestrator_start(event)` | OrchestratorStart | Workflow orchestration starting | | ||
| `on_orchestrator_end(event)` | OrchestratorEnd | Workflow orchestration completed | | ||
| `on_agent_turn_start(event)` | AgentTurnStart | An agent beginning their turn | | ||
| `on_agent_turn_end(event)` | AgentTurnEnd | An agent completing their turn | | ||
| `on_orchestrator_error(event)` | OrchestratorError | Error during orchestration | | ||
|
||
Note that AgentMessage is the "raw" output from the LLM and contains both unstructured content and structured tool calls. When you receive an AgentMessage, you will also receive separate AgentContent and/or AgentToolCall events for any content or tool calls contained in that message. This allows you to: | ||
1. Handle all LLM output in one place with `on_agent_message` | ||
2. Handle just content with `on_agent_content` | ||
3. Handle just tool calls with `on_agent_tool_call` | ||
|
||
For streaming cases, the delta events (e.g. AgentMessageDelta, AgentContentDelta) provide incremental updates as the LLM generates its response. Task events, in contrast, are complete events that mark important points in a task's lifecycle - you can use these to track progress and get results without managing the task object directly.. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.