diff --git a/README.md b/README.md index 88518d2f..9462058b 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,19 @@ ![image](https://github.com/jlowin/control_flow/assets/153965/c2a8a2f0-8777-49a6-a79b-a0e101bd4a04) - # ControlFlow -ControlFlow is a Python framework for orchestrating AI agents in workflows alongside traditional code. It allows you to seamlessly integrate AI into any workflow, coordinate multiple specialized AI agents, collect of human inputs when needed, and maintain full observability for debugging. +ControlFlow is a Python framework for orchestrating AI agents in workflows alongside traditional code. It allows you to declaratively define AI tasks, assign them to agents, and seamlessly integrate them into larger workflows. By providing a structured way to coordinate multiple AI agents, ControlFlow enables you to build sophisticated applications that leverage the power of AI while maintaining the control and flexibility of traditional programming. -ControlFlow is designed with the belief that AI works best when focused and iterated. It encourages breaking workflows into small, targeted steps, each handled by a dedicated AI agent. This keeps each AI as effective as possible, while maintaining context across the entire ensemble. ControlFlow recognizes that AI should augment traditional development, not replace it. It enables a declarative approach to AI, where the desired outcomes are specified and the framework handles the implementation details. This allows developers to mix AI and traditional code freely, leveraging AI where it's most useful while using standard programming everywhere else. +At its core, ControlFlow is built on the idea of agent orchestration. It provides a way to break down complex workflows into smaller, focused tasks that can be assigned to specialized AI agents. These agents can work autonomously on their assigned tasks, while the framework ensures smooth coordination and information sharing between them. This approach allows each agent to excel at its specific role, while the overall workflow benefits from their combined capabilities. 🚨 ControlFlow requires bleeding-edge versions of [Prefect](https://github.com/prefecthq/prefect) and [Marvin](https://github.com/prefecthq/marvin). Caveat emptor! +## Key Concepts + +- **Flow**: A container for an AI-enhanced workflow that maintains consistent context and history. Flows are defined with the `@ai_flow` decorator. +- **Task**: A discrete objective for AI agents to solve. Tasks can be defined with the `@ai_task` decorator or declared inline. +- **Agent**: an AI agent that can be assigned tasks ## Key Features @@ -32,7 +36,7 @@ pip install . ## Example ```python -from control_flow import ai_flow, ai_task, run_ai_task, instructions +from control_flow import ai_flow, ai_task, run_ai, instructions from pydantic import BaseModel @@ -54,15 +58,15 @@ def write_poem_about_user(name: Name, interests: list[str]) -> str: @ai_flow() def demo(): - # set instructions that will be used for multiple tasks with instructions("talk like a pirate"): - # define an AI task as a function and have it execute it name = get_user_name() # define an AI task inline - interests = run_ai_task("ask user for three interests", cast=list[str], user_access=True) + interests = run_ai( + "ask user for three interests", cast=list[str], user_access=True + ) # set instructions for just the next task with instructions("no more than 8 lines"): diff --git a/examples/readme_example.py b/examples/readme_example.py index 0ac0298b..ac073e15 100644 --- a/examples/readme_example.py +++ b/examples/readme_example.py @@ -1,4 +1,4 @@ -from control_flow import ai_flow, ai_task, instructions, run_ai_task +from control_flow import ai_flow, ai_task, instructions, run_ai from pydantic import BaseModel @@ -12,7 +12,7 @@ def get_user_name() -> Name: pass -@ai_task() +@ai_task def write_poem_about_user(name: Name, interests: list[str]) -> str: """write a poem based on the provided `name` and `interests`""" pass @@ -26,10 +26,8 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_ai_task( - "ask user for three interests", - cast=list[str], - user_access=True, + interests = run_ai( + "ask user for three interests", cast=list[str], user_access=True ) # set instructions for just the next task @@ -39,5 +37,5 @@ def demo(): return poem -if __name__ == "main": +if __name__ == "__main__": demo() diff --git a/pyproject.toml b/pyproject.toml index b9892c38..469fb4c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,17 +3,31 @@ name = "control_flow" version = "0.1.0" description = "AI Workflows" authors = [ - { name = "Jeremiah Lowin", email = "153965+jlowin@users.noreply.github.com" } + { name = "Jeremiah Lowin", email = "153965+jlowin@users.noreply.github.com" }, ] dependencies = [ "marvin @ git+https://github.com/prefecthq/marvin@main", - "prefect @ git+https://github.com/prefecthq/prefect@main", + "prefect[dev] @ git+https://github.com/prefecthq/prefect@main", # can remove when prefect fully migrates to pydantic 2 "pydantic>=2", ] readme = "README.md" requires-python = ">= 3.9" -keywords = ["ai", "chatbot", "llm", "NLP", "natural language processing", "prefect", "workflow", "orchestration", "python", "GPT", "openai", "assistant", "agent"] +keywords = [ + "ai", + "chatbot", + "llm", + "NLP", + "natural language processing", + "prefect", + "workflow", + "orchestration", + "python", + "GPT", + "openai", + "assistant", + "agent", +] [project.urls] Code = "https://github.com/jlowin/ControlFlow" @@ -24,24 +38,19 @@ tests = [ "pytest-env>=0.8,<2.0", "pytest-rerunfailures>=10,<14", "pytest-sugar>=0.9,<2.0", - "pytest>=8.1.1", + "pytest>=7.0", "pytest-timeout", "pytest-xdist", "pre-commit>=3.7.0", ] -dev = [ - "control_flow[tests]", - "ipython>=8.22.2", - "pdbpp>=0.10.3", - "ruff>=0.3.4", -] +dev = ["control_flow[tests]", "ipython>=8.22.2", "pdbpp>=0.10.3", "ruff>=0.3.4"] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.rye] -managed = true +managed = true [tool.hatch.metadata] allow-direct-references = true @@ -62,4 +71,5 @@ skip-magic-trailing-comma = false [tool.ruff.lint.per-file-ignores] "__init__.py" = ['I', 'F401', 'E402'] "conftest.py" = ["F401", "F403"] -'tests/fixtures/*.py' = ['F403'] +'tests/fixtures/*.py' = ['F401', 'F403'] +"src/control_flow/utilities/types.py" = ['F401'] diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 371a8e4a..6f3237e5 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,7 +1,8 @@ from .settings import settings -from .agent import ai_task, Agent, run_ai_task -from .flow import ai_flow +# from .agent_old import ai_task, Agent, run_ai +from .core.flow import Flow +from .core.agent import Agent +from .core.controller.controller import Controller from .instructions import instructions - -from marvin.beta.assistants import Assistant +from .dx import ai_flow, run_ai, ai_task diff --git a/src/control_flow/agent.py b/src/control_flow/agent.py deleted file mode 100644 index 39ee2088..00000000 --- a/src/control_flow/agent.py +++ /dev/null @@ -1,593 +0,0 @@ -import functools -import inspect -import json -import logging -from datetime import datetime -from typing import Callable, Generic, TypeVar, Union -from zoneinfo import ZoneInfo - -import marvin -import marvin.utilities.tools -import prefect -from marvin.beta.assistants.assistants import Assistant -from marvin.beta.assistants.handlers import PrintHandler -from marvin.beta.assistants.runs import Run -from marvin.tools.assistants import AssistantTool, EndRun -from marvin.types import FunctionTool -from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from marvin.utilities.jinja import BaseEnvironment -from openai.types.beta.threads.runs import ToolCall -from prefect import get_client as get_prefect_client -from prefect import task as prefect_task -from prefect.context import FlowRunContext -from pydantic import BaseModel, Field, field_validator - -from control_flow import settings -from control_flow.context import ctx -from control_flow.delegation import DelegationStrategy, RoundRobin -from control_flow.utilities.prefect import ( - create_json_artifact, - create_markdown_artifact, - create_python_artifact, -) - -from .flow import AIFlow -from .task import AITask, TaskStatus - -T = TypeVar("T") -logger = logging.getLogger(__name__) - -NOT_PROVIDED = object() - -jinja_environment = BaseEnvironment( - globals={ - "now": lambda: datetime.now(ZoneInfo("UTC")), - "inspect": inspect, - "id": id, - } -) - - -TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( - """ - ## Tool call: {name} - - **Description:** {description} - - ## Arguments - - ```json - {args} - ``` - - ### Result - - ```json - {result} - ``` - """ -) - - -INSTRUCTIONS = """ -You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to -complete the tasks assigned to you. You were created by a software application, -and any messages you receive are from that software application, not a user. You -may use any tools at your disposal to complete the task, including talking to a -human user. - - -## Instructions - -Follow these instructions at all times: - -{% if assistant.instructions -%} -- {{ assistant.instructions }} -{% endif %} -{% if flow.instructions -%} -- {{ flow.instructions }} -{% endif %} -{% if agent.instructions -%} -- {{ agent.instructions }} -{% endif %} -{% for instruction in instructions %} -- {{ instruction }} -{% endfor %} - - -## Assistants - -The following assistants are collaborating to complete the tasks. - -{% for a in agent.assistants %} -- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} -{% endfor %} - -{% if agent.assistants|length > 1 %} -Use the `end_run` tool to end your turn and allow another assistant to take -over, without marking a task as complete or failed. -{% endif %} - - - -## Tasks - -{% if agent.tasks %} -You have been assigned the following tasks. You will continue to run until all -tasks are finished. It may take multiple attempts, iterations, or tool uses to -complete a task. When a task is finished, mark it as `completed` -(and provide a result, if required) or `failed` (with a brief explanation) by -using the appropriate tool. Do not mark a task as complete if you don't have a -complete result. Do not make up results. If you receive only partial or unclear -information from a user, keep working until you have all the information you -need. Be very sure that a task is truly unsolvable before marking it as failed, -especially when working with a human user. - - -{% for task_id, task in agent.numbered_tasks() %} -### Task {{ task_id }} -- Status: {{ task.status.value }} -- Objective: {{ task.objective }} -{% if task.instructions %} -- Additional instructions: {{ task.instructions }} -{% endif %} -{% if task.status.value == "completed" %} -- Result: {{ task.result }} -{% elif task.status.value == "failed" %} -- Error: {{ task.error }} -{% endif %} -{% if task.context %} -- Context: {{ task.context }} -{% endif %} - -{% endfor %} -{% else %} -You have no explicit tasks to complete. Follow your instructions as best as you -can. If it is not possible to comply with the instructions in any way, use the -`end_run` tool to manually stop the run. -{% endif %} - -## Communication - -All messages you receive in the thread are generated by the software that -created you, not a human user. All messages you send are sent only to that -software and are never seen by any human. - -{% if agent.system_access -%} -The software that created you is an AI capable of processing natural language, -so you can freely respond by posting messages to the thread. There may be more -than one AI posting responses, so preface your messages with your own name to -avoid confusion. For example, if your name is "Marvin", start all of your -messages with "Marvin:". This is not required for tool calls. -{% else %} -The software that created you is a Python script that can only process -structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE -THREAD. They will be ignored and only waste time. ONLY USE TOOLS TO RESPOND. -{% endif %} - -{% if agent.user_access -%} -There is also a human user who may be involved in the task. You can communicate -with them using the `talk_to_human` tool. The user is a human and unaware of -your tasks or this system. Do not mention your tasks or anything about how the -system works to them. They can only see messages you send them via tool, not the -rest of the thread. When dealing with humans, you may not always get a clear or -correct response. You may need to ask multiple times or rephrase your questions. -You should also interpret human responses broadly and not be too literal. -{% else %} -You can not communicate with a human user at this time. -{% endif %} - - -{% if context %} -## Additional context - -The following context was provided: -{% for key, value in context.items() -%} -- {{ key }}: {{ value }} -{% endfor %} -{% endif %} -""" - - -def talk_to_human(message: str, get_response: bool = True) -> str: - """ - Send a message to the human user and optionally wait for a response. - If `get_response` is True, the function will return the user's response, - otherwise it will return a simple confirmation. - """ - print(message) - if get_response: - response = input("> ") - return response - return "Message sent to user" - - -def end_run(): - """Use this tool to end the run without marking a task as complete or failed.""" - return EndRun() - - -class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): - tasks: list[AITask] = [] - flow: AIFlow = Field(None, validate_default=True) - assistants: list = Field(None, validate_default=True) - delegation_strategy: DelegationStrategy = Field( - validate_default=True, - description="The strategy for delegating work to assistants.", - default_factory=RoundRobin, - ) - tools: list[Union[AssistantTool, Assistant, Callable]] = [] - context: dict = Field(None, validate_default=True) - user_access: bool = Field( - None, - validate_default=True, - description="If True, the agent is given tools for interacting with a human user.", - ) - system_access: bool = Field( - None, - validate_default=True, - description="If True, the agent will communicate with the system via messages. " - "This is usually only used when the agent was spawned by another " - "agent capable of understanding its responses.", - ) - instructions: str = None - model_config: dict = dict(arbitrary_types_allowed=True, extra="forbid") - - @field_validator("flow", mode="before") - def _load_flow_from_ctx(cls, v): - if v is None: - v = ctx.get("flow", None) - if v is None: - v = AIFlow() - return v - - @field_validator("context", mode="before") - def _default_context(cls, v): - if v is None: - v = {} - return v - - @field_validator("assistants", mode="before") - def _default_assistants(cls, v): - if v is None: - flow = ctx.get("flow") - if flow: - v = flow.assistants - if v is None: - v = [Assistant()] - return v - - @field_validator("user_access", "system_access", mode="before") - def _default_access(cls, v): - if v is None: - v = False - return v - - def numbered_tasks(self) -> list[tuple[int, AITask]]: - return [(i + 1, task) for i, task in enumerate(self.tasks)] - - def _get_instructions(self, assistant: Assistant, context: dict = None): - instructions = jinja_environment.render( - INSTRUCTIONS, - agent=self, - flow=self.flow, - assistant=assistant, - instructions=ctx.get("instructions", []), - context={**self.context, **(context or {})}, - ) - return instructions - - def _get_tools(self, assistant: Assistant) -> list[AssistantTool]: - tools = self.flow.tools + self.tools + assistant.tools - - if not self.tasks or len(self.assistants) > 1: - tools.append(end_run) - - # if there is only one task, and the agent can't send a response to the - # system, then we can quit as soon as it is marked finished - if not self.system_access and len(self.tasks) == 1: - early_end_run = True - else: - early_end_run = False - - for i, task in self.numbered_tasks(): - tools.extend( - [ - task._create_complete_tool(task_id=i, end_run=early_end_run), - task._create_fail_tool(task_id=i, end_run=early_end_run), - ] - ) - - if self.user_access: - tools.append(talk_to_human) - - final_tools = [] - for tool in tools: - if not isinstance(tool, AssistantTool): - tool = marvin.utilities.tools.tool_from_function(tool) - - if isinstance(tool, FunctionTool): - - async def modified_fn( - *args, - # provide default args to avoid a late-binding issue - original_fn: Callable = tool.function._python_fn, - tool: FunctionTool = tool, - **kwargs, - ): - # call fn - result = original_fn(*args, **kwargs) - - passed_args = ( - inspect.signature(original_fn).bind(*args, **kwargs).arguments - ) - try: - passed_args = json.dumps(passed_args, indent=2) - except Exception: - pass - create_markdown_artifact( - markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( - name=tool.function.name, - description=tool.function.description or "(none provided)", - args=passed_args, - result=result, - ), - key="result", - ) - return result - - tool.function._python_fn = prefect_task( - modified_fn, - task_run_name=f"Tool call: {tool.function.name}", - ) - final_tools.append(tool) - return final_tools - - def _get_run_assistant_task(self): - """ - Helper function for building the task that will execute the OpenAI assistant run. - This needs to be regenerated each time in case the instructions change. - """ - - def _name_from_assistant(): - """Helper function for naming task runs""" - from prefect.runtime import task_run - - assistant = task_run.parameters.get("assistant") - return f"Run OpenAI assistant ({assistant.name})" - - @prefect_task(task_run_name=_name_from_assistant) - async def run_openai_assistant( - assistant: Assistant, context: dict = None, run_kwargs: dict = None - ) -> Run: - run_kwargs = run_kwargs or {} - model = run_kwargs.pop( - "model", - assistant.model or self.flow.model or settings.assistant_model, - ) - thread = run_kwargs.pop("thread", self.flow.thread) - - run = Run( - assistant=assistant, - thread=thread, - instructions=self._get_instructions( - assistant=assistant, context=context - ), - tools=self._get_tools(assistant=assistant), - event_handler_class=AgentHandler, - model=model, - **run_kwargs, - ) - await run.run_async() - create_json_artifact( - key="messages", - # dump explicilty because of odd OAI serialization issue - data=[m.model_dump() for m in run.messages], - description="All messages sent and received during the run.", - ) - create_json_artifact( - key="actions", - # dump explicilty because of odd OAI serialization issue - data=[s.model_dump() for s in run.steps], - description="All actions taken by the assistant during the run.", - ) - return run - - return run_openai_assistant - - @expose_sync_method("run") - async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: - assistants_generator = self.delegation_strategy.run(self.assistants) - openai_run = self._get_run_assistant_task() - - openai_run( - assistant=next(assistants_generator), - context=context, - run_kwargs=run_kwargs, - ) - - # if this AI can't post messages to the system, then continue to invoke - # it until all tasks are finished - if not self.system_access: - counter = 0 - while ( - any(t.status == TaskStatus.PENDING for t in self.tasks) - and counter < settings.max_agent_iterations - ): - openai_run( - assistant=next(assistants_generator), - context=context, - run_kwargs=run_kwargs, - ) - counter += 1 - - result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] - - return result - - -class AgentHandler(PrintHandler): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.tool_calls = {} - - async def on_tool_call_created(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is created""" - - if tool_call.type == "function": - task_run_name = "Prepare arguments for tool call" - else: - task_run_name = f"Tool call: {tool_call.type}" - - client = get_prefect_client() - engine_context = FlowRunContext.get() - if not engine_context: - return - - task_run = await client.create_task_run( - task=prefect.Task(fn=lambda: None), - name=task_run_name, - extra_tags=["tool-call"], - flow_run_id=engine_context.flow_run.id, - dynamic_key=tool_call.id, - state=prefect.states.Running(), - ) - - self.tool_calls[tool_call.id] = task_run - - async def on_tool_call_done(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is done""" - - client = get_prefect_client() - task_run = self.tool_calls.get(tool_call.id) - if not task_run: - return - await client.set_task_run_state( - task_run_id=task_run.id, state=prefect.states.Completed(), force=True - ) - - # code interpreter is run as a single call, so we can publish a result artifact - if tool_call.type == "code_interpreter": - # images = [] - # for output in tool_call.code_interpreter.outputs: - # if output.type == "image": - # image_path = download_temp_file(output.image.file_id) - # images.append(image_path) - - create_python_artifact( - key="code", - code=tool_call.code_interpreter.input, - description="Code executed in the code interpreter", - task_run_id=task_run.id, - ) - create_json_artifact( - key="output", - data=tool_call.code_interpreter.outputs, - description="Output from the code interpreter", - task_run_id=task_run.id, - ) - - elif tool_call.type == "function": - create_json_artifact( - key="arguments", - data=json.dumps(json.loads(tool_call.function.arguments), indent=2), - description=f"Arguments for the `{tool_call.function.name}` tool", - task_run_id=task_run.id, - ) - - -def ai_task( - fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict -): - """ - Use a Python function to create an AI task. When the function is called, an - agent is created to complete the task and return the result. - """ - - if fn is None: - return functools.partial( - ai_task, objective=objective, user_access=user_access, **agent_kwargs - ) - - sig = inspect.signature(fn) - - if objective is None: - if fn.__doc__: - objective = f"{fn.__name__}: {fn.__doc__}" - else: - objective = fn.__name__ - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - # first process callargs - bound = sig.bind(*args, **kwargs) - bound.apply_defaults() - - return run_ai_task.with_options(name=f"Task: {fn.__name__}")( - task=objective, - cast=fn.__annotations__.get("return"), - context=bound.arguments, - user_access=user_access, - **agent_kwargs, - ) - - return wrapper - - -def _name_from_objective(): - """Helper function for naming task runs""" - from prefect.runtime import task_run - - objective = task_run.parameters.get("task") - - if not objective: - objective = "Follow general instructions" - if len(objective) > 75: - return f"Task: {objective[:75]}..." - return f"Task: {objective}" - - -@prefect_task(task_run_name=_name_from_objective) -def run_ai_task( - task: str = None, - cast: T = NOT_PROVIDED, - context: dict = None, - user_access: bool = None, - model: str = None, - **agent_kwargs: dict, -) -> T: - """ - Create and run an agent to complete a task with the given objective and - context. This function is similar to an inline version of the @ai_task - decorator. - - This inline version is useful when you want to create and run an ad-hoc AI - task, without defining a function or using decorator syntax. It provides - more flexibility in terms of dynamically setting the task parameters. - Additional detail can be provided as `context`. - """ - - if cast is NOT_PROVIDED: - if not task: - cast = None - else: - cast = str - - # load flow - flow = ctx.get("flow", None) - - # create tasks - if task: - ai_tasks = [AITask[cast](objective=task, context=context)] - else: - ai_tasks = [] - - # run agent - agent = Agent(tasks=ai_tasks, flow=flow, user_access=user_access, **agent_kwargs) - agent.run(model=model) - - if ai_tasks: - if ai_tasks[0].status == TaskStatus.COMPLETED: - return ai_tasks[0].result - elif ai_tasks[0].status == TaskStatus.FAILED: - raise ValueError(ai_tasks[0].error) diff --git a/src/control_flow/core/__init__.py b/src/control_flow/core/__init__.py new file mode 100644 index 00000000..f6856568 --- /dev/null +++ b/src/control_flow/core/__init__.py @@ -0,0 +1,4 @@ +from .task import Task, TaskStatus +from .flow import Flow +from .agent import Agent +from .controller import Controller diff --git a/src/control_flow/core/agent.py b/src/control_flow/core/agent.py new file mode 100644 index 00000000..c13e823d --- /dev/null +++ b/src/control_flow/core/agent.py @@ -0,0 +1,125 @@ +import inspect +import json +import logging +from enum import Enum +from typing import Callable + +from marvin.types import FunctionTool +from marvin.utilities.tools import tool_from_function +from prefect import task as prefect_task +from pydantic import Field + +from control_flow.utilities.prefect import ( + create_markdown_artifact, +) +from control_flow.utilities.types import Assistant, AssistantTool, ControlFlowModel + +logger = logging.getLogger(__name__) + +TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( + """ + ## Tool call: {name} + + **Description:** {description} + + ## Arguments + + ```json + {args} + ``` + + ### Result + + ```json + {result} + ``` + """ +) + + +class AgentStatus(Enum): + INCOMPLETE = "incomplete" + COMPLETE = "complete" + + +def talk_to_human(message: str, get_response: bool = True) -> str: + """ + Send a message to the human user and optionally wait for a response. + If `get_response` is True, the function will return the user's response, + otherwise it will return a simple confirmation. + """ + print(message) + if get_response: + response = input("> ") + return response + return "Message sent to user" + + +class Agent(Assistant, ControlFlowModel): + user_access: bool = Field( + False, + description="If True, the agent is given tools for interacting with a human user.", + ) + controller_access: bool = Field( + False, + description="If True, the agent will communicate with the controller via messages.", + ) + + def get_tools(self, user_access: bool = None) -> list[AssistantTool | Callable]: + if user_access is None: + user_access = self.user_access + tools = super().get_tools() + if user_access: + tools.append(tool_from_function(talk_to_human)) + + wrapped_tools = [] + for tool in tools: + wrapped_tools.append(self._wrap_prefect_tool(tool)) + return tools + + def _wrap_prefect_tool(self, tool: AssistantTool | Callable) -> AssistantTool: + if not isinstance(tool, AssistantTool): + tool = tool_from_function(tool) + + if isinstance(tool, FunctionTool): + # for functions, we modify the function to become a Prefect task and + # publish an artifact that contains details about the function call + + async def modified_fn( + *args, + # provide default args to avoid a late-binding issue + original_fn: Callable = tool.function._python_fn, + tool: FunctionTool = tool, + **kwargs, + ): + # call fn + result = original_fn(*args, **kwargs) + + # prepare artifact + passed_args = ( + inspect.signature(original_fn).bind(*args, **kwargs).arguments + ) + try: + passed_args = json.dumps(passed_args, indent=2) + except Exception: + pass + create_markdown_artifact( + markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format( + name=tool.function.name, + description=tool.function.description or "(none provided)", + args=passed_args, + result=result, + ), + key="result", + ) + + # return result + return result + + # replace the function with the modified version + tool.function._python_fn = prefect_task( + modified_fn, + task_run_name=f"Tool call: {tool.function.name}", + ) + + return tool diff --git a/src/control_flow/core/controller/__init__.py b/src/control_flow/core/controller/__init__.py new file mode 100644 index 00000000..5319894d --- /dev/null +++ b/src/control_flow/core/controller/__init__.py @@ -0,0 +1,2 @@ +from .controller import Controller +from .delegation import RoundRobin diff --git a/src/control_flow/core/controller/controller.py b/src/control_flow/core/controller/controller.py new file mode 100644 index 00000000..cee12cea --- /dev/null +++ b/src/control_flow/core/controller/controller.py @@ -0,0 +1,210 @@ +import json +import logging +from typing import Callable, Self + +import prefect +from marvin.beta.assistants import PrintHandler, Run +from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method +from openai.types.beta.threads.runs import ToolCall +from prefect import get_client as get_prefect_client +from prefect import task as prefect_task +from prefect.context import FlowRunContext +from pydantic import BaseModel, Field, field_validator, model_validator + +from control_flow.core.agent import Agent +from control_flow.core.controller.delegation import ( + DelegationStrategy, + RoundRobin, +) +from control_flow.core.flow import Flow +from control_flow.core.task import Task, TaskStatus +from control_flow.instructions import get_instructions as get_context_instructions +from control_flow.utilities.prefect import ( + create_json_artifact, + create_python_artifact, +) +from control_flow.utilities.types import Thread + +logger = logging.getLogger(__name__) + + +class Controller(BaseModel, ExposeSyncMethodsMixin): + flow: Flow + agents: list[Agent] + tasks: list[Task] = Field( + description="Tasks that the controller will complete.", + default_factory=list, + ) + delegation_strategy: DelegationStrategy = Field( + validate_default=True, + description="The strategy for delegating work to assistants.", + default_factory=RoundRobin, + ) + # termination_strategy: TerminationStrategy + context: dict = {} + instructions: str = None + user_access: bool | None = Field( + None, + description="If True or False, overrides the user_access of the " + "agents. If None, the user_access setting of each agents is used.", + ) + model_config: dict = dict(extra="forbid") + + @field_validator("agents", mode="before") + def _validate_agents(cls, v): + if not v: + raise ValueError("At least one agent is required.") + return v + + @model_validator(mode="after") + def _add_tasks_to_flow(self) -> Self: + for task in self.tasks: + self.flow.add_task(task) + return self + + @expose_sync_method("run") + async def run_async(self): + """ + Run the control flow. + """ + + # continue as long as there are incomplete tasks + while any([t for t in self.tasks if t.status == TaskStatus.PENDING]): + # select the next agent + if len(self.agents) > 1: + agent = self.delegation_strategy(self.agents) + else: + agent = self.agents[0] + if not agent: + return + + # run the agent + task = await self._get_prefect_run_agent_task(agent) + task(agent=agent) + + async def _get_prefect_run_agent_task( + self, agent: Agent, thread: Thread = None + ) -> Callable: + @prefect_task(task_run_name=f'Run Agent: "{agent.name}"') + async def _run_agent(agent: Agent, thread: Thread = None): + run = await self.run_agent(agent=agent, thread=thread) + + create_json_artifact( + key="messages", + data=[m.model_dump() for m in run.messages], + description="All messages sent and received during the run.", + ) + create_json_artifact( + key="actions", + data=[s.model_dump() for s in run.steps], + description="All actions taken by the assistant during the run.", + ) + return run + + return _run_agent + + async def run_agent(self, agent: Agent, thread: Thread = None) -> Run: + """ + Run a single agent. + """ + from control_flow.core.controller.instruction_template import MainTemplate + + instructions_template = MainTemplate( + agent=agent, + controller=self, + context=self.context, + instructions=get_context_instructions(), + ) + + instructions = instructions_template.render() + + tools = self.flow.tools + agent.get_tools(user_access=self.user_access) + + for task in self.tasks: + task_id = self.flow.get_task_id(task) + tools = tools + task.get_tools(task_id=task_id) + + run = Run( + assistant=agent, + thread=thread or self.flow.thread, + instructions=instructions, + tools=tools, + event_handler_class=AgentHandler, + ) + + await run.run_async() + + return run + + def task_ids(self) -> dict[Task, int]: + return {task: self.flow.get_task_id(task) for task in self.tasks} + + +class AgentHandler(PrintHandler): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.tool_calls = {} + + async def on_tool_call_created(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is created""" + + if tool_call.type == "function": + task_run_name = "Prepare arguments for tool call" + else: + task_run_name = f"Tool call: {tool_call.type}" + + client = get_prefect_client() + engine_context = FlowRunContext.get() + if not engine_context: + return + + task_run = await client.create_task_run( + task=prefect.Task(fn=lambda: None), + name=task_run_name, + extra_tags=["tool-call"], + flow_run_id=engine_context.flow_run.id, + dynamic_key=tool_call.id, + state=prefect.states.Running(), + ) + + self.tool_calls[tool_call.id] = task_run + + async def on_tool_call_done(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is done""" + + client = get_prefect_client() + task_run = self.tool_calls.get(tool_call.id) + if not task_run: + return + await client.set_task_run_state( + task_run_id=task_run.id, state=prefect.states.Completed(), force=True + ) + + # code interpreter is run as a single call, so we can publish a result artifact + if tool_call.type == "code_interpreter": + # images = [] + # for output in tool_call.code_interpreter.outputs: + # if output.type == "image": + # image_path = download_temp_file(output.image.file_id) + # images.append(image_path) + + create_python_artifact( + key="code", + code=tool_call.code_interpreter.input, + description="Code executed in the code interpreter", + task_run_id=task_run.id, + ) + create_json_artifact( + key="output", + data=tool_call.code_interpreter.outputs, + description="Output from the code interpreter", + task_run_id=task_run.id, + ) + + elif tool_call.type == "function": + create_json_artifact( + key="arguments", + data=json.dumps(json.loads(tool_call.function.arguments), indent=2), + description=f"Arguments for the `{tool_call.function.name}` tool", + task_run_id=task_run.id, + ) diff --git a/src/control_flow/core/controller/delegation.py b/src/control_flow/core/controller/delegation.py new file mode 100644 index 00000000..82875d20 --- /dev/null +++ b/src/control_flow/core/controller/delegation.py @@ -0,0 +1,91 @@ +import itertools +from typing import TYPE_CHECKING, Any, Generator, Iterator + +from pydantic import BaseModel, PrivateAttr + +from control_flow.core.agent import Agent + +if TYPE_CHECKING: + from control_flow.core.agent import Agent + + +class DelegationStrategy(BaseModel): + """ + A DelegationStrategy is a strategy for delegating tasks to AI assistants. + """ + + def __call__(self, agents: list["Agent"]) -> Agent: + """ + Run the delegation strategy with a list of agents. + """ + return self._next_agent(agents) + + def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent": + """ + Select an agent from a list of agents. + """ + raise NotImplementedError() + + +class Single(DelegationStrategy): + """ + A Single delegation strategy delegates tasks to a single agent. This is useful for debugging. + """ + + agent: Agent + + def _next_agent(self, agents: list[Agent], **kwargs) -> Generator[Any, Any, Agent]: + """ + Given a list of potential agents, choose the single agent. + """ + if self.agent in agents: + return self.agent + + +class RoundRobin(DelegationStrategy): + """ + A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. + """ + + _cycle: Iterator[Agent] = PrivateAttr(None) + + def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent": + """ + Given a list of potential agents, delegate the tasks in a round-robin fashion. + """ + # the first time this is called, create a cycle iterator + if self._cycle is None: + self._cycle = itertools.cycle(agents) + return next(self._cycle) + + +# class Moderator(DelegationStrategy): +# """ +# A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier +# """ + +# model: str = None + +# def _next_agent( +# self, agents: list["Agent"], tasks: list[Task], history: list[Message] +# ) -> "Agent": +# """ +# Given a list of potential agents, choose the most qualified assistant to complete the tasks. +# """ + +# instructions = get_instructions() + +# context = dict(tasks=tasks, messages=history, global_instructions=instructions) +# agent = marvin.classify( +# context, +# [a for a in agents if a.status == AgentStatus.INCOMPLETE], +# instructions=""" +# Given the conversation context, choose the AI agent most +# qualified to take the next turn at completing the tasks. Take into +# account the instructions, each agent's own instructions, and the +# tools they have available. +# """, +# model_kwargs=dict(model=self.model), +# ) + +# return agent diff --git a/src/control_flow/core/controller/instruction_template.py b/src/control_flow/core/controller/instruction_template.py new file mode 100644 index 00000000..c597167e --- /dev/null +++ b/src/control_flow/core/controller/instruction_template.py @@ -0,0 +1,272 @@ +import inspect + +from pydantic import BaseModel + +from control_flow.core.agent import Agent +from control_flow.utilities.jinja import jinja_env +from control_flow.utilities.types import ControlFlowModel + +from .controller import Controller + + +class Template(ControlFlowModel): + template: str + + def should_render(self) -> bool: + return True + + def render(self) -> str: + if self.should_render(): + render_kwargs = dict(self) + render_kwargs.pop("template") + return jinja_env.render(inspect.cleandoc(self.template), **render_kwargs) + + +class AgentTemplate(Template): + template: str = """ + You are an AI agent. Your name is "{{ agent.name }}". + + {% if agent.description %} + + The following description has been provided for you: + {{ agent.description }} + + {% endif -%} + + Your job is to work on any pending tasks until you can mark them as either + `complete` or `failed`. The following instructions will provide you with all + the context you need to complete your tasks. Note that using a tool to + complete or fail a task is your ultimate objective, and you should not post + any messages to the thread unless you have a specific reason to do so. + """ + agent: Agent + + +class CommunicationTemplate(Template): + template: str = """ + ## Communciation + + ### Posting messages to the thread + + You have been created by a Controller in the Python library ControlFlow in + order to complete various tasks or instructions. All messages in this thread + are either from the controller or from AI agents like you. Note that all + agents post to the thread with the `Assistant` role, so if you do need to + post a message, preface with your name (e.g. "{{ agent.name }}: Hello!") in + order to distinguish your messages. + + The controller CAN NOT and WILL NOT read your messages, so DO NOT post + messages unless you need to send information to another agent. DO NOT post + messages about information already captured by your tool calls, such as the + tool call itself, its result, human responses, or task completion. + + ### Talking to humans + + {% if agent.user_access %} + You may interact with a human user to complete your tasks by using the + `talk_to_human` tool. The human is unaware of your tasks or the controller. + Do not mention them or anything else about how this system works. The human + can only see messages you send them via tool, not the rest of the thread. + + Humans may give poor, incorrect, or partial responses. You may need to ask + questions multiple times in order to complete your tasks. Use good judgement + to determine the best way to achieve your goal. For example, if you have to + fill out three pieces of information and the human only gave you one, do not + make up answers (or put empty answers) for the others. Ask again and only + fail the task if you truly can not make progress. + {% else %} + You can not interact with a human at this time. + {% endif %} + + """ + + agent: Agent + + +class CollaborationTemplate(Template): + template: str = """ + ## Collaboration + + You are collaborating with other AI agents. They are listed below by name, + along with a brief description. Note that all agents post messages to the + same thread with the `Assistant` role, so pay attention to the name of the + agent that is speaking. Only one agent needs to indicate that a task is + complete. + + ### Agents + {% for agent in other_agents %} + {{loop.index}}. "{{agent.name}}": {{agent.description}} + {% endfor %} + {% if not other_agents %} + (There are no other agents currently participating in this workflow) + {% endif %} + """ + other_agents: list[Agent] + + +class InstructionsTemplate(Template): + template: str = """ + ## Instructions + + {% if flow_instructions %} + ### Workflow instructions + + These instructions apply to the entire workflow: + + {{ flow_instructions }} + {% endif %} + + {% if controller_instructions %} + ### Controller instructions + + These instructions apply to these tasks: + + {{ controller_instructions }} + {% endif %} + + {% if agent_instructions %} + ### Agent instructions + + These instructions apply only to you: + + {{ agent_instructions }} + {% endif %} + + {% if additional_instructions %} + ### Additional instructions + + These instructions were additionally provided for this part of the workflow: + + {% for instruction in additional_instructions %} + - {{ instruction }} + {% endfor %} + {% endif %} + """ + flow_instructions: str | None = None + controller_instructions: str | None = None + agent_instructions: str | None = None + additional_instructions: list[str] = [] + + def should_render(self): + return any( + [ + self.flow_instructions, + self.controller_instructions, + self.agent_instructions, + self.additional_instructions, + ] + ) + + +class TasksTemplate(Template): + template: str = """ + ## Tasks + + ### Active tasks + + The following tasks are pending. You and any other agents are responsible + for completing them and will continue to be invoked until you mark each + task as either "completed" or "failed" with the appropriate tool. + + Note: Task IDs are assigned for identification purposes only and will be + resused after tasks complete. + + {% for task in controller.tasks %} + {% if task.status.value == "pending" %} + #### Task {{ controller.flow.get_task_id(task) }} + - Status: {{ task.status.value }} + - Objective: {{ task.objective }} + {% if task.instructions %} + - Instructions: {{ task.instructions }} + {% endif %} + {% if task.status.value == "completed" %} + - Result: {{ task.result }} + {% elif task.status.value == "failed" %} + - Error: {{ task.error }} + {% endif %} + {% if task.context %} + - Context: {{ task.context }} + {% endif %} + + {% endif %} + {% endfor %} + + ### Completed tasks + The following tasks were recently completed: + + {% for task in controller.flow.completed_tasks(reverse=True, limit=20) %} + #### Task {{ controller.flow.get_task_id(task) }} + - Status: {{ task.status.value }} + - Objective: {{ task.objective }} + {% if task.status.value == "completed" %} + - Result: {{ task.result }} + {% elif task.status.value == "failed" %} + - Error: {{ task.error }} + {% endif %} + {% if task.context %} + - Context: {{ task.context }} + {% endif %} + + {% endfor %} + """ + controller: Controller + + def should_render(self): + return any(self.controller.tasks) + + +class ContextTemplate(Template): + template: str = """ + ## Context + + {% if flow_context %} + ### Flow context + {% for key, value in flow_context.items() %} + - *{{ key }}*: {{ value }} + {% endfor %} + {% endif %} + + {% if controller_context %} + ### Controller context + {% for key, value in controller_context.items() %} + - *{{ key }}*: {{ value }} + {% endfor %} + {% endif %} + """ + flow_context: dict + controller_context: dict + + def should_render(self): + return bool(self.flow_context or self.controller_context) + + +class MainTemplate(BaseModel): + agent: Agent + controller: Controller + context: dict + instructions: list[str] + + def render(self): + templates = [ + AgentTemplate(agent=self.agent), + TasksTemplate(controller=self.controller), + ContextTemplate( + flow_context=self.controller.flow.context, + controller_context=self.controller.context, + ), + InstructionsTemplate( + flow_instructions=self.controller.flow.instructions, + controller_instructions=self.controller.instructions, + agent_instructions=self.agent.instructions, + additional_instructions=self.instructions, + ), + CommunicationTemplate(agent=self.agent), + CollaborationTemplate( + other_agents=[a for a in self.controller.agents if a != self.agent] + ), + ] + + rendered = [ + template.render() for template in templates if template.should_render() + ] + return "\n\n".join(rendered) diff --git a/src/control_flow/core/controller/termination.py b/src/control_flow/core/controller/termination.py new file mode 100644 index 00000000..773582d5 --- /dev/null +++ b/src/control_flow/core/controller/termination.py @@ -0,0 +1,36 @@ +from typing import TYPE_CHECKING + +from pydantic import BaseModel + +from control_flow.task import TaskStatus + +if TYPE_CHECKING: + from control_flow.agent import Agent + + +class TerminationStrategy(BaseModel): + """ + A TerminationStrategy is a strategy for deciding when AI assistants have completed their tasks. + """ + + def run(self, agents: list["Agent"]) -> bool: + """ + Given agents, determine whether they have completed their tasks. + """ + + raise NotImplementedError() + + +class AllFinished(TerminationStrategy): + """ + An AllFinished termination strategy terminates when all agents have finished all of their tasks (either COMPLETED or FAILED). + """ + + def run(self, agents: list["Agent"]) -> bool: + """ + Given agents, determine whether they have completed their tasks. + """ + for agent in agents: + if any(task.status == TaskStatus.PENDING for task in agent.tasks): + return False + return True diff --git a/src/control_flow/core/flow.py b/src/control_flow/core/flow.py new file mode 100644 index 00000000..069a57c1 --- /dev/null +++ b/src/control_flow/core/flow.py @@ -0,0 +1,87 @@ +from typing import Callable + +from marvin.beta.assistants import Thread +from openai.types.beta.threads import Message +from prefect import task as prefect_task +from pydantic import Field, field_validator + +from control_flow.core.task import Task, TaskStatus +from control_flow.utilities.context import ctx +from control_flow.utilities.logging import get_logger +from control_flow.utilities.types import AssistantTool, ControlFlowModel + +logger = get_logger(__name__) + + +class Flow(ControlFlowModel): + thread: Thread = Field(None, validate_default=True) + tools: list[AssistantTool | Callable] = Field( + [], description="Tools that will be available to every agent in the flow" + ) + instructions: str | None = None + model: str | None = None + context: dict = {} + tasks: dict[Task, int] = Field(repr=False, default_factory=dict) + + @field_validator("thread", mode="before") + def _load_thread_from_ctx(cls, v): + if v is None: + v = ctx.get("thread", None) + if v is None: + v = Thread() + if not v.id: + v.create() + + return v + + def add_message(self, message: str): + prefect_task(self.thread.add)(message) + + def add_task(self, task: Task): + if task not in self.tasks: + task_id = len(self.tasks) + 1 + self.tasks[task] = task_id + # this message is important for contexualizing activity + # self.add_message(f'Task #{task_id} added to flow: "{task.objective}"') + + def get_task_id(self, task: Task): + return self.tasks[task] + + def pending_tasks(self): + return sorted( + (t for t in self.tasks if t.status == TaskStatus.PENDING), + key=lambda t: t.created_at, + ) + + def completed_tasks(self, reverse=False, limit=None): + result = sorted( + (t for t in self.tasks if t.status != TaskStatus.PENDING), + key=lambda t: t.completed_at, + reverse=reverse, + ) + + if limit: + result = result[:limit] + return result + + +def get_flow() -> Flow: + """ + Loads the flow from the context. + + Will error if no flow is found in the context. + """ + flow: Flow | None = ctx.get("flow") + if not flow: + raise ValueError("No flow found in context") + return flow + + +def get_flow_messages(limit: int = None) -> list[Message]: + """ + Loads messages from the flow's thread. + + Will error if no flow is found in the context. + """ + flow = get_flow() + return flow.thread.get_messages(limit=limit) diff --git a/src/control_flow/core/task.py b/src/control_flow/core/task.py new file mode 100644 index 00000000..09672333 --- /dev/null +++ b/src/control_flow/core/task.py @@ -0,0 +1,86 @@ +import datetime +from enum import Enum +from typing import Callable, Generic, TypeVar + +import marvin +import marvin.utilities.tools +from marvin.utilities.tools import FunctionTool +from pydantic import Field + +from control_flow.utilities.logging import get_logger +from control_flow.utilities.types import AssistantTool, ControlFlowModel + +T = TypeVar("T") +logger = get_logger(__name__) + + +class TaskStatus(Enum): + PENDING = "pending" + COMPLETED = "completed" + FAILED = "failed" + + +class Task(ControlFlowModel, Generic[T]): + objective: str + instructions: str | None = None + context: dict = Field({}) + status: TaskStatus = TaskStatus.PENDING + result: T = None + error: str | None = None + tools: list[AssistantTool | Callable] = [] + created_at: datetime.datetime = Field(default_factory=datetime.datetime.now) + completed_at: datetime.datetime | None = None + + def __hash__(self): + return id(self) + + def _create_complete_tool(self, task_id: int) -> FunctionTool: + """ + Create an agent-compatible tool for completing this task. + """ + result_type = self.get_result_type() + + # wrap the method call to get the correct result type signature + def complete(result: result_type): + self.complete(result=result) + + tool = marvin.utilities.tools.tool_from_function( + complete, + name=f"complete_task_{task_id}", + description=f"Mark task {task_id} completed", + ) + + return tool + + def _create_fail_tool(self, task_id: int) -> FunctionTool: + """ + Create an agent-compatible tool for failing this task. + """ + tool = marvin.utilities.tools.tool_from_function( + self.fail, + name=f"fail_task_{task_id}", + description=f"Mark task {task_id} failed", + ) + return tool + + def get_tools(self, task_id: int) -> list[AssistantTool | Callable]: + return [ + self._create_complete_tool(task_id), + self._create_fail_tool(task_id), + ] + self.tools + + def complete(self, result: T): + self.result = result + self.status = TaskStatus.COMPLETED + self.completed_at = datetime.datetime.now() + + def fail(self, message: str | None = None): + self.error = message + self.status = TaskStatus.FAILED + self.completed_at = datetime.datetime.now() + + def get_result_type(self) -> T: + """ + Returns the `type` of the task's result field. + """ + return self.model_fields["result"].annotation diff --git a/src/control_flow/delegation.py b/src/control_flow/delegation.py deleted file mode 100644 index 86c2a7bb..00000000 --- a/src/control_flow/delegation.py +++ /dev/null @@ -1,86 +0,0 @@ -import itertools -from typing import Any, Generator - -import marvin -from marvin.beta.assistants import Assistant -from pydantic import BaseModel - -from control_flow.flow import get_flow_messages -from control_flow.instructions import get_instructions - - -class DelegationStrategy(BaseModel): - """ - A DelegationStrategy is a strategy for delegating tasks to AI assistants. - """ - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, delegate the task to the most qualified assistant. - """ - - raise NotImplementedError() - - -class Single(DelegationStrategy): - """ - A Single delegation strategy delegates tasks to a single AI assistant. - """ - - assistant: Assistant - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, choose the first assistant in the list. - """ - - if self.assistant not in assistants: - raise ValueError("Assistant not in list of assistants") - - while True: - yield self.assistant - - -class RoundRobin(DelegationStrategy): - """ - A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. - """ - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, choose the next assistant in the list. - """ - - yield from itertools.cycle(assistants) - - -class Moderator(DelegationStrategy): - """ - A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier - """ - - model: str = None - - def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: - """ - Given a list of potential assistants, delegate the task to the most qualified assistant. - """ - - while True: - instructions = get_instructions() - history = get_flow_messages() - - context = dict(messages=history, global_instructions=instructions) - assistant = marvin.classify( - context, - assistants, - instructions=""" - Given the conversation context, choose the AI assistant most - qualified to take the next turn at completing the tasks. Take into - account the instructions, each assistant's own instructions, and the - tools they have available. - """, - model_kwargs=dict(model=self.model), - ) - - yield assistant diff --git a/src/control_flow/dx.py b/src/control_flow/dx.py new file mode 100644 index 00000000..fbf41b84 --- /dev/null +++ b/src/control_flow/dx.py @@ -0,0 +1,185 @@ +import functools +import inspect +from typing import Callable, TypeVar + +from prefect import flow as prefect_flow +from prefect import task as prefect_task + +from control_flow.core.agent import Agent +from control_flow.core.flow import Flow +from control_flow.core.task import Task, TaskStatus +from control_flow.utilities.context import ctx +from control_flow.utilities.logging import get_logger +from control_flow.utilities.marvin import patch_marvin +from control_flow.utilities.types import AssistantTool, Thread + +logger = get_logger(__name__) +T = TypeVar("T") +NOT_PROVIDED = object() + + +def ai_flow( + fn=None, + *, + thread: Thread = None, + tools: list[AssistantTool | Callable] = None, + instructions: str = None, + model: str = None, +): + """ + Prepare a function to be executed as a Control Flow flow. + """ + + if fn is None: + return functools.partial( + ai_flow, + thread=thread, + tools=tools, + instructions=instructions, + model=model, + ) + + @functools.wraps(fn) + def wrapper( + *args, + flow_kwargs: dict = None, + **kwargs, + ): + p_fn = prefect_flow(fn) + + flow_obj = Flow( + **{ + "thread": thread, + "tools": tools or [], + "instructions": instructions, + "model": model, + **(flow_kwargs or {}), + } + ) + + logger.info( + f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"' + ) + + with ctx(flow=flow_obj), patch_marvin(): + return p_fn(*args, **kwargs) + + return wrapper + + +def ai_task( + fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict +): + """ + Use a Python function to create an AI task. When the function is called, an + agent is created to complete the task and return the result. + """ + + if fn is None: + return functools.partial( + ai_task, objective=objective, user_access=user_access, **agent_kwargs + ) + + sig = inspect.signature(fn) + + if objective is None: + if fn.__doc__: + objective = f"{fn.__name__}: {fn.__doc__}" + else: + objective = fn.__name__ + + @functools.wraps(fn) + def wrapper(*args, **kwargs): + # first process callargs + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + + # return run_ai.with_options(name=f"Task: {fn.__name__}")( + return run_ai( + tasks=objective, + cast=fn.__annotations__.get("return"), + context=bound.arguments, + user_access=user_access, + **agent_kwargs, + ) + + return wrapper + + +def _name_from_objective(): + """Helper function for naming task runs""" + from prefect.runtime import task_run + + objective = task_run.parameters.get("task") + + if not objective: + objective = "Follow general instructions" + if len(objective) > 75: + return f"Task: {objective[:75]}..." + return f"Task: {objective}" + + +@prefect_task(task_run_name=_name_from_objective) +def run_ai( + tasks: str | list[str], + agents: list[Agent] = None, + cast: T = NOT_PROVIDED, + context: dict = None, + user_access: bool = False, +) -> T | list[T]: + """ + Create and run an agent to complete a task with the given objective and + context. This function is similar to an inline version of the @ai_task + decorator. + + This inline version is useful when you want to create and run an ad-hoc AI + task, without defining a function or using decorator syntax. It provides + more flexibility in terms of dynamically setting the task parameters. + Additional detail can be provided as `context`. + """ + + single_result = False + if isinstance(tasks, str): + single_result = True + + tasks = [tasks] + + if cast is NOT_PROVIDED: + if not tasks: + cast = None + else: + cast = str + + # load flow + flow = ctx.get("flow", None) + + # create tasks + if tasks: + ai_tasks = [Task[cast](objective=t, context=context or {}) for t in tasks] + else: + ai_tasks = [] + + # create agent + if agents is None: + agents = [Agent()] + + # create Controller + from control_flow.core.controller.controller import Controller + + controller = Controller( + tasks=ai_tasks, agents=agents, flow=flow, user_access=user_access + ) + controller.run() + + if ai_tasks: + if all(task.status == TaskStatus.COMPLETED for task in ai_tasks): + result = [task.result for task in ai_tasks] + if single_result: + result = result[0] + return result + elif failed_tasks := [ + task for task in ai_tasks if task.status == TaskStatus.FAILED + ]: + raise ValueError( + f'Failed tasks: {", ".join([task.objective for task in failed_tasks])}' + ) diff --git a/src/control_flow/flow.py b/src/control_flow/flow.py deleted file mode 100644 index e5184d82..00000000 --- a/src/control_flow/flow.py +++ /dev/null @@ -1,124 +0,0 @@ -import functools -from typing import Callable, Optional, Union - -from marvin.beta.assistants import Assistant, Thread -from marvin.beta.assistants.assistants import AssistantTool -from marvin.utilities.logging import get_logger -from openai.types.beta.threads import Message -from prefect import flow as prefect_flow -from prefect import task as prefect_task -from pydantic import BaseModel, Field, field_validator - -from control_flow.context import ctx -from control_flow.utilities.marvin import patch_marvin - -logger = get_logger(__name__) - - -class AIFlow(BaseModel): - thread: Thread = Field(None, validate_default=True) - assistants: Optional[list[Assistant]] = Field(None, validate_default=True) - tools: list[Union[AssistantTool, Callable]] = Field(None, validate_default=True) - instructions: Optional[str] = None - model: Optional[str] = None - - model_config: dict = dict(validate_assignment=True, extra="forbid") - - @field_validator("thread", mode="before") - def _load_thread_from_ctx(cls, v): - if v is None: - v = ctx.get("thread", None) - if v is None: - v = Thread() - if not v.id: - v.create() - - return v - - @field_validator("tools", mode="before") - def _default_tools(cls, v): - if v is None: - v = [] - return v - - def add_message(self, message: str): - prefect_task(self.thread.add)(message) - - -def ai_flow( - fn=None, - *, - assistants: list[Assistant] = None, - thread: Thread = None, - tools: list[Union[AssistantTool, Callable]] = None, - instructions: str = None, - model: str = None, -): - """ - Prepare a function to be executed as a Control Flow flow. - """ - - if fn is None: - return functools.partial( - ai_flow, - assistants=assistants, - thread=thread, - tools=tools, - instructions=instructions, - model=model, - ) - - @functools.wraps(fn) - def wrapper( - *args, - _assistants: list[Assistant] = None, - _thread: Thread = None, - _tools: list[Union[AssistantTool, Callable]] = None, - _instructions: str = None, - _model: str = None, - **kwargs, - ): - p_fn = prefect_flow(fn) - flow_assistants = _assistants or assistants - flow_thread = _thread or thread or Thread() - flow_instructions = _instructions or instructions - flow_tools = _tools or tools - flow_model = _model or model - flow_obj = AIFlow( - thread=flow_thread, - assistants=flow_assistants, - tools=flow_tools, - instructions=flow_instructions, - model=flow_model, - ) - - logger.info( - f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"' - ) - - with ctx(flow=flow_obj), patch_marvin(): - return p_fn(*args, **kwargs) - - return wrapper - - -def get_flow() -> AIFlow: - """ - Loads the flow from the context. - - Will error if no flow is found in the context. - """ - flow: Optional[AIFlow] = ctx.get("flow") - if not flow: - raise ValueError("No flow found in context") - return flow - - -def get_flow_messages(limit: int = None) -> list[Message]: - """ - Loads messages from the flow's thread. - - Will error if no flow is found in the context. - """ - flow = get_flow() - return flow.thread.get_messages(limit=limit) diff --git a/src/control_flow/instructions.py b/src/control_flow/instructions.py index 4780d66a..97e89664 100644 --- a/src/control_flow/instructions.py +++ b/src/control_flow/instructions.py @@ -2,10 +2,9 @@ from contextlib import contextmanager from typing import Generator, List -from marvin.utilities.logging import get_logger - -from control_flow.context import ctx -from control_flow.flow import AIFlow +from control_flow.core.flow import Flow +from control_flow.utilities.context import ctx +from control_flow.utilities.logging import get_logger logger = get_logger(__name__) @@ -31,7 +30,7 @@ def instructions( """ if post_add_message or post_remove_message: - flow: AIFlow = ctx.get("flow") + flow: Flow = ctx.get("flow") if flow is None: raise ValueError( "instructions() with message posting must be used within a flow context" diff --git a/src/control_flow/task.py b/src/control_flow/task.py deleted file mode 100644 index 438aa546..00000000 --- a/src/control_flow/task.py +++ /dev/null @@ -1,113 +0,0 @@ -from enum import Enum -from typing import Generic, Optional, TypeVar - -import marvin -import marvin.utilities.tools -from marvin.beta.assistants.runs import EndRun -from marvin.utilities.logging import get_logger -from marvin.utilities.tools import FunctionTool -from pydantic import BaseModel, Field, field_validator - -T = TypeVar("T") -logger = get_logger(__name__) - - -class TaskStatus(Enum): - PENDING = "pending" - COMPLETED = "completed" - FAILED = "failed" - - -class AITask(BaseModel, Generic[T]): - """ - An AITask represents a single unit of work that an assistant must complete. - Unlike instructions, which do not require a formal result and may last for - zero or more agent iterations, tasks must be formally completed (or failed) - by producing a result. Agents that are assigned tasks will continue to - iterate until all tasks are completed. - """ - - objective: str - instructions: Optional[str] = None - context: dict = Field(None, validate_default=True) - status: TaskStatus = TaskStatus.PENDING - result: T = None - error: Optional[str] = None - - # internal - model_config: dict = dict(validate_assignment=True, extra="forbid") - - @field_validator("context", mode="before") - def _default_context(cls, v): - if v is None: - v = {} - return v - - def _create_complete_tool( - self, task_id: int, end_run: bool = False - ) -> FunctionTool: - """ - Create an agent-compatible tool for completing this task. - """ - - result_type = self.get_result_type() - - if result_type is not None: - - def complete(result: result_type): - self.result = result - self.status = TaskStatus.COMPLETED - if end_run: - return EndRun() - - tool = marvin.utilities.tools.tool_from_function( - complete, - name=f"complete_task_{task_id}", - description=f"Mark task {task_id} completed", - ) - else: - - def complete(): - self.status = TaskStatus.COMPLETED - if end_run: - return EndRun() - - tool = marvin.utilities.tools.tool_from_function( - complete, - name=f"complete_task_{task_id}", - description=f"Mark task {task_id} completed", - ) - - return tool - - def _create_fail_tool(self, task_id: int, end_run: bool = False) -> FunctionTool: - """ - Create an agent-compatible tool for failing this task. - """ - - def fail(message: Optional[str] = None): - self.error = message - self.status = TaskStatus.FAILED - if end_run: - return EndRun() - - tool = marvin.utilities.tools.tool_from_function( - fail, - name=f"fail_task_{task_id}", - description=f"Mark task {task_id} failed", - ) - return tool - - def complete(self, result: T): - self.result = result - self.status = TaskStatus.COMPLETED - - def fail(self, message: Optional[str] = None): - self.error = message - self.status = TaskStatus.FAILED - - def get_result_type(self) -> T: - """ - Returns the `type` of the task's result field. - """ - return self.model_fields["result"].annotation diff --git a/src/control_flow/context.py b/src/control_flow/utilities/context.py similarity index 100% rename from src/control_flow/context.py rename to src/control_flow/utilities/context.py diff --git a/src/control_flow/utilities/jinja.py b/src/control_flow/utilities/jinja.py new file mode 100644 index 00000000..92d4ca71 --- /dev/null +++ b/src/control_flow/utilities/jinja.py @@ -0,0 +1,13 @@ +import inspect +from datetime import datetime +from zoneinfo import ZoneInfo + +from marvin.utilities.jinja import BaseEnvironment + +jinja_env = BaseEnvironment( + globals={ + "now": lambda: datetime.now(ZoneInfo("UTC")), + "inspect": inspect, + "id": id, + } +) diff --git a/src/control_flow/utilities/logging.py b/src/control_flow/utilities/logging.py new file mode 100644 index 00000000..52a57cff --- /dev/null +++ b/src/control_flow/utilities/logging.py @@ -0,0 +1,44 @@ +import logging +from functools import lru_cache +from typing import Optional + +from marvin.utilities.logging import add_logging_methods + + +@lru_cache() +def get_logger(name: Optional[str] = None) -> logging.Logger: + """ + Retrieves a logger with the given name, or the root logger if no name is given. + + Args: + name: The name of the logger to retrieve. + + Returns: + The logger with the given name, or the root logger if no name is given. + + Example: + Basic Usage of `get_logger` + ```python + from control_flow.utilities.logging import get_logger + + logger = get_logger("control_flow.test") + logger.info("This is a test") # Output: control_flow.test: This is a test + + debug_logger = get_logger("control_flow.debug") + debug_logger.debug_kv("TITLE", "log message", "green") + ``` + """ + parent_logger = logging.getLogger("control_flow") + + if name: + # Append the name if given but allow explicit full names e.g. "control_flow.test" + # should not become "control_flow.control_flow.test" + if not name.startswith(parent_logger.name + "."): + logger = parent_logger.getChild(name) + else: + logger = logging.getLogger(name) + else: + logger = parent_logger + + add_logging_methods(logger) + return logger diff --git a/src/control_flow/utilities/types.py b/src/control_flow/utilities/types.py new file mode 100644 index 00000000..5b4c45d2 --- /dev/null +++ b/src/control_flow/utilities/types.py @@ -0,0 +1,8 @@ +from marvin.beta.assistants import Assistant, Thread +from marvin.beta.assistants.assistants import AssistantTool +from marvin.utilities.asyncio import ExposeSyncMethodsMixin +from pydantic import BaseModel + + +class ControlFlowModel(BaseModel): + model_config = dict(validate_assignment=True, extra="forbid") diff --git a/tests/flows/test_sign_guestbook.py b/tests/flows/test_sign_guestbook.py index b3d816cf..66b7b2e8 100644 --- a/tests/flows/test_sign_guestbook.py +++ b/tests/flows/test_sign_guestbook.py @@ -1,5 +1,5 @@ -from control_flow import Assistant, run_ai_task -from control_flow.flow import ai_flow +from control_flow import Assistant, run_ai +from control_flow.core.flow import ai_flow # define assistants @@ -28,7 +28,7 @@ def view_guestbook(): @ai_flow def guestbook_flow(): - run_ai_task( + run_ai( """ Add your name to the list using the `sign` tool. All assistants must sign their names for the task to be complete. You can read the sign to