Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streaming events #384

Merged
merged 8 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"patterns/running-tasks",
"patterns/task-results",
"patterns/tools",
"patterns/streaming-tasks",
"patterns/interactivity",
"patterns/dependencies",
"patterns/memory",
Expand Down
40 changes: 40 additions & 0 deletions docs/patterns/running-tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,47 @@ Crafting worlds and shaping dreams.

</CodeGroup>

## Streaming

<VersionBadge version="0.11" />

In addition to running tasks to completion, ControlFlow supports streaming events during task execution. This allows you to process or display intermediate outputs like agent messages, tool calls, and results in real-time.

To enable streaming, set `stream=True` when running tasks:

```python
import controlflow as cf

# Stream all events
for event, snapshot, delta in cf.run("Write a poem", stream=True, handlers=[]):
print(f"Event type: {event.event}")

if event.event == "agent-content":
print(f"Agent said: {snapshot}")
elif event.event == "agent-tool-call":
print(f"Tool called: {snapshot}")
```

You can also filter which events you want to receive using the `Stream` enum:

```python
import controlflow as cf

# Only stream content events
for event, content, delta in cf.run(
"Write a poem",
stream=cf.Stream.CONTENT,
handlers=[], # remove the default print handler
):
if delta:
# Print incremental content updates
print(delta, end="", flush=True)
else:
# Print complete messages
print(content)
```

For more details on working with streaming events, including programmatic event handlers, see the [Streaming guide](/patterns/streaming).

## Multi-Agent Collaboration
For tasks involving multiple agents, ControlFlow needs a way to manage their collaboration. What makes this more complicated than simply making an LLM call and moving on to the next agent is that it may take multiple LLM calls to complete a single agentic "turn" of work.
Expand Down
252 changes: 252 additions & 0 deletions docs/patterns/streaming-tasks.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
---
title: Streaming
description: Process agent responses, tool calls and results in real-time through streaming or handlers.
icon: bars-staggered
---

import { VersionBadge } from '/snippets/version-badge.mdx'


ControlFlow provides two ways to process events during task execution:
- [**Streaming**](#streaming): Iterate over events in real-time using a Python iterator
- [**Handlers**](#handlers): Register callback functions that are called for each event

Both approaches give you access to the same events - which one you choose depends on how you want to integrate with your application.

## Streaming

<VersionBadge version="0.12.0" />

When you enable streaming, task execution returns an iterator that yields events as they occur. Each iteration provides a tuple of (event, snapshot, delta) representing what just happened in the workflow:

```python
import controlflow as cf

for event, snapshot, delta in cf.run(
"Write a poem about AI",
stream=True,
):
# For complete events, snapshot contains the full content
if event.event == "agent-content":
print(f"Agent wrote: {snapshot}")

# For delta events, delta contains just what's new
elif event.event == "agent-content-delta":
print(delta, end="", flush=True)
```

You can focus on specific events using the `Stream` enum. Here, we return only content updates:

```python
import controlflow as cf

# Only stream content updates
for event, snapshot, delta in cf.run(
"Write a poem",
stream=cf.Stream.CONTENT
):
print(delta if delta else snapshot)
```

The available stream filters are:
- `Stream.ALL`: All events (equivalent to `stream=True`)
- `Stream.CONTENT`: Agent content and content deltas
- `Stream.TOOLS`: All tool events
- `Stream.COMPLETION_TOOLS`: Completion tool events (like marking a task successful or failed)
- `Stream.AGENT_TOOLS`: Tools used by agents for any purpose other than completing a task
- `Stream.TASK_EVENTS`: Task lifecycle events (starting, completion, failure, etc)

You can combine filters with the `|` operator:

```python
# Stream content and tool events
stream = Stream.CONTENT | Stream.TOOLS
```

For more complex filtering, set stream=True and filter the events manually, or use a handler.

## Handlers
<VersionBadge version="0.9.2" />

For more complex event processing, or when you want to decouple event handling from your main workflow, use handlers:

```python
from controlflow.orchestration.handler import Handler
from controlflow.events.events import AgentMessage

class LoggingHandler(Handler):
def on_agent_message(self, event: AgentMessage):
print(f"Agent {event.agent.name} said: {event.message['content']}")

def on_tool_result(self, event: ToolResult):
print(f"Tool call result: {event.tool_result.str_result}")

# Use the handler
cf.run("Write a poem", handlers=[LoggingHandler()])
```

Handlers are especially useful for:
- Adding logging or monitoring
- Collecting metrics
- Updating UI elements
- Processing events asynchronously

Handlers call their `on_<event-name>` methods for each event type. For a complete list of available methods, see the [Event Details](#event-details) section below.


### Async Handlers

<VersionBadge version="0.11.1" />

For asynchronous event processing, use `AsyncHandler`:

```python
import asyncio
from controlflow.orchestration.handler import AsyncHandler

class AsyncLoggingHandler(AsyncHandler):
async def on_agent_message(self, event: AgentMessage):
await asyncio.sleep(0.1) # Simulate async operation
print(f"Agent {event.agent.name} said: {event.message['content']}")

await cf.run_async("Write a poem", handlers=[AsyncLoggingHandler()])
```

## Example: Real-time Content Display

Here's a complete example showing both approaches to display content in real-time:

<CodeGroup>
```python Streaming
import controlflow as cf

for event, snapshot, delta in cf.run(
"Write a story about time travel",
stream=cf.Stream.CONTENT
):
# Print character by character
if delta:
print(delta, end="", flush=True)
```

```python Handler
import controlflow as cf
from controlflow.orchestration.handler import Handler

class ContentHandler(Handler):
def on_agent_content_delta(self, event):
# Print character by character
print(event.content_delta, end="", flush=True)

cf.run(
"Write a story about time travel",
handlers=[ContentHandler()]
)
```
</CodeGroup>

## Event Details

Now that we've seen how to process events, let's look at the types of events you can receive:

### Content Events
Content events give you access to what an agent is saying or writing:

```python
# Complete content
{
"event": "agent-content",
"agent": agent, # Agent object
"content": "Hello, world!", # The complete content
"agent_message_id": "msg_123" # Optional ID linking to parent message
}

# Content delta (incremental update)
{
"event": "agent-content-delta",
"agent": agent,
"content_delta": "Hello", # New content since last update
"content_snapshot": "Hello, world!", # Complete content so far
"agent_message_id": "msg_123"
}
```

### Tool Events
Tool events let you observe when agents use tools and get their results:

```python
# Tool being called
{
"event": "agent-tool-call",
"agent": agent,
"tool_call": {...}, # The complete tool call info
"tool": tool, # The Tool object being called
"args": {...}, # Arguments passed to the tool
"agent_message_id": "msg_123"
}

# Tool call delta (incremental update)
{
"event": "agent-tool-call-delta",
"agent": agent,
"tool_call_delta": {...}, # Changes to the tool call
"tool_call_snapshot": {...}, # Complete tool call info so far
"tool": tool,
"args": {...},
"agent_message_id": "msg_123"
}

# Tool result
{
"event": "tool-result",
"agent": agent,
"tool_result": {
"tool_call": {...}, # The original tool call
"tool": tool, # The Tool object that was called
"result": any, # The raw result value
"str_result": "...", # String representation of result
"is_error": False # Whether the tool call failed
}
}
```

### Workflow Events
### Task Events
Events that mark key points in a task's lifecycle:
- `TaskStart`: A task has begun execution
- `TaskSuccess`: A task completed successfully (includes the final result)
- `TaskFailure`: A task failed (includes the error reason)
- `TaskSkipped`: A task was skipped

### Orchestration Events
Events related to orchestrating the overall workflow:
- `OrchestratorStart`/`End`: Workflow orchestration starting/ending
- `AgentTurnStart`/`End`: An agent's turn starting/ending
- `OrchestratorError`: An error occurred during orchestration

### Handler Methods

Each handler can implement methods for different types of events. The method will be called whenever that type of event occurs. Here are all available handler methods:

| Method | Event Type | Description |
|--------|------------|-------------|
| `on_event(event)` | Any | Called for every event, before any specific handler |
| `on_agent_message(event)` | AgentMessage | Raw LLM output containing both content and tool calls |
| `on_agent_message_delta(event)` | AgentMessageDelta | Incremental updates to raw LLM output |
| `on_agent_content(event)` | AgentContent | Unstructured text output from an agent |
| `on_agent_content_delta(event)` | AgentContentDelta | Incremental updates to agent content |
| `on_agent_tool_call(event)` | AgentToolCall | Tool being called by an agent |
| `on_agent_tool_call_delta(event)` | AgentToolCallDelta | Incremental updates to a tool call |
| `on_tool_result(event)` | ToolResult | Result returned from a tool |
| `on_orchestrator_start(event)` | OrchestratorStart | Workflow orchestration starting |
| `on_orchestrator_end(event)` | OrchestratorEnd | Workflow orchestration completed |
| `on_agent_turn_start(event)` | AgentTurnStart | An agent beginning their turn |
| `on_agent_turn_end(event)` | AgentTurnEnd | An agent completing their turn |
| `on_orchestrator_error(event)` | OrchestratorError | Error during orchestration |

Note that AgentMessage is the "raw" output from the LLM and contains both unstructured content and structured tool calls. When you receive an AgentMessage, you will also receive separate AgentContent and/or AgentToolCall events for any content or tool calls contained in that message. This allows you to:
1. Handle all LLM output in one place with `on_agent_message`
2. Handle just content with `on_agent_content`
3. Handle just tool calls with `on_agent_tool_call`

For streaming cases, the delta events (e.g. AgentMessageDelta, AgentContentDelta) provide incremental updates as the LLM generates its response. Task events, in contrast, are complete events that mark important points in a task's lifecycle - you can use these to track progress and get results without managing the task object directly..
2 changes: 1 addition & 1 deletion src/controlflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .instructions import instructions
from .decorators import flow, task
from .tools import tool
from .run import run, run_async, run_tasks, run_tasks_async
from .run import run, run_async, run_tasks, run_tasks_async, Stream
from .plan import plan
import controlflow.orchestration

Expand Down
26 changes: 23 additions & 3 deletions src/controlflow/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Any,
AsyncGenerator,
Generator,
Iterator,
Optional,
Union,
)
Expand Down Expand Up @@ -43,10 +44,11 @@
from controlflow.utilities.prefect import create_markdown_artifact, prefect_task

if TYPE_CHECKING:
from controlflow.events.events import Event
from controlflow.flows import Flow
from controlflow.orchestration.handler import AsyncHandler, Handler
from controlflow.orchestration.turn_strategies import TurnStrategy
from controlflow.tasks import Task
from controlflow.tools.tools import Tool
from controlflow.stream import Stream
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -223,13 +225,29 @@ def run(
*,
turn_strategy: Optional["TurnStrategy"] = None,
handlers: Optional[list["Handler"]] = None,
stream: Union[bool, "Stream"] = False,
**task_kwargs,
):
) -> Union[Any, Iterator[tuple["Event", Any, Optional[Any]]]]:
"""
Run a task with this agent.

Args:
objective: The objective to accomplish
turn_strategy: Optional turn strategy to use
handlers: Optional list of handlers
stream: If True, stream all events. Can also provide StreamFilter flags.
**task_kwargs: Additional kwargs passed to Task creation

Returns:
If not streaming: The task result
If streaming: Iterator of (event, snapshot, delta) tuples
"""
return controlflow.run(
objective=objective,
agents=[self],
turn_strategy=turn_strategy,
handlers=handlers,
stream=stream,
**task_kwargs,
)

Expand All @@ -239,13 +257,15 @@ async def run_async(
*,
turn_strategy: Optional["TurnStrategy"] = None,
handlers: Optional[list[Union["Handler", "AsyncHandler"]]] = None,
stream: Union[bool, "Stream"] = False,
**task_kwargs,
):
return await controlflow.run_async(
objective=objective,
agents=[self],
turn_strategy=turn_strategy,
handlers=handlers,
stream=stream,
**task_kwargs,
)

Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/events/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def to_messages(self, context: "CompileContext") -> list["BaseMessage"]:
return []

def __repr__(self) -> str:
return f"<{self.event} {self.timestamp}>"
return f"<Event: {self.event} Timestamp: {self.timestamp}>"


class UnpersistedEvent(Event):
Expand Down
Loading