From 59812838c448c8ae39b16f26932ff3eb3e2f8f17 Mon Sep 17 00:00:00 2001 From: Hugo Saporetti Junior Date: Mon, 21 Oct 2024 18:34:44 -0300 Subject: [PATCH] Replace actual processor by the pipeline - part1 --- src/demo/features/router_demo.py | 2 +- src/main/askai/core/askai.py | 2 +- src/main/askai/core/enums/response_model.py | 5 - src/main/askai/core/enums/router_mode.py | 12 +- src/main/askai/core/model/action_plan.py | 5 +- .../processors/splitter/splitter_actions.py | 152 +++++++++++++ .../processors/splitter/splitter_executor.py | 67 +++--- .../processors/splitter/splitter_pipeline.py | 116 +++++++--- .../processors/splitter/splitter_states.py | 15 +- .../splitter/splitter_transitions.py | 5 +- .../askai/core/processors/task_splitter.py | 199 ++---------------- src/main/askai/core/router/evaluation.py | 35 +-- src/main/askai/core/router/task_agent.py | 2 - 13 files changed, 318 insertions(+), 299 deletions(-) create mode 100644 src/main/askai/core/processors/splitter/splitter_actions.py diff --git a/src/demo/features/router_demo.py b/src/demo/features/router_demo.py index a50d3a27..ad649833 100644 --- a/src/demo/features/router_demo.py +++ b/src/demo/features/router_demo.py @@ -1,4 +1,4 @@ -from askai.core.router.processors.task_splitter import splitter +from askai.core.processors.task_splitter import splitter from askai.core.support.shared_instances import shared from askai.core.support.utilities import display_text from utils import init_context diff --git a/src/main/askai/core/askai.py b/src/main/askai/core/askai.py index 53d81457..b116b5e5 100644 --- a/src/main/askai/core/askai.py +++ b/src/main/askai/core/askai.py @@ -22,7 +22,7 @@ from askai.core.engine.ai_engine import AIEngine from askai.core.enums.router_mode import RouterMode from askai.core.model.ai_reply import AIReply -from askai.core.router.processors.ai_processor import AIProcessor +from askai.core.processors.ai_processor import AIProcessor from askai.core.support.chat_context import ChatContext from askai.core.support.shared_instances import shared from askai.core.support.utilities import read_stdin diff --git a/src/main/askai/core/enums/response_model.py b/src/main/askai/core/enums/response_model.py index c4935b23..6fd94696 100644 --- a/src/main/askai/core/enums/response_model.py +++ b/src/main/askai/core/enums/response_model.py @@ -85,11 +85,6 @@ class ResponseModel(Enumeration): "provide a clear and accurate answer." ) - # REFINE_ANSWER - REFINER = "ASK_012", ( - "Select this model to respond to improve the AI response. This should be only selected upon a clear request." 
- ) - # fmt: on @classmethod diff --git a/src/main/askai/core/enums/router_mode.py b/src/main/askai/core/enums/router_mode.py index b3426eb4..026b734d 100644 --- a/src/main/askai/core/enums/router_mode.py +++ b/src/main/askai/core/enums/router_mode.py @@ -14,12 +14,12 @@ """ from askai.core.askai_configs import configs from askai.core.askai_messages import msg -from askai.core.router.processors.ai_processor import AIProcessor -from askai.core.router.processors.chat import chat -from askai.core.router.processors.qna import qna -from askai.core.router.processors.qstring import qstring -from askai.core.router.processors.rag import rag -from askai.core.router.processors.task_splitter import splitter +from askai.core.processors.ai_processor import AIProcessor +from askai.core.processors.chat import chat +from askai.core.processors.qna import qna +from askai.core.processors.qstring import qstring +from askai.core.processors.rag import rag +from askai.core.processors.task_splitter import splitter from functools import lru_cache from hspylib.core.enums.enumeration import Enumeration from hspylib.core.tools.dict_tools import get_or_default_by_key diff --git a/src/main/askai/core/model/action_plan.py b/src/main/askai/core/model/action_plan.py index 3c771a9c..1c0598c8 100644 --- a/src/main/askai/core/model/action_plan.py +++ b/src/main/askai/core/model/action_plan.py @@ -91,10 +91,7 @@ def _direct_answer(question: str, answer: str, goal: str, model: ModelResult) -> :param model: The result model. :return: An instance of ActionPlan created from the direct response. """ - if answer: - speak: str = answer.split(',')[0].strip("'\"") - else: - speak: str = answer + speak: str = answer.strip("'").strip('"') return ActionPlan(question, speak, goal, True, [], [], model) diff --git a/src/main/askai/core/processors/splitter/splitter_actions.py b/src/main/askai/core/processors/splitter/splitter_actions.py new file mode 100644 index 00000000..2e7a5675 --- /dev/null +++ b/src/main/askai/core/processors/splitter/splitter_actions.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" + @project: HsPyLib-AskAI + @package: askai.core.processors.splitter.splitter_actions + @file: splitter_actions.py + @created: Mon, 21 Oct 2024 + @author: Hugo Saporetti Junior + @site: https://github.com/yorevs/askai + @license: MIT - Please refer to + + Copyright (c) 2024, HomeSetup +""" +import logging as log +from pathlib import Path +from types import SimpleNamespace +from typing import Optional + +from hspylib.core.metaclass.singleton import Singleton +from langchain_core.messages import AIMessage +from langchain_core.prompts import ChatPromptTemplate, PromptTemplate, MessagesPlaceholder +from langchain_core.runnables import RunnableWithMessageHistory + +from askai.core.askai_configs import configs +from askai.core.askai_prompt import prompt +from askai.core.component.geo_location import geo_location +from askai.core.component.rag_provider import RAGProvider +from askai.core.engine.openai.temperature import Temperature +from askai.core.enums.response_model import ResponseModel +from askai.core.model.acc_response import AccResponse +from askai.core.model.action_plan import ActionPlan +from askai.core.model.model_result import ModelResult +from askai.core.router.agent_tools import features +from askai.core.router.task_agent import agent +from askai.core.router.tools.general import final_answer +from askai.core.support.langchain_support import lc_llm +from askai.core.support.shared_instances import shared + + 
+class SplitterActions(metaclass=Singleton):
+    """Actions performed by the task-splitter pipeline: split, execute, wrap and refine."""
+
+    INSTANCE: 'SplitterActions'
+
+    @staticmethod
+    def wrap_answer(question: str, answer: str, model_result: ModelResult = ModelResult.default()) -> str:
+        """Provide a final answer to the user by wrapping the AI response with additional context.
+        :param question: The user's question.
+        :param answer: The AI's response to the question.
+        :param model_result: The result from the selected routing model (default is ModelResult.default()).
+        :return: A formatted string containing the final answer.
+        """
+        output: str = answer
+        args = {"user": prompt.user.title(), "idiom": shared.idiom, "context": answer, "question": question}
+        prompt_args: list[str] = [k for k in args.keys()]
+        model: ResponseModel = ResponseModel.of_model(model_result.mid)
+        # events.reply.emit(reply=AIReply.full(msg.model_select(model)))
+
+        match model, configs.is_speak:
+            case ResponseModel.TERMINAL_COMMAND, True:
+                output = final_answer("taius-stt", prompt_args, **args)
+            case ResponseModel.ASSISTIVE_TECH_HELPER, _:
+                output = final_answer("taius-stt", prompt_args, **args)
+            case ResponseModel.CHAT_MASTER, _:
+                output = final_answer("taius-jarvis", prompt_args, **args)
+            case _:
+                pass  # Default is to leave the last AI response as is
+
+        # Save the conversation to use with the task agent executor.
+        shared.memory.save_context({"input": question}, {"output": output})
+
+        return output
+
+    @staticmethod
+    def refine_answer(question: str, answer: str, acc_response: AccResponse | None = None) -> str:
+        """Refine the AI response when the accuracy check requests improvements.
+        :param question: The user's question.
+        :param answer: The AI's response to the question.
+        :param acc_response: The final accuracy response, if available.
+        """
+        if acc_response and acc_response.reasoning:
+            ctx: str = str(shared.context.flat("HISTORY"))
+            args = {
+                "improvements": acc_response.details,
+                "context": ctx,
+                "response": answer,
+                "question": question,
+            }
+            prompt_args = [k for k in args.keys()]
+            # events.reply.emit(reply=AIReply.debug(msg.refine_answer(answer)))
+            return final_answer("taius-refiner", prompt_args, **args)
+
+        return answer
+
+    @staticmethod
+    def process_action(action: SimpleNamespace) -> str:
+        """Execute a single action-plan task using the task agent."""
+        path_str: str | None = (
+            "Path: " + action.path
+            if hasattr(action, "path") and action.path.upper() not in ["N/A", "NONE", ""]
+            else None
+        )
+        task: str = f"{action.task} {path_str or ''}"
+        return agent.invoke(task)
+
+    def __init__(self):
+        self._rag: RAGProvider = RAGProvider("task-splitter.csv")
+
+    def splitter_template(self, query: str) -> ChatPromptTemplate:
+        """Retrieve the task-splitter prompt template."""
+
+        evaluation: str = str(shared.context.flat("EVALUATION"))
+        template = PromptTemplate(
+            input_variables=["os_type", "shell", "datetime", "home", "agent_tools", "rag"],
+            template=prompt.read_prompt("task-splitter.txt"),
+        )
+        return ChatPromptTemplate.from_messages(
+            [
+                (
+                    "system",
+                    template.format(
+                        os_type=prompt.os_type,
+                        shell=prompt.shell,
+                        datetime=geo_location.datetime,
+                        home=Path.home(),
+                        agent_tools=features.available_tools,
+                        rag=self._rag.get_rag_examples(query),
+                    ),
+                ),
+                MessagesPlaceholder("chat_history"),
+                ("assistant", evaluation),
+                ("human", "Human Question: '{input}'"),
+            ]
+        )
+
+    def split(self, question: str, model: ModelResult = ModelResult.default()) -> Optional[ActionPlan]:
+        """Invoke the LLM to split the tasks and create an action plan."""
+        runnable = self.splitter_template(question) | lc_llm.create_chat_model(Temperature.COLDEST.temp)
+        runnable = RunnableWithMessageHistory(
+            runnable, shared.context.flat, input_messages_key="input", history_messages_key="chat_history"
+        )
+        response: AIMessage
+        if response := runnable.invoke({"input": question}, config={"configurable": {"session_id": "HISTORY"}}):
+            answer: str = str(response.content)
+            log.info("Router::[RESPONSE] Received from AI: \n%s.", answer)
+            return ActionPlan.create(question, answer, model)
+
+        return None
+
+
+assert (actions := SplitterActions().INSTANCE) is not None
diff --git a/src/main/askai/core/processors/splitter/splitter_executor.py b/src/main/askai/core/processors/splitter/splitter_executor.py
index d7f35d3f..ec6baebc 100644
--- a/src/main/askai/core/processors/splitter/splitter_executor.py
+++ b/src/main/askai/core/processors/splitter/splitter_executor.py
@@ -20,34 +20,40 @@
 from rich.console import Console
 
 from askai.core.askai_configs import configs
+from askai.core.askai_messages import msg
 from askai.core.enums.acc_color import AccColor
 from askai.core.processors.splitter.splitter_pipeline import SplitterPipeline
 from askai.core.processors.splitter.splitter_states import States
 
 
 class SplitterExecutor(Thread):
-    """Responsible for executing a Taius Coder pipeline."""
+    """Responsible for executing a TaskSplitter pipeline."""
 
-    def __init__(self, pipeline: SplitterPipeline):
+    def __init__(self, query: str):
         super().__init__()
-        self._pipeline = pipeline
+        self._pipeline = SplitterPipeline(query)
         self._console = Console()
 
     @property
     def pipeline(self) -> SplitterPipeline:
         return self._pipeline
 
+    def display(self, message: str) -> None:
+        """Print a message to the console, but only when debug mode is enabled."""
+        if configs.is_debug:
+            self._console.print(message)
+
     @profiled
     def run(self):
-        with self._console.status("Processing query...", spinner="dots") as spinner:
-            max_retries: int = configs.max_router_retries
-            max_interactions: int = configs.max_iteractions
+        with self._console.status(msg.wait(), spinner="dots") as spinner:
             while not self.pipeline.state == States.COMPLETE:
                 self.pipeline.track_previous()
                 spinner.update(f"[green]{self.pipeline.state.value}[/green]")
-                if (0 < max_retries < self.pipeline.failures[self.pipeline.state.value]) \
-                        and (0 < max_interactions < self.pipeline.iteractions):
-                    spinner.update(f"\nMax state retries reached: {max_retries}")
+                if 0 < configs.max_router_retries < self.pipeline.failures[self.pipeline.state.value]:
+                    self.display(f"\n[red] Max retries exceeded: {configs.max_router_retries}[/red]\n")
+                    break
+                if 0 < configs.max_iteractions < self.pipeline.iteractions:
+                    self.display(f"\n[red] Max interactions exceeded: {configs.max_iteractions}[/red]\n")
                     break
                 match self.pipeline.state:
                     case States.STARTUP:
@@ -57,11 +63,12 @@ def run(self):
                         if self.pipeline.st_model_select():
                             self.pipeline.ev_model_selected()
                     case States.TASK_SPLIT:
-                        status, direct = self.pipeline.st_task_split()
-                        if status:
-                            if direct:
+                        if self.pipeline.st_task_split():
+                            if self.pipeline.is_direct():
+                                spinner.update("[yellow] AI decided to respond directly[/yellow]")
                                 self.pipeline.ev_direct_answer()
                             else:
+                                spinner.update("[green] Executing action plan[/green]")
                                 self.pipeline.ev_plan_created()
                     case States.EXECUTE_TASK:
                         if self.pipeline.st_execute_next():
@@ -69,7 +76,7 @@ def run(self):
                     case States.ACCURACY_CHECK:
                         acc_color: AccColor = self.pipeline.st_accuracy_check()
                         c_name: str = acc_color.color.casefold()
-                        self._console.print(f"[green] Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
+                        spinner.update(f"[green] Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
                         if acc_color.passed(AccColor.GOOD):
                             self.pipeline.ev_accuracy_passed()
                         elif acc_color.passed(AccColor.MODERATE):
@@ -79,8 +86,11 @@ def run(self):
                     case States.REFINE_ANSWER:
                         if self.pipeline.st_refine_answer():
                             self.pipeline.ev_answer_refined()
+                    case States.WRAP_ANSWER:
+                        if self.pipeline.st_final_answer():
+                            self.pipeline.ev_final_answer()
                     case _:
-                        spinner.update(f"Error: Machine stopped before it was done ({self.pipeline.state}) %NC%")
+                        self.display(f"[red]Error: Machine halted before completion! ({self.pipeline.state})[/red]")
                         break
                 execution_status: bool = self.pipeline.previous != self.pipeline.state
                 execution_status_str: str = (
                     f" {str(self.pipeline.previous)}"
                 )
                 self.pipeline.failures[self.pipeline.state.value] += 1 if not execution_status else 0
-                self._console.print(f"[green]{execution_status_str}[/green]")
+                self.display(f"[green]{execution_status_str}[/green]")
                 self.pipeline.iteractions += 1
+
         if configs.is_debug:
             final_state: States = self.pipeline.state
-            final_state_str: str = '[green]√ Succeeded[/green] ' \
+            final_state_str: str = '[green] Succeeded[/green] ' \
                 if final_state == States.COMPLETE \
-                else '[red]X Failed [/red]'
-            self._console.print(
+                else '[red] Failed [/red]'
+            self.display(
                 f"\n[cyan]Pipeline execution {final_state_str} [cyan][{final_state}][/cyan] "
-                f"after [yellow]{self.pipeline.iteractions}[/yellow] iteractions\n"
-            )
+                f"after [yellow]{self.pipeline.iteractions}[/yellow] interactions\n")
             all_failures: str = indent(
-                os.linesep.join([f"- {k}: {c}" for k, c in self.pipeline.failures.items()]),
-                '  '
-            )
-            self._console.print(f"Failures:\n{all_failures}")
+                os.linesep.join([f"- {k}: {c}" for k, c in self.pipeline.failures.items()]), '  ')
+            self.display(f"Failures:\n{all_failures}")
             if final_state != States.COMPLETE:
                 retries: int = self.pipeline.failures[self.pipeline.state.value]
-                self._console.print(f"Failed to generate a response after {retries} retries")
-
-if __name__ == '__main__':
-    query: str = "What is the size of the moon"
-    p: SplitterPipeline = SplitterPipeline(query)
-    executor: SplitterExecutor = SplitterExecutor(p)
-    executor.start()
-    executor.join()
+                self.display(f"Failed to generate a response after {retries} retries")
+
+        if self.pipeline.state == States.COMPLETE and self.pipeline.final_answer:
+            print(self.pipeline.final_answer)
diff --git a/src/main/askai/core/processors/splitter/splitter_pipeline.py b/src/main/askai/core/processors/splitter/splitter_pipeline.py
index f8a8a09e..a9c551ec 100644
--- a/src/main/askai/core/processors/splitter/splitter_pipeline.py
+++ b/src/main/askai/core/processors/splitter/splitter_pipeline.py
@@ -12,17 +12,28 @@
 
    Copyright (c) 2024, HomeSetup
 """
+import logging as log
+import os
 import random
 from collections import defaultdict
-from typing import AnyStr
+from typing import AnyStr, Optional
 
 import pause
+from langchain_core.prompts import PromptTemplate
 from transitions import Machine
 
+from askai.core.askai_messages import msg
+from askai.core.askai_prompt import prompt
 from askai.core.enums.acc_color import AccColor
+from askai.core.model.acc_response import AccResponse
 from askai.core.model.action_plan import ActionPlan
+from askai.core.model.model_result import ModelResult
+from askai.core.processors.splitter.splitter_actions import actions
 from askai.core.processors.splitter.splitter_states import States
 from askai.core.processors.splitter.splitter_transitions import Transition, TRANSITIONS
+from askai.core.router.evaluation import assert_accuracy, EVALUATION_GUIDE
+from askai.core.support.shared_instances import shared
+from askai.exception.exceptions import InterruptionRequest, TerminatingQuery
 
 
 class SplitterPipeline:
@@ -30,7 +41,7 @@ class SplitterPipeline:
 
     state: States
 
-    FAKE_SLEEP: float = 0.0
+    FAKE_SLEEP: float = 0.3
 
     def __init__(self, query: AnyStr):
         self._transitions: list[Transition] = [t for t in TRANSITIONS]
@@ -47,10 +58,9 @@ def __init__(self, query: AnyStr):
         self._iteractions: int = 0
         self._query: str = query
         self._plan: ActionPlan | None = None
-
-    @property
-    def failures(self) -> dict[str, int]:
-        return self._failures
+        self._final_answer: Optional[str] = None
+        self._model: ModelResult | None = None
+        self._resp_history: list[str] = list()
 
     @property
     def iteractions(self) -> int:
@@ -60,53 +70,107 @@ def iteractions(self) -> int:
     def iteractions(self, value: int):
         self._iteractions = value
 
+    @property
+    def failures(self) -> dict[str, int]:
+        return self._failures
+
     @property
     def plan(self) -> ActionPlan:
         return self._plan
 
+    @property
+    def model(self) -> ModelResult:
+        return self._model
+
     @property
     def previous(self) -> States:
         return self._previous
 
+    @property
+    def query(self) -> str:
+        return self._query
+
+    @property
+    def final_answer(self) -> Optional[str]:
+        return self._final_answer
+
+    @property
+    def resp_history(self) -> list[str]:
+        return self._resp_history
+
     def track_previous(self) -> None:
+        """Record the current state, so that stalled transitions can be detected."""
         self._previous = self.state
 
     def has_next(self) -> bool:
         """TODO"""
-        # return len(self.plan.tasks) > 0 if self.plan else False
-        return random.choice([True, False])
+        return len(self.plan.tasks) > 0 if self.plan and self.plan.tasks else False
 
     def is_direct(self) -> bool:
         """TODO"""
-        # return self.plan.is_direct if self.plan else True
-        return random.choice([True, False])
+        return self.plan.is_direct if self.plan else True
 
     def st_startup(self) -> bool:
-        result = random.choice([True, False])
-        pause.seconds(self.FAKE_SLEEP)
-        return result
+        log.info("Task Splitter pipeline has started!")
+        return True
 
     def st_model_select(self) -> bool:
-        result = random.choice([True, False])
-        pause.seconds(self.FAKE_SLEEP)
-        return result
+        log.info("Selecting response model...")
+        self._model = ModelResult.default()
+        return True
 
-    def st_task_split(self) -> tuple[bool, bool]:
-        result1, result2 = random.choice([True, False]), random.choice([True, False])
-        pause.seconds(self.FAKE_SLEEP)
-        return result1, result2
+    def st_task_split(self) -> bool:
+        log.info("Splitting tasks...")
+        self._plan = actions.split(self.query, self.model)
+        if self._plan and self._plan.is_direct:
+            self._final_answer = self._plan.speak or msg.no_output("TaskSplitter")
+        return self._plan is not None
 
     def st_execute_next(self) -> bool:
-        result = random.choice([True, False])
-        pause.seconds(self.FAKE_SLEEP)
-        return result
+        _iter_ = self.plan.tasks.copy().__iter__()
+        if (action := next(_iter_, None)) and (agent_output := actions.process_action(action)):
+            self.resp_history.append(agent_output)
+            self.plan.tasks.pop(0)
+            return True
+        return False
 
     def st_accuracy_check(self) -> AccColor:
-        color = AccColor.value_of(random.choice(AccColor.names()))
-        pause.seconds(self.FAKE_SLEEP)
-        return color
+
+        # FIXME Hardcoded for now
+        pass_threshold: AccColor = AccColor.GOOD
+
+        if self.is_direct():
+            ai_response: str = self.final_answer
+        else:
+            ai_response: str = os.linesep.join(self._resp_history)
+
+        acc: AccResponse = assert_accuracy(self.query, ai_response, pass_threshold)
+
+        if acc.is_interrupt:
+            # AI flags that it can't continue interacting.
+ log.warning(msg.interruption_requested(ai_response)) + raise InterruptionRequest(ai_response) + elif acc.is_terminate: + # AI flags that the user wants to end the session. + raise TerminatingQuery(ai_response) + elif acc.is_pass(pass_threshold): + shared.memory.save_context({"input": self.query}, {"output": self.final_answer}) + else: + acc_template = PromptTemplate(input_variables=["problems"], template=prompt.read_prompt("acc-report")) + # Include the guidelines for the first mistake. + if not shared.context.get("EVALUATION"): + shared.context.push("EVALUATION", EVALUATION_GUIDE) + shared.context.push("EVALUATION", acc_template.format(problems=acc.details)) + + return acc.acc_color def st_refine_answer(self) -> bool: result = random.choice([True, False]) pause.seconds(self.FAKE_SLEEP) return result + + def st_final_answer(self) -> bool: + self._final_answer = "This is the final answer" + result = random.choice([True, False]) + pause.seconds(self.FAKE_SLEEP) + return result diff --git a/src/main/askai/core/processors/splitter/splitter_states.py b/src/main/askai/core/processors/splitter/splitter_states.py index aaded8b3..6809943f 100644 --- a/src/main/askai/core/processors/splitter/splitter_states.py +++ b/src/main/askai/core/processors/splitter/splitter_states.py @@ -18,11 +18,12 @@ class States(Enumeration): """Enumeration of possible task splitter states.""" # fmt: off - STARTUP = 'Processing query' - MODEL_SELECT = 'Selecting Model' - TASK_SPLIT = 'Splitting Task' - ACCURACY_CHECK = 'Checking Accuracy' - EXECUTE_TASK = 'Executing Task' - REFINE_ANSWER = 'Refining Answer' - COMPLETE = 'Completed' + STARTUP = ' Processing query' + MODEL_SELECT = ' Selecting Model' + TASK_SPLIT = ' Splitting Tasks' + ACCURACY_CHECK = ' Checking Accuracy' + EXECUTE_TASK = ' Executing Task' + REFINE_ANSWER = ' Refining Answer' + WRAP_ANSWER = ' Wrapping final answer' + COMPLETE = 'ﲏ Completed' # fmt: on diff --git a/src/main/askai/core/processors/splitter/splitter_transitions.py b/src/main/askai/core/processors/splitter/splitter_transitions.py index 05cb71bf..1d01f8ca 100644 --- a/src/main/askai/core/processors/splitter/splitter_transitions.py +++ b/src/main/askai/core/processors/splitter/splitter_transitions.py @@ -33,10 +33,11 @@ {'trigger': 'ev_accuracy_passed', 'source': States.ACCURACY_CHECK, 'dest': States.EXECUTE_TASK, 'conditions': ['has_next']}, {'trigger': 'ev_accuracy_passed', 'source': States.ACCURACY_CHECK, 'dest': States.COMPLETE, 'unless': ['has_next']}, - {'trigger': 'ev_accuracy_failed', 'source': States.ACCURACY_CHECK, 'dest': States.EXECUTE_TASK, 'conditions': ['has_next']}, + {'trigger': 'ev_accuracy_failed', 'source': States.ACCURACY_CHECK, 'dest': States.EXECUTE_TASK}, {'trigger': 'ev_refine_required', 'source': States.ACCURACY_CHECK, 'dest': States.REFINE_ANSWER, 'unless': ['has_next']}, - {'trigger': 'ev_answer_refined', 'source': States.REFINE_ANSWER, 'dest': States.COMPLETE}, + {'trigger': 'ev_answer_refined', 'source': States.REFINE_ANSWER, 'dest': States.WRAP_ANSWER}, + {'trigger': 'ev_final_answer', 'source': States.WRAP_ANSWER, 'dest': States.COMPLETE}, {'trigger': 'ev_task_complete', 'source': States.TASK_SPLIT, 'dest': States.COMPLETE}, ] diff --git a/src/main/askai/core/processors/task_splitter.py b/src/main/askai/core/processors/task_splitter.py index 850edbf8..b130d955 100644 --- a/src/main/askai/core/processors/task_splitter.py +++ b/src/main/askai/core/processors/task_splitter.py @@ -12,40 +12,18 @@ Copyright (c) 2024, HomeSetup """ -from 
askai.core.askai_configs import configs -from askai.core.askai_events import events -from askai.core.askai_messages import msg -from askai.core.askai_prompt import prompt -from askai.core.component.geo_location import geo_location -from askai.core.component.rag_provider import RAGProvider -from askai.core.engine.openai.temperature import Temperature -from askai.core.enums.acc_color import AccColor -from askai.core.enums.response_model import ResponseModel -from askai.core.model.acc_response import AccResponse -from askai.core.model.action_plan import ActionPlan -from askai.core.model.ai_reply import AIReply -from askai.core.model.model_result import ModelResult -from askai.core.router.agent_tools import features -from askai.core.router.evaluation import assert_accuracy -from askai.core.router.task_agent import agent -from askai.core.router.tools.general import final_answer -from askai.core.support.langchain_support import lc_llm -from askai.core.support.shared_instances import shared -from askai.exception.exceptions import InaccurateResponse, InterruptionRequest, TerminatingQuery +import logging as log +import os +from pathlib import Path +from typing import Any, Optional, Type, TypeAlias + from hspylib.core.exception.exceptions import InvalidArgumentError from hspylib.core.metaclass.singleton import Singleton -from langchain_core.messages import AIMessage -from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate -from langchain_core.runnables.history import RunnableWithMessageHistory -from pathlib import Path from pydantic_core import ValidationError -from retry import retry -from textwrap import dedent -from types import SimpleNamespace -from typing import Any, Optional, Type, TypeAlias -import logging as log -import os +from askai.core.processors.splitter.splitter_executor import SplitterExecutor +from askai.core.support.shared_instances import shared +from askai.exception.exceptions import InaccurateResponse, TerminatingQuery AgentResponse: TypeAlias = dict[str, Any] @@ -56,8 +34,6 @@ class TaskSplitter(metaclass=Singleton): INSTANCE: "TaskSplitter" - HUMAN_PROMPT: str = dedent("""Human Question: '{input}'""").strip() - # fmt: off # Allow the router to retry on the errors bellow. RETRIABLE_ERRORS: tuple[Type[Exception], ...] = ( @@ -66,88 +42,6 @@ class TaskSplitter(metaclass=Singleton): ValidationError ) # fmt: on - @staticmethod - def wrap_answer( - question: str, - answer: str, - model_result: ModelResult = ModelResult.default(), - acc_response: AccResponse | None = None, - ) -> str: - """Provide a final answer to the user by wrapping the AI response with additional context. - :param question: The user's question. - :param answer: The AI's response to the question. - :param model_result: The result from the selected routing model (default is ModelResult.default()). - :param acc_response: The final accuracy response, if available. - :return: A formatted string containing the final answer. 
- """ - output: str = answer - args = {"user": prompt.user.title(), "idiom": shared.idiom, "context": answer, "question": question} - prompt_args: list[str] = [k for k in args.keys()] - model: ResponseModel = ( - ResponseModel.REFINER - if acc_response and (acc_response.acc_color > AccColor.GOOD) - else ResponseModel.of_model(model_result.mid) - ) - events.reply.emit(reply=AIReply.full(msg.model_select(model))) - - match model, configs.is_speak: - case ResponseModel.TERMINAL_COMMAND, True: - output = final_answer("taius-stt", prompt_args, **args) - case ResponseModel.ASSISTIVE_TECH_HELPER, _: - output = final_answer("taius-stt", prompt_args, **args) - case ResponseModel.CHAT_MASTER, _: - output = final_answer("taius-jarvis", prompt_args, **args) - case ResponseModel.REFINER, _: - if acc_response and acc_response.reasoning: - ctx: str = str(shared.context.flat("HISTORY")) - args = { - "improvements": acc_response.details, - "context": ctx, - "response": answer, - "question": question, - } - prompt_args = [k for k in args.keys()] - events.reply.emit(reply=AIReply.debug(msg.refine_answer(answer))) - output = final_answer("taius-refiner", prompt_args, **args) - case _: - pass # Default is to leave the last AI response as is - - # Save the conversation to use with the task agent executor. - shared.memory.save_context({"input": question}, {"output": output}) - - return output - - def __init__(self): - self._approved: bool = False - self._rag: RAGProvider = RAGProvider("task-splitter.csv") - - def template(self, query: str) -> ChatPromptTemplate: - """Retrieve the processor Template.""" - - evaluation: str = str(shared.context.flat("EVALUATION")) - template = PromptTemplate( - input_variables=["os_type", "shell", "datetime", "home", "agent_tools", "rag"], - template=prompt.read_prompt("task-splitter.txt"), - ) - return ChatPromptTemplate.from_messages( - [ - ( - "system", - template.format( - os_type=prompt.os_type, - shell=prompt.shell, - datetime=geo_location.datetime, - home=Path.home(), - agent_tools=features.available_tools, - rag=self._rag.get_rag_examples(query), - ), - ), - MessagesPlaceholder("chat_history"), - ("assistant", evaluation), - ("human", self.HUMAN_PROMPT), - ] - ) - def process(self, question: str, **_) -> Optional[str]: """Process the user question by splitting complex tasks into smaller single actionable tasks. :param question: The user question to process. @@ -156,81 +50,14 @@ def process(self, question: str, **_) -> Optional[str]: if not question or question.casefold() in ["exit", "leave", "quit", "q"]: raise TerminatingQuery("The user wants to exit!") + executor = SplitterExecutor(question) os.chdir(Path.home()) shared.context.forget("EVALUATION") # Erase previous evaluation notes. - model: ModelResult = ModelResult.default() # Hard-coding the result model for now. - log.info("Router::[QUESTION] '%s'", question) - retries: int = 0 - - @retry(exceptions=self.RETRIABLE_ERRORS, tries=configs.max_router_retries, backoff=1, jitter=1) - def _splitter_wrapper_(retry_count: int) -> Optional[str]: - retry_count += 1 - # Invoke the LLM to split the tasks and create an action plan. 
- runnable = self.template(question) | lc_llm.create_chat_model(Temperature.COLDEST.temp) - runnable = RunnableWithMessageHistory( - runnable, shared.context.flat, input_messages_key="input", history_messages_key="chat_history" - ) - response: AIMessage - if response := runnable.invoke({"input": question}, config={"configurable": {"session_id": "HISTORY"}}): - answer: str = str(response.content) - log.info("Router::[RESPONSE] Received from AI: \n%s.", answer) - plan = ActionPlan.create(question, answer, model) - if not plan.is_direct and (task_list := plan.tasks): - acc_response: str | None = None - events.reply.emit(reply=AIReply.debug(msg.action_plan(str(plan)))) - if plan.speak: - events.reply.emit(reply=AIReply.info(plan.speak)) - try: - agent_output: str | None = self._process_tasks(task_list, retries) - if len(task_list) > 1: - acc_response: AccResponse = assert_accuracy(question, agent_output, AccColor.MODERATE) - except InterruptionRequest as err: - return str(err) - except self.RETRIABLE_ERRORS: - if retry_count <= 1: - events.reply.emit(reply=AIReply.error(msg.sorry_retry())) - raise - return self.wrap_answer(question, agent_output, plan.model, acc_response) - else: - # Most of the times, indicates the LLM responded directly. - acc_response: AccResponse = assert_accuracy(question, response.content, AccColor.GOOD) - if not (output := plan.speak): - output = msg.no_output("Task-Splitter") - return self.wrap_answer(question, output, plan.model, acc_response) - else: - return msg.no_output("Task-Splitter") # Most of the times, indicates a failure. - - return _splitter_wrapper_(retries) - - @retry(exceptions=RETRIABLE_ERRORS, tries=configs.max_router_retries, backoff=1, jitter=1) - def _process_tasks(self, task_list: list[SimpleNamespace], retry_count: int) -> Optional[str]: - """Wrapper to allow accuracy retries.""" - retry_count += 1 - resp_history: list[str] = list() - - if not task_list: - return None - - try: - _iter_ = task_list.copy().__iter__() - while action := next(_iter_, None): - path_str: str | None = ( - "Path: " + action.path - if hasattr(action, "path") and action.path.upper() not in ["N/A", "NONE", ""] - else None - ) - task: str = f"{action.task} {path_str or ''}" - if agent_output := agent.invoke(task): - resp_history.append(agent_output) - task_list.pop(0) - except (InterruptionRequest, TerminatingQuery) as err: - return str(err) - except self.RETRIABLE_ERRORS: - if retry_count <= 1: - events.reply.emit(reply=AIReply.error(msg.sorry_retry())) - raise + log.info("TaskSplitter::[QUESTION] '%s'", question) + executor.start() + executor.join() # Wait for the pipeline execution. 
- return os.linesep.join(resp_history) if resp_history else msg.no_output("Task-Splitter") + return executor.pipeline.final_answer assert (splitter := TaskSplitter().INSTANCE) is not None diff --git a/src/main/askai/core/router/evaluation.py b/src/main/askai/core/router/evaluation.py index fd41a63f..7bc877d4 100644 --- a/src/main/askai/core/router/evaluation.py +++ b/src/main/askai/core/router/evaluation.py @@ -13,6 +13,13 @@ Copyright (c) 2024, HomeSetup """ +import logging as log +from textwrap import dedent + +from langchain_core.messages import AIMessage +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate +from langchain_core.runnables.history import RunnableWithMessageHistory + from askai.core.askai_events import events from askai.core.askai_messages import msg from askai.core.askai_prompt import prompt @@ -23,13 +30,6 @@ from askai.core.model.ai_reply import AIReply from askai.core.support.langchain_support import lc_llm from askai.core.support.shared_instances import shared -from askai.exception.exceptions import InaccurateResponse, InterruptionRequest, TerminatingQuery -from langchain_core.messages import AIMessage -from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate -from langchain_core.runnables.history import RunnableWithMessageHistory -from textwrap import dedent - -import logging as log # fmt: off EVALUATION_GUIDE: str = dedent(""" @@ -54,7 +54,6 @@ def assert_accuracy(question: str, ai_response: str, pass_threshold: AccColor = :return: The accuracy classification of the AI's response as an AccResponse enum value. """ if ai_response and ai_response not in msg.accurate_responses: - acc_template = PromptTemplate(input_variables=["problems"], template=prompt.read_prompt("acc-report")) eval_template = PromptTemplate( input_variables=["rag", "input", "response"], template=prompt.read_prompt("evaluation") ) @@ -64,25 +63,7 @@ def assert_accuracy(question: str, ai_response: str, pass_threshold: AccColor = response: AIMessage = llm.invoke(final_prompt) if response and (output := response.content): - if acc := AccResponse.parse_response(output): - log.info("Accuracy check -> status: '%s' details: '%s'", acc.status, acc.details) - events.reply.emit(reply=AIReply.debug(msg.assert_acc(acc.status, acc.details))) - if acc.is_interrupt: - # AI flags that it can't continue interacting. - log.warning(msg.interruption_requested(output)) - raise InterruptionRequest(ai_response) - elif acc.is_terminate: - # AI flags that the user wants to end the session. - raise TerminatingQuery(ai_response) - elif not acc.is_pass(pass_threshold): - # Include the guidelines for the first mistake. - if not shared.context.get("EVALUATION"): - shared.context.push("EVALUATION", EVALUATION_GUIDE) - shared.context.push("EVALUATION", acc_template.format(problems=acc.details)) - raise InaccurateResponse(f"AI Assistant failed to respond => '{response.content}'") - return acc - # At this point, the response was inaccurate. - raise InaccurateResponse(f"AI Assistant didn't respond accurately. 
Response: '{response}'") + return AccResponse.parse_response(output) def resolve_x_refs(ref_name: str, context: str | None = None) -> str: diff --git a/src/main/askai/core/router/task_agent.py b/src/main/askai/core/router/task_agent.py index 9cab51d0..6a88fa7f 100644 --- a/src/main/askai/core/router/task_agent.py +++ b/src/main/askai/core/router/task_agent.py @@ -74,8 +74,6 @@ def invoke(self, task: str) -> str: shared.context.push("HISTORY", task, "assistant") if (response := self._exec_task(task)) and (output := response["output"]): log.info("Router::[RESPONSE] Received from AI: \n%s.", output) - assert_accuracy(task, output, AccColor.MODERATE) - shared.memory.save_context({"input": task}, {"output": output}) else: output = msg.no_output("AI") shared.context.push("HISTORY", output, "assistant")
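
Usage note: a minimal driver sketch for the reworked entry point, reproducing the `__main__` demo that this patch removes from splitter_executor.py against the new API. It assumes the package layout introduced above; the sample question is illustrative only.

    from askai.core.processors.splitter.splitter_executor import SplitterExecutor

    if __name__ == "__main__":
        # SplitterExecutor is a Thread: run() drives the state machine until it
        # reaches COMPLETE or exceeds the retry/interaction limits, printing the
        # final answer on success (also exposed as executor.pipeline.final_answer).
        executor: SplitterExecutor = SplitterExecutor("What is the size of the moon")
        executor.start()
        executor.join()  # Wait for the pipeline execution.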