
🔬 Async Parallel Research Pipeline

A Python application that fetches random users and researches their public achievements using parallel web searches and LLM analysis.

📋 Table of Contents

  • 🎯 Overview
  • ✨ Features
  • 🏗️ Architecture
  • 🚀 Setup
  • 📖 Usage
  • 🎓 Best Practices Implemented
  • 🚀 Future Enhancements
  • 📁 Project Structure
  • 🧪 Testing
  • 📊 Example Output
  • 🤝 Contributing
  • 📝 License
  • 🙏 Acknowledgments

🎯 Overview

This project demonstrates three different approaches to researching user achievements:

  1. Direct LLM Calls (llm_call.py) - Simple, synchronous implementation
  2. LangChain Agents (llm_agents.py) - Autonomous agent with tool selection
  3. Async Parallel Pipeline (llm_agents_async.py) - High-performance parallel execution ⭐

The async parallel approach achieves a substantial speedup over sequential processing by executing web searches and LLM analyses concurrently.

✨ Features

  • 🚀 Parallel Execution: Concurrent web searches and LLM analyses
  • 🔒 Rate Limiting: Semaphore-based concurrency control to respect API quotas
  • 📊 Real-time Results: Live progress updates as analyses complete
  • 📝 Markdown Reports: Auto-generated reports with architecture diagrams
  • 🧪 Comprehensive Testing: Unit tests with mocked API responses
  • 🛡️ Error Handling: Graceful degradation and retry logic
  • 🎨 Clean Architecture: Modular, maintainable code structure

🏗️ Architecture

flowchart TD
    A[Start Workflow] --> B[Fetch Random Users]
    B --> C{Users Found?}
    C -->|No| D[End - No Users]
    C -->|Yes| E[Create Parallel Search Tasks]
    
    E --> F1[Search Person 1]
    E --> F2[Search Person 2]
    E --> F3[Search Person N...]
    
    F1 --> G[Tavily Web Search API]
    F2 --> G
    F3 --> G
    
    G --> H[Gather All Search Results]
    H --> I[Create Parallel LLM Analysis Tasks]
    
    I --> J1[Analyze Person 1]
    I --> J2[Analyze Person 2]
    I --> J3[Analyze Person N...]
    
    J1 --> K[OpenAI LLM API]
    J2 --> K
    J3 --> K
    
    K --> L[Gather All Analyses]
    L --> M[Generate Summary & Reports]
    M --> N[Display Results & Timing]

Pipeline Stages

  1. Fetch Users (0.21s) - Single API call to randomuser.me
  2. Parallel Web Searches (1.53s for 5 users) - Concurrent Tavily searches
  3. Parallel LLM Analysis (19.21s for 5 users) - Concurrent OpenAI analyses
  4. Report Generation - Markdown file with architecture and results

Total Time: ~21s

🚀 Setup

Prerequisites

  • Python 3.11
  • API Keys:
    • OpenRouter API key (for LLM access)
    • Tavily API key (for web search)

Installation

  1. Clone or download the project
git clone https://github.com/nsourlos/async_LLM_web_search.git
cd async_LLM_web_search
  2. Install dependencies

Option A: Using uv (Recommended - Faster)

uv is a fast Python package installer and resolver.

# Install uv if not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create virtual environment with Python 3.11
uv venv genai --python 3.11

# Activate the environment
source genai/bin/activate  # On macOS/Linux
# OR
genai\Scripts\activate     # On Windows

# Install dependencies
uv pip install ipykernel 
uv pip install -r requirements.txt

# Optional: Install Jupyter kernel for notebook support
genai/bin/python -m ipykernel install --user --name=genai --display-name "genai"

Option B: Using pip (Traditional)

# Optional: Create virtual environment (recommended)
python -m venv venv
source venv/bin/activate  # On macOS/Linux
# OR
venv\Scripts\activate     # On Windows

# Install dependencies from requirements.txt
pip install -r requirements.txt
  3. Configure environment variables
# Create env file
cp env.example env

# Edit env file and add your API keys:
OPEN_ROUTER_API_KEY=your_openrouter_key_here
TAVILY_API_KEY=your_tavily_key_here

📖 Usage

Run the Async Parallel Workflow (Recommended)

python main.py

This will:

  • Fetch 5 random users
  • Search for their achievements in parallel
  • Analyze results with LLM in parallel
  • Generate a markdown report (workflow_results_YYYYMMDD_HHMMSS.md)
  • Display timing statistics

Customize Parameters

# Adjust number of users and concurrency
# (assumes run_parallel_workflow is imported from llm_agents_async)
import asyncio
from llm_agents_async import run_parallel_workflow

asyncio.run(run_parallel_workflow(
    num_users=10,            # Number of users to research
    max_concurrent=5,        # Max concurrent API calls
    save_markdown=True       # Generate markdown report
))

🎓 Best Practices Implemented

1. Async/Await Pattern

# Proper async execution with asyncio.gather()
search_tasks = [
    search_person_async(user['name'], user['country'], semaphore)
    for user in users
]
search_results = await asyncio.gather(*search_tasks)

Benefits:

  • Non-blocking I/O operations
  • Maximum parallelism for API calls
  • Efficient resource utilization
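
When gathering many independent API tasks, `return_exceptions=True` is worth knowing about: it keeps one failed task from discarding every other result. A minimal sketch of that variant (illustrative, not necessarily how the repository calls gather):

results = await asyncio.gather(*search_tasks, return_exceptions=True)
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]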

2. Semaphore-Based Rate Limiting

semaphore = asyncio.Semaphore(max_concurrent)  # Allow at most max_concurrent calls at once
async with semaphore:  # Each task waits here until a slot is free
    ...                # API call goes here

Benefits:

  • Prevents API rate limit errors
  • Controls resource usage
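
A self-contained sketch showing the semaphore in action (the function names and bodies are illustrative placeholders, not code from the repository):

import asyncio

async def limited_call(i: int, semaphore: asyncio.Semaphore) -> int:
    async with semaphore:          # At most max_concurrent bodies run at once
        await asyncio.sleep(0.1)   # Stand-in for a real API call
        return i

async def main() -> None:
    max_concurrent = 5
    semaphore = asyncio.Semaphore(max_concurrent)
    results = await asyncio.gather(*(limited_call(i, semaphore) for i in range(20)))
    print(f"Completed {len(results)} calls")

asyncio.run(main())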

3. ThreadPoolExecutor for Blocking Calls

loop = asyncio.get_event_loop()
results = await loop.run_in_executor(
    None,  # Default ThreadPoolExecutor
    lambda: tavily_search.invoke({"query": search_query})
)

Benefits:

  • Handles synchronous APIs in async context
  • Prevents blocking the event loop
  • Maintains concurrency benefits
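
On Python 3.9+, `asyncio.to_thread` is an equivalent shorthand for the same pattern (shown here as an alternative sketch, not how the repository is written):

# Runs the blocking Tavily call in the default thread pool
# without blocking the event loop (Python 3.9+).
results = await asyncio.to_thread(tavily_search.invoke, {"query": search_query})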

4. Modular Project Structure

fetch_filter_user_data.py  → Data fetching & filtering
llm_call.py                 → Direct LLM interactions
llm_agents.py               → Agent-based orchestration
llm_agents_async.py         → Async parallel implementation
main.py                     → Entry point
test_user_filter.py         → Comprehensive tests

Benefits:

  • Easy to test and maintain
  • Reusable components across different approaches
  • Simple to extend with new features

5. Comprehensive Error Handling

try:
    response = await llm.ainvoke(prompt)  # Async LangChain call
    person_data = json.loads(response.content)
except Exception as e:
    print(f"✗ Error analyzing {name}: {str(e)}")
    return {"status": "error", "error": str(e)}

Benefits:

  • Informative error messages
  • Each failure is captured with its cause and returned as structured data
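
If the LLM occasionally returns non-JSON text, the parse failure can be reported separately from transport errors. A hedged sketch of that refinement (not necessarily present in the repository):

try:
    response = await llm.ainvoke(prompt)
    person_data = json.loads(response.content)
except json.JSONDecodeError as e:
    # The call succeeded but the model did not return valid JSON
    return {"status": "error", "error": f"Invalid JSON from LLM: {e}"}
except Exception as e:
    return {"status": "error", "error": str(e)}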

6. Retry Logic with Exponential Backoff

for attempt in range(max_retries):
    try:
        completion = client.chat.completions.create(...)
        break
    except Exception as e:
        if attempt == max_retries - 1:
            raise                 # Give up after the final attempt
        wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
        print(f"Retry {attempt + 1}/{max_retries} after error: {e}")
        time.sleep(wait_time)

Benefits:

  • Recovers automatically from transient failures
  • Respects API rate limits by backing off between attempts
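
Libraries such as tenacity package the same retry-with-backoff pattern as a decorator; a sketch of that alternative (not used in the repository as far as this README shows, and `client`/`prompt` are assumed from context):

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=30))
def call_llm(prompt: str):
    # Retried automatically with exponential backoff on any exception
    return client.chat.completions.create(
        model="openai/gpt-5-mini",
        messages=[{"role": "user", "content": prompt}],
    )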

7. Environment Variable Security

load_dotenv('env')
api_key = os.getenv("OPEN_ROUTER_API_KEY")

Benefits:

  • No hardcoded credentials
  • Easy configuration management
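
A small fail-fast check keeps a missing key from surfacing later as a confusing API error (illustrative addition, not from the repository):

if not api_key:
    raise RuntimeError("OPEN_ROUTER_API_KEY is not set; add it to the env file")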

8. Testing with Mocks

def test_fetch_random_users_success(mocker):
    mock_response = mocker.Mock()
    mocker.patch('fetch_filter_user_data.requests.get', 
                 return_value=mock_response)

Benefits:

  • Tests without API calls
  • Predictable test results
  • Fast test execution

9. Clean Code and Documentation

async def search_person_async(name: str, country: str, 
                               semaphore: asyncio.Semaphore) -> dict:
    """
    Asynchronously search for a person's achievements.
    
    Args:
        name: Full name of the person
        country: Country of origin
        semaphore: Asyncio semaphore to limit concurrency
        
    Returns:
        Dict with search results
    """

Benefits:

  • Self-documenting code with docstrings
  • Inline comments explain complex logic
  • Easier maintenance and modular design
  • Easy onboarding for new developers

10. Structured Logging and Reports

add_markdown_header(md_lines, "Performance Metrics", level=2)
add_markdown_line(md_lines, f"- **Total Time**: {total_time:.2f}s")

Benefits:

  • Observable execution
  • Shareable results
  • Performance tracking
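
The helpers above come from the repository; minimal versions might look roughly like this (hypothetical sketch, the actual implementations may differ):

def add_markdown_header(md_lines: list[str], text: str, level: int = 1) -> None:
    md_lines.append(f"{'#' * level} {text}\n")

def add_markdown_line(md_lines: list[str], text: str) -> None:
    md_lines.append(f"{text}\n")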

11. Fast Dependency Management with uv

We use uv as our recommended package manager.

Why uv is Superior:

  • 10-100x faster than pip for package resolution and installation
  • 🔒 Deterministic builds with better dependency resolution
  • 🎯 Modern Rust-based implementation for performance
  • 📦 Drop-in replacement for pip - same commands work
  • 💾 Smaller disk footprint with better caching

🚀 Future Enhancements

1. Configuration File for Parameters

# config.yaml
llm:  # or use litellm
  model: "openai/gpt-5-mini"
  temperature: 0.0
  max_tokens: 2000
  base_url: "https://openrouter.ai/api/v1"

search:
  provider: "tavily"
  max_results: 3
  timeout: 10

workflow:
  num_users: 5
  max_concurrent: 5
  save_markdown: true
  output_dir: "./results"

# Usage in code
config = Config.from_yaml('config.yaml')
llm = ChatOpenAI(model=config.llm_model, temperature=config.llm_temperature)

Benefits:

  • Easy experimentation with different LLM models
  • Adjust concurrency limits per API quota
  • Configure timeouts and retry strategies
  • Support multiple deployment environments (dev/prod)
  • Prompts could also live in the config file for easier modification
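
A minimal sketch of what the proposed `Config.from_yaml` loader could look like, assuming PyYAML and the attribute names used above (hypothetical code for a future enhancement, not part of the current codebase):

from dataclasses import dataclass
import yaml

@dataclass
class Config:
    llm_model: str
    llm_temperature: float
    num_users: int
    max_concurrent: int

    @classmethod
    def from_yaml(cls, path: str) -> "Config":
        with open(path) as f:
            raw = yaml.safe_load(f)
        return cls(
            llm_model=raw["llm"]["model"],
            llm_temperature=raw["llm"]["temperature"],
            num_users=raw["workflow"]["num_users"],
            max_concurrent=raw["workflow"]["max_concurrent"],
        )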

2. Streaming LLM Responses

async def stream_llm_analysis():
    async for chunk in llm.astream(prompt):
        yield chunk  # Process as data arrives

Expected Impact:

  • Faster time-to-first-result
  • Better user experience
  • Reduced perceived latency

3. Prompt Optimization

# Current: ~500 tokens
# Optimized: ~200 tokens
prompt = f"""JSON only. Analyze: {name} ({country})
Data: {search_data}
Schema: {{"has_public_record":bool,"field":str,"key_achievement":str}}"""

Also consider using DSPy for automated prompt optimization.

Expected Impact:

  • Fewer tokens
  • Cost reduction
  • Slightly faster responses
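
Token savings can be checked before and after trimming with a tokenizer such as tiktoken (illustrative check; the cl100k_base encoding is only an approximation for OpenRouter-hosted models):

import tiktoken

enc = tiktoken.get_encoding("cl100k_base")
print(f"Prompt size: {len(enc.encode(prompt))} tokens")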

4. Tests for Async Code and LangChain Agents

Expected Impact:

  • Verifiable code for production

📁 Project Structure

async_LLM_web_search/
├── main.py                          # Entry point
├── llm_agents_async.py              # Async parallel implementation ⭐
├── llm_agents.py                    # LangChain agent approach
├── llm_call.py                      # Direct LLM calls
├── fetch_filter_user_data.py        # User data fetching
├── test_user_filter.py              # Tests
├── architecture_flow.md             # Architecture diagram
├── workflow_results_*.md            # Generated reports
├── requirements.txt                 # Python dependencies
├── env                              # Environment variables (to be filled in)
└── README.md                        # This file

Key Files

  • llm_agents_async.py: Main implementation with parallel execution
  • main.py: Entry point that runs the workflow
  • fetch_filter_user_data.py: User data fetching and filtering logic
  • test_user_filter.py: Comprehensive unit tests
  • architecture_flow.md: Mermaid diagram of the system

🧪 Testing

Run Tests

Run the following command from the project root:

pytest

Test Coverage

  • ✅ Date filtering logic (before/after 2000)
  • ✅ Edge cases (exactly 2000, invalid dates)
  • ✅ API mocking (success and timeout scenarios)
  • ✅ Multiple users with mixed dates

Sample Test

def test_fetch_random_users_success(mocker):
    """Verify successful API call using pytest-mock."""
    mock_response = mocker.Mock()
    mock_response.json.return_value = {
        "results": [{"name": {"first": "jane", "last": "doe"}, 
                     "dob": {"date": "2003-01-01T00:00:00.000Z"}}]
    }
    mocker.patch('fetch_filter_user_data.requests.get', 
                 return_value=mock_response)
    
    result = fetch_random_users(1)
    assert result["results"][0]["name"]["first"] == "jane"
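
The timeout scenario mentioned under Test Coverage can be mocked with `side_effect`; a hypothetical sketch (the expected fallback behaviour depends on fetch_filter_user_data's implementation):

import requests

def test_fetch_random_users_timeout(mocker):
    """Simulate a network timeout from randomuser.me."""
    mocker.patch('fetch_filter_user_data.requests.get',
                 side_effect=requests.exceptions.Timeout)
    result = fetch_random_users(1)
    assert not result  # Assumed: no usable results are returned on timeout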

📊 Example Output

====================================================================================================
STARTING PARALLEL AGENTIC WORKFLOW
====================================================================================================
📥 Fetching 5 random users...
Fetched 5 users.
✓ Found 5 users

⏱️  User fetch completed in 0.21s

====================================================================================================
PARALLEL WEB SEARCHES
====================================================================================================
🔍 Searching for: Nixon Harris (New Zealand)...
🔍 Searching for: Antoine Jones (Canada)...
🔍 Searching for: Jorrick Van Der Weijden (Netherlands)...
🔍 Searching for: Amalie Olsen (Denmark)...
🔍 Searching for: Yaropolk Poznanskiy (Ukraine)...

⏱️  Parallel searches completed in 1.53s
   Average per search: 0.31s

====================================================================================================
🤖 ANALYZING ALL USERS IN PARALLEL WITH LLM
====================================================================================================

====================================================================================================
✓ COMPLETED: Amalie Olsen
====================================================================================================
🏆 Amalie Olsen (Sports)
   Country: Denmark
   Achievement: 8-time Danish Track Champion and Paralympic cycling coach
   Details:
     • Paralympic cycling coach
     • Former professional track cyclist
     • 8x Danish Track Champion

====================================================================================================
SUMMARY: 3/5 people with verifiable public records
====================================================================================================

📄 Results saved to: workflow_results_20251117_101129.md

🤝 Contributing

Feel free to submit issues or pull requests for:

  • Additional performance optimizations
  • New test cases
  • Documentation improvements
  • Bug fixes

📝 License

This is an educational project.

🙏 Acknowledgments

