Skip to content

Commit

Permalink
Add support for querying code interpreter (#66)
Browse files Browse the repository at this point in the history
* Add support for querying code interpreter

* Fix formatting

* Ensure the sandbox close is called on exceptions

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/router.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Update service/code_interpreter.py

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>

* Add system prompt

* Format code

* Bump dependencies

* Minor tweaks

---------

Co-authored-by: Tomas Valenta <valenta.and.thomas@gmail.com>
  • Loading branch information
homanp and ValentaTomas committed Feb 29, 2024
1 parent 7491a0c commit 02933ff
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 54 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ Input example:
"provider": "openai",
"name": "text-embedding-3-small",
"dimensions": 384
}
},
"interpreter_mode": False, # Set to True if you wish to run computation Q&A with a code interpreter
"session_id": "my_session_id" # keeps micro-vm sessions and enables caching
}
```
Expand Down
1 change: 1 addition & 0 deletions models/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class RequestPayload(BaseModel):
index_name: str
encoder: EncoderConfig = EncoderConfig()
session_id: Optional[str] = None
interpreter_mode: Optional[bool] = False


class ResponsePayload(BaseModel):
Expand Down
21 changes: 11 additions & 10 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ cmake = "^3.28.1"
pypdf = "^4.0.1"
docx2txt = "^0.8"
python-dotenv = "^1.0.1"
e2b = "^0.14.4"
e2b = "^0.14.7"
gunicorn = "^21.2.0"
unstructured-client = "^0.18.0"
unstructured = {extras = ["google-drive"], version = "^0.12.4"}
Expand Down
102 changes: 76 additions & 26 deletions service/code_interpreter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import asyncio
import logging
import re
import textwrap
import time
from typing import List

import pandas as pd
from decouple import config
from e2b import Sandbox
from openai import AsyncOpenAI

logging.getLogger("e2b").setLevel(logging.INFO)

client = AsyncOpenAI(
api_key=config("OPENAI_API_KEY"),
)

SYSTEM_PROMPT = "You are a world-class python programmer that can complete any data analysis tasks by coding."


class CodeInterpreterService:
timeout = 3 * 60 # 3 minutes
Expand Down Expand Up @@ -61,48 +72,87 @@ def __init__(
self.sandbox = self._ensure_sandbox(session_id)

async def __aenter__(self):
if not self._is_initialized:
self._is_initialized = True
for file_url in self.file_urls:
await self._upload_file(file_url)
try:
if not self._is_initialized:
self._is_initialized = True
for file_url in self.file_urls:
await self._upload_file(file_url)
except:
self.self.sandbox.close()
raise

return self

async def __aexit__(self, _exc_type, _exc_value, _traceback):
if self.session_id:
self.sandbox.keep_alive(self.timeout)
self.sandbox.close()
try:
if self.session_id:
self.sandbox.keep_alive(self.timeout)
finally:
self.sandbox.close()

def get_dataframe(self):
    """Load the first uploaded file as a pandas DataFrame.

    Returns:
        tuple: ``(df, url)`` — the parsed dataframe and the URL/path it was
        read from.

    Raises:
        ValueError: if no file URLs were provided (clearer than the bare
            IndexError the original raised on an empty list).
    """
    # TODO: Add support for multiple dataframes and for xlsx/json inputs.
    if not self.file_urls:
        raise ValueError("CodeInterpreterService has no file URLs to load")
    url = self.file_urls[0]
    return pd.read_csv(url), url

def generate_prompt(self, query: str) -> str:
    """Build the LLM prompt for answering *query* about the loaded dataframe.

    Bug fix: ``DataFrame.info()`` prints its report to stdout and returns
    ``None``, so the original f-string interpolation embedded the literal
    text "None" instead of the dataframe summary. The summary is now
    captured into a buffer via ``df.info(buf=...)``.

    Returns:
        str: the dedented prompt text, including the dataframe summary and
        the starter code the model is asked to update.
    """
    import io

    df, url = self.get_dataframe()
    buffer = io.StringIO()
    df.info(buf=buffer)  # info() writes its report to ``buf`` and returns None
    df_info = buffer.getvalue()
    # Dedent the template first, then substitute: the multi-line ``df_info``
    # text would otherwise defeat textwrap.dedent's common-prefix detection.
    template = textwrap.dedent(
        """
        You are provided with a following pandas dataframe (`df`):
        {df_info}
        Using the provided dataframe (`df`), update the following python code using pandas that returns the answer to question: "{query}"
        This is the initial python code to be updated:
        ```python
        import pandas as pd
        df = pd.read_csv("{url}")
        1. Process: Manipulating data for analysis (grouping, filtering, aggregating, etc.)
        2. Analyze: Conducting the actual analysis
        3. Output: Returning the answer as a string
        ```
        """
    )
    return template.format(df_info=df_info, query=query, url=url)

def extract_code(self, code: str) -> str:
    """Pull the first fenced code block out of an LLM response.

    Accepts both ```` ```python ... ``` ```` and bare ```` ``` ... ``` ````
    fences. Returns the stripped contents of the first block found, or an
    empty string when the response contains no fence.
    """
    fence = re.search(r"```(?:python)?(.*?)```", code, re.DOTALL)
    return fence.group(1).strip() if fence else ""
async def generate_code(
    self,
    query: str,
) -> str:
    """Ask the chat model for python code answering *query*.

    Sends the module's system prompt plus the generated dataframe prompt to
    the chat-completions endpoint, then strips the fenced code block out of
    the model's reply.

    Returns:
        str: the extracted python snippet, or "" if the reply had no fence.
    """
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": self.generate_prompt(query=query)},
    ]
    completion = await client.chat.completions.create(
        messages=messages,
        model="gpt-3.5-turbo-0125",
    )
    answer = completion.choices[0].message.content
    return self.extract_code(code=answer)

async def run_python(self, code: str):
files_code = self.get_files_code()

templated_code = f"""
{files_code}
{code}
"""

epoch_time = time.time()
codefile_path = f"/tmp/main-{epoch_time}.py"
self.sandbox.filesystem.write(codefile_path, templated_code)
self.sandbox.filesystem.write(codefile_path, code)
process = await asyncio.to_thread(
self.sandbox.process.start_and_wait,
f"python {codefile_path}",
Expand Down
41 changes: 25 additions & 16 deletions service/router.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from uuid import uuid4

from decouple import config
from semantic_router.encoders import CohereEncoder
from semantic_router.layer import RouteLayer
from semantic_router.route import Route

from models.document import BaseDocumentChunk
from models.query import RequestPayload

# from service.code_interpreter import CodeInterpreterService
from service.code_interpreter import CodeInterpreterService
from utils.logger import logger
from utils.summarise import SUMMARY_SUFFIX
from vectordbs import BaseVectorDatabase, get_vector_service

STRUTURED_DATA = [".xlsx", ".csv", ".json"]


def create_route_layer() -> RouteLayer:
routes = [
Expand All @@ -34,12 +37,29 @@ async def get_documents(
*, vector_service: BaseVectorDatabase, payload: RequestPayload
) -> list[BaseDocumentChunk]:
chunks = await vector_service.query(input=payload.input, top_k=5)

if not len(chunks):
logger.error(f"No documents found for query: {payload.input}")
return []

reranked_chunks = await vector_service.rerank(query=payload.input, documents=chunks)
is_structured = chunks[0].metadata.get("document_type") in STRUTURED_DATA
reranked_chunks = []
if is_structured and payload.interpreter_mode:
async with CodeInterpreterService(
session_id=payload.session_id, file_urls=[chunks[0].metadata.get("doc_url")]
) as service:
code = await service.generate_code(query=payload.input)
response = await service.run_python(code=code)
output = response.stdout
reranked_chunks.append(
BaseDocumentChunk(
id=str(uuid4()),
document_id=str(uuid4()),
content=output,
doc_url=chunks[0].metadata.get("doc_url"),
)
)
reranked_chunks.extend(
await vector_service.rerank(query=payload.input, documents=chunks)
)
return reranked_chunks


Expand All @@ -62,15 +82,4 @@ async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
encoder=encoder,
)

# async with CodeInterpreterService(
# session_id=payload.session_id,
# file_urls=[
# "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
# ],
# ) as service:
# code = "df0.info()"
# output = await service.run_python(code=code)
# print(output.stderr)
# print(output.stdout)

return await get_documents(vector_service=vector_service, payload=payload)

0 comments on commit 02933ff

Please sign in to comment.