diff --git a/src/main/askai/core/askai_cli.py b/src/main/askai/core/askai_cli.py
index 3c380b23..1af54c62 100644
--- a/src/main/askai/core/askai_cli.py
+++ b/src/main/askai/core/askai_cli.py
@@ -71,7 +71,9 @@ def __init__(
def run(self) -> None:
"""Run the application."""
- signal.signal(signal.SIGINT, self.abort)
+ signal.signal(signal.SIGINT, self.abort) # Handle Ctrl+C
+ signal.signal(signal.SIGTERM, self.abort) # Handle termination requests
+ signal.signal(signal.SIGHUP, self.abort) # Handle terminal hangup
while question := (self._query_string or self._input()):
status, output = self.ask_and_reply(question)
if not status:
@@ -81,9 +83,7 @@ def run(self) -> None:
cache.save_reply(question, output)
cache.save_input_history()
# FIXME This is only writing the final answer to the markdown file.
- with open(
- self.console_path, "a+", encoding=Charset.UTF_8.val
- ) as f_console:
+ with open(self.console_path, "a+", encoding=Charset.UTF_8.val) as f_console:
f_console.write(f"{shared.username_md}{question}\n\n")
f_console.write(f"{shared.nickname_md}{output}\n\n")
f_console.flush()
@@ -119,9 +119,7 @@ def _input(self) -> Optional[str]:
"""Read the user input from stdin.
:return: The user's input as a string, or None if no input is provided.
"""
- return shared.input_text(
- f"{shared.username}", f"{msg.t('Message')} {self.engine.nickname()}"
- )
+ return shared.input_text(f"{shared.username}", f"{msg.t('Message')} {self.engine.nickname()}")
def _cb_reply_event(self, ev: Event) -> None:
"""Callback to handle reply events.
@@ -143,15 +141,11 @@ def _cb_mode_changed_event(self, ev: Event) -> None:
"""
self.mode: RouterMode = RouterMode.of_name(ev.args.mode)
if self.mode == RouterMode.QNA:
- welcome_msg = self.mode.welcome(
- sum_path=ev.args.sum_path, sum_glob=ev.args.glob
- )
+ welcome_msg = self.mode.welcome(sum_path=ev.args.sum_path, sum_glob=ev.args.glob)
else:
welcome_msg = self.mode.welcome()
- events.reply.emit(
- reply=AIReply.info(welcome_msg or msg.welcome(prompt.user.title()))
- )
+ events.reply.emit(reply=AIReply.info(welcome_msg or msg.welcome(prompt.user.title())))
def _cb_mic_listening_event(self, ev: Event) -> None:
"""Callback to handle microphone listening events.
@@ -196,9 +190,7 @@ def _startup(self) -> None:
if configs.is_interactive:
splash_thread: Thread = Thread(daemon=True, target=self._splash)
splash_thread.start()
- task = progress.add_task(
- f'[green] {msg.t("Starting up...")}', total=len(tasks)
- )
+ task = progress.add_task(f'[green] {msg.t("Starting up...")}', total=len(tasks))
with progress:
os.chdir(Path.home())
progress.update(
@@ -206,9 +198,7 @@ def _startup(self) -> None:
advance=1,
description=f'[green] {msg.t("Downloading nltk data")}',
)
- nltk.download(
- "averaged_perceptron_tagger", quiet=True, download_dir=CACHE_DIR
- )
+ nltk.download("averaged_perceptron_tagger", quiet=True, download_dir=CACHE_DIR)
cache.cache_enable = configs.is_cache
progress.update(
task,
@@ -248,9 +238,7 @@ def _startup(self) -> None:
display_text(str(self), markdown=False)
self._reply(AIReply.info(self.mode.welcome()))
elif configs.is_speak:
- nltk.download(
- "averaged_perceptron_tagger", quiet=True, download_dir=CACHE_DIR
- )
+ nltk.download("averaged_perceptron_tagger", quiet=True, download_dir=CACHE_DIR)
recorder.setup()
player.start_delay()
# Register the startup
diff --git a/src/main/askai/core/commander/commander.py b/src/main/askai/core/commander/commander.py
index 8b238f7c..789403d3 100644
--- a/src/main/askai/core/commander/commander.py
+++ b/src/main/askai/core/commander/commander.py
@@ -2,15 +2,15 @@
# -*- coding: utf-8 -*-
"""
- @project: HsPyLib-AskAI
- @package: askai.core.commander.commander
- @file: commander.py
- @created: Thu, 25 Apr 2024
- @author: Hugo Saporetti Junior
- @site: https://github.com/yorevs/askai
- @license: MIT - Please refer to
-
- Copyright (c) 2024, AskAI
+@project: HsPyLib-AskAI
+@package: askai.core.commander.commander
+ @file: commander.py
+@created: Thu, 25 Apr 2024
+ @author: Hugo Saporetti Junior
+ @site: https://github.com/yorevs/askai
+@license: MIT - Please refer to
+
+Copyright (c) 2024, AskAI
"""
from askai.core.askai_configs import configs
from askai.core.askai_events import ASKAI_BUS_NAME, AskAiEvents, events, REPLY_EVENT
@@ -20,7 +20,7 @@
from askai.core.commander.commands.history_cmd import HistoryCmd
from askai.core.commander.commands.settings_cmd import SettingsCmd
from askai.core.commander.commands.tts_stt_cmd import TtsSttCmd
-from askai.core.component.rag_provider import RAGProvider
+from askai.core.component.rag_provider import RAGProvider, RAG_EXT_DIR
from askai.core.enums.router_mode import RouterMode
from askai.core.support.shared_instances import shared
from askai.core.support.text_formatter import text_formatter
@@ -32,7 +32,7 @@
from hspylib.core.enums.charset import Charset
from hspylib.core.tools.commons import sysout, to_bool
from hspylib.modules.eventbus.event import Event
-from os.path import dirname
+from os.path import dirname, basename
from pathlib import Path
from string import Template
from textwrap import dedent
@@ -41,9 +41,9 @@
import os
import re
+# fmt: off
COMMANDER_HELP_TPL = Template(
- dedent(
- """\
+ dedent("""\
# AskAI Commander - HELP
> Commands:
@@ -55,18 +55,16 @@
| **Key** | **Action** |
| -------- | ----------------------------- |
- | *Ctrl+L* | **Push-To-Talk.** |
+ | *Ctrl+L* | **Activate Push-To-Talk.** |
| *Ctrl+R* | **Reset the input field.** |
| *Ctrl+F* | **Forget the input history.** |
+ | *Ctrl+D* | **Exit the application.** |
> To get help about a specific command type: '/help \\'
- """
- )
-)
+ """))
COMMANDER_HELP_CMD_TPL = Template(
- dedent(
- """\
+ dedent("""\
# AskAI Commander - HELP
```
%CYAN%Command: %ORANGE%${command}%NC%
@@ -75,9 +73,8 @@
%CYAN%Usage:\t%WHITE%/${usage}
```
- """
- )
-)
+ """))
+# fmt: on
RE_ASKAI_CMD: str = r"^(? str:
return COMMANDER_HELP_CMD_TPL.substitute(command=command.name.title(), docstr=docstr, usage=usage_str)
+def color_bool(condition: bool, true_text: str = "ON", false_text: str = "OFF") -> str:
+ """TODO"""
+ return ("%GREEN% " + true_text if condition else "%RED% " + false_text) + "%NC%"
+
+
def _init_context(context_size: int = 1000, engine_name: str = "openai", model_name: str = "gpt-4o-mini") -> None:
"""Initialize the AskAI context and startup components.
:param context_size: The maximum size of the context window (default is 1000).
@@ -191,23 +193,21 @@ def help(command: str | None) -> None:
def assistive() -> None:
"""Toggle assistive mode ON/OFF."""
configs.is_assistive = not configs.is_assistive
- text_formatter.commander_print(
- f"`Assistive responses` is {'%GREEN%ON' if configs.is_assistive else '%RED%OFF'}%NC%"
- )
+ text_formatter.commander_print(f"`Assistive responses` is {color_bool(configs.is_assistive)}")
@ask_commander.command()
def debug() -> None:
"""Toggle debug mode ON/OFF."""
configs.is_debug = not configs.is_debug
- text_formatter.commander_print(f"`Debugging` is {'%GREEN%ON' if configs.is_debug else '%RED%OFF'}%NC%")
+ text_formatter.commander_print(f"`Debugging` is {color_bool(configs.is_debug)}")
@ask_commander.command()
def speak() -> None:
"""Toggle speak mode ON/OFF."""
configs.is_speak = not configs.is_speak
- text_formatter.commander_print(f"`Speech-To-Text` is {'%GREEN%ON' if configs.is_speak else '%RED%OFF'}%NC%")
+ text_formatter.commander_print(f"`Speech-To-Text` is {color_bool(configs.is_speak)}")
@ask_commander.command()
@@ -339,7 +339,7 @@ def cache(operation: str, args: tuple[str, ...]) -> None:
configs.ttl = int(args[0])
text_formatter.commander_print(f"Cache TTL was set to *{args[0]} minutes* !")
case _:
- text_formatter.commander_print(f"Cache is *{'en' if configs.is_rag else 'dis'}abled* !")
+ text_formatter.commander_print(f"`Caching` is {color_bool(configs.is_cache)}")
@ask_commander.command()
@@ -473,24 +473,48 @@ def mode(router_mode: str) -> None:
@click.argument("args", nargs=-1)
def rag(operation: str, args: tuple[str, ...]) -> None:
"""Manages AskAI RAG features.
- :param operation: Specifies the rag operation. Options: [add|enable]
+ :param operation: Specifies the rag operation. Options: [list|add|del|clear|enable]
:param args: Arguments relevant to the chosen operation.
"""
match operation.casefold():
case "add":
if not args:
- err: str = str(click.MissingParameter(f"Arguments missing. Usage /rag add \\"))
+ err: str = str(click.MissingParameter(f"Arguments missing. Usage /rag add \\"))
text_formatter.commander_print(f"Error: {err}")
else:
- folder: Path = Path(args[0])
- if not folder.exists():
- text_formatter.commander_print(f"Error: Could not find folder: '{folder}'")
+ res: Path = Path(args[0])
+ if not res.exists():
+ text_formatter.commander_print(f"Error: Could not find folder: '{res}'")
else:
- if RAGProvider.copy_rag(folder):
- text_formatter.commander_print(f"RAG folder '{folder}' has been *added* to rag directory !")
+ if RAGProvider.copy_rag(res):
+ text_formatter.commander_print(f"RAG folder '{res}' has been *added* to rag directory !")
else:
- text_formatter.commander_print(f"Error: Failed to add RAG folder: '{folder}' !")
-
+ text_formatter.commander_print(f"Error: Failed to add RAG folder: '{res}' !")
+ case "list":
+ sysout(f"> Listing ARG entries from: `{RAG_EXT_DIR}`", markdown=True)
+ results = list()
+ for entry in sorted(RAG_EXT_DIR.iterdir()):
+ pathname: str = basename(entry)
+ if pathname.startswith(".") or pathname == "rag-documents.txt":
+ continue
+ entry_str: str = " " if entry.is_dir() else " "
+ results.append(f"- **{entry_str}:** {pathname + ('/' if entry.is_dir() else '')}")
+ sysout(os.linesep.join(sorted(results)), markdown=True)
+ case "del":
+ if not args:
+ err: str = str(click.MissingParameter(f"Arguments missing. Usage /rag del \\"))
+ text_formatter.commander_print(f"Error: {err}")
+ else:
+ res: Path = Path(args[0])
+ if RAGProvider.del_rag(res):
+ text_formatter.commander_print(f"RAG resource '{res}' has been *deleted* from rag directory !")
+ else:
+ text_formatter.commander_print(f"Error: Failed to delete RAG folder: '{res}' !")
+ case "clear":
+ if RAGProvider.clear():
+ text_formatter.commander_print(f"ALL RAG resources been *deleted* from rag directory !")
+ else:
+ text_formatter.commander_print(f"Error: Failed to wipe the RAG folder!")
case "enable":
if not args:
err: str = str(click.MissingParameter(f"Arguments missing. Usage /rag enable \\<0|1\\>"))
@@ -499,4 +523,4 @@ def rag(operation: str, args: tuple[str, ...]) -> None:
configs.is_rag = to_bool(args[0])
text_formatter.commander_print(f"RAG has been *{'en' if configs.is_rag else 'dis'}abled* !")
case _:
- text_formatter.commander_print(f"RAG is *{'en' if configs.is_rag else 'dis'}abled* !")
+ text_formatter.commander_print(f"`RAG` is {color_bool(configs.is_rag)}")
diff --git a/src/main/askai/core/component/rag_provider.py b/src/main/askai/core/component/rag_provider.py
index 965c2dc3..24633b39 100644
--- a/src/main/askai/core/component/rag_provider.py
+++ b/src/main/askai/core/component/rag_provider.py
@@ -2,35 +2,36 @@
# -*- coding: utf-8 -*-
"""
- @project: HsPyLib-AskAI
- @package: askai.core.support
- @file: rag_provider.py
- @created: Wed, 28 Aug 2024
- @author: Hugo Saporetti Junior
- @site: https://github.com/yorevs/askai
- @license: MIT - Please refer to
-
- Copyright (c) 2024, AskAI
+@project: HsPyLib-AskAI
+@package: askai.core.support
+ @file: rag_provider.py
+@created: Wed, 28 Aug 2024
+ @author: Hugo Saporetti Junior
+ @site: https://github.com/yorevs/askai
+@license: MIT - Please refer to
+
+Copyright (c) 2024, AskAI
"""
-from askai.__classpath__ import classpath
-from askai.core.askai_configs import configs
-from askai.core.askai_settings import ASKAI_DIR
-from askai.core.support.langchain_support import lc_llm
+import shutil
+from pathlib import Path
+import glob
+import os
+
+
from hspylib.core.config.path_object import PathObject
from hspylib.core.metaclass.classpath import AnyPath
from hspylib.core.preconditions import check_state
-from hspylib.core.tools.commons import dirname, file_is_not_empty
+from hspylib.core.tools.commons import dirname, file_is_not_empty, safe_delete_file
from hspylib.core.tools.text_tools import ensure_endswith, hash_text
from langchain_community.document_loaders import CSVLoader
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore
-from pathlib import Path
-from shutil import copyfile
-import glob
-import os
-import shutil
+from askai.__classpath__ import classpath
+from askai.core.askai_configs import configs
+from askai.core.askai_settings import ASKAI_DIR
+from askai.core.support.langchain_support import lc_llm
# External RAG Directory
RAG_EXT_DIR: Path = Path(f"{ASKAI_DIR}/rag")
@@ -50,17 +51,17 @@ def copy_rag(
dest_name: AnyPath | None = None,
rag_dir: AnyPath = RAG_EXT_DIR,
) -> bool:
- """Copy the RAG documents into the specified RAG directory.
+ """Copy RAG documents into the specified RAG directory.
:param path_name: The path of the RAG documents to copy.
:param dest_name: The destination, within the RAG directory, where the documents will be copied to. If None,
defaults to a hashed directory based on the source path.
- :param rag_dir: The directory where RAG documents will be copied.
+ :param rag_dir: The directory where RAG documents will be copied to.
:return: True if the copy operation was successful, False otherwise.
"""
src_path: PathObject = PathObject.of(path_name)
if src_path.exists and src_path.is_file:
file: str = f"{rag_dir}/{dest_name or src_path.filename}"
- copyfile(str(src_path), file)
+ shutil.copyfile(str(src_path), file)
elif src_path.exists and src_path.is_dir:
shutil.copytree(
str(src_path),
@@ -70,6 +71,7 @@ def copy_rag(
)
else:
return False
+
files: list[str] = sorted(glob.glob(f"{str(rag_dir)}/**/*.*", recursive=True))
rag_files: str = "".join(list(ensure_endswith(d, os.linesep) for d in files))
rag_docs_file: Path = Path(os.path.join(rag_dir), "rag-documents.txt")
@@ -77,19 +79,61 @@ def copy_rag(
return True
+ @classmethod
+ def del_rag(
+ cls,
+ path_name: AnyPath,
+ rag_dir: AnyPath = RAG_EXT_DIR,
+ ) -> bool:
+ """Delete RAG documents from the specified RAG directory.
+ :param path_name: The path of the RAG documents to delete.
+ :param rag_dir: The directory where RAG documents will be deleted from.
+ :return: True if the deletion operation was successful, False otherwise.
+ """
+ src_path: PathObject = PathObject.of(f"{rag_dir}/{path_name}")
+ if src_path.exists:
+ if src_path.is_file:
+ ret_val = safe_delete_file(src_path.abs_dir)
+ else:
+ shutil.rmtree(src_path.abs_dir)
+ ret_val = True
+ else:
+ return False
+
+ if ret_val:
+ files: list[str] = sorted(glob.glob(f"{str(rag_dir)}/**/*.*", recursive=True))
+ rag_files: str = "".join(list(ensure_endswith(d, os.linesep) for d in files))
+ rag_docs_file: Path = Path(os.path.join(rag_dir), "rag-documents.txt")
+ rag_docs_file.write_text(rag_files)
+
+ return ret_val
+
+ @classmethod
+ def clear(
+ cls,
+ rag_dir: AnyPath = RAG_EXT_DIR,
+ ) -> bool:
+ """Wipe RAG documents from the specified RAG directory.
+ :param rag_dir: The directory where RAG documents will be deleted from.
+ :return: True if the deletion operation was successful, False otherwise.
+ """
+ try:
+ src_path: PathObject = PathObject.of(rag_dir)
+ shutil.rmtree(src_path.abs_dir)
+ os.makedirs(src_path.abs_dir)
+ return True
+ except OSError:
+ return False
+
@staticmethod
def requires_update(rag_dir: AnyPath = RAG_EXT_DIR) -> bool:
"""Check whether the RAG directory has changed and therefore, requires an update from the Chroma DB.
:return: True if an update is required, False otherwise
"""
rag_docs_file: Path = Path(os.path.join(rag_dir), "rag-documents.txt")
- rag_hash_file: Path = Path(
- os.path.join(dirname(str(rag_docs_file)), ".rag-hash")
- )
+ rag_hash_file: Path = Path(os.path.join(dirname(str(rag_docs_file)), ".rag-hash"))
files_hash: str = hash_text(Path(rag_docs_file).read_text())
- if not os.path.exists(str(rag_docs_file)) or not os.path.exists(
- str(rag_hash_file)
- ):
+ if not os.path.exists(str(rag_docs_file)) or not os.path.exists(str(rag_hash_file)):
rag_hash_file.write_text(files_hash)
return True
rag_hash: str = rag_hash_file.read_text()
@@ -111,9 +155,7 @@ def get_rag_examples(self, query: str, k: int = configs.rag_retrival_amount) ->
"""
if configs.is_rag:
if self._rag_db is None:
- self._rag_db = FAISS.from_documents(
- self._rag_docs, lc_llm.create_embeddings()
- )
+ self._rag_db = FAISS.from_documents(self._rag_docs, lc_llm.create_embeddings())
example_docs: list[Document] = self._rag_db.similarity_search(query, k=k)
rag_examples: list[str] = [doc.page_content for doc in example_docs]
return f'**Examples:**\n"""{(2 * os.linesep).join(rag_examples)}"""'
diff --git a/src/main/askai/core/support/shared_instances.py b/src/main/askai/core/support/shared_instances.py
index a3825484..81727199 100644
--- a/src/main/askai/core/support/shared_instances.py
+++ b/src/main/askai/core/support/shared_instances.py
@@ -2,15 +2,15 @@
# -*- coding: utf-8 -*-
"""
- @project: "askai"
- @package: "askai".main.askai.core.support
- @file: shared_instances.py
- @created: Tue, 23 Apr 2024
- @author: "Hugo Saporetti Junior
- @site: "https://github.com/yorevs/askai")
- @license: MIT - Please refer to
-
- Copyright (c) 2024, AskAI
+@project: "askai"
+@package: "askai".main.askai.core.support
+ @file: shared_instances.py
+@created: Tue, 23 Apr 2024
+ @author: "Hugo Saporetti Junior
+ @site: "https://github.com/yorevs/askai")
+@license: MIT - Please refer to
+
+Copyright (c) 2024, AskAI
"""
from askai.__classpath__ import classpath
from askai.core.askai_configs import configs
@@ -114,21 +114,13 @@ def max_iteractions(self) -> int:
@property
def app_info(self) -> str:
device_info = f"{recorder.input_device[1]}" if recorder.input_device else ""
- device_info += (
- f", %YELLOW%AUTO-SWAP {'%GREEN%' if recorder.is_auto_swap else '%RED%'}"
- )
+ device_info += f", %YELLOW%AUTO-SWAP {'%GREEN%' if recorder.is_auto_swap else '%RED%'}"
dtm = f" {geo_location.datetime} "
speak_info = str(configs.tempo) + " @" + self.engine.configs().tts_voice
cur_dir = elide_text(str(Path(os.getcwd()).absolute()), 67, "…")
- translator = (
- f"translated by '{msg.translator.name()}'"
- if configs.language.name.title() != "English"
- else ""
- )
+ translator = f"translated by '{msg.translator.name()}'" if configs.language.name.title() != "English" else ""
eng: AIEngine = shared.engine
- model_info: str = (
- f"'{eng.ai_model_name()}'%YELLOW% {eng.ai_token_limit()}%GREEN% tokens"
- )
+ model_info: str = f"'{eng.ai_model_name()}'%YELLOW% {eng.ai_token_limit()}%GREEN% tokens"
engine_info: str = f"{eng.ai_name()} - %CYAN%{eng.nickname()} / {model_info}"
rag_info: str = "%GREEN% " if configs.is_rag else "%RED% "
assist_info: str = "%GREEN% " if configs.is_assistive else "%RED% "
@@ -181,9 +173,7 @@ def create_context(self, token_limit: int) -> ChatContext:
ctx.append(ContextEntry(role, content))
return self._context
- def create_memory(
- self, memory_key: str = "chat_history"
- ) -> ConversationBufferWindowMemory:
+ def create_memory(self, memory_key: str = "chat_history") -> ConversationBufferWindowMemory:
"""Create or retrieve the conversation window memory.
:param memory_key: The key used to identify the memory (default is "chat_history").
:return: An instance of BaseChatMemory associated with the specified memory key.
@@ -197,14 +187,10 @@ def create_memory(
if configs.is_keep_context:
entries: list[str] = cache.read_memory()
for role, content in zip(entries[::2], entries[1::2]):
- self._memory.chat_memory.add_message(
- self.context.LANGCHAIN_ROLE_MAP[role](content)
- )
+ self._memory.chat_memory.add_message(self.context.LANGCHAIN_ROLE_MAP[role](content))
return self._memory
- def input_text(
- self, input_prompt: str, placeholder: str | None = None
- ) -> Optional[str]:
+ def input_text(self, input_prompt: str, placeholder: str | None = None) -> Optional[str]:
"""Prompt the user for input.
:param input_prompt: The text prompt to display to the user.
:param placeholder: The placeholder text to display in the input field (optional).
@@ -212,13 +198,13 @@ def input_text(
"""
ret = None
while ret is None:
- if (
- ret := line_input(input_prompt, placeholder)
- ) == Keyboard.VK_CTRL_L: # Use voice input.
+ if (ret := line_input(input_prompt, placeholder)) == Keyboard.VK_CTRL_L: # Use voice input.
terminal.cursor.erase_line()
if spoken_text := self.engine.speech_to_text():
display_text(f"{self.username}: {spoken_text}")
ret = spoken_text
+ elif ret == Keyboard.VK_CTRL_D:
+ return ""
return ret if not ret or isinstance(ret, str) else ret.val
diff --git a/src/main/askai/core/support/text_formatter.py b/src/main/askai/core/support/text_formatter.py
index c8ef8054..e1733036 100644
--- a/src/main/askai/core/support/text_formatter.py
+++ b/src/main/askai/core/support/text_formatter.py
@@ -1,13 +1,13 @@
"""
- @project: HsPyLib-AskAI
- @package: askai.core.support.text_formatter
- @file: text_formatter.py
- @created: Fri, 28 Feb 2024
- @author: Hugo Saporetti Junior
- @site: https://github.com/yorevs/askai
- @license: MIT - Please refer to
-
- Copyright (c) 2024, AskAI
+@project: HsPyLib-AskAI
+@package: askai.core.support.text_formatter
+ @file: text_formatter.py
+@created: Fri, 28 Feb 2024
+ @author: Hugo Saporetti Junior
+ @site: https://github.com/yorevs/askai
+@license: MIT - Please refer to
+
+Copyright (c) 2024, AskAI
"""
from textwrap import dedent
@@ -161,8 +161,6 @@ def commander_print(self, text: AnyStr) -> None:
"""
cmd_message: str = f"%ORANGE% Commander%NC%: {str(text)}"
self.display_markdown(cmd_message)
- if os.environ.get("ASKAI_APP") is not None:
- events.reply.emit(reply=AIReply.info(VtColor.strip_colors(cmd_message)))
assert (text_formatter := TextFormatter().INSTANCE) is not None