Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for querying code interpreter #66

Merged
merged 11 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ Input example:
"encoder": {
"type": "openai",
"name": "text-embedding-3-small",
}
},
"interpreter_mode": False, # Set to True if you wish to run computation Q&A with a code interpreter
"session_id": "my_session_id" # keeps micro-vm sessions and enables caching
}
```
Expand Down
2 changes: 1 addition & 1 deletion api/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from models.ingest import RequestPayload
from service.embedding import EmbeddingService, get_encoder
from service.ingest import handle_urls, handle_google_drive
from service.ingest import handle_google_drive, handle_urls
from utils.summarise import SUMMARY_SUFFIX

router = APIRouter()
Expand Down
2 changes: 1 addition & 1 deletion models/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from pydantic import BaseModel

from models.file import File
from models.vector_database import VectorDatabase
from models.google_drive import GoogleDrive
from models.vector_database import VectorDatabase


class EncoderEnum(str, Enum):
Expand Down
1 change: 1 addition & 0 deletions models/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class RequestPayload(BaseModel):
index_name: str
encoder: Encoder
session_id: Optional[str] = None
interpreter_mode: Optional[bool] = False


class ResponseData(BaseModel):
Expand Down
483 changes: 242 additions & 241 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ cmake = "^3.28.1"
pypdf = "^4.0.1"
docx2txt = "^0.8"
python-dotenv = "^1.0.1"
e2b = "^0.14.4"
e2b = "^0.14.7"
gunicorn = "^21.2.0"
unstructured-client = "^0.18.0"
unstructured = {extras = ["google-drive"], version = "^0.12.4"}
Expand Down
102 changes: 76 additions & 26 deletions service/code_interpreter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import asyncio
import logging
import re
import textwrap
import time
from typing import List

import pandas as pd
from decouple import config
from e2b import Sandbox
from openai import AsyncOpenAI

logging.getLogger("e2b").setLevel(logging.INFO)

client = AsyncOpenAI(
api_key=config("OPENAI_API_KEY"),
)

SYSTEM_PROMPT = "You are a world-class python programmer that can complete any data analysis tasks by coding."


class CodeInterpreterService:
timeout = 3 * 60 # 3 minutes
Expand Down Expand Up @@ -61,48 +72,87 @@ def __init__(
self.sandbox = self._ensure_sandbox(session_id)

async def __aenter__(self):
    """Async context entry: upload the configured files into the sandbox once.

    The `_is_initialized` flag makes re-entry idempotent (files are only
    uploaded on the first entry). On any failure the sandbox is closed
    before the exception propagates, so callers never leak a running
    micro-VM.
    """
    try:
        if not self._is_initialized:
            self._is_initialized = True
            for file_url in self.file_urls:
                await self._upload_file(file_url)
    except BaseException:
        # Bug fix: the original read `self.self.sandbox.close()` — an
        # AttributeError that would mask the real upload error. Catching
        # BaseException (equivalent to the bare `except:`) keeps the
        # cleanup-on-cancellation behavior, and we always re-raise.
        self.sandbox.close()
        raise

    return self

async def __aexit__(self, _exc_type, _exc_value, _traceback):
    """Async context exit: always close the sandbox, extending its lifetime first.

    When a session id is set, `keep_alive` asks e2b to keep the micro-VM
    around for `timeout` seconds so a later request with the same session
    can reattach (enables caching); `close` then detaches this client.
    """
    try:
        if self.session_id:
            self.sandbox.keep_alive(self.timeout)
    finally:
        # Close unconditionally, even if keep_alive raised.
        self.sandbox.close()

def get_dataframe(self):
    """Load the first attached file into a pandas dataframe.

    Returns:
        A ``(df, url)`` pair: the parsed dataframe and the source URL it
        was read from, so callers can reference the origin of the data.
    """
    # TODO: Add support for multiple dataframes
    url = self.file_urls[0]
    return pd.read_csv(url), url

def generate_prompt(self, query: str) -> str:
    """Build the LLM prompt asking for pandas code that answers *query*.

    Bug fix: the original interpolated ``{df.info()}`` directly into the
    f-string, but ``DataFrame.info()`` prints to stdout and returns None,
    so the prompt literally contained the word "None" instead of the
    schema. We capture the info output into a buffer and splice it in
    *after* ``textwrap.dedent`` runs, because the multi-line schema text
    starts at column 0 and would otherwise defeat dedent's common-prefix
    detection for the whole template.
    """
    import io  # local import: only used to capture DataFrame.info() output

    df, url = self.get_dataframe()
    info_buffer = io.StringIO()
    df.info(buf=info_buffer)
    df_info = info_buffer.getvalue()

    prompt = textwrap.dedent(
        f"""
        You are provided with a following pandas dataframe (`df`):
        __DF_INFO__

        Using the provided dataframe (`df`), update the following python code using pandas that returns the answer to question: \"{query}\"

        This is the initial python code to be updated:

        ```python
        import pandas as pd

        df = pd.read_csv("{url}")

        1. Process: Manipulating data for analysis (grouping, filtering, aggregating, etc.)
        2. Analyze: Conducting the actual analysis
        3. Output: Returning the answer as a string
        ```
        """
    )
    return prompt.replace("__DF_INFO__", df_info)

return f"""
import pandas as pd
def extract_code(self, code: str) -> str:
pattern = r"```(?:python)?(.*?)```"
matches = re.findall(pattern, code, re.DOTALL)
if matches:
return matches[0].strip()
return ""

{files_code}

"""
async def generate_code(
    self,
    query: str,
) -> str:
    """Ask the LLM for pandas code that answers *query*.

    Builds the dataframe-aware prompt, sends it to the chat completions
    API, and returns only the fenced code from the model's reply ("" when
    no fence is present).
    """
    prompt = self.generate_prompt(query=query)
    messages = [
        {
            "role": "system",
            "content": SYSTEM_PROMPT,
        },
        {
            "role": "user",
            "content": prompt,
        },
    ]
    completion = await client.chat.completions.create(
        messages=messages,
        model="gpt-3.5-turbo-0125",
    )
    raw_reply = completion.choices[0].message.content
    return self.extract_code(code=raw_reply)

async def run_python(self, code: str):
files_code = self.get_files_code()

templated_code = f"""
{files_code}
{code}
"""

epoch_time = time.time()
codefile_path = f"/tmp/main-{epoch_time}.py"
self.sandbox.filesystem.write(codefile_path, templated_code)
self.sandbox.filesystem.write(codefile_path, code)
process = await asyncio.to_thread(
self.sandbox.process.start_and_wait,
f"python {codefile_path}",
Expand Down
2 changes: 1 addition & 1 deletion service/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from models.file import File
from models.google_drive import GoogleDrive
from models.ingest import Encoder, EncoderEnum
from utils.file import get_file_extension_from_url
from utils.logger import logger
from utils.summarise import completion
from utils.file import get_file_extension_from_url
from vectordbs import get_vector_service


Expand Down
41 changes: 25 additions & 16 deletions service/router.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from uuid import uuid4

from decouple import config
from semantic_router.encoders import CohereEncoder
from semantic_router.layer import RouteLayer
from semantic_router.route import Route

from models.document import BaseDocumentChunk
from models.query import RequestPayload

# from service.code_interpreter import CodeInterpreterService
from service.code_interpreter import CodeInterpreterService
from service.embedding import get_encoder
from utils.logger import logger
from utils.summarise import SUMMARY_SUFFIX
from vectordbs import BaseVectorDatabase, get_vector_service

STRUTURED_DATA = [".xlsx", ".csv", ".json"]


def create_route_layer() -> RouteLayer:
routes = [
async def get_documents(
    *, vector_service: BaseVectorDatabase, payload: RequestPayload
) -> list[BaseDocumentChunk]:
    """Retrieve document chunks for a query, optionally running computation.

    Queries the vector store for the top-5 chunks. When the best chunk is
    structured data (csv/xlsx/json) and the caller enabled interpreter
    mode, a code-interpreter answer is computed first and prepended to the
    reranked chunks; otherwise only the reranked chunks are returned.

    Returns an empty list when the vector store has no matches.
    """
    chunks = await vector_service.query(input=payload.input, top_k=5)

    # Idiomatic emptiness check (was `if not len(chunks):`).
    if not chunks:
        logger.error(f"No documents found for query: {payload.input}")
        return []

    # NOTE(review): module constant is spelled `STRUTURED_DATA` (sic);
    # kept as-is here so this change does not depend on a rename elsewhere.
    is_structured = chunks[0].metadata.get("document_type") in STRUTURED_DATA
    results: list[BaseDocumentChunk] = []
    if is_structured and payload.interpreter_mode:
        async with CodeInterpreterService(
            session_id=payload.session_id, file_urls=[chunks[0].metadata.get("doc_url")]
        ) as service:
            code = await service.generate_code(query=payload.input)
            response = await service.run_python(code=code)
            # NOTE(review): response.stderr is ignored — failed executions
            # silently produce an empty/partial answer chunk. Consider
            # logging stderr; verify intended behavior with the author.
            output = response.stdout
        results.append(
            BaseDocumentChunk(
                id=str(uuid4()),
                document_id=str(uuid4()),
                content=output,
                doc_url=chunks[0].metadata.get("doc_url"),
            )
        )
    results.extend(
        await vector_service.rerank(query=payload.input, documents=chunks)
    )
    return results


Expand All @@ -63,15 +83,4 @@ async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
encoder=encoder,
)

# async with CodeInterpreterService(
# session_id=payload.session_id,
# file_urls=[
# "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
# ],
# ) as service:
# code = "df0.info()"
# output = await service.run_python(code=code)
# print(output.stderr)
# print(output.stdout)

return await get_documents(vector_service=vector_service, payload=payload)
2 changes: 1 addition & 1 deletion utils/file.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from urllib.parse import urlparse
import os
from urllib.parse import urlparse


def get_file_extension_from_url(url: str) -> str:
Expand Down