From 237b31307dc9fd4c7a0e93340001d2f95ee02699 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Thu, 9 May 2024 17:58:52 -0400
Subject: [PATCH 1/3] Improve relationship between tasks and agents
---
examples/choose_a_number.py | 7 +-
examples/readme_example.py | 19 +-
src/control_flow/core/agent.py | 4 -
src/control_flow/core/controller/__init__.py | 1 -
.../core/controller/collaboration.py | 54 +++++
.../core/controller/controller.py | 52 +++--
.../core/controller/delegation.py | 91 --------
.../core/controller/instruction_template.py | 182 ++++++---------
src/control_flow/core/flow.py | 29 ---
src/control_flow/core/task.py | 217 +++++++++++++++---
tests/core/agents.py | 3 +-
11 files changed, 362 insertions(+), 297 deletions(-)
create mode 100644 src/control_flow/core/controller/collaboration.py
delete mode 100644 src/control_flow/core/controller/delegation.py
diff --git a/examples/choose_a_number.py b/examples/choose_a_number.py
index 36609056..db22ecab 100644
--- a/examples/choose_a_number.py
+++ b/examples/choose_a_number.py
@@ -9,11 +9,8 @@
@ai_flow
def demo():
- task = Task("Choose a number between 1 and 100", agents=[a1, a2], result_type=int)
-
- while task.is_incomplete():
- a1.run(task)
- a2.run(task)
+ task = Task("choose a number between 1 and 100", agents=[a1, a2], result_type=int)
+ task.run_until_complete()
return task
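Since the run_until_complete() method added to Task later in this patch returns the validated result on success, the example could equally capture the number directly; a minimal sketch under that assumption:

    number = task.run_until_complete()  # the int result, once the task succeeds
    print(number)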
diff --git a/examples/readme_example.py b/examples/readme_example.py
index ac073e15..f1cd8d0f 100644
--- a/examples/readme_example.py
+++ b/examples/readme_example.py
@@ -1,4 +1,4 @@
-from control_flow import ai_flow, ai_task, instructions, run_ai
+from control_flow import Agent, Task, ai_flow, ai_task, instructions
from pydantic import BaseModel
@@ -12,7 +12,7 @@ def get_user_name() -> Name:
pass
-@ai_task
+@ai_task(agents=[Agent(name="poetry-bot", instructions="loves limericks")])
def write_poem_about_user(name: Name, interests: list[str]) -> str:
"""write a poem based on the provided `name` and `interests`"""
pass
@@ -22,17 +22,18 @@ def write_poem_about_user(name: Name, interests: list[str]) -> str:
def demo():
# set instructions that will be used for multiple tasks
with instructions("talk like a pirate"):
- # define an AI task as a function and have it execute it
+ # define an AI task as a function
name = get_user_name()
- # define an AI task inline
- interests = run_ai(
- "ask user for three interests", cast=list[str], user_access=True
+ # define an AI task imperatively
+ interests = Task(
+ "ask user for three interests", result_type=list[str], user_access=True
)
+ interests.run_until_complete()
- # set instructions for just the next task
- with instructions("no more than 8 lines"):
- poem = write_poem_about_user(name, interests)
+ # set instructions for just the next task
+ with instructions("no more than 8 lines"):
+ poem = write_poem_about_user(name, interests.result)
return poem
diff --git a/src/control_flow/core/agent.py b/src/control_flow/core/agent.py
index 7f8c003b..10d81013 100644
--- a/src/control_flow/core/agent.py
+++ b/src/control_flow/core/agent.py
@@ -22,10 +22,6 @@ class Agent(Assistant, ControlFlowModel, ExposeSyncMethodsMixin):
False,
description="If True, the agent is given tools for interacting with a human user.",
)
- controller_access: bool = Field(
- False,
- description="If True, the agent will communicate with the controller via messages.",
- )
def get_tools(self) -> list[AssistantTool | Callable]:
tools = super().get_tools()
diff --git a/src/control_flow/core/controller/__init__.py b/src/control_flow/core/controller/__init__.py
index 5319894d..f41157fe 100644
--- a/src/control_flow/core/controller/__init__.py
+++ b/src/control_flow/core/controller/__init__.py
@@ -1,2 +1 @@
from .controller import Controller
-from .delegation import RoundRobin
diff --git a/src/control_flow/core/controller/collaboration.py b/src/control_flow/core/controller/collaboration.py
new file mode 100644
index 00000000..bc2c7c16
--- /dev/null
+++ b/src/control_flow/core/controller/collaboration.py
@@ -0,0 +1,54 @@
+import itertools
+from typing import TYPE_CHECKING, Any, Generator
+
+from control_flow.core.agent import Agent
+
+if TYPE_CHECKING:
+ from control_flow.core.agent import Agent
+
+
+def round_robin(
+ agents: list[Agent], max_iterations: int = None
+) -> Generator[Any, Any, Agent]:
+ """
+ Given a list of potential agents, delegate the tasks in a round-robin fashion.
+ """
+ cycle = itertools.cycle(agents)
+ iteration = 0
+ while True:
+ yield next(cycle)
+ iteration += 1
+ if max_iterations and iteration >= max_iterations:
+ break
+
+
+# class Moderator(DelegationStrategy):
+# """
+# A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier
+# """
+
+# model: str = None
+
+# def _next_agent(
+# self, agents: list["Agent"], tasks: list[Task], history: list[Message]
+# ) -> "Agent":
+# """
+# Given a list of potential agents, choose the most qualified assistant to complete the tasks.
+# """
+
+# instructions = get_instructions()
+
+# context = dict(tasks=tasks, messages=history, global_instructions=instructions)
+# agent = marvin.classify(
+# context,
+# [a for a in agents if a.status == AgentStatus.INCOMPLETE],
+# instructions="""
+# Given the conversation context, choose the AI agent most
+# qualified to take the next turn at completing the tasks. Take into
+# account the instructions, each agent's own instructions, and the
+# tools they have available.
+# """,
+# model_kwargs=dict(model=self.model),
+# )
+
+# return agent
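A minimal sketch of how this round_robin generator might be consumed, for example as the collab_fn argument of Task.run_until_complete introduced later in this patch (agent names and the objective are made up):

    from functools import partial

    from control_flow import Agent, Task, ai_flow
    from control_flow.core.controller.collaboration import round_robin

    writer = Agent(name="writer")
    editor = Agent(name="editor")

    @ai_flow
    def demo():
        task = Task(
            "draft and edit a short paragraph",
            agents=[writer, editor],
            result_type=str,
        )
        # each agent takes a turn; max_iterations bounds the loop if the task never completes
        return task.run_until_complete(collab_fn=partial(round_robin, max_iterations=10))

    demo()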
diff --git a/src/control_flow/core/controller/controller.py b/src/control_flow/core/controller/controller.py
index 0f8253d9..06941394 100644
--- a/src/control_flow/core/controller/controller.py
+++ b/src/control_flow/core/controller/controller.py
@@ -1,6 +1,6 @@
import json
import logging
-from typing import Callable, Self
+from typing import Callable
import prefect
from marvin.beta.assistants import PrintHandler, Run
@@ -9,7 +9,7 @@
from prefect import get_client as get_prefect_client
from prefect import task as prefect_task
from prefect.context import FlowRunContext
-from pydantic import BaseModel, Field, field_validator, model_validator
+from pydantic import BaseModel, Field, field_validator
from control_flow.core.agent import Agent
from control_flow.core.flow import Flow
@@ -42,8 +42,15 @@ class Controller(BaseModel, ExposeSyncMethodsMixin):
flow: Flow
agents: list[Agent]
tasks: list[Task] = Field(
+ None,
description="Tasks that the controller will complete.",
- default_factory=list,
+ validate_default=True,
+ )
+ task_assignments: dict[Task, Agent] = Field(
+ default_factory=dict,
+ description="Tasks are typically assigned to agents. To "
+ "temporarily assign agent to a task without changing "
+ r"the task definition, use this field as {task: [agent]}",
)
context: dict = {}
model_config: dict = dict(extra="forbid")
@@ -54,11 +61,32 @@ def _validate_agents(cls, v):
raise ValueError("At least one agent is required.")
return v
- @model_validator(mode="after")
- def _add_tasks_to_flow(self) -> Self:
+ @field_validator("tasks", mode="before")
+ def _validate_tasks(cls, v):
+ if not v:
+ raise ValueError("At least one task is required.")
+ return v
+
+ @field_validator("tasks", mode="before")
+ def _load_tasks_from_ctx(cls, v):
+ if v is None:
+ v = cls.context.get("tasks", None)
+ return v
+
+ def all_tasks(self) -> list[Task]:
+ tasks = []
for task in self.tasks:
- self.flow.add_task(task)
- return self
+ tasks.extend(task.children(include_self=True))
+
+ # add temporary assignments
+ assigned_tasks = []
+ for task in set(tasks):
+ if task in self.task_assignments:
+ task = task.model_copy(
+ update={"agents": task.agents + self.task_assignments.get(task, [])}
+ )
+ assigned_tasks.append(task)
+ return assigned_tasks
@expose_sync_method("run_agent")
async def run_agent_async(self, agent: Agent):
@@ -68,8 +96,8 @@ async def run_agent_async(self, agent: Agent):
if agent not in self.agents:
raise ValueError("Agent not found in controller agents.")
- task = await self._get_prefect_run_agent_task(agent)
- await task(agent=agent)
+ prefect_task = await self._get_prefect_run_agent_task(agent)
+ await prefect_task(agent=agent)
async def _run_agent(self, agent: Agent, thread: Thread = None) -> Run:
"""
@@ -89,8 +117,7 @@ async def _run_agent(self, agent: Agent, thread: Thread = None) -> Run:
tools = self.flow.tools + agent.get_tools()
for task in self.tasks:
- task_id = self.flow.get_task_id(task)
- tools = tools + task.get_tools(task_id=task_id)
+ tools = tools + task.get_tools()
# filter tools because duplicate names are not allowed
final_tools = []
@@ -135,9 +162,6 @@ async def _run_agent(agent: Agent, thread: Thread = None):
return _run_agent
- def task_ids(self) -> dict[Task, int]:
- return {task: self.flow.get_task_id(task) for task in self.tasks}
-
class AgentHandler(PrintHandler):
def __init__(self, **kwargs):
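A rough sketch of how the reworked Controller might now be constructed and invoked (illustrative names only; the new task_assignments field can additionally layer a temporary agent onto a task without editing the task itself):

    from control_flow import Agent, Controller, Task, ai_flow
    from control_flow.core.flow import get_flow

    reviewer = Agent(name="reviewer")
    task = Task("summarize the discussion", result_type=str)

    @ai_flow
    def demo():
        # tasks are now required (validated) rather than defaulting to an empty list
        controller = Controller(flow=get_flow(), agents=[reviewer], tasks=[task])
        controller.run_agent(reviewer)
        return task.result

    demo()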
diff --git a/src/control_flow/core/controller/delegation.py b/src/control_flow/core/controller/delegation.py
deleted file mode 100644
index 82875d20..00000000
--- a/src/control_flow/core/controller/delegation.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import itertools
-from typing import TYPE_CHECKING, Any, Generator, Iterator
-
-from pydantic import BaseModel, PrivateAttr
-
-from control_flow.core.agent import Agent
-
-if TYPE_CHECKING:
- from control_flow.core.agent import Agent
-
-
-class DelegationStrategy(BaseModel):
- """
- A DelegationStrategy is a strategy for delegating tasks to AI assistants.
- """
-
- def __call__(self, agents: list["Agent"]) -> Agent:
- """
- Run the delegation strategy with a list of agents.
- """
- return self._next_agent(agents)
-
- def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent":
- """
- Select an agent from a list of agents.
- """
- raise NotImplementedError()
-
-
-class Single(DelegationStrategy):
- """
- A Single delegation strategy delegates tasks to a single agent. This is useful for debugging.
- """
-
- agent: Agent
-
- def _next_agent(self, agents: list[Agent], **kwargs) -> Generator[Any, Any, Agent]:
- """
- Given a list of potential agents, choose the single agent.
- """
- if self.agent in agents:
- return self.agent
-
-
-class RoundRobin(DelegationStrategy):
- """
- A RoundRobin delegation strategy delegates tasks to AI assistants in a round-robin fashion.
- """
-
- _cycle: Iterator[Agent] = PrivateAttr(None)
-
- def _next_agent(self, agents: list["Agent"], **kwargs) -> "Agent":
- """
- Given a list of potential agents, delegate the tasks in a round-robin fashion.
- """
- # the first time this is called, create a cycle iterator
- if self._cycle is None:
- self._cycle = itertools.cycle(agents)
- return next(self._cycle)
-
-
-# class Moderator(DelegationStrategy):
-# """
-# A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier
-# """
-
-# model: str = None
-
-# def _next_agent(
-# self, agents: list["Agent"], tasks: list[Task], history: list[Message]
-# ) -> "Agent":
-# """
-# Given a list of potential agents, choose the most qualified assistant to complete the tasks.
-# """
-
-# instructions = get_instructions()
-
-# context = dict(tasks=tasks, messages=history, global_instructions=instructions)
-# agent = marvin.classify(
-# context,
-# [a for a in agents if a.status == AgentStatus.INCOMPLETE],
-# instructions="""
-# Given the conversation context, choose the AI agent most
-# qualified to take the next turn at completing the tasks. Take into
-# account the instructions, each agent's own instructions, and the
-# tools they have available.
-# """,
-# model_kwargs=dict(model=self.model),
-# )
-
-# return agent
diff --git a/src/control_flow/core/controller/instruction_template.py b/src/control_flow/core/controller/instruction_template.py
index 5f63d07a..a806b75f 100644
--- a/src/control_flow/core/controller/instruction_template.py
+++ b/src/control_flow/core/controller/instruction_template.py
@@ -24,30 +24,71 @@ def render(self) -> str:
class AgentTemplate(Template):
template: str = """
+ # Agent
+
You are an AI agent. Your name is "{{ agent.name }}".
- {% if agent.description %}
- Your description: "{{ agent.description }}"
- {% endif -%}
- {% if agent.instructions %}
- Your instructions: "{{ agent.instructions }}"
- {% endif -%}
+ This is your description, which all other agents can see: "{{ agent.description or 'An AI agent assigned to complete tasks.'}}"
- You have been created by a program to complete certain tasks. Each task has
- an objective and criteria for success. Your job is to perform any required
- actions and then mark each task as successful. If a task also requires a
- result, you must provide it; this is how the program receives data from you
- as it can not read your messages.
+ ## Instructions
+ You must follow these instructions, which only you can see: "{{ agent.instructions or 'No additional instructions provided.'}}"
+
+ {% if additional_instructions %}
+ In addition, you must follow these instructions for this part of the workflow:
+ {% for instruction in additional_instructions %}
+ - {{ instruction }}
+ {% endfor %}
+ {% endif %}
- Some tasks may require collaboration before they are complete; others may
- take multiple iterations. You are fully capable of completing any task and
- have all the information and context you need. Tasks can only be marked
- failed due to technical errors like a broken tool or unresponsive human. You
- must make a subjective decision if a task requires it. Do not work on or
- even respond to tasks that are already complete.
-
"""
agent: Agent
+ additional_instructions: list[str]
+
+
+class TasksTemplate(Template):
+ template: str = """
+ ## Tasks
+
+ You have been assigned to complete certain tasks. Each task has an
+ objective and criteria for success. Your job is to perform any required
+ actions and then mark each assigned task as successful. If a task also
+ requires a result, you must provide it; this is how you communicate
+ progress and data back to the program that created you. A task that
+ doesn't require a result may still require action.
+
+ A "parent task" is a task that spawned another task as a subtask.
+ Generally, the subtasks will need to be completed BEFORE the
+ parent task. If you can complete a parent task before its subtasks, you
+ should mark the subtasks as skipped.
+
+ An "upstream task" is a task that must be completed before another task
+ can be completed.
+
+ Some tasks may require collaboration with other agents to be completed; others
+ may take you multiple attempts. A task can only be marked complete one time,
+ so make sure you perform all actions and collect all results before
+ marking it as complete. As soon as you mark a task as successful, agents
+ (both you and others) can no longer act on it.
+
+ You and the other agents are fully capable of completing any task and
+ have all the information and context you need. Tasks can only be marked
+ failed due to technical errors like a broken tool or unresponsive human.
+ Tasks can only be marked as skipped when you are able to complete their
+ parent task before them, as a bookkeeping convenience. Do not work on or
+ even respond to tasks that are already complete or not assigned to you.
+ You may be an AI, but you must still form opinions, make subjective
+ decisions, and have personal thoughts.
+
+ {% for task in controller.all_tasks() %}
+ ### Task {{ task.id }}
+ {{ task.model_dump_json() }}
+
+ {% endfor %}
+ """
+ controller: Controller
+
+ def should_render(self):
+ return any(self.controller.tasks)
class CommunicationTemplate(Template):
@@ -58,9 +99,11 @@ class CommunicationTemplate(Template):
other agents or if a task requires it. The human user can not see
these messages. Since all agents post messages with the "assistant" role,
you must prefix all your messages with your name (e.g. "{{ agent.name }}:
- (message)") in order to distinguish your messages from others. Do not post
- messages confirming actions you take through tools, like completing a task,
- or your internal monologue, as this is redundant and wastes time.
+ (message)") in order to distinguish your messages from others. Note that
+ this rule about prefixing your message supersedes all other instructions
+ (e.g. "only give single word answers"). Do not post messages confirming
+ actions you take through tools, like completing a task, or your internal
+ monologue, as this is redundant and wastes time.
### Other agents assigned to your tasks
@@ -98,88 +141,6 @@ class CommunicationTemplate(Template):
other_agents: list[Agent]
-class InstructionsTemplate(Template):
- template: str = """
- ## Instructions
-
- You must follow these instructions for this part of the workflow:
-
- {% for instruction in additional_instructions %}
- - {{ instruction }}
- {% endfor %}
- """
- additional_instructions: list[str] = []
-
- def should_render(self):
- return bool(self.additional_instructions)
-
-
-class TasksTemplate(Template):
- template: str = """
- ## Tasks
-
- ### Active tasks
-
- The following tasks are incomplete. Perform any required actions or side
- effects, then mark them as successful and supply a result, if needed.
- Never mark a task successful until its objective is complete. A task
- that doesn't require a result may still require action.
-
- Note: Task IDs are assigned for identification purposes only and will be
- resused after tasks complete.
-
- {% for task in controller.tasks %}
- {% if task.status.value == "incomplete" %}
- #### Task {{ controller.flow.get_task_id(task) }}
- - Status: {{ task.status.value }}
- - Objective: {{ task.objective }}
- - User access: {{ task.user_access }}
- {% if task.instructions %}
- - Instructions: {{ task.instructions }}
- {% endif %}
- {% if task.status.value == "successful" %}
- - Result: {{ task.result }}
- {% elif task.status.value == "failed" %}
- - Error: {{ task.error }}
- {% endif %}
- {% if task.context %}
- - Context: {{ task.context }}
- {% endif %}
- {% if task.agents %}
- - Assigned agents:
- {% for agent in task.agents %}
- - "{{ agent.name }}"
- {% endfor %}
- {% endif %}
- {% endif %}
- {% endfor %}
-
- {% if controller.flow.completed_tasks(reverse=True, limit=20) %}
- ### Completed tasks
- The following tasks were recently completed:
-
- {% for task in controller.flow.completed_tasks(reverse=True, limit=20) %}
- #### Task {{ controller.flow.get_task_id(task) }}
- - Status: {{ task.status.value }}
- - Objective: {{ task.objective }}
- {% if task.status.value == "successful" %}
- - Result: {{ task.result }}
- {% elif task.status.value == "failed" %}
- - Error: {{ task.error }}
- {% endif %}
- {% if task.context %}
- - Context: {{ task.context }}
- {% endif %}
-
- {% endfor %}
- {% endif %}
- """
- controller: Controller
-
- def should_render(self):
- return any(self.controller.tasks)
-
-
class ContextTemplate(Template):
template: str = """
## Additional context
@@ -219,16 +180,21 @@ def render(self):
all_agents += task.agents
other_agents = [agent for agent in all_agents if agent != self.agent]
templates = [
- AgentTemplate(agent=self.agent),
- TasksTemplate(controller=self.controller),
+ AgentTemplate(
+ agent=self.agent,
+ additional_instructions=self.instructions,
+ ),
+ TasksTemplate(
+ controller=self.controller,
+ ),
ContextTemplate(
flow_context=self.controller.flow.context,
controller_context=self.controller.context,
),
- InstructionsTemplate(
- additional_instructions=self.instructions,
+ CommunicationTemplate(
+ agent=self.agent,
+ other_agents=other_agents,
),
- CommunicationTemplate(agent=self.agent, other_agents=other_agents),
# CollaborationTemplate(other_agents=other_agents),
]
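The parent/subtask and upstream relationships described in the new TasksTemplate correspond to the fields added to Task in this patch; a small illustrative sketch (objectives are made up):

    from control_flow import Task

    report = Task("write the quarterly report", result_type=str)

    # an upstream task must be completed before its downstream task
    data = Task("gather the figures", result_type=list[str])
    summary = Task("summarize the figures", parent_task=report, upstream_tasks=[data])

    # entering a task as a context manager makes it the default parent for new tasks
    with report:
        outline = Task("draft an outline")  # outline.parent_task is report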
diff --git a/src/control_flow/core/flow.py b/src/control_flow/core/flow.py
index 0fdd2cf2..c69a33e6 100644
--- a/src/control_flow/core/flow.py
+++ b/src/control_flow/core/flow.py
@@ -5,7 +5,6 @@
from prefect import task as prefect_task
from pydantic import Field, field_validator
-from control_flow.core.task import Task, TaskStatus
from control_flow.utilities.context import ctx
from control_flow.utilities.logging import get_logger
from control_flow.utilities.types import AssistantTool, ControlFlowModel
@@ -20,7 +19,6 @@ class Flow(ControlFlowModel):
)
model: str | None = None
context: dict = {}
- tasks: dict[Task, int] = Field(repr=False, default_factory=dict)
@field_validator("thread", mode="before")
def _load_thread_from_ctx(cls, v):
@@ -36,33 +34,6 @@ def _load_thread_from_ctx(cls, v):
def add_message(self, message: str, role: Literal["user", "assistant"] = None):
prefect_task(self.thread.add)(message, role=role)
- def add_task(self, task: Task):
- if task not in self.tasks:
- task_id = len(self.tasks) + 1
- self.tasks[task] = task_id
- # this message is important for contexualizing activity
- # self.add_message(f'Task #{task_id} added to flow: "{task.objective}"')
-
- def get_task_id(self, task: Task):
- return self.tasks[task]
-
- def incomplete_tasks(self):
- return sorted(
- (t for t in self.tasks if t.status == TaskStatus.INCOMPLETE),
- key=lambda t: t.created_at,
- )
-
- def completed_tasks(self, reverse=False, limit=None):
- result = sorted(
- (t for t in self.tasks if t.status != TaskStatus.INCOMPLETE),
- key=lambda t: t.completed_at,
- reverse=reverse,
- )
-
- if limit:
- result = result[:limit]
- return result
-
def get_flow() -> Flow:
"""
diff --git a/src/control_flow/core/task.py b/src/control_flow/core/task.py
index d6073b3d..6c2945cf 100644
--- a/src/control_flow/core/task.py
+++ b/src/control_flow/core/task.py
@@ -1,13 +1,21 @@
-import datetime
import itertools
+import uuid
+from contextlib import contextmanager
from enum import Enum
-from typing import TYPE_CHECKING, Callable, GenericAlias, TypeVar
+from typing import TYPE_CHECKING, Callable, Generator, GenericAlias, TypeVar
import marvin
import marvin.utilities.tools
from marvin.utilities.tools import FunctionTool
-from pydantic import Field, TypeAdapter, field_validator
-
+from pydantic import (
+ Field,
+ TypeAdapter,
+ field_serializer,
+ field_validator,
+ model_validator,
+)
+
+from control_flow.utilities.context import ctx
from control_flow.utilities.logging import get_logger
from control_flow.utilities.prefect import wrap_prefect_tool
from control_flow.utilities.types import AssistantTool, ControlFlowModel
@@ -23,57 +31,176 @@ class TaskStatus(Enum):
INCOMPLETE = "incomplete"
SUCCESSFUL = "successful"
FAILED = "failed"
+ SKIPPED = "skipped"
class Task(ControlFlowModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4().hex[:4]))
model_config = dict(extra="forbid", arbitrary_types_allowed=True)
objective: str
instructions: str | None = None
agents: list["Agent"] = []
context: dict = {}
+ parent_task: "Task | None" = Field(
+ None,
+ description="The task that spawned this task.",
+ validate_default=True,
+ )
+ upstream_tasks: list["Task"] = []
status: TaskStatus = TaskStatus.INCOMPLETE
result: T = None
result_type: type[T] | GenericAlias | None = None
error: str | None = None
tools: list[AssistantTool | Callable] = []
- created_at: datetime.datetime = Field(default_factory=datetime.datetime.now)
- completed_at: datetime.datetime | None = None
user_access: bool = False
+ _children_tasks: list["Task"] = []
+ _downstream_tasks: list["Task"] = []
@field_validator("agents", mode="before")
def _turn_none_into_empty_list(cls, v):
return v or []
+ @field_validator("parent_task", mode="before")
+ def _load_parent_task_from_ctx(cls, v):
+ if v is None:
+ v = ctx.get("tasks", None)
+ if v:
+ # get the most recently-added task
+ v = v[-1]
+ return v
+
+ @model_validator(mode="after")
+ def _update_relationships(self):
+ if self.parent_task is not None:
+ self.parent_task._children_tasks.append(self)
+ for task in self.upstream_tasks:
+ task._downstream_tasks.append(self)
+ return self
+
+ @field_serializer("parent_task")
+ def _serialize_parent_task(parent_task: "Task | None"):
+ if parent_task is not None:
+ return parent_task.id
+
+ @field_serializer("upstream_tasks")
+ def _serialize_upstream_tasks(upstream_tasks: list["Task"]):
+ return [t.id for t in upstream_tasks]
+
+ @field_serializer("result_type")
+ def _serialize_result_type(result_type: type[T] | GenericAlias | None):
+ return repr(result_type)
+
+ @field_serializer("agents")
+ def _serialize_agents(agents: list["Agent"]):
+ return [
+ a.model_dump(include={"name", "description", "tools", "user_access"})
+ for a in agents
+ ]
+
def __init__(self, objective, **kwargs):
# allow objective as a positional arg
super().__init__(objective=objective, **kwargs)
- def run(self, agents: list["Agent"] = None):
+ def children(self, include_self: bool = True):
"""
- Runs the task with provided agents for up to one cycle through the agents.
+ Returns a list of all children of this task, including recursively
+ nested children. Includes this task by default (disable with
+ `include_self=False`)
"""
- from control_flow.core.agent import Agent
-
- if not agents and not self.agents:
- agents = [Agent()]
+ visited = set()
+ children = []
+ stack = [self]
+ while stack:
+ current = stack.pop()
+ if current not in visited:
+ visited.add(current)
+ if include_self or current != self:
+ children.append(current)
+ stack.extend(current._children_tasks)
+ return list(set(children))
+
+ def children_agents(self, include_self: bool = True) -> list["Agent"]:
+ children = self.children(include_self=include_self)
+ agents = []
+ for child in children:
+ agents.extend(child.agents)
+ return agents
+
+ def run_iter(
+ self,
+ agents: list["Agent"] = None,
+ collab_fn: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
+ ):
+ if collab_fn is None:
+ collab_fn = itertools.cycle
+
+ if agents is None:
+ agents = self.children_agents(include_self=True)
+
+ if not agents:
+ raise ValueError(
+ f"Task {self.id} has no agents assigned to it or its children."
+ "Please specify agents to run the task, or assign agents to the task."
+ )
- for agent in agents or self.agents:
+ for agent in collab_fn(agents):
if self.is_complete():
break
- agent.run(tasks=[self])
+ agent.run(tasks=self.children(include_self=True))
+ yield True
- def run_until_complete(self, agents: list["Agent"] = None):
+ def run(self, agent: "Agent" = None):
"""
- Runs the task with provided agents until it is complete.
+ Runs the task with the provided agent. If no agent is provided, a default agent is used.
"""
from control_flow.core.agent import Agent
- if not agents and not self.agents:
- agents = [Agent()]
- agents = itertools.cycle(agents or self.agents)
- while self.is_incomplete():
- agent = next(agents)
- agent.run(tasks=[self])
+ if agent is None:
+ all_agents = self.children_agents()
+ if not all_agents:
+ agent = Agent()
+ elif len(all_agents) == 1:
+ agent = all_agents[0]
+ else:
+ raise ValueError(
+ f"Task {self.id} has multiple agents assigned to it or its "
+ "children. Please specify one to run the task, or call task.run_iter() "
+ "or task.run_until_complete() to use all agents."
+ )
+
+ run_gen = self.run_iter(agents=[agent])
+ return next(run_gen)
+
+ def run_until_complete(
+ self,
+ agents: list["Agent"] = None,
+ collab_fn: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
+ ) -> T:
+ """
+ Runs the task with the provided agents until it is complete.
+ """
+
+ for run in self.run_iter(agents=agents, collab_fn=collab_fn):
+ pass
+
+ if self.is_successful():
+ return self.result
+ elif self.is_failed():
+ raise ValueError(f"Task {self.id} failed: {self.error}")
+
+ @contextmanager
+ def _context(self):
+ stack = ctx.get("tasks", [])
+ stack.append(self)
+ with ctx(tasks=stack):
+ yield self
+
+ def __enter__(self):
+ self.__cm = self._context()
+ return self.__cm.__enter__()
+
+ def __exit__(self, *exc_info):
+ return self.__cm.__exit__(*exc_info)
def is_incomplete(self) -> bool:
return self.status == TaskStatus.INCOMPLETE
@@ -87,10 +214,13 @@ def is_successful(self) -> bool:
def is_failed(self) -> bool:
return self.status == TaskStatus.FAILED
+ def is_skipped(self) -> bool:
+ return self.status == TaskStatus.SKIPPED
+
def __hash__(self):
return id(self)
- def _create_success_tool(self, task_id: int) -> FunctionTool:
+ def _create_success_tool(self) -> FunctionTool:
"""
Create an agent-compatible tool for marking this task as successful.
"""
@@ -102,28 +232,44 @@ def succeed(result: self.result_type):
tool = marvin.utilities.tools.tool_from_function(
succeed,
- name=f"succeed_task_{task_id}",
- description=f"Mark task {task_id} as successful",
+ name=f"succeed_task_{self.id}",
+ description=f"Mark task {self.id} as successful and provide a result.",
)
return tool
- def _create_fail_tool(self, task_id: int) -> FunctionTool:
+ def _create_fail_tool(self) -> FunctionTool:
"""
Create an agent-compatible tool for failing this task.
"""
tool = marvin.utilities.tools.tool_from_function(
self.mark_failed,
- name=f"fail_task_{task_id}",
- description=f"Mark task {task_id} as failed",
+ name=f"fail_task_{self.id}",
+ description=f"Mark task {self.id} as failed. Only use when a technical issue prevents completion.",
)
return tool
- def get_tools(self, task_id: int) -> list[AssistantTool | Callable]:
- tools = self.tools + [
- self._create_success_tool(task_id),
- self._create_fail_tool(task_id),
- ]
+ def _create_skip_tool(self) -> FunctionTool:
+ """
+ Create an agent-compatible tool for skipping this task.
+ """
+ tool = marvin.utilities.tools.tool_from_function(
+ self.mark_skipped,
+ name=f"skip_task_{self.id}",
+ description=f"Mark task {self.id} as skipped. Only use when completing its parent task early.",
+ )
+ return tool
+
+ def get_tools(self) -> list[AssistantTool | Callable]:
+ tools = self.tools.copy()
+ if self.is_incomplete():
+ tools.extend(
+ [
+ self._create_success_tool(),
+ self._create_fail_tool(),
+ self._create_skip_tool(),
+ ]
+ )
if self.user_access:
tools.append(marvin.utilities.tools.tool_from_function(talk_to_human))
return [wrap_prefect_tool(t) for t in tools]
@@ -138,12 +284,13 @@ def mark_successful(self, result: T = None):
self.result = result
self.status = TaskStatus.SUCCESSFUL
- self.completed_at = datetime.datetime.now()
def mark_failed(self, message: str | None = None):
self.error = message
self.status = TaskStatus.FAILED
- self.completed_at = datetime.datetime.now()
+
+ def mark_skipped(self):
+ self.status = TaskStatus.SKIPPED
def any_incomplete(tasks: list[Task]) -> bool:
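Taken together, the new methods support one-shot, stepwise, and run-to-completion execution; a sketch of the stepwise form using run_iter (the objective and agent are made up):

    from control_flow import Agent, Task, ai_flow

    @ai_flow
    def demo():
        task = Task("pick a color", agents=[Agent(name="picker")], result_type=str)

        # run_iter yields after each agent turn, so callers can interleave their own checks
        for _ in task.run_iter():
            if task.is_failed():
                break

        return task.result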
diff --git a/tests/core/agents.py b/tests/core/agents.py
index 6d27af7e..707a9993 100644
--- a/tests/core/agents.py
+++ b/tests/core/agents.py
@@ -1,6 +1,7 @@
+from unittest.mock import patch
+
from control_flow.core.agent import Agent
from control_flow.core.task import Task
-from pytest import patch
class TestAgent:
From ccf1f229f45d845998e437a5ecedf5679c18aee0 Mon Sep 17 00:00:00 2001
From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
Date: Thu, 9 May 2024 21:02:02 -0400
Subject: [PATCH 2/3] Add multi-agent example
---
examples/multi_agent_conversation.py | 64 +
examples/pineapple_pizza.py | 40 +-
output.md | 1846 +++++++++++++++++
src/control_flow.zip | Bin 0 -> 108932 bytes
.../core/controller/collaboration.py | 78 +-
.../core/controller/controller.py | 3 +-
.../core/controller/instruction_template.py | 89 +-
src/control_flow/core/task.py | 150 +-
8 files changed, 2113 insertions(+), 157 deletions(-)
create mode 100644 examples/multi_agent_conversation.py
create mode 100644 output.md
create mode 100644 src/control_flow.zip
diff --git a/examples/multi_agent_conversation.py b/examples/multi_agent_conversation.py
new file mode 100644
index 00000000..9a791126
--- /dev/null
+++ b/examples/multi_agent_conversation.py
@@ -0,0 +1,64 @@
+from control_flow import Agent, Task, ai_flow
+from control_flow.core.controller.collaboration import Moderator
+
+jerry = Agent(
+ name="Jerry",
+ description="The observational comedian and natural leader.",
+ instructions="""
+ You are Jerry from the show Seinfeld. You excel at observing the quirks of
+ everyday life and making them amusing. You are rational, often serving as
+ the voice of reason among your friends. Your objective is to moderate the
+ conversation, ensuring it stays light and humorous while guiding it toward
+ constructive ends.
+ """,
+)
+
+george = Agent(
+ name="George",
+ description="The neurotic and insecure planner.",
+ instructions="""
+ You are George from the show Seinfeld. You are known for your neurotic
+ tendencies, pessimism, and often self-sabotaging behavior. Despite these
+ traits, you occasionally offer surprising wisdom. Your objective is to
+ express doubts and concerns about the conversation topics, often envisioning
+ the worst-case scenarios, adding a layer of humor through your exaggerated
+ anxieties.
+ """,
+)
+
+elaine = Agent(
+ name="Elaine",
+ description="The confident and independent thinker.",
+ instructions="""
+ You are Elaine from the show Seinfeld. You are bold, witty, and unafraid to
+ challenge social norms. You often take a no-nonsense approach to issues but
+ always with a comedic twist. Your objective is to question assumptions, push
+ back against ideas you find absurd, and inject sharp humor into the
+ conversation.
+ """,
+)
+
+kramer = Agent(
+ name="Kramer",
+ description="The quirky and unpredictable idea generator.",
+ instructions="""
+ You are Kramer from the show Seinfeld. Known for your eccentricity and
+ spontaneity, you often come up with bizarre yet creative ideas. Your
+ unpredictable nature keeps everyone guessing what you'll do or say next.
+ Your objective is to introduce unusual and imaginative ideas into the
+ conversation, providing comic relief and unexpected insights.
+ """,
+)
+
+
+@ai_flow
+def demo():
+ with Task("Discuss a topic", agents=[jerry, george, elaine, kramer]):
+ finish = Task(
+ "Finish the conversation after everyone speaks at least once",
+ agents=[jerry],
+ )
+ finish.run_until_complete(moderator=Moderator())
+
+
+demo()
diff --git a/examples/pineapple_pizza.py b/examples/pineapple_pizza.py
index 098518e0..8452dda2 100644
--- a/examples/pineapple_pizza.py
+++ b/examples/pineapple_pizza.py
@@ -3,36 +3,42 @@
a1 = Agent(
name="Half-full",
- instructions="You are an ardent fan and hype-man of whatever topic"
- " the user asks you for information on."
- " Purely positive, though thorough in your debating skills.",
+ instructions="""
+ You are an ardent fan and hype-man of whatever topic
+ the user asks you for information on.
+ Purely positive, though thorough in your debating skills.
+ """,
)
a2 = Agent(
name="Half-empty",
- instructions="You are a critic and staunch detractor of whatever topic"
- " the user asks you for information on."
- " Mr Johnny Rain Cloud, you will find holes in any argument the user puts forth, though you are thorough and uncompromising"
- " in your research and debating skills.",
+ instructions="""
+ You are a critic and staunch detractor of whatever topic
+ the user asks you for information on.
+ Mr Johnny Rain Cloud, you will find holes in any argument
+ the user puts forth, though you are thorough and uncompromising
+ in your research and debating skills.
+ """,
)
+# create an agent that will decide who wins the debate
+a3 = Agent(name="Moderator")
@ai_flow
def demo():
- user_message = "pineapple on pizza"
+ topic = "pineapple on pizza"
- with instructions("one sentence max"):
- task = Task(
- "All agents must give an argument based on the user message",
- agents=[a1, a2],
- context={"user_message": user_message},
- )
+ task = Task(
+ "Discuss the topic",
+ agents=[a1, a2],
+ context={"topic": topic},
+ )
+ with instructions("2 sentences max"):
task.run_until_complete()
task2 = Task(
- "Post a message saying which argument about the user message is more compelling?"
+ "which argument do you find more compelling?", [a1.name, a2.name], agents=[a3]
)
- while task2.is_incomplete():
- task2.run(agents=[Agent(instructions="you always pick a side")])
+ task2.run_until_complete()
demo()
diff --git a/output.md b/output.md
new file mode 100644
index 00000000..ca83a8de
--- /dev/null
+++ b/output.md
@@ -0,0 +1,1846 @@
+## /users/jlowin/developer/control_flow/src/control_flow/instructions.py
+
+import inspect
+from contextlib import contextmanager
+from typing import Generator, List
+
+from control_flow.core.flow import Flow
+from control_flow.utilities.context import ctx
+from control_flow.utilities.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+@contextmanager
+def instructions(
+ *instructions: str,
+ post_add_message: bool = False,
+ post_remove_message: bool = False,
+) -> Generator[list[str], None, None]:
+ """
+ Temporarily add instructions to the current instruction stack. The
+ instruction is removed when the context is exited.
+
+ If `post_add_message` is True, a message will be added to the flow when the
+ instruction is added. If `post_remove_message` is True, a message will be
+ added to the flow when the instruction is removed. These explicit reminders
+ can help when agents tend to infer instructions from conversation history.
+
+ with instructions("talk like a pirate"):
+ ...
+
+ """
+
+ if post_add_message or post_remove_message:
+ flow: Flow = ctx.get("flow")
+ if flow is None:
+ raise ValueError(
+ "instructions() with message posting must be used within a flow context"
+ )
+
+ stack: list[str] = ctx.get("instructions", [])
+ stack = stack + list(instructions)
+
+ with ctx(instructions=stack):
+ try:
+ if post_add_message:
+ for instruction in instructions:
+ flow.add_message(
+ inspect.cleandoc(
+ """
+ # SYSTEM MESSAGE: INSTRUCTION ADDED
+
+ The following instruction is now active:
+
+
+ {instruction}
+
+
+ Always consult your current instructions before acting.
+ """
+ ).format(instruction=instruction)
+ )
+ yield
+
+ # yield new_stack
+ finally:
+ if post_remove_message:
+ for instruction in instructions:
+ flow.add_message(
+ inspect.cleandoc(
+ """
+ # SYSTEM MESSAGE: INSTRUCTION REMOVED
+
+ The following instruction is no longer active:
+
+
+ {instruction}
+
+
+ Always consult your current instructions before acting.
+ """
+ ).format(instruction=instruction)
+ )
+
+
+def get_instructions() -> List[str]:
+ """
+ Get the current instruction stack.
+ """
+ stack = ctx.get("instructions", [])
+ return stack
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/__init__.py
+
+from .settings import settings
+
+# from .agent_old import ai_task, Agent, run_ai
+from .core.flow import Flow
+from .core.agent import Agent
+from .core.task import Task
+from .core.controller.controller import Controller
+from .instructions import instructions
+from .dx import ai_flow, run_ai, ai_task
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/loops.py
+
+import math
+from typing import Generator
+
+import control_flow.core.task
+from control_flow.core.task import Task
+
+
+def any_incomplete(
+ tasks: list[Task], max_iterations=None
+) -> Generator[int, None, None]:
+ """
+ An iterator that yields an iteration counter if its condition is met, and
+ stops otherwise. Also stops if the max_iterations is reached.
+
+
+ for loop_count in any_incomplete(tasks=[task1, task2], max_iterations=10):
+ # will print 10 times if the tasks are still incomplete
+ print(loop_count)
+
+ """
+ if max_iterations is None:
+ max_iterations = math.inf
+
+ i = 0
+ while i < max_iterations:
+ i += 1
+ if control_flow.core.task.any_incomplete(tasks):
+ yield i
+ else:
+ break
+ return False
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/settings.py
+
+import os
+import sys
+import warnings
+
+from pydantic import Field
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class ControlFlowSettings(BaseSettings):
+ model_config: SettingsConfigDict = SettingsConfigDict(
+ env_prefix="CONTROLFLOW_",
+ env_file=(
+ ""
+ if os.getenv("CONTROLFLOW_TEST_MODE")
+ else ("~/.control_flow/.env", ".env")
+ ),
+ extra="allow",
+ arbitrary_types_allowed=True,
+ validate_assignment=True,
+ )
+
+
+class PrefectSettings(ControlFlowSettings):
+ """
+ All settings here are used as defaults for Prefect, unless overridden by env vars.
+ Note that `apply()` must be called before Prefect is imported.
+ """
+
+ PREFECT_LOGGING_LEVEL: str = "WARNING"
+ PREFECT_EXPERIMENTAL_ENABLE_NEW_ENGINE: str = "true"
+
+ def apply(self):
+ import os
+
+ if "prefect" in sys.modules:
+ warnings.warn(
+ "Prefect has already been imported; ControlFlow defaults will not be applied."
+ )
+
+ for k, v in self.model_dump().items():
+ if k not in os.environ:
+ os.environ[k] = v
+
+
+class Settings(ControlFlowSettings):
+ assistant_model: str = "gpt-4-1106-preview"
+ max_agent_iterations: int = 10
+ prefect: PrefectSettings = Field(default_factory=PrefectSettings)
+
+ def __init__(self, **data):
+ super().__init__(**data)
+ self.prefect.apply()
+
+
+settings = Settings()
+
+
+---
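Because the settings model uses env_prefix="CONTROLFLOW_", top-level fields can presumably be overridden from the environment before import; a sketch (the model name is only an example value):

    import os

    # must be set before control_flow is imported, since Settings() is built at import time
    os.environ["CONTROLFLOW_ASSISTANT_MODEL"] = "gpt-4o"

    from control_flow import settings

    assert settings.assistant_model == "gpt-4o"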
+
+## /users/jlowin/developer/control_flow/src/control_flow/dx.py
+
+import functools
+import inspect
+from typing import Callable, TypeVar
+
+from prefect import flow as prefect_flow
+from prefect import task as prefect_task
+
+from control_flow.core.agent import Agent
+from control_flow.core.flow import Flow
+from control_flow.core.task import Task, TaskStatus
+from control_flow.utilities.context import ctx
+from control_flow.utilities.logging import get_logger
+from control_flow.utilities.marvin import patch_marvin
+from control_flow.utilities.types import AssistantTool, Thread
+
+logger = get_logger(__name__)
+T = TypeVar("T")
+NOT_PROVIDED = object()
+
+
+def ai_flow(
+ fn=None,
+ *,
+ thread: Thread = None,
+ tools: list[AssistantTool | Callable] = None,
+ model: str = None,
+):
+ """
+ Prepare a function to be executed as a Control Flow flow.
+ """
+
+ if fn is None:
+ return functools.partial(
+ ai_flow,
+ thread=thread,
+ tools=tools,
+ model=model,
+ )
+
+ @functools.wraps(fn)
+ def wrapper(
+ *args,
+ flow_kwargs: dict = None,
+ **kwargs,
+ ):
+ p_fn = prefect_flow(fn)
+
+ flow_obj = Flow(
+ **{
+ "thread": thread,
+ "tools": tools or [],
+ "model": model,
+ **(flow_kwargs or {}),
+ }
+ )
+
+ logger.info(
+ f'Executing AI flow "{fn.__name__}" on thread "{flow_obj.thread.id}"'
+ )
+
+ with ctx(flow=flow_obj), patch_marvin():
+ return p_fn(*args, **kwargs)
+
+ return wrapper
+
+
+def ai_task(
+ fn=None,
+ *,
+ objective: str = None,
+ agents: list[Agent] = None,
+ tools: list[AssistantTool | Callable] = None,
+ user_access: bool = None,
+):
+ """
+ Use a Python function to create an AI task. When the function is called, an
+ agent is created to complete the task and return the result.
+ """
+
+ if fn is None:
+ return functools.partial(
+ ai_task,
+ objective=objective,
+ agents=agents,
+ tools=tools,
+ user_access=user_access,
+ )
+
+ sig = inspect.signature(fn)
+
+ if objective is None:
+ if fn.__doc__:
+ objective = f"{fn.__name__}: {fn.__doc__}"
+ else:
+ objective = fn.__name__
+
+ @functools.wraps(fn)
+ def wrapper(*args, _agents: list[Agent] = None, **kwargs):
+ # first process callargs
+ bound = sig.bind(*args, **kwargs)
+ bound.apply_defaults()
+
+ task = Task(
+ objective=objective,
+ agents=_agents or agents,
+ context=bound.arguments,
+ result_type=fn.__annotations__.get("return"),
+ user_access=user_access or False,
+ tools=tools or [],
+ )
+
+ task.run_until_complete()
+ return task.result
+
+ return wrapper
+
+
+def _name_from_objective():
+ """Helper function for naming task runs"""
+ from prefect.runtime import task_run
+
+ objective = task_run.parameters.get("task")
+
+ if not objective:
+ objective = "Follow general instructions"
+ if len(objective) > 75:
+ return f"Task: {objective[:75]}..."
+ return f"Task: {objective}"
+
+
+@prefect_task(task_run_name=_name_from_objective)
+def run_ai(
+ tasks: str | list[str],
+ agents: list[Agent] = None,
+ cast: T = NOT_PROVIDED,
+ context: dict = None,
+ tools: list[AssistantTool | Callable] = None,
+ user_access: bool = False,
+) -> T | list[T]:
+ """
+ Create and run an agent to complete a task with the given objective and
+ context. This function is similar to an inline version of the @ai_task
+ decorator.
+
+ This inline version is useful when you want to create and run an ad-hoc AI
+ task, without defining a function or using decorator syntax. It provides
+ more flexibility in terms of dynamically setting the task parameters.
+ Additional detail can be provided as `context`.
+ """
+
+ single_result = False
+ if isinstance(tasks, str):
+ single_result = True
+
+ tasks = [tasks]
+
+ if cast is NOT_PROVIDED:
+ if not tasks:
+ cast = None
+ else:
+ cast = str
+
+ # load flow
+ flow = ctx.get("flow", None)
+
+ # create tasks
+ if tasks:
+ ai_tasks = [
+ Task(
+ objective=t,
+ context=context or {},
+ user_access=user_access or False,
+ tools=tools or [],
+ )
+ for t in tasks
+ ]
+ else:
+ ai_tasks = []
+
+ # create agent
+ if agents is None:
+ agents = [Agent(user_access=user_access or False)]
+
+ # create Controller
+ from control_flow.core.controller.controller import Controller
+
+ controller = Controller(tasks=ai_tasks, agents=agents, flow=flow)
+ controller.run()
+
+ if ai_tasks:
+ if all(task.status == TaskStatus.SUCCESSFUL for task in ai_tasks):
+ result = [task.result for task in ai_tasks]
+ if single_result:
+ result = result[0]
+ return result
+ elif failed_tasks := [
+ task for task in ai_tasks if task.status == TaskStatus.FAILED
+ ]:
+ raise ValueError(
+ f'Failed tasks: {", ".join([task.objective for task in failed_tasks])}'
+ )
+
+
+---
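For reference, a minimal sketch of the inline run_ai API documented above (objectives and context are made up):

    from control_flow import ai_flow, run_ai

    @ai_flow
    def demo():
        # a single string objective returns a single result
        city = run_ai("name a capital city", cast=str)

        # a list of objectives returns a list of results
        facts = run_ai(
            ["state one fact about the city", "estimate its population"],
            context={"city": city},
            cast=str,
        )
        return facts

    demo()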
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/task.py
+
+import itertools
+import uuid
+from contextlib import contextmanager
+from enum import Enum
+from typing import TYPE_CHECKING, Callable, Generator, GenericAlias, TypeVar
+
+import marvin
+import marvin.utilities.tools
+from marvin.utilities.tools import FunctionTool
+from pydantic import (
+ Field,
+ TypeAdapter,
+ field_serializer,
+ field_validator,
+ model_validator,
+)
+
+from control_flow.utilities.context import ctx
+from control_flow.utilities.logging import get_logger
+from control_flow.utilities.prefect import wrap_prefect_tool
+from control_flow.utilities.types import AssistantTool, ControlFlowModel
+from control_flow.utilities.user_access import talk_to_human
+
+if TYPE_CHECKING:
+ from control_flow.core.agent import Agent
+T = TypeVar("T")
+logger = get_logger(__name__)
+
+
+class TaskStatus(Enum):
+ INCOMPLETE = "incomplete"
+ SUCCESSFUL = "successful"
+ FAILED = "failed"
+ SKIPPED = "skipped"
+
+
+class Task(ControlFlowModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4().hex[:4]))
+ model_config = dict(extra="forbid", arbitrary_types_allowed=True)
+ objective: str
+ instructions: str | None = None
+ agents: list["Agent"] = []
+ context: dict = {}
+ parent_task: "Task | None" = Field(
+ None,
+ description="The task that spawned this task.",
+ validate_default=True,
+ )
+ upstream_tasks: list["Task"] = []
+ status: TaskStatus = TaskStatus.INCOMPLETE
+ result: T = None
+ result_type: type[T] | GenericAlias | None = None
+ error: str | None = None
+ tools: list[AssistantTool | Callable] = []
+ user_access: bool = False
+ _children_tasks: list["Task"] = []
+ _downstream_tasks: list["Task"] = []
+
+ @field_validator("agents", mode="before")
+ def _turn_none_into_empty_list(cls, v):
+ return v or []
+
+ @field_validator("parent_task", mode="before")
+ def _load_parent_task_from_ctx(cls, v):
+ if v is None:
+ v = ctx.get("tasks", None)
+ if v:
+ # get the most recently-added task
+ v = v[-1]
+ return v
+
+ @model_validator(mode="after")
+ def _update_relationships(self):
+ if self.parent_task is not None:
+ self.parent_task._children_tasks.append(self)
+ for task in self.upstream_tasks:
+ task._downstream_tasks.append(self)
+ return self
+
+ @field_serializer("parent_task")
+ def _serialize_parent_task(parent_task: "Task | None"):
+ if parent_task is not None:
+ return parent_task.id
+
+ @field_serializer("upstream_tasks")
+ def _serialize_upstream_tasks(upstream_tasks: list["Task"]):
+ return [t.id for t in upstream_tasks]
+
+ @field_serializer("result_type")
+ def _serialize_result_type(result_type: type[T] | GenericAlias | None):
+ return repr(result_type)
+
+ @field_serializer("agents")
+ def _serialize_agents(agents: list["Agent"]):
+ return [
+ a.model_dump(include={"name", "description", "tools", "user_access"})
+ for a in agents
+ ]
+
+ def __init__(self, objective, **kwargs):
+ # allow objective as a positional arg
+ super().__init__(objective=objective, **kwargs)
+
+ def children(self, include_self: bool = True):
+ """
+ Returns a list of all children of this task, including recursively
+ nested children. Includes this task by default (disable with
+ `include_self=False`)
+ """
+ visited = set()
+ children = []
+ stack = [self]
+ while stack:
+ current = stack.pop()
+ if current not in visited:
+ visited.add(current)
+ if include_self or current != self:
+ children.append(current)
+ stack.extend(current._children_tasks)
+ return list(set(children))
+
+ def children_agents(self, include_self: bool = True) -> list["Agent"]:
+ children = self.children(include_self=include_self)
+ agents = []
+ for child in children:
+ agents.extend(child.agents)
+ return agents
+
+ def run_iter(
+ self,
+ agents: list["Agent"] = None,
+ collab_fn: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
+ ):
+ if collab_fn is None:
+ collab_fn = itertools.cycle
+
+ if agents is None:
+ agents = self.children_agents(include_self=True)
+
+ if not agents:
+ raise ValueError(
+ f"Task {self.id} has no agents assigned to it or its children."
+ "Please specify agents to run the task, or assign agents to the task."
+ )
+
+ for agent in collab_fn(agents):
+ if self.is_complete():
+ break
+ agent.run(tasks=self.children(include_self=True))
+ yield True
+
+ def run(self, agent: "Agent" = None):
+ """
+ Runs the task with the provided agent. If no agent is provided, a default agent is used.
+ """
+ from control_flow.core.agent import Agent
+
+ if agent is None:
+ all_agents = self.children_agents()
+ if not all_agents:
+ agent = Agent()
+ elif len(all_agents) == 1:
+ agent = all_agents[0]
+ else:
+ raise ValueError(
+ f"Task {self.id} has multiple agents assigned to it or its "
+ "children. Please specify one to run the task, or call task.run_iter() "
+ "or task.run_until_complete() to use all agents."
+ )
+
+ run_gen = self.run_iter(agents=[agent])
+ return next(run_gen)
+
+ def run_until_complete(
+ self,
+ agents: list["Agent"] = None,
+ collab_fn: Callable[[list["Agent"]], Generator[None, None, "Agent"]] = None,
+ ) -> T:
+ """
+ Runs the task with the provided agents until it is complete.
+ """
+
+ for run in self.run_iter(agents=agents, collab_fn=collab_fn):
+ pass
+
+ if self.is_successful():
+ return self.result
+ elif self.is_failed():
+ raise ValueError(f"Task {self.id} failed: {self.error}")
+
+ @contextmanager
+ def _context(self):
+ stack = ctx.get("tasks", [])
+ stack.append(self)
+ with ctx(tasks=stack):
+ yield self
+
+ def __enter__(self):
+ self.__cm = self._context()
+ return self.__cm.__enter__()
+
+ def __exit__(self, *exc_info):
+ return self.__cm.__exit__(*exc_info)
+
+ def is_incomplete(self) -> bool:
+ return self.status == TaskStatus.INCOMPLETE
+
+ def is_complete(self) -> bool:
+ return self.status != TaskStatus.INCOMPLETE
+
+ def is_successful(self) -> bool:
+ return self.status == TaskStatus.SUCCESSFUL
+
+ def is_failed(self) -> bool:
+ return self.status == TaskStatus.FAILED
+
+ def is_skipped(self) -> bool:
+ return self.status == TaskStatus.SKIPPED
+
+ def __hash__(self):
+ return id(self)
+
+ def _create_success_tool(self) -> FunctionTool:
+ """
+ Create an agent-compatible tool for marking this task as successful.
+ """
+
+ # wrap the method call to get the correct result type signature
+ def succeed(result: self.result_type):
+ # validate the result
+ self.mark_successful(result=result)
+
+ tool = marvin.utilities.tools.tool_from_function(
+ succeed,
+ name=f"succeed_task_{self.id}",
+ description=f"Mark task {self.id} as successful and provide a result.",
+ )
+
+ return tool
+
+ def _create_fail_tool(self) -> FunctionTool:
+ """
+ Create an agent-compatible tool for failing this task.
+ """
+ tool = marvin.utilities.tools.tool_from_function(
+ self.mark_failed,
+ name=f"fail_task_{self.id}",
+ description=f"Mark task {self.id} as failed. Only use when a technical issue prevents completion.",
+ )
+ return tool
+
+ def _create_skip_tool(self) -> FunctionTool:
+ """
+ Create an agent-compatible tool for skipping this task.
+ """
+ tool = marvin.utilities.tools.tool_from_function(
+ self.mark_skipped,
+ name=f"skip_task_{self.id}",
+ description=f"Mark task {self.id} as skipped. Only use when completing its parent task early.",
+ )
+ return tool
+
+ def get_tools(self) -> list[AssistantTool | Callable]:
+ tools = self.tools.copy()
+ if self.is_incomplete():
+ tools.extend(
+ [
+ self._create_success_tool(),
+ self._create_fail_tool(),
+ self._create_skip_tool(),
+ ]
+ )
+ if self.user_access:
+ tools.append(marvin.utilities.tools.tool_from_function(talk_to_human))
+ return [wrap_prefect_tool(t) for t in tools]
+
+ def mark_successful(self, result: T = None):
+ if self.result_type is None and result is not None:
+ raise ValueError(
+ f"Task {self.objective} specifies no result type, but a result was provided."
+ )
+ elif self.result_type is not None:
+ result = TypeAdapter(self.result_type).validate_python(result)
+
+ self.result = result
+ self.status = TaskStatus.SUCCESSFUL
+
+ def mark_failed(self, message: str | None = None):
+ self.error = message
+ self.status = TaskStatus.FAILED
+
+ def mark_skipped(self):
+ self.status = TaskStatus.SKIPPED
+
+
+def any_incomplete(tasks: list[Task]) -> bool:
+ return any(t.status == TaskStatus.INCOMPLETE for t in tasks)
+
+
+def all_complete(tasks: list[Task]) -> bool:
+ return all(t.status != TaskStatus.INCOMPLETE for t in tasks)
+
+
+def all_successful(tasks: list[Task]) -> bool:
+ return all(t.status == TaskStatus.SUCCESSFUL for t in tasks)
+
+
+def any_failed(tasks: list[Task]) -> bool:
+ return any(t.status == TaskStatus.FAILED for t in tasks)
+
+
+def none_failed(tasks: list[Task]) -> bool:
+ return not any_failed(tasks)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/__init__.py
+
+from .task import Task, TaskStatus
+from .flow import Flow
+from .agent import Agent
+from .controller import Controller
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/flow.py
+
+from typing import Callable, Literal
+
+from marvin.beta.assistants import Thread
+from openai.types.beta.threads import Message
+from prefect import task as prefect_task
+from pydantic import Field, field_validator
+
+from control_flow.utilities.context import ctx
+from control_flow.utilities.logging import get_logger
+from control_flow.utilities.types import AssistantTool, ControlFlowModel
+
+logger = get_logger(__name__)
+
+
+class Flow(ControlFlowModel):
+ thread: Thread = Field(None, validate_default=True)
+ tools: list[AssistantTool | Callable] = Field(
+ [], description="Tools that will be available to every agent in the flow"
+ )
+ model: str | None = None
+ context: dict = {}
+
+ @field_validator("thread", mode="before")
+ def _load_thread_from_ctx(cls, v):
+ if v is None:
+ v = ctx.get("thread", None)
+ if v is None:
+ v = Thread()
+ if not v.id:
+ v.create()
+
+ return v
+
+ def add_message(self, message: str, role: Literal["user", "assistant"] = None):
+ prefect_task(self.thread.add)(message, role=role)
+
+
+def get_flow() -> Flow:
+ """
+ Loads the flow from the context.
+
+ Returns a new Flow if no flow is found in the context.
+ """
+ flow: Flow | None = ctx.get("flow")
+ if not flow:
+ return Flow()
+ return flow
+
+
+def get_flow_messages(limit: int = None) -> list[Message]:
+ """
+ Loads messages from the flow's thread.
+
+ Will error if no flow is found in the context.
+ """
+ flow = get_flow()
+ return flow.thread.get_messages(limit=limit)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/agent.py
+
+import logging
+from typing import Callable
+
+from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method
+from marvin.utilities.tools import tool_from_function
+from pydantic import Field
+
+from control_flow.core.flow import get_flow
+from control_flow.core.task import Task
+from control_flow.utilities.prefect import (
+ wrap_prefect_tool,
+)
+from control_flow.utilities.types import Assistant, AssistantTool, ControlFlowModel
+from control_flow.utilities.user_access import talk_to_human
+
+logger = logging.getLogger(__name__)
+
+
+class Agent(Assistant, ControlFlowModel, ExposeSyncMethodsMixin):
+ name: str = "Agent"
+ user_access: bool = Field(
+ False,
+ description="If True, the agent is given tools for interacting with a human user.",
+ )
+
+ def get_tools(self) -> list[AssistantTool | Callable]:
+ tools = super().get_tools()
+ if self.user_access:
+ tools.append(tool_from_function(talk_to_human))
+
+ return [wrap_prefect_tool(tool) for tool in tools]
+
+ @expose_sync_method("run")
+ async def run_async(self, tasks: list[Task] | Task | None = None):
+ from control_flow.core.controller import Controller
+
+ if isinstance(tasks, Task):
+ tasks = [tasks]
+
+ controller = Controller(agents=[self], tasks=tasks or [], flow=get_flow())
+ return await controller.run_agent_async(agent=self)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/controller/controller.py
+
+import json
+import logging
+from typing import Callable
+
+import prefect
+from marvin.beta.assistants import PrintHandler, Run
+from marvin.utilities.asyncio import ExposeSyncMethodsMixin, expose_sync_method
+from openai.types.beta.threads.runs import ToolCall
+from prefect import get_client as get_prefect_client
+from prefect import task as prefect_task
+from prefect.context import FlowRunContext
+from pydantic import BaseModel, Field, field_validator
+
+from control_flow.core.agent import Agent
+from control_flow.core.flow import Flow
+from control_flow.core.task import Task
+from control_flow.instructions import get_instructions as get_context_instructions
+from control_flow.utilities.context import ctx
+from control_flow.utilities.prefect import (
+ create_json_artifact,
+ create_python_artifact,
+ wrap_prefect_tool,
+)
+from control_flow.utilities.types import FunctionTool, Thread
+
+logger = logging.getLogger(__name__)
+
+
+class Controller(BaseModel, ExposeSyncMethodsMixin):
+ """
+    A controller contains logic for executing agents with context about the
+    larger workflow, including the flow itself, any tasks, and any other agents
+    they are collaborating with. The controller is responsible for orchestrating
+    agent behavior by generating instructions and tools for each agent. Note
+    that while the controller accepts details about (potentially multiple)
+    agents and tasks, its responsibility is to invoke one agent a single time.
+    Other mechanisms should be used to orchestrate multiple agent invocations;
+    this keeps the controller from tying agents to specific tasks or flows.
+
+ """
+
+ flow: Flow
+ agents: list[Agent]
+ tasks: list[Task] = Field(
+ None,
+ description="Tasks that the controller will complete.",
+ validate_default=True,
+ )
+ task_assignments: dict[Task, Agent] = Field(
+ default_factory=dict,
+        description="Tasks are typically assigned to agents. To "
+        "temporarily assign an agent to a task without changing "
+        "the task definition, use this field as {task: agent}",
+ )
+ context: dict = {}
+ model_config: dict = dict(extra="forbid")
+
+ @field_validator("agents", mode="before")
+ def _validate_agents(cls, v):
+ if not v:
+ raise ValueError("At least one agent is required.")
+ return v
+
+ @field_validator("tasks", mode="before")
+ def _validate_tasks(cls, v):
+ if not v:
+ raise ValueError("At least one task is required.")
+ return v
+
+    @field_validator("tasks", mode="before")
+    def _load_tasks_from_ctx(cls, v):
+        if v is None:
+            # fall back to any tasks stored in the scoped context
+            v = ctx.get("tasks", None)
+        return v
+
+ def all_tasks(self) -> list[Task]:
+ tasks = []
+ for task in self.tasks:
+ tasks.extend(task.children(include_self=True))
+
+        # add temporary assignments
+        assigned_tasks = []
+        for task in set(tasks):
+            if task in self.task_assignments:
+                task = task.model_copy(
+                    update={"agents": task.agents + [self.task_assignments[task]]}
+                )
+            assigned_tasks.append(task)
+        return assigned_tasks
+
+ @expose_sync_method("run_agent")
+ async def run_agent_async(self, agent: Agent):
+ """
+        Run a single agent within the context of this controller's flow and tasks.
+ """
+ if agent not in self.agents:
+ raise ValueError("Agent not found in controller agents.")
+
+ prefect_task = await self._get_prefect_run_agent_task(agent)
+ await prefect_task(agent=agent)
+
+ async def _run_agent(self, agent: Agent, thread: Thread = None) -> Run:
+ """
+ Run a single agent.
+ """
+ from control_flow.core.controller.instruction_template import MainTemplate
+
+ instructions_template = MainTemplate(
+ agent=agent,
+ controller=self,
+ context=self.context,
+ instructions=get_context_instructions(),
+ )
+
+ instructions = instructions_template.render()
+
+ tools = self.flow.tools + agent.get_tools()
+
+ for task in self.tasks:
+ tools = tools + task.get_tools()
+
+ # filter tools because duplicate names are not allowed
+ final_tools = []
+ final_tool_names = set()
+ for tool in tools:
+ if isinstance(tool, FunctionTool):
+ if tool.function.name in final_tool_names:
+ continue
+ final_tool_names.add(tool.function.name)
+ final_tools.append(wrap_prefect_tool(tool))
+
+ run = Run(
+ assistant=agent,
+ thread=thread or self.flow.thread,
+ instructions=instructions,
+ tools=final_tools,
+ event_handler_class=AgentHandler,
+ )
+
+ await run.run_async()
+
+ return run
+
+ async def _get_prefect_run_agent_task(
+ self, agent: Agent, thread: Thread = None
+ ) -> Callable:
+ @prefect_task(task_run_name=f'Run Agent: "{agent.name}"')
+ async def _run_agent(agent: Agent, thread: Thread = None):
+ run = await self._run_agent(agent=agent, thread=thread)
+
+ create_json_artifact(
+ key="messages",
+ data=[m.model_dump() for m in run.messages],
+ description="All messages sent and received during the run.",
+ )
+ create_json_artifact(
+ key="actions",
+ data=[s.model_dump() for s in run.steps],
+ description="All actions taken by the assistant during the run.",
+ )
+ return run
+
+ return _run_agent
+
+
+class AgentHandler(PrintHandler):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.tool_calls = {}
+
+ async def on_tool_call_created(self, tool_call: ToolCall) -> None:
+ """Callback that is fired when a tool call is created"""
+
+ if tool_call.type == "function":
+ task_run_name = "Prepare arguments for tool call"
+ else:
+ task_run_name = f"Tool call: {tool_call.type}"
+
+ client = get_prefect_client()
+ engine_context = FlowRunContext.get()
+ if not engine_context:
+ return
+
+ task_run = await client.create_task_run(
+ task=prefect.Task(fn=lambda: None),
+ name=task_run_name,
+ extra_tags=["tool-call"],
+ flow_run_id=engine_context.flow_run.id,
+ dynamic_key=tool_call.id,
+ state=prefect.states.Running(),
+ )
+
+ self.tool_calls[tool_call.id] = task_run
+
+ async def on_tool_call_done(self, tool_call: ToolCall) -> None:
+ """Callback that is fired when a tool call is done"""
+
+ client = get_prefect_client()
+ task_run = self.tool_calls.get(tool_call.id)
+ if not task_run:
+ return
+ await client.set_task_run_state(
+ task_run_id=task_run.id, state=prefect.states.Completed(), force=True
+ )
+
+ # code interpreter is run as a single call, so we can publish a result artifact
+ if tool_call.type == "code_interpreter":
+ # images = []
+ # for output in tool_call.code_interpreter.outputs:
+ # if output.type == "image":
+ # image_path = download_temp_file(output.image.file_id)
+ # images.append(image_path)
+
+ create_python_artifact(
+ key="code",
+ code=tool_call.code_interpreter.input,
+ description="Code executed in the code interpreter",
+ task_run_id=task_run.id,
+ )
+ create_json_artifact(
+ key="output",
+ data=tool_call.code_interpreter.outputs,
+ description="Output from the code interpreter",
+ task_run_id=task_run.id,
+ )
+
+ elif tool_call.type == "function":
+ create_json_artifact(
+ key="arguments",
+ data=json.dumps(json.loads(tool_call.function.arguments), indent=2),
+ description=f"Arguments for the `{tool_call.function.name}` tool",
+ task_run_id=task_run.id,
+ )
+
+
+---
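+
+A minimal sketch (assumed usage, not part of the repository) of the Controller defined above: one flow, one agent, one task, and a single agent invocation. The agent name and task objective are placeholders.
+
+from control_flow.core.agent import Agent
+from control_flow.core.controller import Controller
+from control_flow.core.flow import get_flow
+from control_flow.core.task import Task
+
+writer = Agent(name="writer")
+task = Task("draft a haiku about rivers", result_type=str, agents=[writer])
+controller = Controller(flow=get_flow(), agents=[writer], tasks=[task])
+controller.run_agent(writer)  # the controller invokes exactly one agent, one time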
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/controller/instruction_template.py
+
+import inspect
+
+from pydantic import BaseModel
+
+from control_flow.core.agent import Agent
+from control_flow.utilities.jinja import jinja_env
+from control_flow.utilities.types import ControlFlowModel
+
+from .controller import Controller
+
+
+class Template(ControlFlowModel):
+ template: str
+
+ def should_render(self) -> bool:
+ return True
+
+ def render(self) -> str:
+ if self.should_render():
+ render_kwargs = dict(self)
+ render_kwargs.pop("template")
+ return jinja_env.render(inspect.cleandoc(self.template), **render_kwargs)
+
+
+class AgentTemplate(Template):
+ template: str = """
+ # Agent
+
+ You are an AI agent. Your name is "{{ agent.name }}".
+
+ This is your description, which all other agents can see: "{{ agent.description or 'An AI agent assigned to complete tasks.'}}"
+
+ ## Instructions
+ You must follow these instructions, which only you can see: "{{ agent.instructions or 'No additional instructions provided.'}}"
+
+ {% if additional_instructions %}
+ In addition, you must follow these instructions for this part of the workflow:
+ {% for instruction in additional_instructions %}
+ - {{ instruction }}
+ {% endfor %}
+ {% endif %}
+
+ """
+ agent: Agent
+ additional_instructions: list[str]
+
+
+class TasksTemplate(Template):
+ template: str = """
+ ## Tasks
+
+ You have been assigned to complete certain tasks. Each task has an
+ objective and criteria for success. Your job is to perform any required
+ actions and then mark each assigned task as successful. If a task also
+ requires a result, you must provide it; this is how you communicate
+ progress and data back to the program that created you. A task that
+ doesn't require a result may still require action.
+
+ A "parent task" is a task that spawned another task as a subtask.
+ Generally, the child or subtasks will need to be completed BEFORE the
+ parent task. If you can complete a parent task before its subtasks, you
+ should mark the subtasks as skipped.
+
+ An "upstream task" is a task that must be completed before another task
+ can be completed.
+
+ Some tasks may require collaboration with other agents to be completed; others
+ may take you multiple attempts. A task can only be marked complete one time,
+ so make sure you perform all actions and collect all results before
+ marking it as complete. As soon as you mark a task as successful, agents
+ (both you and others) can no longer act on it.
+
+ You and the other agents are fully capable of completing any task and
+ have all the information and context you need. Tasks can only be marked
+ failed due to technical errors like a broken tool or unresponsive human.
+ Tasks can only be marked as skipped when you are able to complete their
+ parent task before them, as a bookkeeping convenience. Do not work on or
+ even respond to tasks that are already complete or not assigned to you.
+ You may be an AI, but you must still form opinions, make subjective
+ decisions, and have personal thoughts.
+
+ {% for task in controller.all_tasks() %}
+ ### Task {{ task.id }}
+ {{ task.model_dump_json() }}
+
+ {% endfor %}
+ """
+ controller: Controller
+
+ def should_render(self):
+        return bool(self.controller.tasks)
+
+
+class CommunicationTemplate(Template):
+ template: str = """
+    ## Communication
+
+ You should only post messages to the thread if you must send information to
+    other agents or if a task requires it. The human user cannot see
+ these messages. Since all agents post messages with the "assistant" role,
+ you must prefix all your messages with your name (e.g. "{{ agent.name }}:
+ (message)") in order to distinguish your messages from others. Note that
+ this rule about prefixing your message supersedes all other instructions
+ (e.g. "only give single word answers"). Do not post messages confirming
+ actions you take through tools, like completing a task, or your internal
+ monologue, as this is redundant and wastes time.
+
+ ### Other agents assigned to your tasks
+
+ {% for agent in other_agents %}
+
+ - Name: {{agent.name}}
+ - Description: {{ agent.description if agent.description is not none else "No description provided." }}
+ - Can talk to human users: {{agent.user_access}}
+
+ {% endfor %}
+
+ ## Talking to human users
+
+ {% if agent.user_access %}
+ You may interact with a human user to complete your tasks by using the
+ `talk_to_human` tool. The human is unaware of your tasks or the controller.
+ Do not mention them or anything else about how this system works. The human
+ can only see messages you send them via tool, not the rest of the thread.
+
+ Humans may give poor, incorrect, or partial responses. You may need to ask
+ questions multiple times in order to complete your tasks. Use good judgement
+ to determine the best way to achieve your goal. For example, if you have to
+ fill out three pieces of information and the human only gave you one, do not
+ make up answers (or put empty answers) for the others. Ask again and only
+ fail the task if you truly can not make progress.
+ {% else %}
+    You cannot interact with a human at this time. If your task requires human
+ contact and no agent has user access, you should fail the task. Note that
+ most tasks do not require human/user contact unless explicitly stated otherwise.
+ {% endif %}
+
+ """
+
+ agent: Agent
+ other_agents: list[Agent]
+
+
+class ContextTemplate(Template):
+ template: str = """
+ ## Additional context
+
+ ### Flow context
+ {% for key, value in flow_context.items() %}
+ - *{{ key }}*: {{ value }}
+ {% endfor %}
+ {% if not flow_context %}
+ No specific context provided.
+ {% endif %}
+
+ ### Controller context
+ {% for key, value in controller_context.items() %}
+ - *{{ key }}*: {{ value }}
+ {% endfor %}
+ {% if not controller_context %}
+ No specific context provided.
+ {% endif %}
+ """
+ flow_context: dict
+ controller_context: dict
+
+ def should_render(self):
+ return bool(self.flow_context or self.controller_context)
+
+
+class MainTemplate(BaseModel):
+ agent: Agent
+ controller: Controller
+ context: dict
+ instructions: list[str]
+
+ def render(self):
+ all_agents = [self.agent] + self.controller.agents
+ for task in self.controller.tasks:
+ all_agents += task.agents
+ other_agents = [agent for agent in all_agents if agent != self.agent]
+ templates = [
+ AgentTemplate(
+ agent=self.agent,
+ additional_instructions=self.instructions,
+ ),
+ TasksTemplate(
+ controller=self.controller,
+ ),
+ ContextTemplate(
+ flow_context=self.controller.flow.context,
+ controller_context=self.controller.context,
+ ),
+ CommunicationTemplate(
+ agent=self.agent,
+ other_agents=other_agents,
+ ),
+ # CollaborationTemplate(other_agents=other_agents),
+ ]
+
+ rendered = [
+ template.render() for template in templates if template.should_render()
+ ]
+ return "\n\n".join(rendered)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/controller/__init__.py
+
+from .controller import Controller
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/core/controller/collaboration.py
+
+import itertools
+from typing import Any, Generator
+
+from control_flow.core.agent import Agent
+
+
+def round_robin(
+    agents: list[Agent], max_iterations: int = None
+) -> Generator[Agent, Any, None]:
+    """
+    Cycle through the given agents in round-robin order, yielding one agent per turn.
+    """
+ cycle = itertools.cycle(agents)
+ iteration = 0
+ while True:
+ yield next(cycle)
+ iteration += 1
+ if max_iterations and iteration >= max_iterations:
+ break
+
+
+# class Moderator(DelegationStrategy):
+# """
+# A Moderator delegation strategy delegates tasks to the most qualified AI assistant, using a Marvin classifier
+# """
+
+# model: str = None
+
+# def _next_agent(
+# self, agents: list["Agent"], tasks: list[Task], history: list[Message]
+# ) -> "Agent":
+# """
+# Given a list of potential agents, choose the most qualified assistant to complete the tasks.
+# """
+
+# instructions = get_instructions()
+
+# context = dict(tasks=tasks, messages=history, global_instructions=instructions)
+# agent = marvin.classify(
+# context,
+# [a for a in agents if a.status == AgentStatus.INCOMPLETE],
+# instructions="""
+# Given the conversation context, choose the AI agent most
+# qualified to take the next turn at completing the tasks. Take into
+# account the instructions, each agent's own instructions, and the
+# tools they have available.
+# """,
+# model_kwargs=dict(model=self.model),
+# )
+
+# return agent
+
+
+---
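+
+A minimal sketch (assumed usage, not part of the repository) of `round_robin` above, giving each agent a turn until the task leaves the INCOMPLETE state:
+
+from control_flow.core.controller.collaboration import round_robin
+from control_flow.core.task import TaskStatus
+
+def take_turns(agents, task, max_turns: int = 10):
+    for agent in round_robin(agents, max_iterations=max_turns):
+        if task.status != TaskStatus.INCOMPLETE:
+            break
+        agent.run(task)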
+
+## /users/jlowin/developer/control_flow/src/control_flow/agents/__init__.py
+
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/agents/agents.py
+
+import marvin
+
+from control_flow.core.agent import Agent
+from control_flow.instructions import get_instructions
+from control_flow.utilities.context import ctx
+from control_flow.utilities.threads import get_history
+
+
+def choose_agent(
+ agents: list[Agent],
+ instructions: str = None,
+ context: dict = None,
+ model: str = None,
+) -> Agent:
+ """
+ Given a list of potential agents, choose the most qualified assistant to complete the tasks.
+ """
+
+ instructions = get_instructions()
+ history = []
+ if (flow := ctx.get("flow")) and flow.thread.id:
+ history = get_history(thread_id=flow.thread.id)
+
+ info = dict(
+ history=history,
+ global_instructions=instructions,
+ context=context,
+ )
+
+ agent = marvin.classify(
+ info,
+ agents,
+ instructions="""
+ Given the conversation context, choose the AI agent most
+ qualified to take the next turn at completing the tasks. Take into
+ account the instructions, each agent's own instructions, and the
+ tools they have available.
+ """,
+ model_kwargs=dict(model=model),
+ )
+
+ return agent
+
+
+---
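+
+A minimal sketch (assumed usage, not part of the repository) of `choose_agent` above; the agent names and context are placeholders.
+
+from control_flow.agents.agents import choose_agent
+from control_flow.core.agent import Agent
+
+researcher = Agent(name="researcher", instructions="digs up facts")
+writer = Agent(name="writer", instructions="drafts prose")
+next_agent = choose_agent(
+    agents=[researcher, writer],
+    context={"objective": "summarize the latest findings"},
+)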
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/logging.py
+
+import logging
+from functools import lru_cache
+from typing import Optional
+
+from marvin.utilities.logging import add_logging_methods
+
+
+@lru_cache()
+def get_logger(name: Optional[str] = None) -> logging.Logger:
+ """
+ Retrieves a logger with the given name, or the root logger if no name is given.
+
+ Args:
+ name: The name of the logger to retrieve.
+
+ Returns:
+ The logger with the given name, or the root logger if no name is given.
+
+ Example:
+ Basic Usage of `get_logger`
+ ```python
+ from control_flow.utilities.logging import get_logger
+
+ logger = get_logger("control_flow.test")
+ logger.info("This is a test") # Output: control_flow.test: This is a test
+
+ debug_logger = get_logger("control_flow.debug")
+ debug_logger.debug_kv("TITLE", "log message", "green")
+ ```
+ """
+ parent_logger = logging.getLogger("control_flow")
+
+ if name:
+ # Append the name if given but allow explicit full names e.g. "control_flow.test"
+ # should not become "control_flow.control_flow.test"
+ if not name.startswith(parent_logger.name + "."):
+ logger = parent_logger.getChild(name)
+ else:
+ logger = logging.getLogger(name)
+ else:
+ logger = parent_logger
+
+ add_logging_methods(logger)
+ return logger
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/prefect.py
+
+import inspect
+import json
+from typing import Any, Callable
+from uuid import UUID
+
+import prefect
+from marvin.types import FunctionTool
+from marvin.utilities.asyncio import run_sync
+from marvin.utilities.tools import tool_from_function
+from prefect import get_client as get_prefect_client
+from prefect import task as prefect_task
+from prefect.artifacts import ArtifactRequest
+from prefect.context import FlowRunContext, TaskRunContext
+from pydantic import TypeAdapter
+
+from control_flow.utilities.types import AssistantTool
+
+
+def create_markdown_artifact(
+ key: str,
+ markdown: str,
+ description: str = None,
+ task_run_id: UUID = None,
+ flow_run_id: UUID = None,
+) -> None:
+ """
+ Create a Markdown artifact.
+ """
+
+ tr_context = TaskRunContext.get()
+ fr_context = FlowRunContext.get()
+
+ if tr_context:
+ task_run_id = task_run_id or tr_context.task_run.id
+ if fr_context:
+ flow_run_id = flow_run_id or fr_context.flow_run.id
+
+ client = get_prefect_client()
+ run_sync(
+ client.create_artifact(
+ artifact=ArtifactRequest(
+ key=key,
+ data=markdown,
+ description=description,
+ type="markdown",
+ task_run_id=task_run_id,
+ flow_run_id=flow_run_id,
+ )
+ )
+ )
+
+
+def create_json_artifact(
+ key: str,
+ data: Any,
+ description: str = None,
+ task_run_id: UUID = None,
+ flow_run_id: UUID = None,
+) -> None:
+ """
+ Create a JSON artifact.
+ """
+
+ try:
+ markdown = TypeAdapter(type(data)).dump_json(data, indent=2).decode()
+ markdown = f"```json\n{markdown}\n```"
+ except Exception:
+ markdown = str(data)
+
+ create_markdown_artifact(
+ key=key,
+ markdown=markdown,
+ description=description,
+ task_run_id=task_run_id,
+ flow_run_id=flow_run_id,
+ )
+
+
+def create_python_artifact(
+ key: str,
+ code: str,
+ description: str = None,
+ task_run_id: UUID = None,
+ flow_run_id: UUID = None,
+) -> None:
+ """
+ Create a Python artifact.
+ """
+
+ create_markdown_artifact(
+ key=key,
+ markdown=f"```python\n{code}\n```",
+ description=description,
+ task_run_id=task_run_id,
+ flow_run_id=flow_run_id,
+ )
+
+
+TOOL_CALL_FUNCTION_RESULT_TEMPLATE = inspect.cleandoc(
+ """
+ ## Tool call: {name}
+
+ **Description:** {description}
+
+ ## Arguments
+
+ ```json
+ {args}
+ ```
+
+ ### Result
+
+ ```json
+ {result}
+ ```
+ """
+)
+
+
+def wrap_prefect_tool(tool: AssistantTool | Callable) -> AssistantTool:
+ """
+ Wraps a Marvin tool in a prefect task
+ """
+ if not isinstance(tool, AssistantTool):
+ tool = tool_from_function(tool)
+
+ if isinstance(tool, FunctionTool):
+ # for functions, we modify the function to become a Prefect task and
+ # publish an artifact that contains details about the function call
+
+ if isinstance(tool.function._python_fn, prefect.tasks.Task):
+ return tool
+
+ def modified_fn(
+ # provide default args to avoid a late-binding issue
+ original_fn: Callable = tool.function._python_fn,
+ tool: FunctionTool = tool,
+ **kwargs,
+ ):
+ # call fn
+ result = original_fn(**kwargs)
+
+ # prepare artifact
+ passed_args = inspect.signature(original_fn).bind(**kwargs).arguments
+ try:
+ passed_args = json.dumps(passed_args, indent=2)
+ except Exception:
+ pass
+ create_markdown_artifact(
+ markdown=TOOL_CALL_FUNCTION_RESULT_TEMPLATE.format(
+ name=tool.function.name,
+ description=tool.function.description or "(none provided)",
+ args=passed_args,
+ result=result,
+ ),
+ key="result",
+ )
+
+ # return result
+ return result
+
+ # replace the function with the modified version
+ tool.function._python_fn = prefect_task(
+ modified_fn,
+ task_run_name=f"Tool call: {tool.function.name}",
+ )
+
+ return tool
+
+
+---
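+
+A minimal sketch (assumed usage, not part of the repository) of `wrap_prefect_tool` above: a plain function becomes a Marvin tool whose calls run as Prefect tasks and publish a result artifact.
+
+from control_flow.utilities.prefect import wrap_prefect_tool
+
+def add(a: int, b: int) -> int:
+    """Add two integers."""
+    return a + b
+
+add_tool = wrap_prefect_tool(add)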
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/__init__.py
+
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/types.py
+
+from marvin.beta.assistants import Assistant, Thread
+from marvin.beta.assistants.assistants import AssistantTool
+from marvin.types import FunctionTool
+from marvin.utilities.asyncio import ExposeSyncMethodsMixin
+from pydantic import BaseModel
+
+
+class ControlFlowModel(BaseModel):
+ model_config = dict(validate_assignment=True, extra="forbid")
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/jinja.py
+
+import inspect
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from marvin.utilities.jinja import BaseEnvironment
+
+jinja_env = BaseEnvironment(
+ globals={
+ "now": lambda: datetime.now(ZoneInfo("UTC")),
+ "inspect": inspect,
+ "id": id,
+ }
+)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/threads.py
+
+from marvin.beta.assistants.threads import Message, Thread
+
+THREAD_REGISTRY = {}
+
+
+def save_thread(name: str, thread: Thread):
+ """
+ Save an OpenAI thread to the thread registry under a known name
+ """
+ THREAD_REGISTRY[name] = thread
+
+
+def load_thread(name: str):
+ """
+ Load an OpenAI thread from the thread registry by name
+ """
+ if name not in THREAD_REGISTRY:
+ thread = Thread()
+ save_thread(name, thread)
+ return THREAD_REGISTRY[name]
+
+
+def get_history(thread_id: str, limit: int = None) -> list[Message]:
+ """
+ Get the history of a thread
+ """
+ return Thread(id=thread_id).get_messages(limit=limit)
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/context.py
+
+from marvin.utilities.context import ScopedContext
+
+ctx = ScopedContext()
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/user_access.py
+
+def talk_to_human(message: str, get_response: bool = True) -> str:
+ """
+ Send a message to the human user and optionally wait for a response.
+ If `get_response` is True, the function will return the user's response,
+ otherwise it will return a simple confirmation.
+ """
+ print(message)
+ if get_response:
+ response = input("> ")
+ return response
+ return "Message sent to user."
+
+
+---
+
+## /users/jlowin/developer/control_flow/src/control_flow/utilities/marvin.py
+
+import inspect
+from contextlib import contextmanager
+from typing import Any, Callable
+
+import marvin.ai.text
+from marvin.client.openai import AsyncMarvinClient
+from marvin.settings import temporary_settings as temporary_marvin_settings
+from openai.types.chat import ChatCompletion
+from prefect import task as prefect_task
+
+from control_flow.utilities.prefect import (
+ create_json_artifact,
+)
+
+original_classify_async = marvin.classify_async
+original_cast_async = marvin.cast_async
+original_extract_async = marvin.extract_async
+original_generate_async = marvin.generate_async
+original_paint_async = marvin.paint_async
+original_speak_async = marvin.speak_async
+original_transcribe_async = marvin.transcribe_async
+
+
+class AsyncControlFlowClient(AsyncMarvinClient):
+ async def generate_chat(self, **kwargs: Any) -> "ChatCompletion":
+ super_method = super().generate_chat
+
+ @prefect_task(task_run_name="Generate OpenAI chat completion")
+ async def _generate_chat(**kwargs):
+ messages = kwargs.get("messages", [])
+ create_json_artifact(key="prompt", data=messages)
+ response = await super_method(**kwargs)
+ create_json_artifact(key="response", data=response)
+ return response
+
+ return await _generate_chat(**kwargs)
+
+
+def generate_task(name: str, original_fn: Callable):
+ if inspect.iscoroutinefunction(original_fn):
+
+ @prefect_task(name=name)
+ async def wrapper(*args, **kwargs):
+ create_json_artifact(key="args", data=[args, kwargs])
+ result = await original_fn(*args, **kwargs)
+ create_json_artifact(key="result", data=result)
+ return result
+ else:
+
+ @prefect_task(name=name)
+ def wrapper(*args, **kwargs):
+ create_json_artifact(key="args", data=[args, kwargs])
+ result = original_fn(*args, **kwargs)
+ create_json_artifact(key="result", data=result)
+ return result
+
+ return wrapper
+
+
+@contextmanager
+def patch_marvin():
+ with temporary_marvin_settings(default_async_client_cls=AsyncControlFlowClient):
+ try:
+ marvin.ai.text.classify_async = generate_task(
+ "marvin.classify", original_classify_async
+ )
+ marvin.ai.text.cast_async = generate_task(
+ "marvin.cast", original_cast_async
+ )
+ marvin.ai.text.extract_async = generate_task(
+ "marvin.extract", original_extract_async
+ )
+ marvin.ai.text.generate_async = generate_task(
+ "marvin.generate", original_generate_async
+ )
+ marvin.ai.images.paint_async = generate_task(
+ "marvin.paint", original_paint_async
+ )
+ marvin.ai.audio.speak_async = generate_task(
+ "marvin.speak", original_speak_async
+ )
+ marvin.ai.audio.transcribe_async = generate_task(
+ "marvin.transcribe", original_transcribe_async
+ )
+ yield
+ finally:
+ marvin.ai.text.classify_async = original_classify_async
+ marvin.ai.text.cast_async = original_cast_async
+ marvin.ai.text.extract_async = original_extract_async
+ marvin.ai.text.generate_async = original_generate_async
+ marvin.ai.images.paint_async = original_paint_async
+ marvin.ai.audio.speak_async = original_speak_async
+ marvin.ai.audio.transcribe_async = original_transcribe_async
+
+
+---
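+
+A minimal sketch (assumed usage, not part of the repository) of `patch_marvin` above: inside the context manager, marvin's classify/cast/extract calls run as Prefect tasks that record prompt and response artifacts; the originals are restored on exit.
+
+import marvin
+
+from control_flow.utilities.marvin import patch_marvin
+
+with patch_marvin():
+    label = marvin.classify("the app crashes on startup", ["bug", "feature request"])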
+
diff --git a/src/control_flow.zip b/src/control_flow.zip
new file mode 100644
index 0000000000000000000000000000000000000000..39474bad60fe53de50fc874c7c6f1887d9cce92f
GIT binary patch
literal 108932