A sophisticated LangGraph-based fraud detection system that leverages parallel AI analysis to detect fraudulent transactions in real-time. Built with Python, FastAPI, and OpenAI's language models, this agent performs multi-dimensional risk assessment through five specialized analyzers running in parallel.
- Overview
- Architecture
- Features
- Installation
- Configuration
- Usage
- API Documentation
- Fraud Detection Methodology
- LangGraph Implementation
- Deployment
- Performance
- Troubleshooting
- Development
The Fraud Detection Agent is an intelligent system that analyzes financial transactions for potential fraud using multiple AI-powered analyzers working in parallel. It provides real-time risk assessment with three possible outcomes: APPROVE, REVIEW, or DECLINE.
- Parallel Analysis: 5 specialized fraud detectors run simultaneously
- LLM-Powered: Uses OpenAI's GPT models for sophisticated pattern recognition
- Flexible Input: Accepts multiple transaction data formats
- Production-Ready: Deployed on Google Cloud Run with monitoring
- Configurable: Adjustable risk thresholds and analyzer weights
- Fault-Tolerant: Built-in retry logic and timeout protection
- Orchestration: LangGraph (state-based parallel workflow)
- AI/ML: OpenAI GPT-4o-mini (configurable)
- Runtime: FastAPI + Uvicorn
- Language: Python 3.13
- Deployment: Google Cloud Run
- Monitoring: Handit AI
```
┌─────────────────────────────────────────────────────────┐
│               FastAPI Server (Port 8001)                │
│                 with Handit AI Tracing                  │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
                  ┌─────────────────────┐
                  │   LangGraphAgent    │
                  │ (Main Orchestrator) │
                  └──────────┬──────────┘
                             │
                  ┌──────────▼──────────┐
                  │  RiskManagerGraph   │
                  │ (StateGraph Based)  │
                  └──────────┬──────────┘
                             │
┌────────────────────────────▼────────────────────────────┐
│                ORCHESTRATOR NODE (START)                │
│         Normalizes and enriches transaction data        │
└────────────────────────────┬────────────────────────────┘
                             │
┌────────────────────────────▼────────────────────────────┐
│               PARALLEL ANALYZER EXECUTION               │
│   ┌────────────┐  ┌────────────┐                        │
│   │  Pattern   │  │ Behavioral │                        │
│   │  Detector  │  │  Analyzer  │                        │
│   └────────────┘  └────────────┘                        │
│   ┌────────────┐  ┌───────────────┐  ┌────────────┐     │
│   │  Velocity  │  │   Merchant    │  │ Geographic │     │
│   │  Checker   │  │ Risk Analyzer │  │  Analyzer  │     │
│   └────────────┘  └───────────────┘  └────────────┘     │
└────────────────────────────┬────────────────────────────┘
                             │
             ┌───────────────▼───────────────┐
             │   DECISION AGGREGATOR NODE    │
             │ Combines all analyzer results │
             └───────────────┬───────────────┘
                             │
                             ▼
             ┌───────────────────────────────┐
             │  Final JSON Decision Output   │
             │ {final_decision, reason, ...} │
             └───────────────────────────────┘
```
```
risk_manager/
├── main.py                     # FastAPI application entry point
├── src/
│   ├── agent.py                # LangGraphAgent orchestrator
│   ├── base.py                 # Base node classes
│   ├── config.py               # Configuration and graph topology
│   ├── graph/
│   │   ├── main.py             # RiskManagerGraph implementation
│   │   └── nodes/
│   │       └── nodes.py        # Node function definitions
│   ├── nodes/
│   │   └── llm/                # LLM-based analyzer nodes
│   │       ├── pattern_detector/        # Fraud pattern detection
│   │       ├── behavioral_analizer/     # Behavioral anomaly detection
│   │       ├── velocity_checker/        # Velocity abuse detection
│   │       ├── merchant_risk_analizer/  # Merchant risk assessment
│   │       ├── geographic_analizer/     # Geographic fraud detection
│   │       └── decision_aggregator/     # Final decision maker
│   ├── state/
│   │   └── state.py            # AgentState definition
│   └── utils/
│       └── openai_client.py    # OpenAI client wrapper
├── use_cases/                  # Test cases and examples
├── requirements.txt            # Python dependencies
├── .env.example                # Environment variables template
└── cloudbuild.yaml             # Google Cloud Build config
```
- 5 analyzers run simultaneously, not sequentially
- Cuts end-to-end latency to roughly that of a single analyzer (~5x speedup over sequential execution)
- Automatic state merging with LangGraph
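Conceptually, the fan-out/fan-in looks like the following plain-`asyncio` sketch (the analyzer stubs and their return shape are illustrative, not the project's actual node code):

```python
import asyncio

# Analyzer names as used throughout this project.
ANALYZERS = [
    "pattern_detector",
    "behavioral_analizer",
    "velocity_checker",
    "merchant_risk_analizer",
    "geographic_analizer",
]

async def run_analyzer(name: str, transaction: dict) -> tuple:
    # Stand-in for an LLM call; returns the analyzer name and a result dict.
    await asyncio.sleep(0)  # simulate I/O-bound work
    return name, {"risk_score": 0, "summary": f"{name}: no findings"}

async def analyze(transaction: dict) -> dict:
    # Fan out: all five analyzers start at once; fan in: gather their results.
    results = await asyncio.gather(
        *(run_analyzer(name, transaction) for name in ANALYZERS)
    )
    return dict(results)
```

Because each analyzer is independent I/O-bound work, running them concurrently is what turns five sequential LLM round-trips into roughly one.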
Accepts various transaction data formats:
- Simple format (basic fields)
- Complex banking format (comprehensive data)
- Legacy format (backward compatibility)
- Hybrid format (mixed structure)
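A normalizer for these formats might look like the sketch below (field names follow the request examples later in this README; the actual orchestrator logic may differ):

```python
def normalize_transaction(payload: dict) -> dict:
    """Flatten the supported input formats into one canonical dict.

    Handles the simple flat format and the nested banking format;
    unknown fields are ignored.
    """
    flat = dict(payload)
    # Nested banking format: lift sub-objects to the top level.
    for section in ("transaction", "merchant", "customer"):
        if isinstance(payload.get(section), dict):
            flat.update(payload[section])
            flat.pop(section, None)
    return {
        "user_id": flat.get("user_id"),
        "amount": float(flat.get("amount", 0.0)),
        "merchant_name": flat.get("merchant_name"),
        "user_age_days": int(flat.get("user_age_days", 0)),
        "velocity_counters": flat.get("velocity_counters", {}),
    }
```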
- Weighted scoring from multiple analyzers
- Configurable risk thresholds
- Evidence-based decision making
- Async/await throughout
- Retry with exponential backoff
- Timeout protection (120s max)
- Health checks and monitoring
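The retry-with-backoff and timeout behavior can be sketched as follows (retry counts and delays are illustrative defaults; the 120s ceiling matches the timeout protection noted above):

```python
import asyncio
import random

async def call_with_retry(coro_factory, *, retries=3, base_delay=0.5, timeout=120.0):
    """Run an async callable with a hard timeout and exponential backoff.

    `coro_factory` is a zero-argument callable returning a fresh coroutine,
    so each retry gets a new attempt rather than re-awaiting a dead one.
    """
    for attempt in range(retries):
        try:
            return await asyncio.wait_for(coro_factory(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == retries - 1:
                raise  # out of retries: surface the error to the caller
            # Exponential backoff with jitter: 0.5s, 1s, 2s, ...
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```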
| Analyzer | Purpose | Weight | Key Detection Areas |
|---|---|---|---|
| Pattern Detector | Identifies known fraud signatures | 25% | Synthetic identity, account takeover, card testing, money laundering |
| Behavioral Analyzer | Detects user behavior anomalies | 20% | Spending patterns, temporal deviations, merchant preferences |
| Velocity Checker | Catches rapid-fire attacks | 25% | Transaction frequency, amount velocity, failure rates |
| Merchant Risk Analyzer | Assesses merchant trustworthiness | 15% | Merchant category, reputation, fraud history |
| Geographic Analyzer | Detects location-based fraud | 15% | Impossible travel, VPN detection, high-risk regions |
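The weighted combination implied by this table can be sketched as follows (weights taken from the table; the real aggregator is LLM-driven and more nuanced than a plain weighted average):

```python
# Weights from the analyzer table above (they sum to 1.0).
ANALYZER_WEIGHTS = {
    "pattern_detector": 0.25,
    "behavioral_analizer": 0.20,
    "velocity_checker": 0.25,
    "merchant_risk_analizer": 0.15,
    "geographic_analizer": 0.15,
}

def weighted_risk_score(scores: dict) -> float:
    """Combine per-analyzer risk scores (0-100) into one weighted score."""
    return sum(
        weight * scores.get(name, 0)
        for name, weight in ANALYZER_WEIGHTS.items()
    )
```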
- Python 3.13+
- OpenAI API key
- (Optional) Handit AI API key for monitoring
- (Optional) Google Cloud account for deployment
- Clone the repository

```bash
git clone <repository-url>
cd risk_manager
```

- Create virtual environment

```bash
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
```

- Install dependencies

```bash
pip install -r requirements.txt
```

- Set up environment variables

```bash
cp .env.example .env
# Edit .env and add your OPENAI_API_KEY
```

- Run the application

```bash
python main.py
```

The server will start on http://localhost:8001
Create a .env file with the following variables:
```bash
# Required
OPENAI_API_KEY=sk-...       # Your OpenAI API key

# Optional
HANDIT_API_KEY=...          # Handit AI monitoring (optional)
MODEL_NAME=gpt-4o-mini      # LLM model to use (default: gpt-4o-mini)
MODEL_PROVIDER=openai       # LLM provider (default: mock for testing)
ENVIRONMENT=development     # Environment (development/production)
HOST=0.0.0.0                # Server host
PORT=8001                   # Server port
LOG_LEVEL=info              # Logging level (debug/info/warning/error)
```

Modify risk thresholds in src/config.py:
```python
self.risk_thresholds = {
    "decline": 70,  # Risk score >= 70: DECLINE
    "review": 40,   # Risk score >= 40: REVIEW
    "approve": 0    # Risk score < 40: APPROVE
}
```

Adjust the importance of each analyzer in src/config.py:
```python
self.analyzer_weights = {
    "pattern_detector": 0.25,        # 25% weight
    "behavioral_analizer": 0.20,     # 20% weight
    "velocity_checker": 0.25,        # 25% weight
    "merchant_risk_analizer": 0.15,  # 15% weight
    "geographic_analizer": 0.15      # 15% weight
}
```

```bash
curl -X POST http://localhost:8001/process \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "john_doe_123",
    "amount": 250.50,
    "merchant_name": "Amazon",
    "user_age_days": 365,
    "total_transactions": 150
  }'
```

```python
import requests

# Transaction data
transaction = {
    "user_id": "customer_123",
    "amount": 500.00,
    "merchant_name": "Electronics Store",
    "user_age_days": 180,
    "total_transactions": 50,
    "location": "New York, US",
    "time": "14:30",
    "velocity_counters": {
        "transactions_last_hour": 2,
        "declined_transactions_last_24h": 0
    }
}

# Send request
response = requests.post(
    "http://localhost:8001/process",
    json=transaction
)

# Parse response
result = response.json()
decision = result["result"]["decision"]["final_decision"]
reason = result["result"]["decision"]["reason"]

print(f"Decision: {decision}")
print(f"Reason: {reason}")
```

Example response:

```json
{
  "result": {
    "pattern_detector": "No suspicious patterns detected...",
    "behavioral_analizer": "Transaction aligns with user history...",
    "velocity_checker": "Normal transaction velocity...",
    "merchant_risk_analizer": "Trusted merchant...",
    "geographic_analizer": "Location consistent...",
    "decision": {
      "final_decision": "APPROVE",
      "conclusion": "Low-risk transaction from established user",
      "recommendations": ["Process transaction normally"],
      "reason": "All analyzers indicate low fraud risk..."
    }
  },
  "success": true,
  "metadata": {
    "agent": "risk_manager",
    "framework": "langgraph",
    "processing_time_ms": 3250.45
  }
}
```

POST /process — Process a transaction through fraud detection
Request Body (Multiple formats supported):
Simple Format:
```json
{
  "user_id": "string",
  "amount": 0.0,
  "merchant_name": "string",
  "user_age_days": 0,
  "total_transactions": 0
}
```

Complex Banking Format:

```json
{
  "transaction": {
    "transaction_id": "string",
    "amount": 0.0,
    "currency": "USD"
  },
  "merchant": {
    "merchant_name": "string",
    "merchant_category_code": "string"
  },
  "customer": {
    "user_id": "string",
    "user_age_days": 0
  },
  "velocity_counters": {
    "transactions_last_hour": 0,
    "declined_transactions_last_24h": 0
  }
}
```

Response: ProcessResponse with analyzer results and decision
GET /health — Health check endpoint

Response:

```json
{
  "status": "healthy",
  "agent": "risk_manager",
  "framework": "langgraph"
}
```

GET /graph/info — Get graph structure information
Response: Graph topology and node information
The system employs a multi-layered defense strategy:
- Pattern Recognition: Identifies known fraud signatures
- Behavioral Analysis: Detects anomalies from established patterns
- Velocity Monitoring: Catches rapid-fire attacks
- Merchant Assessment: Evaluates merchant trustworthiness
- Geographic Verification: Validates location consistency
- Amount anomalies (unusually high amounts)
- Time-based risks (transactions at 2-5 AM)
- New user risks (account age < 7 days)
- Authentication failures
- Spending pattern deviations
- Merchant preference changes
- Location inconsistencies
- Device/channel changes
- High transaction frequency
- Escalating amounts
- Multiple declined attempts
- Rapid account changes
```
Risk Score Calculation:
├─ Each analyzer returns a risk assessment
├─ Scores are weighted by importance
├─ Final score = weighted average (0-100)
└─ Decision based on thresholds:
   ├─ Score < 40:  APPROVE
   ├─ Score 40-69: REVIEW
   └─ Score >= 70: DECLINE
```
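A minimal version of this threshold mapping (threshold values as shown in the Configuration section):

```python
def decide(score: float) -> str:
    """Map a 0-100 weighted risk score to a decision.

    Uses the thresholds from src/config.py: decline at >= 70,
    review at >= 40, approve below 40.
    """
    if score >= 70:
        return "DECLINE"
    if score >= 40:
        return "REVIEW"
    return "APPROVE"
```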
- Pattern Detector: "Account takeover signature detected"
- Behavioral: "Significant deviation from normal behavior"
- Geographic: "Login from different country"
- Decision: DECLINE
- Pattern Detector: "Multiple small transactions pattern"
- Velocity: "10 transactions in 5 minutes"
- Merchant: "Multiple different merchants"
- Decision: DECLINE
- Pattern Detector: "No fraud patterns"
- Behavioral: "Consistent with user profile"
- Merchant: "Trusted merchant"
- Decision: APPROVE
LangGraph is a natural fit for this parallel fraud detection workflow:
- State Management: Automatic state merging from parallel branches
- Graph Structure: Visual workflow representation
- Error Handling: Built-in retry and error recovery
- Async Support: Native async/await for high performance
The graph topology is defined in src/config.py:
```python
graph_config = {
    "nodes": {
        "orchestrator": {...},           # Entry point
        "pattern_detector": {...},       # Parallel analyzer 1
        "behavioral_analizer": {...},    # Parallel analyzer 2
        "velocity_checker": {...},       # Parallel analyzer 3
        "merchant_risk_analizer": {...}, # Parallel analyzer 4
        "geographic_analizer": {...},    # Parallel analyzer 5
        "decision_aggregator": {...},    # Convergence point
        "finalizer": {...}               # Final processing
    },
    "edges": [
        # Fan-out (1 to many)
        {"from": "orchestrator", "to": "pattern_detector"},
        {"from": "orchestrator", "to": "behavioral_analizer"},
        {"from": "orchestrator", "to": "velocity_checker"},
        {"from": "orchestrator", "to": "merchant_risk_analizer"},
        {"from": "orchestrator", "to": "geographic_analizer"},
        # Fan-in (many to 1)
        {"from": "pattern_detector", "to": "decision_aggregator"},
        {"from": "behavioral_analizer", "to": "decision_aggregator"},
        {"from": "velocity_checker", "to": "decision_aggregator"},
        {"from": "merchant_risk_analizer", "to": "decision_aggregator"},
        {"from": "geographic_analizer", "to": "decision_aggregator"},
        # Sequential
        {"from": "decision_aggregator", "to": "finalizer"}
    ]
}
```

The AgentState (TypedDict) manages data flow:

```python
from typing import Annotated, Any, Dict, List, TypedDict

# merge_dicts is the reducer defined alongside the state module.
class AgentState(TypedDict):
    input: Dict[str, Any]
    transaction_data: Dict[str, Any]
    enriched_transaction: Dict[str, Any]
    analyzer_results: Annotated[Dict, merge_dicts]  # Parallel merge
    risk_scores: Annotated[Dict, merge_dicts]       # Parallel merge
    completed_analyzers: List[str]
    final_decision: str
    results: Annotated[Dict, merge_dicts]           # Parallel merge
```

1. Orchestrator prepares transaction data
2. LangGraph spawns 5 parallel branches
3. Each analyzer runs independently
4. Results automatically merge (Annotated fields)
5. Decision aggregator receives all results
6. Final decision generated
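The automatic merge in step 4 depends on a dict-merging reducer. A minimal illustration of what such a `merge_dicts` does when parallel branches each return a partial update (the project's own implementation may differ in detail):

```python
def merge_dicts(left: dict, right: dict) -> dict:
    """Reducer for Annotated state fields: instead of the last branch
    overwriting the whole field, partial dicts from parallel branches
    are combined key by key."""
    return {**left, **right}

def merge_branch_updates(updates: list) -> dict:
    """Fold a sequence of branch updates into one state field,
    the way LangGraph applies a reducer as branches complete."""
    state = {}
    for update in updates:
        state = merge_dicts(state, update)
    return state
```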
The application is configured for Google Cloud Run deployment with automatic CI/CD.
- Google Cloud Project
- Cloud Build API enabled
- Artifact Registry repository created
- Configure secrets in Google Cloud

```bash
echo "your-openai-key" | gcloud secrets create OPENAI_API_KEY --data-file=-
```

- Update cloudbuild.yaml with your project details
- Deploy using Cloud Build

```bash
gcloud builds submit --config=cloudbuild.yaml
```

- Access the deployed service

```bash
gcloud run services describe transaction-validation-agent --region=us-central1
```

Build and run with Docker:

```bash
# Build image
docker build -t fraud-detection-agent .

# Run container
docker run -p 8001:8001 \
  -e OPENAI_API_KEY=your-key \
  fraud-detection-agent
```

Recommended Cloud Run settings:
- Memory: 4 GiB (for LLM processing)
- CPU: 2 vCPUs
- Timeout: 600 seconds
- Concurrency: 10 requests
- Min instances: 0 (scale to zero)
- Max instances: 10 (adjust based on load)
| Metric | Value | Notes |
|---|---|---|
| Average Processing Time | 3-4 seconds | For complete analysis |
| Parallel Speedup | ~5x | Compared to sequential |
| Throughput | 15-20 req/min | Per instance |
| Timeout Rate | < 1% | With 120s timeout |
| Success Rate | > 99% | With retry logic |
- Reduce Analyzer Count: Disable less critical analyzers for speed
- Adjust Timeouts: Lower timeout for faster failure detection
- Cache Results: Implement Redis for repeat transactions
- Batch Processing: Process multiple transactions together
- Model Selection: Use faster models (gpt-3.5-turbo) for lower latency
- Minimum: 2 GB RAM, 1 vCPU
- Recommended: 4 GB RAM, 2 vCPUs
- Network: Low bandwidth, ~1 KB per request
- Storage: Minimal, logs only
Error: OpenAI API key not found
Solution: Ensure OPENAI_API_KEY is set in environment
Error: asyncio.TimeoutError

Solution:
- Increase timeout in src/graph/main.py
- Check OpenAI API status
- Reduce parallel analyzer count

Symptoms: Requests taking > 10 seconds

Solutions:
- Switch to a faster model (gpt-3.5-turbo)
- Reduce prompt complexity
- Implement caching layer
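As a starting point for the caching suggestion, a tiny in-process TTL cache keyed on a hash of the transaction payload (illustrative only; a shared store such as Redis is the better fit in production, and the class and method names here are hypothetical):

```python
import hashlib
import json
import time

class DecisionCache:
    """In-process TTL cache for repeat transactions."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (timestamp, decision dict)

    def _key(self, transaction: dict) -> str:
        # Stable hash of the transaction payload.
        raw = json.dumps(transaction, sort_keys=True).encode()
        return hashlib.sha256(raw).hexdigest()

    def get(self, transaction: dict):
        entry = self._store.get(self._key(transaction))
        if entry and time.monotonic() - entry[0] < self.ttl:
            return entry[1]
        return None  # miss or expired

    def put(self, transaction: dict, decision: dict) -> None:
        self._store[self._key(transaction)] = (time.monotonic(), decision)
```

Check the cache before invoking the graph and store the final decision afterwards; keep the TTL short so a user's changing risk profile is not masked by stale results.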
Error: Out of memory
Solution: Increase Cloud Run memory allocation
Enable debug logging:
```bash
LOG_LEVEL=debug python main.py
```

View detailed traces:
- Check the Handit AI dashboard (if configured)
- Review application logs
- Use the /graph/info endpoint for graph details
- Create analyzer directory
```bash
mkdir src/nodes/llm/new_analyzer
```

- Implement processor

```python
# src/nodes/llm/new_analyzer/processor.py
from src.base import BaseLLMNode

class NewAnalyzerLLMNode(BaseLLMNode):
    async def run(self, input_data):
        # Implementation
        pass
```

- Define prompts

```python
# src/nodes/llm/new_analyzer/prompts.py
def get_prompts():
    return {
        "system": "You are an expert...",
        "user_template": "Analyze..."
    }
```

- Register in config

```python
# src/config.py
# Add to graph_config nodes and edges
```

- Add node function

```python
# src/graph/nodes/nodes.py
async def new_analyzer_node(state):
    # Node implementation
    pass
```

Execute test suite:
```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src

# Run specific test file
pytest tests/test_fraud_detection.py
```

Test with provided use cases:

```bash
# Run all use cases
python run_use_cases.py

# Run specific file
python run_use_cases.py --file use_cases/SOPHISTICATED_FRAUD_CASES.json

# Generate report
python run_use_cases.py --report
```

Maintain code quality:

```bash
# Format code
black .

# Lint
flake8 src/

# Type checking
mypy src/
```

This project is proprietary software. All rights reserved.
Please follow these guidelines:
- Fork the repository
- Create feature branch (git checkout -b feature/AmazingFeature)
- Commit changes (git commit -m 'Add AmazingFeature')
- Push to branch (git push origin feature/AmazingFeature)
- Open Pull Request
- Documentation: Handit.ai Documentation
- LangGraph: LangGraph Documentation
- Issues: Open an issue in the repository
- Built with LangGraph for orchestration
- Powered by OpenAI for AI analysis
- Monitored by Handit AI for observability
- Deployed on Google Cloud Run for scalability
Version: 1.0.0
Last Updated: January 2025
Status: Production Ready