From 1197a4a2f175b062b9df1f9fea40ecf6fc6d5c58 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 8 Apr 2024 15:35:12 -0400 Subject: [PATCH 1/4] WIP: controller --- README.md | 20 +- pyproject.toml | 3 +- src/control_flow/__init__.py | 8 +- src/control_flow/agent.py | 558 ++---------------- src/control_flow/agent_old.py | 558 ++++++++++++++++++ src/control_flow/controller/__init__.py | 1 + src/control_flow/controller/controller.py | 101 ++++ src/control_flow/controller/delegation.py | 105 ++++ .../controller/instruction_template.py | 166 ++++++ src/control_flow/delegation.py | 86 --- src/control_flow/flow.py | 62 +- src/control_flow/instructions.py | 4 +- src/control_flow/task.py | 90 +-- src/control_flow/termination.py | 36 ++ src/control_flow/types.py | 8 + src/control_flow/utilities/jinja.py | 13 + 16 files changed, 1111 insertions(+), 708 deletions(-) create mode 100644 src/control_flow/agent_old.py create mode 100644 src/control_flow/controller/__init__.py create mode 100644 src/control_flow/controller/controller.py create mode 100644 src/control_flow/controller/delegation.py create mode 100644 src/control_flow/controller/instruction_template.py delete mode 100644 src/control_flow/delegation.py create mode 100644 src/control_flow/termination.py create mode 100644 src/control_flow/types.py create mode 100644 src/control_flow/utilities/jinja.py diff --git a/README.md b/README.md index c82a5768..a29b8517 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,21 @@ ![image](https://github.com/jlowin/ControlFlow/assets/153965/9465c321-6b3f-4a6f-af88-f7e3c250fb31) - # ControlFlow -ControlFlow is a Python framework for orchestrating AI agents in workflows alongside traditional code. It allows you to seamlessly integrate AI into any workflow, coordinate multiple specialized AI agents, collect of human inputs when needed, and maintain full observability for debugging. +ControlFlow is a Python framework for orchestrating AI agents in workflows alongside traditional code. It allows you to declaratively define AI tasks, assign them to agents, and seamlessly integrate them into larger workflows. By providing a structured way to coordinate multiple AI agents, ControlFlow enables you to build sophisticated applications that leverage the power of AI while maintaining the control and flexibility of traditional programming. -ControlFlow is designed with the belief that AI works best when focused and iterated. It encourages breaking workflows into small, targeted steps, each handled by a dedicated AI agent. This keeps each AI as effective as possible, while maintaining context across the entire ensemble. ControlFlow recognizes that AI should augment traditional development, not replace it. It enables a declarative approach to AI, where the desired outcomes are specified and the framework handles the implementation details. This allows developers to mix AI and traditional code freely, leveraging AI where it's most useful while using standard programming everywhere else. +At its core, ControlFlow is built on the idea of agent orchestration. It provides a way to break down complex workflows into smaller, focused tasks that can be assigned to specialized AI agents. These agents can work autonomously on their assigned tasks, while the framework ensures smooth coordination and information sharing between them. This approach allows each agent to excel at its specific role, while the overall workflow benefits from their combined capabilities. 🚨 ControlFlow requires bleeding-edge versions of [Prefect](https://github.com/prefecthq/prefect) and [Marvin](https://github.com/prefecthq/marvin). Caveat emptor! +## Key Concepts + +- **Flow**: A container for an AI-enhanced workflow that maintains consistent context and history. Flows are defined with the `@ai_flow` decorator. +- **Task**: A discrete objective for AI agents to solve. Tasks can be defined with the `@ai_task` decorator or declared inline. +- **Agent**: Agents encapsulate the logic for applying an AI assistant to one or more tasks. +- **Controller**: Controllers are responsible for coordinating agents, delegating tasks, and managing the overall execution of the workflow. + +Users typically don't interact directly with agents or controllers. Instead, they define tasks and flows, which are then executed by the controller. The controller is responsible for managing the agents and ensuring that the workflow is executed correctly. ## Key Features @@ -34,23 +41,19 @@ pip install . from control_flow import ai_flow, ai_task, run_ai_task, instructions from pydantic import BaseModel - class Name(BaseModel): first_name: str last_name: str - @ai_task(user_access=True) def get_user_name() -> Name: pass - @ai_task def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass - @ai_flow() def demo(): @@ -69,9 +72,8 @@ def demo(): return poem - if __name__ == "__main__": demo() ``` -image +image \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b9892c38..a4fbc16a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,4 +62,5 @@ skip-magic-trailing-comma = false [tool.ruff.lint.per-file-ignores] "__init__.py" = ['I', 'F401', 'E402'] "conftest.py" = ["F401", "F403"] -'tests/fixtures/*.py' = ['F403'] +'tests/fixtures/*.py' = ['F401', 'F403'] +"src/control_flow/types.py" = ['F401'] diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 371a8e4a..8cd2d46b 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,7 +1,9 @@ from .settings import settings -from .agent import ai_task, Agent, run_ai_task +# from .agent_old import ai_task, Agent, run_ai_task from .flow import ai_flow -from .instructions import instructions +from .controller.controller import Controller +from .agent import run_ai_task +# from .instructions import instructions -from marvin.beta.assistants import Assistant +# from marvin.beta.assistants import Assistant diff --git a/src/control_flow/agent.py b/src/control_flow/agent.py index 39ee2088..d2f3c299 100644 --- a/src/control_flow/agent.py +++ b/src/control_flow/agent.py @@ -1,192 +1,24 @@ -import functools -import inspect -import json import logging -from datetime import datetime -from typing import Callable, Generic, TypeVar, Union -from zoneinfo import ZoneInfo +from enum import Enum +from typing import Callable, TypeVar -import marvin -import marvin.utilities.tools -import prefect -from marvin.beta.assistants.assistants import Assistant -from marvin.beta.assistants.handlers import PrintHandler -from marvin.beta.assistants.runs import Run -from marvin.tools.assistants import AssistantTool, EndRun -from marvin.types import FunctionTool from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from marvin.utilities.jinja import BaseEnvironment -from openai.types.beta.threads.runs import ToolCall -from prefect import get_client as get_prefect_client -from prefect import task as prefect_task -from prefect.context import FlowRunContext -from pydantic import BaseModel, Field, field_validator +from pydantic import Field, field_validator -from control_flow import settings from control_flow.context import ctx -from control_flow.delegation import DelegationStrategy, RoundRobin -from control_flow.utilities.prefect import ( - create_json_artifact, - create_markdown_artifact, - create_python_artifact, -) - -from .flow import AIFlow -from .task import AITask, TaskStatus +from control_flow.flow import Flow +from control_flow.task import Task, TaskStatus +from control_flow.types import Assistant, AssistantTool, ControlFlowModel T = TypeVar("T") logger = logging.getLogger(__name__) NOT_PROVIDED = object() -jinja_environment = BaseEnvironment( - globals={ - "now": lambda: datetime.now(ZoneInfo("UTC")), - "inspect": inspect, - "id": id, - } -) - - -TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( - """ - ## Tool call: {name} - - **Description:** {description} - - ## Arguments - - ```json - {args} - ``` - - ### Result - - ```json - {result} - ``` - """ -) - - -INSTRUCTIONS = """ -You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to -complete the tasks assigned to you. You were created by a software application, -and any messages you receive are from that software application, not a user. You -may use any tools at your disposal to complete the task, including talking to a -human user. - - -## Instructions - -Follow these instructions at all times: - -{% if assistant.instructions -%} -- {{ assistant.instructions }} -{% endif %} -{% if flow.instructions -%} -- {{ flow.instructions }} -{% endif %} -{% if agent.instructions -%} -- {{ agent.instructions }} -{% endif %} -{% for instruction in instructions %} -- {{ instruction }} -{% endfor %} - - -## Assistants - -The following assistants are collaborating to complete the tasks. - -{% for a in agent.assistants %} -- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} -{% endfor %} - -{% if agent.assistants|length > 1 %} -Use the `end_run` tool to end your turn and allow another assistant to take -over, without marking a task as complete or failed. -{% endif %} - - - -## Tasks - -{% if agent.tasks %} -You have been assigned the following tasks. You will continue to run until all -tasks are finished. It may take multiple attempts, iterations, or tool uses to -complete a task. When a task is finished, mark it as `completed` -(and provide a result, if required) or `failed` (with a brief explanation) by -using the appropriate tool. Do not mark a task as complete if you don't have a -complete result. Do not make up results. If you receive only partial or unclear -information from a user, keep working until you have all the information you -need. Be very sure that a task is truly unsolvable before marking it as failed, -especially when working with a human user. - - -{% for task_id, task in agent.numbered_tasks() %} -### Task {{ task_id }} -- Status: {{ task.status.value }} -- Objective: {{ task.objective }} -{% if task.instructions %} -- Additional instructions: {{ task.instructions }} -{% endif %} -{% if task.status.value == "completed" %} -- Result: {{ task.result }} -{% elif task.status.value == "failed" %} -- Error: {{ task.error }} -{% endif %} -{% if task.context %} -- Context: {{ task.context }} -{% endif %} - -{% endfor %} -{% else %} -You have no explicit tasks to complete. Follow your instructions as best as you -can. If it is not possible to comply with the instructions in any way, use the -`end_run` tool to manually stop the run. -{% endif %} -## Communication - -All messages you receive in the thread are generated by the software that -created you, not a human user. All messages you send are sent only to that -software and are never seen by any human. - -{% if agent.system_access -%} -The software that created you is an AI capable of processing natural language, -so you can freely respond by posting messages to the thread. There may be more -than one AI posting responses, so preface your messages with your own name to -avoid confusion. For example, if your name is "Marvin", start all of your -messages with "Marvin:". This is not required for tool calls. -{% else %} -The software that created you is a Python script that can only process -structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE -THREAD. They will be ignored and only waste time. ONLY USE TOOLS TO RESPOND. -{% endif %} - -{% if agent.user_access -%} -There is also a human user who may be involved in the task. You can communicate -with them using the `talk_to_human` tool. The user is a human and unaware of -your tasks or this system. Do not mention your tasks or anything about how the -system works to them. They can only see messages you send them via tool, not the -rest of the thread. When dealing with humans, you may not always get a clear or -correct response. You may need to ask multiple times or rephrase your questions. -You should also interpret human responses broadly and not be too literal. -{% else %} -You can not communicate with a human user at this time. -{% endif %} - - -{% if context %} -## Additional context - -The following context was provided: -{% for key, value in context.items() -%} -- {{ key }}: {{ value }} -{% endfor %} -{% endif %} -""" +class AgentStatus(Enum): + INCOMPLETE = "incomplete" + COMPLETE = "complete" def talk_to_human(message: str, get_response: bool = True) -> str: @@ -202,358 +34,68 @@ def talk_to_human(message: str, get_response: bool = True) -> str: return "Message sent to user" -def end_run(): - """Use this tool to end the run without marking a task as complete or failed.""" - return EndRun() - - -class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): - tasks: list[AITask] = [] - flow: AIFlow = Field(None, validate_default=True) - assistants: list = Field(None, validate_default=True) - delegation_strategy: DelegationStrategy = Field( - validate_default=True, - description="The strategy for delegating work to assistants.", - default_factory=RoundRobin, - ) - tools: list[Union[AssistantTool, Assistant, Callable]] = [] - context: dict = Field(None, validate_default=True) +class Agent(ControlFlowModel, ExposeSyncMethodsMixin): + tasks: list[Task] = Field(description="Tasks that the agent will complete.") + assistant: Assistant = Field(default_factory=Assistant) + instructions: str = None + tools: list[AssistantTool | Callable] = [] + context: dict = Field({}, validate_default=True) user_access: bool = Field( - None, - validate_default=True, + False, description="If True, the agent is given tools for interacting with a human user.", ) - system_access: bool = Field( - None, - validate_default=True, - description="If True, the agent will communicate with the system via messages. " - "This is usually only used when the agent was spawned by another " - "agent capable of understanding its responses.", + controller_access: bool = Field( + False, + description="If True, the agent will communicate with the controller via messages.", ) - instructions: str = None - model_config: dict = dict(arbitrary_types_allowed=True, extra="forbid") - @field_validator("flow", mode="before") - def _load_flow_from_ctx(cls, v): - if v is None: - v = ctx.get("flow", None) - if v is None: - v = AIFlow() + @field_validator("tasks", mode="before") + def _validate_tasks(cls, v): + if not v: + raise ValueError("An agent must have at least one task.") return v - @field_validator("context", mode="before") - def _default_context(cls, v): - if v is None: - v = {} - return v - - @field_validator("assistants", mode="before") - def _default_assistants(cls, v): - if v is None: - flow = ctx.get("flow") - if flow: - v = flow.assistants - if v is None: - v = [Assistant()] - return v - - @field_validator("user_access", "system_access", mode="before") - def _default_access(cls, v): - if v is None: - v = False - return v - - def numbered_tasks(self) -> list[tuple[int, AITask]]: + def task_ids(self) -> list[tuple[int, Task]]: + """ + Assign an ID to each task so they can be identified by the assistant. + """ return [(i + 1, task) for i, task in enumerate(self.tasks)] - def _get_instructions(self, assistant: Assistant, context: dict = None): - instructions = jinja_environment.render( - INSTRUCTIONS, - agent=self, - flow=self.flow, - assistant=assistant, - instructions=ctx.get("instructions", []), - context={**self.context, **(context or {})}, - ) - return instructions - - def _get_tools(self, assistant: Assistant) -> list[AssistantTool]: - tools = self.flow.tools + self.tools + assistant.tools - - if not self.tasks or len(self.assistants) > 1: - tools.append(end_run) - - # if there is only one task, and the agent can't send a response to the - # system, then we can quit as soon as it is marked finished - if not self.system_access and len(self.tasks) == 1: - early_end_run = True - else: - early_end_run = False - - for i, task in self.numbered_tasks(): - tools.extend( - [ - task._create_complete_tool(task_id=i, end_run=early_end_run), - task._create_fail_tool(task_id=i, end_run=early_end_run), - ] - ) - + def get_tools(self) -> list[AssistantTool | Callable]: + """ + Get all tools from the agent and its tasks. + """ + tools = self.tools + for i, task in self.task_ids(): + tools = tools + task.get_tools(task_id=i) if self.user_access: tools.append(talk_to_human) + return tools - final_tools = [] - for tool in tools: - if not isinstance(tool, AssistantTool): - tool = marvin.utilities.tools.tool_from_function(tool) - - if isinstance(tool, FunctionTool): - - async def modified_fn( - *args, - # provide default args to avoid a late-binding issue - original_fn: Callable = tool.function._python_fn, - tool: FunctionTool = tool, - **kwargs, - ): - # call fn - result = original_fn(*args, **kwargs) - - passed_args = ( - inspect.signature(original_fn).bind(*args, **kwargs).arguments - ) - try: - passed_args = json.dumps(passed_args, indent=2) - except Exception: - pass - create_markdown_artifact( - markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( - name=tool.function.name, - description=tool.function.description or "(none provided)", - args=passed_args, - result=result, - ), - key="result", - ) - return result - - tool.function._python_fn = prefect_task( - modified_fn, - task_run_name=f"Tool call: {tool.function.name}", - ) - final_tools.append(tool) - return final_tools - - def _get_run_assistant_task(self): + @property + def status(self) -> AgentStatus: """ - Helper function for building the task that will execute the OpenAI assistant run. - This needs to be regenerated each time in case the instructions change. + Check if all tasks have been completed. """ - - def _name_from_assistant(): - """Helper function for naming task runs""" - from prefect.runtime import task_run - - assistant = task_run.parameters.get("assistant") - return f"Run OpenAI assistant ({assistant.name})" - - @prefect_task(task_run_name=_name_from_assistant) - async def run_openai_assistant( - assistant: Assistant, context: dict = None, run_kwargs: dict = None - ) -> Run: - run_kwargs = run_kwargs or {} - model = run_kwargs.pop( - "model", - assistant.model or self.flow.model or settings.assistant_model, - ) - thread = run_kwargs.pop("thread", self.flow.thread) - - run = Run( - assistant=assistant, - thread=thread, - instructions=self._get_instructions( - assistant=assistant, context=context - ), - tools=self._get_tools(assistant=assistant), - event_handler_class=AgentHandler, - model=model, - **run_kwargs, - ) - await run.run_async() - create_json_artifact( - key="messages", - # dump explicilty because of odd OAI serialization issue - data=[m.model_dump() for m in run.messages], - description="All messages sent and received during the run.", - ) - create_json_artifact( - key="actions", - # dump explicilty because of odd OAI serialization issue - data=[s.model_dump() for s in run.steps], - description="All actions taken by the assistant during the run.", - ) - return run - - return run_openai_assistant - - @expose_sync_method("run") - async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: - assistants_generator = self.delegation_strategy.run(self.assistants) - openai_run = self._get_run_assistant_task() - - openai_run( - assistant=next(assistants_generator), - context=context, - run_kwargs=run_kwargs, - ) - - # if this AI can't post messages to the system, then continue to invoke - # it until all tasks are finished - if not self.system_access: - counter = 0 - while ( - any(t.status == TaskStatus.PENDING for t in self.tasks) - and counter < settings.max_agent_iterations - ): - openai_run( - assistant=next(assistants_generator), - context=context, - run_kwargs=run_kwargs, - ) - counter += 1 - - result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] - - return result - - -class AgentHandler(PrintHandler): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.tool_calls = {} - - async def on_tool_call_created(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is created""" - - if tool_call.type == "function": - task_run_name = "Prepare arguments for tool call" + if any(task.status == TaskStatus.PENDING for task in self.tasks): + return AgentStatus.INCOMPLETE else: - task_run_name = f"Tool call: {tool_call.type}" - - client = get_prefect_client() - engine_context = FlowRunContext.get() - if not engine_context: - return - - task_run = await client.create_task_run( - task=prefect.Task(fn=lambda: None), - name=task_run_name, - extra_tags=["tool-call"], - flow_run_id=engine_context.flow_run.id, - dynamic_key=tool_call.id, - state=prefect.states.Running(), - ) - - self.tool_calls[tool_call.id] = task_run - - async def on_tool_call_done(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is done""" - - client = get_prefect_client() - task_run = self.tool_calls.get(tool_call.id) - if not task_run: - return - await client.set_task_run_state( - task_run_id=task_run.id, state=prefect.states.Completed(), force=True - ) + return AgentStatus.COMPLETE - # code interpreter is run as a single call, so we can publish a result artifact - if tool_call.type == "code_interpreter": - # images = [] - # for output in tool_call.code_interpreter.outputs: - # if output.type == "image": - # image_path = download_temp_file(output.image.file_id) - # images.append(image_path) - - create_python_artifact( - key="code", - code=tool_call.code_interpreter.input, - description="Code executed in the code interpreter", - task_run_id=task_run.id, - ) - create_json_artifact( - key="output", - data=tool_call.code_interpreter.outputs, - description="Output from the code interpreter", - task_run_id=task_run.id, - ) - - elif tool_call.type == "function": - create_json_artifact( - key="arguments", - data=json.dumps(json.loads(tool_call.function.arguments), indent=2), - description=f"Arguments for the `{tool_call.function.name}` tool", - task_run_id=task_run.id, - ) - - -def ai_task( - fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict -): - """ - Use a Python function to create an AI task. When the function is called, an - agent is created to complete the task and return the result. - """ - - if fn is None: - return functools.partial( - ai_task, objective=objective, user_access=user_access, **agent_kwargs - ) - - sig = inspect.signature(fn) - - if objective is None: - if fn.__doc__: - objective = f"{fn.__name__}: {fn.__doc__}" - else: - objective = fn.__name__ - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - # first process callargs - bound = sig.bind(*args, **kwargs) - bound.apply_defaults() - - return run_ai_task.with_options(name=f"Task: {fn.__name__}")( - task=objective, - cast=fn.__annotations__.get("return"), - context=bound.arguments, - user_access=user_access, - **agent_kwargs, - ) - - return wrapper - - -def _name_from_objective(): - """Helper function for naming task runs""" - from prefect.runtime import task_run - - objective = task_run.parameters.get("task") + @expose_sync_method("run") + async def run_async(self, flow: Flow = None): + from control_flow.controller import SingleAgentController - if not objective: - objective = "Follow general instructions" - if len(objective) > 75: - return f"Task: {objective[:75]}..." - return f"Task: {objective}" + controller = SingleAgentController(agents=[self], flow=flow) + return await controller.run() -@prefect_task(task_run_name=_name_from_objective) +# @prefect_task(task_run_name=_name_from_objective) def run_ai_task( task: str = None, cast: T = NOT_PROVIDED, context: dict = None, user_access: bool = None, - model: str = None, **agent_kwargs: dict, ) -> T: """ @@ -578,13 +120,13 @@ def run_ai_task( # create tasks if task: - ai_tasks = [AITask[cast](objective=task, context=context)] + ai_tasks = [Task[cast](objective=task, context=context or {})] else: ai_tasks = [] # run agent - agent = Agent(tasks=ai_tasks, flow=flow, user_access=user_access, **agent_kwargs) - agent.run(model=model) + agent = Agent(tasks=ai_tasks, user_access=user_access or False, **agent_kwargs) + agent.run(flow=flow) if ai_tasks: if ai_tasks[0].status == TaskStatus.COMPLETED: diff --git a/src/control_flow/agent_old.py b/src/control_flow/agent_old.py new file mode 100644 index 00000000..a93a2d0c --- /dev/null +++ b/src/control_flow/agent_old.py @@ -0,0 +1,558 @@ +import functools +import inspect +import json +import logging +from typing import Callable, Generic, TypeVar, Union + +import marvin +import marvin.utilities.tools +import prefect +from marvin.beta.assistants.assistants import Assistant +from marvin.beta.assistants.handlers import PrintHandler +from marvin.beta.assistants.runs import Run +from marvin.tools.assistants import AssistantTool, EndRun +from marvin.types import FunctionTool +from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method +from openai.types.beta.threads.runs import ToolCall +from prefect import get_client as get_prefect_client +from prefect import task as prefect_task +from prefect.context import FlowRunContext +from pydantic import BaseModel, Field, field_validator + +from control_flow import settings +from control_flow.context import ctx +from control_flow.flow import Flow +from control_flow.task import Task, TaskStatus +from control_flow.utilities.jinja import jinja_env +from control_flow.utilities.prefect import ( + create_json_artifact, + create_markdown_artifact, + create_python_artifact, +) + +T = TypeVar("T") +logger = logging.getLogger(__name__) + +NOT_PROVIDED = object() + + +TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( + """ + ## Tool call: {name} + + **Description:** {description} + + ## Arguments + + ```json + {args} + ``` + + ### Result + + ```json + {result} + ``` + """ +) + + +INSTRUCTIONS = """ +You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to +complete the tasks assigned to you. You were created by a software application, +and any messages you receive are from that software application, not a user. You +may use any tools at your disposal to complete the task, including talking to a +human user. + + +## Instructions + +Follow these instructions at all times: + +{% if assistant.instructions -%} +- {{ assistant.instructions }} +{% endif %} +{% if flow.instructions -%} +- {{ flow.instructions }} +{% endif %} +{% if agent.instructions -%} +- {{ agent.instructions }} +{% endif %} +{% for instruction in instructions %} +- {{ instruction }} +{% endfor %} + + +## Assistants + +The following assistants are collaborating to complete the tasks. + +{% for a in agent.assistants %} +- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} +{% endfor %} + +{% if agent.assistants|length > 1 %} +Use the `end_run` tool to end your turn and allow another assistant to take +over, without marking a task as complete or failed. +{% endif %} + + + +## Tasks + +{% if agent.tasks %} +You have been assigned the following tasks. You will continue to run until all +tasks are finished. It may take multiple attempts, iterations, or tool uses to +complete a task. When a task is finished, mark it as `completed` +(and provide a result, if required) or `failed` (with a brief explanation) by +using the appropriate tool. Do not mark a task as complete if you don't have a +complete result. Do not make up results. If you receive only partial or unclear +information from a user, keep working until you have all the information you +need. Be very sure that a task is truly unsolvable before marking it as failed, +especially when working with a human user. + + +{% for task_id, task in agent.numbered_tasks() %} +### Task {{ task_id }} +- Status: {{ task.status.value }} +- Objective: {{ task.objective }} +{% if task.instructions %} +- Additional instructions: {{ task.instructions }} +{% endif %} +{% if task.status.value == "completed" %} +- Result: {{ task.result }} +{% elif task.status.value == "failed" %} +- Error: {{ task.error }} +{% endif %} +{% if task.context %} +- Context: {{ task.context }} +{% endif %} + +{% endfor %} +{% else %} +You have no explicit tasks to complete. Follow your instructions as best as you +can. If it is not possible to comply with the instructions in any way, use the +`end_run` tool to manually stop the run. +{% endif %} + +## Communication + +All messages you receive in the thread are generated by the software that +created you, not a human user. All messages you send are sent only to that +software and are never seen by any human. + +{% if agent.system_access -%} +The software that created you is an AI capable of processing natural language, +so you can freely respond by posting messages to the thread. There may be more +than one AI posting responses, so preface your messages with your own name to +avoid confusion. For example, if your name is "Marvin", start all of your +messages with "Marvin:". This is not required for tool calls. +{% else %} +The software that created you is a Python script that can only process +structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE +THREAD. They will be ignored and only waste time. ONLY USE TOOLS TO RESPOND. +{% endif %} + +{% if agent.user_access -%} +There is also a human user who may be involved in the task. You can communicate +with them using the `talk_to_human` tool. The user is a human and unaware of +your tasks or this system. Do not mention your tasks or anything about how the +system works to them. They can only see messages you send them via tool, not the +rest of the thread. When dealing with humans, you may not always get a clear or +correct response. You may need to ask multiple times or rephrase your questions. +You should also interpret human responses broadly and not be too literal. +{% else %} +You can not communicate with a human user at this time. +{% endif %} + + +{% if context %} +## Additional context + +The following context was provided: +{% for key, value in context.items() -%} +- {{ key }}: {{ value }} +{% endfor %} +{% endif %} +""" + + +def talk_to_human(message: str, get_response: bool = True) -> str: + """ + Send a message to the human user and optionally wait for a response. + If `get_response` is True, the function will return the user's response, + otherwise it will return a simple confirmation. + """ + print(message) + if get_response: + response = input("> ") + return response + return "Message sent to user" + + +def end_run(): + """Use this tool to end the run without marking a task as complete or failed.""" + return EndRun() + + +class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): + tasks: list[Task] = [] + assistant: Assistant = Field(None, validate_default=True) + tools: list[Union[AssistantTool, Assistant, Callable]] = [] + context: dict = Field(None, validate_default=True) + user_access: bool = Field( + None, + validate_default=True, + description="If True, the agent is given tools for interacting with a human user.", + ) + system_access: bool = Field( + None, + validate_default=True, + description="If True, the agent will communicate with the system via messages. " + "This is usually only used when the agent was spawned by another " + "agent capable of understanding its responses.", + ) + instructions: str = None + model_config: dict = dict(extra="forbid") + + @field_validator("flow", mode="before") + def _load_flow_from_ctx(cls, v): + if v is None: + v = ctx.get("flow", None) + if v is None: + v = Flow() + return v + + @field_validator("context", mode="before") + def _default_context(cls, v): + if v is None: + v = {} + return v + + @field_validator("assistant", mode="before") + def _default_assistants(cls, v): + if v is None: + v = Assistant() + return v + + @field_validator("user_access", "system_access", mode="before") + def _default_access(cls, v): + if v is None: + v = False + return v + + def numbered_tasks(self) -> list[tuple[int, Task]]: + return [(i + 1, task) for i, task in enumerate(self.tasks)] + + def _get_instructions(self, assistant: Assistant, context: dict = None): + instructions = jinja_env.render( + INSTRUCTIONS, + agent=self, + assistant=assistant, + instructions=ctx.get("instructions", []), + context={**self.context, **(context or {})}, + ) + return instructions + + def _get_tools(self, tools: list[AssistantTool | Callable]) -> list[AssistantTool]: + tools = self.tools + self.assistant.tools + tools + + if not self.tasks: + tools.append(end_run) + + # if there is only one task, and the agent can't send a response to the + # system, then we can quit as soon as it is marked finished + if not self.system_access and len(self.tasks) == 1: + early_end_run = True + else: + early_end_run = False + + for i, task in self.numbered_tasks(): + tools.extend( + [ + task._create_complete_tool(task_id=i, end_run=early_end_run), + task._create_fail_tool(task_id=i, end_run=early_end_run), + ] + ) + + if self.user_access: + tools.append(talk_to_human) + + final_tools = [] + for tool in tools: + if not isinstance(tool, AssistantTool): + tool = marvin.utilities.tools.tool_from_function(tool) + + if isinstance(tool, FunctionTool): + + async def modified_fn( + *args, + # provide default args to avoid a late-binding issue + original_fn: Callable = tool.function._python_fn, + tool: FunctionTool = tool, + **kwargs, + ): + # call fn + result = original_fn(*args, **kwargs) + + passed_args = ( + inspect.signature(original_fn).bind(*args, **kwargs).arguments + ) + try: + passed_args = json.dumps(passed_args, indent=2) + except Exception: + pass + create_markdown_artifact( + markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( + name=tool.function.name, + description=tool.function.description or "(none provided)", + args=passed_args, + result=result, + ), + key="result", + ) + return result + + tool.function._python_fn = prefect_task( + modified_fn, + task_run_name=f"Tool call: {tool.function.name}", + ) + final_tools.append(tool) + return final_tools + + def _get_run_assistant_task(self): + """ + Helper function for building the task that will execute the OpenAI assistant run. + This needs to be regenerated each time in case the instructions change. + """ + + @prefect_task(task_run_name=f'Run OpenAI assistant: "{self.assistant.name}"') + async def run_openai_assistant( + context: dict = None, run_kwargs: dict = None + ) -> Run: + run_kwargs = run_kwargs or {} + model = run_kwargs.pop( + "model", + self.assistant.model or self.flow.model or settings.assistant_model, + ) + thread = run_kwargs.pop("thread", self.flow.thread) + + run = Run( + assistant=self.assistant, + thread=thread, + instructions=self._get_instructions(context=context), + tools=self._get_tools(), + event_handler_class=AgentHandler, + model=model, + **run_kwargs, + ) + await run.run_async() + create_json_artifact( + key="messages", + # dump explicilty because of odd OAI serialization issue + data=[m.model_dump() for m in run.messages], + description="All messages sent and received during the run.", + ) + create_json_artifact( + key="actions", + # dump explicilty because of odd OAI serialization issue + data=[s.model_dump() for s in run.steps], + description="All actions taken by the assistant during the run.", + ) + return run + + return run_openai_assistant + + @expose_sync_method("run") + async def run_async(self, context: dict = None, **run_kwargs) -> list[Task]: + openai_run = self._get_run_assistant_task() + + openai_run( + context=context, + run_kwargs=run_kwargs, + ) + + # if this AI can't post messages to the system, then continue to invoke + # it until all tasks are finished + if not self.system_access: + counter = 0 + while ( + any(t.status == TaskStatus.PENDING for t in self.tasks) + and counter < settings.max_agent_iterations + ): + openai_run( + context=context, + run_kwargs=run_kwargs, + ) + counter += 1 + + result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] + + return result + + +class AgentHandler(PrintHandler): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.tool_calls = {} + + async def on_tool_call_created(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is created""" + + if tool_call.type == "function": + task_run_name = "Prepare arguments for tool call" + else: + task_run_name = f"Tool call: {tool_call.type}" + + client = get_prefect_client() + engine_context = FlowRunContext.get() + if not engine_context: + return + + task_run = await client.create_task_run( + task=prefect.Task(fn=lambda: None), + name=task_run_name, + extra_tags=["tool-call"], + flow_run_id=engine_context.flow_run.id, + dynamic_key=tool_call.id, + state=prefect.states.Running(), + ) + + self.tool_calls[tool_call.id] = task_run + + async def on_tool_call_done(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is done""" + + client = get_prefect_client() + task_run = self.tool_calls.get(tool_call.id) + if not task_run: + return + await client.set_task_run_state( + task_run_id=task_run.id, state=prefect.states.Completed(), force=True + ) + + # code interpreter is run as a single call, so we can publish a result artifact + if tool_call.type == "code_interpreter": + # images = [] + # for output in tool_call.code_interpreter.outputs: + # if output.type == "image": + # image_path = download_temp_file(output.image.file_id) + # images.append(image_path) + + create_python_artifact( + key="code", + code=tool_call.code_interpreter.input, + description="Code executed in the code interpreter", + task_run_id=task_run.id, + ) + create_json_artifact( + key="output", + data=tool_call.code_interpreter.outputs, + description="Output from the code interpreter", + task_run_id=task_run.id, + ) + + elif tool_call.type == "function": + create_json_artifact( + key="arguments", + data=json.dumps(json.loads(tool_call.function.arguments), indent=2), + description=f"Arguments for the `{tool_call.function.name}` tool", + task_run_id=task_run.id, + ) + + +def ai_task( + fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict +): + """ + Use a Python function to create an AI task. When the function is called, an + agent is created to complete the task and return the result. + """ + + if fn is None: + return functools.partial( + ai_task, objective=objective, user_access=user_access, **agent_kwargs + ) + + sig = inspect.signature(fn) + + if objective is None: + if fn.__doc__: + objective = f"{fn.__name__}: {fn.__doc__}" + else: + objective = fn.__name__ + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + # first process callargs + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + # return run_ai_task.with_options(name=f"Task: {fn.__name__}")( + return run_ai_task( + task=objective, + cast=fn.__annotations__.get("return"), + context=bound.arguments, + user_access=user_access, + **agent_kwargs, + ) + + return wrapper + + +def _name_from_objective(): + """Helper function for naming task runs""" + from prefect.runtime import task_run + + objective = task_run.parameters.get("task") + + if not objective: + objective = "Follow general instructions" + if len(objective) > 75: + return f"Task: {objective[:75]}..." + return f"Task: {objective}" + + +# @prefect_task(task_run_name=_name_from_objective) +def run_ai_task( + task: str = None, + cast: T = NOT_PROVIDED, + context: dict = None, + user_access: bool = None, + **agent_kwargs: dict, +) -> T: + """ + Create and run an agent to complete a task with the given objective and + context. This function is similar to an inline version of the @ai_task + decorator. + + This inline version is useful when you want to create and run an ad-hoc AI + task, without defining a function or using decorator syntax. It provides + more flexibility in terms of dynamically setting the task parameters. + Additional detail can be provided as `context`. + """ + + if cast is NOT_PROVIDED: + if not task: + cast = None + else: + cast = str + + # load flow + flow = ctx.get("flow", None) + + # create tasks + if task: + ai_tasks = [Task[cast](objective=task, context=context)] + else: + ai_tasks = [] + + # run agent + agent = Agent(tasks=ai_tasks, flow=flow, user_access=user_access, **agent_kwargs) + agent.run() + + if ai_tasks: + if ai_tasks[0].status == TaskStatus.COMPLETED: + return ai_tasks[0].result + elif ai_tasks[0].status == TaskStatus.FAILED: + raise ValueError(ai_tasks[0].error) diff --git a/src/control_flow/controller/__init__.py b/src/control_flow/controller/__init__.py new file mode 100644 index 00000000..1b1fb669 --- /dev/null +++ b/src/control_flow/controller/__init__.py @@ -0,0 +1 @@ +from .controller import Controller, SingleAgentController diff --git a/src/control_flow/controller/controller.py b/src/control_flow/controller/controller.py new file mode 100644 index 00000000..26694d60 --- /dev/null +++ b/src/control_flow/controller/controller.py @@ -0,0 +1,101 @@ +import logging +from typing import Any + +from marvin.beta.assistants import Run +from marvin.utilities.asyncio import ExposeSyncMethodsMixin +from pydantic import BaseModel, Field, field_validator, model_validator + +from control_flow.agent import Agent, AgentStatus +from control_flow.context import ctx +from control_flow.controller.delegation import DelegationStrategy, RoundRobin, Single +from control_flow.flow import Flow +from control_flow.instructions import get_instructions +from control_flow.types import Thread + +logger = logging.getLogger(__name__) + + +class Controller(BaseModel, ExposeSyncMethodsMixin): + agents: list[Agent] + flow: Flow = Field(None, validate_default=True) + delegation_strategy: DelegationStrategy = Field( + validate_default=True, + description="The strategy for delegating work to assistants.", + default_factory=RoundRobin, + ) + # termination_strategy: TerminationStrategy + context: dict = {} + instructions: str = None + model_config: dict = dict(extra="forbid") + + @field_validator("flow", mode="before") + def _default_flow(cls, v): + if v is None: + v = ctx.get("flow", None) + return v + + @field_validator("agents", mode="before") + def _validate_agents(cls, v): + if not v: + raise ValueError("At least one agent is required.") + return v + + async def run(self): + """ + Run the control flow. + """ + + while incomplete_agents := [ + a for a in self.agents if a.status == AgentStatus.INCOMPLETE + ]: + agent = self.delegation_strategy(incomplete_agents) + if not agent: + return + await self.run_agent(agent=agent) + + async def run_agent(self, agent: Agent, thread: Thread = None): + """ + Run a single agent. + """ + from control_flow.controller.instruction_template import MainTemplate + + instructions_template = MainTemplate( + agent=agent, + controller=self, + context=self.context, + instructions=get_instructions(), + ) + + run = Run( + assistant=agent.assistant, + thread=thread or self.flow.thread, + instructions=instructions_template.render(), + tools=agent.get_tools() + self.flow.tools, + ) + + await run.run_async() + + return run + + +class SingleAgentController(Controller): + """ + A SingleAgentController is a controller that runs a single agent. + """ + + delegation_strategy: Single = Field(None, validate_default=True) + + @field_validator("agents", mode="before") + def _validate_agents(cls, v): + if len(v) != 1: + raise ValueError("A SingleAgentController must have exactly one agent.") + return v + + @model_validator(mode="before") + @classmethod + def _create_single_strategy(cls, data: Any) -> Any: + """ + Create a Single delegation strategy with the agent. + """ + data["delegation_strategy"] = Single(agent=data["agents"][0]) + return data diff --git a/src/control_flow/controller/delegation.py b/src/control_flow/controller/delegation.py new file mode 100644 index 00000000..63d712cf --- /dev/null +++ b/src/control_flow/controller/delegation.py @@ -0,0 +1,105 @@ +import itertools +from typing import TYPE_CHECKING, Any, Generator, Iterator + +import marvin +from pydantic import BaseModel, PrivateAttr + +from control_flow.agent import Agent, AgentStatus +from control_flow.flow import get_flow_messages +from control_flow.instructions import get_instructions + +if TYPE_CHECKING: + from control_flow.agent import Agent + + +class DelegationStrategy(BaseModel): + """ + A DelegationStrategy is a strategy for delegating tasks to AI assistants. + """ + + def __call__(self, agents: list["Agent"]) -> Agent: + """ + Run the delegation strategy with a list of agents. + """ + return self._next_agent(agents) + + def _next_agent(self, agents: list["Agent"]) -> "Agent": + """ + Select an agent from a list of agents. + """ + raise NotImplementedError() + + +class Single(DelegationStrategy): + """ + A Single delegation strategy delegates tasks to a single agent. This is useful for debugging. + """ + + agent: Agent + + def _next_agent(self, agents: list[Agent]) -> Generator[Any, Any, Agent]: + """ + Given a list of potential agents, choose the single agent. + """ + if self.agent in agents: + return self.agent + + +class RoundRobin(DelegationStrategy): + """ + A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. + """ + + _cycle: Iterator[Agent] = PrivateAttr(None) + + def _next_agent(self, agents: list["Agent"]) -> "Agent": + """ + Given a list of potential agents, delegate the tasks in a round-robin fashion. + """ + # the first time this is called, create a cycle iterator + if self._cycle is None: + self._cycle = itertools.cycle(agents) + + # cycle once through all agents, returning the first one that is in the list + # if no agent is found after cycling through all agents, return None + first_agent_seen = None + for agent in self._cycle: + if agent in agents: + return agent + elif agent == first_agent_seen: + break + + # remember the first agent seen so we can avoid an infinite loop + if first_agent_seen is None: + first_agent_seen = agent + + +class Moderator(DelegationStrategy): + """ + A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier + """ + + model: str = None + + def _next_agent(self, agents: list["Agent"]) -> "Agent": + """ + Given a list of potential agents, choose the most qualified assistant to complete the tasks. + """ + + instructions = get_instructions() + history = get_flow_messages() + + context = dict(messages=history, global_instructions=instructions) + agent = marvin.classify( + context, + [a for a in agents if a.status == AgentStatus.INCOMPLETE], + instructions=""" + Given the conversation context, choose the AI agent most + qualified to take the next turn at completing the tasks. Take into + account the instructions, each agent's own instructions, and the + tools they have available. + """, + model_kwargs=dict(model=self.model), + ) + + return agent diff --git a/src/control_flow/controller/instruction_template.py b/src/control_flow/controller/instruction_template.py new file mode 100644 index 00000000..5d94b9a6 --- /dev/null +++ b/src/control_flow/controller/instruction_template.py @@ -0,0 +1,166 @@ +import inspect + +from pydantic import BaseModel + +from control_flow.agent import Agent +from control_flow.types import ControlFlowModel +from control_flow.utilities.jinja import jinja_env + +from .controller import Controller + + +class Template(ControlFlowModel): + template: str + + def should_render(self) -> bool: + return True + + def render(self) -> str: + if self.should_render(): + render_kwargs = dict(self) + render_kwargs.pop("template") + return jinja_env.render(inspect.cleandoc(self.template), **render_kwargs) + + +class HeaderTemplate(Template): + template: str = """ + You are an AI agent. Your name is "{{ agent.assistant.name }}". + """ + + agent: Agent + + +class InstructionsTemplate(Template): + template: str = """ + ## Instructions + + {% if flow_instructions %} + ### Workflow instructions + + {{ flow_instructions }} + {% endif %} + + {% if controller_instructions %} + ### Controller instructions + + {{ controller_instructions }} + {% endif %} + + {% if agent_instructions %} + ### Agent instructions + + {{ agent_instructions }} + {% endif %} + + {% if additional_instructions %} + ### Additional instructions + + {% for instruction in additional_instructions %} + - {{ instruction }} + {% endfor %} + {% endif %} + """ + flow_instructions: str | None = None + controller_instructions: str | None = None + agent_instructions: str | None = None + additional_instructions: list[str] = [] + + def should_render(self): + return any( + [ + self.flow_instructions, + self.controller_instructions, + self.agent_instructions, + self.additional_instructions, + ] + ) + + +class TasksTemplate(Template): + template: str = """ + ## Tasks + + {% for task_id, task in agent.task_ids() %} + ### Task {{ task_id }} + - Status: {{ task.status }} + - Objective: {{ task.objective }} + {% if task.instructions %} + - Instructions: {{ task.instructions }} + {% endif %} + {% if task.status.value == "completed" %} + - Result: {{ task.result }} + {% elif task.status.value == "failed" %} + - Error: {{ task.error }} + {% endif %} + {% if task.context %} + - Context: {{ task.context }} + {% endif %} + + {% endfor %} + """ + agent: Agent + + def should_render(self): + return any(self.agent.tasks) + + +class ContextTemplate(Template): + template: str = """ + ## Context + + {% if flow_context %} + ### Flow context + {% for key, value in flow_context.items() %} + - *{{ key }}*: {{ value }} + {% endfor %} + {% endif %} + + {% if controller_context %} + ### Controller context + {% for key, value in controller_context.items() %} + - *{{ key }}*: {{ value }} + {% endfor %} + {% endif %} + + {% if agent_context %} + ### Agent context + {% for key, value in agent_context.items() %} + - *{{ key }}*: {{ value }} + {% endfor %} + {% endif %} + """ + flow_context: dict + controller_context: dict + agent_context: dict + + def should_render(self): + return bool(self.flow_context or self.controller_context or self.agent_context) + + +class MainTemplate(BaseModel): + agent: Agent + controller: Controller + context: dict + instructions: list[str] + + def render(self): + templates = [ + HeaderTemplate(agent=self.agent), + InstructionsTemplate( + flow_instructions=self.controller.flow.instructions, + controller_instructions=self.controller.instructions, + agent_instructions=self.agent.instructions, + additional_instructions=self.instructions, + ), + TasksTemplate(agent=self.agent), + ContextTemplate( + flow_context=self.controller.flow.context, + controller_context=self.controller.context, + agent_context=self.agent.context, + ), + ] + + rendered = [ + template.render() for template in templates if template.should_render() + ] + return "\n\n".join(rendered) diff --git a/src/control_flow/delegation.py b/src/control_flow/delegation.py deleted file mode 100644 index 86c2a7bb..00000000 --- a/src/control_flow/delegation.py +++ /dev/null @@ -1,86 +0,0 @@ -import itertools -from typing import Any, Generator - -import marvin -from marvin.beta.assistants import Assistant -from pydantic import BaseModel - -from control_flow.flow import get_flow_messages -from control_flow.instructions import get_instructions - - -class DelegationStrategy(BaseModel): - """ - A DelegationStrategy is a strategy for delegating tasks to AI assistants. - """ - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, delegate the task to the most qualified assistant. - """ - - raise NotImplementedError() - - -class Single(DelegationStrategy): - """ - A Single delegation strategy delegates tasks to a single AI assistant. - """ - - assistant: Assistant - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, choose the first assistant in the list. - """ - - if self.assistant not in assistants: - raise ValueError("Assistant not in list of assistants") - - while True: - yield self.assistant - - -class RoundRobin(DelegationStrategy): - """ - A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. - """ - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, choose the next assistant in the list. - """ - - yield from itertools.cycle(assistants) - - -class Moderator(DelegationStrategy): - """ - A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier - """ - - model: str = None - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, delegate the task to the most qualified assistant. - """ - - while True: - instructions = get_instructions() - history = get_flow_messages() - - context = dict(messages=history, global_instructions=instructions) - assistant = marvin.classify( - context, - assistants, - instructions=""" - Given the conversation context, choose the AI assistant most - qualified to take the next turn at completing the tasks. Take into - account the instructions, each assistant's own instructions, and the - tools they have available. - """, - model_kwargs=dict(model=self.model), - ) - - yield assistant diff --git a/src/control_flow/flow.py b/src/control_flow/flow.py index e5184d82..bfd15b51 100644 --- a/src/control_flow/flow.py +++ b/src/control_flow/flow.py @@ -1,28 +1,28 @@ import functools -from typing import Callable, Optional, Union +from typing import Callable -from marvin.beta.assistants import Assistant, Thread -from marvin.beta.assistants.assistants import AssistantTool +from marvin.beta.assistants import Thread from marvin.utilities.logging import get_logger from openai.types.beta.threads import Message from prefect import flow as prefect_flow from prefect import task as prefect_task -from pydantic import BaseModel, Field, field_validator +from pydantic import Field, field_validator from control_flow.context import ctx +from control_flow.types import AssistantTool, ControlFlowModel from control_flow.utilities.marvin import patch_marvin logger = get_logger(__name__) -class AIFlow(BaseModel): +class Flow(ControlFlowModel): thread: Thread = Field(None, validate_default=True) - assistants: Optional[list[Assistant]] = Field(None, validate_default=True) - tools: list[Union[AssistantTool, Callable]] = Field(None, validate_default=True) - instructions: Optional[str] = None - model: Optional[str] = None - - model_config: dict = dict(validate_assignment=True, extra="forbid") + tools: list[AssistantTool | Callable] = Field( + [], description="Tools that will be available to every agent in the flow" + ) + instructions: str | None = None + model: str | None = None + context: dict = {} @field_validator("thread", mode="before") def _load_thread_from_ctx(cls, v): @@ -35,12 +35,6 @@ def _load_thread_from_ctx(cls, v): return v - @field_validator("tools", mode="before") - def _default_tools(cls, v): - if v is None: - v = [] - return v - def add_message(self, message: str): prefect_task(self.thread.add)(message) @@ -48,9 +42,8 @@ def add_message(self, message: str): def ai_flow( fn=None, *, - assistants: list[Assistant] = None, thread: Thread = None, - tools: list[Union[AssistantTool, Callable]] = None, + tools: list[AssistantTool | Callable] = None, instructions: str = None, model: str = None, ): @@ -61,7 +54,6 @@ def ai_flow( if fn is None: return functools.partial( ai_flow, - assistants=assistants, thread=thread, tools=tools, instructions=instructions, @@ -71,25 +63,19 @@ def ai_flow( @functools.wraps(fn) def wrapper( *args, - _assistants: list[Assistant] = None, - _thread: Thread = None, - _tools: list[Union[AssistantTool, Callable]] = None, - _instructions: str = None, - _model: str = None, + flow_kwargs: dict = None, **kwargs, ): p_fn = prefect_flow(fn) - flow_assistants = _assistants or assistants - flow_thread = _thread or thread or Thread() - flow_instructions = _instructions or instructions - flow_tools = _tools or tools - flow_model = _model or model - flow_obj = AIFlow( - thread=flow_thread, - assistants=flow_assistants, - tools=flow_tools, - instructions=flow_instructions, - model=flow_model, + + flow_obj = Flow( + **{ + "thread": thread, + "tools": tools or [], + "instructions": instructions, + "model": model, + **(flow_kwargs or {}), + } ) logger.info( @@ -102,13 +88,13 @@ def wrapper( return wrapper -def get_flow() -> AIFlow: +def get_flow() -> Flow: """ Loads the flow from the context. Will error if no flow is found in the context. """ - flow: Optional[AIFlow] = ctx.get("flow") + flow: Flow | None = ctx.get("flow") if not flow: raise ValueError("No flow found in context") return flow diff --git a/src/control_flow/instructions.py b/src/control_flow/instructions.py index 4780d66a..08dd7f18 100644 --- a/src/control_flow/instructions.py +++ b/src/control_flow/instructions.py @@ -5,7 +5,7 @@ from marvin.utilities.logging import get_logger from control_flow.context import ctx -from control_flow.flow import AIFlow +from control_flow.flow import Flow logger = get_logger(__name__) @@ -31,7 +31,7 @@ def instructions( """ if post_add_message or post_remove_message: - flow: AIFlow = ctx.get("flow") + flow: Flow = ctx.get("flow") if flow is None: raise ValueError( "instructions() with message posting must be used within a flow context" diff --git a/src/control_flow/task.py b/src/control_flow/task.py index 438aa546..8d05f699 100644 --- a/src/control_flow/task.py +++ b/src/control_flow/task.py @@ -1,12 +1,13 @@ from enum import Enum -from typing import Generic, Optional, TypeVar +from typing import Callable, Generic, TypeVar import marvin import marvin.utilities.tools -from marvin.beta.assistants.runs import EndRun from marvin.utilities.logging import get_logger from marvin.utilities.tools import FunctionTool -from pydantic import BaseModel, Field, field_validator +from pydantic import Field + +from control_flow.types import AssistantTool, ControlFlowModel T = TypeVar("T") logger = get_logger(__name__) @@ -18,91 +19,58 @@ class TaskStatus(Enum): FAILED = "failed" -class AITask(BaseModel, Generic[T]): - """ - An AITask represents a single unit of work that an assistant must complete. - Unlike instructions, which do not require a formal result and may last for - zero or more agent iterations, tasks must be formally completed (or failed) - by producing a result. Agents that are assigned tasks will continue to - iterate until all tasks are completed. - """ - +class Task(ControlFlowModel, Generic[T]): objective: str - instructions: Optional[str] = None - context: dict = Field(None, validate_default=True) + instructions: str | None = None + context: dict = Field({}) status: TaskStatus = TaskStatus.PENDING result: T = None - error: Optional[str] = None - - # internal - model_config: dict = dict(validate_assignment=True, extra="forbid") + error: str | None = None + tools: list[AssistantTool | Callable] = [] - @field_validator("context", mode="before") - def _default_context(cls, v): - if v is None: - v = {} - return v + def __hash__(self): + return id(self) - def _create_complete_tool( - self, task_id: int, end_run: bool = False - ) -> FunctionTool: + def _create_complete_tool(self, task_id: int) -> FunctionTool: """ Create an agent-compatible tool for completing this task. """ - result_type = self.get_result_type() - if result_type is not None: - - def complete(result: result_type): - self.result = result - self.status = TaskStatus.COMPLETED - if end_run: - return EndRun() - - tool = marvin.utilities.tools.tool_from_function( - complete, - name=f"complete_task_{task_id}", - description=f"Mark task {task_id} completed", - ) - else: + def complete(result: result_type): + self.result = result + self.status = TaskStatus.COMPLETED - def complete(): - self.status = TaskStatus.COMPLETED - if end_run: - return EndRun() - - tool = marvin.utilities.tools.tool_from_function( - complete, - name=f"complete_task_{task_id}", - description=f"Mark task {task_id} completed", - ) + tool = marvin.utilities.tools.tool_from_function( + complete, + name=f"complete_task_{task_id}", + description=f"Mark task {task_id} completed", + ) return tool - def _create_fail_tool(self, task_id: int, end_run: bool = False) -> FunctionTool: + def _create_fail_tool(self, task_id: int) -> FunctionTool: """ Create an agent-compatible tool for failing this task. """ - - def fail(message: Optional[str] = None): - self.error = message - self.status = TaskStatus.FAILED - if end_run: - return EndRun() - tool = marvin.utilities.tools.tool_from_function( - fail, + self.fail, name=f"fail_task_{task_id}", description=f"Mark task {task_id} failed", ) return tool + def get_tools(self, task_id: int) -> list[AssistantTool | Callable]: + return [ + self._create_complete_tool(task_id), + self._create_fail_tool(task_id), + ] + self.tools + def complete(self, result: T): self.result = result self.status = TaskStatus.COMPLETED - def fail(self, message: Optional[str] = None): + def fail(self, message: str | None = None): self.error = message self.status = TaskStatus.FAILED diff --git a/src/control_flow/termination.py b/src/control_flow/termination.py new file mode 100644 index 00000000..773582d5 --- /dev/null +++ b/src/control_flow/termination.py @@ -0,0 +1,36 @@ +from typing import TYPE_CHECKING + +from pydantic import BaseModel + +from control_flow.task import TaskStatus + +if TYPE_CHECKING: + from control_flow.agent import Agent + + +class TerminationStrategy(BaseModel): + """ + A TerminationStrategy is a strategy for deciding when AI assistants have completed their tasks. + """ + + def run(self, agents: list["Agent"]) -> bool: + """ + Given agents, determine whether they have completed their tasks. + """ + + raise NotImplementedError() + + +class AllFinished(TerminationStrategy): + """ + An AllFinished termination strategy terminates when all agents have finished all of their tasks (either COMPLETED or FAILED). + """ + + def run(self, agents: list["Agent"]) -> bool: + """ + Given agents, determine whether they have completed their tasks. + """ + for agent in agents: + if any(task.status == TaskStatus.PENDING for task in agent.tasks): + return False + return True diff --git a/src/control_flow/types.py b/src/control_flow/types.py new file mode 100644 index 00000000..5b4c45d2 --- /dev/null +++ b/src/control_flow/types.py @@ -0,0 +1,8 @@ +from marvin.beta.assistants import Assistant, Thread +from marvin.beta.assistants.assistants import AssistantTool +from marvin.utilities.asyncio import ExposeSyncMethodsMixin +from pydantic import BaseModel + + +class ControlFlowModel(BaseModel): + model_config = dict(validate_assignment=True, extra="forbid") diff --git a/src/control_flow/utilities/jinja.py b/src/control_flow/utilities/jinja.py new file mode 100644 index 00000000..92d4ca71 --- /dev/null +++ b/src/control_flow/utilities/jinja.py @@ -0,0 +1,13 @@ +import inspect +from datetime import datetime +from zoneinfo import ZoneInfo + +from marvin.utilities.jinja import BaseEnvironment + +jinja_env = BaseEnvironment( + globals={ + "now": lambda: datetime.now(ZoneInfo("UTC")), + "inspect": inspect, + "id": id, + } +) From e7d5be2ffadac0154f2cc59ee6537769b2b3fbd4 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 8 Apr 2024 16:08:57 -0400 Subject: [PATCH 2/4] reorg all files --- pyproject.toml | 2 +- src/control_flow/__init__.py | 6 ++--- src/control_flow/agent_old.py | 6 ++--- src/control_flow/core/__init__.py | 4 +++ src/control_flow/{ => core}/agent.py | 10 +++---- .../{ => core}/controller/__init__.py | 1 + .../{ => core}/controller/controller.py | 19 ++++++++----- .../{ => core}/controller/delegation.py | 6 ++--- .../controller/instruction_template.py | 27 ++++++++++++++++--- .../{ => core/controller}/termination.py | 0 src/control_flow/{ => core}/flow.py | 4 +-- src/control_flow/{ => core}/task.py | 2 +- src/control_flow/instructions.py | 4 +-- src/control_flow/{ => utilities}/context.py | 0 src/control_flow/{ => utilities}/types.py | 0 tests/flows/test_sign_guestbook.py | 2 +- 16 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 src/control_flow/core/__init__.py rename src/control_flow/{ => core}/agent.py (93%) rename src/control_flow/{ => core}/controller/__init__.py (62%) rename src/control_flow/{ => core}/controller/controller.py (84%) rename src/control_flow/{ => core}/controller/delegation.py (95%) rename src/control_flow/{ => core}/controller/instruction_template.py (79%) rename src/control_flow/{ => core/controller}/termination.py (100%) rename src/control_flow/{ => core}/flow.py (95%) rename src/control_flow/{ => core}/task.py (96%) rename src/control_flow/{ => utilities}/context.py (100%) rename src/control_flow/{ => utilities}/types.py (100%) diff --git a/pyproject.toml b/pyproject.toml index a4fbc16a..7bec0e5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,4 +63,4 @@ skip-magic-trailing-comma = false "__init__.py" = ['I', 'F401', 'E402'] "conftest.py" = ["F401", "F403"] 'tests/fixtures/*.py' = ['F401', 'F403'] -"src/control_flow/types.py" = ['F401'] +"src/control_flow/utilities/types.py" = ['F401'] diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 8cd2d46b..7ae7eaff 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,9 +1,9 @@ from .settings import settings # from .agent_old import ai_task, Agent, run_ai_task -from .flow import ai_flow -from .controller.controller import Controller -from .agent import run_ai_task +from .core.flow import ai_flow, Flow +from .core.controller.controller import Controller +from .core.agent import run_ai_task # from .instructions import instructions # from marvin.beta.assistants import Assistant diff --git a/src/control_flow/agent_old.py b/src/control_flow/agent_old.py index a93a2d0c..22923269 100644 --- a/src/control_flow/agent_old.py +++ b/src/control_flow/agent_old.py @@ -20,9 +20,9 @@ from pydantic import BaseModel, Field, field_validator from control_flow import settings -from control_flow.context import ctx -from control_flow.flow import Flow -from control_flow.task import Task, TaskStatus +from control_flow.core.flow import Flow +from control_flow.core.task import Task, TaskStatus +from control_flow.utilities.context import ctx from control_flow.utilities.jinja import jinja_env from control_flow.utilities.prefect import ( create_json_artifact, diff --git a/src/control_flow/core/__init__.py b/src/control_flow/core/__init__.py new file mode 100644 index 00000000..eaaedf38 --- /dev/null +++ b/src/control_flow/core/__init__.py @@ -0,0 +1,4 @@ +from .task import Task, TaskStatus +from .flow import Flow +from .agent import Agent +from .controller import Controller, SingleAgentController diff --git a/src/control_flow/agent.py b/src/control_flow/core/agent.py similarity index 93% rename from src/control_flow/agent.py rename to src/control_flow/core/agent.py index d2f3c299..e3ea39d8 100644 --- a/src/control_flow/agent.py +++ b/src/control_flow/core/agent.py @@ -5,10 +5,10 @@ from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method from pydantic import Field, field_validator -from control_flow.context import ctx -from control_flow.flow import Flow -from control_flow.task import Task, TaskStatus -from control_flow.types import Assistant, AssistantTool, ControlFlowModel +from control_flow.core.flow import Flow +from control_flow.core.task import Task, TaskStatus +from control_flow.utilities.context import ctx +from control_flow.utilities.types import Assistant, AssistantTool, ControlFlowModel T = TypeVar("T") logger = logging.getLogger(__name__) @@ -84,7 +84,7 @@ def status(self) -> AgentStatus: @expose_sync_method("run") async def run_async(self, flow: Flow = None): - from control_flow.controller import SingleAgentController + from control_flow.core.controller import SingleAgentController controller = SingleAgentController(agents=[self], flow=flow) return await controller.run() diff --git a/src/control_flow/controller/__init__.py b/src/control_flow/core/controller/__init__.py similarity index 62% rename from src/control_flow/controller/__init__.py rename to src/control_flow/core/controller/__init__.py index 1b1fb669..ab1f892c 100644 --- a/src/control_flow/controller/__init__.py +++ b/src/control_flow/core/controller/__init__.py @@ -1 +1,2 @@ from .controller import Controller, SingleAgentController +from .delegation import RoundRobin diff --git a/src/control_flow/controller/controller.py b/src/control_flow/core/controller/controller.py similarity index 84% rename from src/control_flow/controller/controller.py rename to src/control_flow/core/controller/controller.py index 26694d60..8efa24d5 100644 --- a/src/control_flow/controller/controller.py +++ b/src/control_flow/core/controller/controller.py @@ -1,16 +1,20 @@ import logging from typing import Any -from marvin.beta.assistants import Run +from marvin.beta.assistants import PrintHandler, Run from marvin.utilities.asyncio import ExposeSyncMethodsMixin from pydantic import BaseModel, Field, field_validator, model_validator -from control_flow.agent import Agent, AgentStatus -from control_flow.context import ctx -from control_flow.controller.delegation import DelegationStrategy, RoundRobin, Single -from control_flow.flow import Flow +from control_flow.core.agent import Agent, AgentStatus +from control_flow.core.controller.delegation import ( + DelegationStrategy, + RoundRobin, + Single, +) +from control_flow.core.flow import Flow from control_flow.instructions import get_instructions -from control_flow.types import Thread +from control_flow.utilities.context import ctx +from control_flow.utilities.types import Thread logger = logging.getLogger(__name__) @@ -57,7 +61,7 @@ async def run_agent(self, agent: Agent, thread: Thread = None): """ Run a single agent. """ - from control_flow.controller.instruction_template import MainTemplate + from control_flow.core.controller.instruction_template import MainTemplate instructions_template = MainTemplate( agent=agent, @@ -71,6 +75,7 @@ async def run_agent(self, agent: Agent, thread: Thread = None): thread=thread or self.flow.thread, instructions=instructions_template.render(), tools=agent.get_tools() + self.flow.tools, + event_handler_class=PrintHandler, ) await run.run_async() diff --git a/src/control_flow/controller/delegation.py b/src/control_flow/core/controller/delegation.py similarity index 95% rename from src/control_flow/controller/delegation.py rename to src/control_flow/core/controller/delegation.py index 63d712cf..095a7b9c 100644 --- a/src/control_flow/controller/delegation.py +++ b/src/control_flow/core/controller/delegation.py @@ -4,12 +4,12 @@ import marvin from pydantic import BaseModel, PrivateAttr -from control_flow.agent import Agent, AgentStatus -from control_flow.flow import get_flow_messages +from control_flow.core.agent import Agent, AgentStatus +from control_flow.core.flow import get_flow_messages from control_flow.instructions import get_instructions if TYPE_CHECKING: - from control_flow.agent import Agent + from control_flow.core.agent import Agent class DelegationStrategy(BaseModel): diff --git a/src/control_flow/controller/instruction_template.py b/src/control_flow/core/controller/instruction_template.py similarity index 79% rename from src/control_flow/controller/instruction_template.py rename to src/control_flow/core/controller/instruction_template.py index 5d94b9a6..6a50e576 100644 --- a/src/control_flow/controller/instruction_template.py +++ b/src/control_flow/core/controller/instruction_template.py @@ -2,9 +2,9 @@ from pydantic import BaseModel -from control_flow.agent import Agent -from control_flow.types import ControlFlowModel +from control_flow.core.agent import Agent from control_flow.utilities.jinja import jinja_env +from control_flow.utilities.types import ControlFlowModel from .controller import Controller @@ -22,9 +22,28 @@ def render(self) -> str: return jinja_env.render(inspect.cleandoc(self.template), **render_kwargs) -class HeaderTemplate(Template): +class AgentTemplate(Template): template: str = """ You are an AI agent. Your name is "{{ agent.assistant.name }}". + + You have been created by a Controller in the Python library ControlFlow in + order to complete various tasks or instructions. All messages in this thread + are either from the Controller or from other AI agents like you. Preface any + response with your name (e.g. "{{ agent.assistant.name }}: Hello!") in order + to distinguish your messages. + + {% if agent.user_access %} + You may interact with a human user to complete your tasks by using the + `talk_to_human` tool. The human is unaware of your tasks or the Controller. + Do not mention them or anything else about how this system works. The human + can only see messages you send via tool, not the rest of the thread. + + Humans may give poor or incorrect responses. You may need to ask questions + multiple times and not be too literal in your interpretation of their + responses. + {% else %} + You can not interact with a human at this time. + {% endif %} """ agent: Agent @@ -145,7 +164,7 @@ class MainTemplate(BaseModel): def render(self): templates = [ - HeaderTemplate(agent=self.agent), + AgentTemplate(agent=self.agent), InstructionsTemplate( flow_instructions=self.controller.flow.instructions, controller_instructions=self.controller.instructions, diff --git a/src/control_flow/termination.py b/src/control_flow/core/controller/termination.py similarity index 100% rename from src/control_flow/termination.py rename to src/control_flow/core/controller/termination.py diff --git a/src/control_flow/flow.py b/src/control_flow/core/flow.py similarity index 95% rename from src/control_flow/flow.py rename to src/control_flow/core/flow.py index bfd15b51..f4f07030 100644 --- a/src/control_flow/flow.py +++ b/src/control_flow/core/flow.py @@ -8,9 +8,9 @@ from prefect import task as prefect_task from pydantic import Field, field_validator -from control_flow.context import ctx -from control_flow.types import AssistantTool, ControlFlowModel +from control_flow.utilities.context import ctx from control_flow.utilities.marvin import patch_marvin +from control_flow.utilities.types import AssistantTool, ControlFlowModel logger = get_logger(__name__) diff --git a/src/control_flow/task.py b/src/control_flow/core/task.py similarity index 96% rename from src/control_flow/task.py rename to src/control_flow/core/task.py index 8d05f699..fdadc02b 100644 --- a/src/control_flow/task.py +++ b/src/control_flow/core/task.py @@ -7,7 +7,7 @@ from marvin.utilities.tools import FunctionTool from pydantic import Field -from control_flow.types import AssistantTool, ControlFlowModel +from control_flow.utilities.types import AssistantTool, ControlFlowModel T = TypeVar("T") logger = get_logger(__name__) diff --git a/src/control_flow/instructions.py b/src/control_flow/instructions.py index 08dd7f18..6ec1d968 100644 --- a/src/control_flow/instructions.py +++ b/src/control_flow/instructions.py @@ -4,8 +4,8 @@ from marvin.utilities.logging import get_logger -from control_flow.context import ctx -from control_flow.flow import Flow +from control_flow.core.flow import Flow +from control_flow.utilities.context import ctx logger = get_logger(__name__) diff --git a/src/control_flow/context.py b/src/control_flow/utilities/context.py similarity index 100% rename from src/control_flow/context.py rename to src/control_flow/utilities/context.py diff --git a/src/control_flow/types.py b/src/control_flow/utilities/types.py similarity index 100% rename from src/control_flow/types.py rename to src/control_flow/utilities/types.py diff --git a/tests/flows/test_sign_guestbook.py b/tests/flows/test_sign_guestbook.py index b3d816cf..14d815ac 100644 --- a/tests/flows/test_sign_guestbook.py +++ b/tests/flows/test_sign_guestbook.py @@ -1,5 +1,5 @@ from control_flow import Assistant, run_ai_task -from control_flow.flow import ai_flow +from control_flow.core.flow import ai_flow # define assistants From 3b7eb88fa33ce45cb609ce08042355514e4a61a1 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Tue, 16 Apr 2024 20:22:07 -0400 Subject: [PATCH 3/4] refactor --- README.md | 18 +- examples/readme_example.py | 12 +- pyproject.toml | 31 +-- src/control_flow/__init__.py | 11 +- src/control_flow/agent_old.py | 7 +- src/control_flow/core/__init__.py | 2 +- src/control_flow/core/agent.py | 108 +--------- src/control_flow/core/controller/__init__.py | 2 +- .../core/controller/controller.py | 89 ++++----- .../core/controller/delegation.py | 88 ++++----- .../core/controller/instruction_template.py | 143 +++++++++++--- src/control_flow/core/flow.py | 69 +++---- src/control_flow/core/task.py | 11 +- src/control_flow/dx.py | 186 ++++++++++++++++++ src/control_flow/instructions.py | 3 +- src/control_flow/utilities/logging.py | 44 +++++ tests/flows/test_sign_guestbook.py | 4 +- 17 files changed, 517 insertions(+), 311 deletions(-) create mode 100644 src/control_flow/dx.py create mode 100644 src/control_flow/utilities/logging.py diff --git a/README.md b/README.md index a29b8517..67ba20cc 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,7 @@ At its core, ControlFlow is built on the idea of agent orchestration. It provide - **Flow**: A container for an AI-enhanced workflow that maintains consistent context and history. Flows are defined with the `@ai_flow` decorator. - **Task**: A discrete objective for AI agents to solve. Tasks can be defined with the `@ai_task` decorator or declared inline. -- **Agent**: Agents encapsulate the logic for applying an AI assistant to one or more tasks. -- **Controller**: Controllers are responsible for coordinating agents, delegating tasks, and managing the overall execution of the workflow. - -Users typically don't interact directly with agents or controllers. Instead, they define tasks and flows, which are then executed by the controller. The controller is responsible for managing the agents and ensuring that the workflow is executed correctly. +- **Agent**: an AI agent that can be assigned tasks ## Key Features @@ -38,33 +35,37 @@ pip install . ## Example ```python -from control_flow import ai_flow, ai_task, run_ai_task, instructions +from control_flow import ai_flow, ai_task, run_ai, instructions from pydantic import BaseModel + class Name(BaseModel): first_name: str last_name: str + @ai_task(user_access=True) def get_user_name() -> Name: pass + @ai_task def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass + @ai_flow() def demo(): - # set instructions that will be used for multiple tasks with instructions("talk like a pirate"): - # define an AI task as a function and have it execute it name = get_user_name() # define an AI task inline - interests = run_ai_task("ask user for three interests", cast=list[str], user_access=True) + interests = run_ai( + "ask user for three interests", cast=list[str], user_access=True + ) # set instructions for just the next task with instructions("no more than 8 lines"): @@ -72,6 +73,7 @@ def demo(): return poem + if __name__ == "__main__": demo() ``` diff --git a/examples/readme_example.py b/examples/readme_example.py index 0ac0298b..ac073e15 100644 --- a/examples/readme_example.py +++ b/examples/readme_example.py @@ -1,4 +1,4 @@ -from control_flow import ai_flow, ai_task, instructions, run_ai_task +from control_flow import ai_flow, ai_task, instructions, run_ai from pydantic import BaseModel @@ -12,7 +12,7 @@ def get_user_name() -> Name: pass -@ai_task() +@ai_task def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass @@ -26,10 +26,8 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_ai_task( - "ask user for three interests", - cast=list[str], - user_access=True, + interests = run_ai( + "ask user for three interests", cast=list[str], user_access=True ) # set instructions for just the next task @@ -39,5 +37,5 @@ def demo(): return poem -if __name__ == "main": +if __name__ == "__main__": demo() diff --git a/pyproject.toml b/pyproject.toml index 7bec0e5b..469fb4c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,17 +3,31 @@ name = "control_flow" version = "0.1.0" description = "AI Workflows" authors = [ - { name = "Jeremiah Lowin", email = "153965+jlowin@users.noreply.github.com" } + { name = "Jeremiah Lowin", email = "153965+jlowin@users.noreply.github.com" }, ] dependencies = [ "marvin @ git+https://github.com/prefecthq/marvin@main", - "prefect @ git+https://github.com/prefecthq/prefect@main", + "prefect[dev] @ git+https://github.com/prefecthq/prefect@main", # can remove when prefect fully migrates to pydantic 2 "pydantic>=2", ] readme = "README.md" requires-python = ">= 3.9" -keywords = ["ai", "chatbot", "llm", "NLP", "natural language processing", "prefect", "workflow", "orchestration", "python", "GPT", "openai", "assistant", "agent"] +keywords = [ + "ai", + "chatbot", + "llm", + "NLP", + "natural language processing", + "prefect", + "workflow", + "orchestration", + "python", + "GPT", + "openai", + "assistant", + "agent", +] [project.urls] Code = "https://github.com/jlowin/ControlFlow" @@ -24,24 +38,19 @@ tests = [ "pytest-env>=0.8,<2.0", "pytest-rerunfailures>=10,<14", "pytest-sugar>=0.9,<2.0", - "pytest>=8.1.1", + "pytest>=7.0", "pytest-timeout", "pytest-xdist", "pre-commit>=3.7.0", ] -dev = [ - "control_flow[tests]", - "ipython>=8.22.2", - "pdbpp>=0.10.3", - "ruff>=0.3.4", -] +dev = ["control_flow[tests]", "ipython>=8.22.2", "pdbpp>=0.10.3", "ruff>=0.3.4"] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.rye] -managed = true +managed = true [tool.hatch.metadata] allow-direct-references = true diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 7ae7eaff..6f3237e5 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,9 +1,8 @@ from .settings import settings -# from .agent_old import ai_task, Agent, run_ai_task -from .core.flow import ai_flow, Flow +# from .agent_old import ai_task, Agent, run_ai +from .core.flow import Flow +from .core.agent import Agent from .core.controller.controller import Controller -from .core.agent import run_ai_task -# from .instructions import instructions - -# from marvin.beta.assistants import Assistant +from .instructions import instructions +from .dx import ai_flow, run_ai, ai_task diff --git a/src/control_flow/agent_old.py b/src/control_flow/agent_old.py index 22923269..4969ca15 100644 --- a/src/control_flow/agent_old.py +++ b/src/control_flow/agent_old.py @@ -488,8 +488,8 @@ def wrapper(*args, **kwargs): bound = sig.bind(*args, **kwargs) bound.apply_defaults() - # return run_ai_task.with_options(name=f"Task: {fn.__name__}")( - return run_ai_task( + # return run_ai.with_options(name=f"Task: {fn.__name__}")( + return run_ai( task=objective, cast=fn.__annotations__.get("return"), context=bound.arguments, @@ -514,11 +514,12 @@ def _name_from_objective(): # @prefect_task(task_run_name=_name_from_objective) -def run_ai_task( +def run_ai( task: str = None, cast: T = NOT_PROVIDED, context: dict = None, user_access: bool = None, + agents: list[Union[Agent, Assistant]] = None, **agent_kwargs: dict, ) -> T: """ diff --git a/src/control_flow/core/__init__.py b/src/control_flow/core/__init__.py index eaaedf38..f6856568 100644 --- a/src/control_flow/core/__init__.py +++ b/src/control_flow/core/__init__.py @@ -1,4 +1,4 @@ from .task import Task, TaskStatus from .flow import Flow from .agent import Agent -from .controller import Controller, SingleAgentController +from .controller import Controller diff --git a/src/control_flow/core/agent.py b/src/control_flow/core/agent.py index e3ea39d8..6fb7ecca 100644 --- a/src/control_flow/core/agent.py +++ b/src/control_flow/core/agent.py @@ -1,20 +1,14 @@ import logging from enum import Enum -from typing import Callable, TypeVar +from typing import Callable -from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from pydantic import Field, field_validator +from marvin.utilities.tools import tool_from_function +from pydantic import Field -from control_flow.core.flow import Flow -from control_flow.core.task import Task, TaskStatus -from control_flow.utilities.context import ctx from control_flow.utilities.types import Assistant, AssistantTool, ControlFlowModel -T = TypeVar("T") logger = logging.getLogger(__name__) -NOT_PROVIDED = object() - class AgentStatus(Enum): INCOMPLETE = "incomplete" @@ -34,12 +28,7 @@ def talk_to_human(message: str, get_response: bool = True) -> str: return "Message sent to user" -class Agent(ControlFlowModel, ExposeSyncMethodsMixin): - tasks: list[Task] = Field(description="Tasks that the agent will complete.") - assistant: Assistant = Field(default_factory=Assistant) - instructions: str = None - tools: list[AssistantTool | Callable] = [] - context: dict = Field({}, validate_default=True) +class Agent(Assistant, ControlFlowModel): user_access: bool = Field( False, description="If True, the agent is given tools for interacting with a human user.", @@ -49,87 +38,10 @@ class Agent(ControlFlowModel, ExposeSyncMethodsMixin): description="If True, the agent will communicate with the controller via messages.", ) - @field_validator("tasks", mode="before") - def _validate_tasks(cls, v): - if not v: - raise ValueError("An agent must have at least one task.") - return v - - def task_ids(self) -> list[tuple[int, Task]]: - """ - Assign an ID to each task so they can be identified by the assistant. - """ - return [(i + 1, task) for i, task in enumerate(self.tasks)] - - def get_tools(self) -> list[AssistantTool | Callable]: - """ - Get all tools from the agent and its tasks. - """ - tools = self.tools - for i, task in self.task_ids(): - tools = tools + task.get_tools(task_id=i) - if self.user_access: - tools.append(talk_to_human) + def get_tools(self, user_access: bool = None) -> list[AssistantTool | Callable]: + if user_access is None: + user_access = self.user_access + tools = super().get_tools() + if user_access: + tools.append(tool_from_function(talk_to_human)) return tools - - @property - def status(self) -> AgentStatus: - """ - Check if all tasks have been completed. - """ - if any(task.status == TaskStatus.PENDING for task in self.tasks): - return AgentStatus.INCOMPLETE - else: - return AgentStatus.COMPLETE - - @expose_sync_method("run") - async def run_async(self, flow: Flow = None): - from control_flow.core.controller import SingleAgentController - - controller = SingleAgentController(agents=[self], flow=flow) - return await controller.run() - - -# @prefect_task(task_run_name=_name_from_objective) -def run_ai_task( - task: str = None, - cast: T = NOT_PROVIDED, - context: dict = None, - user_access: bool = None, - **agent_kwargs: dict, -) -> T: - """ - Create and run an agent to complete a task with the given objective and - context. This function is similar to an inline version of the @ai_task - decorator. - - This inline version is useful when you want to create and run an ad-hoc AI - task, without defining a function or using decorator syntax. It provides - more flexibility in terms of dynamically setting the task parameters. - Additional detail can be provided as `context`. - """ - - if cast is NOT_PROVIDED: - if not task: - cast = None - else: - cast = str - - # load flow - flow = ctx.get("flow", None) - - # create tasks - if task: - ai_tasks = [Task[cast](objective=task, context=context or {})] - else: - ai_tasks = [] - - # run agent - agent = Agent(tasks=ai_tasks, user_access=user_access or False, **agent_kwargs) - agent.run(flow=flow) - - if ai_tasks: - if ai_tasks[0].status == TaskStatus.COMPLETED: - return ai_tasks[0].result - elif ai_tasks[0].status == TaskStatus.FAILED: - raise ValueError(ai_tasks[0].error) diff --git a/src/control_flow/core/controller/__init__.py b/src/control_flow/core/controller/__init__.py index ab1f892c..5319894d 100644 --- a/src/control_flow/core/controller/__init__.py +++ b/src/control_flow/core/controller/__init__.py @@ -1,2 +1,2 @@ -from .controller import Controller, SingleAgentController +from .controller import Controller from .delegation import RoundRobin diff --git a/src/control_flow/core/controller/controller.py b/src/control_flow/core/controller/controller.py index 8efa24d5..c5d178ca 100644 --- a/src/control_flow/core/controller/controller.py +++ b/src/control_flow/core/controller/controller.py @@ -1,27 +1,30 @@ import logging -from typing import Any +from typing import Self from marvin.beta.assistants import PrintHandler, Run -from marvin.utilities.asyncio import ExposeSyncMethodsMixin +from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method from pydantic import BaseModel, Field, field_validator, model_validator -from control_flow.core.agent import Agent, AgentStatus +from control_flow.core.agent import Agent from control_flow.core.controller.delegation import ( DelegationStrategy, RoundRobin, - Single, ) from control_flow.core.flow import Flow -from control_flow.instructions import get_instructions -from control_flow.utilities.context import ctx +from control_flow.core.task import Task, TaskStatus +from control_flow.instructions import get_instructions as get_context_instructions from control_flow.utilities.types import Thread logger = logging.getLogger(__name__) class Controller(BaseModel, ExposeSyncMethodsMixin): + flow: Flow agents: list[Agent] - flow: Flow = Field(None, validate_default=True) + tasks: list[Task] = Field( + description="Tasks that the controller will complete.", + default_factory=list, + ) delegation_strategy: DelegationStrategy = Field( validate_default=True, description="The strategy for delegating work to assistants.", @@ -30,29 +33,39 @@ class Controller(BaseModel, ExposeSyncMethodsMixin): # termination_strategy: TerminationStrategy context: dict = {} instructions: str = None + user_access: bool | None = Field( + None, + description="If True or False, overrides the user_access of the " + "agents. If None, the user_access setting of each agents is used.", + ) model_config: dict = dict(extra="forbid") - @field_validator("flow", mode="before") - def _default_flow(cls, v): - if v is None: - v = ctx.get("flow", None) - return v - @field_validator("agents", mode="before") def _validate_agents(cls, v): if not v: raise ValueError("At least one agent is required.") return v - async def run(self): + @model_validator(mode="after") + def _add_tasks_to_flow(self) -> Self: + for task in self.tasks: + self.flow.add_task(task) + return self + + @expose_sync_method("run") + async def run_async(self): """ Run the control flow. """ - while incomplete_agents := [ - a for a in self.agents if a.status == AgentStatus.INCOMPLETE - ]: - agent = self.delegation_strategy(incomplete_agents) + while True: + incomplete = any([t for t in self.tasks if t.status == TaskStatus.PENDING]) + if not incomplete: + break + if len(self.agents) > 1: + agent = self.delegation_strategy(self.agents) + else: + agent = self.agents[0] if not agent: return await self.run_agent(agent=agent) @@ -67,14 +80,22 @@ async def run_agent(self, agent: Agent, thread: Thread = None): agent=agent, controller=self, context=self.context, - instructions=get_instructions(), + instructions=get_context_instructions(), ) + instructions = instructions_template.render() + + tools = self.flow.tools + agent.get_tools(user_access=self.user_access) + + for task in self.tasks: + task_id = self.flow.get_task_id(task) + tools = tools + task.get_tools(task_id=task_id) + run = Run( - assistant=agent.assistant, + assistant=agent, thread=thread or self.flow.thread, - instructions=instructions_template.render(), - tools=agent.get_tools() + self.flow.tools, + instructions=instructions, + tools=tools, event_handler_class=PrintHandler, ) @@ -82,25 +103,5 @@ async def run_agent(self, agent: Agent, thread: Thread = None): return run - -class SingleAgentController(Controller): - """ - A SingleAgentController is a controller that runs a single agent. - """ - - delegation_strategy: Single = Field(None, validate_default=True) - - @field_validator("agents", mode="before") - def _validate_agents(cls, v): - if len(v) != 1: - raise ValueError("A SingleAgentController must have exactly one agent.") - return v - - @model_validator(mode="before") - @classmethod - def _create_single_strategy(cls, data: Any) -> Any: - """ - Create a Single delegation strategy with the agent. - """ - data["delegation_strategy"] = Single(agent=data["agents"][0]) - return data + def task_ids(self) -> dict[Task, int]: + return {task: self.flow.get_task_id(task) for task in self.tasks} diff --git a/src/control_flow/core/controller/delegation.py b/src/control_flow/core/controller/delegation.py index 095a7b9c..82875d20 100644 --- a/src/control_flow/core/controller/delegation.py +++ b/src/control_flow/core/controller/delegation.py @@ -1,12 +1,9 @@ import itertools from typing import TYPE_CHECKING, Any, Generator, Iterator -import marvin from pydantic import BaseModel, PrivateAttr -from control_flow.core.agent import Agent, AgentStatus -from control_flow.core.flow import get_flow_messages -from control_flow.instructions import get_instructions +from control_flow.core.agent import Agent if TYPE_CHECKING: from control_flow.core.agent import Agent @@ -23,7 +20,7 @@ def __call__(self, agents: list["Agent"]) -> Agent: """ return self._next_agent(agents) - def _next_agent(self, agents: list["Agent"]) -> "Agent": + def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent": """ Select an agent from a list of agents. """ @@ -37,7 +34,7 @@ class Single(DelegationStrategy): agent: Agent - def _next_agent(self, agents: list[Agent]) -> Generator[Any, Any, Agent]: + def _next_agent(self, agents: list[Agent], **kwargs) -> Generator[Any, Any, Agent]: """ Given a list of potential agents, choose the single agent. """ @@ -52,54 +49,43 @@ class RoundRobin(DelegationStrategy): _cycle: Iterator[Agent] = PrivateAttr(None) - def _next_agent(self, agents: list["Agent"]) -> "Agent": + def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent": """ Given a list of potential agents, delegate the tasks in a round-robin fashion. """ # the first time this is called, create a cycle iterator if self._cycle is None: self._cycle = itertools.cycle(agents) - - # cycle once through all agents, returning the first one that is in the list - # if no agent is found after cycling through all agents, return None - first_agent_seen = None - for agent in self._cycle: - if agent in agents: - return agent - elif agent == first_agent_seen: - break - - # remember the first agent seen so we can avoid an infinite loop - if first_agent_seen is None: - first_agent_seen = agent - - -class Moderator(DelegationStrategy): - """ - A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier - """ - - model: str = None - - def _next_agent(self, agents: list["Agent"]) -> "Agent": - """ - Given a list of potential agents, choose the most qualified assistant to complete the tasks. - """ - - instructions = get_instructions() - history = get_flow_messages() - - context = dict(messages=history, global_instructions=instructions) - agent = marvin.classify( - context, - [a for a in agents if a.status == AgentStatus.INCOMPLETE], - instructions=""" - Given the conversation context, choose the AI agent most - qualified to take the next turn at completing the tasks. Take into - account the instructions, each agent's own instructions, and the - tools they have available. - """, - model_kwargs=dict(model=self.model), - ) - - return agent + return next(self._cycle) + + +# class Moderator(DelegationStrategy): +# """ +# A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier +# """ + +# model: str = None + +# def _next_agent( +# self, agents: list["Agent"], tasks: list[Task], history: list[Message] +# ) -> "Agent": +# """ +# Given a list of potential agents, choose the most qualified assistant to complete the tasks. +# """ + +# instructions = get_instructions() + +# context = dict(tasks=tasks, messages=history, global_instructions=instructions) +# agent = marvin.classify( +# context, +# [a for a in agents if a.status == AgentStatus.INCOMPLETE], +# instructions=""" +# Given the conversation context, choose the AI agent most +# qualified to take the next turn at completing the tasks. Take into +# account the instructions, each agent's own instructions, and the +# tools they have available. +# """, +# model_kwargs=dict(model=self.model), +# ) + +# return agent diff --git a/src/control_flow/core/controller/instruction_template.py b/src/control_flow/core/controller/instruction_template.py index 6a50e576..c597167e 100644 --- a/src/control_flow/core/controller/instruction_template.py +++ b/src/control_flow/core/controller/instruction_template.py @@ -24,31 +24,86 @@ def render(self) -> str: class AgentTemplate(Template): template: str = """ - You are an AI agent. Your name is "{{ agent.assistant.name }}". + You are an AI agent. Your name is "{{ agent.name }}". + + {% if agent.description %} + + The following description has been provided for you: + {{ agent.description }} + + {% endif -%} + + Your job is to work on any pending tasks until you can mark them as either + `complete` or `failed`. The following instructions will provide you with all + the context you need to complete your tasks. Note that using a tool to + complete or fail a task is your ultimate objective, and you should not post + any messages to the thread unless you have a specific reason to do so. + """ + agent: Agent + + +class CommunicationTemplate(Template): + template: str = """ + ## Communciation + + ### Posting messages to the thread You have been created by a Controller in the Python library ControlFlow in order to complete various tasks or instructions. All messages in this thread - are either from the Controller or from other AI agents like you. Preface any - response with your name (e.g. "{{ agent.assistant.name }}: Hello!") in order - to distinguish your messages. + are either from the controller or from AI agents like you. Note that all + agents post to the thread with the `Assistant` role, so if you do need to + post a message, preface with your name (e.g. "{{ agent.name }}: Hello!") in + order to distinguish your messages. + + The controller CAN NOT and WILL NOT read your messages, so DO NOT post + messages unless you need to send information to another agent. DO NOT post + messages about information already captured by your tool calls, such as the + tool call itself, its result, human responses, or task completion. + + ### Talking to humans {% if agent.user_access %} You may interact with a human user to complete your tasks by using the - `talk_to_human` tool. The human is unaware of your tasks or the Controller. + `talk_to_human` tool. The human is unaware of your tasks or the controller. Do not mention them or anything else about how this system works. The human - can only see messages you send via tool, not the rest of the thread. + can only see messages you send them via tool, not the rest of the thread. - Humans may give poor or incorrect responses. You may need to ask questions - multiple times and not be too literal in your interpretation of their - responses. + Humans may give poor, incorrect, or partial responses. You may need to ask + questions multiple times in order to complete your tasks. Use good judgement + to determine the best way to achieve your goal. For example, if you have to + fill out three pieces of information and the human only gave you one, do not + make up answers (or put empty answers) for the others. Ask again and only + fail the task if you truly can not make progress. {% else %} You can not interact with a human at this time. {% endif %} + """ agent: Agent +class CollaborationTemplate(Template): + template: str = """ + ## Collaboration + + You are collaborating with other AI agents. They are listed below by name, + along with a brief description. Note that all agents post messages to the + same thread with the `Assistant` role, so pay attention to the name of the + agent that is speaking. Only one agent needs to indicate that a task is + complete. + + ### Agents + {% for agent in other_agents %} + {{loop.index}}. "{{agent.name}}": {{agent.description}} + {% endfor %} + {% if not other_agents %} + (There are no other agents currently participating in this workflow) + {% endif %} + """ + other_agents: list[Agent] + + class InstructionsTemplate(Template): template: str = """ ## Instructions @@ -56,24 +111,32 @@ class InstructionsTemplate(Template): {% if flow_instructions %} ### Workflow instructions + These instructions apply to the entire workflow: + {{ flow_instructions }} {% endif %} {% if controller_instructions %} ### Controller instructions + These instructions apply to these tasks: + {{ controller_instructions }} {% endif %} {% if agent_instructions %} ### Agent instructions + These instructions apply only to you: + {{ agent_instructions }} {% endif %} {% if additional_instructions %} ### Additional instructions + These instructions were additionally provided for this part of the workflow: + {% for instruction in additional_instructions %} - {{ instruction }} {% endfor %} @@ -99,9 +162,19 @@ class TasksTemplate(Template): template: str = """ ## Tasks - {% for task_id, task in agent.task_ids() %} - ### Task {{ task_id }} - - Status: {{ task.status }} + ### Active tasks + + The following tasks are pending. You and any other agents are responsible + for completing them and will continue to be invoked until you mark each + task as either "completed" or "failed" with the appropriate tool. + + Note: Task IDs are assigned for identification purposes only and will be + resused after tasks complete. + + {% for task in controller.tasks %} + {% if task.status.value == "pending" %} + #### Task {{ controller.flow.get_task_id(task) }} + - Status: {{ task.status.value }} - Objective: {{ task.objective }} {% if task.instructions %} - Instructions: {{ task.instructions }} @@ -115,12 +188,31 @@ class TasksTemplate(Template): - Context: {{ task.context }} {% endif %} + {% endif %} + {% endfor %} + + ### Completed tasks + The following tasks were recently completed: + + {% for task in controller.flow.completed_tasks(reverse=True, limit=20) %} + #### Task {{ controller.flow.get_task_id(task) }} + - Status: {{ task.status.value }} + - Objective: {{ task.objective }} + {% if task.status.value == "completed" %} + - Result: {{ task.result }} + {% elif task.status.value == "failed" %} + - Error: {{ task.error }} + {% endif %} + {% if task.context %} + - Context: {{ task.context }} + {% endif %} + {% endfor %} """ - agent: Agent + controller: Controller def should_render(self): - return any(self.agent.tasks) + return any(self.controller.tasks) class ContextTemplate(Template): @@ -140,20 +232,12 @@ class ContextTemplate(Template): - *{{ key }}*: {{ value }} {% endfor %} {% endif %} - - {% if agent_context %} - ### Agent context - {% for key, value in agent_context.items() %} - - *{{ key }}*: {{ value }} - {% endfor %} - {% endif %} """ flow_context: dict controller_context: dict - agent_context: dict def should_render(self): - return bool(self.flow_context or self.controller_context or self.agent_context) + return bool(self.flow_context or self.controller_context) class MainTemplate(BaseModel): @@ -165,17 +249,20 @@ class MainTemplate(BaseModel): def render(self): templates = [ AgentTemplate(agent=self.agent), + TasksTemplate(controller=self.controller), + ContextTemplate( + flow_context=self.controller.flow.context, + controller_context=self.controller.context, + ), InstructionsTemplate( flow_instructions=self.controller.flow.instructions, controller_instructions=self.controller.instructions, agent_instructions=self.agent.instructions, additional_instructions=self.instructions, ), - TasksTemplate(agent=self.agent), - ContextTemplate( - flow_context=self.controller.flow.context, - controller_context=self.controller.context, - agent_context=self.agent.context, + CommunicationTemplate(agent=self.agent), + CollaborationTemplate( + other_agents=[a for a in self.controller.agents if a != self.agent] ), ] diff --git a/src/control_flow/core/flow.py b/src/control_flow/core/flow.py index f4f07030..069a57c1 100644 --- a/src/control_flow/core/flow.py +++ b/src/control_flow/core/flow.py @@ -1,15 +1,13 @@ -import functools from typing import Callable from marvin.beta.assistants import Thread -from marvin.utilities.logging import get_logger from openai.types.beta.threads import Message -from prefect import flow as prefect_flow from prefect import task as prefect_task from pydantic import Field, field_validator +from control_flow.core.task import Task, TaskStatus from control_flow.utilities.context import ctx -from control_flow.utilities.marvin import patch_marvin +from control_flow.utilities.logging import get_logger from control_flow.utilities.types import AssistantTool, ControlFlowModel logger = get_logger(__name__) @@ -23,6 +21,7 @@ class Flow(ControlFlowModel): instructions: str | None = None model: str | None = None context: dict = {} + tasks: dict[Task, int] = Field(repr=False, default_factory=dict) @field_validator("thread", mode="before") def _load_thread_from_ctx(cls, v): @@ -38,54 +37,32 @@ def _load_thread_from_ctx(cls, v): def add_message(self, message: str): prefect_task(self.thread.add)(message) + def add_task(self, task: Task): + if task not in self.tasks: + task_id = len(self.tasks) + 1 + self.tasks[task] = task_id + # this message is important for contexualizing activity + # self.add_message(f'Task #{task_id} added to flow: "{task.objective}"') -def ai_flow( - fn=None, - *, - thread: Thread = None, - tools: list[AssistantTool | Callable] = None, - instructions: str = None, - model: str = None, -): - """ - Prepare a function to be executed as a Control Flow flow. - """ + def get_task_id(self, task: Task): + return self.tasks[task] - if fn is None: - return functools.partial( - ai_flow, - thread=thread, - tools=tools, - instructions=instructions, - model=model, + def pending_tasks(self): + return sorted( + (t for t in self.tasks if t.status == TaskStatus.PENDING), + key=lambda t: t.created_at, ) - @functools.wraps(fn) - def wrapper( - *args, - flow_kwargs: dict = None, - **kwargs, - ): - p_fn = prefect_flow(fn) - - flow_obj = Flow( - **{ - "thread": thread, - "tools": tools or [], - "instructions": instructions, - "model": model, - **(flow_kwargs or {}), - } + def completed_tasks(self, reverse=False, limit=None): + result = sorted( + (t for t in self.tasks if t.status != TaskStatus.PENDING), + key=lambda t: t.completed_at, + reverse=reverse, ) - logger.info( - f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"' - ) - - with ctx(flow=flow_obj), patch_marvin(): - return p_fn(*args, **kwargs) - - return wrapper + if limit: + result = result[:limit] + return result def get_flow() -> Flow: diff --git a/src/control_flow/core/task.py b/src/control_flow/core/task.py index fdadc02b..09672333 100644 --- a/src/control_flow/core/task.py +++ b/src/control_flow/core/task.py @@ -1,12 +1,13 @@ +import datetime from enum import Enum from typing import Callable, Generic, TypeVar import marvin import marvin.utilities.tools -from marvin.utilities.logging import get_logger from marvin.utilities.tools import FunctionTool from pydantic import Field +from control_flow.utilities.logging import get_logger from control_flow.utilities.types import AssistantTool, ControlFlowModel T = TypeVar("T") @@ -27,6 +28,8 @@ class Task(ControlFlowModel, Generic[T]): result: T = None error: str | None = None tools: list[AssistantTool | Callable] = [] + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + completed_at: datetime.datetime | None = None def __hash__(self): return id(self) @@ -37,9 +40,9 @@ def _create_complete_tool(self, task_id: int) -> FunctionTool: """ result_type = self.get_result_type() + # wrap the method call to get the correct result type signature def complete(result: result_type): - self.result = result - self.status = TaskStatus.COMPLETED + self.complete(result=result) tool = marvin.utilities.tools.tool_from_function( complete, @@ -69,10 +72,12 @@ def get_tools(self, task_id: int) -> list[AssistantTool | Callable]: def complete(self, result: T): self.result = result self.status = TaskStatus.COMPLETED + self.completed_at = datetime.datetime.now() def fail(self, message: str | None = None): self.error = message self.status = TaskStatus.FAILED + self.completed_at = datetime.datetime.now() def get_result_type(self) -> T: """ diff --git a/src/control_flow/dx.py b/src/control_flow/dx.py new file mode 100644 index 00000000..52161038 --- /dev/null +++ b/src/control_flow/dx.py @@ -0,0 +1,186 @@ +import functools +import inspect +from typing import Callable, TypeVar + +from prefect import flow as prefect_flow +from prefect import task as prefect_task + +from control_flow.core.agent import Agent +from control_flow.core.flow import Flow +from control_flow.core.task import Task, TaskStatus +from control_flow.utilities.context import ctx +from control_flow.utilities.logging import get_logger +from control_flow.utilities.marvin import patch_marvin +from control_flow.utilities.types import AssistantTool, Thread + +logger = get_logger(__name__) +T = TypeVar("T") +NOT_PROVIDED = object() + + +def ai_flow( + fn=None, + *, + thread: Thread = None, + tools: list[AssistantTool | Callable] = None, + instructions: str = None, + model: str = None, +): + """ + Prepare a function to be executed as a Control Flow flow. + """ + + if fn is None: + return functools.partial( + ai_flow, + thread=thread, + tools=tools, + instructions=instructions, + model=model, + ) + + @functools.wraps(fn) + def wrapper( + *args, + flow_kwargs: dict = None, + **kwargs, + ): + p_fn = prefect_flow(fn) + p_fn = fn + + flow_obj = Flow( + **{ + "thread": thread, + "tools": tools or [], + "instructions": instructions, + "model": model, + **(flow_kwargs or {}), + } + ) + + logger.info( + f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"' + ) + + with ctx(flow=flow_obj), patch_marvin(): + return p_fn(*args, **kwargs) + + return wrapper + + +def ai_task( + fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict +): + """ + Use a Python function to create an AI task. When the function is called, an + agent is created to complete the task and return the result. + """ + + if fn is None: + return functools.partial( + ai_task, objective=objective, user_access=user_access, **agent_kwargs + ) + + sig = inspect.signature(fn) + + if objective is None: + if fn.__doc__: + objective = f"{fn.__name__}: {fn.__doc__}" + else: + objective = fn.__name__ + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + # first process callargs + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + # return run_ai.with_options(name=f"Task: {fn.__name__}")( + return run_ai( + tasks=objective, + cast=fn.__annotations__.get("return"), + context=bound.arguments, + user_access=user_access, + **agent_kwargs, + ) + + return wrapper + + +def _name_from_objective(): + """Helper function for naming task runs""" + from prefect.runtime import task_run + + objective = task_run.parameters.get("task") + + if not objective: + objective = "Follow general instructions" + if len(objective) > 75: + return f"Task: {objective[:75]}..." + return f"Task: {objective}" + + +@prefect_task(task_run_name=_name_from_objective) +def run_ai( + tasks: str | list[str], + agents: list[Agent] = None, + cast: T = NOT_PROVIDED, + context: dict = None, + user_access: bool = False, +) -> T | list[T]: + """ + Create and run an agent to complete a task with the given objective and + context. This function is similar to an inline version of the @ai_task + decorator. + + This inline version is useful when you want to create and run an ad-hoc AI + task, without defining a function or using decorator syntax. It provides + more flexibility in terms of dynamically setting the task parameters. + Additional detail can be provided as `context`. + """ + + single_result = False + if isinstance(tasks, str): + single_result = True + + tasks = [tasks] + + if cast is NOT_PROVIDED: + if not tasks: + cast = None + else: + cast = str + + # load flow + flow = ctx.get("flow", None) + + # create tasks + if tasks: + ai_tasks = [Task[cast](objective=t, context=context or {}) for t in tasks] + else: + ai_tasks = [] + + # create agent + if agents is None: + agents = [Agent()] + + # create Controller + from control_flow.core.controller.controller import Controller + + controller = Controller( + tasks=ai_tasks, agents=agents, flow=flow, user_access=user_access + ) + controller.run() + + if ai_tasks: + if all(task.status == TaskStatus.COMPLETED for task in ai_tasks): + result = [task.result for task in ai_tasks] + if single_result: + result = result[0] + return result + elif failed_tasks := [ + task for task in ai_tasks if task.status == TaskStatus.FAILED + ]: + raise ValueError( + f'Failed tasks: {", ".join([task.objective for task in failed_tasks])}' + ) diff --git a/src/control_flow/instructions.py b/src/control_flow/instructions.py index 6ec1d968..97e89664 100644 --- a/src/control_flow/instructions.py +++ b/src/control_flow/instructions.py @@ -2,10 +2,9 @@ from contextlib import contextmanager from typing import Generator, List -from marvin.utilities.logging import get_logger - from control_flow.core.flow import Flow from control_flow.utilities.context import ctx +from control_flow.utilities.logging import get_logger logger = get_logger(__name__) diff --git a/src/control_flow/utilities/logging.py b/src/control_flow/utilities/logging.py new file mode 100644 index 00000000..52a57cff --- /dev/null +++ b/src/control_flow/utilities/logging.py @@ -0,0 +1,44 @@ +import logging +from functools import lru_cache +from typing import Optional + +from marvin.utilities.logging import add_logging_methods + + +@lru_cache() +def get_logger(name: Optional[str] = None) -> logging.Logger: + """ + Retrieves a logger with the given name, or the root logger if no name is given. + + Args: + name: The name of the logger to retrieve. + + Returns: + The logger with the given name, or the root logger if no name is given. + + Example: + Basic Usage of `get_logger` + ```python + from control_flow.utilities.logging import get_logger + + logger = get_logger("control_flow.test") + logger.info("This is a test") # Output: control_flow.test: This is a test + + debug_logger = get_logger("control_flow.debug") + debug_logger.debug_kv("TITLE", "log message", "green") + ``` + """ + parent_logger = logging.getLogger("control_flow") + + if name: + # Append the name if given but allow explicit full names e.g. "control_flow.test" + # should not become "control_flow.control_flow.test" + if not name.startswith(parent_logger.name + "."): + logger = parent_logger.getChild(name) + else: + logger = logging.getLogger(name) + else: + logger = parent_logger + + add_logging_methods(logger) + return logger diff --git a/tests/flows/test_sign_guestbook.py b/tests/flows/test_sign_guestbook.py index 14d815ac..66b7b2e8 100644 --- a/tests/flows/test_sign_guestbook.py +++ b/tests/flows/test_sign_guestbook.py @@ -1,4 +1,4 @@ -from control_flow import Assistant, run_ai_task +from control_flow import Assistant, run_ai from control_flow.core.flow import ai_flow # define assistants @@ -28,7 +28,7 @@ def view_guestbook(): @ai_flow def guestbook_flow(): - run_ai_task( + run_ai( """ Add your name to the list using the `sign` tool. All assistants must sign their names for the task to be complete. You can read the sign to From 618b6e088b2ab737bb750eecbfba217ccbda456c Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Tue, 16 Apr 2024 20:38:33 -0400 Subject: [PATCH 4/4] Prefect --- src/control_flow/agent_old.py | 559 ------------------ src/control_flow/core/agent.py | 78 +++ .../core/controller/controller.py | 119 +++- src/control_flow/dx.py | 1 - 4 files changed, 189 insertions(+), 568 deletions(-) delete mode 100644 src/control_flow/agent_old.py diff --git a/src/control_flow/agent_old.py b/src/control_flow/agent_old.py deleted file mode 100644 index 4969ca15..00000000 --- a/src/control_flow/agent_old.py +++ /dev/null @@ -1,559 +0,0 @@ -import functools -import inspect -import json -import logging -from typing import Callable, Generic, TypeVar, Union - -import marvin -import marvin.utilities.tools -import prefect -from marvin.beta.assistants.assistants import Assistant -from marvin.beta.assistants.handlers import PrintHandler -from marvin.beta.assistants.runs import Run -from marvin.tools.assistants import AssistantTool, EndRun -from marvin.types import FunctionTool -from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from openai.types.beta.threads.runs import ToolCall -from prefect import get_client as get_prefect_client -from prefect import task as prefect_task -from prefect.context import FlowRunContext -from pydantic import BaseModel, Field, field_validator - -from control_flow import settings -from control_flow.core.flow import Flow -from control_flow.core.task import Task, TaskStatus -from control_flow.utilities.context import ctx -from control_flow.utilities.jinja import jinja_env -from control_flow.utilities.prefect import ( - create_json_artifact, - create_markdown_artifact, - create_python_artifact, -) - -T = TypeVar("T") -logger = logging.getLogger(__name__) - -NOT_PROVIDED = object() - - -TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( - """ - ## Tool call: {name} - - **Description:** {description} - - ## Arguments - - ```json - {args} - ``` - - ### Result - - ```json - {result} - ``` - """ -) - - -INSTRUCTIONS = """ -You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to -complete the tasks assigned to you. You were created by a software application, -and any messages you receive are from that software application, not a user. You -may use any tools at your disposal to complete the task, including talking to a -human user. - - -## Instructions - -Follow these instructions at all times: - -{% if assistant.instructions -%} -- {{ assistant.instructions }} -{% endif %} -{% if flow.instructions -%} -- {{ flow.instructions }} -{% endif %} -{% if agent.instructions -%} -- {{ agent.instructions }} -{% endif %} -{% for instruction in instructions %} -- {{ instruction }} -{% endfor %} - - -## Assistants - -The following assistants are collaborating to complete the tasks. - -{% for a in agent.assistants %} -- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} -{% endfor %} - -{% if agent.assistants|length > 1 %} -Use the `end_run` tool to end your turn and allow another assistant to take -over, without marking a task as complete or failed. -{% endif %} - - - -## Tasks - -{% if agent.tasks %} -You have been assigned the following tasks. You will continue to run until all -tasks are finished. It may take multiple attempts, iterations, or tool uses to -complete a task. When a task is finished, mark it as `completed` -(and provide a result, if required) or `failed` (with a brief explanation) by -using the appropriate tool. Do not mark a task as complete if you don't have a -complete result. Do not make up results. If you receive only partial or unclear -information from a user, keep working until you have all the information you -need. Be very sure that a task is truly unsolvable before marking it as failed, -especially when working with a human user. - - -{% for task_id, task in agent.numbered_tasks() %} -### Task {{ task_id }} -- Status: {{ task.status.value }} -- Objective: {{ task.objective }} -{% if task.instructions %} -- Additional instructions: {{ task.instructions }} -{% endif %} -{% if task.status.value == "completed" %} -- Result: {{ task.result }} -{% elif task.status.value == "failed" %} -- Error: {{ task.error }} -{% endif %} -{% if task.context %} -- Context: {{ task.context }} -{% endif %} - -{% endfor %} -{% else %} -You have no explicit tasks to complete. Follow your instructions as best as you -can. If it is not possible to comply with the instructions in any way, use the -`end_run` tool to manually stop the run. -{% endif %} - -## Communication - -All messages you receive in the thread are generated by the software that -created you, not a human user. All messages you send are sent only to that -software and are never seen by any human. - -{% if agent.system_access -%} -The software that created you is an AI capable of processing natural language, -so you can freely respond by posting messages to the thread. There may be more -than one AI posting responses, so preface your messages with your own name to -avoid confusion. For example, if your name is "Marvin", start all of your -messages with "Marvin:". This is not required for tool calls. -{% else %} -The software that created you is a Python script that can only process -structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE -THREAD. They will be ignored and only waste time. ONLY USE TOOLS TO RESPOND. -{% endif %} - -{% if agent.user_access -%} -There is also a human user who may be involved in the task. You can communicate -with them using the `talk_to_human` tool. The user is a human and unaware of -your tasks or this system. Do not mention your tasks or anything about how the -system works to them. They can only see messages you send them via tool, not the -rest of the thread. When dealing with humans, you may not always get a clear or -correct response. You may need to ask multiple times or rephrase your questions. -You should also interpret human responses broadly and not be too literal. -{% else %} -You can not communicate with a human user at this time. -{% endif %} - - -{% if context %} -## Additional context - -The following context was provided: -{% for key, value in context.items() -%} -- {{ key }}: {{ value }} -{% endfor %} -{% endif %} -""" - - -def talk_to_human(message: str, get_response: bool = True) -> str: - """ - Send a message to the human user and optionally wait for a response. - If `get_response` is True, the function will return the user's response, - otherwise it will return a simple confirmation. - """ - print(message) - if get_response: - response = input("> ") - return response - return "Message sent to user" - - -def end_run(): - """Use this tool to end the run without marking a task as complete or failed.""" - return EndRun() - - -class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): - tasks: list[Task] = [] - assistant: Assistant = Field(None, validate_default=True) - tools: list[Union[AssistantTool, Assistant, Callable]] = [] - context: dict = Field(None, validate_default=True) - user_access: bool = Field( - None, - validate_default=True, - description="If True, the agent is given tools for interacting with a human user.", - ) - system_access: bool = Field( - None, - validate_default=True, - description="If True, the agent will communicate with the system via messages. " - "This is usually only used when the agent was spawned by another " - "agent capable of understanding its responses.", - ) - instructions: str = None - model_config: dict = dict(extra="forbid") - - @field_validator("flow", mode="before") - def _load_flow_from_ctx(cls, v): - if v is None: - v = ctx.get("flow", None) - if v is None: - v = Flow() - return v - - @field_validator("context", mode="before") - def _default_context(cls, v): - if v is None: - v = {} - return v - - @field_validator("assistant", mode="before") - def _default_assistants(cls, v): - if v is None: - v = Assistant() - return v - - @field_validator("user_access", "system_access", mode="before") - def _default_access(cls, v): - if v is None: - v = False - return v - - def numbered_tasks(self) -> list[tuple[int, Task]]: - return [(i + 1, task) for i, task in enumerate(self.tasks)] - - def _get_instructions(self, assistant: Assistant, context: dict = None): - instructions = jinja_env.render( - INSTRUCTIONS, - agent=self, - assistant=assistant, - instructions=ctx.get("instructions", []), - context={**self.context, **(context or {})}, - ) - return instructions - - def _get_tools(self, tools: list[AssistantTool | Callable]) -> list[AssistantTool]: - tools = self.tools + self.assistant.tools + tools - - if not self.tasks: - tools.append(end_run) - - # if there is only one task, and the agent can't send a response to the - # system, then we can quit as soon as it is marked finished - if not self.system_access and len(self.tasks) == 1: - early_end_run = True - else: - early_end_run = False - - for i, task in self.numbered_tasks(): - tools.extend( - [ - task._create_complete_tool(task_id=i, end_run=early_end_run), - task._create_fail_tool(task_id=i, end_run=early_end_run), - ] - ) - - if self.user_access: - tools.append(talk_to_human) - - final_tools = [] - for tool in tools: - if not isinstance(tool, AssistantTool): - tool = marvin.utilities.tools.tool_from_function(tool) - - if isinstance(tool, FunctionTool): - - async def modified_fn( - *args, - # provide default args to avoid a late-binding issue - original_fn: Callable = tool.function._python_fn, - tool: FunctionTool = tool, - **kwargs, - ): - # call fn - result = original_fn(*args, **kwargs) - - passed_args = ( - inspect.signature(original_fn).bind(*args, **kwargs).arguments - ) - try: - passed_args = json.dumps(passed_args, indent=2) - except Exception: - pass - create_markdown_artifact( - markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( - name=tool.function.name, - description=tool.function.description or "(none provided)", - args=passed_args, - result=result, - ), - key="result", - ) - return result - - tool.function._python_fn = prefect_task( - modified_fn, - task_run_name=f"Tool call: {tool.function.name}", - ) - final_tools.append(tool) - return final_tools - - def _get_run_assistant_task(self): - """ - Helper function for building the task that will execute the OpenAI assistant run. - This needs to be regenerated each time in case the instructions change. - """ - - @prefect_task(task_run_name=f'Run OpenAI assistant: "{self.assistant.name}"') - async def run_openai_assistant( - context: dict = None, run_kwargs: dict = None - ) -> Run: - run_kwargs = run_kwargs or {} - model = run_kwargs.pop( - "model", - self.assistant.model or self.flow.model or settings.assistant_model, - ) - thread = run_kwargs.pop("thread", self.flow.thread) - - run = Run( - assistant=self.assistant, - thread=thread, - instructions=self._get_instructions(context=context), - tools=self._get_tools(), - event_handler_class=AgentHandler, - model=model, - **run_kwargs, - ) - await run.run_async() - create_json_artifact( - key="messages", - # dump explicilty because of odd OAI serialization issue - data=[m.model_dump() for m in run.messages], - description="All messages sent and received during the run.", - ) - create_json_artifact( - key="actions", - # dump explicilty because of odd OAI serialization issue - data=[s.model_dump() for s in run.steps], - description="All actions taken by the assistant during the run.", - ) - return run - - return run_openai_assistant - - @expose_sync_method("run") - async def run_async(self, context: dict = None, **run_kwargs) -> list[Task]: - openai_run = self._get_run_assistant_task() - - openai_run( - context=context, - run_kwargs=run_kwargs, - ) - - # if this AI can't post messages to the system, then continue to invoke - # it until all tasks are finished - if not self.system_access: - counter = 0 - while ( - any(t.status == TaskStatus.PENDING for t in self.tasks) - and counter < settings.max_agent_iterations - ): - openai_run( - context=context, - run_kwargs=run_kwargs, - ) - counter += 1 - - result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] - - return result - - -class AgentHandler(PrintHandler): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.tool_calls = {} - - async def on_tool_call_created(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is created""" - - if tool_call.type == "function": - task_run_name = "Prepare arguments for tool call" - else: - task_run_name = f"Tool call: {tool_call.type}" - - client = get_prefect_client() - engine_context = FlowRunContext.get() - if not engine_context: - return - - task_run = await client.create_task_run( - task=prefect.Task(fn=lambda: None), - name=task_run_name, - extra_tags=["tool-call"], - flow_run_id=engine_context.flow_run.id, - dynamic_key=tool_call.id, - state=prefect.states.Running(), - ) - - self.tool_calls[tool_call.id] = task_run - - async def on_tool_call_done(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is done""" - - client = get_prefect_client() - task_run = self.tool_calls.get(tool_call.id) - if not task_run: - return - await client.set_task_run_state( - task_run_id=task_run.id, state=prefect.states.Completed(), force=True - ) - - # code interpreter is run as a single call, so we can publish a result artifact - if tool_call.type == "code_interpreter": - # images = [] - # for output in tool_call.code_interpreter.outputs: - # if output.type == "image": - # image_path = download_temp_file(output.image.file_id) - # images.append(image_path) - - create_python_artifact( - key="code", - code=tool_call.code_interpreter.input, - description="Code executed in the code interpreter", - task_run_id=task_run.id, - ) - create_json_artifact( - key="output", - data=tool_call.code_interpreter.outputs, - description="Output from the code interpreter", - task_run_id=task_run.id, - ) - - elif tool_call.type == "function": - create_json_artifact( - key="arguments", - data=json.dumps(json.loads(tool_call.function.arguments), indent=2), - description=f"Arguments for the `{tool_call.function.name}` tool", - task_run_id=task_run.id, - ) - - -def ai_task( - fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict -): - """ - Use a Python function to create an AI task. When the function is called, an - agent is created to complete the task and return the result. - """ - - if fn is None: - return functools.partial( - ai_task, objective=objective, user_access=user_access, **agent_kwargs - ) - - sig = inspect.signature(fn) - - if objective is None: - if fn.__doc__: - objective = f"{fn.__name__}: {fn.__doc__}" - else: - objective = fn.__name__ - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - # first process callargs - bound = sig.bind(*args, **kwargs) - bound.apply_defaults() - - # return run_ai.with_options(name=f"Task: {fn.__name__}")( - return run_ai( - task=objective, - cast=fn.__annotations__.get("return"), - context=bound.arguments, - user_access=user_access, - **agent_kwargs, - ) - - return wrapper - - -def _name_from_objective(): - """Helper function for naming task runs""" - from prefect.runtime import task_run - - objective = task_run.parameters.get("task") - - if not objective: - objective = "Follow general instructions" - if len(objective) > 75: - return f"Task: {objective[:75]}..." - return f"Task: {objective}" - - -# @prefect_task(task_run_name=_name_from_objective) -def run_ai( - task: str = None, - cast: T = NOT_PROVIDED, - context: dict = None, - user_access: bool = None, - agents: list[Union[Agent, Assistant]] = None, - **agent_kwargs: dict, -) -> T: - """ - Create and run an agent to complete a task with the given objective and - context. This function is similar to an inline version of the @ai_task - decorator. - - This inline version is useful when you want to create and run an ad-hoc AI - task, without defining a function or using decorator syntax. It provides - more flexibility in terms of dynamically setting the task parameters. - Additional detail can be provided as `context`. - """ - - if cast is NOT_PROVIDED: - if not task: - cast = None - else: - cast = str - - # load flow - flow = ctx.get("flow", None) - - # create tasks - if task: - ai_tasks = [Task[cast](objective=task, context=context)] - else: - ai_tasks = [] - - # run agent - agent = Agent(tasks=ai_tasks, flow=flow, user_access=user_access, **agent_kwargs) - agent.run() - - if ai_tasks: - if ai_tasks[0].status == TaskStatus.COMPLETED: - return ai_tasks[0].result - elif ai_tasks[0].status == TaskStatus.FAILED: - raise ValueError(ai_tasks[0].error) diff --git a/src/control_flow/core/agent.py b/src/control_flow/core/agent.py index 6fb7ecca..c13e823d 100644 --- a/src/control_flow/core/agent.py +++ b/src/control_flow/core/agent.py @@ -1,14 +1,41 @@ +import inspect +import json import logging from enum import Enum from typing import Callable +from marvin.types import FunctionTool from marvin.utilities.tools import tool_from_function +from prefect import task as prefect_task from pydantic import Field +from control_flow.utilities.prefect import ( + create_markdown_artifact, +) from control_flow.utilities.types import Assistant, AssistantTool, ControlFlowModel logger = logging.getLogger(__name__) +TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( + """ + ## Tool call: {name} + + **Description:** {description} + + ## Arguments + + ```json + {args} + ``` + + ### Result + + ```json + {result} + ``` + """ +) + class AgentStatus(Enum): INCOMPLETE = "incomplete" @@ -44,4 +71,55 @@ def get_tools(self, user_access: bool = None) -> list[AssistantTool | Callable]: tools = super().get_tools() if user_access: tools.append(tool_from_function(talk_to_human)) + + wrapped_tools = [] + for tool in tools: + wrapped_tools.append(self._wrap_prefect_tool(tool)) return tools + + def _wrap_prefect_tool(self, tool: AssistantTool | Callable) -> AssistantTool: + if not isinstance(tool, AssistantTool): + tool = tool_from_function(tool) + + if isinstance(tool, FunctionTool): + # for functions, we modify the function to become a Prefect task and + # publish an artifact that contains details about the function call + + async def modified_fn( + *args, + # provide default args to avoid a late-binding issue + original_fn: Callable = tool.function._python_fn, + tool: FunctionTool = tool, + **kwargs, + ): + # call fn + result = original_fn(*args, **kwargs) + + # prepare artifact + passed_args = ( + inspect.signature(original_fn).bind(*args, **kwargs).arguments + ) + try: + passed_args = json.dumps(passed_args, indent=2) + except Exception: + pass + create_markdown_artifact( + markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( + name=tool.function.name, + description=tool.function.description or "(none provided)", + args=passed_args, + result=result, + ), + key="result", + ) + + # return result + return result + + # replace the function with the modified version + tool.function._python_fn = prefect_task( + modified_fn, + task_run_name=f"Tool call: {tool.function.name}", + ) + + return tool diff --git a/src/control_flow/core/controller/controller.py b/src/control_flow/core/controller/controller.py index c5d178ca..cee12cea 100644 --- a/src/control_flow/core/controller/controller.py +++ b/src/control_flow/core/controller/controller.py @@ -1,8 +1,14 @@ +import json import logging -from typing import Self +from typing import Callable, Self +import prefect from marvin.beta.assistants import PrintHandler, Run from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method +from openai.types.beta.threads.runs import ToolCall +from prefect import get_client as get_prefect_client +from prefect import task as prefect_task +from prefect.context import FlowRunContext from pydantic import BaseModel, Field, field_validator, model_validator from control_flow.core.agent import Agent @@ -13,6 +19,10 @@ from control_flow.core.flow import Flow from control_flow.core.task import Task, TaskStatus from control_flow.instructions import get_instructions as get_context_instructions +from control_flow.utilities.prefect import ( + create_json_artifact, + create_python_artifact, +) from control_flow.utilities.types import Thread logger = logging.getLogger(__name__) @@ -58,19 +68,42 @@ async def run_async(self): Run the control flow. """ - while True: - incomplete = any([t for t in self.tasks if t.status == TaskStatus.PENDING]) - if not incomplete: - break + # continue as long as there are incomplete tasks + while any([t for t in self.tasks if t.status == TaskStatus.PENDING]): + # select the next agent if len(self.agents) > 1: agent = self.delegation_strategy(self.agents) else: agent = self.agents[0] if not agent: return - await self.run_agent(agent=agent) - async def run_agent(self, agent: Agent, thread: Thread = None): + # run the agent + task = await self._get_prefect_run_agent_task(agent) + task(agent=agent) + + async def _get_prefect_run_agent_task( + self, agent: Agent, thread: Thread = None + ) -> Callable: + @prefect_task(task_run_name=f'Run Agent: "{agent.name}"') + async def _run_agent(agent: Agent, thread: Thread = None): + run = await self.run_agent(agent=agent, thread=thread) + + create_json_artifact( + key="messages", + data=[m.model_dump() for m in run.messages], + description="All messages sent and received during the run.", + ) + create_json_artifact( + key="actions", + data=[s.model_dump() for s in run.steps], + description="All actions taken by the assistant during the run.", + ) + return run + + return _run_agent + + async def run_agent(self, agent: Agent, thread: Thread = None) -> Run: """ Run a single agent. """ @@ -96,7 +129,7 @@ async def run_agent(self, agent: Agent, thread: Thread = None): thread=thread or self.flow.thread, instructions=instructions, tools=tools, - event_handler_class=PrintHandler, + event_handler_class=AgentHandler, ) await run.run_async() @@ -105,3 +138,73 @@ async def run_agent(self, agent: Agent, thread: Thread = None): def task_ids(self) -> dict[Task, int]: return {task: self.flow.get_task_id(task) for task in self.tasks} + + +class AgentHandler(PrintHandler): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.tool_calls = {} + + async def on_tool_call_created(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is created""" + + if tool_call.type == "function": + task_run_name = "Prepare arguments for tool call" + else: + task_run_name = f"Tool call: {tool_call.type}" + + client = get_prefect_client() + engine_context = FlowRunContext.get() + if not engine_context: + return + + task_run = await client.create_task_run( + task=prefect.Task(fn=lambda: None), + name=task_run_name, + extra_tags=["tool-call"], + flow_run_id=engine_context.flow_run.id, + dynamic_key=tool_call.id, + state=prefect.states.Running(), + ) + + self.tool_calls[tool_call.id] = task_run + + async def on_tool_call_done(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is done""" + + client = get_prefect_client() + task_run = self.tool_calls.get(tool_call.id) + if not task_run: + return + await client.set_task_run_state( + task_run_id=task_run.id, state=prefect.states.Completed(), force=True + ) + + # code interpreter is run as a single call, so we can publish a result artifact + if tool_call.type == "code_interpreter": + # images = [] + # for output in tool_call.code_interpreter.outputs: + # if output.type == "image": + # image_path = download_temp_file(output.image.file_id) + # images.append(image_path) + + create_python_artifact( + key="code", + code=tool_call.code_interpreter.input, + description="Code executed in the code interpreter", + task_run_id=task_run.id, + ) + create_json_artifact( + key="output", + data=tool_call.code_interpreter.outputs, + description="Output from the code interpreter", + task_run_id=task_run.id, + ) + + elif tool_call.type == "function": + create_json_artifact( + key="arguments", + data=json.dumps(json.loads(tool_call.function.arguments), indent=2), + description=f"Arguments for the `{tool_call.function.name}` tool", + task_run_id=task_run.id, + ) diff --git a/src/control_flow/dx.py b/src/control_flow/dx.py index 52161038..fbf41b84 100644 --- a/src/control_flow/dx.py +++ b/src/control_flow/dx.py @@ -46,7 +46,6 @@ def wrapper( **kwargs, ): p_fn = prefect_flow(fn) - p_fn = fn flow_obj = Flow( **{