diff --git a/README.md b/README.md index d354d5e1..c82a5768 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ pip install . ## Example ```python -from control_flow import ai_flow, ai_task, run_agent, instructions +from control_flow import ai_flow, ai_task, run_ai_task, instructions from pydantic import BaseModel @@ -61,7 +61,7 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_agent("ask user for three interests", cast=list[str], user_access=True) + interests = run_ai_task("ask user for three interests", cast=list[str], user_access=True) # set instructions for just the next task with instructions("no more than 8 lines"): diff --git a/examples/documentation.py b/examples/documentation.py index 04db8146..1a6b5bba 100644 --- a/examples/documentation.py +++ b/examples/documentation.py @@ -4,7 +4,7 @@ import control_flow from control_flow import ai_flow, ai_task from marvin.beta.assistants import Assistant, Thread -from marvin.tools.filesystem import ls, read, read_lines, write +from marvin.tools.filesystem import read, write ROOT = Path(control_flow.__file__).parents[2] @@ -23,7 +23,7 @@ def glob(pattern: str) -> list[str]: You are an expert technical writer who writes wonderful documentation for open-source tools and believes that documentation is a product unto itself. """, - tools=[read, read_lines, ls, write, glob], + tools=[read, write, glob], ) @@ -35,7 +35,7 @@ def examine_source_code(source_dir: Path, extensions: list[str]): """ -@ai_task +@ai_task(model="gpt-3.5-turbo") def read_docs(docs_dir: Path): """ Read all documentation in the docs dir and subdirectories, if any. @@ -52,7 +52,7 @@ def write_docs(docs_dir: Path, instructions: str = None): @ai_flow(assistant=assistant) def docs_flow(instructions: str): examine_source_code(ROOT / "src", extensions=[".py"]) - read_docs(ROOT / "docs") + # read_docs(ROOT / "docs") write_docs(ROOT / "docs", instructions=instructions) diff --git a/examples/readme_example.py b/examples/readme_example.py index d88de7d3..0ac0298b 100644 --- a/examples/readme_example.py +++ b/examples/readme_example.py @@ -1,4 +1,4 @@ -from control_flow import ai_flow, ai_task, instructions, run_agent +from control_flow import ai_flow, ai_task, instructions, run_ai_task from pydantic import BaseModel @@ -26,7 +26,7 @@ def demo(): name = get_user_name() # define an AI task inline - interests = run_agent( + interests = run_ai_task( "ask user for three interests", cast=list[str], user_access=True, diff --git a/src/control_flow/__init__.py b/src/control_flow/__init__.py index 97f9c013..371a8e4a 100644 --- a/src/control_flow/__init__.py +++ b/src/control_flow/__init__.py @@ -1,5 +1,7 @@ from .settings import settings -from .agent import ai_task, Agent, run_agent +from .agent import ai_task, Agent, run_ai_task from .flow import ai_flow from .instructions import instructions + +from marvin.beta.assistants import Assistant diff --git a/src/control_flow/agent.py b/src/control_flow/agent.py index f8d5c1a8..39ee2088 100644 --- a/src/control_flow/agent.py +++ b/src/control_flow/agent.py @@ -2,19 +2,20 @@ import inspect import json import logging +from datetime import datetime from typing import Callable, Generic, TypeVar, Union +from zoneinfo import ZoneInfo import marvin import marvin.utilities.tools import prefect -from marvin.beta.assistants import Thread from marvin.beta.assistants.assistants import Assistant from marvin.beta.assistants.handlers import PrintHandler from marvin.beta.assistants.runs import Run from marvin.tools.assistants import AssistantTool, EndRun from marvin.types import FunctionTool from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method -from marvin.utilities.jinja import Environment +from marvin.utilities.jinja import BaseEnvironment from openai.types.beta.threads.runs import ToolCall from prefect import get_client as get_prefect_client from prefect import task as prefect_task @@ -23,6 +24,7 @@ from control_flow import settings from control_flow.context import ctx +from control_flow.delegation import DelegationStrategy, RoundRobin from control_flow.utilities.prefect import ( create_json_artifact, create_markdown_artifact, @@ -36,7 +38,14 @@ logger = logging.getLogger(__name__) NOT_PROVIDED = object() -TEMP_THREADS = {} + +jinja_environment = BaseEnvironment( + globals={ + "now": lambda: datetime.now(ZoneInfo("UTC")), + "inspect": inspect, + "id": id, + } +) TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc( @@ -61,10 +70,11 @@ INSTRUCTIONS = """ -You are an AI assistant. Your job is to complete the tasks assigned to you. You -were created by a software application, and any messages you receive are from -that software application, not a user. You may use any tools at your disposal to -complete the task, including talking to a human user. +You are an AI assistant. Your name is `{{ assistant.name}}`. Your job is to +complete the tasks assigned to you. You were created by a software application, +and any messages you receive are from that software application, not a user. You +may use any tools at your disposal to complete the task, including talking to a +human user. ## Instructions @@ -85,6 +95,21 @@ {% endfor %} +## Assistants + +The following assistants are collaborating to complete the tasks. + +{% for a in agent.assistants %} +- {{ a.name }} {%- if id(a) == id(assistant) %} (this is you){%endif%}: {{ a.description | default("(no description provided)")}} +{% endfor %} + +{% if agent.assistants|length > 1 %} +Use the `end_run` tool to end your turn and allow another assistant to take +over, without marking a task as complete or failed. +{% endif %} + + + ## Tasks {% if agent.tasks %} @@ -130,7 +155,10 @@ {% if agent.system_access -%} The software that created you is an AI capable of processing natural language, -so you can freely respond by posting messages to the thread. +so you can freely respond by posting messages to the thread. There may be more +than one AI posting responses, so preface your messages with your own name to +avoid confusion. For example, if your name is "Marvin", start all of your +messages with "Marvin:". This is not required for tool calls. {% else %} The software that created you is a Python script that can only process structured responses produced by your tools. DO NOT POST ANY MESSAGES OR RESPONSES TO THE @@ -161,76 +189,6 @@ """ -class AgentHandler(PrintHandler): - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.tool_calls = {} - - async def on_tool_call_created(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is created""" - - if tool_call.type == "function": - task_run_name = "Prepare arguments for tool call" - else: - task_run_name = f"Tool call: {tool_call.type}" - - client = get_prefect_client() - engine_context = FlowRunContext.get() - if not engine_context: - return - - task_run = await client.create_task_run( - task=prefect.Task(fn=lambda: None), - name=task_run_name, - extra_tags=["tool-call"], - flow_run_id=engine_context.flow_run.id, - dynamic_key=tool_call.id, - state=prefect.states.Running(), - ) - - self.tool_calls[tool_call.id] = task_run - - async def on_tool_call_done(self, tool_call: ToolCall) -> None: - """Callback that is fired when a tool call is done""" - - client = get_prefect_client() - task_run = self.tool_calls.get(tool_call.id) - if not task_run: - return - await client.set_task_run_state( - task_run_id=task_run.id, state=prefect.states.Completed(), force=True - ) - - # code interpreter is run as a single call, so we can publish a result artifact - if tool_call.type == "code_interpreter": - # images = [] - # for output in tool_call.code_interpreter.outputs: - # if output.type == "image": - # image_path = download_temp_file(output.image.file_id) - # images.append(image_path) - - create_python_artifact( - key="code", - code=tool_call.code_interpreter.input, - description="Code executed in the code interpreter", - task_run_id=task_run.id, - ) - create_json_artifact( - key="output", - data=tool_call.code_interpreter.outputs, - description="Output from the code interpreter", - task_run_id=task_run.id, - ) - - elif tool_call.type == "function": - create_json_artifact( - key="arguments", - data=json.dumps(json.loads(tool_call.function.arguments), indent=2), - description=f"Arguments for the `{tool_call.function.name}` tool", - task_run_id=task_run.id, - ) - - def talk_to_human(message: str, get_response: bool = True) -> str: """ Send a message to the human user and optionally wait for a response. @@ -245,14 +203,19 @@ def talk_to_human(message: str, get_response: bool = True) -> str: def end_run(): - """Use this tool to end the run.""" + """Use this tool to end the run without marking a task as complete or failed.""" return EndRun() class Agent(BaseModel, Generic[T], ExposeSyncMethodsMixin): tasks: list[AITask] = [] flow: AIFlow = Field(None, validate_default=True) - assistant: Assistant = Field(None, validate_default=True) + assistants: list = Field(None, validate_default=True) + delegation_strategy: DelegationStrategy = Field( + validate_default=True, + description="The strategy for delegating work to assistants.", + default_factory=RoundRobin, + ) tools: list[Union[AssistantTool, Assistant, Callable]] = [] context: dict = Field(None, validate_default=True) user_access: bool = Field( @@ -284,14 +247,14 @@ def _default_context(cls, v): v = {} return v - @field_validator("assistant", mode="before") - def _default_assistant(cls, v): + @field_validator("assistants", mode="before") + def _default_assistants(cls, v): if v is None: flow = ctx.get("flow") if flow: - v = flow.assistant + v = flow.assistants if v is None: - v = Assistant() + v = [Assistant()] return v @field_validator("user_access", "system_access", mode="before") @@ -303,22 +266,21 @@ def _default_access(cls, v): def numbered_tasks(self) -> list[tuple[int, AITask]]: return [(i + 1, task) for i, task in enumerate(self.tasks)] - def _get_instructions(self, context: dict = None): - instructions = Environment.render( + def _get_instructions(self, assistant: Assistant, context: dict = None): + instructions = jinja_environment.render( INSTRUCTIONS, agent=self, flow=self.flow, - assistant=self.assistant, + assistant=assistant, instructions=ctx.get("instructions", []), context={**self.context, **(context or {})}, ) - return instructions - def _get_tools(self) -> list[AssistantTool]: - tools = self.flow.tools + self.tools + self.assistant.tools + def _get_tools(self, assistant: Assistant) -> list[AssistantTool]: + tools = self.flow.tools + self.tools + assistant.tools - if not self.tasks: + if not self.tasks or len(self.assistants) > 1: tools.append(end_run) # if there is only one task, and the agent can't send a response to the @@ -341,9 +303,7 @@ def _get_tools(self) -> list[AssistantTool]: final_tools = [] for tool in tools: - if isinstance(tool, marvin.beta.assistants.Assistant): - tool = self.model_copy(update={"assistant": tool}).as_tool() - elif not isinstance(tool, AssistantTool): + if not isinstance(tool, AssistantTool): tool = marvin.utilities.tools.tool_from_function(tool) if isinstance(tool, FunctionTool): @@ -383,28 +343,37 @@ async def modified_fn( final_tools.append(tool) return final_tools - def _get_openai_run_task(self): + def _get_run_assistant_task(self): """ Helper function for building the task that will execute the OpenAI assistant run. This needs to be regenerated each time in case the instructions change. """ - @prefect_task(task_run_name=f"Run OpenAI assistant ({self.assistant.name})") - async def execute_openai_run( - context: dict = None, run_kwargs: dict = None + def _name_from_assistant(): + """Helper function for naming task runs""" + from prefect.runtime import task_run + + assistant = task_run.parameters.get("assistant") + return f"Run OpenAI assistant ({assistant.name})" + + @prefect_task(task_run_name=_name_from_assistant) + async def run_openai_assistant( + assistant: Assistant, context: dict = None, run_kwargs: dict = None ) -> Run: run_kwargs = run_kwargs or {} model = run_kwargs.pop( "model", - self.assistant.model or self.flow.model or settings.assistant_model, + assistant.model or self.flow.model or settings.assistant_model, ) thread = run_kwargs.pop("thread", self.flow.thread) run = Run( - assistant=self.assistant, + assistant=assistant, thread=thread, - instructions=self._get_instructions(context=context), - tools=self._get_tools(), + instructions=self._get_instructions( + assistant=assistant, context=context + ), + tools=self._get_tools(assistant=assistant), event_handler_class=AgentHandler, model=model, **run_kwargs, @@ -424,13 +393,18 @@ async def execute_openai_run( ) return run - return execute_openai_run + return run_openai_assistant @expose_sync_method("run") async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: - openai_run = self._get_openai_run_task() + assistants_generator = self.delegation_strategy.run(self.assistants) + openai_run = self._get_run_assistant_task() - openai_run(context=context, run_kwargs=run_kwargs) + openai_run( + assistant=next(assistants_generator), + context=context, + run_kwargs=run_kwargs, + ) # if this AI can't post messages to the system, then continue to invoke # it until all tasks are finished @@ -440,49 +414,94 @@ async def run_async(self, context: dict = None, **run_kwargs) -> list[AITask]: any(t.status == TaskStatus.PENDING for t in self.tasks) and counter < settings.max_agent_iterations ): - openai_run(context=context, run_kwargs=run_kwargs) + openai_run( + assistant=next(assistants_generator), + context=context, + run_kwargs=run_kwargs, + ) counter += 1 result = [t.result for t in self.tasks if t.status == TaskStatus.COMPLETED] return result - def as_tool(self): - thread = TEMP_THREADS.setdefault(self.assistant.model_dump_json(), Thread()) - - def _run(message: str, context: dict = None) -> list[str]: - task = self._get_openai_run_task() - run: Run = task(context=context, run_kwargs=dict(thread=thread)) - return [m.model_dump_json() for m in run.messages] - - return marvin.utilities.tools.tool_from_function( - _run, - name=f"call_ai_{self.assistant.name}", - description=inspect.cleandoc(""" - Use this tool to talk to a sub-AI that can operate independently of - you. The sub-AI may have a different skillset or be able to access - different tools than you. The sub-AI will run one iteration and - respond to you. You may continue to invoke it multiple times in sequence, as - needed. - - Note: you can only talk to one sub-AI at a time. Do not call in parallel or you will get an error about thread conflicts. - - ## Sub-AI Details - - - Name: {name} - - Instructions: {instructions} - """).format( - name=self.assistant.name, instructions=self.assistant.instructions - ), + +class AgentHandler(PrintHandler): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.tool_calls = {} + + async def on_tool_call_created(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is created""" + + if tool_call.type == "function": + task_run_name = "Prepare arguments for tool call" + else: + task_run_name = f"Tool call: {tool_call.type}" + + client = get_prefect_client() + engine_context = FlowRunContext.get() + if not engine_context: + return + + task_run = await client.create_task_run( + task=prefect.Task(fn=lambda: None), + name=task_run_name, + extra_tags=["tool-call"], + flow_run_id=engine_context.flow_run.id, + dynamic_key=tool_call.id, + state=prefect.states.Running(), ) + self.tool_calls[tool_call.id] = task_run + + async def on_tool_call_done(self, tool_call: ToolCall) -> None: + """Callback that is fired when a tool call is done""" + + client = get_prefect_client() + task_run = self.tool_calls.get(tool_call.id) + if not task_run: + return + await client.set_task_run_state( + task_run_id=task_run.id, state=prefect.states.Completed(), force=True + ) + + # code interpreter is run as a single call, so we can publish a result artifact + if tool_call.type == "code_interpreter": + # images = [] + # for output in tool_call.code_interpreter.outputs: + # if output.type == "image": + # image_path = download_temp_file(output.image.file_id) + # images.append(image_path) + + create_python_artifact( + key="code", + code=tool_call.code_interpreter.input, + description="Code executed in the code interpreter", + task_run_id=task_run.id, + ) + create_json_artifact( + key="output", + data=tool_call.code_interpreter.outputs, + description="Output from the code interpreter", + task_run_id=task_run.id, + ) + + elif tool_call.type == "function": + create_json_artifact( + key="arguments", + data=json.dumps(json.loads(tool_call.function.arguments), indent=2), + description=f"Arguments for the `{tool_call.function.name}` tool", + task_run_id=task_run.id, + ) + def ai_task( fn=None, *, objective: str = None, user_access: bool = None, **agent_kwargs: dict ): """ - Decorator that uses a function to create an AI task. When the function is - called, an agent is created to complete the task and return the result. + Use a Python function to create an AI task. When the function is called, an + agent is created to complete the task and return the result. """ if fn is None: @@ -504,7 +523,7 @@ def wrapper(*args, **kwargs): bound = sig.bind(*args, **kwargs) bound.apply_defaults() - return run_agent.with_options(name=f"Task: {fn.__name__}")( + return run_ai_task.with_options(name=f"Task: {fn.__name__}")( task=objective, cast=fn.__annotations__.get("return"), context=bound.arguments, @@ -529,7 +548,7 @@ def _name_from_objective(): @prefect_task(task_run_name=_name_from_objective) -def run_agent( +def run_ai_task( task: str = None, cast: T = NOT_PROVIDED, context: dict = None, @@ -538,8 +557,14 @@ def run_agent( **agent_kwargs: dict, ) -> T: """ - Run an agent to complete a task with the given objective and context. The - response will be cast to the given result type. + Create and run an agent to complete a task with the given objective and + context. This function is similar to an inline version of the @ai_task + decorator. + + This inline version is useful when you want to create and run an ad-hoc AI + task, without defining a function or using decorator syntax. It provides + more flexibility in terms of dynamically setting the task parameters. + Additional detail can be provided as `context`. """ if cast is NOT_PROVIDED: diff --git a/src/control_flow/delegation.py b/src/control_flow/delegation.py new file mode 100644 index 00000000..86c2a7bb --- /dev/null +++ b/src/control_flow/delegation.py @@ -0,0 +1,86 @@ +import itertools +from typing import Any, Generator + +import marvin +from marvin.beta.assistants import Assistant +from pydantic import BaseModel + +from control_flow.flow import get_flow_messages +from control_flow.instructions import get_instructions + + +class DelegationStrategy(BaseModel): + """ + A DelegationStrategy is a strategy for delegating tasks to AI assistants. + """ + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, delegate the task to the most qualified assistant. + """ + + raise NotImplementedError() + + +class Single(DelegationStrategy): + """ + A Single delegation strategy delegates tasks to a single AI assistant. + """ + + assistant: Assistant + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, choose the first assistant in the list. + """ + + if self.assistant not in assistants: + raise ValueError("Assistant not in list of assistants") + + while True: + yield self.assistant + + +class RoundRobin(DelegationStrategy): + """ + A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion. + """ + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, choose the next assistant in the list. + """ + + yield from itertools.cycle(assistants) + + +class Moderator(DelegationStrategy): + """ + A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier + """ + + model: str = None + + def run(self, assistants: list[Assistant]) -> Generator[Any, Any, Assistant]: + """ + Given a list of potential assistants, delegate the task to the most qualified assistant. + """ + + while True: + instructions = get_instructions() + history = get_flow_messages() + + context = dict(messages=history, global_instructions=instructions) + assistant = marvin.classify( + context, + assistants, + instructions=""" + Given the conversation context, choose the AI assistant most + qualified to take the next turn at completing the tasks. Take into + account the instructions, each assistant's own instructions, and the + tools they have available. + """, + model_kwargs=dict(model=self.model), + ) + + yield assistant diff --git a/src/control_flow/flow.py b/src/control_flow/flow.py index 24dfcb81..e5184d82 100644 --- a/src/control_flow/flow.py +++ b/src/control_flow/flow.py @@ -17,19 +17,13 @@ class AIFlow(BaseModel): thread: Thread = Field(None, validate_default=True) - assistant: Optional[Assistant] = Field(None, validate_default=True) + assistants: Optional[list[Assistant]] = Field(None, validate_default=True) tools: list[Union[AssistantTool, Callable]] = Field(None, validate_default=True) instructions: Optional[str] = None model: Optional[str] = None model_config: dict = dict(validate_assignment=True, extra="forbid") - @field_validator("assistant", mode="before") - def _load_assistant_from_ctx(cls, v): - if v is None: - v = ctx.get("assistant", None) - return v - @field_validator("thread", mode="before") def _load_thread_from_ctx(cls, v): if v is None: @@ -54,7 +48,7 @@ def add_message(self, message: str): def ai_flow( fn=None, *, - assistant: Assistant = None, + assistants: list[Assistant] = None, thread: Thread = None, tools: list[Union[AssistantTool, Callable]] = None, instructions: str = None, @@ -67,7 +61,7 @@ def ai_flow( if fn is None: return functools.partial( ai_flow, - assistant=assistant, + assistants=assistants, thread=thread, tools=tools, instructions=instructions, @@ -77,7 +71,7 @@ def ai_flow( @functools.wraps(fn) def wrapper( *args, - _assistant: Assistant = None, + _assistants: list[Assistant] = None, _thread: Thread = None, _tools: list[Union[AssistantTool, Callable]] = None, _instructions: str = None, @@ -85,19 +79,14 @@ def wrapper( **kwargs, ): p_fn = prefect_flow(fn) - flow_assistant = _assistant or assistant - flow_thread = ( - _thread - or thread - or (flow_assistant.default_thread if flow_assistant else None) - or Thread() - ) + flow_assistants = _assistants or assistants + flow_thread = _thread or thread or Thread() flow_instructions = _instructions or instructions flow_tools = _tools or tools flow_model = _model or model flow_obj = AIFlow( thread=flow_thread, - assistant=flow_assistant, + assistants=flow_assistants, tools=flow_tools, instructions=flow_instructions, model=flow_model, @@ -113,13 +102,23 @@ def wrapper( return wrapper -def get_messages(limit: int = None) -> list[Message]: +def get_flow() -> AIFlow: """ - Loads messages from the flow's thread. + Loads the flow from the context. Will error if no flow is found in the context. """ flow: Optional[AIFlow] = ctx.get("flow") if not flow: raise ValueError("No flow found in context") + return flow + + +def get_flow_messages(limit: int = None) -> list[Message]: + """ + Loads messages from the flow's thread. + + Will error if no flow is found in the context. + """ + flow = get_flow() return flow.thread.get_messages(limit=limit) diff --git a/src/control_flow/settings.py b/src/control_flow/settings.py index 9e46653b..8b8af737 100644 --- a/src/control_flow/settings.py +++ b/src/control_flow/settings.py @@ -1,5 +1,6 @@ import os +from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict @@ -17,10 +18,29 @@ class ControlFlowSettings(BaseSettings): ) +class PrefectSettings(ControlFlowSettings): + """ + All settings here are used as defaults for Prefect, unless overridden by env vars. + """ + + PREFECT_LOGGING_LEVEL: str = "WARNING" + + def apply(self): + import os + + for k, v in self.model_dump().items(): + if k not in os.environ: + os.environ[k] = v + + class Settings(ControlFlowSettings): assistant_model: str = "gpt-4-1106-preview" max_agent_iterations: int = 10 - use_prefect: bool = True + prefect: PrefectSettings = Field(default_factory=PrefectSettings) + + def __init__(self, **data): + super().__init__(**data) + self.prefect.apply() settings = Settings() diff --git a/tests/flows/test_sign_guestbook.py b/tests/flows/test_sign_guestbook.py new file mode 100644 index 00000000..b3d816cf --- /dev/null +++ b/tests/flows/test_sign_guestbook.py @@ -0,0 +1,47 @@ +from control_flow import Assistant, run_ai_task +from control_flow.flow import ai_flow + +# define assistants + +a = Assistant(name="a") +b = Assistant(name="b") +c = Assistant(name="c") + + +# define tools + +GUESTBOOK = [] + + +def sign(name): + """sign your name in the guestbook""" + GUESTBOOK.append(name) + + +def view_guestbook(): + """view the guestbook""" + return GUESTBOOK + + +# define flow + + +@ai_flow +def guestbook_flow(): + run_ai_task( + """ + Add your name to the list using the `sign` tool. All assistants must + sign their names for the task to be complete. You can read the sign to + see if that has happened yet. You can not sign for another assistant. + """, + assistants=[a, b, c], + tools=[sign, view_guestbook], + ) + + +# run test + + +def test(): + guestbook_flow() + assert GUESTBOOK == ["a", "b", "c"]