Skip to content
This repository was archived by the owner on Aug 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/controlflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# --- Default agent ---
# assign to controlflow.default_agent to change the default agent
from .core.agent import DEFAULT_AGENT as default_agent
from .core.agent.agent import DEFAULT_AGENT as default_agent

# --- Version ---
try:
Expand Down
2 changes: 2 additions & 0 deletions src/controlflow/core/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from . import memory
from .agent import Agent, get_default_agent
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import logging
import random
import uuid
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Callable, Optional

from pydantic import Field, field_serializer

import controlflow
from controlflow.llm.models import get_default_model
from controlflow.tools.talk_to_human import talk_to_human
from controlflow.utilities.context import ctx
from controlflow.utilities.types import ControlFlowModel

from .memory import Memory
from .names import NAMES

if TYPE_CHECKING:
from controlflow.core.task import Task
logger = logging.getLogger(__name__)
Expand All @@ -18,17 +25,19 @@ def get_default_agent() -> "Agent":


class Agent(ControlFlowModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4().hex[:5]))
model_config = dict(arbitrary_types_allowed=True)
name: str = Field(
...,
description="The name of the agent. This is used to identify the agent in the system and should be unique per assigned task.",
description="The name of the agent.",
pattern=r"^[a-zA-Z0-9_-]+$",
default_factory=lambda: random.choice(NAMES),
)
description: Optional[str] = Field(
None, description="A description of the agent, visible to other agents."
)
instructions: Optional[str] = Field(
None, description="Instructions for the agent, private to this agent."
"You are a diligent AI assistant. You complete your tasks efficiently and without error.",
description="Instructions for the agent, private to this agent.",
)
tools: list[Callable] = Field(
[], description="List of tools availble to the agent."
Expand All @@ -37,6 +46,11 @@ class Agent(ControlFlowModel):
False,
description="If True, the agent is given tools for interacting with a human user.",
)
memory: Optional[Memory] = Field(
default=None,
# default_factory=ThreadMemory,
description="The memory object used by the agent. If not specified, an in-memory memory object will be used. Pass None to disable memory.",
)

# note: `model` should be typed as a BaseChatModel but V2 models can't have
# V1 attributes without erroring, so we have to use Any.
Expand All @@ -46,32 +60,45 @@ class Agent(ControlFlowModel):
exclude=True,
)

_cm_stack: list[contextmanager] = []

@field_serializer("tools")
def _serialize_tools(self, tools: list[Callable]):
tools = controlflow.llm.tools.as_tools(tools)
# tools are Pydantic 1 objects
return [t.dict(include={"name", "description"}) for t in tools]

def __init__(self, name, **kwargs):
super().__init__(name=name, **kwargs)
def __init__(self, name=None, **kwargs):
if name is not None:
kwargs["name"] = name
super().__init__(**kwargs)

def get_tools(self) -> list[Callable]:
tools = self.tools.copy()
if self.user_access:
tools.append(talk_to_human)
if self.memory is not None:
tools.extend(self.memory.get_tools())
return tools

@contextmanager
def create_context(self):
with ctx(agent=self):
yield self

def __enter__(self):
# use stack so we can enter the context multiple times
self._cm_stack.append(self.create_context())
return self._cm_stack[-1].__enter__()

def __exit__(self, *exc_info):
return self._cm_stack.pop().__exit__(*exc_info)

def run(self, task: "Task"):
return task.run_once(agent=self)

async def run_async(self, task: "Task"):
return await task.run_once_async(agent=self)


DEFAULT_AGENT = Agent(
name="Marvin",
instructions="""
You are a diligent AI assistant. You complete
your tasks efficiently and without error.
""",
)
DEFAULT_AGENT = Agent(name="Marvin")
95 changes: 95 additions & 0 deletions src/controlflow/core/agent/memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import abc
import uuid
from typing import ClassVar, Optional, cast

from pydantic import Field

from controlflow.llm.tools import Tool
from controlflow.utilities.context import ctx
from controlflow.utilities.types import ControlFlowModel


class Memory(ControlFlowModel, abc.ABC):
id: str = Field(default_factory=lambda: uuid.uuid4().hex)

def load(self) -> dict[int, str]:
"""
Load all memories as a dictionary of index to value.
"""
raise NotImplementedError()

def update(self, value: str, index: int = None):
"""
Store a value, optionally overwriting an existing value at the given index.
"""
raise NotImplementedError()

def delete(self, index: int):
raise NotImplementedError()

def get_tools(self) -> list[Tool]:
update_tool = Tool.from_function(
self.update,
name="update_memory",
description="Privately remember an idea or fact, optionally updating the existing memory at `index`",
)
delete_tool = Tool.from_function(
self.delete,
name="delete_memory",
description="Forget the private memory at `index`",
)

tools = [update_tool, delete_tool]

return tools


class AgentMemory(Memory):
"""
In-memory store for an agent. Memories are scoped to the agent.

Note memories may persist across flows.
"""

_memory: list[str] = []

def update(self, value: str, index: int = None):
if index is not None:
self._memory[index] = value
else:
self._memory.append(value)

def load(self, thread_id: str) -> dict[int, str]:
return dict(enumerate(self._memory))

def delete(self, index: int):
del self._memory[index]


class ThreadMemory(Memory):
"""
In-memory store for an agent. Memories are scoped to each thread.
"""

_memory: ClassVar[dict[str, list[str]]] = {}

def _get_thread_id(self) -> Optional[str]:
from controlflow.core.flow import Flow

if flow := ctx.get("flow", None): # type: Flow
flow = cast(Flow, flow)
return flow.thread_id

def update(self, value: str, index: int = None):
thread_id = self._get_thread_id()
if index is not None:
self._memory[thread_id][index] = value
else:
self._memory[thread_id].append(value)

def load(self, thread_id: str) -> dict[int, str]:
return dict(enumerate(self._memory.get(thread_id, [])))

def delete(self, index: int):
thread_id = self._get_thread_id()
del self._memory[thread_id][index]
20 changes: 20 additions & 0 deletions src/controlflow/core/agent/names.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
NAMES = [
"HAL-9000",
"R2-D2",
"C-3PO",
"WALL-E",
"T-800",
"GLaDOS",
"JARVIS",
"EVE",
"KITT",
"Johnny-5",
"BB-8",
"Ultron",
"TARS",
"Agent-Smith",
"CLU",
"Deckard",
"HK-47",
"Bender",
]
62 changes: 46 additions & 16 deletions src/controlflow/core/controller/instruction_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class AgentTemplate(Template):
{% if agent.description -%}
- Your description: "{{ agent.description }}"
{% endif %}


Your name and description are visible to other agents. The rest of the
information in this section is private to you.

## Instructions

You are part of an AI workflow and your job is to complete tasks
Expand All @@ -46,31 +48,39 @@ class AgentTemplate(Template):
You must follow your instructions at all times.

{% if agent.instructions %}
These are your private instructions:
- {{ agent.instructions }}
{{ agent.instructions }}
{% endif %}

{% if additional_instructions %}
These instructions apply to all agents at this part of the workflow:
{% for instruction in additional_instructions %}
- {{ instruction }}
## Memory

You have the following private memories:

{% for index, memory in memories %}
- {{ index }}: {{ memory }}
{% endfor %}
{% endif %}

## Other Agents
Use your memory to record information

You may be working with other agents. They may have different instructions and tools than you. To communicate with other agents, post messages to the thread.
"""
agent: Agent
additional_instructions: list[str]
memories: dict[int, str]


class TasksTemplate(Template):
class WorkflowTemplate(Template):
template: str = """
# Workflow

You are part of a team of agents helping to complete a larger workflow.
Certain tasks have been delegated to your team.

## Instructions

{% if additional_instructions %}
These instructions apply to all agents at this part of the workflow:
{% for instruction in additional_instructions %}
- {{ instruction }}
{% endfor %}
{% endif %}

## Flow

Expand Down Expand Up @@ -163,14 +173,13 @@ class TasksTemplate(Template):
tasks: list[Task]
json_tasks: list[dict]
flow: Flow
additional_instructions: list[str]


class CommunicationTemplate(Template):
template: str = """
# Communciation

## The thread

You and other agents are all communicating on a thread to complete
tasks. You can speak normally by posting messages if you need to. This
thread represents the internal state and context of the AI-powered
Expand All @@ -181,7 +190,22 @@ class CommunicationTemplate(Template):
Do not impersonate another agent or post messages on their behalf. The
workflow orchestrator will make sure that all agents have a fair chance
to act. You do not need to identify yourself in your messages.
"""

agent: Agent


class ToolTemplate(Template):
template: str = """
# Tools

## Your memory

You have a memory tool that you can use to store and retrieve private
information. Use this tool when you need to remember something private
or you don't want to confuse the thread. Otherwise, you can post your
thoughts publicly.

## Talking to human users

If your task requires communicating with a human, you will be given a
Expand Down Expand Up @@ -217,15 +241,21 @@ class MainTemplate(ControlFlowModel):
tasks: list[Task]

def render(self):
if self.agent.memory:
memories = self.agent.memory.load(thread_id=self.controller.flow.thread_id)
else:
memories = {}

templates = [
AgentTemplate(
agent=self.agent,
additional_instructions=self.instructions,
memories=memories,
),
TasksTemplate(
WorkflowTemplate(
flow=self.controller.flow,
tasks=self.tasks,
json_tasks=[task.model_dump() for task in self.tasks],
additional_instructions=self.instructions,
),
CommunicationTemplate(
agent=self.agent,
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Settings(ControlFlowSettings):
# ------------ Debug settings ------------

raise_on_tool_error: bool = Field(
False, description="If True, an error in a tool call will raise an exception."
True, description="If True, an error in a tool call will raise an exception."
)

print_handler_width: Optional[int] = Field(
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# import litellm
# import pytest
# from controlflow.core.agent import Agent
# from controlflow.core.agents import Agent
# from controlflow.core.task import Task, TaskStatus
# from controlflow.llm.completions import Response
# from controlflow.settings import temporary_settings
Expand Down