From ae3b1132eaa45528f28158d5a7f6d029cd856df9 Mon Sep 17 00:00:00 2001 From: Simonas Jakubonis <20096648+simjak@users.noreply.github.com> Date: Sat, 2 Mar 2024 07:23:08 +0200 Subject: [PATCH] feat: Semantic splitter (#63) * feat: Semantic Spliter + minor improvements * chore: testing * fix: BaseDocumentChunk output fix * feat: Combined chunk title with chunk content * chore: Updated semantic router version * chore: Updated semantic router version * Small tweaks and bug fixes * Update README * chore: Merging * chore: Testing * chore: Minor improvements * Add support for querying code interpreter (#66) * Add support for queryig code interpreter * Fix formatting * Ensure the sandbox close is called on exceptions * Update service/code_interpreter.py Co-authored-by: Tomas Valenta * Update service/code_interpreter.py Co-authored-by: Tomas Valenta * Update service/router.py Co-authored-by: Tomas Valenta * Update service/code_interpreter.py Co-authored-by: Tomas Valenta * Add system prompt * Format code * Bump dependencies * Minor tweaks --------- Co-authored-by: Tomas Valenta * Minor tweaks --------- Co-authored-by: Ismail Pelaseyed Co-authored-by: Tomas Valenta --- .vscode/settings.json | 6 + README.md | 47 ++++--- api/delete.py | 5 +- api/ingest.py | 26 ++-- api/query.py | 10 +- dev/embedding.ipynb | 2 +- dev/walkthrough.ipynb | 270 ++++++++++++++++++++++++++++++--------- models/__init__.py | 0 models/delete.py | 4 +- models/document.py | 79 +++++++++++- models/file.py | 22 ++-- models/ingest.py | 84 +++++++++++-- models/query.py | 14 ++- poetry.lock | 12 +- pyproject.toml | 3 +- service/embedding.py | 287 ++++++++++++++++++++++++++---------------- service/ingest.py | 5 +- service/router.py | 3 +- service/splitter.py | 129 +++++++++++++++++++ utils/file.py | 1 + vectordbs/pinecone.py | 64 ++++------ vectordbs/qdrant.py | 2 + 22 files changed, 803 insertions(+), 272 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 models/__init__.py create mode 100644 service/splitter.py diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..20adcc69 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "cSpell.words": [ + "tiktoken", + "Upserted" + ] +} diff --git a/README.md b/README.md index b727dd82..f317230d 100644 --- a/README.md +++ b/README.md @@ -66,9 +66,32 @@ Super-Rag comes with a built in REST API powered by FastApi. // Payload { - "files": [{ - "url": "https://arxiv.org/pdf/2210.03629.pdf" - }], + "files": [ + { + "name": "My file", // Optional + "url": "https://path-to-my-file.pdf" + } + ], + "document_processor": { // Optional + "encoder": { + "dimensions": 384, + "model_name": "embed-multilingual-light-v3.0", + "provider": "cohere" + }, + "unstructured": { + "hi_res_model_name": "detectron2_onnx", + "partition_strategy": "auto", + "process_tables": false + }, + "splitter": { + "max_tokens": 400, + "min_tokens": 30, + "name": "semantic", + "prefix_summary": true, + "prefix_title": true, + "rolling_window_size": 1 + } + }, "vector_database": { "type": "qdrant", "config": { @@ -76,13 +99,8 @@ Super-Rag comes with a built in REST API powered by FastApi. 
"host": "THE QDRANT HOST" } }, - "encoder": { - "type": "openai", - "name": "text-embedding-3-small", - "dimensions": 1536 // encoder depends on the provider and model - }, - "index_name": "YOUR INDEX", - "webhook_url": "https://webhook.site/0e217d1c-49f1-424a-9992-497db09f7793" + "index_name": "my_index", + "webhook_url": "https://my-webhook-url" } ``` @@ -103,12 +121,13 @@ Super-Rag comes with a built in REST API powered by FastApi. "index_name": "YOUR INDEX", "interpreter_mode": true, "encoder": { - "type": "cohere", - "name": "embed-multilingual-light-v3.0", + "provider": "openai", + "name": "text-embedding-3-small", "dimensions": 384 }, - "exclude_fields": ["metadata"], - "session_id": "test" + "exclude_fields": ["metadata"], // Exclude specific fields + "interpreter_mode": False, // Set to True if you wish to run computation Q&A with a code interpreter + "session_id": "my_session_id" // keeps micro-vm sessions and enables caching } ``` diff --git a/api/delete.py b/api/delete.py index ae5c859e..493049da 100644 --- a/api/delete.py +++ b/api/delete.py @@ -1,7 +1,6 @@ from fastapi import APIRouter from models.delete import RequestPayload, ResponsePayload -from service.embedding import get_encoder from vectordbs import get_vector_service from vectordbs.base import BaseVectorDatabase @@ -10,12 +9,12 @@ @router.delete("/delete", response_model=ResponsePayload) async def delete(payload: RequestPayload): - encoder = get_encoder(encoder_config=payload.encoder) + encoder = payload.encoder.get_encoder() vector_service: BaseVectorDatabase = get_vector_service( index_name=payload.index_name, credentials=payload.vector_database, encoder=encoder, - dimensions=encoder.dimensions, + dimensions=payload.encoder.dimensions, ) for file in payload.files: diff --git a/api/ingest.py b/api/ingest.py index dda04f10..6fa5630b 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -5,7 +5,7 @@ from fastapi import APIRouter from models.ingest import RequestPayload -from service.embedding import EmbeddingService, get_encoder +from service.embedding import EmbeddingService from service.ingest import handle_google_drive, handle_urls from utils.summarise import SUMMARY_SUFFIX @@ -14,25 +14,33 @@ @router.post("/ingest") async def ingest(payload: RequestPayload) -> Dict: - encoder = get_encoder(encoder_config=payload.encoder) + encoder = payload.document_processor.encoder.get_encoder() embedding_service = EmbeddingService( + encoder=encoder, index_name=payload.index_name, vector_credentials=payload.vector_database, - dimensions=payload.encoder.dimensions, + dimensions=payload.document_processor.encoder.dimensions, ) + chunks = [] + summary_documents = [] if payload.files: - chunks, summary_documents = await handle_urls(embedding_service, payload.files) + chunks, summary_documents = await handle_urls( + embedding_service=embedding_service, + files=payload.files, + config=payload.document_processor, + ) + elif payload.google_drive: chunks, summary_documents = await handle_google_drive( embedding_service, payload.google_drive - ) + ) # type: ignore TODO: Fix typing await asyncio.gather( - embedding_service.generate_and_upsert_embeddings( - documents=chunks, encoder=encoder, index_name=payload.index_name + embedding_service.embed_and_upsert( + chunks=chunks, encoder=encoder, index_name=payload.index_name ), - embedding_service.generate_and_upsert_embeddings( - documents=summary_documents, + embedding_service.embed_and_upsert( + chunks=summary_documents, encoder=encoder, index_name=f"{payload.index_name}{SUMMARY_SUFFIX}", ), 
diff --git a/api/query.py b/api/query.py index 503e7d99..da31463d 100644 --- a/api/query.py +++ b/api/query.py @@ -1,6 +1,6 @@ from fastapi import APIRouter -from models.query import RequestPayload, ResponseData, ResponsePayload +from models.query import RequestPayload, ResponsePayload from service.router import query as _query router = APIRouter() @@ -9,5 +9,9 @@ @router.post("/query", response_model=ResponsePayload) async def query(payload: RequestPayload): chunks = await _query(payload=payload) - response_data = [ResponseData(**chunk.model_dump()) for chunk in chunks] - return {"success": True, "data": response_data} + # NOTE: Filter out fields before given to LLM + response_payload = ResponsePayload(success=True, data=chunks) + response_data = response_payload.model_dump( + exclude=set(payload.exclude_fields) if payload.exclude_fields else None + ) + return response_data diff --git a/dev/embedding.ipynb b/dev/embedding.ipynb index 3d4626c8..f7aa4cea 100644 --- a/dev/embedding.ipynb +++ b/dev/embedding.ipynb @@ -40,7 +40,7 @@ "metadata": {}, "outputs": [], "source": [ - "elements = await embedding_service._download_and_extract_elements(file, strategy=\"auto\")\n" + "elements = await embedding_service._partition_file(file, strategy=\"auto\")\n" ] }, { diff --git a/dev/walkthrough.ipynb b/dev/walkthrough.ipynb index 3a5f9cfb..d91d867d 100644 --- a/dev/walkthrough.ipynb +++ b/dev/walkthrough.ipynb @@ -2,12 +2,23 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "API_URL: http://localhost:8000\n", + "PINECONE_INDEX: simonas-serverless-384\n", + "PINECONE_HOST: https://simonas-serverless-1536-75c816a.svc.apw5-4e34-81fa.pinecone.io\n" + ] + } + ], "source": [ "import os\n", "import requests\n", + "import json\n", "from dotenv import load_dotenv\n", "load_dotenv()\n", "\n", @@ -17,16 +28,24 @@ "PINECONE_HOST = os.environ.get('PINECONE_HOST', '')\n", "\n", "print(\"API_URL:\", API_URL)\n", - "print(\"PINECONE_API_KEY:\", PINECONE_API_KEY)\n", + "# print(\"PINECONE_API_KEY:\", PINECONE_API_KEY)\n", "print(\"PINECONE_INDEX:\", PINECONE_INDEX)\n", "print(\"PINECONE_HOST:\", PINECONE_HOST)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'success': True, 'index_name': 'simonas-serverless-384'}\n" + ] + } + ], "source": [ "# Ingest a file\n", "url = f\"{API_URL}/api/v1/ingest\"\n", @@ -34,7 +53,7 @@ "payload = {\n", " \"files\": [\n", " {\n", - " \"type\": \"PDF\",\n", + " \"name\": \"chunking\",\n", " \"url\": \"https://arxiv.org/pdf/2402.05131.pdf\"\n", " }\n", " ],\n", @@ -46,7 +65,6 @@ " }\n", " },\n", " \"index_name\": PINECONE_INDEX,\n", - " \"encoder\": \"cohere\",\n", "}\n", "\n", "response = requests.post(url, json=payload)\n", @@ -56,35 +74,189 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"success\": true,\n", + " \"data\": [\n", + " {\n", + " \"id\": \"75d3adef-0fec-496e-99a7-0510d9c2ed5d\",\n", + " \"doc_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"document_id\": \"doc_fdadb486-da0e-4bc3-ada5-d583831cb112\",\n", + " \"content\": \"2 Related work\\nMore speci\\ufb01cally on document chunking methods for RAG, there are stan- 
dard approaches being considered such as chunking text into spans of a given token length (e.g. 128 and 256) or chunking based on sentences. Open source projects already allow simple processing of documents (e.g. Unstructured4, Lla- maindex5 or Langchain 6), without explicitly considering the table structure on which these chunking strategies are applied. Even though di\\ufb00erent approaches are available, an exhaustive evaluation of chunking applied to RAG and speci\\ufb01cally to \\ufb01nancial reporting, except for some limited chunking analysis [14,36], is non-existent. In our work, we compare a broad range of chunking approaches in addition to more simple ones and provide an analysis of the outcomes of di\\ufb00erent methods when asking questions about di\\ufb00erent aspects of the reports.\",\n", + " \"source\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"source_type\": \".pdf\",\n", + " \"chunk_index\": null,\n", + " \"title\": \"2 Related work\",\n", + " \"token_count\": null,\n", + " \"page_number\": 3,\n", + " \"metadata\": {\n", + " \"filename\": \"tmpykpa2wwh.pdf\",\n", + " \"filetype\": \"application/pdf\",\n", + " \"languages\": [\n", + " \"eng\"\n", + " ],\n", + " \"parent_id\": \"5cdbed1de9473b8856ab0befd08ff7cb\"\n", + " },\n", + " \"dense_embedding\": null\n", + " },\n", + " {\n", + " \"id\": \"58353d3f-a938-43f7-bde8-0e99125fa2f9\",\n", + " \"doc_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"document_id\": \"doc_fdadb486-da0e-4bc3-ada5-d583831cb112\",\n", + " \"content\": \"Table 3. Chunks statistics for basic chunking elements and Unstructured elements\\nResults in table 5 show that element-based chunking strategies o\\ufb00er the best question-answering accuracy, which is consistent with page retrieval and para- graph retrieval accuracy. Lastly, our approach stands out for its e\\ufb03ciency. Not only is element-based chunking generalizable without the need to select the chunk size, but when com- pared to the aggregation results that yield the highest retrieval scores. Element- based chunking achieves the highest retrieval scores with only half the number of chunks required compared to methods that do not consider the structure of the documents (62,529 v.s. 112,155). This can reduce the indexing cost and im- prove query latency because there are only half as many vectors to index for the vectordb that stores the chunks. This underscores the e\\ufb00ectiveness of our solu- tion in optimizing the balance between performance and computational resource requirements.\",\n", + " \"source\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"source_type\": \".pdf\",\n", + " \"chunk_index\": null,\n", + " \"title\": \"Table 3. Chunks statistics for basic chunking elements and Unstructured elements\",\n", + " \"token_count\": null,\n", + " \"page_number\": 9,\n", + " \"metadata\": {\n", + " \"filename\": \"tmpykpa2wwh.pdf\",\n", + " \"filetype\": \"application/pdf\",\n", + " \"languages\": [\n", + " \"eng\"\n", + " ],\n", + " \"parent_id\": \"53ffedc9520f52ef2c8e4568301c8530\"\n", + " },\n", + " \"dense_embedding\": null\n", + " },\n", + " {\n", + " \"id\": \"e3caf266-27a8-4654-94ec-9b82ead3c9ce\",\n", + " \"doc_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"document_id\": \"doc_fdadb486-da0e-4bc3-ada5-d583831cb112\",\n", + " \"content\": \"Table 3. Chunks statistics for basic chunking elements and Unstructured elements\\nRetrieval Accuracy Secondly, we evaluate the capabilities of each chunking strategy in terms of retrieval accuracy. 
We use the page numbers in the ground truth to calculate the page-level retrieval accuracy, and we use ROGUE [24] and BLEU [32] scores to evaluate the accuracy of paragraph-level retrieval compared to the ground truth evidence paragraphs. As shown in Table 4, when compared to Unstructured element-based chunk- ing strategies, basic chunking strategies seem to have higher page-level retrieval accuracy but lower paragraph-level accuracy on average. Additionally, basic chunking strategies also lack consistency between page-level and paragraph-level accuracy; higher page-level accuracy doesn\\u2019t ensure higher paragraph-level ac- curacy. For example, Base 128 has the second highest page-level accuracy but\",\n", + " \"source\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"source_type\": \".pdf\",\n", + " \"chunk_index\": null,\n", + " \"title\": \"Table 3. Chunks statistics for basic chunking elements and Unstructured elements\",\n", + " \"token_count\": null,\n", + " \"page_number\": 9,\n", + " \"metadata\": {\n", + " \"filename\": \"tmpykpa2wwh.pdf\",\n", + " \"filetype\": \"application/pdf\",\n", + " \"languages\": [\n", + " \"eng\"\n", + " ],\n", + " \"parent_id\": \"53ffedc9520f52ef2c8e4568301c8530\"\n", + " },\n", + " \"dense_embedding\": null\n", + " },\n", + " {\n", + " \"id\": \"14257177-480d-45cf-9759-f6e8b1bd60b5\",\n", + " \"doc_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"document_id\": \"doc_fdadb486-da0e-4bc3-ada5-d583831cb112\",\n", + " \"content\": \"5 Discussion\\new have observed that using basic 512 chunking strategies produces results most similar to the Unstructured element-based approach, which may be due to the fact that 512 tokens share a similar length with the token size within our element-based chunks and capture a long context, but fail keep a coherent context in some cases, leaving out relevant information required for Q&A. This is further observed when considering the ROGUE and BLEU scores in table 4, where the chunk contexts for the baseline have lower scores. These \\ufb01ndings support existing research stating that the best basic chunk size varies from data to data [3]. These results show, as well, that our method adapts to di\\ufb00erent documents without tuning. Our method relies on the struc-\",\n", + " \"source\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"source_type\": \".pdf\",\n", + " \"chunk_index\": null,\n", + " \"title\": \"5 Discussion\",\n", + " \"token_count\": null,\n", + " \"page_number\": 11,\n", + " \"metadata\": {\n", + " \"filename\": \"tmpykpa2wwh.pdf\",\n", + " \"filetype\": \"application/pdf\",\n", + " \"languages\": [\n", + " \"eng\"\n", + " ],\n", + " \"parent_id\": \"2a6506945581218449cc497a03e8cfcd\"\n", + " },\n", + " \"dense_embedding\": null\n", + " },\n", + " {\n", + " \"id\": \"94411542-6ad8-4454-ad42-d0fbf9f5b4f9\",\n", + " \"doc_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"document_id\": \"doc_fdadb486-da0e-4bc3-ada5-d583831cb112\",\n", + " \"content\": \"3.4 Chunking\\nThe list of elements considered are provided by the Unstructured9 open source library. 
From the set of processing strategies, 9 https://unstructured-io.github.io/unstructured/introduction.html#\",\n", + " \"source\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"source_type\": \".pdf\",\n", + " \"chunk_index\": null,\n", + " \"title\": \"3.4 Chunking\",\n", + " \"token_count\": null,\n", + " \"page_number\": 6,\n", + " \"metadata\": {\n", + " \"filename\": \"tmpykpa2wwh.pdf\",\n", + " \"filetype\": \"application/pdf\",\n", + " \"languages\": [\n", + " \"eng\"\n", + " ],\n", + " \"links\": [\n", + " \"{'text': '9https :// unstructured - io . github . io / unstructured / introduction . html', 'url': 'https://unstructured-io.github.io/unstructured/introduction.html#elements', 'start_index': 313}\"\n", + " ],\n", + " \"parent_id\": \"dac017d1d3734f5431cae57dcc72f748\"\n", + " },\n", + " \"dense_embedding\": null\n", + " }\n", + " ]\n", + "}\n" + ] + } + ], "source": [ - "# Ingest a file\n", - "url = f\"{API_URL}/api/v1/ingest\"\n", + "# Query the index\n", + "query_url = f\"{API_URL}/api/v1/query\"\n", "\n", - "payload = {\n", - " \"files\": [\n", + "query_payload = {\n", + " \"input\": \"What are the chunking strategies?\",\n", + " \"vector_database\": {\n", + " \"type\": \"pinecone\",\n", + " \"config\": {\n", + " \"api_key\": PINECONE_API_KEY,\n", + " \"host\": PINECONE_HOST,\n", + " }\n", + " },\n", + " \"index_name\": PINECONE_INDEX,\n", + "}\n", + "\n", + "query_response = requests.post(query_url, json=query_payload)\n", + "\n", + "# NOTE: Filter out fields before given to LLM\n", + "# Include title, content, source, page_number, chunk_index\n", + "formatted_json = json.dumps(query_response.json(), indent=4)\n", + "print(formatted_json)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'success': True, 'data': {'num_of_deleted_chunks': 59}}\n" + ] + } + ], + "source": [ + "# Delete the index\n", + "query_url = f\"{API_URL}/api/v1/delete\"\n", + "\n", + "delete_payload = {\n", + " \"files\": [\n", " {\n", - " \"type\": \"PDF\",\n", " \"url\": \"https://arxiv.org/pdf/2402.05131.pdf\"\n", " }\n", " ],\n", + " \"index_name\": PINECONE_INDEX,\n", " \"vector_database\": {\n", - " \"type\": \"weaviate\",\n", + " \"type\": \"pinecone\",\n", " \"config\": {\n", - " \"api_key\": \"9eXH8oNR0uqN3GvvzAgaUD11ltPnGqZG2RFQ\",\n", - " \"host\": \"https://superagent-ragas-1575sjfq.weaviate.network\"\n", + " \"api_key\": PINECONE_API_KEY,\n", + " \"host\": PINECONE_HOST,\n", " }\n", " },\n", - " \"index_name\": \"homanp11\",\n", - " \"encoder\": \"cohere\",\n", - " \"webhook_url\": \"https://webhook.site/0e217d1c-49f1-424a-9992-497db09f7793\"\n", "}\n", "\n", - "response = requests.post(url, json=payload)\n", + "delete_response = requests.delete(query_url, json=delete_payload)\n", "\n", - "print(response.json())" + "print(delete_response.json())" ] }, { @@ -93,25 +265,30 @@ "metadata": {}, "outputs": [], "source": [ - "# Query the index\n", - "query_url = f\"{API_URL}/api/v1/query\"\n", + "# Ingest a file\n", + "url = f\"{API_URL}/api/v1/ingest\"\n", "\n", - "query_payload = {\n", - " \"input\": \"What are the chunking strategies?\",\n", + "payload = {\n", + " \"files\": [\n", + " {\n", + " \"type\": \"PDF\",\n", + " \"url\": \"https://arxiv.org/pdf/2402.05131.pdf\"\n", + " }\n", + " ],\n", " \"vector_database\": {\n", - " \"type\": \"pinecone\",\n", + " \"type\": \"weaviate\",\n", " \"config\": {\n", - " \"api_key\": PINECONE_API_KEY,\n", - " \"host\": PINECONE_HOST,\n", + " 
\"api_key\": \"9eXH8oNR0uqN3GvvzAgaUD11ltPnGqZG2RFQ\",\n", + " \"host\": \"https://superagent-ragas-1575sjfq.weaviate.network\"\n", " }\n", " },\n", - " \"index_name\": PINECONE_INDEX,\n", - " \"encoder\": \"cohere\",\n", + " \"index_name\": \"homanp11\",\n", + " \"webhook_url\": \"https://webhook.site/0e217d1c-49f1-424a-9992-497db09f7793\"\n", "}\n", "\n", - "query_response = requests.post(query_url, json=query_payload)\n", + "response = requests.post(url, json=payload)\n", "\n", - "print(query_response.json())" + "print(response.json())" ] }, { @@ -161,33 +338,6 @@ "data" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Delete the index\n", - "query_url = f\"{API_URL}/api/v1/delete\"\n", - "\n", - "delete_payload = {\n", - " \"file_url\": \"https://arxiv.org/pdf/2210.03629.pdf\",\n", - " \"vector_database\": {\n", - " \"type\": \"pinecone\",\n", - " \"config\": {\n", - " \"api_key\": PINECONE_API_KEY,\n", - " \"host\": PINECONE_HOST,\n", - " }\n", - " },\n", - " \"index_name\": PINECONE_INDEX,\n", - " \"encoder\": \"cohere\",\n", - "}\n", - "\n", - "delete_response = requests.delete(query_url, json=delete_payload)\n", - "\n", - "print(delete_response.json())" - ] - }, { "cell_type": "code", "execution_count": null, diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/models/delete.py b/models/delete.py index a000a7c0..d18d6c62 100644 --- a/models/delete.py +++ b/models/delete.py @@ -1,6 +1,6 @@ from pydantic import BaseModel -from models.ingest import Encoder +from models.ingest import EncoderConfig from models.vector_database import VectorDatabase @@ -12,7 +12,7 @@ class RequestPayload(BaseModel): index_name: str files: list[File] vector_database: VectorDatabase - encoder: Encoder + encoder: EncoderConfig = EncoderConfig() class DeleteResponse(BaseModel): diff --git a/models/document.py b/models/document.py index 5973ccf1..dfd7b763 100644 --- a/models/document.py +++ b/models/document.py @@ -13,25 +13,94 @@ class BaseDocument(BaseModel): class BaseDocumentChunk(BaseModel): id: str + doc_url: str | None = None document_id: str content: str - doc_url: str + source: str | None = None + source_type: str | None = None + chunk_index: int | None = None + title: str | None = None + token_count: int | None = None page_number: int | None = None metadata: dict | None = None dense_embedding: Optional[List[float]] = None + @classmethod + def from_metadata(cls, metadata: dict): + exclude_keys = { + "chunk_id", + "chunk_index", + "document_id", + "doc_url", + "content", + "source", + "source_type", + "title", + "token_count", + "page_number", + } + # Prepare metadata for the constructor and for embedding into the object + constructor_metadata = { + k: v for k, v in metadata.items() if k not in exclude_keys + } + filtered_metadata = { + k: v for k, v in metadata.items() if k in exclude_keys and k != "chunk_id" + } + + def to_int(value): + try: + return int(value) if str(value).isdigit() else None + except (TypeError, ValueError): + return None + + chunk_index = to_int(metadata.get("chunk_index")) + token_count = to_int(metadata.get("token_count")) + + # Remove explicitly passed keys from filtered_metadata to avoid duplication + for key in ["chunk_index", "token_count"]: + filtered_metadata.pop(key, None) + + return cls( + id=metadata.get("chunk_id", ""), + chunk_index=chunk_index, + token_count=token_count, + **filtered_metadata, # Pass filtered metadata for constructor + 
metadata=constructor_metadata, # Pass the rest as part of the metadata + dense_embedding=metadata.get("values"), + ) + @validator("id") - def id_must_be_valid_uuid(cls, v): # noqa: F841 + def id_must_be_valid_uuid(cls, v): try: uuid_obj = uuid.UUID(v, version=4) return str(uuid_obj) except ValueError: - raise ValueError("id must be a valid UUID") + raise ValueError(f"id must be a valid UUID, got {v}") @validator("dense_embedding") - def embeddings_must_be_list_of_floats(cls, v): # noqa: F841 + def embeddings_must_be_list_of_floats(cls, v): if v is None: return v # Allow None to pass through if not all(isinstance(item, float) for item in v): - raise ValueError("embeddings must be a list of floats") + raise ValueError(f"embeddings must be a list of floats, got {v}") return v + + def to_vector_db(self): + metadata = { + "chunk_id": self.id, + "chunk_index": self.chunk_index or "", + "document_id": self.document_id, + "doc_url": self.doc_url, + "content": self.content, + "source": self.source, + "source_type": self.source_type, + "title": self.title or "", + "token_count": self.token_count, + **(self.metadata or {}), + } + result = { + "id": self.id, + "values": self.dense_embedding, + "metadata": metadata, + } + return result diff --git a/models/file.py b/models/file.py index efceeecb..9a8d0167 100644 --- a/models/file.py +++ b/models/file.py @@ -1,7 +1,7 @@ from enum import Enum from urllib.parse import unquote, urlparse -from pydantic import BaseModel, validator +from pydantic import BaseModel class FileType(Enum): @@ -32,13 +32,11 @@ def suffix(self) -> str: class File(BaseModel): url: str - type: FileType | None = None + name: str | None = None - @validator("type", pre=True, always=True) - def set_type_from_url(cls, v, values): # noqa: F841 - if v is not None: - return v - url = values.get("url") + @property + def type(self) -> FileType | None: + url = self.url if url: parsed_url = urlparse(url) path = unquote(parsed_url.path) @@ -47,4 +45,12 @@ def set_type_from_url(cls, v, values): # noqa: F841 return FileType[extension] except KeyError: raise ValueError(f"Unsupported file type for URL: {url}") - return v + return None + + @property + def suffix(self) -> str: + file_type = self.type + if file_type is not None: + return file_type.suffix() + else: + raise ValueError("File type is undefined, cannot determine suffix.") diff --git a/models/ingest.py b/models/ingest.py index b4ec62ac..e82920d9 100644 --- a/models/ingest.py +++ b/models/ingest.py @@ -1,28 +1,92 @@ from enum import Enum -from typing import List, Optional +from typing import List, Literal, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field +from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder from models.file import File from models.google_drive import GoogleDrive from models.vector_database import VectorDatabase -class EncoderEnum(str, Enum): +class EncoderProvider(str, Enum): cohere = "cohere" openai = "openai" -class Encoder(BaseModel): - name: str - type: str - dimensions: Optional[int] = None +class EncoderConfig(BaseModel): + provider: EncoderProvider = Field( + default=EncoderProvider.cohere, description="Embedding provider" + ) + model_name: str = Field( + default="embed-multilingual-light-v3.0", + description="Model name for the encoder", + ) + dimensions: int = Field(default=384, description="Dimension of the encoder output") + + _encoder_config = { + EncoderProvider.cohere: { + "class": CohereEncoder, + "default_model_name": "embed-multilingual-light-v3.0", + 
"default_dimensions": 384, + }, + EncoderProvider.openai: { + "class": OpenAIEncoder, + "default_model_name": "text-embedding-3-small", + "default_dimensions": 1536, + }, + } + + def get_encoder(self) -> BaseEncoder: + config = self._encoder_config.get(self.provider) + if not config: + raise ValueError(f"Encoder '{self.provider}' not found.") + model_name = self.model_name or config["default_model_name"] + encoder_class = config["class"] + return encoder_class(name=model_name) + + +class UnstructuredConfig(BaseModel): + partition_strategy: Literal["auto", "hi_res"] = Field(default="auto") + hi_res_model_name: Literal["detectron2_onnx", "chipper"] = Field( + default="detectron2_onnx", description="Only for `hi_res` strategy" + ) + process_tables: bool = Field( + default=False, description="Only for `hi_res` strategy" + ) + + +class SplitterConfig(BaseModel): + name: Literal["semantic", "by_title"] = Field( + default="semantic", description="Splitter name, `semantic` or `by_title`" + ) + min_tokens: int = Field(default=30, description="Only for `semantic` method") + max_tokens: int = Field( + default=400, description="Only for `semantic` and `recursive` methods" + ) + rolling_window_size: int = Field( + default=1, + description="Only for `semantic` method, cumulative window size " + "for comparing similarity between elements", + ) + prefix_title: bool = Field( + default=True, description="Add to split titles, headers, only `semantic` method" + ) + prefix_summary: bool = Field( + default=True, description="Add to split sub-document summary" + ) + + +class DocumentProcessorConfig(BaseModel): + encoder: EncoderConfig = EncoderConfig() + unstructured: UnstructuredConfig = UnstructuredConfig() + splitter: SplitterConfig = SplitterConfig() class RequestPayload(BaseModel): + index_name: str + vector_database: VectorDatabase + document_processor: DocumentProcessorConfig = DocumentProcessorConfig() files: Optional[List[File]] = None google_drive: Optional[GoogleDrive] = None - encoder: Encoder - vector_database: VectorDatabase - index_name: str webhook_url: Optional[str] = None diff --git a/models/query.py b/models/query.py index c8e8be2c..9289ffbc 100644 --- a/models/query.py +++ b/models/query.py @@ -2,7 +2,8 @@ from pydantic import BaseModel -from models.ingest import Encoder +from models.document import BaseDocumentChunk +from models.ingest import EncoderConfig from models.vector_database import VectorDatabase @@ -10,9 +11,10 @@ class RequestPayload(BaseModel): input: str vector_database: VectorDatabase index_name: str - encoder: Encoder + encoder: EncoderConfig = EncoderConfig() session_id: Optional[str] = None interpreter_mode: Optional[bool] = False + exclude_fields: List[str] = None class ResponseData(BaseModel): @@ -24,4 +26,10 @@ class ResponseData(BaseModel): class ResponsePayload(BaseModel): success: bool - data: List[ResponseData] + data: List[BaseDocumentChunk] + + def model_dump(self, exclude: set = None): + return { + "success": self.success, + "data": [chunk.dict(exclude=exclude) for chunk in self.data], + } diff --git a/poetry.lock b/poetry.lock index b4112dde..6889e530 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3331,13 +3331,13 @@ files = [ [[package]] name = "semantic-router" -version = "0.0.22" +version = "0.0.26" description = "Super fast semantic router for AI decision making" optional = false python-versions = ">=3.9,<3.13" files = [ - {file = "semantic_router-0.0.22-py3-none-any.whl", hash = "sha256:6322a43dff9c21c5949df20720262f03b62e484c34dbc02aa6ed046344716ce9"}, - 
{file = "semantic_router-0.0.22.tar.gz", hash = "sha256:f75fcfbf0eaed0b1a19fa6ed79f3b11a03605d06bacee3595680a233c0abc0f1"}, + {file = "semantic_router-0.0.26-py3-none-any.whl", hash = "sha256:edc3664c913867f7cd8006226af6240fb9b2bface58cdf6eee13a57f5bd058cf"}, + {file = "semantic_router-0.0.26.tar.gz", hash = "sha256:c4c37e6542b28f487baec4f02a5388239dc4f61238b033d2a01a169c6811d1a3"}, ] [package.dependencies] @@ -3350,12 +3350,16 @@ numpy = ">=1.25.2,<2.0.0" openai = ">=1.10.0,<2.0.0" pydantic = ">=2.5.3,<3.0.0" pyyaml = ">=6.0.1,<7.0.0" +regex = ">=2023.12.25,<2024.0.0" +tiktoken = ">=0.6.0,<0.7.0" [package.extras] fastembed = ["fastembed (>=0.1.3,<0.2.0)"] hybrid = ["pinecone-text (>=0.7.1,<0.8.0)"] local = ["llama-cpp-python (>=0.2.28,<0.3.0)", "torch (>=2.1.0,<3.0.0)", "transformers (>=4.36.2,<5.0.0)"] pinecone = ["pinecone-client (>=3.0.0,<4.0.0)"] +processing = ["matplotlib (>=3.8.3,<4.0.0)"] +vision = ["pillow (>=10.2.0,<11.0.0)", "torch (>=2.1.0,<3.0.0)", "torchvision (>=0.16.2,<0.17.0)", "transformers (>=4.36.2,<5.0.0)"] [[package]] name = "setuptools" @@ -4254,4 +4258,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.12" -content-hash = "9fab773e0bb1d4a412cdf10a47ff271056bd99f8b0e76a9309b377c732c50e63" +content-hash = "b9774decb9338d39235bb4599347540fa5a684cecd89d194a95b26257be43364" diff --git a/pyproject.toml b/pyproject.toml index c0783f2f..7263d176 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ black = "^23.12.1" flake8 = "^7.0.0" vulture = "^2.11" python-decouple = "^3.8" -semantic-router = {version = "^0.0.22"} +semantic-router = {version = "^0.0.26"} astrapy = "^0.7.4" openai = "^1.12.0" tqdm = "^4.66.2" @@ -32,6 +32,7 @@ e2b = "^0.14.7" gunicorn = "^21.2.0" unstructured-client = "^0.18.0" unstructured = {extras = ["google-drive"], version = "^0.12.4"} +tiktoken = "^0.6.0" [tool.poetry.group.dev.dependencies] termcolor = "^2.4.0" diff --git a/service/embedding.py b/service/embedding.py index 70c7d2bf..9b1d9173 100644 --- a/service/embedding.py +++ b/service/embedding.py @@ -2,15 +2,14 @@ import copy import uuid from tempfile import NamedTemporaryFile -from typing import Any, List, Optional +from typing import Any, List, Literal, Optional import numpy as np import requests +import tiktoken from decouple import config from semantic_router.encoders import ( BaseEncoder, - CohereEncoder, - OpenAIEncoder, ) from tqdm import tqdm from unstructured_client import UnstructuredClient @@ -20,22 +19,28 @@ from models.document import BaseDocument, BaseDocumentChunk from models.file import File from models.google_drive import GoogleDrive -from models.ingest import Encoder, EncoderEnum -from utils.file import get_file_extension_from_url +from models.ingest import DocumentProcessorConfig +from service.splitter import UnstructuredSemanticSplitter from utils.logger import logger from utils.summarise import completion from vectordbs import get_vector_service +# TODO: Add similarity score to the BaseDocumentChunk +# TODO: Add relevance score to the BaseDocumentChunk +# TODO: Add created_at date to the BaseDocumentChunk + class EmbeddingService: def __init__( self, index_name: str, + encoder: BaseEncoder, vector_credentials: dict, dimensions: Optional[int], files: Optional[List[File]] = None, google_drive: Optional[GoogleDrive] = None, ): + self.encoder = encoder self.files = files self.google_drive = google_drive self.index_name = index_name @@ -46,7 +51,7 @@ def __init__( 
server_url=config("UNSTRUCTURED_IO_SERVER_URL"), ) - def _get_strategy(self, type: str) -> dict: + def _get_strategy(self, type: str) -> Optional[str]: strategies = { "PDF": "auto", } @@ -55,142 +60,219 @@ def _get_strategy(self, type: str) -> dict: except KeyError: return None - async def _download_and_extract_elements( - self, file: File, strategy: Optional[str] = "hi_res" + async def _partition_file( + self, + file: File, + strategy="auto", + returned_elements_type: Literal["chunked", "original"] = "chunked", ) -> List[Any]: """ Downloads the file and extracts elements using the partition function. Returns a list of unstructured elements. """ + # TODO: This will overwrite the function parameter? + # if file.type is None: + # raise ValueError(f"File type not set for {file.url}") + # strategy = self._get_strategy(type=file.type.value) + + # TODO: Do we need this if we have default value in the function signature? + # if strategy is None: + # strategy = "auto" + logger.info( f"Downloading and extracting elements from {file.url}," f"using `{strategy}` strategy" ) - suffix = get_file_extension_from_url(url=file.url) - strategy = self._get_strategy(type=file.type.value) - with NamedTemporaryFile(suffix=suffix, delete=True) as temp_file: + with NamedTemporaryFile(suffix=file.suffix, delete=True) as temp_file: with requests.get(url=file.url) as response: temp_file.write(response.content) temp_file.flush() temp_file.seek(0) # Reset file pointer to the beginning file_content = temp_file.read() file_name = temp_file.name + files = shared.Files( content=file_content, file_name=file_name, ) - req = shared.PartitionParameters( - files=files, - include_page_breaks=True, - strategy=strategy, - max_characters=1500, - new_after_n_chars=1000, - chunking_strategy="by_title", - ) + if returned_elements_type == "original": + req = shared.PartitionParameters( + files=files, + include_page_breaks=True, + strategy=strategy, + ) + else: + req = shared.PartitionParameters( + files=files, + include_page_breaks=True, + strategy=strategy, + max_characters=2500, + new_after_n_chars=1000, + chunking_strategy="by_title", + ) try: unstructured_response = self.unstructured_client.general.partition(req) + if unstructured_response.elements is not None: + return unstructured_response.elements + else: + logger.error( + f"Error partitioning file {file.url}: {unstructured_response}" + ) + return [] except SDKError as e: - print(e) - return unstructured_response.elements or [] + logger.error(f"Error partitioning file {file.url}: {e}") + return [] - async def generate_document( - self, file: File, elements: List[Any] - ) -> BaseDocument | None: - logger.info(f"Generating document from {file.url}") - try: - doc_content = "".join(element.get("text") for element in elements) - if not doc_content: - logger.error(f"Cannot extract text from {file.url}") - return None - doc_metadata = { - "source": file.url, - "source_type": "document", - "document_type": get_file_extension_from_url(url=file.url), - } - return BaseDocument( - id=f"doc_{uuid.uuid4()}", - content=doc_content, - doc_url=file.url, - metadata=doc_metadata, - ) - except Exception as e: - logger.error(f"Error loading document {file.url}: {e}") + def _tiktoken_length(self, text: str): + tokenizer = tiktoken.get_encoding("cl100k_base") + tokens = tokenizer.encode(text, disallowed_special=()) + return len(tokens) + + def _sanitize_metadata(self, metadata: dict) -> dict: + def sanitize_value(value): + if isinstance(value, (str, int, float, bool)): + return value + elif 
isinstance(value, list): + # Ensure all elements in the list are of type str, int, float, or bool + # Convert non-compliant elements to str + sanitized_list = [] + for v in value: + if isinstance(v, (str, int, float, bool)): + sanitized_list.append(v) + elif isinstance(v, (dict, list)): + # For nested structures, convert to a string representation + sanitized_list.append(str(v)) + else: + sanitized_list.append(str(v)) + return sanitized_list + elif isinstance(value, dict): + return {k: sanitize_value(v) for k, v in value.items()} + else: + return str(value) + + return {key: sanitize_value(value) for key, value in metadata.items()} async def generate_chunks( - self, strategy: Optional[str] = "auto" + self, + config: DocumentProcessorConfig, ) -> List[BaseDocumentChunk]: doc_chunks = [] for file in tqdm(self.files, desc="Generating chunks"): try: - chunks = await self._download_and_extract_elements(file, strategy) - document = await self.generate_document(file, chunks) - if not document: + chunks = [] + if config.splitter.name == "by_title": + chunked_elements = await self._partition_file( + file, strategy=config.unstructured.partition_strategy + ) + # TODO: handle chunked_elements being None + for element in chunked_elements: + chunk_data = { + "content": element.get("text"), + "metadata": self._sanitize_metadata( + element.get("metadata") + ), + } + chunks.append(chunk_data) + + if config.splitter.name == "semantic": + elements = await self._partition_file( + file, + strategy=config.unstructured.partition_strategy, + returned_elements_type="original", + ) + splitter_config = UnstructuredSemanticSplitter( + encoder=self.encoder, + window_size=config.splitter.rolling_window_size, + min_split_tokens=config.splitter.min_tokens, + max_split_tokens=config.splitter.max_tokens, + ) + chunks = await splitter_config(elements=elements) + + if not chunks: continue + + document_id = f"doc_{uuid.uuid4()}" + document_content = "" for chunk in chunks: - # Ensure all metadata values are of a type acceptable - sanitized_metadata = { - key: ( - value - if isinstance(value, (str, int, float, bool, list)) - else str(value) - ) - for key, value in chunk.get("metadata").items() - } - chunk_id = str(uuid.uuid4()) # must be a valid UUID - chunk_text = chunk.get("text") - doc_chunks.append( - BaseDocumentChunk( - id=chunk_id, - document_id=document.id, - content=chunk_text, - doc_url=file.url, - metadata={ - "chunk_id": chunk_id, - "document_id": document.id, - "source": file.url, - "source_type": "document", - "document_type": get_file_extension_from_url(file.url), - "content": chunk_text, - **sanitized_metadata, - }, + document_content += chunk.get("content", "") + chunk_id = str(uuid.uuid4()) + + if config.splitter.prefix_title: + content = ( + f"{chunk.get('title', '')}\n{chunk.get('content', '')}" ) + else: + content = chunk.get("content", "") + doc_chunk = BaseDocumentChunk( + id=chunk_id, + doc_url=file.url, + document_id=document_id, + content=content, + source=file.url, + source_type=file.suffix, + chunk_index=chunk.get("chunk_index", None), + title=chunk.get("title", None), + token_count=self._tiktoken_length(chunk.get("content", "")), + metadata=self._sanitize_metadata(chunk.get("metadata", {})), ) + doc_chunks.append(doc_chunk) + + # This object will be used for evaluation purposes + BaseDocument( + id=document_id, + content=document_content, + doc_url=file.url, + metadata={ + "source": file.url, + "source_type": "document", + "document_type": file.suffix, + }, + ) + except Exception as e: 
logger.error(f"Error loading chunks from {file.url}: {e}") + raise return doc_chunks - async def generate_and_upsert_embeddings( + async def embed_and_upsert( self, - documents: List[BaseDocumentChunk], + chunks: List[BaseDocumentChunk], encoder: BaseEncoder, index_name: Optional[str] = None, + batch_size: int = 100, ) -> List[BaseDocumentChunk]: - pbar = tqdm(total=len(documents), desc="Generating embeddings") + pbar = tqdm(total=len(chunks), desc="Generating embeddings") sem = asyncio.Semaphore(10) # Limit to 10 concurrent tasks - async def safe_generate_embedding( - chunk: BaseDocumentChunk, - ) -> BaseDocumentChunk | None: + async def embed_batch( + chunks_batch: List[BaseDocumentChunk], + ) -> List[BaseDocumentChunk]: async with sem: try: - return await generate_embedding(chunk) + texts = [chunk.content for chunk in chunks_batch] + embeddings = encoder(texts) + for chunk, embedding in zip(chunks_batch, embeddings): + chunk.dense_embedding = np.array(embedding).tolist() + pbar.update(len(chunks_batch)) # Update the progress bar + return chunks_batch except Exception as e: - logger.error(f"Error embedding document {chunk.id}: {e}") - return None + logger.error(f"Error embedding a batch of documents: {e}") + raise - async def generate_embedding( - chunk: BaseDocumentChunk, - ) -> BaseDocumentChunk | None: - if chunk is not None: - embeddings: List[np.ndarray] = [ - np.array(e) for e in encoder([chunk.content]) - ] - chunk.dense_embedding = embeddings[0].tolist() - pbar.update() - return chunk - - tasks = [safe_generate_embedding(document) for document in documents] - chunks_with_embeddings = await asyncio.gather(*tasks, return_exceptions=False) + # Create batches of chunks + chunks_batches = [ + chunks[i : i + batch_size] for i in range(0, len(chunks), batch_size) + ] + + # Process each batch + tasks = [embed_batch(batch) for batch in chunks_batches] + chunks_with_embeddings = await asyncio.gather(*tasks) + chunks_with_embeddings = [ + chunk + for batch in chunks_with_embeddings + for chunk in batch + if chunk is not None + ] pbar.close() vector_service = get_vector_service( @@ -203,7 +285,7 @@ async def generate_embedding( await vector_service.upsert(chunks=chunks_with_embeddings) except Exception as e: logger.error(f"Error upserting embeddings: {e}") - raise Exception(f"Error upserting embeddings: {e}") + raise return chunks_with_embeddings @@ -240,16 +322,3 @@ async def safe_completion(document: BaseDocumentChunk) -> BaseDocumentChunk: pbar.close() return summary_documents - - -def get_encoder(*, encoder_config: Encoder) -> BaseEncoder: - encoder_mapping = { - EncoderEnum.cohere: CohereEncoder, - EncoderEnum.openai: OpenAIEncoder, - } - encoder_provider = encoder_config.type - encoder = encoder_config.name - encoder_class = encoder_mapping.get(encoder_provider) - if encoder_class is None: - raise ValueError(f"Unsupported provider: {encoder_provider}") - return encoder_class(name=encoder) diff --git a/service/ingest.py b/service/ingest.py index 39a648fd..407342a4 100644 --- a/service/ingest.py +++ b/service/ingest.py @@ -2,15 +2,18 @@ from models.file import File from models.google_drive import GoogleDrive +from models.ingest import DocumentProcessorConfig from service.embedding import EmbeddingService async def handle_urls( + *, embedding_service: EmbeddingService, files: List[File], + config: DocumentProcessorConfig ): embedding_service.files = files - chunks = await embedding_service.generate_chunks() + chunks = await embedding_service.generate_chunks(config=config) 
summary_documents = await embedding_service.generate_summary_documents( documents=chunks ) diff --git a/service/router.py b/service/router.py index 9511b165..2bacf49f 100644 --- a/service/router.py +++ b/service/router.py @@ -8,7 +8,6 @@ from models.document import BaseDocumentChunk from models.query import RequestPayload from service.code_interpreter import CodeInterpreterService -from service.embedding import get_encoder from utils.logger import logger from utils.summarise import SUMMARY_SUFFIX from vectordbs import BaseVectorDatabase, get_vector_service @@ -68,7 +67,7 @@ async def get_documents( async def query(payload: RequestPayload) -> list[BaseDocumentChunk]: rl = create_route_layer() decision = rl(payload.input).name - encoder = get_encoder(encoder_config=payload.encoder) + encoder = payload.encoder.get_encoder() if decision == "summarize": vector_service: BaseVectorDatabase = get_vector_service( diff --git a/service/splitter.py b/service/splitter.py new file mode 100644 index 00000000..b044144d --- /dev/null +++ b/service/splitter.py @@ -0,0 +1,129 @@ +import re +from typing import Any + +from colorama import Fore, Style +from semantic_router.encoders import BaseEncoder +from semantic_router.splitters import RollingWindowSplitter + + +class UnstructuredSemanticSplitter: + def __init__( + self, + encoder: BaseEncoder, + window_size: int, + min_split_tokens: int, + max_split_tokens: int, + ): + self.splitter = RollingWindowSplitter( + encoder=encoder, + window_size=window_size, + min_split_tokens=min_split_tokens, + max_split_tokens=max_split_tokens, + ) + + def is_valid_title(self, title: str) -> bool: + # Rule 1: Title starts with a lowercase letter + if re.match(r"^[a-z]", title): + return False + # Rule 2: Title has a special character (excluding :, -, and .) 
+ if re.search(r"[^\w\s:\-\.]", title): + return False + # Rule 3: Title ends with a dot + if title.endswith("."): + return False + return True + + def _group_elements_by_title(self, elements: list[dict[str, Any]]) -> dict: + grouped_elements = {} + current_title = "Untitled" # Default title for initial text without a title + + for element in elements: + if element.get("type") == "Title": + potential_title = element.get("text", "Untitled") + if self.is_valid_title(potential_title): + print(f"{Fore.GREEN}{potential_title}: True{Style.RESET_ALL}") + current_title = potential_title + else: + print(f"{Fore.RED}{potential_title}: False{Style.RESET_ALL}") + continue + else: + if current_title not in grouped_elements: + grouped_elements[current_title] = [] + else: + grouped_elements[current_title].append(element) + return grouped_elements + + async def split_grouped_elements( + self, elements: list[dict[str, Any]], splitter: RollingWindowSplitter + ) -> list[dict[str, Any]]: + grouped_elements = self._group_elements_by_title(elements) + chunks_with_title = [] + + def _append_chunks(*, title: str, content: str, index: int, metadata: dict): + chunks_with_title.append( + { + "title": title, + "content": content, + "chunk_index": index, + "metadata": metadata, + } + ) + + for index, (title, elements) in enumerate(grouped_elements.items()): + if not elements: + continue + section_metadata = elements[0].get( + "metadata", {} + ) # Took first element's data + accumulated_element_texts: list[str] = [] + chunks: list[dict[str, Any]] = [] + + for element in elements: + if not element.get("text"): + continue + if element.get("type") == "Table": + # Process accumulated text before the table + if accumulated_element_texts: + splits = splitter(accumulated_element_texts) + for split in splits: + _append_chunks( + title=title, + content=split.content, + index=index, + metadata=section_metadata, + ) + # TODO: reset after PageBreak also + accumulated_element_texts = ( + [] + ) # Start new accumulation after table + + # Add table as a separate chunk + _append_chunks( + title=title, + content=element.get("metadata", {}).get( + "text_as_html", "No text" + ), + index=index, + metadata=element.get("metadata", {}), + ) + else: + accumulated_element_texts.append(element.get("text", "No text")) + + # Process any remaining accumulated text after the last table + # or if no table was encountered + + if accumulated_element_texts: + splits = splitter(accumulated_element_texts) + for split in splits: + _append_chunks( + title=title, + content=split.content, + index=index, + metadata=section_metadata, + ) + if chunks: + chunks_with_title.extend(chunks) + return chunks_with_title + + async def __call__(self, elements: list[dict[str, Any]]) -> list[dict[str, Any]]: + return await self.split_grouped_elements(elements, self.splitter) diff --git a/utils/file.py b/utils/file.py index 1634e80d..0582d163 100644 --- a/utils/file.py +++ b/utils/file.py @@ -2,6 +2,7 @@ from urllib.parse import urlparse +# TODO: Can be removed def get_file_extension_from_url(url: str) -> str: """ Extracts the file extension from a given URL. 
diff --git a/vectordbs/pinecone.py b/vectordbs/pinecone.py index 606b339e..6077397f 100644 --- a/vectordbs/pinecone.py +++ b/vectordbs/pinecone.py @@ -22,6 +22,9 @@ def __init__( ) pinecone = Pinecone(api_key=credentials["api_key"]) if index_name not in [index.name for index in pinecone.list_indexes()]: + logger.info( + f"Creating new index {index_name}, dimension {dimension} in Pinecone" + ) pinecone.create_index( name=self.index_name, dimension=dimension, @@ -31,28 +34,24 @@ def __init__( self.index = pinecone.Index(name=self.index_name) # TODO: add batch size - async def upsert(self, chunks: List[BaseDocumentChunk]): + async def upsert(self, chunks: List[BaseDocumentChunk], batch_size: int = 100): if self.index is None: raise ValueError(f"Pinecone index {self.index_name} is not initialized.") + try: + logger.info(f"Upserting {len(chunks)} chunks into index {self.index_name}") + # Prepare and upsert documents to Pinecone in batches + for i in tqdm(range(0, len(chunks), batch_size)): + i_end = min(i + batch_size, len(chunks)) + chunks_batch = chunks[i:i_end] + to_upsert = [chunk.to_vector_db() for chunk in chunks_batch] + self.index.upsert(vectors=to_upsert) + logger.info(f"Upserted {len(chunks_batch)} chunks into Pinecone") - # Prepare the data for upserting into Pinecone - vectors_to_upsert = [] - for chunk in tqdm( - chunks, - desc=f"Upserting {len(chunks)} chunks to Pinecone index {self.index_name}", - ): - vector_data = { - "id": chunk.id, - "values": chunk.dense_embedding, - "metadata": { - "document_id": chunk.document_id, - "content": chunk.content, - "doc_url": chunk.doc_url, - **(chunk.metadata if chunk.metadata else {}), - }, - } - vectors_to_upsert.append(vector_data) - self.index.upsert(vectors=vectors_to_upsert) + # Check that we have all vectors in index + return self.index.describe_index_stats() + except Exception as e: + logger.error(f"Error in embedding documents: {e}") + raise async def query( self, input: str, top_k: int = 25, include_metadata: bool = True @@ -65,24 +64,15 @@ async def query( top_k=top_k, include_metadata=include_metadata, ) - document_chunks = [] - for match in results["matches"]: - document_chunk = BaseDocumentChunk( - id=match["id"], - document_id=match["metadata"].get("document_id", ""), - content=match["metadata"]["content"], - doc_url=match["metadata"].get("source", ""), - page_number=str( - match["metadata"].get("page_number", "") - ), # TODO: is this correct? - metadata={ - key: value - for key, value in match["metadata"].items() - if key not in ["content", "file_url"] - }, - ) - document_chunks.append(document_chunk) - return document_chunks + chunks = [] + if results.get("matches"): + for match in results["matches"]: + chunk = BaseDocumentChunk.from_metadata(metadata=match["metadata"]) + chunks.append(chunk) + return chunks + else: + logger.warning(f"No matches found for the given query '{input}'") + return [] async def delete(self, file_url: str) -> DeleteResponse: if self.index is None: diff --git a/vectordbs/qdrant.py b/vectordbs/qdrant.py index 7d43dc36..2be69597 100644 --- a/vectordbs/qdrant.py +++ b/vectordbs/qdrant.py @@ -80,6 +80,8 @@ async def query(self, input: str, top_k: int = MAX_QUERY_TOP_K) -> List: return [ BaseDocumentChunk( id=result.id, + source_type=result.payload.get("filetype"), + source=result.payload.get("doc_url"), document_id=result.payload.get("document_id"), content=result.payload.get("content"), doc_url=result.payload.get("doc_url"),
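Finally, the refactored Pinecone adapter delegates (de)serialization to the chunk model itself: `to_vector_db()` flattens a chunk into `{"id", "values", "metadata"}` for batched upserts, and `from_metadata()` rebuilds a `BaseDocumentChunk` from a query match. A small round-trip sketch with placeholder content and a stand-in embedding:

```python
import uuid

from models.document import BaseDocumentChunk

chunk = BaseDocumentChunk(
    id=str(uuid.uuid4()),
    document_id="doc_example",  # hypothetical document id
    content="Element-based chunking achieves the highest retrieval scores.",
    doc_url="https://arxiv.org/pdf/2402.05131.pdf",
    source="https://arxiv.org/pdf/2402.05131.pdf",
    source_type=".pdf",
    title="5 Discussion",
    token_count=11,
    page_number=11,
    metadata={"filetype": "application/pdf"},
    dense_embedding=[0.1] * 384,  # stand-in vector; normally produced by the encoder
)

# What PineconeService.upsert() sends for each vector in a batch
vector = chunk.to_vector_db()  # {"id": ..., "values": [...], "metadata": {...}}

# What PineconeService.query() does with each match's metadata
restored = BaseDocumentChunk.from_metadata(metadata=vector["metadata"])
assert restored.content == chunk.content and restored.title == chunk.title
```

Note that `to_vector_db()` does not write `page_number` into the vector metadata, so it only survives the round trip if it is also present in `chunk.metadata`; the same applies to the dense embedding, which Pinecone returns separately from the match metadata.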