From 27604fd2ed4b2f9542f4358f27b59d15202be385 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 3 Jun 2024 11:46:20 -0400 Subject: [PATCH 1/2] ensure local input tool works --- .../core/controller/instruction_template.py | 5 ++-- src/controlflow/llm/completions.py | 3 +- src/controlflow/llm/tools.py | 5 +++- src/controlflow/tools/talk_to_human.py | 29 +++++++------------ 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/controlflow/core/controller/instruction_template.py b/src/controlflow/core/controller/instruction_template.py index ac241284..84f1d77d 100644 --- a/src/controlflow/core/controller/instruction_template.py +++ b/src/controlflow/core/controller/instruction_template.py @@ -57,12 +57,11 @@ class TasksTemplate(Template): template: str = """ ## Tasks - Your job is to complete the tasks assigned to you. Tasks may have multiple agents assigned. Only one agent - will be active at a time. + Your job is to complete any tasks assigned to you. Tasks may have multiple agents assigned. ### Current tasks - These tasks are assigned to you and ready to be worked on because their dependencies have been completed. + These tasks are assigned to you and ready to be worked on because their dependencies have been completed: {% for task in tasks %} {% if task.is_ready %} diff --git a/src/controlflow/llm/completions.py b/src/controlflow/llm/completions.py index 81b37149..00cc2718 100644 --- a/src/controlflow/llm/completions.py +++ b/src/controlflow/llm/completions.py @@ -19,6 +19,7 @@ as_tools, get_tool_calls, handle_tool_call, + handle_tool_call_async, ) @@ -209,7 +210,7 @@ async def _completion_async_generator( response_messages.append(response_message) for tool_call in get_tool_calls(response_message): - tool_result_message = handle_tool_call(tool_call, tools) + tool_result_message = await handle_tool_call_async(tool_call, tools) yield CompletionEvent( type="tool_result_done", payload=dict(message=tool_result_message) ) diff --git a/src/controlflow/llm/tools.py b/src/controlflow/llm/tools.py index f69c98a6..ad483634 100644 --- a/src/controlflow/llm/tools.py +++ b/src/controlflow/llm/tools.py @@ -4,6 +4,7 @@ from typing import Any, Callable, Literal, Optional, Union import pydantic +from prefect.utilities.asyncutils import run_coro_as_sync from controlflow.llm.messages import ( AssistantMessage, @@ -171,6 +172,8 @@ def handle_tool_call(tool_call: ToolCall, tools: list[dict, Callable]) -> ToolMe metadata.update(tool._metadata) fn_args = tool_call.function.json_arguments() fn_output = tool(**fn_args) + if inspect.isawaitable(fn_output): + fn_output = run_coro_as_sync(fn_output) except Exception as exc: fn_output = f'Error calling function "{fn_name}": {exc}' metadata["is_failed"] = True @@ -199,7 +202,7 @@ async def handle_tool_call_async( metadata = tool._metadata fn_args = tool_call.function.json_arguments() fn_output = tool(**fn_args) - if inspect.is_awaitable(fn_output): + if inspect.isawaitable(fn_output): fn_output = await fn_output except Exception as exc: fn_output = f'Error calling function "{fn_name}": {exc}' diff --git a/src/controlflow/tools/talk_to_human.py b/src/controlflow/tools/talk_to_human.py index cd58a367..028483b9 100644 --- a/src/controlflow/tools/talk_to_human.py +++ b/src/controlflow/tools/talk_to_human.py @@ -7,10 +7,9 @@ import controlflow from controlflow.llm.tools import tool -from controlflow.utilities.context import ctx if TYPE_CHECKING: - from controlflow.tui.app import TUIApp + pass async def get_terminal_input(): @@ -18,18 +17,19 @@ async def get_terminal_input(): # this is not necessary for the flow to run, but can be useful for testing loop = asyncio.get_event_loop() user_input = await loop.run_in_executor(None, input, "Type your response: ") + # user_input = await loop.run_in_executor(None, Prompt.ask, "Type your response") return user_input -async def get_tui_input(tui: "TUIApp", message: str): - container = [] - await tui.get_input(message=message, container=container) - while not container: - await asyncio.sleep(0.1) - return container[0] +# async def get_tui_input(tui: "TUIApp", message: str): +# container = [] +# await tui.get_input(message=message, container=container) +# while not container: +# await asyncio.sleep(0.1) +# return container[0] -async def listen_for_response(): +async def get_flow_run_input(): async for response in receive_input( str, flow_run_id=FlowRunContext.get().flow_run.id, poll_interval=0.2 ): @@ -48,18 +48,11 @@ async def talk_to_human(message: str, get_response: bool = True) -> str: tasks = [] # if running in a Prefect flow, listen for a remote input if (frc := FlowRunContext.get()) and frc.flow_run and frc.flow_run.id: - remote_input = asyncio.create_task(listen_for_response()) + remote_input = asyncio.create_task(get_flow_run_input()) tasks.append(remote_input) # if terminal input is enabled, listen for local input if controlflow.settings.enable_local_input: - # if a TUI is running, use it to get input - if controlflow.settings.enable_tui and ctx.get("tui"): - local_input = asyncio.create_task( - get_tui_input(tui=ctx.get("tui"), message=message) - ) - # otherwise use terminal - else: - local_input = asyncio.create_task(get_terminal_input()) + local_input = asyncio.create_task(get_terminal_input()) tasks.append(local_input) if not tasks: raise ValueError( From 70d09e85016b61f27032a63b39f47ad39c723c93 Mon Sep 17 00:00:00 2001 From: Jeremiah Lowin <153965+jlowin@users.noreply.github.com> Date: Mon, 3 Jun 2024 12:27:07 -0400 Subject: [PATCH 2/2] Restore human in the loop --- src/controlflow/llm/handlers.py | 11 ++++++++--- src/controlflow/tools/talk_to_human.py | 5 +++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/controlflow/llm/handlers.py b/src/controlflow/llm/handlers.py index 389c9fb9..a25f93fb 100644 --- a/src/controlflow/llm/handlers.py +++ b/src/controlflow/llm/handlers.py @@ -95,21 +95,26 @@ def on_tool_result_done(self, message: ToolMessage): class PrintHandler(CompletionHandler): + def __init__(self): + self.messages: dict[str, ControlFlowMessage] = {} + self.live = Live(auto_refresh=False) + def on_start(self): - self.live = Live(refresh_per_second=12) self.live.start() - self.messages: dict[str, ControlFlowMessage] = {} def on_end(self): self.live.stop() + def on_exception(self, exc: Exception): + self.live.stop() + def update_live(self): messages = sorted(self.messages.values(), key=lambda m: m.timestamp) content = [] for message in messages: content.append(format_message(message)) - self.live.update(Group(*content)) + self.live.update(Group(*content), refresh=True) def on_message_delta(self, delta: AssistantMessage, snapshot: AssistantMessage): self.messages[snapshot.id] = snapshot diff --git a/src/controlflow/tools/talk_to_human.py b/src/controlflow/tools/talk_to_human.py index 028483b9..6620d492 100644 --- a/src/controlflow/tools/talk_to_human.py +++ b/src/controlflow/tools/talk_to_human.py @@ -4,6 +4,7 @@ from prefect.context import FlowRunContext from prefect.input.run_input import receive_input +from rich.prompt import Prompt import controlflow from controlflow.llm.tools import tool @@ -16,8 +17,8 @@ async def get_terminal_input(): # as a convenience, we wait for human input on the local terminal # this is not necessary for the flow to run, but can be useful for testing loop = asyncio.get_event_loop() - user_input = await loop.run_in_executor(None, input, "Type your response: ") - # user_input = await loop.run_in_executor(None, Prompt.ask, "Type your response") + # user_input = await loop.run_in_executor(None, input, "Type your response: ") + user_input = await loop.run_in_executor(None, Prompt.ask, "Type your response") return user_input