diff --git a/js/sdk/package-lock.json b/js/sdk/package-lock.json index 49372c60f..ce4d6d74d 100644 --- a/js/sdk/package-lock.json +++ b/js/sdk/package-lock.json @@ -1,6 +1,6 @@ { "name": "r2r-js", - "version": "0.3.15", + "version": "0.3.16", "lockfileVersion": 3, "requires": true, "packages": { diff --git a/js/sdk/package.json b/js/sdk/package.json index 0b6452ce5..2f42195d9 100644 --- a/js/sdk/package.json +++ b/js/sdk/package.json @@ -1,6 +1,6 @@ { "name": "r2r-js", - "version": "0.3.15", + "version": "0.3.16", "description": "", "main": "dist/index.js", "browser": "dist/index.browser.js", diff --git a/js/sdk/src/r2rClient.ts b/js/sdk/src/r2rClient.ts index f60730402..3e3a9e56f 100644 --- a/js/sdk/src/r2rClient.ts +++ b/js/sdk/src/r2rClient.ts @@ -1921,41 +1921,24 @@ export class r2rClient { /** * Search over documents. * @param query The query to search for. - * @param settings Settings for the document search. + * @param vector_search_settings Settings for the document search. * @returns A promise that resolves to the response from the server. */ @feature("searchDocuments") async searchDocuments( query: string, - settings?: { - searchOverMetadata?: boolean; - metadataKeys?: string[]; - searchOverBody?: boolean; - filters?: Record; - searchFilters?: Record; - offset?: number; - limit?: number; - titleWeight?: number; - metadataWeight?: number; - }, + vector_search_settings?: VectorSearchSettings | Record, ): Promise { this._ensureAuthenticated(); - const json_data: Record = { query, - settings: { - search_over_metadata: settings?.searchOverMetadata ?? true, - metadata_keys: settings?.metadataKeys ?? ["title"], - search_over_body: settings?.searchOverBody ?? false, - filters: settings?.filters ?? {}, - search_filters: settings?.searchFilters ?? {}, - offset: settings?.offset ?? 0, - limit: settings?.limit ?? 10, - title_weight: settings?.titleWeight ?? 0.5, - metadata_weight: settings?.metadataWeight ?? 
0.5, - }, + vector_search_settings, }; + Object.keys(json_data).forEach( + (key) => json_data[key] === undefined && delete json_data[key], + ); + return await this._makeRequest("POST", "search_documents", { data: json_data, }); diff --git a/py/core/main/api/ingestion_router.py b/py/core/main/api/ingestion_router.py index 157b0937d..6a53076e3 100644 --- a/py/core/main/api/ingestion_router.py +++ b/py/core/main/api/ingestion_router.py @@ -11,10 +11,10 @@ Depends, File, Form, + HTTPException, Path, Query, UploadFile, - HTTPException, ) from pydantic import Json diff --git a/py/core/main/app.py b/py/core/main/app.py index 10cf5e7be..5fc6ec16c 100644 --- a/py/core/main/app.py +++ b/py/core/main/app.py @@ -1,11 +1,11 @@ from typing import Union -from core.base import R2RException from fastapi import FastAPI, Request -from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.utils import get_openapi +from fastapi.responses import JSONResponse +from core.base import R2RException from core.providers import ( HatchetOrchestrationProvider, SimpleOrchestrationProvider, diff --git a/py/core/main/app_entry.py b/py/core/main/app_entry.py index 836b7b409..5328e637a 100644 --- a/py/core/main/app_entry.py +++ b/py/core/main/app_entry.py @@ -5,10 +5,11 @@ from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler -from core.base import R2RException from fastapi import FastAPI, Request -from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +from core.base import R2RException from .assembly import R2RBuilder, R2RConfig diff --git a/py/core/main/services/ingestion_service.py b/py/core/main/services/ingestion_service.py index fea3f44c2..f1a45cb86 100644 --- a/py/core/main/services/ingestion_service.py +++ b/py/core/main/services/ingestion_service.py @@ -251,6 +251,9 @@ async def augment_document_info( document_info.summary = response.choices[0].message.content # type: ignore + if not document_info.summary: + raise ValueError("Expected a generated response.") + embedding = await self.providers.embedding.async_get_embedding( text=document_info.summary, ) diff --git a/py/core/main/services/kg_service.py b/py/core/main/services/kg_service.py index 9d0e56dc5..c80e60ecc 100644 --- a/py/core/main/services/kg_service.py +++ b/py/core/main/services/kg_service.py @@ -3,6 +3,7 @@ import time from typing import AsyncGenerator, Optional from uuid import UUID + from fastapi import HTTPException from core.base import KGExtractionStatus, RunManager diff --git a/py/core/pipes/kg/deduplication.py b/py/core/pipes/kg/deduplication.py index f7ff24f75..441167610 100644 --- a/py/core/pipes/kg/deduplication.py +++ b/py/core/pipes/kg/deduplication.py @@ -2,6 +2,7 @@ import logging from typing import Any, Union from uuid import UUID + from fastapi import HTTPException from core.base import AsyncState diff --git a/py/core/pipes/kg/prompt_tuning.py b/py/core/pipes/kg/prompt_tuning.py index f1763f858..7a1274d5b 100644 --- a/py/core/pipes/kg/prompt_tuning.py +++ b/py/core/pipes/kg/prompt_tuning.py @@ -5,6 +5,7 @@ import logging from typing import Any from uuid import UUID + from fastapi import HTTPException from core.base import ( diff --git a/py/core/providers/auth/r2r_auth.py b/py/core/providers/auth/r2r_auth.py index 69043dfc6..f060591e5 100644 --- a/py/core/providers/auth/r2r_auth.py +++ b/py/core/providers/auth/r2r_auth.py @@ -1,10 +1,9 @@ import logging import os 
from datetime import datetime, timedelta, timezone -from fastapi import HTTPException import jwt -from fastapi import Depends +from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer from core.base import ( diff --git a/py/core/providers/database/collection.py b/py/core/providers/database/collection.py index 7acec6588..e577a29ab 100644 --- a/py/core/providers/database/collection.py +++ b/py/core/providers/database/collection.py @@ -3,6 +3,7 @@ from datetime import datetime from typing import Optional, Union from uuid import UUID, uuid4 + from fastapi import HTTPException from core.base import ( diff --git a/py/core/providers/database/document.py b/py/core/providers/database/document.py index 2f355ead9..a883831f6 100644 --- a/py/core/providers/database/document.py +++ b/py/core/providers/database/document.py @@ -197,6 +197,12 @@ async def upsert_documents_overview( else: wait_time = 0.1 * (2**retries) # Exponential backoff await asyncio.sleep(wait_time) + except Exception as e: + if 'column "summary"' in str(e): + raise ValueError( + "Document schema is missing 'summary' and 'summary_embedding' columns. Call `r2r db upgrade` to carry out the necessary migration." + ) + raise async def delete_from_documents_overview( self, document_id: UUID, version: Optional[str] = None diff --git a/py/core/providers/database/file.py b/py/core/providers/database/file.py index dc12f0ffe..df5496303 100644 --- a/py/core/providers/database/file.py +++ b/py/core/providers/database/file.py @@ -2,9 +2,9 @@ import logging from typing import BinaryIO, Optional, Union from uuid import UUID -from fastapi import HTTPException import asyncpg +from fastapi import HTTPException from core.base import FileHandler, R2RException diff --git a/py/core/providers/database/kg.py b/py/core/providers/database/kg.py index fac3882d8..b233041dd 100644 --- a/py/core/providers/database/kg.py +++ b/py/core/providers/database/kg.py @@ -3,10 +3,10 @@ import time from typing import Any, AsyncGenerator, Optional, Tuple from uuid import UUID -from fastapi import HTTPException import asyncpg from asyncpg.exceptions import PostgresError, UndefinedTableError +from fastapi import HTTPException from core.base import ( CommunityReport, diff --git a/py/core/providers/database/user.py b/py/core/providers/database/user.py index 41654ff04..0c45d761a 100644 --- a/py/core/providers/database/user.py +++ b/py/core/providers/database/user.py @@ -1,6 +1,7 @@ from datetime import datetime from typing import Optional, Union from uuid import UUID + from fastapi import HTTPException from core.base import CryptoProvider, UserHandler diff --git a/py/migrations/versions/2fac23e4d91b_migrate_to_document_search.py b/py/migrations/versions/2fac23e4d91b_migrate_to_document_search.py index a69eca57a..697d19009 100644 --- a/py/migrations/versions/2fac23e4d91b_migrate_to_document_search.py +++ b/py/migrations/versions/2fac23e4d91b_migrate_to_document_search.py @@ -15,7 +15,7 @@ import sqlalchemy as sa from alembic import op from openai import AsyncOpenAI -from sqlalchemy.dialects import postgresql +from sqlalchemy import inspect from sqlalchemy.types import UserDefinedType from r2r import R2RAsyncClient @@ -26,8 +26,17 @@ branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None -project_name = os.getenv("R2R_PROJECT_NAME") or "r2r_default" -dimension = 512 # OpenAI's embedding dimension +project_name = os.getenv("R2R_PROJECT_NAME") +if not project_name: + raise ValueError( + "Environment 
variable `R2R_PROJECT_NAME` must be provided to migrate; it should be set equal to the value of `project_name` in your `r2r.toml`." + ) + +dimension = os.getenv("R2R_EMBEDDING_DIMENSION") +if not dimension: + raise ValueError( + "Environment variable `R2R_EMBEDDING_DIMENSION` must be provided to migrate; it should be set equal to the value of `base_dimension` in your `r2r.toml`." + ) class Vector(UserDefinedType): @@ -47,7 +56,7 @@ async def async_generate_all_summaries(): base_url = os.getenv("R2R_BASE_URL") if not base_url: raise ValueError( - "Environment variable `R2R_BASE_URL` must be provided, e.g. `http://localhost:7272`." + "Environment variable `R2R_BASE_URL` must be provided; it must point at the R2R deployment you wish to migrate, e.g. `http://localhost:7272`." ) print(f"Using R2R Base URL: {base_url})") @@ -55,21 +64,12 @@ async def async_generate_all_summaries(): base_model = os.getenv("R2R_BASE_MODEL") if not base_model: raise ValueError( - "Environment variable `R2R_BASE_MODEL` must be provided, e.g. `openai/gpt-4o-mini`." + "Environment variable `R2R_BASE_MODEL` must be provided, e.g. `openai/gpt-4o-mini`; it will be used for generating document summaries during migration." ) - print(f"Using R2R Base Model: {base_url})") - - embedding_model = os.getenv("R2R_EMBEDDING_MODEL") - if not base_model or "openai" not in embedding_model: - raise ValueError( - "Environment variable `R2R_EMBEDDING_MODEL` must be provided, e.g. `openai/text-embedding-3-small`, and must point to an OpenAI embedding model." - ) - embedding_model = embedding_model.split("openai/")[-1] - print(f"Using R2R Embedding Model: {embedding_model})") + print(f"Using R2R Base Model: {base_model}") client = R2RAsyncClient(base_url) - openai_client = AsyncOpenAI() offset = 0 limit = 1_000 @@ -157,10 +157,11 @@ async def async_generate_all_summaries(): summary_text = summary["results"]["choices"][0]["message"][ "content" ] - embedding_response = await openai_client.embeddings.create( - model=embedding_model, input=summary_text, dimensions=dimension - ) - embedding_vector = embedding_response.data[0].embedding + embedding_vector = (await client.embedding(summary_text))["results"][0] + # embedding_response = await openai_client.embeddings.create( + # model=embedding_model, input=summary_text, dimensions=dimension + # ) + # embedding_vector = embedding_response.data[0].embedding # Store in our results dictionary document_summaries[doc_id] = { @@ -187,87 +188,117 @@ def generate_all_summaries(): return run_async(async_generate_all_summaries()) -def upgrade() -> None: - # Load the document summaries - generate_all_summaries() - try: - with open("document_summaries.json", "r") as f: - document_summaries = json.load(f) - print(f"Loaded {len(document_summaries)} document summaries") - except FileNotFoundError: - raise ValueError( - "document_summaries.json not found. Please run the summary generation script first."
+def check_if_upgrade_needed(): + """Check if the upgrade has already been applied by checking for summary column""" + # Get database connection + connection = op.get_bind() + inspector = inspect(connection) + + # Check if the columns exist + existing_columns = [ + col["name"] + for col in inspector.get_columns(f"document_info", schema=project_name) + ] + + needs_upgrade = "summary" not in existing_columns + + if needs_upgrade: + print( + "Migration needed: 'summary' column does not exist in document_info table" + ) + else: + print( + "Migration not needed: 'summary' column already exists in document_info table" ) - except json.JSONDecodeError: - raise ValueError("Invalid document_summaries.json file") - # Create the vector extension if it doesn't exist - op.execute("CREATE EXTENSION IF NOT EXISTS vector") + return needs_upgrade - # Add new columns to document_info - op.add_column( - "document_info", - sa.Column("summary", sa.Text(), nullable=True), - schema=project_name, - ) - op.add_column( - "document_info", - sa.Column("summary_embedding", Vector, nullable=True), - schema=project_name, - ) +def upgrade() -> None: + if check_if_upgrade_needed(): + # Load the document summaries + generate_all_summaries() + try: + with open("document_summaries.json", "r") as f: + document_summaries = json.load(f) + print(f"Loaded {len(document_summaries)} document summaries") + except FileNotFoundError: + raise ValueError( + "document_summaries.json not found. Please run the summary generation script first." + ) + except json.JSONDecodeError: + raise ValueError("Invalid document_summaries.json file") - # Add generated column for full text search - op.execute( - f""" - ALTER TABLE {project_name}.document_info - ADD COLUMN doc_search_vector tsvector - GENERATED ALWAYS AS ( - setweight(to_tsvector('english', COALESCE(title, '')), 'A') || - setweight(to_tsvector('english', COALESCE(summary, '')), 'B') || - setweight(to_tsvector('english', COALESCE((metadata->>'description')::text, '')), 'C') - ) STORED; - """ - ) + # Create the vector extension if it doesn't exist + op.execute("CREATE EXTENSION IF NOT EXISTS vector") - # Create index for full text search - op.execute( - f""" - CREATE INDEX idx_doc_search_{project_name} - ON {project_name}.document_info - USING GIN (doc_search_vector); - """ - ) + # Add new columns to document_info + op.add_column( + "document_info", + sa.Column("summary", sa.Text(), nullable=True), + schema=project_name, + ) + + op.add_column( + "document_info", + sa.Column("summary_embedding", Vector, nullable=True), + schema=project_name, + ) - # Update existing documents with summaries and embeddings - for doc_id, doc_data in document_summaries.items(): - # Convert the embedding array to the PostgreSQL vector format - embedding_str = f"[{','.join(str(x) for x in doc_data['embedding'])}]" + # Add generated column for full text search + op.execute( + f""" + ALTER TABLE {project_name}.document_info + ADD COLUMN doc_search_vector tsvector + GENERATED ALWAYS AS ( + setweight(to_tsvector('english', COALESCE(title, '')), 'A') || + setweight(to_tsvector('english', COALESCE(summary, '')), 'B') || + setweight(to_tsvector('english', COALESCE((metadata->>'description')::text, '')), 'C') + ) STORED; + """ + ) - # Use plain SQL with proper escaping for PostgreSQL + # Create index for full text search op.execute( f""" - UPDATE {project_name}.document_info - SET - summary = '{doc_data['summary'].replace("'", "''")}', - summary_embedding = '{embedding_str}'::vector({dimension}) - WHERE document_id = 
'{doc_id}'::uuid; - """ + CREATE INDEX idx_doc_search_{project_name} + ON {project_name}.document_info + USING GIN (doc_search_vector); + """ ) + # Update existing documents with summaries and embeddings + for doc_id, doc_data in document_summaries.items(): + # Convert the embedding array to the PostgreSQL vector format + embedding_str = ( + f"[{','.join(str(x) for x in doc_data['embedding'])}]" + ) + + # Use plain SQL with proper escaping for PostgreSQL + op.execute( + f""" + UPDATE {project_name}.document_info + SET + summary = '{doc_data['summary'].replace("'", "''")}', + summary_embedding = '{embedding_str}'::vector({dimension}) + WHERE document_id = '{doc_id}'::uuid; + """ + ) + def downgrade() -> None: - # Drop the full text search index + # First drop any dependencies on the columns we want to remove op.execute( f""" - DROP INDEX IF EXISTS {project_name}.idx_doc_search_{project_name}; - """ - ) + -- Drop the full text search index first + DROP INDEX IF EXISTS {project_name}.idx_doc_search_{project_name}; - # Remove the generated column (this will automatically remove dependencies) - op.drop_column("document_info", "doc_search_vector", schema=project_name) + -- Drop the generated column that depends on the summary column + ALTER TABLE {project_name}.document_info + DROP COLUMN IF EXISTS doc_search_vector; + """ + ) - # Remove the summary and embedding columns + # Now we can safely drop the summary and embedding columns op.drop_column("document_info", "summary_embedding", schema=project_name) - op.drop_column("document_info", "summary", schema=project_name) diff --git a/py/migrations/versions/d342e632358a_migrate_to_asyncpg.py b/py/migrations/versions/d342e632358a_migrate_to_asyncpg.py index fe10b0b48..d2a1a7013 100644 --- a/py/migrations/versions/d342e632358a_migrate_to_asyncpg.py +++ b/py/migrations/versions/d342e632358a_migrate_to_asyncpg.py @@ -1,284 +1,196 @@ -"""migrate_to_document_search +"""migrate_to_asyncpg -Revision ID: 2fac23e4d91b +Revision ID: d342e632358a Revises: -Create Date: 2024-11-11 11:55:49.461015 +Create Date: 2024-10-22 11:55:49.461015 """ -import asyncio -import json import os -from concurrent.futures import ThreadPoolExecutor from typing import Sequence, Union import sqlalchemy as sa from alembic import op -from openai import AsyncOpenAI +from sqlalchemy import inspect from sqlalchemy.dialects import postgresql from sqlalchemy.types import UserDefinedType -from r2r import R2RAsyncClient - # revision identifiers, used by Alembic. 
revision: str = "d342e632358a" down_revision: Union[str, None] = None branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None + project_name = os.getenv("R2R_PROJECT_NAME") or "r2r_default" -dimension = 512 # OpenAI's embedding dimension + +new_vector_table_name = "vectors" +old_vector_table_name = project_name class Vector(UserDefinedType): def get_col_spec(self, **kw): - return f"vector({dimension})" - + return "vector" -def run_async(coroutine): - """Helper function to run async code synchronously""" - with ThreadPoolExecutor() as pool: - return pool.submit(asyncio.run, coroutine).result() +def check_if_upgrade_needed(): + """Check if the upgrade has already been applied""" + # Get database connection + connection = op.get_bind() + inspector = inspect(connection) -async def async_generate_all_summaries(): - """Asynchronous function to generate summaries""" + # Check if the new vectors table exists + has_new_table = inspector.has_table( + new_vector_table_name, schema=project_name + ) - base_url = os.getenv("R2R_BASE_URL") - if not base_url: - raise ValueError( - "Environment variable `R2R_BASE_URL` must be provided, e.g. `http://localhost:7272`." + if has_new_table: + print( + f"Migration not needed: '{new_vector_table_name}' table already exists" ) + return False - print(f"Using R2R Base URL: {base_url})") - - base_model = os.getenv("R2R_BASE_MODEL") - if not base_model: - raise ValueError( - "Environment variable `R2R_BASE_MODEL` must be provided, e.g. `openai/gpt-4o-mini`." - ) + print(f"Migration needed: '{new_vector_table_name}' table does not exist") + return True - print(f"Using R2R Base Model: {base_url})") - embedding_model = os.getenv("R2R_EMBEDDING_MODEL") - if not base_model or "openai" not in embedding_model: - raise ValueError( - "Environment variable `R2R_EMBEDDING_MODEL` must be provided, e.g. `openai/text-embedding-3-small`, and must point to an OpenAI embedding model." 
+def upgrade() -> None: + if check_if_upgrade_needed(): + # Create required extensions + op.execute("CREATE EXTENSION IF NOT EXISTS vector") + op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm") + op.execute("CREATE EXTENSION IF NOT EXISTS btree_gin") + + # KG table migrations + op.execute( + f"ALTER TABLE IF EXISTS {project_name}.entity_raw RENAME TO chunk_entity" + ) + op.execute( + f"ALTER TABLE IF EXISTS {project_name}.triple_raw RENAME TO chunk_triple" + ) + op.execute( + f"ALTER TABLE IF EXISTS {project_name}.entity_embedding RENAME TO document_entity" + ) + op.execute( + f"ALTER TABLE IF EXISTS {project_name}.community RENAME TO community_info" ) - embedding_model = embedding_model.split("openai/")[-1] - print(f"Using R2R Embedding Model: {embedding_model})") - - client = R2RAsyncClient(base_url) - openai_client = AsyncOpenAI() - - offset = 0 - limit = 1_000 - documents = (await client.documents_overview(offset=offset, limit=limit))[ - "results" - ] - while len(documents) == limit: - limit += offset - documents += ( - await client.documents_overview(offset=offset, limit=limit) - )["results"] - - # Load existing summaries if they exist - document_summaries = {} - if os.path.exists("document_summaries.json"): - try: - with open("document_summaries.json", "r") as f: - document_summaries = json.load(f) - print( - f"Loaded {len(document_summaries)} existing document summaries" - ) - except json.JSONDecodeError: - print( - "Existing document_summaries.json was invalid, starting fresh" - ) - document_summaries = {} - - for document in documents: - title = document["title"] - doc_id = str( - document["id"] - ) # Convert UUID to string for JSON compatibility - - # Skip if document already has a summary - if doc_id in document_summaries: - print( - f"Skipping document {title} ({doc_id}) - summary already exists" - ) - continue - - print(f"Processing document: {title} ({doc_id})") - - try: - document_text = f"Document Title:{title}\n" - if document["metadata"]: - metadata = json.dumps(document["metadata"]) - document_text += f"Document Metadata:\n{metadata}\n" - - full_chunks = ( - await client.document_chunks(document["id"], limit=10) - )["results"] - - document_text += "Document Content:\n" - - for chunk in full_chunks: - document_text += chunk["text"] - - summary_prompt = """## Task: - - Your task is to generate a descriptive summary of the document that follows. Your objective is to return a summary that is roughly 10% of the input document size while retaining as many key points as possible. Your response should begin with `The document contains `. - - ### Document: - - {document} - - - ### Query: - - Reminder: Your task is to generate a descriptive summary of the document that was given. Your objective is to return a summary that is roughly 10% of the input document size while retaining as many key points as possible. Your response should begin with `The document contains `. 
- - ## Response:""" - messages = [ - { - "role": "user", - "content": summary_prompt.format( - **{"document": document_text} - ), - } - ] - print("Making completion") - summary = await client.completion( - messages=messages, generation_config={"model": base_model} - ) - summary_text = summary["results"]["choices"][0]["message"][ - "content" - ] - embedding_response = await openai_client.embeddings.create( - model=embedding_model, input=summary_text, dimensions=dimension - ) - embedding_vector = embedding_response.data[0].embedding + # Create the new table + op.create_table( + new_vector_table_name, + sa.Column("extraction_id", postgresql.UUID(), nullable=False), + sa.Column("document_id", postgresql.UUID(), nullable=False), + sa.Column("user_id", postgresql.UUID(), nullable=False), + sa.Column( + "collection_ids", + postgresql.ARRAY(postgresql.UUID()), + server_default="{}", + ), + sa.Column("vec", Vector), # This will be handled as a vector type + sa.Column("text", sa.Text(), nullable=True), + sa.Column( + "fts", + postgresql.TSVECTOR, + nullable=False, + server_default=sa.text( + "to_tsvector('english'::regconfig, '')" + ), + ), + sa.Column( + "metadata", + postgresql.JSONB(), + server_default="{}", + nullable=False, + ), + sa.PrimaryKeyConstraint("extraction_id"), + schema=project_name, + ) - # Store in our results dictionary - document_summaries[doc_id] = { - "summary": summary_text, - "embedding": embedding_vector, - } + # Create indices + op.create_index( + "idx_vectors_document_id", + new_vector_table_name, + ["document_id"], + schema=project_name, + ) - # Save after each document - with open("document_summaries.json", "w") as f: - json.dump(document_summaries, f) + op.create_index( + "idx_vectors_user_id", + new_vector_table_name, + ["user_id"], + schema=project_name, + ) - print(f"Successfully processed document {doc_id}") + op.create_index( + "idx_vectors_collection_ids", + new_vector_table_name, + ["collection_ids"], + schema=project_name, + postgresql_using="gin", + ) - except Exception as e: - print(f"Error processing document {doc_id}: {str(e)}") - # Continue with next document instead of failing - continue + op.create_index( + "idx_vectors_fts", + new_vector_table_name, + ["fts"], + schema=project_name, + postgresql_using="gin", + ) - return document_summaries + # Migrate data from old table (assuming old table name is 'old_vectors') + # Note: You'll need to replace 'old_schema' and 'old_vectors' with your actual names + op.execute( + f""" + INSERT INTO {project_name}.{new_vector_table_name} + (extraction_id, document_id, user_id, collection_ids, vec, text, metadata) + SELECT + extraction_id, + document_id, + user_id, + collection_ids, + vec, + text, + metadata + FROM {project_name}.{old_vector_table_name} + """ + ) + # Verify data migration + op.execute( + f""" + SELECT COUNT(*) old_count FROM {project_name}.{old_vector_table_name}; + SELECT COUNT(*) new_count FROM {project_name}.{new_vector_table_name}; + """ + ) -def generate_all_summaries(): - """Synchronous wrapper for async_generate_all_summaries""" - return run_async(async_generate_all_summaries()) + # If we get here, migration was successful, so drop the old table + op.execute( + f""" + DROP TABLE IF EXISTS {project_name}.{old_vector_table_name}; + """ + ) -def upgrade() -> None: - # First create vector extension if it doesn't exist - op.execute("CREATE EXTENSION IF NOT EXISTS vector") +def downgrade() -> None: + # Drop all indices + op.drop_index("idx_vectors_fts", schema=project_name) + 
op.drop_index("idx_vectors_collection_ids", schema=project_name) + op.drop_index("idx_vectors_user_id", schema=project_name) + op.drop_index("idx_vectors_document_id", schema=project_name) - # Add new columns with NULL constraint explicitly - op.add_column( - "document_info", - sa.Column("summary", sa.Text(), nullable=True), - schema=project_name, - ) + # Drop the new table + op.drop_table(new_vector_table_name, schema=project_name) - op.add_column( - "document_info", - sa.Column("summary_embedding", Vector, nullable=True), - schema=project_name, + # Revert KG table migrations + op.execute( + f"ALTER TABLE IF EXISTS {project_name}.chunk_entity RENAME TO entity_raw" ) - - # Add generated column for full text search with COALESCE to handle NULLs op.execute( - f""" - ALTER TABLE {project_name}.document_info - ADD COLUMN doc_search_vector tsvector - GENERATED ALWAYS AS ( - setweight(to_tsvector('english', COALESCE(title, '')), 'A') || - setweight(to_tsvector('english', COALESCE(summary, '')), 'B') || - setweight(to_tsvector('english', COALESCE((metadata->>'description')::text, '')), 'C') - ) STORED; - """ + f"ALTER TABLE IF EXISTS {project_name}.chunk_triple RENAME TO triple_raw" ) - - # Create index for full text search op.execute( - f""" - CREATE INDEX idx_doc_search_{project_name} - ON {project_name}.document_info - USING GIN (doc_search_vector); - """ + f"ALTER TABLE IF EXISTS {project_name}.document_entity RENAME TO entity_embedding" ) - - # Generate summaries after columns are created - generate_all_summaries() - - try: - with open("document_summaries.json", "r") as f: - document_summaries = json.load(f) - print(f"Loaded {len(document_summaries)} document summaries") - except FileNotFoundError: - print("No document summaries found, skipping updates") - return - except json.JSONDecodeError: - print("Invalid document_summaries.json file, skipping updates") - return - - # Update existing documents with summaries and embeddings - for doc_id, doc_data in document_summaries.items(): - try: - # Convert the embedding array to the PostgreSQL vector format - embedding_str = ( - f"[{','.join(str(x) for x in doc_data['embedding'])}]" - ) - - # Use parameterized query to handle escaping properly - op.execute( - f""" - UPDATE {project_name}.document_info - SET - summary = :summary, - summary_embedding = :embedding::vector({dimension}) - WHERE document_id = :doc_id::uuid; - """, - { - "summary": doc_data["summary"], - "embedding": embedding_str, - "doc_id": doc_id, - }, - ) - except Exception as e: - print(f"Error updating document {doc_id}: {str(e)}") - continue - - -def downgrade() -> None: - # Drop the full text search index first op.execute( - f""" - DROP INDEX IF EXISTS {project_name}.idx_doc_search_{project_name}; - """ + f"ALTER TABLE IF EXISTS {project_name}.community_info RENAME TO community" ) - - # Remove the generated column (this will automatically remove dependencies) - op.drop_column("document_info", "doc_search_vector", schema=project_name) - - # Remove the summary and embedding columns - op.drop_column("document_info", "summary_embedding", schema=project_name) - op.drop_column("document_info", "summary", schema=project_name) diff --git a/py/sdk/mixins/ingestion.py b/py/sdk/mixins/ingestion.py index 758821e9c..980036e03 100644 --- a/py/sdk/mixins/ingestion.py +++ b/py/sdk/mixins/ingestion.py @@ -38,57 +38,60 @@ async def ingest_files( "Number of metadatas must match number of document IDs." 
) - all_file_paths: list[str] = [] - for path in file_paths: - if os.path.isdir(path): - for root, _, files in os.walk(path): - all_file_paths.extend( - os.path.join(root, file) for file in files - ) - else: - all_file_paths.append(path) - with ExitStack() as stack: - files_tuples = [ - ( - "files", + all_file_paths: list[str] = [] + for path in file_paths: + if os.path.isdir(path): + for root, _, files in os.walk(path): + all_file_paths.extend( + os.path.join(root, file) for file in files + ) + else: + all_file_paths.append(path) + + with ExitStack() as stack: + files_tuples = [ ( - os.path.basename(file), - stack.enter_context(open(file, "rb")), - "application/octet-stream", - ), - ) - for file in all_file_paths - ] + "files", + ( + os.path.basename(file), + stack.enter_context(open(file, "rb")), + "application/octet-stream", + ), + ) + for file in all_file_paths + ] - data = {} - if document_ids: - data["document_ids"] = json.dumps( - [str(doc_id) for doc_id in document_ids] - ) - if metadatas: - data["metadatas"] = json.dumps(metadatas) + data = {} + if document_ids: + data["document_ids"] = json.dumps( + [str(doc_id) for doc_id in document_ids] + ) + if metadatas: + data["metadatas"] = json.dumps(metadatas) - if ingestion_config: - data["ingestion_config"] = json.dumps(ingestion_config) + if ingestion_config: + data["ingestion_config"] = json.dumps(ingestion_config) - if run_with_orchestration is not None: - data["run_with_orchestration"] = str(run_with_orchestration) + if run_with_orchestration is not None: + data["run_with_orchestration"] = str( + run_with_orchestration + ) - if collection_ids: - data["collection_ids"] = json.dumps( - [ + if collection_ids: + data["collection_ids"] = json.dumps( [ - str(collection_id) - for collection_id in doc_collection_ids + [ + str(collection_id) + for collection_id in doc_collection_ids + ] + for doc_collection_ids in collection_ids ] - for doc_collection_ids in collection_ids - ] - ) + ) - return await self._make_request( # type: ignore - "POST", "ingest_files", data=data, files=files_tuples - ) + return await self._make_request( # type: ignore + "POST", "ingest_files", data=data, files=files_tuples + ) async def update_files( self, diff --git a/py/sdk/mixins/retrieval.py b/py/sdk/mixins/retrieval.py index da2190b09..0b08479f8 100644 --- a/py/sdk/mixins/retrieval.py +++ b/py/sdk/mixins/retrieval.py @@ -214,3 +214,18 @@ async def agent( return self._make_streaming_request("POST", "agent", json=data) # type: ignore else: return await self._make_request("POST", "agent", json=data) # type: ignore + + async def embedding( + self, + content: str, + ) -> list[float]: + """ + Generate embeddings for the provided content. + + Args: + content (str): The text content to embed. + + Returns: + list[float]: The generated embedding vector. 
+ """ + return await self._make_request("POST", "embedding", json=content) # type: ignore diff --git a/py/tests/integration/runner_cli.py b/py/tests/integration/runner_cli.py index 33cc85dca..ab63bfe59 100644 --- a/py/tests/integration/runner_cli.py +++ b/py/tests/integration/runner_cli.py @@ -79,14 +79,11 @@ def compare_document_fields(documents, expected_doc): def test_document_overview_sample_file_cli(): print("Testing: Document overview contains 'aristotle.txt'") - output = run_command( - "poetry run r2r --base-url=http://localhost:7276 documents-overview" - ) + output = run_command("poetry run r2r documents-overview") output = output.replace("'", '"').replace( "None", "null" ) # Replace Python None with JSON null output_lines = output.strip().split("\n")[1:] - print("output_lines = ", output_lines) documents = [json.loads(ele) for ele in output_lines] aristotle_document = {