Skip to content

Commit

Permalink
Improved the screen capturer and fixed the splitter RAG by removing the re…
Browse files Browse the repository at this point in the history
…asoning column, as it was causing the AI to make more mistakes
  • Loading branch information
yorevs committed Nov 22, 2024
1 parent 439205b commit 83f79b7
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 61 deletions.
4 changes: 2 additions & 2 deletions src/demo/others/screenshot_demo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from askai.core.router.tools.vision import take_screenshot
from askai.core.router.tools.vision import capture_screenshot
from hspylib.core.tools.commons import sysout
from utils import init_context

if __name__ == "__main__":
init_context("camera-demo")
sysout(take_screenshot("gabiroba.jpeg"))
sysout(capture_screenshot("gabiroba.jpeg"))
3 changes: 3 additions & 0 deletions src/main/askai/core/askai_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ def describe_image(self, image_path: AnyPath) -> str:
def model_select(self, model: AnyStr) -> str:
return f"~~[DEBUG]~~ Using routing model: `{model}`"

def parsing_caption(self) -> str:
    """Return the debug-level message announcing that a caption is being parsed.
    :return: The fixed debug message text.
    """
    # Plain literal: the text has no placeholders, so an f-string prefix is unnecessary.
    return "~~[DEBUG]~~ Parsing caption…"

def task(self, task: AnyStr) -> str:
return f"~~[DEBUG]~~ > `Task:` {task}"

Expand Down
5 changes: 5 additions & 0 deletions src/main/askai/core/component/cache_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
if not PICTURE_DIR.exists():
PICTURE_DIR.mkdir(parents=True, exist_ok=True)

# Desktop screenshots cache directory.
SCREENSHOTS_DIR: Path = Path(str(CACHE_DIR) + "/screenshots")
if not SCREENSHOTS_DIR.exists():
SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True)

# Camera photo shots cache directory.
PHOTO_DIR: Path = Path(str(PICTURE_DIR) + "/photos")
if not PHOTO_DIR.exists():
Expand Down
4 changes: 2 additions & 2 deletions src/main/askai/core/component/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Camera(metaclass=Singleton):
ALG: str = configs.face_detect_alg

@staticmethod
def _countdown(count: int) -> None:
def countdown(count: int) -> None:
"""Display a countdown before taking a photo.
:param count: The number of seconds for the countdown.
"""
Expand Down Expand Up @@ -113,7 +113,7 @@ def capture(
events.reply.emit(reply=AIReply.error(msg.camera_not_open()))
return None

self._countdown(countdown)
self.countdown(countdown)

ret, photo = self._cam.read()
if not ret:
Expand Down
21 changes: 20 additions & 1 deletion src/main/askai/core/model/image_result.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import ast
import json

from pydantic import BaseModel, Field
from typing import AnyStr

Expand All @@ -15,4 +18,20 @@ class ImageResult(BaseModel):

@staticmethod
def of(image_caption: AnyStr) -> "ImageResult":
return ImageResult.model_validate_json(str(image_caption).replace("'", '"'))
"""Parses a string into an ImageResult instance with enhanced handling for mixed quotes.
:param image_caption: The string to parse.
:return: An instance of ImageResult populated with the parsed data.
:raises ValueError: If the string cannot be parsed as a Python object or JSON.
"""

try:
parsed_data = ast.literal_eval(image_caption)
except (ValueError, SyntaxError):
try:
parsed_data = json.loads(image_caption)
except json.JSONDecodeError as e_json:
raise ValueError("String could not be parsed as Python object or JSON.") from e_json
try:
return ImageResult(**parsed_data)
except Exception as e_pydantic:
raise ValueError("Parsed data does not conform to ImageResult schema.") from e_pydantic
9 changes: 5 additions & 4 deletions src/main/askai/core/processors/splitter/splitter_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ def run(self) -> None:
except InaccurateResponse:
live.update(Spinner("dots", f"[red]AI failed to respond. Retrying…[/red]", style="green"))

final_state: States = self.pipeline.state

if configs.is_debug:
final_state: States = self.pipeline.state
final_state_str: str = (
"[green] Succeeded[/green] " if final_state == States.COMPLETE else "[red] Failed [/red]"
)
Expand All @@ -144,6 +145,6 @@ def run(self) -> None:
)
self.display(f"Failures:\n{all_failures}")

if final_state != States.COMPLETE and not self._interrupted:
retries: int = self.pipeline.failures[self.pipeline.state.value]
self.display(f" Failed to generate a response after {retries} retries", True)
if final_state != States.COMPLETE and not self._interrupted:
retries: int = self.pipeline.failures[self.pipeline.state.value]
self.display(f" Failed to generate a response after {retries} retries", True)
11 changes: 10 additions & 1 deletion src/main/askai/core/router/agent_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from askai.core.router.tools.generation import generate_content, save_content
from askai.core.router.tools.summarization import summarize
from askai.core.router.tools.terminal import execute_command, list_contents, open_command
from askai.core.router.tools.vision import image_captioner, parse_caption
from askai.core.router.tools.vision import image_captioner, parse_caption, capture_screenshot
from askai.core.router.tools.webcam import webcam_capturer, webcam_identifier, CAPTION_TEMPLATE
from askai.exception.exceptions import TerminatingQuery

Expand Down Expand Up @@ -152,6 +152,15 @@ def webcam_identifier(self) -> str:
"""
return webcam_identifier()

def screenshot(self, path_name: AnyPath | None = None, save_dir: AnyPath | None = None) -> str:
    """Capture a screenshot and save it to the specified path.
    Usage: `screenshot(path_name, save_dir)`
    :param path_name: Optional path name of the captured screenshot.
    :param save_dir: Optional directory to save the screenshot.
    :return: The text returned by `capture_screenshot`: the caption of the captured desktop, or a fallback
             message when no screenshot was captured (not the file path).
    """
    # Thin delegation: both arguments are forwarded unchanged to the vision tool.
    return capture_screenshot(path_name, save_dir)

def generate_content(self, instructions: str, mime_type: str, filepath: AnyPath) -> str:
"""Use this tool to generate various types of content, such as code, text, images, etc. This tool processes
descriptive instructions to create the specified content type and can optionally save it to a file.
Expand Down
44 changes: 30 additions & 14 deletions src/main/askai/core/router/tools/vision.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import os
from fileinput import filename
from textwrap import indent

import pause
import pyautogui
import torch
from PIL import Image
from hspylib.core.config.path_object import PathObject
from hspylib.core.enums.enumeration import Enumeration
from hspylib.core.metaclass.classpath import AnyPath
from hspylib.core.preconditions import check_argument
from hspylib.core.tools.text_tools import ensure_endswith
from hspylib.core.zoned_datetime import now
from transformers import BlipForConditionalGeneration, BlipProcessor

from askai.core.askai_events import events
from askai.core.askai_messages import msg
from askai.core.component.cache_service import PICTURE_DIR
from askai.core.component.audio_player import player
from askai.core.component.cache_service import PICTURE_DIR, SCREENSHOTS_DIR
from askai.core.engine.ai_vision import AIVision
from askai.core.model.ai_reply import AIReply
from askai.core.model.image_result import ImageResult
Expand Down Expand Up @@ -100,6 +105,7 @@ def parse_caption(image_caption: str) -> list[str]:
:return: The parsed caption as a list of strings.
"""
if image_caption:
events.reply.emit(reply=AIReply.full(msg.parsing_caption()))
result: ImageResult = ImageResult.of(image_caption)
ln: str = os.linesep
people_desc: list[str] = []
Expand All @@ -121,22 +127,32 @@ def parse_caption(image_caption: str) -> list[str]:
return [msg.no_caption()]


def take_screenshot(path_name: AnyPath, load_dir: AnyPath | None = None) -> str:
"""Takes a screenshot and saves it to the specified path.
:param path_name: The path where the screenshot will be saved.
:param load_dir: Optional directory to save the screenshot.
def capture_screenshot(path_name: AnyPath | None = None, save_dir: AnyPath | None = None) -> str:
"""Capture a screenshot and save it to the specified path.
:param path_name: Optional path name of the captured screenshot.
:param save_dir: Optional directory to save the screenshot.
:return: The caption describing the captured screenshot, or "No screenshot captured" if none was taken.
"""

posix_path: PathObject = PathObject.of(path_name)
file_path: str = ensure_endswith(path_name or f"ASKAI-SCREENSHOT-{now('%Y%m%d%H%M')}", ".jpeg")
posix_path: PathObject = PathObject.of(file_path)
check_argument(os.path.exists(posix_path.abs_dir))
screenshot = pyautogui.screenshot()
_, ext = os.path.splitext(posix_path.filename)
if ext.casefold().endswith((".jpg", ".jpeg")):
screenshot = screenshot.convert("RGB")
final_path: str = os.path.join(load_dir or posix_path.abs_dir or PICTURE_DIR, posix_path.filename)
screenshot.save(final_path)
events.reply.emit(reply=AIReply.full(msg.screenshot_saved(final_path)))
desktop_caption = image_captioner(final_path, load_dir)
desktop_caption: str = "No screenshot captured"
i = 3

while (i := (i - 1)) >= 0:
player.play_sfx("click")
pause.seconds(1)
player.play_sfx("camera-shutter")
events.reply.emit(reply=AIReply.mute(msg.click()), erase_last=True)

if screenshot := pyautogui.screenshot():
_, ext = os.path.splitext(posix_path.filename)
if ext.casefold().endswith((".jpg", ".jpeg")):
screenshot = screenshot.convert("RGB")
final_path: str = os.path.join(save_dir or posix_path.abs_dir or SCREENSHOTS_DIR, posix_path.filename)
screenshot.save(final_path)
events.reply.emit(reply=AIReply.full(msg.screenshot_saved(final_path)))
desktop_caption = image_captioner(final_path, save_dir)

return desktop_caption
Loading

0 comments on commit 83f79b7

Please sign in to comment.