diff --git a/README.md b/README.md
index 1cb4db5..8342c0a 100644
--- a/README.md
+++ b/README.md
@@ -48,13 +48,120 @@ print(result)
-## Extending the Project
-The RAG Document Parser can be extended with the following features:
+## Hybrid Retrieval & Reranking
+
+The RAG Document Parser now supports hybrid retrieval, combining dense semantic search with sparse keyword search and an intelligent reranking step. This approach can significantly improve retrieval accuracy by leveraging the strengths of both methods.
+
+### How Hybrid Retrieval Works
+
+1. **Dense Semantic Search**: Uses vector embeddings to find semantically similar content
+2. **Sparse Keyword Search**: Uses TF-IDF based keyword matching for exact term matches
+3. **Result Merging**: Combines results from both approaches, deduplicating by document ID
+4. **Intelligent Reranking**: Scores results using multiple relevance signals:
+   - Normalized dense similarity score (weight: 0.5)
+   - Normalized sparse keyword score (weight: 0.3)
+   - Lexical overlap ratio (weight: 0.2)
+
+### Using Hybrid Search
+
+#### Basic Usage
+
+```python
+from src.retrieval.hybrid_search import hybrid_search
+
+# Execute hybrid search
+results = hybrid_search(
+    query="existential meaning of life",
+    top_k_dense=5,    # Number of dense results
+    top_k_sparse=20   # Number of sparse results
+)
+
+# Results include a detailed scoring breakdown
+for result in results:
+    print(f"Score: {result['relevance_score']:.4f}")
+    print(f"Text: {result['text']}")
+    print(f"Source: {result['source']}")  # 'dense', 'sparse', or 'both' ('dense_only'/'sparse_only' when one index is unavailable)
+
+    # Detailed score breakdown
+    breakdown = result['score_breakdown']
+    print(f"Dense: {breakdown['normalized_dense']:.3f}")
+    print(f"Sparse: {breakdown['normalized_sparse']:.3f}")
+    print(f"Overlap: {breakdown['overlap_score']:.3f}")
+```
+
+#### Testing Hybrid Search
+
+```bash
+# Test both semantic and hybrid search
+python src/scripts/test_search.py
+```
+
+### Document Ingestion for Hybrid Search
+
+The ingestion process now creates both dense and sparse indexes:
+
+```bash
+# Ingest documents with hybrid indexing
+python src/scripts/ingest_documents.py
+```
+
+During ingestion:
+- **Dense vectors** are stored in the `philosophy-rag` index using managed embeddings
+- **Sparse vectors** are stored in the `philosophy-rag-sparse` index using TF-IDF weights
+- **Vocabulary** and **document frequencies** are persisted in `data/vocab.json` and `data/df.json`
+
+### Technical Details
+
+#### Sparse Vector Construction
+- **Tokenization**: Lowercase, split on non-alphanumeric characters, stopword filtering, minimum token length 2
+- **TF-IDF Formula**: `(1 + log(tf)) * log((N + 1) / (df + 1)) + 1`
+- **Vocabulary Management**: Token-to-ID mapping persisted locally
+- **Document Frequencies**: Global DF statistics for IDF calculation
+
+#### Reranking Algorithm
+1. Normalize dense and sparse scores separately using min-max normalization
+2. Calculate lexical overlap as the fraction of unique query tokens that also appear in the document
+3. Compute the weighted combination: `0.5 * dense + 0.3 * sparse + 0.2 * overlap` (see the worked example below)
+4. Sort results by final relevance score
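+
+The following sketch (illustrative, standalone Python with made-up numbers) shows how the two formulas fit together: a TF-IDF weight is computed for one token, then the three normalized signals are combined into a final relevance score.
+
+```python
+import math
+
+# --- TF-IDF weight for a single token (the sparse store's formula) ---
+tf = 3    # the token appears 3 times in the chunk
+df = 10   # the token appears in 10 of the corpus documents
+N = 100   # assumed corpus size
+tfidf = (1 + math.log(tf)) * math.log((N + 1) / (df + 1)) + 1
+print(f"tf-idf weight: {tfidf:.2f}")  # ~5.65
+
+# --- Weighted rerank combination (the weights listed above) ---
+W_DENSE, W_SPARSE, W_OVERLAP = 0.5, 0.3, 0.2
+normalized_dense, normalized_sparse, overlap = 0.8, 0.5, 0.4
+relevance = W_DENSE * normalized_dense + W_SPARSE * normalized_sparse + W_OVERLAP * overlap
+print(f"relevance: {relevance:.2f}")  # 0.5*0.8 + 0.3*0.5 + 0.2*0.4 = 0.63
+```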
+
+#### Graceful Degradation
+- If the sparse index is unavailable, falls back to dense-only search
+- If the dense index is unavailable, falls back to sparse-only search
+- Maintains a consistent result format regardless of which indexes are available
+
+### Data Storage
+
+The system creates and maintains:
+- `data/vocab.json`: Token-to-integer-ID mapping for sparse vectors
+- `data/df.json`: Document frequencies for TF-IDF calculations
+- Two Pinecone indexes:
+  - `philosophy-rag`: Dense vectors with managed embeddings
+  - `philosophy-rag-sparse`: Sparse vectors with TF-IDF weights
+
+## Extending the Project
+The RAG Document Parser can be extended with the following features:
+
 ### 1. Reranking with Cross-Encoder
 Enhance the retrieval process by adding a reranking step using a cross-encoder. After retrieving the initial `top_k` documents, use a cross-encoder model to rerank the results based on relevance.
 
-### 2. Hybrid Retrieval
-Combine classical and dense retrieval methods by storing text in a classical index (e.g., Elasticsearch) while also leveraging dense embeddings. This hybrid approach improves retrieval accuracy and robustness.
-
-### 3. Generation Layer with Prompt Templates
+### 2. Generation Layer with Prompt Templates
 Incorporate a generation layer using a Large Language Model (LLM). Use prompt templates that reference retrieved chunks to generate coherent and contextually relevant outputs.
 
 ## License
diff --git a/data/README.md b/data/README.md
new file mode 100644
index 0000000..66e9b7e
--- /dev/null
+++ b/data/README.md
@@ -0,0 +1,8 @@
+# Data Directory
+
+This directory contains persistent data files for the hybrid retrieval system:
+
+- `vocab.json` - Vocabulary mapping from tokens to integer IDs for sparse vectors
+- `df.json` - Document frequencies for each token, used in TF-IDF calculations
+
+These files are automatically created and maintained by the sparse storage system.
\ No newline at end of file
diff --git a/src/retrieval/__init__.py b/src/retrieval/__init__.py
new file mode 100644
index 0000000..6985e06
--- /dev/null
+++ b/src/retrieval/__init__.py
@@ -0,0 +1 @@
+# Retrieval module for hybrid search functionality
\ No newline at end of file
diff --git a/src/retrieval/hybrid_search.py b/src/retrieval/hybrid_search.py
new file mode 100644
index 0000000..a45330d
--- /dev/null
+++ b/src/retrieval/hybrid_search.py
@@ -0,0 +1,349 @@
+"""
+Hybrid search implementation combining dense semantic search with sparse keyword search.
+
+This module provides:
+- Hybrid search combining dense and sparse retrieval
+- Result merging and deduplication by ID
+- Reranking with multiple relevance signals
+- Configurable scoring weights
+"""
+
+from typing import List, Dict, Any
+
+# Import search functions
+from src.storage.vector_store import semantic_query
+from src.storage.sparse_store import sparse_store
+
+# Scoring weights - configurable constants
+W_DENSE = 0.5    # Weight for dense semantic score
+W_SPARSE = 0.3   # Weight for sparse keyword score
+W_OVERLAP = 0.2  # Weight for lexical overlap score
+
+
+def calculate_lexical_overlap(query: str, text: str) -> float:
+    """
+    Calculate lexical overlap ratio between query and text.
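+
+    A minimal illustration (assumes the default stopword list used by the
+    sparse store): for query "meaning of life" and text "the meaning of
+    existence", the filtered query tokens are {"meaning", "life"}; only
+    "meaning" appears in the text, so the overlap is 1/2 = 0.5.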
+ + Returns: unique query terms present / total unique query terms + """ + if not query or not text: + return 0.0 + + # Tokenize using same method as sparse store + query_tokens = set(sparse_store.tokenize(query)) + text_tokens = set(sparse_store.tokenize(text)) + + if not query_tokens: + return 0.0 + + overlap = len(query_tokens.intersection(text_tokens)) + return overlap / len(query_tokens) + + +def normalize_scores(scores: List[float]) -> List[float]: + """ + Normalize scores to 0-1 range using min-max normalization. + + Args: + scores: List of scores to normalize + + Returns: + List of normalized scores (0-1 range) + """ + if not scores: + return [] + + min_score = min(scores) + max_score = max(scores) + + # Handle case where all scores are the same + if max_score == min_score: + return [1.0] * len(scores) + + # Min-max normalization + normalized = [] + for score in scores: + norm_score = (score - min_score) / (max_score - min_score) + normalized.append(norm_score) + + return normalized + + +def merge_results(dense_results: List[Dict[str, Any]], + sparse_results: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: + """ + Merge dense and sparse results by ID, aggregating scores. + + Args: + dense_results: Results from semantic search + sparse_results: Results from sparse search + + Returns: + Dictionary mapping result ID to merged result with both scores + """ + merged = {} + + # Process dense results + for result in dense_results: + result_id = result['id'] + merged[result_id] = { + 'id': result_id, + 'text': result['text'], + 'metadata': result.get('metadata', {}), + 'dense_score': result['score'], + 'sparse_score': 0.0, + 'source': 'dense' + } + + # Process sparse results + for result in sparse_results: + result_id = result['id'] + if result_id in merged: + # Update existing result with sparse score + merged[result_id]['sparse_score'] = result['score'] + merged[result_id]['source'] = 'both' + else: + # Add new sparse-only result + merged[result_id] = { + 'id': result_id, + 'text': result['text'], + 'metadata': result.get('metadata', {}), + 'dense_score': 0.0, + 'sparse_score': result['score'], + 'source': 'sparse' + } + + return merged + + +def rerank_results(query: str, merged_results: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Rerank merged results using weighted combination of multiple signals. 
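+
+    Illustrative arithmetic (made-up numbers): with normalized dense 0.8,
+    normalized sparse 0.5, and lexical overlap 0.4, the final score is
+    0.5 * 0.8 + 0.3 * 0.5 + 0.2 * 0.4 = 0.63.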
+ + Args: + query: Original search query + merged_results: Dictionary of merged results by ID + + Returns: + List of reranked results with final relevance scores + """ + if not merged_results: + return [] + + results_list = list(merged_results.values()) + + # Extract and normalize dense scores + dense_scores = [r['dense_score'] for r in results_list] + normalized_dense = normalize_scores([s for s in dense_scores if s > 0]) + + # Extract and normalize sparse scores + sparse_scores = [r['sparse_score'] for r in results_list] + normalized_sparse = normalize_scores([s for s in sparse_scores if s > 0]) + + # Create mappings for normalized scores + dense_score_map = {} + sparse_score_map = {} + + # Map normalized dense scores + dense_idx = 0 + for i, result in enumerate(results_list): + if result['dense_score'] > 0: + dense_score_map[i] = normalized_dense[dense_idx] if normalized_dense else 0.0 + dense_idx += 1 + else: + dense_score_map[i] = 0.0 + + # Map normalized sparse scores + sparse_idx = 0 + for i, result in enumerate(results_list): + if result['sparse_score'] > 0: + sparse_score_map[i] = normalized_sparse[sparse_idx] if normalized_sparse else 0.0 + sparse_idx += 1 + else: + sparse_score_map[i] = 0.0 + + # Calculate final relevance scores + reranked_results = [] + for i, result in enumerate(results_list): + # Get normalized component scores + norm_dense = dense_score_map[i] + norm_sparse = sparse_score_map[i] + + # Calculate lexical overlap + overlap_score = calculate_lexical_overlap(query, result['text']) + + # Weighted combination + relevance_score = ( + W_DENSE * norm_dense + + W_SPARSE * norm_sparse + + W_OVERLAP * overlap_score + ) + + # Create final result + final_result = { + 'id': result['id'], + 'text': result['text'], + 'metadata': result['metadata'], + 'relevance_score': relevance_score, + 'score_breakdown': { + 'dense_score': result['dense_score'], + 'sparse_score': result['sparse_score'], + 'normalized_dense': norm_dense, + 'normalized_sparse': norm_sparse, + 'overlap_score': overlap_score, + 'final_score': relevance_score + }, + 'source': result['source'] + } + reranked_results.append(final_result) + + # Sort by relevance score (descending) + reranked_results.sort(key=lambda x: x['relevance_score'], reverse=True) + + return reranked_results + + +def hybrid_search(query: str, top_k_dense: int = 5, top_k_sparse: int = 20) -> List[Dict[str, Any]]: + """ + Execute hybrid search combining dense and sparse retrieval with reranking. 
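+
+    Example (requires live dense and sparse indexes; the output value is
+    illustrative):
+        >>> results = hybrid_search("meaning of life", top_k_dense=5, top_k_sparse=20)
+        >>> results[0]["relevance_score"]  # best match first, within [0, 1]
+        0.63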
+ + Args: + query: Search query string + top_k_dense: Number of dense results to retrieve (default: 5) + top_k_sparse: Number of sparse results to retrieve (default: 20) + + Returns: + List of reranked results with relevance scores and breakdown + """ + print(f"Executing hybrid search for query: '{query}'") + + # Execute dense semantic search + try: + print("Executing dense semantic search...") + dense_raw = semantic_query(query, top_k=top_k_dense) + + # Parse dense results - handle Pinecone response format + dense_results = [] + hits = dense_raw.get("result", {}).get("hits", []) + for hit in hits: + fields = hit.get("fields", {}) or {} + text = fields.get("chunk_text") or fields.get("text") or "" + + # Extract metadata + metadata = {} + for k, v in fields.items(): + if k.startswith("meta_"): + metadata[k[5:]] = v # strip 'meta_' prefix + + dense_results.append({ + "id": hit.get("_id"), + "score": hit.get("_score", 0.0), + "text": text, + "metadata": metadata + }) + + print(f"Dense search returned {len(dense_results)} results") + + except Exception as e: + print(f"Dense search failed: {e}") + dense_results = [] + + # Execute sparse keyword search + try: + print("Executing sparse keyword search...") + sparse_results = sparse_store.sparse_query(query, top_k=top_k_sparse) + print(f"Sparse search returned {len(sparse_results)} results") + + except Exception as e: + print(f"Sparse search failed: {e}") + sparse_results = [] + + # Handle case where both searches failed + if not dense_results and not sparse_results: + print("Both dense and sparse searches failed") + return [] + + # Handle graceful degradation + if not sparse_results: + print("Sparse search unavailable, using dense results only") + # Return dense results with relevance scores + reranked = [] + dense_scores = [r['score'] for r in dense_results] + normalized_dense = normalize_scores(dense_scores) + + for i, result in enumerate(dense_results): + overlap_score = calculate_lexical_overlap(query, result['text']) + relevance_score = W_DENSE * normalized_dense[i] + W_OVERLAP * overlap_score + + reranked.append({ + 'id': result['id'], + 'text': result['text'], + 'metadata': result['metadata'], + 'relevance_score': relevance_score, + 'score_breakdown': { + 'dense_score': result['score'], + 'sparse_score': 0.0, + 'normalized_dense': normalized_dense[i], + 'normalized_sparse': 0.0, + 'overlap_score': overlap_score, + 'final_score': relevance_score + }, + 'source': 'dense_only' + }) + + return reranked + + if not dense_results: + print("Dense search unavailable, using sparse results only") + # Return sparse results with relevance scores + reranked = [] + sparse_scores = [r['score'] for r in sparse_results] + normalized_sparse = normalize_scores(sparse_scores) + + for i, result in enumerate(sparse_results): + overlap_score = calculate_lexical_overlap(query, result['text']) + relevance_score = W_SPARSE * normalized_sparse[i] + W_OVERLAP * overlap_score + + reranked.append({ + 'id': result['id'], + 'text': result['text'], + 'metadata': result['metadata'], + 'relevance_score': relevance_score, + 'score_breakdown': { + 'dense_score': 0.0, + 'sparse_score': result['score'], + 'normalized_dense': 0.0, + 'normalized_sparse': normalized_sparse[i], + 'overlap_score': overlap_score, + 'final_score': relevance_score + }, + 'source': 'sparse_only' + }) + + return reranked + + # Merge results from both searches + print("Merging and reranking results...") + merged_results = merge_results(dense_results, sparse_results) + + # Rerank using weighted combination + 
final_results = rerank_results(query, merged_results)
+
+    print(f"Final hybrid search returned {len(final_results)} reranked results")
+
+    return final_results
\ No newline at end of file
diff --git a/src/scripts/ingest_documents.py b/src/scripts/ingest_documents.py
index 7f83db4..164d629 100644
--- a/src/scripts/ingest_documents.py
+++ b/src/scripts/ingest_documents.py
@@ -7,6 +7,8 @@
 from src.ingestion.normalizer import normalize_metadata
 from src.ingestion.chunk_document import chunk_document
 from src.storage.vector_store import store_vectors
+from src.storage.sparse_store import sparse_store
+import uuid  # stable chunk IDs shared by the dense and sparse stores
 # from src.logging_utils.audit_logger import log_event
 
 def ingest_documents(directory):
@@ -29,12 +31,21 @@
         # Chunk text
         chunks = chunk_document(pdf_content, normalized_metadata)
         print(f'Chunks created: {len(chunks)}')
-        # print(chunks[:2])
-
-        # Store vectors
-        # vector_ids = store_vectors(chunks, normalized_metadata)
+        # Assign consistent IDs to chunks so dense and sparse records stay aligned
+        for chunk in chunks:
+            chunk['id'] = str(uuid.uuid4())
+
+        # Store dense vectors
         vector_ids = store_vectors(chunks)
-        # print(f'Stored {len(vector_ids)} vectors for {filename}.')
+        print(f'Stored {len(chunks)} dense vectors for {filename}.')
+
+        # Store sparse vectors
+        try:
+            sparse_store.upsert_sparse_vectors(chunks)
+            print(f'Stored {len(chunks)} sparse vectors for {filename}.')
+        except Exception as e:
+            print(f'Warning: Failed to store sparse vectors for {filename}: {e}')
 
         # # Log the ingestion event
         # log_event(f'Document ingested: {filename}', metadata)
diff --git a/src/scripts/test_search.py b/src/scripts/test_search.py
index 036659b..3278085 100644
--- a/src/scripts/test_search.py
+++ b/src/scripts/test_search.py
@@ -2,16 +2,48 @@
 from dotenv import load_dotenv  # dev dependency
 load_dotenv()
 
 from src.storage.vector_store import semantic_query
+from src.retrieval.hybrid_search import hybrid_search
 
-if __name__ == '__main__':
-    # A sentence (or fragment) you expect exists in a chunk
+def test_semantic_search():
+    """Test traditional semantic search."""
+    print("=== SEMANTIC SEARCH TEST ===")
     query = "existential meaning life"
-
+
     # Search the dense index
     results = semantic_query(query)
-    print('results are')
+    print('Semantic search results:')
     print(results)
+    print()
 
-    # # Print the results
-    # for hit in results['result']['hits']:
-    #     print(f"id: {hit['_id']:<5} | score: {round(hit['_score'], 2):<5} | category: {hit['fields']['category']:<10} | text: {hit['fields']['chunk_text']:<50}")
\ No newline at end of file
+def test_hybrid_search():
+    """Test hybrid search with reranking."""
+    print("=== HYBRID SEARCH TEST ===")
+    query = "existential meaning life"
+
+    # Execute hybrid search
+    results = hybrid_search(query, top_k_dense=5, top_k_sparse=20)
+
+    print(f"\nHybrid search results for '{query}':")
+    print(f"Total results: {len(results)}")
+    print()
+
+    # Display top results with detailed scoring
+    for i, result in enumerate(results[:5]):  # Show top 5
+        print(f"Result {i+1}:")
+        print(f"  ID: {result['id']}")
+        print(f"  Relevance Score: {result['relevance_score']:.4f}")
+        print(f"  Source: {result['source']}")
+        print(f"  Text: {result['text'][:100]}...")
+
+        breakdown = result['score_breakdown']
+        print(f"  Score Breakdown:")
+        print(f"    Dense: {breakdown['dense_score']:.4f} -> normalized: {breakdown['normalized_dense']:.4f}")
+        print(f"    Sparse: {breakdown['sparse_score']:.4f} -> normalized: {breakdown['normalized_sparse']:.4f}")
+        print(f"    Overlap: {breakdown['overlap_score']:.4f}")
+        print(f"    Final: {breakdown['final_score']:.4f}")
+        print()
+
+if __name__ == '__main__':
+    # Test both search methods
+    test_semantic_search()
+    test_hybrid_search()
\ No newline at end of file
diff --git a/src/storage/sparse_store.py b/src/storage/sparse_store.py
new file mode 100644
index 0000000..93fd557
--- /dev/null
+++ b/src/storage/sparse_store.py
@@ -0,0 +1,300 @@
+"""
+Sparse vector storage and management for hybrid retrieval.
+
+This module handles:
+- Vocabulary management (token to integer ID mapping)
+- Document frequency tracking for TF-IDF calculations
+- Sparse vector construction from text
+- Pinecone sparse index operations
+"""
+
+import json
+import math
+import os
+import re
+import uuid
+from collections import Counter
+from typing import Dict, List, Tuple, Any
+
+from pinecone import Pinecone
+
+# Configuration
+PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
+if not PINECONE_API_KEY:
+    raise RuntimeError("Set PINECONE_API_KEY env var")
+
+pc = Pinecone(api_key=PINECONE_API_KEY)
+
+SPARSE_INDEX_NAME = "philosophy-rag-sparse"
+NAMESPACE = "__default__"
+# Data files live under the repository's data/ directory (see data/README.md);
+# avoid hard-coding an absolute CI path here.
+VOCAB_FILE = os.path.join("data", "vocab.json")
+DF_FILE = os.path.join("data", "df.json")
+
+# Basic stopwords list
+STOPWORDS = {
+    'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by', 'for', 'from', 'has', 'he',
+    'in', 'is', 'it', 'its', 'of', 'on', 'that', 'the', 'to', 'was', 'will', 'with',
+    'i', 'you', 'we', 'they', 'this', 'but', 'not', 'or', 'have', 'had', 'been',
+    'their', 'if', 'would', 'could', 'should', 'can', 'may', 'might', 'must'
+}
+
+
+class SparseVectorStore:
+    def __init__(self):
+        self.vocab = self._load_vocab()
+        self.df = self._load_df()
+        self.next_token_id = max(self.vocab.values()) + 1 if self.vocab else 1
+
+    def _load_vocab(self) -> Dict[str, int]:
+        """Load vocabulary from file or return empty dict."""
+        if os.path.exists(VOCAB_FILE):
+            try:
+                with open(VOCAB_FILE, 'r') as f:
+                    return json.load(f)
+            except (json.JSONDecodeError, IOError):
+                print(f"Warning: Could not load vocab from {VOCAB_FILE}, starting fresh")
+        return {}
+
+    def _save_vocab(self):
+        """Save vocabulary to file."""
+        os.makedirs(os.path.dirname(VOCAB_FILE), exist_ok=True)
+        with open(VOCAB_FILE, 'w') as f:
+            json.dump(self.vocab, f, indent=2)
+
+    def _load_df(self) -> Dict[str, int]:
+        """Load document frequencies from file or return empty dict."""
+        if os.path.exists(DF_FILE):
+            try:
+                with open(DF_FILE, 'r') as f:
+                    return json.load(f)
+            except (json.JSONDecodeError, IOError):
+                print(f"Warning: Could not load DF from {DF_FILE}, starting fresh")
+        return {}
+
+    def _save_df(self):
+        """Save document frequencies to file."""
+        os.makedirs(os.path.dirname(DF_FILE), exist_ok=True)
+        with open(DF_FILE, 'w') as f:
+            json.dump(self.df, f, indent=2)
+
+    def tokenize(self, text: str) -> List[str]:
+        """
+        Tokenize text according to specs:
+        - Lowercase
+        - Split on non-alphanumeric
+        - Filter stopwords
+        - Length >= 2
+        """
+        if not text:
+            return []
+
+        # Lowercase and split on non-alphanumeric
+        tokens = re.findall(r'[a-zA-Z0-9]+', text.lower())
+
+        # Filter stopwords and length
+        filtered_tokens = [
+            token for token in tokens
+            if len(token) >= 2 and token not in STOPWORDS
+        ]
+
+        return filtered_tokens
+
+    def get_or_create_token_id(self, token: str) -> int:
+        """Get existing token ID or create new one."""
+        if token not in self.vocab:
+            self.vocab[token] = self.next_token_id
+            self.next_token_id += 1
+        return self.vocab[token]
+
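+    # Illustrative walk-through (assumes the default STOPWORDS set):
+    # tokenize("The Meaning of Life!") lowercases and splits the text to
+    # ["the", "meaning", "of", "life"]; stopword and length filtering leaves
+    # ["meaning", "life"], and get_or_create_token_id() then maps each
+    # surviving token to a stable integer ID reused across documents and queries.
+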
+    def build_sparse_vector(self, text: str, update_df: bool = True) -> Tuple[List[int], List[float]]:
+        """
+        Build a sparse vector from text using TF-IDF.
+
+        Args:
+            text: Input text to vectorize
+            update_df: Whether to update document frequencies (True during ingestion)
+
+        Returns:
+            Tuple of (indices, values) for the sparse vector
+        """
+        tokens = self.tokenize(text)
+        if not tokens:
+            return [], []
+
+        # Calculate term frequencies
+        tf_counts = Counter(tokens)
+        unique_tokens = set(tokens)
+
+        # Update document frequencies if needed
+        if update_df:
+            for token in unique_tokens:
+                self.df[token] = self.df.get(token, 0) + 1
+
+        # Estimate the corpus size N from DF statistics: the most frequent
+        # token's DF is a lower bound on the true document count, so this
+        # underestimates N. Tracking an explicit document counter would be
+        # more accurate.
+        N = max(self.df.values()) if self.df else 1
+
+        indices = []
+        values = []
+
+        for token, tf in tf_counts.items():
+            token_id = self.get_or_create_token_id(token)
+            df = self.df.get(token, 1)  # Default to 1 if not found
+
+            # TF-IDF formula: (1 + log(tf)) * log((N + 1) / (df + 1)) + 1
+            tf_component = 1 + math.log(tf)
+            idf_component = math.log((N + 1) / (df + 1))
+            tfidf_score = tf_component * idf_component + 1
+
+            indices.append(token_id)
+            values.append(tfidf_score)
+
+        return indices, values
+
+    def ensure_sparse_index(self):
+        """Ensure the sparse index exists in Pinecone."""
+        if not pc.has_index(SPARSE_INDEX_NAME):
+            # Create a serverless index for the sparse vectors. Pinecone
+            # accepts sparse values only on dotproduct indexes, so we use
+            # dotproduct rather than cosine here.
+            pc.create_index(
+                name=SPARSE_INDEX_NAME,
+                dimension=1,  # Minimal dimension since we rely on sparse values
+                metric="dotproduct",
+                spec={
+                    "serverless": {
+                        "cloud": "aws",
+                        "region": "us-east-1"
+                    }
+                }
+            )
+        return pc.Index(SPARSE_INDEX_NAME)
+
+    def upsert_sparse_vectors(self, chunks: List[Dict[str, Any]]):
+        """
+        Upsert sparse vectors to the Pinecone index.
+
+        Args:
+            chunks: List of chunk dictionaries with 'chunk', 'id', 'metadata'
+        """
+        index = self.ensure_sparse_index()
+        vectors = []
+
+        for chunk in chunks:
+            chunk_text = chunk.get('chunk', '')
+            chunk_id = chunk.get('id', str(uuid.uuid4()))
+            metadata = chunk.get('metadata', {})
+
+            indices, values = self.build_sparse_vector(chunk_text, update_df=True)
+
+            if indices and values:  # Only add if we have a valid sparse vector
+                vector = {
+                    "id": chunk_id,
+                    # Assumption: records on this 1-dim index also need a
+                    # dense 'values' entry; a tiny constant shifts every
+                    # dotproduct score equally, so ranking is unaffected.
+                    "values": [1e-8],
+                    "sparse_values": {
+                        "indices": indices,
+                        "values": values
+                    },
+                    "metadata": {
+                        "chunk_text": chunk_text[:1000],  # Limit stored text length
+                        **{f"meta_{k}": v for k, v in metadata.items()
+                           if isinstance(v, (str, int, float, bool))}
+                    }
+                }
+                vectors.append(vector)
+
+        # Upsert in batches
+        batch_size = 100
+        for i in range(0, len(vectors), batch_size):
+            batch = vectors[i:i+batch_size]
+            index.upsert(vectors=batch, namespace=NAMESPACE)
+
+        # Save updated vocab and DF
+        self._save_vocab()
+        self._save_df()
+
+        print(f"Upserted {len(vectors)} sparse vectors to {SPARSE_INDEX_NAME}")
+
+    def sparse_query(self, query: str, top_k: int = 20) -> List[Dict[str, Any]]:
+        """
+        Execute sparse keyword search.
+
+        Args:
+            query: Search query
+            top_k: Number of results to return
+
+        Returns:
+            List of search results with scores and metadata
+        """
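+        # Illustrative call (requires a live sparse index):
+        #     sparse_store.sparse_query("meaning of life", top_k=3)
+        # returns up to three dicts shaped like
+        #     {'id': ..., 'score': ..., 'text': ..., 'metadata': {...}},
+        # ranked by TF-IDF dot-product score.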
+        try:
+            index = self.ensure_sparse_index()
+
+            # Build sparse vector for the query (don't update DF)
+            indices, values = self.build_sparse_vector(query, update_df=False)
+
+            if not indices or not values:
+                print(f"Warning: No valid tokens in query '{query}'")
+                return []
+
+            # Execute sparse search; the tiny dense placeholder mirrors the
+            # one stored at upsert time and leaves the ranking unchanged.
+            results = index.query(
+                vector=[1e-8],
+                sparse_vector={
+                    "indices": indices,
+                    "values": values
+                },
+                top_k=top_k,
+                namespace=NAMESPACE,
+                include_metadata=True
+            )
+
+            # Format results
+            formatted_results = []
+            for match in results.get('matches', []):
+                formatted_results.append({
+                    'id': match['id'],
+                    'score': match['score'],
+                    'text': match.get('metadata', {}).get('chunk_text', ''),
+                    'metadata': {
+                        k[5:] if k.startswith('meta_') else k: v
+                        for k, v in match.get('metadata', {}).items()
+                        if k != 'chunk_text'
+                    }
+                })
+
+            return formatted_results
+
+        except Exception as e:
+            print(f"Warning: Sparse search failed: {e}")
+            return []
+
+    def get_vocab_size(self) -> int:
+        """Get current vocabulary size."""
+        return len(self.vocab)
+
+    def get_document_count(self) -> int:
+        """Get a lower-bound estimate of the document count from DF stats."""
+        return max(self.df.values()) if self.df else 0
+
+
+# Global instance
+sparse_store = SparseVectorStore()
\ No newline at end of file
diff --git a/src/storage/vector_store.py b/src/storage/vector_store.py
index 371383c..f691c2e 100644
--- a/src/storage/vector_store.py
+++ b/src/storage/vector_store.py
@@ -55,8 +55,10 @@ def to_records(chunks: Iterable[Dict]) -> List[Dict]:
     records = []
     for c in chunks:
         metadata = _flatten_metadata(c.get("metadata", {}))
+        # Use the chunk's existing ID if provided, otherwise generate a new one
+        chunk_id = c.get("id") or str(uuid.uuid4())
         records.append({
-            "id": str(uuid.uuid4()),
+            "id": chunk_id,
             "chunk_text": c.get("chunk"),  # field mapped to 'text'
             **metadata
         })