A production-grade machine learning model monitoring and observability platform that detects data drift, concept drift, and performance degradation in real time.
┌─────────────────┐
│ ML Models │
│ (Production) │
└────────┬────────┘
│ Prediction Logs
▼
┌─────────────────────────────────────────────────────────┐
│ FastAPI Backend │
├─────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Ingestion │ │ Drift │ │ Alerting │ │
│ │ Service │ │ Detection │ │ Service │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Performance │ │ Statistics │ │ Dashboard │ │
│ │ Monitoring │ │ Engine │ │ API │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ PostgreSQL │ │ Redis │
│ (Prediction │ │ (Statistics │
│ Logs & Stats) │ │ Cache) │
└──────────────────┘ └──────────────────┘
- Ingestion Service: Receives and validates prediction logs from production models
- Drift Detection Engine: Implements statistical tests for distribution changes
- Performance Monitor: Tracks accuracy and performance metrics over time
- Alerting Service: Threshold-based alerting with configurable rules
- Statistics Engine: Computes and caches metrics using Redis
- Dashboard API: Exposes monitoring data for visualization
Method: Kolmogorov–Smirnov (KS) test, a non-parametric test comparing cumulative distribution functions (CDFs)
Implementation:
- Compares empirical CDFs of baseline vs. production data
- Statistic: Maximum vertical distance between CDFs
- P-value threshold: typically 0.05
Advantages:
- Distribution-free (no assumptions about data shape)
- Sensitive to location and shape differences
- Well-established statistical test
Limitations:
- Works only for continuous/numerical features
- Less sensitive to tail differences
- Requires sufficient sample size (n > 30 recommended)
Use Case: Primary drift detector for numerical features
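A minimal sketch of this check using scipy.stats.ks_2samp; the baseline/production arrays are synthetic placeholders, and the 0.05 threshold mirrors the configuration shown later:

# ks_drift_check.py - sketch of the KS drift check described above
import numpy as np
from scipy.stats import ks_2samp

def ks_drift_check(baseline: np.ndarray, production: np.ndarray, alpha: float = 0.05):
    """Return the KS statistic, p-value, and a drift flag for one numerical feature."""
    statistic, p_value = ks_2samp(baseline, production)
    return statistic, p_value, p_value < alpha  # drift flagged when p-value falls below alpha

# Synthetic example: production values shifted relative to baseline
rng = np.random.default_rng(42)
baseline = rng.normal(loc=125.0, scale=90.0, size=5_000)
production = rng.normal(loc=160.0, scale=90.0, size=5_000)
print(ks_drift_check(baseline, production))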
Method: Measures distribution shift using binned data
Formula:
PSI = Σ (Actual% - Expected%) × ln(Actual% / Expected%)
Interpretation:
- PSI < 0.1: No significant change
- 0.1 ≤ PSI < 0.25: Moderate change, investigate
- PSI ≥ 0.25: Significant change, action required
Advantages:
- Industry-standard in credit scoring and finance
- Works with both numerical and categorical features
- Intuitive interpretation
- Less sensitive to sample size
Limitations:
- Binning strategy affects results
- Can miss subtle drifts within bins
- Requires careful bin selection (typically 10-20 bins)
Use Case: Feature stability monitoring, categorical drift detection
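A minimal sketch of the PSI calculation above, assuming quantile bins derived from the baseline; the bin count, clipping of production values into the baseline range, and the epsilon guard against empty bins are implementation choices, not fixed by the design:

# psi.py - sketch of the PSI formula above (quantile bins from the baseline)
import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """PSI = sum over bins of (actual% - expected%) * ln(actual% / expected%)."""
    edges = np.quantile(expected, np.linspace(0.0, 1.0, bins + 1))
    edges = np.unique(edges)  # collapse duplicate edges for low-variance features
    # Clip production values into the baseline range so extremes land in the outer bins
    actual = np.clip(actual, edges[0], edges[-1])
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    eps = 1e-4  # avoid log(0) and division by zero for empty bins
    expected_pct = np.clip(expected_pct, eps, None)
    actual_pct = np.clip(actual_pct, eps, None)
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))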
Method: Chi-square test of independence between observed and expected category frequencies
Implementation:
- Compares category distributions
- Tests null hypothesis: distributions are identical
Advantages:
- Standard test for categorical data
- Provides p-value for statistical significance
Use Case: Categorical feature drift detection
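A minimal sketch of this check using scipy.stats.chi2_contingency on a 2×k table of baseline vs. production category counts; the inputs below are placeholders:

# chi_square_drift.py - sketch of the categorical drift check described above
from collections import Counter
from scipy.stats import chi2_contingency

def chi_square_drift(baseline_labels, production_labels, alpha: float = 0.05):
    """Compare category frequencies between the baseline and a production window."""
    categories = sorted(set(baseline_labels) | set(production_labels))
    baseline_counts = Counter(baseline_labels)
    production_counts = Counter(production_labels)
    table = [
        [baseline_counts.get(c, 0) for c in categories],
        [production_counts.get(c, 0) for c in categories],
    ]
    statistic, p_value, _, _ = chi2_contingency(table)
    return statistic, p_value, p_value < alpha

print(chi_square_drift(["retail"] * 70 + ["food"] * 30, ["retail"] * 40 + ["food"] * 60))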
- Data Drift Alert: Triggered when feature distributions shift significantly
- Concept Drift Alert: Triggered when model performance degrades
- Volume Alert: Triggered by unusual prediction volume patterns
# config/thresholds.yaml
drift_detection:
  ks_test_threshold: 0.05        # p-value threshold
  psi_threshold: 0.25            # PSI critical value
  chi_square_threshold: 0.05     # p-value threshold
performance:
  accuracy_drop_threshold: 0.05  # 5% drop triggers alert
  min_samples_for_eval: 100      # Minimum samples before evaluation
volume:
  prediction_volume_std: 3.0     # Standard deviations for anomaly detection

Schema Design:
-- Prediction logs with partitioning by date
predictions (
id, model_id, timestamp, features_json,
prediction, ground_truth, created_at
) PARTITION BY RANGE (timestamp);
-- Baseline distributions for drift comparison
baselines (
model_id, feature_name, distribution_stats,
version, created_at
);
-- Drift detection results
drift_results (
model_id, feature_name, test_type,
statistic, p_value, timestamp, is_drifted
);
-- Performance metrics over time
performance_metrics (
model_id, window_start, window_end,
accuracy, precision, recall, sample_count
);

Optimizations:
- Time-series partitioning for efficient queries
- Indexes on model_id, timestamp, and composite keys
- Retention policies (e.g., raw logs: 90 days, aggregates: 1 year)
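As a sketch of how the partitioning and indexes might be applied from Python (psycopg2 is an assumed dependency; the partition and index names are illustrative, and the connection string reuses the DATABASE_URL example shown later):

# maintenance.py - sketch of partition/index setup for the schema above
import psycopg2

ddl = [
    # Monthly partition of the predictions table (PARTITION BY RANGE (timestamp))
    """CREATE TABLE IF NOT EXISTS predictions_2026_01 PARTITION OF predictions
       FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');""",
    # Composite indexes to support per-model, time-windowed queries
    "CREATE INDEX IF NOT EXISTS idx_predictions_model_ts ON predictions (model_id, timestamp);",
    "CREATE INDEX IF NOT EXISTS idx_drift_results_model_ts ON drift_results (model_id, timestamp);",
]

with psycopg2.connect("postgresql://user:pass@localhost:5432/ml_observability") as conn:
    with conn.cursor() as cur:
        for statement in ddl:
            cur.execute(statement)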
Cached Data:
- Recent statistics (1-hour, 24-hour windows)
- Baseline distribution summaries
- Alert states and counts
- Real-time prediction volumes
TTL Strategy:
- Real-time metrics: 5 minutes
- Hourly aggregates: 1 hour
- Daily aggregates: 24 hours
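A minimal sketch of this TTL strategy with redis-py, assuming the REDIS_URL from the configuration later in this document; the key names and JSON encoding are illustrative choices:

# cache.py - sketch of the TTL strategy above using redis-py
import json
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

def cache_realtime_metrics(model_id: str, metrics: dict) -> None:
    # Real-time metrics expire after 5 minutes
    r.setex(f"metrics:realtime:{model_id}", 300, json.dumps(metrics))

def cache_hourly_aggregate(model_id: str, aggregate: dict) -> None:
    # Hourly aggregates expire after 1 hour
    r.setex(f"metrics:hourly:{model_id}", 3600, json.dumps(aggregate))

def get_cached(key: str):
    raw = r.get(key)
    return json.loads(raw) if raw else None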
- Buffer incoming predictions in Redis
- Process drift detection in scheduled batches (e.g., every 15 minutes)
- Reduces database load and improves latency
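A sketch of the buffering approach above using a Redis list; the key name and drain size are assumptions, and the scheduled drift job would call drain_buffer before running its checks:

# ingest_buffer.py - sketch of buffering predictions in Redis for batch processing
import json
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")
BUFFER_KEY = "predictions:buffer"  # illustrative key name

def buffer_prediction(prediction: dict) -> None:
    """Append one prediction log to the buffer; the API can return immediately."""
    r.rpush(BUFFER_KEY, json.dumps(prediction))

def drain_buffer(max_items: int = 10_000) -> list[dict]:
    """Atomically pop up to max_items buffered predictions for the scheduled job."""
    pipe = r.pipeline()
    pipe.lrange(BUFFER_KEY, 0, max_items - 1)
    pipe.ltrim(BUFFER_KEY, max_items, -1)
    items, _ = pipe.execute()
    return [json.loads(item) for item in items]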
- Use reservoir sampling for large volumes (>10K predictions/hour)
- Maintains statistical validity while reducing computation
- Configurable sampling rate based on traffic
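A sketch of reservoir sampling (Algorithm R), which keeps a fixed-size uniform sample of a prediction stream; the default capacity here is an assumption:

# sampling.py - sketch of reservoir sampling for high-volume ingestion
import random

class ReservoirSampler:
    """Keep a uniform random sample of at most `capacity` items from a stream."""

    def __init__(self, capacity: int = 10_000, seed=None):
        self.capacity = capacity
        self.seen = 0
        self.sample: list = []
        self._rng = random.Random(seed)

    def add(self, item) -> None:
        self.seen += 1
        if len(self.sample) < self.capacity:
            self.sample.append(item)
        else:
            # Replace an existing element with probability capacity / seen
            j = self._rng.randrange(self.seen)
            if j < self.capacity:
                self.sample[j] = item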
- Use Celery/RQ for background tasks:
- Drift detection computation
- Performance metric aggregation
- Alert generation and delivery
- Non-blocking API responses
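A sketch of how these background tasks could be wired with Celery, reusing Redis as the broker; the module/task names, arguments, and the 15-minute beat schedule are illustrative, and the task bodies are elided:

# tasks.py - sketch of the background jobs described above (Celery)
from celery import Celery

app = Celery("observability", broker="redis://localhost:6379/0")

@app.task
def run_drift_checks(model_id: str) -> None:
    """Scheduled job: compare recent production data against the stored baseline."""
    ...  # load baseline + recent window, run KS / PSI / chi-square, persist drift_results

@app.task
def aggregate_performance(model_id: str) -> None:
    """Join predictions with late-arriving ground truth and compute windowed metrics."""
    ...  # compute accuracy/precision/recall per window, write performance_metrics

# Periodic scheduling (e.g., every 15 minutes) via Celery beat
app.conf.beat_schedule = {
    "drift-checks": {"task": "tasks.run_drift_checks", "schedule": 900, "args": ("fraud_detector_v2",)},
}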
- Stateless API design enables load balancing
- Redis cluster for distributed caching
- PostgreSQL read replicas for dashboard queries
| Workload | Throughput | Latency (p95) |
|---|---|---|
| Prediction ingestion | 10K req/s | < 50ms |
| Drift detection (batch) | 1M samples | < 5 min |
| Dashboard queries | 100 req/s | < 200ms |
Hardware Assumptions: 4 CPU cores, 16GB RAM, SSD storage
# Database
DATABASE_URL=postgresql://user:pass@localhost:5432/ml_observability
REDIS_URL=redis://localhost:6379/0
# API
API_HOST=0.0.0.0
API_PORT=8000
LOG_LEVEL=INFO
# Drift Detection
KS_TEST_THRESHOLD=0.05
PSI_THRESHOLD=0.25
DRIFT_CHECK_INTERVAL=900 # seconds (15 minutes)
# Performance Monitoring
MIN_SAMPLES_FOR_EVAL=100
ACCURACY_DROP_THRESHOLD=0.05
# Alerting
ALERT_WEBHOOK_URL=https://your-webhook.com/alerts
ALERT_EMAIL=alerts@yourcompany.com

# Clone repository
git clone https://github.com/yourusername/model-observability-system.git
cd model-observability-system
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
# Set up database
alembic upgrade head
# Start Redis
docker run -d -p 6379:6379 redis:7-alpine
# Run application
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

import requests
# Log a prediction
response = requests.post(
"http://localhost:8000/api/v1/predictions",
json={
"model_id": "fraud_detector_v2",
"features": {
"transaction_amount": 150.50,
"merchant_category": "retail",
"user_age": 35,
"account_age_days": 450
},
"prediction": 0, # 0: legitimate, 1: fraud
"ground_truth": None # Will be updated later
}
)

# Upload training data distribution as baseline
response = requests.post(
"http://localhost:8000/api/v1/baselines",
json={
"model_id": "fraud_detector_v2",
"version": "v2.1.0",
"features": {
"transaction_amount": {
"type": "numerical",
"mean": 125.30,
"std": 89.45,
"min": 0.01,
"max": 9999.99,
"samples": [/* sample data */]
},
"merchant_category": {
"type": "categorical",
"distribution": {
"retail": 0.35,
"food": 0.25,
"online": 0.20,
"other": 0.20
}
}
}
}
)

# Get drift metrics for a model
curl http://localhost:8000/api/v1/drift/fraud_detector_v2
# Response
{
"model_id": "fraud_detector_v2",
"timestamp": "2026-01-15T10:30:00Z",
"features": {
"transaction_amount": {
"ks_statistic": 0.08,
"ks_p_value": 0.15,
"psi": 0.12,
"is_drifted": false,
"drift_severity": "low"
},
"merchant_category": {
"chi_square_statistic": 12.5,
"chi_square_p_value": 0.006,
"psi": 0.28,
"is_drifted": true,
"drift_severity": "high"
}
},
"alerts": [
{
"type": "data_drift",
"feature": "merchant_category",
"severity": "high",
"message": "Significant distribution shift detected"
}
]
}

- POST /api/v1/predictions - Log a single prediction
- POST /api/v1/predictions/batch - Log multiple predictions
- PATCH /api/v1/predictions/{id}/ground-truth - Update ground truth

- POST /api/v1/baselines - Register baseline distribution
- GET /api/v1/baselines/{model_id} - Get current baseline
- PUT /api/v1/baselines/{model_id} - Update baseline

- GET /api/v1/drift/{model_id} - Get current drift status
- GET /api/v1/drift/{model_id}/history - Historical drift trends
- POST /api/v1/drift/{model_id}/check - Trigger manual drift check

- GET /api/v1/performance/{model_id} - Current performance metrics
- GET /api/v1/performance/{model_id}/trends - Performance over time

- GET /api/v1/alerts - List active alerts
- GET /api/v1/alerts/{model_id} - Alerts for specific model
- POST /api/v1/alerts/acknowledge/{alert_id} - Acknowledge alert

- GET /health - Health check
- GET /metrics - Prometheus metrics
- Statistical Power
  - Drift tests require sufficient sample size (typically 100+ samples)
  - Low-traffic models may have delayed drift detection
  - Mitigation: Configurable time windows and sample thresholds
- Feature Engineering
  - System monitors input features, not engineered features
  - Drift in engineered features requires separate tracking
  - Mitigation: Log both raw and engineered features if needed
- Multivariate Drift
  - Univariate tests may miss interactions between features
  - Correlation drift not directly captured
  - Future Enhancement: Implement multivariate drift tests (e.g., MMD)
- Ground Truth Latency
  - Performance monitoring requires ground truth labels
  - Labels may arrive hours/days after prediction
  - Mitigation: Separate real-time drift detection from delayed performance monitoring
- High Cardinality Categorical Features
  - PSI and Chi-square less effective with 100+ categories
  - Requires grouping strategies
  - Mitigation: Auto-group rare categories, use entropy-based metrics (see the sketch below)
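A sketch of the rare-category grouping mitigation; the 1% share cutoff and the bucket label are assumptions, and the mapping derived from the baseline should be applied to both baseline and production windows before running PSI or chi-square:

# grouping.py - sketch of auto-grouping rare categories before PSI / chi-square
from collections import Counter

def rare_category_map(baseline_values, min_share: float = 0.01) -> set:
    """Categories that cover at least min_share of the baseline volume."""
    counts = Counter(baseline_values)
    total = sum(counts.values())
    return {c for c, n in counts.items() if n / total >= min_share}

def apply_grouping(values, keep: set, other_label: str = "__other__") -> list:
    """Collapse everything outside `keep` into a single bucket."""
    return [v if v in keep else other_label for v in values]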
| Choice | Benefit | Trade-off |
|---|---|---|
| Batch drift detection | Lower compute cost, better batching | 15-min detection delay |
| Redis caching | Fast dashboard queries | Eventual consistency |
| PostgreSQL | ACID guarantees, SQL queries | Harder to scale writes |
| Univariate tests | Simple, interpretable | Misses feature interactions |
| Fixed time windows | Consistent evaluation | May miss sudden spikes |
# Build and run with docker-compose
docker-compose up -d
# Scale API workers
docker-compose up -d --scale api=3

# Deploy to Kubernetes
kubectl apply -f k8s/
# Components:
# - API deployment (3 replicas)
# - Background worker deployment (2 replicas)
# - PostgreSQL StatefulSet
# - Redis deployment
# - Nginx ingress

Metrics Exported:
- Prediction ingestion rate
- Drift detection computation time
- Alert generation rate
- Database query latency
- Cache hit rate
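A sketch of exporting these metrics with prometheus_client; the metric names and label sets are illustrative, and in the FastAPI app the same registry would typically be served from the GET /metrics endpoint listed above rather than a standalone server:

# metrics.py - sketch of the exported metrics using prometheus_client
from prometheus_client import Counter, Gauge, Histogram, start_http_server

PREDICTIONS_INGESTED = Counter(
    "predictions_ingested_total", "Prediction logs received", ["model_id"]
)
DRIFT_CHECK_SECONDS = Histogram(
    "drift_check_duration_seconds", "Drift detection computation time"
)
ALERTS_FIRED = Counter("alerts_fired_total", "Alerts generated", ["type", "severity"])
CACHE_HIT_RATE = Gauge("cache_hit_rate", "Redis cache hit rate over the last window")

# Example usage inside the services:
#   PREDICTIONS_INGESTED.labels(model_id="fraud_detector_v2").inc()
#   with DRIFT_CHECK_SECONDS.time(): run_drift_checks(...)

if __name__ == "__main__":
    start_http_server(9100)  # standalone scrape target; port is arbitrary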
Logging:
- Structured JSON logging
- Request/response logging
- Error tracking with stack traces
- Audit logs for baseline changes
Recommended Stack:
- Prometheus + Grafana for metrics
- ELK/Loki for log aggregation
- PagerDuty/Opsgenie for alert routing
# Run unit tests
pytest tests/unit -v
# Run integration tests
pytest tests/integration -v
# Run with coverage
pytest --cov=app --cov-report=html
# Load testing
locust -f tests/load/locustfile.py

See CONTRIBUTING.md for development guidelines.
MIT License - see LICENSE file for details.
Note: This is a production-grade framework. Customize thresholds, time windows, and alerting channels based on your specific use case and SLAs.