Skip to content

Commit

Permalink
Replace the current processor with the pipeline - part 3
Browse files Browse the repository at this point in the history
  • Loading branch information
yorevs committed Oct 23, 2024
1 parent 3e0bcce commit 64d86dc
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 160 deletions.
2 changes: 1 addition & 1 deletion dependencies.hspd
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ package: html2text, version: 2024.2.26, mode: ge

/* CLI/TUI */
package: rich, version: 13.8.1, mode: ge
package: textual, version: 0.80.1, mode: ge
package: textual, version: 0.80.1, mode: eq

/* Audio */
package: soundfile, version: 0.12.1, mode: ge
Expand Down
2 changes: 2 additions & 0 deletions src/main/askai/__classpath__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

if not is_a_tty():
log.getLogger().setLevel(log.ERROR)
else:
log.getLogger().setLevel(log.INFO)

if not os.environ.get("USER_AGENT"):
# The AskAI User Agent, required by the langchain framework
Expand Down
2 changes: 1 addition & 1 deletion src/main/askai/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _main(self, *params, **kwargs) -> ExitStatus:
os.environ["ASKAI_APP"] = RunModes.ASKAI_CMD.value
return self._execute_command(query_string)

log.info(
log.debug(
dedent(
f"""
{os.environ.get("ASKAI_APP")} v{self._app_version}
Expand Down
5 changes: 2 additions & 3 deletions src/main/askai/core/processors/splitter/splitter_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,14 @@ def refine_answer(question: str, answer: str, acc_response: AccResponse | None =
return answer

@staticmethod
def process_action(action: SimpleNamespace) -> str:
def process_action(action: SimpleNamespace) -> Optional[str]:
"""TODO"""
path_str: str | None = (
"Path: " + action.path
if hasattr(action, "path") and action.path.upper() not in ["N/A", "NONE", ""]
else None
)
task: str = f"{action.task} {path_str or ''}"
return agent.invoke(task)
return agent.invoke(f"{action.task} {path_str or ''}")

def __init__(self):
self._rag: RAGProvider = RAGProvider("task-splitter.csv")
Expand Down
16 changes: 8 additions & 8 deletions src/main/askai/core/processors/splitter/splitter_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ def run(self):
with self._console.status(msg.wait(), spinner="dots") as spinner:
while not self.pipeline.state == States.COMPLETE:
self.pipeline.track_previous()
spinner.update(f"{shared.nickname_spinner}[green]{self.pipeline.state.value}…[/green]")
if 0 < configs.max_router_retries < self.pipeline.failures[self.pipeline.state.value]:
if 1 < configs.max_router_retries < 1 + self.pipeline.failures[self.pipeline.state.value]:
self.display(f"\n[red] Max retries exceeded: {configs.max_router_retries}[/red]\n")
break
if 0 < configs.max_iteractions < self.pipeline.iteractions:
if 1 < configs.max_iteractions < 1 + self.pipeline.iteractions:
self.display(f"\n[red] Max iteractions exceeded: {configs.max_iteractions}[/red]\n")
break
match self.pipeline.state:
Expand All @@ -64,18 +63,18 @@ def run(self):
case States.TASK_SPLIT:
if self.pipeline.st_task_split():
if self.pipeline.is_direct():
self.display("[yellow] AI decided to respond directly[/yellow]")
self.display("[yellow]Direct answer provided[/yellow]")
self.pipeline.ev_direct_answer()
else:
spinner.update("[green] Executing action plan[/green]")
self.display(f"[green]Action plan created[/green]")
self.pipeline.ev_plan_created()
case States.EXECUTE_TASK:
if self.pipeline.st_execute_next():
if self.pipeline.st_execute_task():
self.pipeline.ev_task_executed()
case States.ACCURACY_CHECK:
case States.ACC_CHECK:
acc_color: AccColor = self.pipeline.st_accuracy_check()
c_name: str = acc_color.color.casefold()
self.display(f"[green]Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
self.display(f"[green]Accuracy check: [{c_name}]{c_name.upper()}[/{c_name}][/green]")
if acc_color.passed(AccColor.GOOD):
self.pipeline.ev_accuracy_passed()
elif acc_color.passed(AccColor.MODERATE):
Expand All @@ -98,6 +97,7 @@ def run(self):
)
self.pipeline.failures[self.pipeline.state.value] += 1 if not execution_status else 0
self.display(f"[green]{execution_status_str}[/green]")
spinner.update(f"{shared.nickname_spinner}[green]{self.pipeline.state.value}…[/green]")
self.pipeline.iteractions += 1

if configs.is_debug:
Expand Down
191 changes: 96 additions & 95 deletions src/main/askai/core/processors/splitter/splitter_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
Copyright (c) 2024, HomeSetup
"""
import logging as log
import os
from collections import defaultdict
from typing import AnyStr, Optional
from typing import AnyStr

from hspylib.core.preconditions import check_state
from langchain_core.prompts import PromptTemplate
from transitions import Machine

Expand All @@ -27,9 +27,10 @@
from askai.core.model.action_plan import ActionPlan
from askai.core.model.model_result import ModelResult
from askai.core.processors.splitter.splitter_actions import actions
from askai.core.processors.splitter.splitter_result import SplitterResult, PipelineResponse
from askai.core.processors.splitter.splitter_states import States
from askai.core.processors.splitter.splitter_transitions import Transition, TRANSITIONS
from askai.core.router.evaluation import assert_accuracy, EVALUATION_GUIDE
from askai.core.router.evaluation import eval_response, EVALUATION_GUIDE
from askai.core.support.shared_instances import shared


Expand All @@ -40,35 +41,21 @@ class SplitterPipeline:

FAKE_SLEEP: float = 0.3

def __init__(self, query: AnyStr):
def __init__(self, question: AnyStr):
self._transitions: list[Transition] = [t for t in TRANSITIONS]
self._machine: Machine = Machine(
name="Taius-Coder",
model=self,
initial=States.STARTUP,
states=States,
transitions=self._transitions,
name="Taius-Coder", model=self,
initial=States.STARTUP, states=States, transitions=self._transitions,
auto_transitions=False
)
self._previous: States | None = None
self._failures: dict[str, int] = defaultdict(int)
self._previous: States = States.NOT_STARTED
self._iteractions: int = 0
self._query: str = query
self._plan: ActionPlan | None = None
self._direct_answer: Optional[str] = None
self._model: ModelResult | None = None
self._resp_history: list[str] = list()
self._last_acc_response: AccResponse | None = None
self._last_task: str | None = None

def _invalidate(self) -> None:
"""TODO"""
self._plan = None
self._direct_answer = None
self._model = None
self._resp_history = list()
self._last_acc_response = None
self._last_task = None
self._failures: dict[str, int] = defaultdict(int)
self._result: SplitterResult = SplitterResult(question)

@property
def previous(self) -> States:
return self._previous

@property
def iteractions(self) -> int:
Expand All @@ -79,56 +66,64 @@ def iteractions(self, value: int):
self._iteractions = value

@property
def last_acc_response(self) -> AccResponse:
return self._last_acc_response

@last_acc_response.setter
def last_acc_response(self, value: AccResponse) -> None:
self._last_acc_response = value
def failures(self) -> dict[str, int]:
return self._failures

@property
def last_task(self) -> str:
return self._last_task
def result(self) -> SplitterResult:
return self._result

@last_task.setter
def last_task(self, value: str) -> None:
self._last_task = value
@property
def responses(self) -> list[PipelineResponse]:
return self._result.responses

@property
def failures(self) -> dict[str, int]:
return self._failures
def question(self) -> str:
return self.result.question

@property
def plan(self) -> ActionPlan:
return self._plan
def last_query(self) -> str:
return self.responses[-1].query

@last_query.setter
def last_query(self, value: str) -> None:
self.responses[-1].query = value

@property
def model(self) -> ModelResult:
return self._model
def last_answer(self) -> str:
return self.responses[-1].answer

@last_answer.setter
def last_answer(self, value: str) -> None:
self.responses[-1].answer = value

@property
def previous(self) -> States:
return self._previous
def last_accuracy(self) -> AccResponse:
return self.responses[-1].accuracy

@last_accuracy.setter
def last_accuracy(self, value: AccResponse) -> None:
self.responses[-1].accuracy = value

@property
def query(self) -> str:
if self.last_task is not None:
question: str = self.last_task
else:
question: str = self._query
return question
def plan(self) -> ActionPlan:
return self.result.plan

@plan.setter
def plan(self, value: ActionPlan):
self.result.plan = value

@property
def final_answer(self) -> Optional[str]:
if self.is_direct():
ai_response: str = self._direct_answer
else:
ai_response: str = os.linesep.join(self._resp_history)
return ai_response
def model(self) -> ModelResult:
return self.result.model

@model.setter
def model(self, value: ModelResult):
self.result.model = value

@property
def resp_history(self) -> list[str]:
return self._resp_history
def final_answer(self) -> str:
return self.result.final_response()

def track_previous(self) -> None:
"""TODO"""
Expand All @@ -143,67 +138,73 @@ def is_direct(self) -> bool:
return self.plan.is_direct if self.plan is not None else True

def st_startup(self) -> bool:
"""TODO"""
log.info("Task Splitter pipeline has started!")
self._invalidate()
return True

def st_model_select(self) -> bool:
"""TODO"""
log.info("Selecting response model...")
self._model = ModelResult.default()
# FIXME: Model select is default for now
self.model = ModelResult.default()
return True

def st_task_split(self) -> bool:
"""TODO"""
log.info("Splitting tasks...")
self._plan = actions.split(self.query, self.model)
if self._plan.is_direct:
self._direct_answer = self._plan.speak or msg.no_output("TaskSplitter")
return True
if (plan := actions.split(self.question, self.model)) is not None:
if plan.is_direct:
self.responses.append(PipelineResponse(self.question, plan.speak or msg.no_output("TaskSplitter")))
self.plan = plan
return True
return False

def st_execute_next(self) -> bool:
def st_execute_task(self) -> bool:
"""TODO"""
check_state(self.plan.tasks is not None and len(self.plan.tasks) > 0)
_iter_ = self.plan.tasks.copy().__iter__()
if action := next(_iter_, None):
log.info(f"Executing task '{action}'...")
if agent_output := actions.process_action(action):
self.last_task = self.plan.tasks.pop(0).task if len(self.plan.tasks) > 0 else None
return self.last_task is not None
self.responses.append(PipelineResponse(action.task, agent_output))
return True
return False

def st_accuracy_check(self) -> AccColor:
"""TODO"""

if self.last_query is None or self.last_answer is None:
return AccColor.BAD

# FIXME Hardcoded for now
pass_threshold: AccColor = AccColor.GOOD

acc: AccResponse = assert_accuracy(self.query, self.final_answer, pass_threshold)

if acc.is_interrupt:
# AI flags that it can't continue interacting.
log.warning(msg.interruption_requested(self.final_answer))
elif acc.is_terminate:
# AI flags that the user wants to end the session.
log.warning(msg.terminate_requested(self.final_answer))
elif acc.is_pass(pass_threshold):
# AI provided a good answer.
log.warning(f"AI provided a final answer: {self.final_answer}")
self.resp_history.append(self.final_answer)
shared.memory.save_context({"input": self.query}, {"output": self.final_answer})
pass_threshold: AccColor = AccColor.MODERATE
acc: AccResponse = eval_response(self.last_query, self.last_answer)

if acc.is_interrupt: # AI flags that it can't continue interacting.
log.warning(msg.interruption_requested(self.last_answer))
self.plan.tasks.clear()
elif acc.is_terminate: # AI flags that the user wants to end the session.
log.warning(msg.terminate_requested(self.last_answer))
self.plan.tasks.clear()
elif acc.is_pass(pass_threshold): # AI provided a good answer.
log.info(f"AI provided a good answer: {self.last_answer}")
if len(self.plan.tasks) > 0:
self.plan.tasks.pop(0)
shared.memory.save_context({"input": self.last_query}, {"output": self.last_answer})
else:
if len(self.responses) > 0:
self.responses.pop(0)
acc_template = PromptTemplate(input_variables=["problems"], template=prompt.read_prompt("acc-report"))
# Include the guidelines for the first mistake.
if not shared.context.get("EVALUATION"):
if not shared.context.get("EVALUATION"): # Include the guidelines for the first mistake.
shared.context.push("EVALUATION", EVALUATION_GUIDE)
shared.context.push("EVALUATION", acc_template.format(problems=acc.details))

self.last_acc_response = acc
self.last_accuracy = acc

return acc.acc_color

def st_refine_answer(self) -> bool:
if self.is_direct:
ai_response: str = self.final_answer
else:
ai_response: str = os.linesep.join(self._resp_history)

return actions.refine_answer(self.query, ai_response, self.last_acc_response)
return actions.refine_answer(self.question, self.final_answer, self.last_accuracy)

def st_final_answer(self) -> bool:

return actions.wrap_answer(self.query, self.final_answer, self.model)
return actions.wrap_answer(self.question, self.final_answer, self.model)
Loading

0 comments on commit 64d86dc

Please sign in to comment.