This document provides security best practices, mitigation strategies, and guidelines for securing the AI Agent Connector system. It complements the Threat Model by providing actionable security measures.
- Authentication and Authorization
- Data Protection
- API Security
- Database Security
- Web Interface Security
- Deployment Security
- Monitoring and Incident Response
- Compliance and Best Practices
- Bug Bounty Program
-
Generate Strong API Keys
# Use cryptographically secure random generation import secrets api_key = secrets.token_urlsafe(32) # 32 bytes = 43 characters
-
Store API Keys Securely
- Hash API keys using bcrypt or Argon2
- Never store plaintext API keys
- Use environment variables or secret management services
-
Implement API Key Rotation
# Example: Rotate keys every 90 days def rotate_api_key(agent_id: str): new_key = generate_api_key() # Revoke old key revoke_api_key(agent_id, old_key) # Issue new key issue_api_key(agent_id, new_key) return new_key
-
Enforce Key Expiration
- Set expiration dates for API keys
- Implement automatic key rotation
- Notify users before expiration
-
Rate Limiting per API Key
# Implement rate limiting from flask_limiter import Limiter limiter = Limiter( app, key_func=lambda: request.headers.get('X-API-Key'), default_limits=["1000 per hour", "100 per minute"] )
- ✅ HTTPS Enforcement: Require TLS 1.2+ for all API communications
- ✅ Key Scoping: Limit API key permissions to minimum required
- ✅ Audit Logging: Log all API key usage and authentication attempts
- ✅ Revocation: Immediate revocation capability for compromised keys
- ✅ Key Rotation Policy: Rotate keys every 90 days or after security incidents
-
Validate Agent IDs
# Use UUIDs for agent IDs import uuid agent_id = str(uuid.uuid4()) # Validate format def validate_agent_id(agent_id: str) -> bool: try: uuid.UUID(agent_id) return True except ValueError: return False
-
Implement Multi-Factor Authentication (MFA) for Admins
- Require MFA for admin operations
- Use TOTP (Time-based One-Time Password) or hardware tokens
- Store MFA secrets securely
-
Session Management
# Secure session configuration app.config['SESSION_COOKIE_SECURE'] = True # HTTPS only app.config['SESSION_COOKIE_HTTPONLY'] = True # Prevent XSS app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # CSRF protection app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=8)
-
Database Credentials Encryption
# Use Fernet symmetric encryption from cryptography.fernet import Fernet # Generate key (do this once, store securely) key = Fernet.generate_key() # Encrypt credentials def encrypt_credentials(credentials: str, key: bytes) -> bytes: f = Fernet(key) return f.encrypt(credentials.encode()) # Decrypt credentials def decrypt_credentials(encrypted: bytes, key: bytes) -> str: f = Fernet(key) return f.decrypt(encrypted).decode()
-
Encryption Key Management
- DO NOT hardcode encryption keys
- Use environment variables or secret management services
- Rotate encryption keys regularly
- Store keys separately from encrypted data
-
Secret Management Services
- AWS: AWS Secrets Manager
- Azure: Azure Key Vault
- GCP: Google Secret Manager
- HashiCorp: Vault
-
TLS Configuration
# Enforce TLS 1.2+ import ssl context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) context.minimum_version = ssl.TLSVersion.TLSv1_2 context.set_ciphers('HIGH:!aNULL:!eNULL')
-
Database Connection Security
- Always use TLS for database connections
- Verify SSL certificates
- Use connection strings with SSL parameters
-
Never Log Credentials
# BAD: Logging credentials logger.info(f"Connecting with password: {password}") # GOOD: Mask credentials logger.info(f"Connecting to database: {host}:{port}")
-
Mask Credentials in Responses
def mask_credentials(config: dict) -> dict: masked = config.copy() if 'password' in masked: masked['password'] = '***' if 'connection_string' in masked: # Mask password in connection string masked['connection_string'] = mask_connection_string( masked['connection_string'] ) return masked
-
Credential Rotation
- Support credential rotation without service disruption
- Validate new credentials before switching
- Keep old credentials temporarily for rollback
-
Always Use Parameterized Queries
# BAD: String concatenation query = f"SELECT * FROM users WHERE id = {user_id}" # GOOD: Parameterized query query = "SELECT * FROM users WHERE id = %s" params = (user_id,) connector.execute_query(query, params)
-
Validate and Sanitize Inputs
import re def validate_sql_input(value: str) -> bool: # Check for SQL injection patterns dangerous_patterns = [ r"(\bUNION\b|\bSELECT\b|\bINSERT\b|\bDELETE\b|\bDROP\b)", r"(--|;|\*|')", r"(\bor\b|\band\b)\s+\d+\s*=\s*\d+" ] for pattern in dangerous_patterns: if re.search(pattern, value, re.IGNORECASE): return False return True
-
Query Validation
# Validate query structure before execution def validate_query(query: str) -> tuple[bool, str]: # Parse query # Check for dangerous operations dangerous_ops = ['DROP', 'ALTER', 'TRUNCATE', 'CREATE', 'GRANT'] query_upper = query.upper().strip() for op in dangerous_ops: if query_upper.startswith(op): return False, f"Dangerous operation not allowed: {op}" return True, ""
-
Query Complexity Limits
# Limit query complexity MAX_QUERY_LENGTH = 10000 MAX_JOINS = 10 MAX_SUBQUERIES = 5 def check_query_complexity(query: str) -> bool: if len(query) > MAX_QUERY_LENGTH: return False # Count joins, subqueries, etc. return True
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Global rate limiting
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["1000 per hour", "100 per minute"]
)
# Per-endpoint rate limiting
@app.route('/api/agents/<agent_id>/query', methods=['POST'])
@limiter.limit("60 per minute") # 60 queries per minute
def execute_query(agent_id: str):
# ...- IP-based: Limit requests per IP address
- API Key-based: Limit requests per API key
- Agent-based: Limit requests per agent ID
- Endpoint-based: Different limits for different endpoints
from flask_cors import CORS
# Restrict CORS to trusted origins
CORS(app, origins=[
"https://trusted-domain.com",
"https://app.example.com"
])-
Use Least Privilege Database Accounts
-- Create read-only user CREATE USER agent_readonly WITH PASSWORD 'secure_password'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly; -- Create read-write user (if needed) CREATE USER agent_rw WITH PASSWORD 'secure_password'; GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO agent_rw;
-
Enable SSL/TLS for Database Connections
# PostgreSQL with SSL connection_string = ( "postgresql://user:pass@host:5432/db" "?sslmode=require" )
-
Connection Pooling Security
# Limit pool size to prevent exhaustion pool_config = { 'min_size': 2, 'max_size': 10, 'max_queries': 50000, 'max_inactive_connection_lifetime': 300 }
-
Enforce Query Timeouts
# Set query timeout connector.execute_query( query, params, timeout=30 # 30 seconds )
-
Limit Result Set Size
MAX_RESULT_ROWS = 10000 def execute_query_safe(query: str, params: tuple): # Add LIMIT if not present if 'LIMIT' not in query.upper(): query = f"{query} LIMIT {MAX_RESULT_ROWS}" return connector.execute_query(query, params)
-
Row-Level Security (RLS)
# Apply RLS rules from ai_agent_connector.app.utils.row_level_security import RowLevelSecurity rls = RowLevelSecurity() rls.add_rule( agent_id="agent-001", table="users", rule_type=RLSRuleType.FILTER, condition="user_id = current_user_id()" )
-
Column Masking
# Mask sensitive columns from ai_agent_connector.app.utils.column_masking import ColumnMasker masker = ColumnMasker() masker.add_rule( table="users", column="ssn", masking_type=MaskingType.HASH )
-
Input Sanitization
from markupsafe import escape # Escape user input user_input = escape(request.form.get('input'))
-
Output Encoding
{# In Jinja2 templates #} {{ user_input|e }} {# Escape output #}
-
Content Security Policy (CSP)
@app.after_request def set_security_headers(response): response.headers['Content-Security-Policy'] = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline'; " "style-src 'self' 'unsafe-inline';" ) return response
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# In forms
<form method="POST">
{{ csrf_token() }}
<!-- form fields -->
</form>@app.after_request
def set_security_headers(response):
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
return response-
Environment Variables
# .env file (DO NOT commit to version control) ENCRYPTION_KEY=<generated-key> SECRET_KEY=<flask-secret-key> DATABASE_URL=<connection-string> OPENAI_API_KEY=<api-key> FLASK_ENV=production
-
Secret Management
# Use secret management service import boto3 def get_secret(secret_name: str) -> str: client = boto3.client('secretsmanager') response = client.get_secret_value(SecretId=secret_name) return response['SecretString']
-
AWS Lambda
# template.yaml Resources: ApiFunction: Type: AWS::Serverless::Function Properties: Environment: Variables: ENCRYPTION_KEY: !Ref EncryptionKeySecret Policies: - SecretsManagerReadWrite: SecretArn: !Ref DatabaseSecret
-
IAM Roles
- Use least privilege principle
- Grant only necessary permissions
- Use separate roles for different functions
-
VPC Configuration
- Deploy Lambda in VPC for database access
- Use security groups to restrict access
- Use private subnets for databases
-
Dockerfile Security
# Use non-root user RUN useradd -m -u 1000 appuser USER appuser # Don't expose secrets # Use secrets management
-
Image Scanning
- Scan images for vulnerabilities
- Use minimal base images
- Keep images updated
-
Audit Logging
# Log all security-relevant events audit_logger.log( action_type=ActionType.AUTHENTICATION_FAILED, agent_id=None, status="error", details={ "ip": request.remote_addr, "user_agent": request.headers.get('User-Agent'), "reason": "Invalid API key" } )
-
Security Event Monitoring
# Monitor for suspicious activity security_monitor.check_for_anomalies( agent_id=agent_id, action_type="query_execution", metadata={"query": query, "tables": tables} )
-
Alerting
# Alert on security events if failed_attempts > 5: alert_manager.send_alert( severity=AlertSeverity.HIGH, message="Multiple failed authentication attempts", details={"ip": ip, "count": failed_attempts} )
-
Incident Response Plan
- Document incident response procedures
- Define roles and responsibilities
- Establish communication channels
- Create runbooks for common incidents
-
Key Revocation Procedure
def revoke_compromised_key(agent_id: str): # Revoke API key agent_registry.revoke_agent(agent_id) # Log incident audit_logger.log( action_type=ActionType.AGENT_REVOKED, agent_id=agent_id, status="success", details={"reason": "Security incident"} ) # Alert security team alert_manager.send_alert( severity=AlertSeverity.CRITICAL, message=f"Agent {agent_id} revoked due to security incident" )
-
Forensics and Investigation
- Preserve audit logs
- Document timeline of events
- Analyze attack vectors
- Implement additional mitigations
- Strong API key generation (32+ bytes)
- API key hashing and secure storage
- API key rotation policy (90 days)
- MFA for admin accounts
- Session timeout configuration
- Secure session cookies
- Encryption at rest for credentials
- Encryption in transit (TLS 1.2+)
- Secret management service integration
- Credential masking in logs/responses
- Encryption key rotation
- Parameterized queries (SQL injection prevention)
- Input validation and sanitization
- Rate limiting (multi-layer)
- Query complexity limits
- Result size limits
- CORS configuration
- Least privilege database accounts
- SSL/TLS for database connections
- Connection pool limits
- Query timeouts
- Row-level security
- Column masking
- XSS prevention (input sanitization, output encoding)
- CSRF protection
- Security headers (CSP, X-Frame-Options, etc.)
- Secure cookie configuration
- Environment variable security
- Secret management
- IAM role configuration (least privilege)
- VPC and network security
- Container security
- Image vulnerability scanning
- Comprehensive audit logging
- Security event monitoring
- Alerting for security events
- Incident response plan
- Log retention policy
-
GDPR (General Data Protection Regulation)
- Data residency rules
- Data retention policies
- Right to deletion
- Audit log anonymization
-
SOC 2
- Access controls
- Audit logging
- Change management
- Incident response
-
PCI DSS (if handling payment data)
- Encryption requirements
- Access controls
- Network segmentation
- Regular security testing
-
Quarterly
- Threat model review
- Security control assessment
- Access review
- Security training
-
Annually
- Penetration testing
- Security audit
- Disaster recovery testing
- Compliance review
-
Ongoing
- Vulnerability scanning
- Dependency updates
- Security monitoring
- Incident response
- Threat Model - Comprehensive threat analysis
- Security team contact: security@example.com
- Incident reporting: security-incidents@example.com
We value the security research community and encourage responsible disclosure of security vulnerabilities. This bug bounty program provides recognition and rewards for security researchers who help improve the security of the AI Agent Connector.
The AI Agent Connector Bug Bounty Program is designed to:
- Encourage responsible disclosure of security vulnerabilities
- Recognize and reward security researchers
- Improve the overall security posture of the system
- Build a collaborative relationship with the security community
The following are eligible for bug bounty rewards:
Web Application
- Authentication and authorization bypasses
- SQL injection vulnerabilities
- Cross-site scripting (XSS)
- Cross-site request forgery (CSRF)
- Server-side request forgery (SSRF)
- Remote code execution (RCE)
- Insecure direct object references (IDOR)
- Privilege escalation
- Information disclosure
- API security vulnerabilities
Infrastructure
- Configuration vulnerabilities
- Secrets exposure
- Network security issues
- Deployment misconfigurations
Cryptography
- Weak encryption implementation
- Key management vulnerabilities
- Certificate validation issues
The following are NOT eligible for bug bounty rewards:
- Denial of Service (DoS) attacks
- Distributed Denial of Service (DDoS) attacks
- Social engineering attacks
- Physical security issues
- Issues requiring physical access to devices
- Spam or phishing attacks
- Issues in third-party applications or services
- Issues in dependencies (report to maintainers)
- Self-XSS (user must be logged in and perform action)
- Clickjacking on pages without sensitive actions
- Missing security headers without demonstrated impact
- Issues requiring unlikely user interaction
- Content spoofing without security impact
- Rate limiting or brute force attacks
- Issues in development/staging environments
- Issues already known to us or publicly disclosed
- Issues found through automated scanning tools without manual verification
-
Do Not:
- Access, modify, or delete data that does not belong to you
- Perform any action that could harm our users or systems
- Disrupt or degrade our services
- Violate any laws or breach any agreements
- Disclose vulnerabilities publicly before we've addressed them
- Use automated scanners that generate significant traffic
- Test on production systems without authorization
-
Do:
- Act in good faith
- Report vulnerabilities as soon as possible
- Provide detailed, reproducible reports
- Respect user privacy and data
- Follow responsible disclosure practices
- Test only on systems you have permission to test
- Any form of denial of service attack
- Physical attacks against infrastructure
- Social engineering of employees or users
- Accessing accounts or data that doesn't belong to you
- Modifying or destroying data
- Spamming or phishing
- Any illegal activity
Submit your vulnerability report via email to: security@example.com
Include the following information:
-
Vulnerability Details
- Type of vulnerability
- Affected component/endpoint
- Steps to reproduce (detailed)
- Proof of concept (if applicable)
- Potential impact
-
Evidence
- Screenshots or videos demonstrating the issue
- Code snippets or payloads (if applicable)
- Network traffic captures (if relevant)
-
Impact Assessment
- Potential security impact
- Affected users or data
- Exploitability assessment
-
Your Information
- Your name or handle
- Contact information
- Preferred method of recognition (if applicable)
Report Template:
Subject: [Bug Bounty] [Severity] Brief Description
Vulnerability Type: [e.g., SQL Injection, XSS, etc.]
Severity: [Critical/High/Medium/Low]
Affected Component: [e.g., /api/agents/register]
Description:
[Detailed description of the vulnerability]
Steps to Reproduce:
1. [Step 1]
2. [Step 2]
3. [Step 3]
Proof of Concept:
[Code, payload, or detailed explanation]
Impact:
[Potential security impact]
Suggested Fix:
[Optional: suggestions for fixing the issue]
Contact Information:
[Your name/handle and contact details]
- Within 24 hours: We will acknowledge receipt of your report
- Within 3 business days: We will provide an initial assessment
- Within 7 business days: We will provide a detailed response with our assessment
- We will investigate the vulnerability
- We will work on developing and testing a fix
- We will keep you updated on our progress
- Typical resolution time: 30-90 days depending on severity
- Once fixed, we will notify you
- We will add you to our Hall of Fame (if you consent)
- We will process any applicable rewards
- We may request your assistance in verifying the fix
Vulnerabilities are classified based on their potential impact:
- Remote code execution
- SQL injection with data extraction
- Authentication bypass
- Privilege escalation to admin
- Reward: $500 - $2,000
- SQL injection (read-only)
- Cross-site scripting (stored)
- Server-side request forgery
- Insecure direct object references
- Reward: $200 - $500
- Cross-site scripting (reflected)
- Cross-site request forgery
- Information disclosure
- Weak authentication mechanisms
- Reward: $50 - $200
- Missing security headers
- Information leakage (low impact)
- Weak session management
- Reward: Recognition only
Note: Reward amounts are guidelines and may vary based on:
- Quality of the report
- Impact and exploitability
- Quality of suggested fixes
- First-time vs. duplicate reports
Security researchers who responsibly disclose vulnerabilities will be recognized in our Hall of Fame (with consent). Recognition includes:
- Name or handle (as preferred)
- Date of disclosure
- Vulnerability type
- Severity level
- Monetary Rewards: For Critical, High, and Medium severity vulnerabilities
- Recognition: Hall of Fame listing for all valid reports
- Swag: Special recognition for exceptional contributions
- Public Acknowledgment: With permission, we may publicly acknowledge your contribution
- Rewards are paid via PayPal, bank transfer, or cryptocurrency (as preferred)
- Payment is made after the vulnerability is fixed and verified
- Typical payment time: 30-60 days after fix verification
We recognize and thank the following security researchers for their responsible disclosure of vulnerabilities:
No entries yet - be the first!
To be included in the Hall of Fame:
- Submit a valid security vulnerability
- Follow responsible disclosure practices
- Provide consent for public recognition
- Allow us to fix the vulnerability before public disclosure
Good Faith Testing
We provide safe harbor for security research conducted in good faith. As long as you:
- Act in good faith
- Follow this security policy
- Do not access or modify data that doesn't belong to you
- Do not harm our users or systems
- Report vulnerabilities responsibly
We will:
- Not pursue legal action against you
- Work with you to understand and resolve issues
- Recognize your contribution appropriately
Important: This safe harbor applies only to security research that follows this policy. Any activities outside this scope may be subject to legal action.
We follow a coordinated disclosure process:
- Day 0: Vulnerability reported
- Day 1: Acknowledgment sent
- Day 3: Initial assessment provided
- Day 7: Detailed response with timeline
- Day 30-90: Fix developed and deployed
- Day 90+: Public disclosure (if applicable, with researcher consent)
Note: Timeline may vary based on vulnerability complexity and severity.
- We prefer coordinated disclosure
- Public disclosure should occur only after the fix is deployed
- We will work with researchers on disclosure timing
- We may request a delay in public disclosure if needed for user protection
For questions about the bug bounty program:
- Email: security@example.com
- Subject: [Bug Bounty Question] Your Question
- Response Time: Within 3 business days
This bug bounty program may be updated periodically. We will notify active researchers of significant changes. Check this page regularly for updates.
Last Updated: 2024-01-15
Program Status: Active
For security concerns or to report vulnerabilities, please contact:
- Email: security@example.com
- PGP Key: [Link to PGP key]
- Bug Bounty Program: See Bug Bounty Program section above
- Responsible Disclosure: We appreciate responsible disclosure of security vulnerabilities
- Last Updated: 2024-01-15
- Next Review: 2024-04-15
- Version: 1.0
- Owner: Security Engineering Team