diff --git a/README.md b/README.md
index 3a5c1eb7..6bded26c 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,8 @@ Input example:
         "provider": "openai",
         "name": "text-embedding-3-small",
         "dimensions": 384
-    }
+    },
+    "interpreter_mode": false, # Set to true if you wish to run computational Q&A with a code interpreter
     "session_id": "my_session_id" # keeps micro-vm sessions and enables caching
 }
diff --git a/models/query.py b/models/query.py
index a9e120db..3aef2f50 100644
--- a/models/query.py
+++ b/models/query.py
@@ -13,6 +13,7 @@ class RequestPayload(BaseModel):
     index_name: str
     encoder: EncoderConfig = EncoderConfig()
     session_id: Optional[str] = None
+    interpreter_mode: Optional[bool] = False
 
 
 class ResponsePayload(BaseModel):
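The new `interpreter_mode` field defaults to `False`, so existing clients are unaffected and callers opt in per request. A minimal client sketch follows; the `/query` path, host, and all example values are assumptions, only the field names come from `RequestPayload`:

```python
# Hypothetical client call: the endpoint URL and values are invented;
# the field names mirror RequestPayload above.
import httpx

payload = {
    "input": "What is the average fare by passenger class?",
    "index_name": "my_index",
    "session_id": "my_session_id",  # keeps the micro-vm session alive between calls
    "interpreter_mode": True,  # route structured-data questions through the code interpreter
}

response = httpx.post("http://localhost:8000/query", json=payload)
print(response.json())
```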
diff --git a/poetry.lock b/poetry.lock
index 631b1a20..6889e530 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -885,13 +885,13 @@ files = [
 
 [[package]]
 name = "e2b"
-version = "0.14.4"
+version = "0.14.7"
 description = "E2B SDK that give agents cloud environments"
 optional = false
 python-versions = ">=3.8,<4.0"
 files = [
-    {file = "e2b-0.14.4-py3-none-any.whl", hash = "sha256:70ca7f4a4696b2ad5f1785ffecd57906ce37506e834c29078f61563a7392962a"},
-    {file = "e2b-0.14.4.tar.gz", hash = "sha256:e838f478f5cfc82333459996a508f885551ceedfbaa0fad871a40062abc8296d"},
+    {file = "e2b-0.14.7-py3-none-any.whl", hash = "sha256:1a37d0f2c706fe5109e8895362d8850b9aac8bf321dbb8e2197fb07b8c74287e"},
+    {file = "e2b-0.14.7.tar.gz", hash = "sha256:ac74d66fa9d51a6fb10af8f080aa590aa5c7245f2ae70652f76f380f4dbaf2c6"},
 ]
 
 [package.dependencies]
@@ -2217,13 +2217,13 @@ files = [
 
 [[package]]
 name = "openai"
-version = "1.12.0"
+version = "1.13.3"
 description = "The official Python library for the openai API"
 optional = false
 python-versions = ">=3.7.1"
 files = [
-    {file = "openai-1.12.0-py3-none-any.whl", hash = "sha256:a54002c814e05222e413664f651b5916714e4700d041d5cf5724d3ae1a3e3481"},
-    {file = "openai-1.12.0.tar.gz", hash = "sha256:99c5d257d09ea6533d689d1cc77caa0ac679fa21efef8893d8b0832a86877f1b"},
+    {file = "openai-1.13.3-py3-none-any.whl", hash = "sha256:5769b62abd02f350a8dd1a3a242d8972c947860654466171d60fb0972ae0a41c"},
+    {file = "openai-1.13.3.tar.gz", hash = "sha256:ff6c6b3bc7327e715e4b3592a923a5a1c7519ff5dd764a83d69f633d49e77a7b"},
 ]
 
 [package.dependencies]
@@ -3744,13 +3744,13 @@ files = [
 
 [[package]]
 name = "unstructured"
-version = "0.12.4"
+version = "0.12.5"
 description = "A library that prepares raw documents for downstream ML tasks."
 optional = false
 python-versions = ">=3.9.0,<3.12"
 files = [
-    {file = "unstructured-0.12.4-py3-none-any.whl", hash = "sha256:f1aa046297a3afba3aa16895e513aca6a93802ef73b7a18080656435c4deb217"},
-    {file = "unstructured-0.12.4.tar.gz", hash = "sha256:019cf52e9e2bfa286e61ffa0d7d336e1645280f9a0f165e697583143fcfe708a"},
+    {file = "unstructured-0.12.5-py3-none-any.whl", hash = "sha256:cce7de36964f556810adb8728d0639d8e9b3ef4567443877609f3c66a54e24d1"},
+    {file = "unstructured-0.12.5.tar.gz", hash = "sha256:5ea6c881049e7d98a88c07192bcb6ab750de41b4e3b594972ed1034bda99dbae"},
 ]
 
 [package.dependencies]
@@ -3777,6 +3777,7 @@ wrapt = "*"
 [package.extras]
 airtable = ["pyairtable"]
 all-docs = ["markdown", "msg-parser", "networkx", "onnx", "openpyxl", "pandas", "pdf2image", "pdfminer.six", "pikepdf", "pillow-heif", "pypandoc", "pypdf", "python-docx", "python-pptx (<=0.6.23)", "unstructured-inference (==0.7.23)", "unstructured.pytesseract (>=0.3.12)", "xlrd"]
+astra = ["astrapy"]
 azure = ["adlfs", "fsspec"]
 azure-cognitive-search = ["azure-search-documents"]
 bedrock = ["boto3", "langchain-community"]
@@ -4257,4 +4258,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.9,<3.12"
-content-hash = "ca9bfb9fb29878b192128f32b196abecb6b56f0a97fa56cbee4e1ac31791bfe2"
+content-hash = "b9774decb9338d39235bb4599347540fa5a684cecd89d194a95b26257be43364"
diff --git a/pyproject.toml b/pyproject.toml
index 8f7ae264..7263d176 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,7 +28,7 @@ cmake = "^3.28.1"
 pypdf = "^4.0.1"
 docx2txt = "^0.8"
 python-dotenv = "^1.0.1"
-e2b = "^0.14.4"
+e2b = "^0.14.7"
 gunicorn = "^21.2.0"
 unstructured-client = "^0.18.0"
 unstructured = {extras = ["google-drive"], version = "^0.12.4"}
diff --git a/service/code_interpreter.py b/service/code_interpreter.py
index 7c456fa5..56b2abc2 100644
--- a/service/code_interpreter.py
+++ b/service/code_interpreter.py
@@ -1,12 +1,24 @@
 import asyncio
+import io
 import logging
+import re
+import textwrap
 import time
 from typing import List
 
+import pandas as pd
+from decouple import config
 from e2b import Sandbox
+from openai import AsyncOpenAI
 
 logging.getLogger("e2b").setLevel(logging.INFO)
 
+client = AsyncOpenAI(
+    api_key=config("OPENAI_API_KEY"),
+)
+
+SYSTEM_PROMPT = "You are a world-class python programmer that can complete any data analysis task by coding."
+
 
 class CodeInterpreterService:
     timeout = 3 * 60  # 3 minutes
@@ -61,48 +73,89 @@ def __init__(
         self.sandbox = self._ensure_sandbox(session_id)
 
     async def __aenter__(self):
-        if not self._is_initialized:
-            self._is_initialized = True
-            for file_url in self.file_urls:
-                await self._upload_file(file_url)
+        try:
+            if not self._is_initialized:
+                self._is_initialized = True
+                for file_url in self.file_urls:
+                    await self._upload_file(file_url)
+        except Exception:
+            self.sandbox.close()
+            raise
         return self
 
     async def __aexit__(self, _exc_type, _exc_value, _traceback):
-        if self.session_id:
-            self.sandbox.keep_alive(self.timeout)
-        self.sandbox.close()
+        try:
+            if self.session_id:
+                self.sandbox.keep_alive(self.timeout)
+        finally:
+            self.sandbox.close()
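The rewritten `__aexit__` keeps the sandbox alive only when a `session_id` was supplied and releases the local handle even if `keep_alive` raises. A minimal lifecycle sketch, with a placeholder file URL:

```python
# Minimal sketch of the context-manager lifecycle above; the file URL
# is a placeholder and session_id is only needed for session reuse.
import asyncio

async def main():
    async with CodeInterpreterService(
        session_id="my_session_id",
        file_urls=["https://example.com/data.csv"],
    ) as service:
        result = await service.run_python(code='print("hello from the sandbox")')
        print(result.stdout)
    # On exit: keep_alive(timeout) runs because session_id was given,
    # and sandbox.close() always runs via the finally block.

asyncio.run(main())
```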
""" - - # TODO: Add support for xslx, json - files_code = "\n".join( - f'df{i} = pd.read_csv("{self._get_file_path(url)}") # {url}' - for i, url in enumerate(self.file_urls) + # TODO: Add support for multiple dataframes + df = pd.read_csv(self.file_urls[0]) + return df, self.file_urls[0] + + def generate_prompt(self, query: str) -> str: + df, url = self.get_dataframe() + return textwrap.dedent( + f""" + You are provided with a following pandas dataframe (`df`): + {df.info()} + + Using the provided dataframe (`df`), update the following python code using pandas that returns the answer to question: \"{query}\" + + This is the initial python code to be updated: + + ```python + import pandas as pd + + df = pd.read_csv("{url}") + 1. Process: Manipulating data for analysis (grouping, filtering, aggregating, etc.) + 2. Analyze: Conducting the actual analysis + 3. Output: Returning the answer as a string + ``` + """ ) - return f""" -import pandas as pd + def extract_code(self, code: str) -> str: + pattern = r"```(?:python)?(.*?)```" + matches = re.findall(pattern, code, re.DOTALL) + if matches: + return matches[0].strip() + return "" -{files_code} - -""" + async def generate_code( + self, + query: str, + ) -> str: + content = self.generate_prompt(query=query) + completion = await client.chat.completions.create( + messages=[ + { + "role": "system", + "content": SYSTEM_PROMPT, + }, + { + "role": "user", + "content": content, + }, + ], + model="gpt-3.5-turbo-0125", + ) + output = completion.choices[0].message.content + return self.extract_code(code=output) async def run_python(self, code: str): - files_code = self.get_files_code() - - templated_code = f""" -{files_code} -{code} -""" - epoch_time = time.time() codefile_path = f"/tmp/main-{epoch_time}.py" - self.sandbox.filesystem.write(codefile_path, templated_code) + self.sandbox.filesystem.write(codefile_path, code) process = await asyncio.to_thread( self.sandbox.process.start_and_wait, f"python {codefile_path}", diff --git a/service/router.py b/service/router.py index b75895f1..076a6ede 100644 --- a/service/router.py +++ b/service/router.py @@ -1,3 +1,5 @@ +from uuid import uuid4 + from decouple import config from semantic_router.encoders import CohereEncoder from semantic_router.layer import RouteLayer @@ -5,12 +7,13 @@ from models.document import BaseDocumentChunk from models.query import RequestPayload - -# from service.code_interpreter import CodeInterpreterService +from service.code_interpreter import CodeInterpreterService from utils.logger import logger from utils.summarise import SUMMARY_SUFFIX from vectordbs import BaseVectorDatabase, get_vector_service +STRUTURED_DATA = [".xlsx", ".csv", ".json"] + def create_route_layer() -> RouteLayer: routes = [ @@ -34,12 +37,29 @@ async def get_documents( *, vector_service: BaseVectorDatabase, payload: RequestPayload ) -> list[BaseDocumentChunk]: chunks = await vector_service.query(input=payload.input, top_k=5) - if not len(chunks): logger.error(f"No documents found for query: {payload.input}") return [] - - reranked_chunks = await vector_service.rerank(query=payload.input, documents=chunks) + is_structured = chunks[0].metadata.get("document_type") in STRUTURED_DATA + reranked_chunks = [] + if is_structured and payload.interpreter_mode: + async with CodeInterpreterService( + session_id=payload.session_id, file_urls=[chunks[0].metadata.get("doc_url")] + ) as service: + code = await service.generate_code(query=payload.input) + response = await service.run_python(code=code) + output = response.stdout + 
diff --git a/service/router.py b/service/router.py
index b75895f1..076a6ede 100644
--- a/service/router.py
+++ b/service/router.py
@@ -1,3 +1,5 @@
+from uuid import uuid4
+
 from decouple import config
 from semantic_router.encoders import CohereEncoder
 from semantic_router.layer import RouteLayer
@@ -5,12 +7,13 @@
 from models.document import BaseDocumentChunk
 from models.query import RequestPayload
-
-# from service.code_interpreter import CodeInterpreterService
+from service.code_interpreter import CodeInterpreterService
 from utils.logger import logger
 from utils.summarise import SUMMARY_SUFFIX
 from vectordbs import BaseVectorDatabase, get_vector_service
 
+STRUCTURED_DATA = [".xlsx", ".csv", ".json"]
+
 
 def create_route_layer() -> RouteLayer:
     routes = [
@@ -34,12 +37,29 @@ async def get_documents(
     *, vector_service: BaseVectorDatabase, payload: RequestPayload
 ) -> list[BaseDocumentChunk]:
     chunks = await vector_service.query(input=payload.input, top_k=5)
-
     if not len(chunks):
         logger.error(f"No documents found for query: {payload.input}")
         return []
-
-    reranked_chunks = await vector_service.rerank(query=payload.input, documents=chunks)
+    is_structured = chunks[0].metadata.get("document_type") in STRUCTURED_DATA
+    reranked_chunks = []
+    if is_structured and payload.interpreter_mode:
+        async with CodeInterpreterService(
+            session_id=payload.session_id, file_urls=[chunks[0].metadata.get("doc_url")]
+        ) as service:
+            code = await service.generate_code(query=payload.input)
+            response = await service.run_python(code=code)
+            output = response.stdout
+
+            reranked_chunks.append(
+                BaseDocumentChunk(
+                    id=str(uuid4()),
+                    document_id=str(uuid4()),
+                    content=output,
+                    doc_url=chunks[0].metadata.get("doc_url"),
+                )
+            )
+    reranked_chunks.extend(
+        await vector_service.rerank(query=payload.input, documents=chunks)
+    )
     return reranked_chunks
 
@@ -62,15 +82,4 @@ async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
         encoder=encoder,
     )
 
-    # async with CodeInterpreterService(
-    #     session_id=payload.session_id,
-    #     file_urls=[
-    #         "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
-    #     ],
-    # ) as service:
-    #     code = "df0.info()"
-    #     output = await service.run_python(code=code)
-    #     print(output.stderr)
-    #     print(output.stdout)
-
     return await get_documents(vector_service=vector_service, payload=payload)
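Taken together, `get_documents` now prepends a synthetic chunk holding the interpreter's stdout whenever the top hit is a structured file and the request opted in, then appends the usual reranked chunks. A toy walk-through of that branch decision; the metadata values are invented stand-ins for what the vector store returns:

```python
# Toy walk-through of the routing decision in get_documents;
# metadata values are invented stand-ins for vector-store results.
STRUCTURED_DATA = [".xlsx", ".csv", ".json"]

top_chunk_metadata = {
    "document_type": ".csv",
    "doc_url": "https://example.com/data.csv",
}
interpreter_mode = True  # from RequestPayload

is_structured = top_chunk_metadata.get("document_type") in STRUCTURED_DATA
if is_structured and interpreter_mode:
    print("interpreter path:", top_chunk_metadata["doc_url"])
else:
    print("rerank-only path")
```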