Replace the current processor with the pipeline - part 1
yorevs committed Oct 21, 2024
1 parent 9b2caaf commit 5981283
Showing 13 changed files with 318 additions and 299 deletions.
2 changes: 1 addition & 1 deletion src/demo/features/router_demo.py
@@ -1,4 +1,4 @@
from askai.core.router.processors.task_splitter import splitter
from askai.core.processors.task_splitter import splitter
from askai.core.support.shared_instances import shared
from askai.core.support.utilities import display_text
from utils import init_context
2 changes: 1 addition & 1 deletion src/main/askai/core/askai.py
@@ -22,7 +22,7 @@
from askai.core.engine.ai_engine import AIEngine
from askai.core.enums.router_mode import RouterMode
from askai.core.model.ai_reply import AIReply
from askai.core.router.processors.ai_processor import AIProcessor
from askai.core.processors.ai_processor import AIProcessor
from askai.core.support.chat_context import ChatContext
from askai.core.support.shared_instances import shared
from askai.core.support.utilities import read_stdin
5 changes: 0 additions & 5 deletions src/main/askai/core/enums/response_model.py
@@ -85,11 +85,6 @@ class ResponseModel(Enumeration):
"provide a clear and accurate answer."
)

# REFINE_ANSWER
REFINER = "ASK_012", (
"Select this model to respond to improve the AI response. This should be only selected upon a clear request."
)

# fmt: on

@classmethod
12 changes: 6 additions & 6 deletions src/main/askai/core/enums/router_mode.py
@@ -14,12 +14,12 @@
"""
from askai.core.askai_configs import configs
from askai.core.askai_messages import msg
from askai.core.router.processors.ai_processor import AIProcessor
from askai.core.router.processors.chat import chat
from askai.core.router.processors.qna import qna
from askai.core.router.processors.qstring import qstring
from askai.core.router.processors.rag import rag
from askai.core.router.processors.task_splitter import splitter
from askai.core.processors.ai_processor import AIProcessor
from askai.core.processors.chat import chat
from askai.core.processors.qna import qna
from askai.core.processors.qstring import qstring
from askai.core.processors.rag import rag
from askai.core.processors.task_splitter import splitter
from functools import lru_cache
from hspylib.core.enums.enumeration import Enumeration
from hspylib.core.tools.dict_tools import get_or_default_by_key
5 changes: 1 addition & 4 deletions src/main/askai/core/model/action_plan.py
@@ -91,10 +91,7 @@ def _direct_answer(question: str, answer: str, goal: str, model: ModelResult) ->
:param model: The result model.
:return: An instance of ActionPlan created from the direct response.
"""
if answer:
speak: str = answer.split(',')[0].strip("'\"")
else:
speak: str = answer
speak: str = answer.strip("'").strip('"')

return ActionPlan(question, speak, goal, True, [], [], model)

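The `_direct_answer` change above stops splitting the response on the first comma, which used to truncate direct answers that contained commas; now only the surrounding quotes are stripped. A minimal sketch of the difference, using a made-up answer string:

```python
# Hypothetical direct answer, quoted the way the LLM tends to return it.
answer = "'Paris, the capital of France'"

# Old behavior: keep only the text before the first comma, then strip quotes.
old_speak = answer.split(',')[0].strip("'\"")  # -> "Paris" (truncated)

# New behavior: strip only the surrounding quotes, keeping the full answer.
new_speak = answer.strip("'").strip('"')       # -> "Paris, the capital of France"
```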
152 changes: 152 additions & 0 deletions src/main/askai/core/processors/splitter/splitter_actions.py
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@project: HsPyLib-AskAI
@package: askai.core.processors.splitter.splitter_actions
@file: splitter_actions.py
@created: Mon, 21 Oct 2024
@author: <B>H</B>ugo <B>S</B>aporetti <B>J</B>unior
@site: https://github.com/yorevs/askai
@license: MIT - Please refer to <https://opensource.org/licenses/MIT>
Copyright (c) 2024, HomeSetup
"""
import logging as log
from pathlib import Path
from types import SimpleNamespace
from typing import Optional

from hspylib.core.metaclass.singleton import Singleton
from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory

from askai.core.askai_configs import configs
from askai.core.askai_prompt import prompt
from askai.core.component.geo_location import geo_location
from askai.core.component.rag_provider import RAGProvider
from askai.core.engine.openai.temperature import Temperature
from askai.core.enums.response_model import ResponseModel
from askai.core.model.acc_response import AccResponse
from askai.core.model.action_plan import ActionPlan
from askai.core.model.model_result import ModelResult
from askai.core.router.agent_tools import features
from askai.core.router.task_agent import agent
from askai.core.router.tools.general import final_answer
from askai.core.support.langchain_support import lc_llm
from askai.core.support.shared_instances import shared


class SplitterActions(metaclass=Singleton):
"""TODO"""

INSTANCE: 'SplitterActions'

@staticmethod
def wrap_answer(question: str, answer: str, model_result: ModelResult = ModelResult.default()) -> str:
"""Provide a final answer to the user by wrapping the AI response with additional context.
:param question: The user's question.
:param answer: The AI's response to the question.
:param model_result: The result from the selected routing model (default is ModelResult.default()).
:return: A formatted string containing the final answer.
"""
output: str = answer
args = {"user": prompt.user.title(), "idiom": shared.idiom, "context": answer, "question": question}
prompt_args: list[str] = [k for k in args.keys()]
model: ResponseModel = ResponseModel.of_model(model_result.mid)
# events.reply.emit(reply=AIReply.full(msg.model_select(model)))

match model, configs.is_speak:
case ResponseModel.TERMINAL_COMMAND, True:
output = final_answer("taius-stt", prompt_args, **args)
case ResponseModel.ASSISTIVE_TECH_HELPER, _:
output = final_answer("taius-stt", prompt_args, **args)
case ResponseModel.CHAT_MASTER, _:
output = final_answer("taius-jarvis", prompt_args, **args)
case _:
pass # Default is to leave the last AI response as is

# Save the conversation to use with the task agent executor.
shared.memory.save_context({"input": question}, {"output": output})

return output

@staticmethod
def refine_answer(question: str, answer: str, acc_response: AccResponse | None = None) -> str:
"""TODO
:param question: The user's question.
:param answer: The AI's response to the question.
:param acc_response: The final accuracy response, if available.
"""
if acc_response and acc_response.reasoning:
ctx: str = str(shared.context.flat("HISTORY"))
args = {
"improvements": acc_response.details,
"context": ctx,
"response": answer,
"question": question,
}
prompt_args = [k for k in args.keys()]
# events.reply.emit(reply=AIReply.debug(msg.refine_answer(answer)))
return final_answer("taius-refiner", prompt_args, **args)

return answer

@staticmethod
def process_action(action: SimpleNamespace) -> str:
"""TODO"""
path_str: str | None = (
"Path: " + action.path
if hasattr(action, "path") and action.path.upper() not in ["N/A", "NONE", ""]
else None
)
task: str = f"{action.task} {path_str or ''}"
return agent.invoke(task)

def __init__(self):
self._rag: RAGProvider = RAGProvider("task-splitter.csv")

def splitter_template(self, query: str) -> ChatPromptTemplate:
"""Retrieve the processor Template."""

evaluation: str = str(shared.context.flat("EVALUATION"))
template = PromptTemplate(
input_variables=["os_type", "shell", "datetime", "home", "agent_tools", "rag"],
template=prompt.read_prompt("task-splitter.txt"),
)
return ChatPromptTemplate.from_messages(
[
(
"system",
template.format(
os_type=prompt.os_type,
shell=prompt.shell,
datetime=geo_location.datetime,
home=Path.home(),
agent_tools=features.available_tools,
rag=self._rag.get_rag_examples(query),
),
),
MessagesPlaceholder("chat_history"),
("assistant", evaluation),
("human", "Human Question: '{input}'"),
]
)

def split(self, question: str, model: ModelResult = ModelResult.default()) -> Optional[ActionPlan]:
"""Invoke the LLM to split the tasks and create an action plan."""
runnable = self.splitter_template(question) | lc_llm.create_chat_model(Temperature.COLDEST.temp)
runnable = RunnableWithMessageHistory(
runnable, shared.context.flat, input_messages_key="input", history_messages_key="chat_history"
)
response: AIMessage
if response := runnable.invoke({"input": question}, config={"configurable": {"session_id": "HISTORY"}}):
answer: str = str(response.content)
log.info("Router::[RESPONSE] Received from AI: \n%s.", answer)
return ActionPlan.create(question, answer, model)

return None


assert (actions := SplitterActions().INSTANCE) is not None
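For reference, a minimal usage sketch of the new module, assuming the AskAI runtime (shared context, engine, and prompts) is already initialized; the query string is made up:

```python
from askai.core.processors.splitter.splitter_actions import actions

# split() invokes the LLM against the task-splitter template and returns an
# ActionPlan, or None when the model yields no response.
plan = actions.split("List my downloads and summarize the newest file")

# wrap_answer() formats the final response according to the routing model
# and saves the exchange into the shared conversation memory.
if plan is not None:
    reply = actions.wrap_answer(
        "List my downloads and summarize the newest file",
        "Your downloads contain 12 files; the newest is report.pdf")
```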
67 changes: 35 additions & 32 deletions src/main/askai/core/processors/splitter/splitter_executor.py
@@ -20,34 +20,40 @@
from rich.console import Console

from askai.core.askai_configs import configs
from askai.core.askai_messages import msg
from askai.core.enums.acc_color import AccColor
from askai.core.processors.splitter.splitter_pipeline import SplitterPipeline
from askai.core.processors.splitter.splitter_states import States


class SplitterExecutor(Thread):
"""Responsible for executing a Taius Coder pipeline."""
"""Responsible for executing a TaskSplitter pipeline."""

def __init__(self, pipeline: SplitterPipeline):
def __init__(self, query: str):
super().__init__()
self._pipeline = pipeline
self._pipeline = SplitterPipeline(query)
self._console = Console()

@property
def pipeline(self) -> SplitterPipeline:
return self._pipeline

def display(self, message: str) -> None:
"""TODO"""
if configs.is_debug:
self._console.print(message)

@profiled
def run(self):
with self._console.status("Processing query...", spinner="dots") as spinner:
max_retries: int = configs.max_router_retries
max_interactions: int = configs.max_iteractions
with self._console.status(msg.wait(), spinner="dots") as spinner:
while not self.pipeline.state == States.COMPLETE:
self.pipeline.track_previous()
spinner.update(f"[green]{self.pipeline.state.value}[/green]")
if (0 < max_retries < self.pipeline.failures[self.pipeline.state.value]) \
and (0 < max_interactions < self.pipeline.iteractions):
spinner.update(f"\nMax state retries reached: {max_retries}")
if 0 < configs.max_router_retries < self.pipeline.failures[self.pipeline.state.value]:
self.display(f"\n[red] Max retries exceeded: {configs.max_router_retries}[/red]\n")
break
if 0 < configs.max_iteractions < self.pipeline.iteractions:
self.display(f"\n[red] Max iteractions exceeded: {configs.max_iteractions}[/red]\n")
break
match self.pipeline.state:
case States.STARTUP:
@@ -57,19 +63,20 @@ def run(self):
if self.pipeline.st_model_select():
self.pipeline.ev_model_selected()
case States.TASK_SPLIT:
status, direct = self.pipeline.st_task_split()
if status:
if direct:
if self.pipeline.st_task_split():
if self.pipeline.is_direct:
spinner.update("[yellow] AI decided to respond directly[/yellow]")
self.pipeline.ev_direct_answer()
else:
spinner.update("[green] Executing action plan[/green]")
self.pipeline.ev_plan_created()
case States.EXECUTE_TASK:
if self.pipeline.st_execute_next():
self.pipeline.ev_task_executed()
case States.ACCURACY_CHECK:
acc_color: AccColor = self.pipeline.st_accuracy_check()
c_name: str = acc_color.color.casefold()
self._console.print(f"[green] Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
spinner.update(f"[green] Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
if acc_color.passed(AccColor.GOOD):
self.pipeline.ev_accuracy_passed()
elif acc_color.passed(AccColor.MODERATE):
@@ -79,40 +86,36 @@
case States.REFINE_ANSWER:
if self.pipeline.st_refine_answer():
self.pipeline.ev_answer_refined()
case States.WRAP_ANSWER:
if self.pipeline.st_final_answer():
self.pipeline.ev_final_answer()
case _:
spinner.update(f"Error: Machine stopped before it was done ({self.pipeline.state}) %NC%")
self.display(f"[red]Error: Machine halted before complete!({self.pipeline.state})[/red]")
break
execution_status: bool = self.pipeline.previous != self.pipeline.state
execution_status_str: str = (
f"{'[green]√[/green]' if execution_status else '[red]X[/red]'}"
f" {str(self.pipeline.previous)}"
)
self.pipeline.failures[self.pipeline.state.value] += 1 if not execution_status else 0
self._console.print(f"[green]{execution_status_str}[/green]")
self.display(f"[green]{execution_status_str}[/green]")
self.pipeline.iteractions += 1

if configs.is_debug:
final_state: States = self.pipeline.state
final_state_str: str = '[green] Succeeded[/green] ' \
final_state_str: str = '[green] Succeeded[/green] ' \
if final_state == States.COMPLETE \
else '[red]X Failed [/red]'
self._console.print(
else '[red] Failed [/red]'
self.display(
f"\n[cyan]Pipeline execution {final_state_str} [cyan][{final_state}][/cyan] "
f"after [yellow]{self.pipeline.iteractions}[/yellow] iteractions\n"
)
f"after [yellow]{self.pipeline.iteractions}[/yellow] iteractions\n")
all_failures: str = indent(
os.linesep.join([f"- {k}: {c}" for k, c in self.pipeline.failures.items()]),
' '
)
self._console.print(f"Failures:\n{all_failures}")
os.linesep.join([f"- {k}: {c}" for k, c in self.pipeline.failures.items()]), ' ')
self.display(f"Failures:\n{all_failures}")

if final_state != States.COMPLETE:
retries: int = self.pipeline.failures[self.pipeline.state.value]
self._console.print(f"Failed to generate a response after {retries} retries")

self.display(f"Failed to generate a response after {retries} retries")

if __name__ == '__main__':
query: str = "What is the size of the moon"
p: SplitterPipeline = SplitterPipeline(query)
executor: SplitterExecutor = SplitterExecutor(p)
executor.start()
executor.join()
if self.pipeline.state == States.COMPLETE and self.pipeline.final_answer:
print(self.pipeline.final_answer)
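Since the executor now builds its own pipeline from the query and prints the final answer itself, the removed `__main__` demo would translate to roughly the following (a sketch only, not part of the commit):

```python
if __name__ == '__main__':
    # Hypothetical ad-hoc run; requires an initialized AskAI environment.
    executor: SplitterExecutor = SplitterExecutor("What is the size of the moon")
    executor.start()
    executor.join()
```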