Add support for querying code interpreter (#66)
* Add support for querying code interpreter

* Fix formatting

* Ensure the sandbox close is called on exceptions

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/router.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Add system prompt

* Format code

* Bump dependencies

* Minor tweaks

---------

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>
homanp and ValentaTomas committed Feb 29, 2024
1 parent 8264094 commit 9bcefb3
Showing 10 changed files with 351 additions and 289 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -49,7 +49,8 @@ Input example:
"encoder": {
"type": "openai",
"name": "text-embedding-3-small",
}
},
"interpreter_mode": False, # Set to True if you wish to run computation Q&A with a code interpreter
"session_id": "my_session_id" # keeps micro-vm sessions and enables caching
}
```
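
As a rough usage sketch (the endpoint path, port, and question below are assumptions for illustration, not part of this diff; fields not shown in the excerpt above are omitted):

```python
import requests  # any HTTP client works; requests is assumed here

# Abbreviated payload: only the fields shown in the README excerpt are included.
payload = {
    "input": "What was the average fare by passenger class?",  # computational question
    "index_name": "my_index",
    "encoder": {"type": "openai", "name": "text-embedding-3-small"},
    "interpreter_mode": True,  # route structured-data questions through the code interpreter
    "session_id": "my_session_id",  # keeps the micro-vm session alive and enables caching
}

# Assumed local deployment and route.
response = requests.post("http://localhost:8000/query", json=payload)
print(response.json())
```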
2 changes: 1 addition & 1 deletion api/ingest.py
@@ -6,7 +6,7 @@

from models.ingest import RequestPayload
from service.embedding import EmbeddingService, get_encoder
from service.ingest import handle_urls, handle_google_drive
from service.ingest import handle_google_drive, handle_urls
from utils.summarise import SUMMARY_SUFFIX

router = APIRouter()
2 changes: 1 addition & 1 deletion models/ingest.py
@@ -4,8 +4,8 @@
from pydantic import BaseModel

from models.file import File
from models.vector_database import VectorDatabase
from models.google_drive import GoogleDrive
from models.vector_database import VectorDatabase


class EncoderEnum(str, Enum):
1 change: 1 addition & 0 deletions models/query.py
@@ -12,6 +12,7 @@ class RequestPayload(BaseModel):
index_name: str
encoder: Encoder
session_id: Optional[str] = None
interpreter_mode: Optional[bool] = False


class ResponseData(BaseModel):
483 changes: 242 additions & 241 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -28,7 +28,7 @@ cmake = "^3.28.1"
pypdf = "^4.0.1"
docx2txt = "^0.8"
python-dotenv = "^1.0.1"
e2b = "^0.14.4"
e2b = "^0.14.7"
gunicorn = "^21.2.0"
unstructured-client = "^0.18.0"
unstructured = {extras = ["google-drive"], version = "^0.12.4"}
102 changes: 76 additions & 26 deletions service/code_interpreter.py
@@ -1,12 +1,23 @@
import asyncio
import io
import logging
import re
import textwrap
import time
from typing import List

import pandas as pd
from decouple import config
from e2b import Sandbox
from openai import AsyncOpenAI

logging.getLogger("e2b").setLevel(logging.INFO)

client = AsyncOpenAI(
api_key=config("OPENAI_API_KEY"),
)

SYSTEM_PROMPT = "You are a world-class Python programmer who can complete any data analysis task by coding."


class CodeInterpreterService:
timeout = 3 * 60 # 3 minutes
@@ -61,48 +72,87 @@ def __init__(
self.sandbox = self._ensure_sandbox(session_id)

async def __aenter__(self):
if not self._is_initialized:
self._is_initialized = True
for file_url in self.file_urls:
await self._upload_file(file_url)
try:
if not self._is_initialized:
self._is_initialized = True
for file_url in self.file_urls:
await self._upload_file(file_url)
except:
self.sandbox.close()
raise

return self

async def __aexit__(self, _exc_type, _exc_value, _traceback):
if self.session_id:
self.sandbox.keep_alive(self.timeout)
self.sandbox.close()
try:
if self.session_id:
self.sandbox.keep_alive(self.timeout)
finally:
self.sandbox.close()

def get_files_code(self):
def get_dataframe(self):
"""
Get the code to read the files in the sandbox.
This can be used for instructing the LLM how to access the loaded files.
"""

# TODO: Add support for xslx, json
files_code = "\n".join(
f'df{i} = pd.read_csv("{self._get_file_path(url)}") # {url}'
for i, url in enumerate(self.file_urls)
# TODO: Add support for multiple dataframes
df = pd.read_csv(self.file_urls[0])
return df, self.file_urls[0]

def generate_prompt(self, query: str) -> str:
df, url = self.get_dataframe()
# df.info() prints to stdout and returns None, so capture the summary as a string
info_buffer = io.StringIO()
df.info(buf=info_buffer)
return textwrap.dedent(
f"""
You are provided with the following pandas dataframe (`df`):
{info_buffer.getvalue()}
Using the provided dataframe (`df`), update the following Python code, using pandas, so that it returns the answer to the question: \"{query}\"
This is the initial python code to be updated:
```python
import pandas as pd
df = pd.read_csv("{url}")
1. Process: Manipulating data for analysis (grouping, filtering, aggregating, etc.)
2. Analyze: Conducting the actual analysis
3. Output: Returning the answer as a string
```
"""
)

return f"""
import pandas as pd
def extract_code(self, code: str) -> str:
pattern = r"```(?:python)?(.*?)```"
matches = re.findall(pattern, code, re.DOTALL)
if matches:
return matches[0].strip()
return ""

{files_code}
"""
async def generate_code(
self,
query: str,
) -> str:
content = self.generate_prompt(query=query)
completion = await client.chat.completions.create(
messages=[
{
"role": "system",
"content": SYSTEM_PROMPT,
},
{
"role": "user",
"content": content,
},
],
model="gpt-3.5-turbo-0125",
)
output = completion.choices[0].message.content
return self.extract_code(code=output)

async def run_python(self, code: str):
files_code = self.get_files_code()

templated_code = f"""
{files_code}
{code}
"""

epoch_time = time.time()
codefile_path = f"/tmp/main-{epoch_time}.py"
self.sandbox.filesystem.write(codefile_path, templated_code)
self.sandbox.filesystem.write(codefile_path, code)
process = await asyncio.to_thread(
self.sandbox.process.start_and_wait,
f"python {codefile_path}",
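Taken together, the new `generate_code` and `run_python` methods compose with the async context manager roughly as follows. This is a minimal sketch, assuming valid E2B and OpenAI credentials in the environment; the CSV URL is the illustrative one from the previously commented-out router example, and the question is made up:

```python
import asyncio

from service.code_interpreter import CodeInterpreterService


async def answer_with_interpreter() -> str:
    # Illustrative file URL and question; any CSV readable by pandas works.
    csv_url = (
        "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
    )
    async with CodeInterpreterService(session_id=None, file_urls=[csv_url]) as service:
        # Ask the LLM for pandas code answering the question, then run it in the sandbox.
        code = await service.generate_code(query="How many passengers survived?")
        result = await service.run_python(code=code)
    return result.stdout


# print(asyncio.run(answer_with_interpreter()))
```

Note that `generate_code` strips the Markdown code fences from the model's reply via `extract_code` before the code reaches `run_python`. The service/router.py change later in this diff uses the same pattern, prepending the interpreter's stdout to the reranked chunks when the matched document is structured data.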
2 changes: 1 addition & 1 deletion service/embedding.py
@@ -21,9 +21,9 @@
from models.file import File
from models.google_drive import GoogleDrive
from models.ingest import Encoder, EncoderEnum
from utils.file import get_file_extension_from_url
from utils.logger import logger
from utils.summarise import completion
from utils.file import get_file_extension_from_url
from vectordbs import get_vector_service


41 changes: 25 additions & 16 deletions service/router.py
@@ -1,17 +1,20 @@
from uuid import uuid4

from decouple import config
from semantic_router.encoders import CohereEncoder
from semantic_router.layer import RouteLayer
from semantic_router.route import Route

from models.document import BaseDocumentChunk
from models.query import RequestPayload

# from service.code_interpreter import CodeInterpreterService
from service.code_interpreter import CodeInterpreterService
from service.embedding import get_encoder
from utils.logger import logger
from utils.summarise import SUMMARY_SUFFIX
from vectordbs import BaseVectorDatabase, get_vector_service

STRUCTURED_DATA = [".xlsx", ".csv", ".json"]


def create_route_layer() -> RouteLayer:
routes = [
@@ -35,12 +38,29 @@ async def get_documents(
*, vector_service: BaseVectorDatabase, payload: RequestPayload
) -> list[BaseDocumentChunk]:
chunks = await vector_service.query(input=payload.input, top_k=5)

if not len(chunks):
logger.error(f"No documents found for query: {payload.input}")
return []

reranked_chunks = await vector_service.rerank(query=payload.input, documents=chunks)
is_structured = chunks[0].metadata.get("document_type") in STRUCTURED_DATA
reranked_chunks = []
if is_structured and payload.interpreter_mode:
async with CodeInterpreterService(
session_id=payload.session_id, file_urls=[chunks[0].metadata.get("doc_url")]
) as service:
code = await service.generate_code(query=payload.input)
response = await service.run_python(code=code)
output = response.stdout
reranked_chunks.append(
BaseDocumentChunk(
id=str(uuid4()),
document_id=str(uuid4()),
content=output,
doc_url=chunks[0].metadata.get("doc_url"),
)
)
reranked_chunks.extend(
await vector_service.rerank(query=payload.input, documents=chunks)
)
return reranked_chunks


@@ -63,15 +83,4 @@ async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
encoder=encoder,
)

# async with CodeInterpreterService(
# session_id=payload.session_id,
# file_urls=[
# "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
# ],
# ) as service:
# code = "df0.info()"
# output = await service.run_python(code=code)
# print(output.stderr)
# print(output.stdout)

return await get_documents(vector_service=vector_service, payload=payload)
2 changes: 1 addition & 1 deletion utils/file.py
@@ -1,5 +1,5 @@
from urllib.parse import urlparse
import os
from urllib.parse import urlparse


def get_file_extension_from_url(url: str) -> str:
