Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging many changes #16

Merged
merged 3 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ if __name__ == "__main__":
demo()
```





<img width="1491" alt="image" src="https://github.com/jlowin/control_flow/assets/153965/43b7278b-7bcf-4d65-b219-c3a20f62a179">
21 changes: 21 additions & 0 deletions examples/choose_a_number.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from control_flow import Agent, Task, ai_flow

a1 = Agent(name="A1", instructions="You struggle to make decisions.")
a2 = Agent(
name="A2",
instructions="You like to make decisions.",
)


@ai_flow
def demo():
task = Task("Choose a number between 1 and 100", agents=[a1, a2], result_type=int)

while task.is_incomplete():
a1.run(task)
a2.run(task)

return task


demo()
38 changes: 38 additions & 0 deletions examples/pineapple_pizza.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from control_flow import Agent, Task, ai_flow
from control_flow.instructions import instructions

a1 = Agent(
name="Half-full",
instructions="You are an ardent fan and hype-man of whatever topic"
" the user asks you for information on."
" Purely positive, though thorough in your debating skills.",
)
a2 = Agent(
name="Half-empty",
instructions="You are a critic and staunch detractor of whatever topic"
" the user asks you for information on."
" Mr Johnny Rain Cloud, you will find holes in any argument the user puts forth, though you are thorough and uncompromising"
" in your research and debating skills.",
)


@ai_flow
def demo():
user_message = "pineapple on pizza"

with instructions("one sentence max"):
task = Task(
"All agents must give an argument based on the user message",
agents=[a1, a2],
context={"user_message": user_message},
)
task.run_until_complete()

task2 = Task(
"Post a message saying which argument about the user message is more compelling?"
)
while task2.is_incomplete():
task2.run(agents=[Agent(instructions="you always pick a side")])


demo()
1 change: 1 addition & 0 deletions src/control_flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# from .agent_old import ai_task, Agent, run_ai
from .core.flow import Flow
from .core.agent import Agent
from .core.task import Task
from .core.controller.controller import Controller
from .instructions import instructions
from .dx import ai_flow, run_ai, ai_task
Empty file.
42 changes: 42 additions & 0 deletions src/control_flow/agents/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import marvin

from control_flow.core.agent import Agent
from control_flow.instructions import get_instructions
from control_flow.utilities.context import ctx
from control_flow.utilities.threads import get_history


def choose_agent(
agents: list[Agent],
instructions: str = None,
context: dict = None,
model: str = None,
) -> Agent:
"""
Given a list of potential agents, choose the most qualified assistant to complete the tasks.
"""

instructions = get_instructions()
history = []
if (flow := ctx.get("flow")) and flow.thread.id:
history = get_history(thread_id=flow.thread.id)

info = dict(
history=history,
global_instructions=instructions,
context=context,
)

agent = marvin.classify(
info,
agents,
instructions="""
Given the conversation context, choose the AI agent most
qualified to take the next turn at completing the tasks. Take into
account the instructions, each agent's own instructions, and the
tools they have available.
""",
model_kwargs=dict(model=model),
)

return agent
22 changes: 15 additions & 7 deletions src/control_flow/core/agent.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
from enum import Enum
from typing import Callable

from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method
from marvin.utilities.tools import tool_from_function
from pydantic import Field

from control_flow.core.flow import get_flow
from control_flow.core.task import Task
from control_flow.utilities.prefect import (
wrap_prefect_tool,
)
Expand All @@ -14,12 +16,8 @@
logger = logging.getLogger(__name__)


class AgentStatus(Enum):
INCOMPLETE = "incomplete"
COMPLETE = "complete"


class Agent(Assistant, ControlFlowModel):
class Agent(Assistant, ControlFlowModel, ExposeSyncMethodsMixin):
name: str = "Agent"
user_access: bool = Field(
False,
description="If True, the agent is given tools for interacting with a human user.",
Expand All @@ -35,3 +33,13 @@ def get_tools(self) -> list[AssistantTool | Callable]:
tools.append(tool_from_function(talk_to_human))

return [wrap_prefect_tool(tool) for tool in tools]

@expose_sync_method("run")
async def run_async(self, tasks: list[Task] | Task | None = None):
from control_flow.core.controller import Controller

if isinstance(tasks, Task):
tasks = [tasks]

controller = Controller(agents=[self], tasks=tasks or [], flow=get_flow())
return await controller.run_agent_async(agent=self)
91 changes: 42 additions & 49 deletions src/control_flow/core/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
from pydantic import BaseModel, Field, field_validator, model_validator

from control_flow.core.agent import Agent
from control_flow.core.controller.delegation import (
DelegationStrategy,
RoundRobin,
)
from control_flow.core.flow import Flow
from control_flow.core.task import Task, TaskStatus
from control_flow.core.task import Task
from control_flow.instructions import get_instructions as get_context_instructions
from control_flow.utilities.prefect import (
create_json_artifact,
Expand All @@ -30,20 +26,26 @@


class Controller(BaseModel, ExposeSyncMethodsMixin):
"""
A controller contains logic for executing agents with context about the
larger workflow, including the flow itself, any tasks, and any other agents
they are collaborating with. The controller is responsible for orchestrating
agent behavior by generating instructions and tools for each agent. Note
that while the controller accepts details about (potentially multiple)
agents and tasks, it's responsiblity is to invoke one agent one time. Other
mechanisms should be used to orchestrate multiple agents invocations. This
is done by the controller to avoid tying e.g. agents to tasks or even a
specific flow.

"""

flow: Flow
agents: list[Agent]
tasks: list[Task] = Field(
description="Tasks that the controller will complete.",
default_factory=list,
)
delegation_strategy: DelegationStrategy = Field(
validate_default=True,
description="The strategy for delegating work to assistants.",
default_factory=RoundRobin,
)
# termination_strategy: TerminationStrategy
context: dict = {}
instructions: str = None
model_config: dict = dict(extra="forbid")

@field_validator("agents", mode="before")
Expand All @@ -58,48 +60,18 @@ def _add_tasks_to_flow(self) -> Self:
self.flow.add_task(task)
return self

@expose_sync_method("run")
async def run_async(self):
@expose_sync_method("run_agent")
async def run_agent_async(self, agent: Agent):
"""
Run the control flow.
"""
if agent not in self.agents:
raise ValueError("Agent not found in controller agents.")

# continue as long as there are incomplete tasks
while any([t for t in self.tasks if t.status == TaskStatus.PENDING]):
# select the next agent
if len(self.agents) > 1:
agent = self.delegation_strategy(self.agents)
else:
agent = self.agents[0]
if not agent:
return

# run the agent
task = await self._get_prefect_run_agent_task(agent)
task(agent=agent)

async def _get_prefect_run_agent_task(
self, agent: Agent, thread: Thread = None
) -> Callable:
@prefect_task(task_run_name=f'Run Agent: "{agent.name}"')
async def _run_agent(agent: Agent, thread: Thread = None):
run = await self.run_agent(agent=agent, thread=thread)

create_json_artifact(
key="messages",
data=[m.model_dump() for m in run.messages],
description="All messages sent and received during the run.",
)
create_json_artifact(
key="actions",
data=[s.model_dump() for s in run.steps],
description="All actions taken by the assistant during the run.",
)
return run

return _run_agent
task = await self._get_prefect_run_agent_task(agent)
await task(agent=agent)

async def run_agent(self, agent: Agent, thread: Thread = None) -> Run:
async def _run_agent(self, agent: Agent, thread: Thread = None) -> Run:
"""
Run a single agent.
"""
Expand Down Expand Up @@ -142,6 +114,27 @@ async def run_agent(self, agent: Agent, thread: Thread = None) -> Run:

return run

async def _get_prefect_run_agent_task(
self, agent: Agent, thread: Thread = None
) -> Callable:
@prefect_task(task_run_name=f'Run Agent: "{agent.name}"')
async def _run_agent(agent: Agent, thread: Thread = None):
run = await self._run_agent(agent=agent, thread=thread)

create_json_artifact(
key="messages",
data=[m.model_dump() for m in run.messages],
description="All messages sent and received during the run.",
)
create_json_artifact(
key="actions",
data=[s.model_dump() for s in run.steps],
description="All actions taken by the assistant during the run.",
)
return run

return _run_agent

def task_ids(self) -> dict[Task, int]:
return {task: self.flow.get_task_id(task) for task in self.tasks}

Expand Down
Loading
Loading