🚀 A FastAPI-powered Retrieval-Augmented Generation (RAG) system with Elasticsearch, Hugging Face embeddings, and Anthropic Claude, featuring conversational memory for context-aware AI answers.


shawon-kanji/python-elasticsearch-RAG

🧠 Conversational RAG System

A Retrieval-Augmented Generation (RAG) system built with FastAPI that combines document search capabilities with conversational memory. This system uses Elasticsearch for document storage and retrieval, and Anthropic's Claude for intelligent responses with conversation context.

✨ Features

  • 📄 Document Ingestion: Automatically chunk and embed documents into Elasticsearch
  • 🔍 Semantic Search: Vector similarity search across your document corpus
  • 💬 Conversational Memory: Per-thread conversation history maintained across turns (in-memory)
  • 🧠 Context-Aware Responses: Leverages both document context and conversation history
  • 🔧 Memory Management: Smart conversation thread management

πŸ—οΈ Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Documents     │    │   User Query     │    │  Conversation   │
│  (Markdown)     │    │                  │    │    Memory       │
└────────┬────────┘    └────────┬─────────┘    │  (In-Memory)    │
         │                      │              └─────────────────┘
         ▼                      ▼                      ▲
┌─────────────────┐    ┌──────────────────┐            │
│  Text Splitter  │    │   Embeddings     │            │
│  (LangChain)    │    │                  │            │
└────────┬────────┘    └────────┬─────────┘            │
         │                      │                      │
         ▼                      ▼                      │
┌─────────────────┐    ┌──────────────────┐            │
│  Elasticsearch  │◄───│  Vector Search   │            │
│   (Vectors +    │    │     (kNN)        │            │
│   Documents)    │    └────────┬─────────┘            │
└─────────────────┘             │                      │
                                ▼                      │
                       ┌──────────────────┐            │
                       │   Claude LLM     │────────────┘
                       │   (Anthropic)    │
                       └──────────────────┘

🚀 Quick Start

Prerequisites

  • Python 3.8+
  • Elasticsearch cluster (local or cloud)
  • Anthropic API key

Installation

  1. Clone the repository

    git clone <your-repo-url>
    cd conversational-rag
  2. Install dependencies

    pip install -r requirements.txt
  3. Set up environment variables

    cp .env.example .env

    Edit .env with your configuration:

    ELASTICSEARCH_URL=http://localhost:9200
    ELASTICSEARCH_API_KEY=your_elasticsearch_api_key
    ANTHROPIC_API_KEY=your_anthropic_api_key
  4. Prepare your documents

    mkdir datasource
    # Add your markdown files to the datasource directory
  5. Run the application

    python main.py

    Or with uvicorn:

    uvicorn main:app --host 0.0.0.0 --port 8000 --reload

📚 API Documentation

Once running, visit http://localhost:8000/docs for interactive API documentation.

Core Endpoints

Health Check

GET /es/health

Check Elasticsearch connectivity and system health.

Document Ingestion

GET /es/ingest

Process and index documents from the datasource directory.

Document Search

GET /es/search?query=your_question&k=5

Search documents without conversation context.
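
As a rough sketch, the kNN request behind this endpoint might look like the following. The field names ("embedding", "text") are illustrative assumptions, not taken from main.py:

```python
# Hypothetical builder for the Elasticsearch kNN search body.
# Field names are assumptions; the vector comes from all-MiniLM-L6-v2 (384 dims).
def build_knn_query(query_vector: list[float], k: int = 5) -> dict:
    return {
        "knn": {
            "field": "embedding",          # dense_vector field holding chunk embeddings
            "query_vector": query_vector,  # embedding of the user's query
            "k": k,                        # nearest neighbours to return
            "num_candidates": max(k * 10, 100),  # candidates considered per shard
        },
        "_source": ["text"],               # return only the chunk text
    }

q = build_knn_query([0.0] * 384, k=5)
```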

Conversational RAG

GET /rag/query?query=your_question&thread_id=optional_thread_id&k=5

Query with conversation memory. If no thread_id is provided, a new conversation thread is created.

Memory Management

GET /rag/clear_memory?thread_id=your_thread_id

Clear conversation history for a specific thread.

🔧 Configuration

Document Processing

  • Chunk Size: 500 characters (configurable in RecursiveCharacterTextSplitter)
  • Chunk Overlap: 50 characters
  • Separators: ["\n\n", "\n", " ", ""]
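
To illustrate how chunk size and overlap interact, here is a simplified sliding-window chunker. The project itself uses LangChain's RecursiveCharacterTextSplitter, which additionally tries to break on the separators above rather than mid-word:

```python
# Simplified fixed-window chunker illustrating chunk_size / chunk_overlap.
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    step = chunk_size - overlap  # each new chunk starts 450 chars after the last
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

doc = "".join(str(i % 10) for i in range(1200))
chunks = chunk_text(doc)
print(len(chunks))                        # 3 chunks for a 1200-char document
print(chunks[0][-50:] == chunks[1][:50])  # True: consecutive chunks share 50 chars
```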

Embedding Model

  • Model: sentence-transformers/all-MiniLM-L6-v2
  • Dimension: 384
  • Use Case: Balanced performance and quality for semantic search
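
Similarity between a query embedding and each stored chunk embedding is typically scored with cosine similarity. A minimal illustration with toy 2-D vectors standing in for the model's 384-dimensional output:

```python
import math

# Cosine similarity: 1.0 for identical directions, 0.0 for orthogonal vectors.
def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0: identical direction
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0: orthogonal
```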

LLM Configuration

  • Model: claude-3-5-sonnet-20240620
  • Temperature: 0 (deterministic responses)
  • Provider: Anthropic via LangChain

Memory Settings

  • Type: ConversationBufferMemory
  • History Limit: Last 6 messages (3 exchanges)
  • Storage: In-memory (resets on restart)
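
The 6-message window can be pictured with a plain deque; this is a stand-in sketch, not the project's actual ConversationBufferMemory wiring:

```python
from collections import deque

# Stand-in for the per-thread history window: a deque with maxlen=6
# silently drops the oldest messages, keeping the last 3 exchanges.
class ThreadMemory:
    def __init__(self, max_messages: int = 6):
        self.messages = deque(maxlen=max_messages)

    def add_exchange(self, user_msg: str, ai_msg: str) -> None:
        self.messages.append(("human", user_msg))
        self.messages.append(("ai", ai_msg))

    def history(self) -> list[tuple[str, str]]:
        return list(self.messages)

mem = ThreadMemory()
for i in range(5):
    mem.add_exchange(f"question {i}", f"answer {i}")

print(len(mem.history()))  # 6: only the last 3 exchanges survive
print(mem.history()[0])    # ('human', 'question 2')
```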

🔄 Usage Examples

Basic Document Query

curl -G "http://localhost:8000/rag/query" --data-urlencode "query=What is the main topic of the story?"

Follow-up Question

curl -G "http://localhost:8000/rag/query" --data-urlencode "query=Tell me more about that" --data-urlencode "thread_id=your_thread_id"

Conversation Flow

import requests

base_url = "http://localhost:8000"

# Start conversation
response1 = requests.get(f"{base_url}/rag/query", params={
    "query": "What are the main characters?"
})
thread_id = response1.json()["thread_id"]

# Follow-up question
response2 = requests.get(f"{base_url}/rag/query", params={
    "query": "What happens to them in the end?",
    "thread_id": thread_id
})

print(response2.json()["answer"])

πŸ“ Project Structure

├── main.py                 # FastAPI application
├── datasource/             # Document storage directory
│   └── story1.md           # Example document
├── requirements.txt        # Python dependencies
├── .env.example            # Environment variables template
└── README.md               # This file

πŸ› οΈ Development

Adding New Document Types

To support different document formats, extend the ingestion logic in ingest_data():

# Example: PDF support (PyPDF2 is an extra dependency, not in requirements.txt)
from PyPDF2 import PdfReader

def load_pdf(file_path: str) -> str:
    reader = PdfReader(file_path)
    text = ""
    for page in reader.pages:
        text += page.extract_text() or ""  # extract_text() may return None for image-only pages
    return text

Custom Embedding Models

Replace the embedding model by updating the model name; note that a model with a different output dimension requires re-indexing your documents:

MODEL_NAME = "sentence-transformers/your-preferred-model"
embedder = SentenceTransformer(MODEL_NAME)

Advanced Memory Strategies

For persistent memory across restarts, consider:

  • Database Storage: PostgreSQL, MongoDB
  • Vector Databases: ChromaDB, Pinecone, Weaviate
  • Hybrid Approach: Recent messages in memory + historical in database

🚀 Production Deployment

Docker Setup

FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .
EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Environment Variables

Production environment variables:

ELASTICSEARCH_URL=https://your-es-cluster.com:9200
ELASTICSEARCH_API_KEY=your_production_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
LOG_LEVEL=INFO

Health Monitoring

The /es/health endpoint provides system health status for monitoring tools.

πŸ” Security Considerations

  • API Keys: Never commit API keys to version control
  • Authentication: Add API authentication for production use
  • Rate Limiting: Implement rate limiting for public deployments
  • Input Validation: Validate and sanitize user inputs
  • CORS: Configure CORS settings appropriately

📈 Performance Tuning

Elasticsearch Optimization

  • Index Settings: Adjust shards and replicas based on data size
  • Mapping: Define explicit mappings for better performance
  • Caching: Enable query result caching
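
For example, an explicit mapping for the chunk index could pin the vector field to the embedding model's 384 dimensions. Field names here are illustrative assumptions:

```python
# Hypothetical explicit mapping for the chunk index. "text" and "embedding"
# are illustrative field names; dims must match all-MiniLM-L6-v2's output.
INDEX_MAPPING = {
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "embedding": {
                "type": "dense_vector",
                "dims": 384,
                "index": True,          # enable kNN search on this field
                "similarity": "cosine",
            },
        }
    }
}
```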

Memory Management

  • Conversation Limits: Implement automatic cleanup of old conversations
  • Memory Monitoring: Monitor memory usage in production
  • Async Processing: Use async operations for better concurrency

πŸ› Troubleshooting

Common Issues

  1. Elasticsearch Connection Failed

    # Check if Elasticsearch is running
    curl -X GET "localhost:9200/_cluster/health"
  2. Empty Search Results

    • Ingest documents first: GET /es/ingest
    • Check index exists in Elasticsearch
    • Verify embedding model is working
  3. Memory Issues

    • Restart the application to clear in-memory conversations
    • Check available system memory
  4. API Key Errors

    • Verify environment variables are loaded
    • Check API key validity with Anthropic

Debugging

Enable debug logging:

import logging
logging.basicConfig(level=logging.DEBUG)

🤝 Contributing

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature-name
  3. Commit changes: git commit -am 'Add feature'
  4. Push to branch: git push origin feature-name
  5. Submit a pull request

📄 Requirements

fastapi>=0.104.0
uvicorn>=0.24.0
elasticsearch>=8.10.0
python-dotenv>=1.0.0
langchain>=0.0.300
langchain-anthropic>=0.1.0
sentence-transformers>=2.2.2
langchain-text-splitters>=0.0.1

πŸ™ Acknowledgments


⭐ If this project helped you, please give it a star! ⭐
