diff --git a/api/admin_ui/package-lock.json b/api/admin_ui/package-lock.json index c1004845..7ab2e2cd 100644 --- a/api/admin_ui/package-lock.json +++ b/api/admin_ui/package-lock.json @@ -16,6 +16,7 @@ "@xterm/addon-fit": "^0.10.0", "@xterm/addon-web-links": "^0.11.0", "@xterm/xterm": "^5.5.0", + "date-fns": "^4.1.0", "react": "^18.2.0", "react-dom": "^18.2.0", "react-hook-form": "^7.48.2", @@ -2413,6 +2414,16 @@ "integrity": "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==", "license": "MIT" }, + "node_modules/date-fns": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/date-fns/-/date-fns-4.1.0.tgz", + "integrity": "sha512-Ukq0owbQXxa/U3EGtsdVBkR1w7KOQ5gIBqdH2hkvknzZPYvBxb/aa6E8L7tmjFtkwZBu3UXBbjIgPo/Ez4xaNg==", + "license": "MIT", + "funding": { + "type": "github", + "url": "https://github.com/sponsors/kossnocorp" + } + }, "node_modules/debug": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.1.tgz", diff --git a/api/admin_ui/package.json b/api/admin_ui/package.json index 6d7d9909..f8684e28 100644 --- a/api/admin_ui/package.json +++ b/api/admin_ui/package.json @@ -9,32 +9,33 @@ "test": "vitest" }, "dependencies": { - "react": "^18.2.0", - "react-dom": "^18.2.0", - "@mui/material": "^5.14.20", - "@mui/icons-material": "^5.14.19", "@emotion/react": "^11.11.1", "@emotion/styled": "^11.11.0", - "react-router-dom": "^6.20.1", - "react-hook-form": "^7.48.2", - "@xterm/xterm": "^5.5.0", + "@mui/icons-material": "^5.14.19", + "@mui/material": "^5.14.20", + "@xterm/addon-attach": "^0.11.0", "@xterm/addon-fit": "^0.10.0", "@xterm/addon-web-links": "^0.11.0", - "@xterm/addon-attach": "^0.11.0" + "@xterm/xterm": "^5.5.0", + "date-fns": "^4.1.0", + "react": "^18.2.0", + "react-dom": "^18.2.0", + "react-hook-form": "^7.48.2", + "react-router-dom": "^6.20.1" }, "devDependencies": { + "@testing-library/react": "^14.1.2", "@types/react": "^18.2.43", "@types/react-dom": "^18.2.17", 
"@vitejs/plugin-react": "^4.2.0", + "msw": "^2.3.1", "typescript": "^5.3.3", "vite": "^5.0.8", - "vitest": "^1.0.4", - "@testing-library/react": "^14.1.2", - "msw": "^2.3.1" + "vitest": "^1.0.4" }, "msw": { "workerDirectory": [ "public" ] } -} \ No newline at end of file +} diff --git a/api/admin_ui/src/api/client.ts b/api/admin_ui/src/api/client.ts index 322e2e60..98e7679a 100644 --- a/api/admin_ui/src/api/client.ts +++ b/api/admin_ui/src/api/client.ts @@ -237,6 +237,84 @@ export class AdminAPIClient { return res.json(); } + // Events & Diagnostics + async getRecentEvents(params: { + limit?: number; + provider_id?: string; + event_type?: string; + from_db?: boolean + } = {}): Promise<{ events: any[]; total: number; source: string }> { + const search = new URLSearchParams(); + Object.entries(params).forEach(([key, value]) => { + if (value !== undefined && value !== null) { + search.append(key, String(value)); + } + }); + const res = await this.fetch(`/admin/diagnostics/events/recent?${search}`); + return res.json(); + } + + async getEventTypes(): Promise<{ event_types: Array<{ value: string; label: string }> }> { + const res = await this.fetch('/admin/diagnostics/events/types'); + return res.json(); + } + + createEventSSE(): EventSource { + // Note: EventSource doesn't support custom headers directly + // Pass API key as query parameter for authentication + const params = new URLSearchParams(); + params.append('X-API-Key', this.apiKey); + return new EventSource(`${this.baseURL}/admin/diagnostics/events/sse?${params.toString()}`); + } + + // Provider Health Management + async getProviderHealthStatus(): Promise<{ + provider_statuses: Record; + total_providers: number; + healthy_count: number; + degraded_count: number; + circuit_open_count: number; + disabled_count: number; + }> { + const res = await this.fetch('/admin/providers/health'); + return res.json(); + } + + async enableProvider(providerId: string): Promise<{ + success: boolean; + provider_id: string; + 
new_status: string; + message?: string; + }> { + const res = await this.fetch('/admin/providers/enable', { + method: 'POST', + body: JSON.stringify({ provider_id: providerId }), + }); + return res.json(); + } + + async disableProvider(providerId: string): Promise<{ + success: boolean; + provider_id: string; + new_status: string; + message?: string; + }> { + const res = await this.fetch('/admin/providers/disable', { + method: 'POST', + body: JSON.stringify({ provider_id: providerId }), + }); + return res.json(); + } + + async shouldAllowRequests(providerId: string): Promise<{ + provider_id: string; + allowed: boolean; + reason: string; + }> { + const res = await this.fetch(`/admin/providers/circuit-breaker/${encodeURIComponent(providerId)}/should-allow`); + return res.json(); + } + // Jobs async listJobs(params: { status?: string; diff --git a/api/admin_ui/src/components/Diagnostics.tsx b/api/admin_ui/src/components/Diagnostics.tsx index 6b8e9790..21cbfe0d 100644 --- a/api/admin_ui/src/components/Diagnostics.tsx +++ b/api/admin_ui/src/components/Diagnostics.tsx @@ -46,6 +46,8 @@ import AdminAPIClient from '../api/client'; import type { DiagnosticsResult } from '../api/types'; import { useTraits } from '../hooks/useTraits'; import { ResponsiveFormSection } from './common/ResponsiveFormFields'; +import EventStream from './EventStream'; +import ProviderHealthStatus from './ProviderHealthStatus'; interface DiagnosticsProps { client: AdminAPIClient; @@ -1031,6 +1033,22 @@ function Diagnostics({ client, onNavigate, docsBase }: DiagnosticsProps) { renderCheckSection(title.charAt(0).toUpperCase() + title.slice(1), checks as Record) ))} + + {/* Event Stream */} + + + Real-time Event Stream + + + + + {/* Provider Health Status */} + + + Provider Health Status + + + )} diff --git a/api/admin_ui/src/components/EventStream.tsx b/api/admin_ui/src/components/EventStream.tsx new file mode 100644 index 00000000..e82905c5 --- /dev/null +++ 
b/api/admin_ui/src/components/EventStream.tsx @@ -0,0 +1,403 @@ +import React, { useState, useEffect, useRef } from 'react'; +import { + Box, + Typography, + Paper, + Chip, + List, + ListItem, + ListItemText, + ListItemIcon, + Divider, + TextField, + MenuItem, + Stack, + IconButton, + Tooltip, + Alert, + Badge, + FormControlLabel, + Switch, + useTheme, + useMediaQuery, +} from '@mui/material'; +import { + Circle as CircleIcon, + Pause as PauseIcon, + PlayArrow as PlayIcon, + Clear as ClearIcon, + Refresh as RefreshIcon, + FilterList as FilterIcon, + Timeline as TimelineIcon, + Event as EventIcon, + Storage as StorageIcon, + Memory as MemoryIcon, +} from '@mui/icons-material'; +import AdminAPIClient from '../api/client'; +import { format } from 'date-fns'; + +interface Event { + id: string; + type: string; + occurred_at: string; + provider_id?: string; + external_id?: string; + job_id?: string; + user_id?: string; + payload_meta?: Record; + correlation_id?: string; +} + +interface EventStreamProps { + client: AdminAPIClient; +} + +const EventStream: React.FC = ({ client }) => { + const theme = useTheme(); + const isMobile = useMediaQuery(theme.breakpoints.down('md')); + + // State + const [events, setEvents] = useState([]); + const [eventTypes, setEventTypes] = useState>([]); + const [isConnected, setIsConnected] = useState(false); + const [isPaused, setIsPaused] = useState(false); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + // Filters + const [selectedEventType, setSelectedEventType] = useState(''); + const [selectedProvider, setSelectedProvider] = useState(''); + const [fromDatabase, setFromDatabase] = useState(false); + const [limit, setLimit] = useState(50); + + // Refs + const eventSourceRef = useRef(null); + const eventsRef = useRef(null); + + // Load initial data + useEffect(() => { + loadEventTypes(); + loadRecentEvents(); + }, [selectedEventType, selectedProvider, fromDatabase, limit]); + + // SSE 
connection management + useEffect(() => { + if (!isPaused) { + connectSSE(); + } else { + disconnectSSE(); + } + + return () => { + disconnectSSE(); + }; + }, [isPaused]); + + const loadEventTypes = async () => { + try { + const response = await client.getEventTypes(); + setEventTypes(response.event_types); + } catch (err) { + console.error('Failed to load event types:', err); + } + }; + + const loadRecentEvents = async () => { + setLoading(true); + setError(null); + + try { + const response = await client.getRecentEvents({ + limit, + provider_id: selectedProvider || undefined, + event_type: selectedEventType || undefined, + from_db: fromDatabase, + }); + + setEvents(response.events); + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to load events'); + } finally { + setLoading(false); + } + }; + + const connectSSE = () => { + if (eventSourceRef.current) return; + + try { + const eventSource = client.createEventSSE(); + eventSourceRef.current = eventSource; + + eventSource.onopen = () => { + setIsConnected(true); + setError(null); + }; + + eventSource.addEventListener('connected', () => { + setIsConnected(true); + }); + + eventSource.addEventListener('event', (e) => { + try { + const eventData = JSON.parse(e.data) as Event; + + // Apply filters + if (selectedEventType && eventData.type !== selectedEventType) return; + if (selectedProvider && eventData.provider_id !== selectedProvider) return; + + setEvents(prev => [eventData, ...prev.slice(0, limit - 1)]); + } catch (err) { + console.error('Failed to parse event:', err); + } + }); + + eventSource.addEventListener('keepalive', () => { + // Keepalive received + }); + + eventSource.onerror = () => { + setIsConnected(false); + setError('Connection lost to event stream'); + eventSourceRef.current = null; + }; + } catch (err) { + setError('Failed to connect to event stream'); + } + }; + + const disconnectSSE = () => { + if (eventSourceRef.current) { + eventSourceRef.current.close(); + 
eventSourceRef.current = null; + setIsConnected(false); + } + }; + + const clearEvents = () => { + setEvents([]); + }; + + const getEventIcon = (eventType: string) => { + if (eventType.includes('fax')) return ; + if (eventType.includes('provider') || eventType.includes('health')) return ; + if (eventType.includes('webhook')) return ; + if (eventType.includes('config')) return ; + return ; + }; + + const getEventColor = (eventType: string): "default" | "primary" | "secondary" | "error" | "info" | "success" | "warning" => { + if (eventType.includes('failed') || eventType.includes('error')) return 'error'; + if (eventType.includes('delivered') || eventType.includes('sent')) return 'success'; + if (eventType.includes('queued') || eventType.includes('retrying')) return 'warning'; + if (eventType.includes('health')) return 'info'; + return 'default'; + }; + + const formatPayloadMeta = (meta?: Record) => { + if (!meta || Object.keys(meta).length === 0) return null; + + return Object.entries(meta).map(([key, value]) => ( + + )); + }; + + const uniqueProviders = Array.from(new Set(events.map(e => e.provider_id).filter(Boolean))); + + return ( + + {/* Header */} + + + + Event Stream + + + + + + + + {/* Controls */} + + + setIsPaused(!isPaused)} color={isPaused ? 'primary' : 'default'}> + {isPaused ? : } + + + + + + + + + + + + + + + + + + {/* Filters */} + + setSelectedEventType(e.target.value)} + sx={{ minWidth: 150 }} + > + All Types + {eventTypes.map((type) => ( + + {type.label} + + ))} + + + setSelectedProvider(e.target.value)} + sx={{ minWidth: 120 }} + > + All Providers + {uniqueProviders.map((provider) => ( + + {provider} + + ))} + + + setLimit(parseInt(e.target.value) || 50)} + InputProps={{ inputProps: { min: 1, max: 200 } }} + sx={{ width: 80 }} + /> + + setFromDatabase(e.target.checked)} + size="small" + /> + } + label={ + + {fromDatabase ? : } + {fromDatabase ? 
'Database' : 'Memory'} + + } + /> + + + {/* Error Alert */} + {error && ( + setError(null)}> + {error} + + )} + + {/* Events List */} + + {events.length === 0 ? ( + + + + {loading ? 'Loading events...' : 'No events to display'} + + + {isPaused ? 'Stream is paused' : 'Events will appear here in real-time'} + + + ) : ( + + {events.map((event, index) => ( + + + + {getEventIcon(event.type)} + + + + {event.provider_id && ( + + )} + + } + secondary={ + + + {format(new Date(event.occurred_at), 'MMM dd, HH:mm:ss.SSS')} + {event.job_id && ` • Job: ${event.job_id}`} + {event.external_id && ` • Ext: ${event.external_id}`} + {event.correlation_id && ` • Correlation: ${event.correlation_id}`} + + {event.payload_meta && ( + + {formatPayloadMeta(event.payload_meta)} + + )} + + } + /> + + {index < events.length - 1 && } + + ))} + + )} + + + {/* Footer */} + + + Showing {events.length} events • Source: {fromDatabase ? 'Database' : 'Memory'} • + Stream: {isConnected ? 'Connected' : 'Disconnected'} + + + + ); +}; + +export default EventStream; \ No newline at end of file diff --git a/api/admin_ui/src/components/ProviderHealthStatus.tsx b/api/admin_ui/src/components/ProviderHealthStatus.tsx new file mode 100644 index 00000000..38c0502d --- /dev/null +++ b/api/admin_ui/src/components/ProviderHealthStatus.tsx @@ -0,0 +1,322 @@ +import React, { useState, useEffect } from 'react'; +import { + Box, + Card, + CardContent, + Typography, + Chip, + Button, + IconButton, + Grid, + Alert, + CircularProgress, + Tooltip, + Stack, +} from '@mui/material'; +import { + Refresh as RefreshIcon, + CheckCircle as HealthyIcon, + Warning as DegradedIcon, + Error as ErrorIcon, + Block as DisabledIcon, + PlayArrow as EnableIcon, + Pause as DisableIcon, + Timeline as ChartIcon, +} from '@mui/icons-material'; +import { format } from 'date-fns'; +import AdminAPIClient from '../api/client'; + +interface ProviderStatus { + provider_id: string; + provider_type: string; + status: 'healthy' | 'degraded' | 
'circuit_open' | 'disabled'; + failure_count: number; + last_success: string | null; + last_failure: string | null; + next_retry_at?: string; +} + +interface HealthStatusData { + provider_statuses: Record; + total_providers: number; + healthy_count: number; + degraded_count: number; + circuit_open_count: number; + disabled_count: number; +} + +interface ProviderHealthStatusProps { + client: AdminAPIClient; +} + +const ProviderHealthStatus: React.FC = ({ client }) => { + + const [healthData, setHealthData] = useState(null); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [actionLoading, setActionLoading] = useState(null); + + const loadHealthStatus = async () => { + setLoading(true); + setError(null); + + try { + const data = await client.getProviderHealthStatus(); + setHealthData(data); + } catch (err) { + setError(err instanceof Error ? err.message : 'Failed to load provider health status'); + } finally { + setLoading(false); + } + }; + + const handleProviderAction = async (providerId: string, action: 'enable' | 'disable') => { + setActionLoading(providerId); + + try { + if (action === 'enable') { + await client.enableProvider(providerId); + } else { + await client.disableProvider(providerId); + } + + // Refresh health status + await loadHealthStatus(); + } catch (err) { + setError(err instanceof Error ? 
err.message : `Failed to ${action} provider`); + } finally { + setActionLoading(null); + } + }; + + useEffect(() => { + loadHealthStatus(); + + // Auto-refresh every 30 seconds + const interval = setInterval(loadHealthStatus, 30000); + return () => clearInterval(interval); + }, []); + + const getStatusIcon = (status: ProviderStatus['status']) => { + switch (status) { + case 'healthy': + return ; + case 'degraded': + return ; + case 'circuit_open': + return ; + case 'disabled': + return ; + default: + return ; + } + }; + + const getStatusColor = (status: ProviderStatus['status']): "default" | "primary" | "secondary" | "error" | "info" | "success" | "warning" => { + switch (status) { + case 'healthy': + return 'success'; + case 'degraded': + return 'warning'; + case 'circuit_open': + return 'error'; + case 'disabled': + return 'default'; + default: + return 'default'; + } + }; + + const getStatusText = (status: ProviderStatus['status']) => { + switch (status) { + case 'healthy': + return 'Healthy'; + case 'degraded': + return 'Degraded'; + case 'circuit_open': + return 'Circuit Open'; + case 'disabled': + return 'Disabled'; + default: + return 'Unknown'; + } + }; + + const formatTimestamp = (timestamp: string | null) => { + if (!timestamp) return 'Never'; + try { + return format(new Date(timestamp), 'MMM dd, HH:mm:ss'); + } catch { + return 'Invalid date'; + } + }; + + if (loading && !healthData) { + return ( + + + + + + ); + } + + return ( + + {/* Summary Cards */} + {healthData && ( + + + + + + {healthData.total_providers} + + + Total Providers + + + + + + + + + {healthData.healthy_count} + + + Healthy + + + + + + + + + {healthData.degraded_count} + + + Degraded + + + + + + + + + {healthData.circuit_open_count + healthData.disabled_count} + + + Unavailable + + + + + + )} + + {/* Controls */} + + + Provider Details + + + + + + + + + {/* Error Alert */} + {error && ( + setError(null)}> + {error} + + )} + + {/* Provider Details */} + {healthData && 
Object.keys(healthData.provider_statuses).length === 0 ? ( + + + + No Providers Found + + Health monitoring will start when providers are available + + + + ) : ( + + {healthData && Object.entries(healthData.provider_statuses).map(([providerId, status]) => ( + + + + + + {getStatusIcon(status.status)} + + {providerId} + + + + + + + + Type: {status.provider_type} + + + Failures: {status.failure_count} + + + Last Success: {formatTimestamp(status.last_success)} + + + Last Failure: {formatTimestamp(status.last_failure)} + + {status.next_retry_at && ( + + Next Retry: {formatTimestamp(status.next_retry_at)} + + )} + + + + + + + + + + ))} + + )} + + ); +}; + +export default ProviderHealthStatus; \ No newline at end of file diff --git a/api/alembic/versions/0002_hierarchical_config.py b/api/alembic/versions/0002_hierarchical_config.py new file mode 100644 index 00000000..2a38d2b5 --- /dev/null +++ b/api/alembic/versions/0002_hierarchical_config.py @@ -0,0 +1,131 @@ +"""Hierarchical configuration tables + +Revision ID: 0002_hierarchical_config +Revises: 0001_initial +Create Date: 2025-09-26 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = '0002_hierarchical_config' +down_revision = '0001_initial' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Global configuration + op.create_table( + 'config_global', + sa.Column('key', sa.String(200), primary_key=True, nullable=False), + sa.Column('value_encrypted', sa.Text(), nullable=False), + sa.Column('value_type', sa.String(20), nullable=False, server_default='string'), + sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('category', sa.String(50), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + ) + + # Tenant-level configuration + op.create_table( + 'config_tenant', + sa.Column('tenant_id', sa.String(100), nullable=False), + sa.Column('key', sa.String(200), nullable=False), + sa.Column('value_encrypted', sa.Text(), nullable=False), + sa.Column('value_type', sa.String(20), nullable=False, server_default='string'), + sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.PrimaryKeyConstraint('tenant_id', 'key'), + ) + op.create_index('idx_tenant_key', 'config_tenant', ['tenant_id', 'key']) + + # Department-level configuration + op.create_table( + 'config_department', + sa.Column('tenant_id', sa.String(100), nullable=False), + sa.Column('department', sa.String(100), nullable=False), + sa.Column('key', sa.String(200), nullable=False), + sa.Column('value_encrypted', sa.Text(), nullable=False), + sa.Column('value_type', sa.String(20), nullable=False, server_default='string'), + sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column('created_at', 
sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.PrimaryKeyConstraint('tenant_id', 'department', 'key'), + ) + op.create_index('idx_dept_key', 'config_department', ['tenant_id', 'department', 'key']) + + # Group-level configuration + op.create_table( + 'config_group', + sa.Column('group_id', sa.String(100), nullable=False), + sa.Column('key', sa.String(200), nullable=False), + sa.Column('value_encrypted', sa.Text(), nullable=False), + sa.Column('value_type', sa.String(20), nullable=False, server_default='string'), + sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column('priority', sa.Integer(), nullable=False, server_default='0'), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.PrimaryKeyConstraint('group_id', 'key'), + ) + op.create_index('idx_group_key', 'config_group', ['group_id', 'key']) + op.create_index('idx_group_priority', 'config_group', ['group_id', 'priority']) + + # User-level configuration + op.create_table( + 'config_user', + sa.Column('user_id', sa.String(100), nullable=False), + sa.Column('key', sa.String(200), nullable=False), + sa.Column('value_encrypted', sa.Text(), nullable=False), + sa.Column('value_type', sa.String(20), nullable=False, server_default='string'), + sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.PrimaryKeyConstraint('user_id', 'key'), + ) + op.create_index('idx_user_key', 'config_user', ['user_id', 'key']) + + # Configuration audit trail (masked only; integrity fingerprint) + op.create_table( + 'config_audit', + sa.Column('id', sa.String(40), 
primary_key=True, nullable=False), + sa.Column('level', sa.String(20), nullable=False), + sa.Column('level_id', sa.String(200), nullable=True), + sa.Column('key', sa.String(200), nullable=False), + sa.Column('old_value_masked', sa.Text(), nullable=True), + sa.Column('new_value_masked', sa.Text(), nullable=False), + sa.Column('value_hmac', sa.String(64), nullable=False), # hex sha256 + sa.Column('value_type', sa.String(20), nullable=False), + sa.Column('changed_by', sa.String(100), nullable=False), + sa.Column('changed_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('reason', sa.Text(), nullable=True), + sa.Column('ip_address', sa.String(45), nullable=True), + sa.Column('user_agent', sa.Text(), nullable=True), + ) + op.create_index('idx_audit_level', 'config_audit', ['level', 'level_id']) + op.create_index('idx_audit_key', 'config_audit', ['key']) + op.create_index('idx_audit_time', 'config_audit', ['changed_at']) + op.create_index('idx_audit_user', 'config_audit', ['changed_by']) + + +def downgrade() -> None: + op.drop_index('idx_audit_user', table_name='config_audit') + op.drop_index('idx_audit_time', table_name='config_audit') + op.drop_index('idx_audit_key', table_name='config_audit') + op.drop_index('idx_audit_level', table_name='config_audit') + op.drop_table('config_audit') + op.drop_index('idx_user_key', table_name='config_user') + op.drop_table('config_user') + op.drop_index('idx_group_priority', table_name='config_group') + op.drop_index('idx_group_key', table_name='config_group') + op.drop_table('config_group') + op.drop_index('idx_dept_key', table_name='config_department') + op.drop_table('config_department') + op.drop_index('idx_tenant_key', table_name='config_tenant') + op.drop_table('config_tenant') + op.drop_table('config_global') + diff --git a/api/alembic/versions/0003_webhook_dlq.py b/api/alembic/versions/0003_webhook_dlq.py new file mode 100644 index 00000000..d705de83 --- /dev/null +++ 
b/api/alembic/versions/0003_webhook_dlq.py @@ -0,0 +1,47 @@ +"""webhook DLQ table + +Revision ID: 0003 +Revises: 0002_hierarchical_config +Create Date: 2025-09-26 10:45:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '0003' +down_revision = '0002_hierarchical_config' +branch_labels = None +depends_on = None + + +def upgrade(): + """Create webhook_dlq table for dead letter queue.""" + op.create_table('webhook_dlq', + sa.Column('id', sa.String(40), nullable=False, primary_key=True), + sa.Column('provider_id', sa.String(40), nullable=False), + sa.Column('external_id', sa.String(100), nullable=True), + sa.Column('received_at', sa.DateTime(), nullable=False, server_default=sa.func.now()), + sa.Column('status', sa.String(20), nullable=False, server_default='queued'), + sa.Column('error', sa.Text(), nullable=True), + sa.Column('headers_meta', sa.Text(), nullable=True), + sa.Column('retry_count', sa.String(10), nullable=False, server_default='0'), + sa.Column('last_retry_at', sa.DateTime(), nullable=True), + sa.Column('next_retry_at', sa.DateTime(), nullable=True), + ) + + # Create indexes for efficient queries + op.create_index('ix_webhook_dlq_provider_id', 'webhook_dlq', ['provider_id']) + op.create_index('ix_webhook_dlq_status', 'webhook_dlq', ['status']) + op.create_index('ix_webhook_dlq_external_id', 'webhook_dlq', ['external_id']) + op.create_index('ix_webhook_dlq_next_retry_at', 'webhook_dlq', ['next_retry_at']) + + +def downgrade(): + """Drop webhook_dlq table.""" + op.drop_index('ix_webhook_dlq_next_retry_at', table_name='webhook_dlq') + op.drop_index('ix_webhook_dlq_external_id', table_name='webhook_dlq') + op.drop_index('ix_webhook_dlq_status', table_name='webhook_dlq') + op.drop_index('ix_webhook_dlq_provider_id', table_name='webhook_dlq') + op.drop_table('webhook_dlq') \ No newline at end of file diff --git a/api/app/db.py b/api/app/db.py index bf5089f1..9d39c21a 100644 --- a/api/app/db.py +++ b/api/app/db.py @@ -113,6
+113,7 @@ def init_db(): _rebind_engine_if_needed() Base.metadata.create_all(engine) _ensure_optional_columns() + _ensure_dlq_table() def _ensure_optional_columns() -> None: @@ -171,3 +172,69 @@ def _ensure_optional_columns() -> None: except Exception: # Do not block startup on migration best-effort failures pass + + +def _ensure_dlq_table() -> None: + """Ensure webhook_dlq table exists (Phase 3 PR20).""" + try: + with engine.begin() as conn: + dialect = engine.dialect.name + if dialect == 'sqlite': + # Check if table exists + result = conn.exec_driver_sql("SELECT name FROM sqlite_master WHERE type='table' AND name='webhook_dlq'") + if not result.fetchone(): + # Create table + conn.exec_driver_sql(""" + CREATE TABLE webhook_dlq ( + id VARCHAR(40) NOT NULL PRIMARY KEY, + provider_id VARCHAR(40) NOT NULL, + external_id VARCHAR(100), + received_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + status VARCHAR(20) NOT NULL DEFAULT 'queued', + error TEXT, + headers_meta TEXT, + retry_count VARCHAR(10) NOT NULL DEFAULT '0', + last_retry_at DATETIME, + next_retry_at DATETIME + ) + """) + # Create indexes + conn.exec_driver_sql("CREATE INDEX ix_webhook_dlq_provider_id ON webhook_dlq (provider_id)") + conn.exec_driver_sql("CREATE INDEX ix_webhook_dlq_status ON webhook_dlq (status)") + conn.exec_driver_sql("CREATE INDEX ix_webhook_dlq_external_id ON webhook_dlq (external_id)") + conn.exec_driver_sql("CREATE INDEX ix_webhook_dlq_next_retry_at ON webhook_dlq (next_retry_at)") + else: + # PostgreSQL - best effort + try: + conn.exec_driver_sql(""" + CREATE TABLE IF NOT EXISTS webhook_dlq ( + id VARCHAR(40) NOT NULL PRIMARY KEY, + provider_id VARCHAR(40) NOT NULL, + external_id VARCHAR(100), + received_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + status VARCHAR(20) NOT NULL DEFAULT 'queued', + error TEXT, + headers_meta TEXT, + retry_count VARCHAR(10) NOT NULL DEFAULT '0', + last_retry_at TIMESTAMP, + next_retry_at TIMESTAMP + ) + """) + # Create indexes if not exists + 
conn.exec_driver_sql("CREATE INDEX IF NOT EXISTS ix_webhook_dlq_provider_id ON webhook_dlq (provider_id)") + conn.exec_driver_sql("CREATE INDEX IF NOT EXISTS ix_webhook_dlq_status ON webhook_dlq (status)") + conn.exec_driver_sql("CREATE INDEX IF NOT EXISTS ix_webhook_dlq_external_id ON webhook_dlq (external_id)") + conn.exec_driver_sql("CREATE INDEX IF NOT EXISTS ix_webhook_dlq_next_retry_at ON webhook_dlq (next_retry_at)") + except Exception: + pass + except Exception: + # Don't fail startup if DLQ table creation fails + pass + + +# Import models for alembic auto-detection +try: + from .models.webhook_dlq import WebhookDLQ # noqa: F401 +except ImportError: + # Model not available in all environments + pass diff --git a/api/app/models/webhook_dlq.py b/api/app/models/webhook_dlq.py new file mode 100644 index 00000000..4663f648 --- /dev/null +++ b/api/app/models/webhook_dlq.py @@ -0,0 +1,90 @@ +""" +Webhook Dead Letter Queue (DLQ) model for failed webhook processing. + +The DLQ captures webhook callbacks that fail to process after retry attempts, +storing essential metadata for debugging and reprocessing. +""" + +from sqlalchemy import Column, String, Text, DateTime, func +import uuid +from datetime import datetime +from typing import Dict, Any, Optional +import json + +from api.app.db import Base + + +class WebhookDLQ(Base): + """ + Dead Letter Queue entry for failed webhook processing. + + This table stores webhook callbacks that could not be processed + successfully after exhausting retry attempts. Only safe header + metadata is persisted - no Authorization headers or sensitive data. 
+ """ + __tablename__ = "webhook_dlq" + + id = Column(String(40), primary_key=True, nullable=False) + provider_id = Column(String(40), nullable=False) + external_id = Column(String(100), nullable=True) + received_at = Column(DateTime(), nullable=False, server_default=func.now()) + status = Column(String(20), nullable=False, default='queued') # queued|retrying|failed + error = Column(Text(), nullable=True) + headers_meta = Column(Text(), nullable=True) # JSON; ALLOWLIST ONLY + + # Retry tracking + retry_count = Column(String(10), nullable=False, default='0') + last_retry_at = Column(DateTime(), nullable=True) + next_retry_at = Column(DateTime(), nullable=True) + + def __init__(self, provider_id: str, external_id: Optional[str] = None, + error: Optional[str] = None, headers_meta: Optional[Dict[str, Any]] = None, + **kwargs): + self.id = kwargs.get('id', f'dlq_{uuid.uuid4().hex}') + self.provider_id = provider_id + self.external_id = external_id + self.error = error + self.headers_meta = json.dumps(headers_meta) if headers_meta else None + self.status = kwargs.get('status', 'queued') + self.retry_count = str(kwargs.get('retry_count', 0)) + + @classmethod + def create_safe_headers_meta(cls, headers: Dict[str, str]) -> Dict[str, str]: + """ + Create safe headers metadata by allowlisting only non-sensitive headers. + + Security: Never persist Authorization, secrets, or other sensitive headers. 
+ """ + ALLOWED_HEADERS = { + "user-agent", "content-type", "content-length", + "x-request-id", "x-signature", "x-timestamp", + "x-phaxio-signature", "x-sinch-signature" + } + + return { + k: v for k, v in headers.items() + if k.lower() in ALLOWED_HEADERS + } + + def get_headers_meta(self) -> Dict[str, Any]: + """Get parsed headers metadata.""" + if not self.headers_meta: + return {} + try: + return json.loads(self.headers_meta) + except (json.JSONDecodeError, TypeError): + return {} + + def increment_retry(self, next_retry_at: Optional[datetime] = None): + """Increment retry count and update retry timestamps.""" + self.retry_count = str(int(self.retry_count) + 1) + self.last_retry_at = datetime.utcnow() + if next_retry_at: + self.next_retry_at = next_retry_at + self.status = 'retrying' + + def mark_failed(self, error: str): + """Mark the DLQ entry as permanently failed.""" + self.status = 'failed' + self.error = error + self.next_retry_at = None \ No newline at end of file diff --git a/api/app/monitoring/health.py b/api/app/monitoring/health.py index dbfe0af1..38be7033 100644 --- a/api/app/monitoring/health.py +++ b/api/app/monitoring/health.py @@ -1,9 +1,378 @@ +import asyncio +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Literal, Any +from dataclasses import dataclass, field +from enum import Enum + +from api.app.services.events import EventEmitter, EventType +from api.app.config.hierarchical_provider import HierarchicalConfigProvider + +logger = logging.getLogger(__name__) + +ProviderStatus = Literal['healthy', 'degraded', 'circuit_open', 'disabled'] + + +@dataclass +class HealthCheck: + """Provider health check result.""" + provider_id: str + provider_type: str + success: bool + response_time_ms: float + details: Dict[str, Any] = field(default_factory=dict) + error: Optional[str] = None + checked_at: datetime = field(default_factory=datetime.utcnow) + + +@dataclass +class CircuitBreakerState: + """Circuit 
breaker state for a provider.""" + provider_id: str + status: ProviderStatus = 'healthy' + failure_count: int = 0 + last_failure_time: Optional[datetime] = None + last_success_time: Optional[datetime] = None + circuit_opened_at: Optional[datetime] = None + next_retry_at: Optional[datetime] = None + failure_threshold: int = 5 + recovery_timeout_seconds: int = 60 + health_check_interval_seconds: int = 300 + + def should_allow_request(self) -> bool: + """Check if requests should be allowed through circuit breaker.""" + now = datetime.utcnow() + + if self.status == 'healthy': + return True + elif self.status == 'degraded': + return True # Allow with warnings + elif self.status == 'circuit_open': + # Check if we should try to recover + if self.next_retry_at and now >= self.next_retry_at: + return True # Try one request + return False + elif self.status == 'disabled': + return False + + return False + + def record_success(self): + """Record successful request.""" + self.last_success_time = datetime.utcnow() + if self.status == 'circuit_open': + # Circuit breaker recovery + self.status = 'healthy' + self.failure_count = 0 + self.circuit_opened_at = None + self.next_retry_at = None + elif self.status == 'degraded' and self.failure_count > 0: + self.failure_count = max(0, self.failure_count - 1) + if self.failure_count == 0: + self.status = 'healthy' + + def record_failure(self, error: str): + """Record failed request.""" + now = datetime.utcnow() + self.last_failure_time = now + self.failure_count += 1 + + if self.failure_count >= self.failure_threshold: + if self.status != 'circuit_open': + self.status = 'circuit_open' + self.circuit_opened_at = now + self.next_retry_at = now + timedelta(seconds=self.recovery_timeout_seconds) + elif self.failure_count >= self.failure_threshold // 2: + if self.status == 'healthy': + self.status = 'degraded' + + class ProviderHealthMonitor: - """Minimal health monitor stub for CI guardrail checks. 
+ """Monitor provider health with circuit breaker functionality.""" + + def __init__( + self, + plugin_manager=None, + event_emitter: Optional[EventEmitter] = None, + config_provider: Optional[HierarchicalConfigProvider] = None + ): + self.plugin_manager = plugin_manager + self.event_emitter = event_emitter + self.config_provider = config_provider + self.circuit_states: Dict[str, CircuitBreakerState] = {} + self.health_check_task: Optional[asyncio.Task] = None + self.running = False + + async def start_monitoring(self): + """Start background health monitoring.""" + if self.running: + return + + self.running = True + self.health_check_task = asyncio.create_task(self._health_check_loop()) + logger.info("Provider health monitoring started") + + async def stop_monitoring(self): + """Stop background health monitoring.""" + self.running = False + if self.health_check_task: + self.health_check_task.cancel() + try: + await self.health_check_task + except asyncio.CancelledError: + pass + logger.info("Provider health monitoring stopped") + + async def _health_check_loop(self): + """Background loop for health checks.""" + while self.running: + try: + await self._perform_health_checks() + + # Get check interval from config + interval = await self._get_health_check_interval() + await asyncio.sleep(interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Health check loop error: {e}") + await asyncio.sleep(60) # Back off on error + + async def _perform_health_checks(self): + """Perform health checks on all providers.""" + if not self.plugin_manager: + return + + try: + # Get transport plugins from plugin manager + transport_plugins = getattr(self.plugin_manager, 'get_plugins_by_type', lambda x: {})('transport') + + if not transport_plugins: + # Fallback: simulate providers for testing + transport_plugins = { + 'test': type('TestPlugin', (), { + 'plugin_type': 'transport', + 'check_health': lambda: {'ok': True, 'details': {'method': 'test'}} 
+ })() + } + + for provider_id, plugin in transport_plugins.items(): + try: + health_check = await self._check_provider_health(provider_id, plugin) + await self._update_circuit_breaker(health_check) + + except Exception as e: + logger.error(f"Provider health check error for {provider_id}: {e}") + except Exception as e: + logger.error(f"Failed to get transport plugins: {e}") + + async def _check_provider_health(self, provider_id: str, plugin) -> HealthCheck: + """Check health of a specific provider.""" + start_time = datetime.utcnow() + + try: + # Check if provider has health check method + if hasattr(plugin, 'check_health'): + if asyncio.iscoroutinefunction(plugin.check_health): + result = await plugin.check_health() + else: + result = plugin.check_health() + response_time = (datetime.utcnow() - start_time).total_seconds() * 1000 + + return HealthCheck( + provider_id=provider_id, + provider_type=getattr(plugin, 'plugin_type', 'transport'), + success=result.get('ok', False), + response_time_ms=response_time, + details=result.get('details', {}), + error=result.get('error') + ) + else: + # No health check method - assume healthy if plugin loaded + return HealthCheck( + provider_id=provider_id, + provider_type=getattr(plugin, 'plugin_type', 'transport'), + success=True, + response_time_ms=0, + details={'method': 'plugin_loaded_check'} + ) + + except Exception as e: + response_time = (datetime.utcnow() - start_time).total_seconds() * 1000 + return HealthCheck( + provider_id=provider_id, + provider_type=getattr(plugin, 'plugin_type', 'transport'), + success=False, + response_time_ms=response_time, + error=str(e) + ) + + async def _update_circuit_breaker(self, health_check: HealthCheck): + """Update circuit breaker state based on health check.""" + provider_id = health_check.provider_id + + # Get or create circuit breaker state + if provider_id not in self.circuit_states: + self.circuit_states[provider_id] = CircuitBreakerState( + provider_id=provider_id, + 
failure_threshold=await self._get_circuit_breaker_threshold(provider_id), + recovery_timeout_seconds=await self._get_circuit_breaker_timeout(provider_id) + ) + + circuit_state = self.circuit_states[provider_id] + old_status = circuit_state.status + + if health_check.success: + circuit_state.record_success() + else: + circuit_state.record_failure(health_check.error or 'Health check failed') + + # Emit event if status changed + if circuit_state.status != old_status and self.event_emitter: + await self.event_emitter.emit_event( + EventType.PROVIDER_HEALTH_CHANGED, + provider_id=provider_id, + payload_meta={ + 'old_status': old_status, + 'new_status': circuit_state.status, + 'failure_count': circuit_state.failure_count, + 'response_time_ms': health_check.response_time_ms + } + ) + + logger.info(f"Provider {provider_id} status changed: {old_status} -> {circuit_state.status}") + + async def _get_health_check_interval(self) -> int: + """Get health check interval from configuration.""" + try: + if self.config_provider: + system_ctx = {'user_id': 'system', 'groups': [], 'department': None, 'tenant_id': None} + result = await self.config_provider.get_effective('provider.health_check_interval', system_ctx) + return int(result.value) if result else 300 + except Exception: + pass + return 300 # 5 minutes default + + async def _get_circuit_breaker_threshold(self, provider_id: str) -> int: + """Get circuit breaker threshold for provider.""" + try: + if self.config_provider: + system_ctx = {'user_id': 'system', 'groups': [], 'department': None, 'tenant_id': None} + key = f'provider.{provider_id}.circuit_breaker_threshold' + result = await self.config_provider.get_effective(key, system_ctx) + return int(result.value) if result else 5 + except Exception: + pass + return 5 # Default threshold + + async def _get_circuit_breaker_timeout(self, provider_id: str) -> int: + """Get circuit breaker recovery timeout for provider.""" + try: + if self.config_provider: + system_ctx = {'user_id': 
'system', 'groups': [], 'department': None, 'tenant_id': None} + key = f'provider.{provider_id}.circuit_breaker_timeout' + result = await self.config_provider.get_effective(key, system_ctx) + return int(result.value) if result else 60 + except Exception: + pass + return 60 # Default 1 minute + + def should_allow_request(self, provider_id: str) -> bool: + """Check if provider should receive requests.""" + if provider_id not in self.circuit_states: + return True # No circuit breaker info, allow by default + + return self.circuit_states[provider_id].should_allow_request() + + def record_request_result(self, provider_id: str, success: bool, error: str = None): + """Record result of provider request (for circuit breaker).""" + if provider_id not in self.circuit_states: + self.circuit_states[provider_id] = CircuitBreakerState(provider_id=provider_id) + + circuit_state = self.circuit_states[provider_id] + old_status = circuit_state.status + + if success: + circuit_state.record_success() + else: + circuit_state.record_failure(error or 'Request failed') + + # Emit event if status changed (async fire-and-forget) + if circuit_state.status != old_status and self.event_emitter: + asyncio.create_task(self.event_emitter.emit_event( + EventType.PROVIDER_HEALTH_CHANGED, + provider_id=provider_id, + payload_meta={ + 'old_status': old_status, + 'new_status': circuit_state.status, + 'trigger': 'request_result' + } + )) + + async def get_provider_statuses(self) -> Dict[str, Dict]: + """Get current status of all providers.""" + statuses = {} + + # Include all known providers from circuit states + for provider_id, circuit_state in self.circuit_states.items(): + status_info = { + 'provider_id': provider_id, + 'provider_type': 'transport', + 'status': circuit_state.status, + 'failure_count': circuit_state.failure_count, + 'last_success': circuit_state.last_success_time.isoformat() if circuit_state.last_success_time else None, + 'last_failure': circuit_state.last_failure_time.isoformat() if 
circuit_state.last_failure_time else None, + } + + if circuit_state.status == 'circuit_open': + status_info['next_retry_at'] = circuit_state.next_retry_at.isoformat() if circuit_state.next_retry_at else None + + statuses[provider_id] = status_info + + return statuses + + async def manual_enable_provider(self, provider_id: str): + """Manually enable a provider (admin action).""" + if provider_id in self.circuit_states: + old_status = self.circuit_states[provider_id].status + self.circuit_states[provider_id].status = 'healthy' + self.circuit_states[provider_id].failure_count = 0 + self.circuit_states[provider_id].circuit_opened_at = None + self.circuit_states[provider_id].next_retry_at = None + + if self.event_emitter: + await self.event_emitter.emit_event( + EventType.PROVIDER_ENABLED, + provider_id=provider_id, + payload_meta={ + 'old_status': old_status, + 'new_status': 'healthy', + 'trigger': 'manual_enable' + } + ) + + logger.info(f"Provider {provider_id} manually enabled") + + async def manual_disable_provider(self, provider_id: str): + """Manually disable a provider (admin action).""" + if provider_id not in self.circuit_states: + self.circuit_states[provider_id] = CircuitBreakerState(provider_id=provider_id) + + old_status = self.circuit_states[provider_id].status + self.circuit_states[provider_id].status = 'disabled' + + if self.event_emitter: + await self.event_emitter.emit_event( + EventType.PROVIDER_DISABLED, + provider_id=provider_id, + payload_meta={ + 'old_status': old_status, + 'new_status': 'disabled', + 'trigger': 'manual_disable' + } + ) - In Phase-3 this will be replaced by the real monitor/circuit breaker. 
- """ + logger.info(f"Provider {provider_id} manually disabled") - def __init__(self) -> None: - pass diff --git a/api/app/plugins/transport/shims/test/plugin.py b/api/app/plugins/transport/shims/test/plugin.py new file mode 100644 index 00000000..d73a6546 --- /dev/null +++ b/api/app/plugins/transport/shims/test/plugin.py @@ -0,0 +1,19 @@ +from typing import Dict, Any +from datetime import datetime + +class Plugin: + plugin_type = "transport" + plugin_id = "test" + + async def initialize(self, config: Dict[str, Any]) -> None: + return + + async def send_fax(self, to_number: str, file_path: str, **kwargs) -> Dict[str, Any]: + return { + "job_id": f"test-{int(datetime.utcnow().timestamp())}", + "provider_sid": "test-sid", + "status": "queued", + "to_number": to_number, + "provider": "test", + "metadata": {"note": "dummy-transport"} + } diff --git a/api/app/plugins/transport/test/manifest.json b/api/app/plugins/transport/test/manifest.json new file mode 100644 index 00000000..40eae0a4 --- /dev/null +++ b/api/app/plugins/transport/test/manifest.json @@ -0,0 +1,15 @@ +{ + "id": "test", + "name": "Test Transport", + "version": "1.0.0", + "type": "transport", + "traits": { + "send_fax": true, + "status_callback": false, + "inbound_supported": false, + "webhook": { "path": "/callbacks/test", "verification": "none" }, + "auth": { "methods": [] } + }, + "config_schema": { "type": "object", "properties": {} } + } + \ No newline at end of file diff --git a/api/app/routers/admin_diagnostics.py b/api/app/routers/admin_diagnostics.py new file mode 100644 index 00000000..9558c120 --- /dev/null +++ b/api/app/routers/admin_diagnostics.py @@ -0,0 +1,91 @@ +import asyncio +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import Request +from sse_starlette.sse import EventSourceResponse # type: ignore + +from api.app.main import require_admin # reuse admin dependency +from api.app.services.events import EventEmitter + + +router = 
APIRouter(prefix="/admin/diagnostics", tags=["Diagnostics"], dependencies=[Depends(require_admin)]) + + +@router.get("/events/recent") +async def recent_events( + request: Request, + limit: int = 50, + provider_id: Optional[str] = None, + event_type: Optional[str] = None, + from_db: bool = False +): + """Get recent events with filtering options.""" + emitter: EventEmitter = request.app.state.event_emitter # type: ignore + events = await emitter.get_recent_events( + limit=limit, + provider_id=provider_id, + event_type=event_type, + from_db=from_db + ) + return { + "events": [ + { + "id": e.id, + "type": e.type.value, + "occurred_at": e.occurred_at.isoformat(), + "provider_id": e.provider_id, + "external_id": e.external_id, + "job_id": e.job_id, + "user_id": e.user_id, + "payload_meta": e.payload_meta, + "correlation_id": e.correlation_id, + } + for e in events + ], + "total": len(events), + "source": "database" if from_db else "memory", + } + + +@router.get("/events/sse") +async def events_sse( + request: Request, + admin_auth = Depends(require_admin) +): + """Server-Sent Events stream for real-time event monitoring.""" + emitter: EventEmitter = request.app.state.event_emitter # type: ignore + queue = await emitter.add_subscriber() + + async def event_stream(): + try: + # Send initial keepalive + yield {"event": "connected", "data": '{"status": "connected"}'} + + while True: + try: + msg = await asyncio.wait_for(queue.get(), timeout=30.0) + yield {"event": "event", "data": msg} + except asyncio.TimeoutError: + # Send keepalive every 30 seconds + yield {"event": "keepalive", "data": '{"ping": true}'} + except Exception: + pass + finally: + await emitter.remove_subscriber(queue) + + return EventSourceResponse(event_stream()) + + +@router.get("/events/types") +async def get_event_types(): + """Get available event types for filtering.""" + from api.app.services.events import EventType + + return { + "event_types": [ + {"value": event_type.value, "label": 
event_type.value.replace(".", " ").title()} + for event_type in EventType + ] + } + diff --git a/api/app/routers/admin_providers.py b/api/app/routers/admin_providers.py new file mode 100644 index 00000000..562e7d0c --- /dev/null +++ b/api/app/routers/admin_providers.py @@ -0,0 +1,153 @@ +""" +Admin endpoints for provider health monitoring and circuit breaker management. + +Provides APIs for: +- Getting provider health status +- Manual enable/disable controls +- Circuit breaker configuration +""" + +from typing import Optional, Dict, Any +from fastapi import APIRouter, Depends, HTTPException, Request +from pydantic import BaseModel + +from api.app.main import require_admin +from api.app.monitoring.health import ProviderHealthMonitor + + +router = APIRouter(prefix="/admin/providers", tags=["Provider Health"], dependencies=[Depends(require_admin)]) + + +class ProviderStatusResponse(BaseModel): + provider_statuses: Dict[str, Dict[str, Any]] + total_providers: int + healthy_count: int + degraded_count: int + circuit_open_count: int + disabled_count: int + + +class ProviderActionRequest(BaseModel): + provider_id: str + + +class ProviderActionResponse(BaseModel): + success: bool + provider_id: str + new_status: str + message: Optional[str] = None + + +@router.get("/health", response_model=ProviderStatusResponse) +async def get_provider_health_status(request: Request): + """Get health status of all providers.""" + health_monitor: ProviderHealthMonitor = getattr(request.app.state, "health_monitor", None) + + if not health_monitor: + raise HTTPException(status_code=503, detail="Health monitor not available") + + provider_statuses = await health_monitor.get_provider_statuses() + + # Calculate summary stats + status_counts = { + 'healthy': 0, + 'degraded': 0, + 'circuit_open': 0, + 'disabled': 0 + } + + for status_info in provider_statuses.values(): + status = status_info.get('status', 'healthy') + if status in status_counts: + status_counts[status] += 1 + + return 
ProviderStatusResponse( + provider_statuses=provider_statuses, + total_providers=len(provider_statuses), + healthy_count=status_counts['healthy'], + degraded_count=status_counts['degraded'], + circuit_open_count=status_counts['circuit_open'], + disabled_count=status_counts['disabled'] + ) + + +@router.post("/enable", response_model=ProviderActionResponse) +async def enable_provider(request: Request, action_request: ProviderActionRequest): + """Manually enable a provider (reset circuit breaker).""" + health_monitor: ProviderHealthMonitor = getattr(request.app.state, "health_monitor", None) + + if not health_monitor: + raise HTTPException(status_code=503, detail="Health monitor not available") + + try: + await health_monitor.manual_enable_provider(action_request.provider_id) + + return ProviderActionResponse( + success=True, + provider_id=action_request.provider_id, + new_status="healthy", + message=f"Provider {action_request.provider_id} has been enabled and circuit breaker reset" + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to enable provider: {str(e)}") + + +@router.post("/disable", response_model=ProviderActionResponse) +async def disable_provider(request: Request, action_request: ProviderActionRequest): + """Manually disable a provider.""" + health_monitor: ProviderHealthMonitor = getattr(request.app.state, "health_monitor", None) + + if not health_monitor: + raise HTTPException(status_code=503, detail="Health monitor not available") + + try: + await health_monitor.manual_disable_provider(action_request.provider_id) + + return ProviderActionResponse( + success=True, + provider_id=action_request.provider_id, + new_status="disabled", + message=f"Provider {action_request.provider_id} has been manually disabled" + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to disable provider: {str(e)}") + + +@router.get("/circuit-breaker/{provider_id}/should-allow") +async def 
should_allow_requests(request: Request, provider_id: str): + """Check if circuit breaker allows requests for a provider.""" + health_monitor: ProviderHealthMonitor = getattr(request.app.state, "health_monitor", None) + + if not health_monitor: + return {"allowed": True, "reason": "Health monitor not available"} + + allowed = health_monitor.should_allow_request(provider_id) + + return { + "provider_id": provider_id, + "allowed": allowed, + "reason": "Circuit breaker state" if not allowed else "Provider healthy" + } + + +@router.post("/circuit-breaker/{provider_id}/record-result") +async def record_request_result( + request: Request, + provider_id: str, + success: bool, + error: Optional[str] = None +): + """Record the result of a provider request for circuit breaker tracking.""" + health_monitor: ProviderHealthMonitor = getattr(request.app.state, "health_monitor", None) + + if not health_monitor: + return {"recorded": False, "reason": "Health monitor not available"} + + health_monitor.record_request_result(provider_id, success, error) + + return { + "provider_id": provider_id, + "recorded": True, + "success": success, + "error": error + } \ No newline at end of file diff --git a/api/app/routers/webhooks_v2.py b/api/app/routers/webhooks_v2.py new file mode 100644 index 00000000..02097691 --- /dev/null +++ b/api/app/routers/webhooks_v2.py @@ -0,0 +1,219 @@ +""" +Enhanced webhook router with verification, DLQ, and idempotency support. 
+ +This router provides enterprise-grade webhook handling: +- Signature verification for all providers +- Dead Letter Queue for failed processing +- Idempotency key support +- Structured audit logging +""" + +import logging +from typing import Dict, Any, Optional +from fastapi import APIRouter, Request, HTTPException, Depends, Header +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from api.app.services.webhook_processor import WebhookProcessor +from api.app.plugins.manager import PluginManager +from api.app.services.events import EventEmitter +from api.app.config.provider import HybridConfigProvider +from api.app.main import get_plugin_manager, get_event_emitter, get_config_provider + + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/webhooks/v2", tags=["Webhooks V2"]) + + +class WebhookResponse(BaseModel): + success: bool + message: str + job_id: Optional[str] = None + status: str + + +def get_webhook_processor( + plugin_manager: PluginManager = Depends(get_plugin_manager), + event_emitter: EventEmitter = Depends(get_event_emitter), + config_provider: HybridConfigProvider = Depends(get_config_provider) +) -> WebhookProcessor: + """Dependency to get webhook processor.""" + return WebhookProcessor(plugin_manager, event_emitter, config_provider) + + +async def extract_headers(request: Request) -> Dict[str, str]: + """Extract headers from request for webhook processing.""" + return {k.lower(): v for k, v in request.headers.items()} + + +@router.post("/{provider_id}", response_model=WebhookResponse) +async def process_provider_webhook( + provider_id: str, + request: Request, + processor: WebhookProcessor = Depends(get_webhook_processor), + idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key") +): + """ + Process webhook from any provider with unified handling. 
+ + This endpoint provides enterprise-grade webhook processing: + - Signature verification using provider-specific methods + - Job lookup by (provider_id, external_id) + - Dead Letter Queue for failed processing + - Idempotency key support for duplicate prevention + """ + try: + # Extract request data + headers = await extract_headers(request) + body = await request.body() + + logger.info(f"Processing webhook from {provider_id}, size: {len(body)} bytes") + + # Process webhook + result = await processor.process_webhook( + provider_id=provider_id, + headers=headers, + body=body, + idempotency_key=idempotency_key + ) + + # Return appropriate response + status_code = result.get("status", 500) + if not result.get("success", False): + logger.warning(f"Webhook processing failed for {provider_id}: {result.get('error')}") + raise HTTPException(status_code=status_code, detail=result.get("error", "Unknown error")) + + return WebhookResponse( + success=True, + message=f"Webhook processed successfully for {provider_id}", + job_id=result.get("job_id"), + status=result.get("new_status", "processed") + ) + + except HTTPException: + raise + except Exception as e: + logger.exception(f"Unexpected error processing webhook from {provider_id}") + raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + + +@router.post("/phaxio", response_model=WebhookResponse) +async def phaxio_webhook( + request: Request, + processor: WebhookProcessor = Depends(get_webhook_processor), + idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key") +): + """Legacy Phaxio webhook endpoint - redirects to unified handler.""" + return await process_provider_webhook("phaxio", request, processor, idempotency_key) + + +@router.post("/sinch", response_model=WebhookResponse) +async def sinch_webhook( + request: Request, + processor: WebhookProcessor = Depends(get_webhook_processor), + idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key") +): + """Legacy Sinch 
webhook endpoint - redirects to unified handler.""" + return await process_provider_webhook("sinch", request, processor, idempotency_key) + + +@router.post("/signalwire", response_model=WebhookResponse) +async def signalwire_webhook( + request: Request, + processor: WebhookProcessor = Depends(get_webhook_processor), + idempotency_key: Optional[str] = Header(None, alias="Idempotency-Key") +): + """Legacy SignalWire webhook endpoint - redirects to unified handler.""" + return await process_provider_webhook("signalwire", request, processor, idempotency_key) + + +# Health check endpoint +@router.get("/health") +async def webhook_health(): + """Health check for webhook service.""" + return {"status": "healthy", "service": "webhooks_v2"} + + +# DLQ management endpoints (admin only) +from api.app.main import require_admin + +@router.get("/admin/dlq", dependencies=[Depends(require_admin)]) +async def get_dlq_entries( + limit: int = 50, + provider_id: Optional[str] = None, + status: Optional[str] = None +): + """Get DLQ entries for admin review.""" + from sqlalchemy import select + from api.app.database import AsyncSessionLocal + from api.app.models.webhook_dlq import WebhookDLQ + + try: + async with AsyncSessionLocal() as session: + stmt = select(WebhookDLQ).order_by(WebhookDLQ.received_at.desc()).limit(limit) + + if provider_id: + stmt = stmt.where(WebhookDLQ.provider_id == provider_id) + if status: + stmt = stmt.where(WebhookDLQ.status == status) + + result = await session.execute(stmt) + entries = result.scalars().all() + + return { + "entries": [ + { + "id": entry.id, + "provider_id": entry.provider_id, + "external_id": entry.external_id, + "received_at": entry.received_at.isoformat(), + "status": entry.status, + "error": entry.error, + "retry_count": entry.retry_count, + "headers_meta": entry.get_headers_meta() + } + for entry in entries + ], + "total": len(entries) + } + + except Exception as e: + logger.exception("Error retrieving DLQ entries") + raise 
HTTPException(status_code=500, detail=str(e)) + + +@router.post("/admin/dlq/{dlq_id}/retry", dependencies=[Depends(require_admin)]) +async def retry_dlq_entry(dlq_id: str): + """Manually retry a specific DLQ entry.""" + from sqlalchemy import select + from api.app.database import AsyncSessionLocal + from api.app.models.webhook_dlq import WebhookDLQ + from datetime import datetime + + try: + async with AsyncSessionLocal() as session: + stmt = select(WebhookDLQ).where(WebhookDLQ.id == dlq_id) + result = await session.execute(stmt) + entry = result.scalar_one_or_none() + + if not entry: + raise HTTPException(status_code=404, detail="DLQ entry not found") + + # Reset for immediate retry + entry.status = 'retrying' + entry.next_retry_at = datetime.utcnow() + + await session.commit() + + return { + "success": True, + "message": f"DLQ entry {dlq_id} scheduled for retry", + "entry_id": dlq_id + } + + except HTTPException: + raise + except Exception as e: + logger.exception(f"Error retrying DLQ entry {dlq_id}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/api/app/services/webhook_processor.py b/api/app/services/webhook_processor.py new file mode 100644 index 00000000..3cbd4ccd --- /dev/null +++ b/api/app/services/webhook_processor.py @@ -0,0 +1,294 @@ +""" +Webhook processing service with verification, routing, and DLQ support. 
+ +This service provides enterprise-grade webhook handling: +- Signature verification using plugin-specific methods +- Job lookup by (provider_id, external_id) +- Retry logic with exponential backoff +- Dead Letter Queue for failed processing +- Idempotency key support +""" + +import asyncio +import json +import logging +from datetime import datetime, timedelta +from typing import Dict, Any, Optional, Tuple +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from api.app.database import AsyncSessionLocal +from api.app.models.webhook_dlq import WebhookDLQ +from api.app.models.jobs import Job +from api.app.plugins.manager import PluginManager +from api.app.services.events import EventEmitter, EventType +from api.app.config.provider import HybridConfigProvider + + +logger = logging.getLogger(__name__) + + +class WebhookProcessor: + """ + Processes incoming webhooks with verification, routing, and DLQ support. + """ + + def __init__(self, plugin_manager: PluginManager, event_emitter: EventEmitter, + config_provider: HybridConfigProvider): + self.plugin_manager = plugin_manager + self.event_emitter = event_emitter + self.config_provider = config_provider + self._idempotency_cache: Dict[str, Any] = {} + + async def process_webhook(self, provider_id: str, headers: Dict[str, str], + body: bytes, idempotency_key: Optional[str] = None) -> Dict[str, Any]: + """ + Process an incoming webhook with full verification and error handling. 
+ + Args: + provider_id: Provider that sent the webhook + headers: HTTP headers from webhook request + body: Raw webhook body + idempotency_key: Optional idempotency key for duplicate prevention + + Returns: + Dict with processing result and status + """ + try: + # Check idempotency + if idempotency_key and idempotency_key in self._idempotency_cache: + return self._idempotency_cache[idempotency_key] + + # Get provider plugin + plugin = self.plugin_manager.get_provider_plugin(provider_id) + if not plugin: + error = f"No plugin found for provider: {provider_id}" + logger.error(error) + return {"success": False, "error": error, "status": 404} + + # Verify webhook signature + try: + if hasattr(plugin, 'verify_webhook'): + is_valid = await plugin.verify_webhook(headers, body) + if not is_valid: + error = f"Webhook signature verification failed for provider: {provider_id}" + logger.warning(error) + return {"success": False, "error": error, "status": 401} + except Exception as e: + logger.error(f"Webhook verification error for {provider_id}: {str(e)}") + return {"success": False, "error": f"Verification failed: {str(e)}", "status": 500} + + # Parse webhook payload + try: + payload_str = body.decode('utf-8') + payload = json.loads(payload_str) + except (UnicodeDecodeError, json.JSONDecodeError) as e: + error = f"Invalid webhook payload from {provider_id}: {str(e)}" + logger.error(error) + await self._add_to_dlq(provider_id, None, error, headers) + return {"success": False, "error": error, "status": 400} + + # Extract external_id from payload + external_id = await self._extract_external_id(plugin, payload, provider_id) + if not external_id: + error = f"Could not extract external_id from {provider_id} webhook" + logger.error(error) + await self._add_to_dlq(provider_id, None, error, headers) + return {"success": False, "error": error, "status": 400} + + # Find job by provider_id and external_id + job = await self._find_job(provider_id, external_id) + if not job: + error = 
f"No job found for {provider_id} external_id: {external_id}" + logger.warning(error) + await self._add_to_dlq(provider_id, external_id, error, headers) + return {"success": False, "error": error, "status": 404} + + # Process status callback + result = await self._process_status_callback(plugin, job, payload) + + # Cache result for idempotency + if idempotency_key: + self._idempotency_cache[idempotency_key] = result + + return result + + except Exception as e: + error = f"Unexpected error processing webhook from {provider_id}: {str(e)}" + logger.exception(error) + await self._add_to_dlq(provider_id, None, error, headers) + return {"success": False, "error": error, "status": 500} + + async def _extract_external_id(self, plugin: Any, payload: Dict[str, Any], + provider_id: str) -> Optional[str]: + """Extract external_id from webhook payload using provider-specific logic.""" + try: + if hasattr(plugin, 'extract_external_id'): + return await plugin.extract_external_id(payload) + + # Fallback logic for common providers + if provider_id == 'phaxio': + return payload.get('fax', {}).get('id') + elif provider_id == 'sinch': + return payload.get('id') + elif provider_id.startswith('sip_'): + return payload.get('job_id') or payload.get('external_id') + + # Generic fallback + return payload.get('id') or payload.get('external_id') or payload.get('job_id') + + except Exception as e: + logger.error(f"Error extracting external_id from {provider_id}: {str(e)}") + return None + + async def _find_job(self, provider_id: str, external_id: str) -> Optional[Job]: + """Find job by provider_id and external_id.""" + try: + async with AsyncSessionLocal() as session: + stmt = select(Job).where( + Job.provider_id == provider_id, + Job.external_id == external_id + ) + result = await session.execute(stmt) + return result.scalar_one_or_none() + except Exception as e: + logger.error(f"Error finding job for {provider_id}/{external_id}: {str(e)}") + return None + + async def 
_process_status_callback(self, plugin: Any, job: Job, + payload: Dict[str, Any]) -> Dict[str, Any]: + """Process status callback and emit events.""" + try: + old_status = job.status + + # Let plugin handle the status update + if hasattr(plugin, 'handle_status_callback'): + await plugin.handle_status_callback(job, payload) + else: + # Default status mapping + await self._update_job_status_default(job, payload) + + # Emit status change event + if job.status != old_status: + await self.event_emitter.emit_event( + EventType.FAX_STATUS_CHANGED, + job_id=job.id, + provider_id=job.provider_id, + payload_meta={ + 'old_status': old_status, + 'new_status': job.status, + 'external_id': job.external_id + } + ) + + return { + "success": True, + "job_id": job.id, + "old_status": old_status, + "new_status": job.status, + "status": 200 + } + + except Exception as e: + error = f"Error processing status callback: {str(e)}" + logger.exception(error) + return {"success": False, "error": error, "status": 500} + + async def _update_job_status_default(self, job: Job, payload: Dict[str, Any]): + """Default job status update logic.""" + # This is a fallback - plugins should implement handle_status_callback + status_field = payload.get('status') or payload.get('state') + if status_field: + # Map provider-specific statuses to canonical ones + status_map = { + 'success': 'SUCCESS', + 'completed': 'SUCCESS', + 'delivered': 'SUCCESS', + 'failed': 'FAILED', + 'error': 'FAILED', + 'pending': 'QUEUED', + 'queued': 'QUEUED', + 'processing': 'SENDING' + } + canonical_status = status_map.get(status_field.lower(), status_field.upper()) + job.status = canonical_status + + async with AsyncSessionLocal() as session: + session.add(job) + await session.commit() + + async def _add_to_dlq(self, provider_id: str, external_id: Optional[str], + error: str, headers: Dict[str, str]): + """Add failed webhook to Dead Letter Queue.""" + try: + safe_headers = WebhookDLQ.create_safe_headers_meta(headers) + dlq_entry = 
WebhookDLQ( + provider_id=provider_id, + external_id=external_id, + error=error, + headers_meta=safe_headers + ) + + async with AsyncSessionLocal() as session: + session.add(dlq_entry) + await session.commit() + + logger.info(f"Added webhook to DLQ: provider={provider_id}, external_id={external_id}") + + except Exception as e: + logger.error(f"Failed to add webhook to DLQ: {str(e)}") + + async def retry_dlq_entries(self, max_retries: int = 3): + """ + Process DLQ entries that are ready for retry. + + This should be called periodically by a background task. + """ + try: + async with AsyncSessionLocal() as session: + # Find DLQ entries ready for retry + now = datetime.utcnow() + stmt = select(WebhookDLQ).where( + WebhookDLQ.status == 'retrying', + WebhookDLQ.next_retry_at <= now, + WebhookDLQ.retry_count.cast(int) < max_retries + ) + result = await session.execute(stmt) + dlq_entries = result.scalars().all() + + for entry in dlq_entries: + await self._retry_dlq_entry(session, entry, max_retries) + + await session.commit() + + except Exception as e: + logger.error(f"Error processing DLQ retries: {str(e)}") + + async def _retry_dlq_entry(self, session: AsyncSession, entry: WebhookDLQ, max_retries: int): + """Retry a single DLQ entry.""" + try: + retry_count = int(entry.retry_count) + if retry_count >= max_retries: + entry.mark_failed(f"Max retries ({max_retries}) exceeded") + return + + # Calculate exponential backoff + backoff_seconds = min(300, 30 * (2 ** retry_count)) # Max 5 minutes + next_retry = datetime.utcnow() + timedelta(seconds=backoff_seconds) + + entry.increment_retry(next_retry) + + logger.info(f"Scheduled DLQ entry for retry: {entry.id}, attempt {retry_count + 1}") + + except Exception as e: + logger.error(f"Error retrying DLQ entry {entry.id}: {str(e)}") + entry.mark_failed(f"Retry error: {str(e)}") + + def clear_idempotency_cache(self, older_than_minutes: int = 60): + """Clear old idempotency cache entries to prevent memory leaks.""" + # In production, 
this should use Redis with TTL + # For now, we'll keep it simple + if len(self._idempotency_cache) > 1000: + self._idempotency_cache.clear() + logger.info("Cleared idempotency cache due to size limit") \ No newline at end of file diff --git a/v4_plans/implement/phase_3_runbook.md b/v4_plans/implement/phase_3_runbook.md new file mode 100644 index 00000000..3b3f8556 --- /dev/null +++ b/v4_plans/implement/phase_3_runbook.md @@ -0,0 +1,389 @@ +Phase 3 Runbook — Hierarchical Config, Diagnostics, Reliability + +Branch: auto-tunnel (all v4 work stays here) +Scope: DB-first hierarchical config, Redis caching + invalidation, canonical events + SSE, webhook hardening + DLQ, provider health + circuit breaker, rate limiting, HIPAA-aligned logging + +0) Pre-flight (once per machine) + +Checklist + + Git on the right branch and clean working tree + + Python 3.11+ / Node 20+ / Redis 7 / Postgres 14+ reachable + + Uvicorn + Alembic available + + Strong crypto key present (44-char Fernet) + +Commands (copy/paste) + +echo "== ensure branch auto-tunnel and clean tree" && git fetch --all --tags && git checkout -B auto-tunnel origin/auto-tunnel && git status && \ +echo "== versions" && python3 --version && node --version && npm --version && psql --version && redis-server --version && \ +echo "== install server deps (uvicorn/alembic/cryptography)" && pip3 install -U pip wheel && pip3 install -r requirements.txt || true && \ +echo "== install admin UI deps" && cd api/admin_ui && npm ci && cd - && \ +echo "== start local Redis if not present" && (docker ps --format '{{.Names}}' | grep -q '^faxbot-redis$' || docker run -d --name faxbot-redis -p 6379:6379 redis:7-alpine) && \ +echo "== generate CONFIG_MASTER_KEY if missing and write .env.local" && python3 - <<'PY' +import os,base64,sys +from pathlib import Path +env = Path(".env.local") +if env.exists(): txt = env.read_text() +else: txt = "" +if "CONFIG_MASTER_KEY=" in txt: + print("CONFIG_MASTER_KEY already present") +else: + try: + from 
cryptography.fernet import Fernet + key = Fernet.generate_key().decode() + except Exception: + key = base64.urlsafe_b64encode(os.urandom(32)).decode()[:44] + with env.open("a") as f: + f.write(f"\nCONFIG_MASTER_KEY={key}\n") + print("CONFIG_MASTER_KEY created") +PY + + +Stop-check + + git status shows On branch auto-tunnel, no uncommitted changes needed for this run + + .env.local now contains CONFIG_MASTER_KEY= with 44 characters + +1) Database migration — hierarchical config + audit + events + DLQ + +Checklist + + Migration 003 applied (config tables + audit + events + dlq) + + Alembic heads consistent (no divergence) + +Commands + +echo "== run Phase 3 migrations" && alembic -c api/db/alembic.ini upgrade head && \ +echo "== confirm tables exist" && psql "$DATABASE_URL" -c "\dt+" | egrep -i "config_(global|tenant|department|group|user)|config_audit|events|webhook_dlq" + + +Stop-check + + Tables config_global/tenant/department/group/user, config_audit, events, webhook_dlq all listed + +2) Boot with safe defaults (no PHI, DB-first reads) + +Checklist + + Server starts and refuses to boot if CONFIG_MASTER_KEY is invalid + + Admin Console reachable + +Commands + +echo "== export baseline env (safe defaults)" && \ +export FAXBOT_ENV="dev" && export ENABLE_LOCAL_ADMIN="true" && export REDIS_URL="redis://localhost:6379/2" && \ +export REQUIRE_API_KEY="true" && export API_KEY="devkey-devkey-devkey" && \ +echo "== start server (foreground)" && uvicorn api.app.main:app --host 0.0.0.0 --port 8080 + + +(Open a second terminal for the next steps.) 
+ +Stop-check + + http://localhost:8080/health returns 200 + + Admin Console loads at http://localhost:8080/admin/ + +3) Hierarchical Config — effective resolution + safe writes + +Checklist + + Effective read works (db/default/cache source indicated) + + Safe key write at tenant and user levels reflects in effective resolution + + Cache invalidation works after write + +Commands + +echo "== show effective config (admin-capable required; using API key header)" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/config/effective" | jq '.values|keys' && \ +echo "== set tenant override for api.rate_limit_rpm=60" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -H "Content-Type: application/json" \ + -X POST "http://localhost:8080/admin/config/set" \ + -d '{"key":"api.rate_limit_rpm","value":60,"level":"tenant","level_id":"tenant_demo","reason":"Phase3 tenant default"}' | jq && \ +echo "== set user override for api.rate_limit_rpm=5" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -H "Content-Type: application/json" \ + -X POST "http://localhost:8080/admin/config/set" \ + -d '{"key":"api.rate_limit_rpm","value":5,"level":"user","level_id":"user_demo","reason":"tight user test"}' | jq && \ +echo "== query hierarchy for key" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" \ + "http://localhost:8080/admin/config/hierarchy?key=api.rate_limit_rpm" | jq && \ +echo "== flush config cache" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -X POST "http://localhost:8080/admin/config/flush-cache" | jq + + +Stop-check + + hierarchy.layers[0] shows user value 5 when asking from that user context + + After flush-cache, re-query reflects latest DB values + + No secrets displayed; masked in UI + +4) Redis Cache — stats + fallback + +Checklist + + Redis stats visible; local cache present + + Service keeps running if Redis is killed (falls back to local) + +Commands + +echo "== get effective + cache stats" && \ +curl -sS -H "X-API-Key: 
devkey-devkey-devkey" "http://localhost:8080/admin/config/effective" | jq '.cache_stats' && \ +echo "== simulate Redis outage (container stop)" && docker stop faxbot-redis && sleep 2 && \ +echo "== re-query effective (should still work using local cache)" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/config/effective" | jq '.cache_stats' && \ +echo "== restore Redis" && docker start faxbot-redis && sleep 2 + + +Stop-check + + With Redis down, effective config endpoint still works (warning allowed) + + After restart, stats show Redis again + +5) Canonical Events + SSE diagnostics (no PHI) + +Checklist + + Events table persists entries + + SSE stream shows recent + live events (admin-only) + + Payload metadata contains no PHI + +Commands + +echo "== tail SSE (keep this running; Ctrl+C to exit later)" && \ +curl -N -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/diagnostics/events" & +echo "== emit a test event via helper endpoint (if present) or trigger by sending a test fax job" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/diagnostics/events/recent?limit=5" | jq + + +Stop-check + + SSE stream prints recent events, then heartbeats; fields are IDs + metadata only + + No names, numbers, or documents appear in payload_meta + +6) Provider Health + Circuit Breaker + +Checklist + + Health monitor loop running + + Circuit opens after threshold failures and recovers after timeout/success + + Admin Diagnostics reflects status transitions + +Commands (force a failure with bogus creds, low threshold) + +echo "== lower CB threshold to 1 for provider 'phaxio' (example)" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -H "Content-Type: application/json" \ + -X POST "http://localhost:8080/admin/config/set" \ + -d '{"key":"provider.phaxio.circuit_breaker_threshold","value":1,"level":"global","reason":"test trip"}' | jq && \ +echo "== attempt a send with intentionally bad creds to trip the 
circuit" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -F to=+15551234567 -F file=@./example.pdf \ + -X POST "http://localhost:8080/fax" | jq || true && \ +echo "== check recent events include PROVIDER_HEALTH_CHANGED" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/diagnostics/events/recent?limit=20" | jq '.events[]|select(.type=="PROVIDER_HEALTH_CHANGED")' + + +Stop-check + + A PROVIDER_HEALTH_CHANGED event shows new_status: "circuit_open" + + After timeout or a successful call, status returns to healthy + +7) Webhook Hardening + DLQ + +Checklist + + Unverified callback is rejected (401/403) + + Bad payload lands in DLQ with allowlisted headers only + + Lookup of job uses (provider_id, external_id) + +Commands (simulate bad callback; provider route may vary) + +echo "== send bogus callback to provider route to trigger DLQ (adjust path to an enabled provider)" && \ +curl -sS -H "Content-Type: application/json" \ + -d '{"fake":"payload","external_id":"does-not-exist"}' \ + "http://localhost:8080/callbacks/phaxio" | jq || true && \ +echo "== inspect DLQ entries (via SQL; adjust table/schema if needed)" && \ +psql "$DATABASE_URL" -c "SELECT id,provider_id,external_id,status,substr(headers_meta,1,120) AS headers_sample FROM webhook_dlq ORDER BY received_at DESC LIMIT 5;" + + +Stop-check + + Callback returns 401/403 or handled with DLQ store (no Authorization headers saved) + + DLQ contains sanitized header metadata only + +8) Rate Limiting (per key/endpoint) + +Checklist + + 429 returned with Retry-After when limit exceeded + + Limits are hierarchical (tenant/user can override) + +Commands + +echo "== set global rate limit for fax send to 1 rpm" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -H "Content-Type: application/json" \ + -X POST "http://localhost:8080/admin/config/set" \ + -d '{"key":"api.rate_limit_rpm","value":1,"level":"global","reason":"limit test"}' | jq && \ +echo "== send two requests quickly; second should 
429" && \ +BASE="http://localhost:8080" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -F to=+15551234567 -F file=@./example.pdf -X POST "$BASE/fax" | jq && \ +curl -i -sS -H "X-API-Key: devkey-devkey-devkey" -F to=+15551234567 -F file=@./example.pdf -X POST "$BASE/fax" | sed -n '1,20p' + + +Stop-check + + Second response shows HTTP/1.1 429 and a Retry-After header + +9) Admin Console — Configuration Manager & Diagnostics UI + +Checklist + + “Configuration Manager” screen renders effective values with source badges + + Safe keys editable; secrets masked + + Diagnostics shows provider states and SSE status + +Manual UI actions + +Settings → Configuration Manager: verify source chips (db/env/default/cache) + +Diagnostics → Events: live SSE updates appear; no PHI + +Diagnostics → Providers: status chips reflect health/circuit state + +10) Acceptance Sweep (tick all) + +Configuration + + User→Group→Department→Tenant→Global→Default resolution verified + + Writes persist without restart; audit entries created + + Safe keys only editable via Admin Console; validation enforced + + Secrets encrypted at rest; masked in UI + +Caching + + Redis on; cache hits observed; invalidation works + + Redis off; service continues with local cache + +Events & SSE + + Canonical events persisted; SSE streams IDs/metadata only (no PHI) + + Recent events endpoint filters by type/provider + +Provider Health + + Health checks run; circuit opens on failures; recovers as configured + + Manual enable/disable reflected in events + UI + +Webhooks + + Signature/verification enforced when provider supports it + + Bad callbacks routed to DLQ; headers allowlisted; no secrets persisted + +Rate Limiting + + 429 with Retry-After for exceeded limits; hierarchical overrides honored + +Security + + All admin endpoints require admin_capable trait + + CONFIG_MASTER_KEY length 44; boot fails fast if invalid + + No PHI in logs/streams/events + +11) Troubleshooting (one step at a time) + +A. 
Boot fails with crypto errors + +echo "== verify CONFIG_MASTER_KEY length is 44 and valid base64" && \ +python3 - <<'PY' +import os,base64 +k=os.getenv("CONFIG_MASTER_KEY","") +print(len(k),"chars") +base64.urlsafe_b64decode(k.encode()) +print("OK: decodable") +PY + + +If not 44 or decode fails: regenerate key (Section 0) → restart → re-test. + +B. Effective config not changing after set() + +echo "== flush cache and re-read" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" -X POST "http://localhost:8080/admin/config/flush-cache" | jq && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/config/hierarchy?key=api.rate_limit_rpm" | jq + + +If still stale: verify you wrote to the correct level_id; check audit log row exists. + +C. SSE seems silent + +echo "== fetch recent events directly" && \ +curl -sS -H "X-API-Key: devkey-devkey-devkey" "http://localhost:8080/admin/diagnostics/events/recent?limit=10" | jq + + +If empty: trigger a fax send (success or failure) to generate lifecycle events. + +D. Circuit never opens + +Lower threshold to 1 (Section 6) and force a failing request. Confirm PROVIDER_HEALTH_CHANGED. + +E. 429 not returned + +Confirm middleware is enabled and api.rate_limit_rpm not overridden by user/tenant to a higher value. + +12) Go/No-Go & Rollback + +Go + + All Acceptance Sweep boxes ticked + + Load test: config resolution p95 ≤ 5ms with warm cache + + No PHI observed in any logs/streams + +Rollback (quick) + +echo "== rollback code to previous tag" && \ +git fetch --tags && git checkout && \ +echo "== downgrade DB to pre-003 if necessary" && \ +alembic -c api/db/alembic.ini downgrade -1 && \ +echo "== restore .env/.env.local backups" && ls -la + + +(If you need exact step counts for downgrade, run alembic history --verbose first.) 
+ +13) Quick Audits (hygiene) +echo "== async ORM anti-patterns" && rg -n "await\s+.*\.merge\(" api/app || true && \ +echo "== accidental env fallbacks in provider" && rg -n "os\.getenv\(" api/app/config || true && \ +echo "== cache key shapes" && rg -n "cfg:eff" api/app | sort || true && \ +echo "== SSE usage sites" && rg -n "Event \ No newline at end of file diff --git a/v4_plans/implement/phase_4_implement_plan.md b/v4_plans/implement/phase_4_implement_plan.md index d46aabdd..9fc7efa9 100644 --- a/v4_plans/implement/phase_4_implement_plan.md +++ b/v4_plans/implement/phase_4_implement_plan.md @@ -4,253 +4,7 @@ **Status**: Comprehensive implementation plan including all existing infrastructure **Timeline**: 7-8 weeks -## Executive Summary - -Phase 4 transforms Faxbot into an enterprise integration powerhouse by building upon all existing infrastructure while adding advanced capabilities. This phase consolidates and extends the current plugin ecosystem (transport, storage, identity, messaging), implements a secure marketplace, advanced webhook system, enterprise integrations (LDAP, SAML, ERP), and provides a complete plugin development SDK. - -**Critical Philosophy**: Build upon, don't replace. Every existing service, plugin, and provider must remain functional while gaining new capabilities. 
- -## Current Infrastructure Inventory (Must Preserve & Extend) - -### 🔌 **Existing Transport Providers** (Phase 4 Enhanced) -**Current Providers** (from `config/provider_traits.json` and `config/plugin_registry.json`): -- **Phaxio Cloud Fax** - HIPAA-ready, webhook support, HMAC verification -- **Sinch Fax API v3** - Direct upload model, basic auth inbound -- **SignalWire** - Twilio-compatible API, cloud-based -- **SIP/Asterisk** - Self-hosted, T.38 protocol, AMI interface -- **FreeSWITCH** - Self-hosted via mod_spandsp, ESL integration -- **Test/Disabled Mode** - Development testing - -**Phase 4 Enhancements**: -```python -# Existing providers get marketplace integration -class PhaxioTransportPlugin(FaxPlugin): - # Current functionality preserved - # + Marketplace metadata - # + Enhanced webhook handling - # + Circuit breaker integration - # + Usage analytics -``` - -### 💾 **Existing Storage Backends** (Phase 4 Extended) -**Current Storage** (from plugin registry): -- **Local Storage** - Development/single-node deployments -- **S3/S3-Compatible** - Production with SSE-KMS encryption - -**Phase 4 Additions**: -- **Azure Blob Storage** with managed identities -- **Google Cloud Storage** with service accounts -- **Multi-cloud replication** for disaster recovery -- **Compliance-aware retention** policies per tenant - -### 🔐 **Authentication Infrastructure** (Phase 4 Enterprise) -**Current Auth** (from Phase 2): -- API key authentication (existing) -- Session-based authentication (Phase 2) -- SQLAlchemy identity provider (Phase 2) -- Trait-based permissions (Phase 2) - -**Phase 4 Enterprise Extensions**: -- LDAP/Active Directory integration -- SAML 2.0 SSO providers -- OAuth2/OIDC providers (Google, Microsoft, Okta) -- Multi-tenant identity isolation - -### 🌐 **MCP Server Infrastructure** (Phase 4 Enhanced) -**Current MCP Servers**: -- **Node MCP** (`node_mcp/`) - stdio/HTTP/SSE/WebSocket transports -- **Python MCP** (`python_mcp/`) - stdio/SSE transports -- Multiple 
transport support with OAuth2/JWT for SSE - -**Phase 4 Enhancements**: -- Plugin-aware MCP tools -- Enterprise integration MCP extensions -- Webhook event streaming via MCP -- Advanced rate limiting per tenant - -### 🛠️ **Plugin Development Kit** (Phase 4 Complete) -**Current Plugin SDK** (`plugin-dev-kit/python/`): -- Base plugin classes (FaxPlugin, StoragePlugin, AuthPlugin) -- Plugin manifest system -- Testing framework foundation - -**Phase 4 SDK Completion**: -- CLI scaffolding tools (`faxbot-sdk init`, `validate`, `test`) -- Local development server -- Automated testing harness -- CI/CD templates and deployment tools - -## Dependencies & Integration Points - -### Phase 1-3 Foundation Requirements: -- ✅ **Phase 1**: Plugin architecture, SecurityCore, CanonicalMessage, PluginManager -- ✅ **Phase 2**: Trait-based auth, user management, identity providers -- ✅ **Phase 3**: Hierarchical configuration, Redis caching, webhook hardening - -### Phase 3 → Phase 4 Evolution: -```python -# Phase 3: Hierarchical config with reliability -config_value = hierarchical_config.get_effective('fax.provider.endpoint', user_context) - -# Phase 4: Enterprise multi-tenant with marketplace -plugin = marketplace.get_tenant_plugin('transport', 'phaxio', tenant_context) -await plugin.send_with_analytics(message, tenant_analytics) -``` - -## Week 1-2: Plugin Marketplace & Registry Service - -### 1. Advanced Plugin Registry Architecture (with Out-of-Process Host for third-party) - -P0: Third-party (marketplace) plugins MUST run out-of-process (OOP host) with resource limits and no PHI/secret access; first-party built-ins remain in-process. 
- -Marketplace defaults (security by default): -- `admin.marketplace.enabled = false` (search UI can render, installs disabled) -- `admin.marketplace.remote_install_enabled = false` (explicit admin toggle required) -- `admin.marketplace.trust_tier = 'curated_only'` (HIPAA tenants see HIPAA-compliant plugins only) - -Admin Console constraints: -- Show marketplace UI with disabled “Install” buttons until both flags are enabled. -- Provide warnings and docsBase links (“Learn more”) when enabling remote installs. -- All marketplace screens must be trait-gated (admin_capable), mobile-first, and avoid global CSS. - -**Extend `api/app/plugins/registry/service.py`** (don't replace): -```python -class EnterprisePluginRegistry: - """Enterprise plugin registry with signature verification and marketplace""" - - def __init__(self, base_registry, security_core, hierarchical_config): - self.base_registry = base_registry # Preserve existing functionality - self.security_core = security_core - self.config = hierarchical_config - self.signature_validator = PluginSignatureValidator() - - async def publish_plugin(self, manifest: Dict, signature: str, tenant_id: str) -> Dict[str, Any]: - """Publish plugin to marketplace with signature verification""" - # Verify plugin signature (Sigstore preferred; GPG fallback) - if not await self.signature_validator.verify(manifest, signature): - raise SecurityError("Invalid plugin signature") - - # Validate against existing plugin-dev-kit schema - validation = await self.base_registry.validate_manifest(manifest) - if not validation['valid']: - raise ValidationError(validation['errors']) - - # Store in marketplace with tenant isolation - return await self.store_plugin(manifest, tenant_id) - - async def search_plugins(self, query: str, filters: Dict, tenant_context: Dict) -> List[Dict]: - """Search marketplace with tenant-aware filtering""" - base_results = await self.base_registry.search(query, filters) - - # Apply tenant compliance filtering - 
compliance_mode = tenant_context.get('compliance_mode', 'standard') - if compliance_mode == 'hipaa': - base_results = [p for p in base_results if p.get('hipaa_compliant', False)] - - return self._apply_tenant_permissions(base_results, tenant_context) - - async def install_plugin(self, plugin_id: str, tenant_context: Dict) -> Dict[str, Any]: - """Install plugin with dependency resolution""" - plugin = await self.get_plugin_metadata(plugin_id) - - # Check compatibility with current platform version - platform_version = await self.get_platform_version() - if not self._check_version_compatibility(plugin, platform_version): - raise CompatibilityError(f"Plugin requires platform {plugin['min_platform_version']}") - - # Resolve and install dependencies - await self._resolve_dependencies(plugin, tenant_context) - - # Sandbox installation with resource limits (out-of-process host) - return await self._sandboxed_install_oop(plugin, tenant_context) - - async def effective_marketplace_flags(self, tenant_context): - # Use hierarchical config (Phase 3) to get per-tenant flags - enabled = await self.config.get_effective('admin.marketplace.enabled', tenant_context) or False - remote_install = await self.config.get_effective('admin.marketplace.remote_install_enabled', tenant_context) or False - trust_tier = await self.config.get_effective('admin.marketplace.trust_tier', tenant_context) or 'curated_only' - return enabled, remote_install, trust_tier -``` - -### 2. 
Plugin Signature & Security System (Sigstore/GPG) - -**Create `api/app/plugins/security/signature.py`**: -```python -import gnupg -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import rsa, padding -from typing import Dict, Any, Optional -import json -import hashlib - -class PluginSignatureValidator: - """Validates plugin signatures for marketplace security""" - - def __init__(self): - self.gpg = gnupg.GPG() - self.trusted_keys = self._load_trusted_keys() - - async def verify_plugin_signature(self, manifest: Dict[str, Any], signature: str) -> bool: - """Verify plugin manifest signature""" - - # Create canonical manifest hash - canonical_manifest = self._canonicalize_manifest(manifest) - manifest_hash = hashlib.sha256(canonical_manifest.encode()).digest() - - try: - # Try Sigstore first (when configured); fallback to GPG - if await self._verify_sigstore(canonical_manifest, signature): - return True - if await self._verify_gpg_signature(canonical_manifest, signature): - return True - - # Fallback to RSA signature verification - return await self._verify_rsa_signature(manifest_hash, signature) - - except Exception as e: - audit_event('plugin_signature_verification_failed', - plugin_id=manifest.get('id'), - error=str(e)) - return False - - def _canonicalize_manifest(self, manifest: Dict[str, Any]) -> str: - """Create canonical JSON representation for signing""" - # Remove signature-related fields and sort keys - clean_manifest = {k: v for k, v in manifest.items() - if k not in ['signature', '_signature_meta']} - return json.dumps(clean_manifest, sort_keys=True, separators=(',', ':')) - - async def _verify_gpg_signature(self, content: str, signature: str) -> bool: - """Verify GPG signature from trusted keyring""" - try: - # Correct parameter order for detached verify - verified = self.gpg.verify_data(content.encode(), signature.encode()) - return verified.valid and verified.key_id in self.trusted_keys - 
except Exception: - return False - - async def _verify_sigstore(self, content: str, signature: str) -> bool: - """Verify Sigstore signature (Fulcio/Rekor) when available.""" - try: - # Placeholder; real implementation to call sigstore-python APIs - return False - except Exception: - return False - -### 4. Current State Integration Map (Phase 4) - -- Registry/ingestor: extend existing registry (`api/app/plugins/registry/*`) for marketplace metadata and signature verification; do not fork a parallel system. -- Marketplace defaults: both `admin.marketplace.enabled` and `admin.marketplace.remote_install_enabled` default to false; search works with curated registry; install buttons disabled with explicit admin approval flow. -- Out‑of‑process host (OOP): required for third‑party plugins; built‑ins continue in‑process. Enforce resource limits; no PHI/secret access; audit everything. -- HIPAA filtering: tenant compliance filters marketplace results (only HIPAA‑compliant plugins for HIPAA tenants); trust tiers applied. -- Admin Console: marketplace UI added as a Tab; install buttons disabled until flags enabled; every screen has docsBase links and help texts; mobile‑first; no global CSS. -- Migration stability: existing plugins remain functional if marketplace disabled; manifests and provider traits continue to work; marketplace metadata optional. - -### 5. Security Posture - -- Mandatory signature verification (Sigstore or GPG) for any third‑party plugin. -- OOP host isolation for untrusted plugins; no network egress without policy; logs scrubbed for PHI/secrets. -- Audit: installs, updates, uninstalls recorded with actor, tenant, correlation ID. 
+1 ``` diff --git a/v4_plans/implement/phase_5_implement_plan.md b/v4_plans/implement/phase_5_implement_plan.md index f69dc74c..4a8dac96 100644 --- a/v4_plans/implement/phase_5_implement_plan.md +++ b/v4_plans/implement/phase_5_implement_plan.md @@ -12,7 +12,7 @@ Phase 5 is the FINAL phase completing the v4 platform transformation. This phase 1. **Sinch** - Most widely used, OAuth2 + Basic Auth, regional endpoints 2. **Phaxio** - HIPAA-ready with BAA, HMAC verification 3. **SignalWire** - Twilio-compatible (NOTE: Twilio fax API is sunset) -4. **HumbleFax** - Complex webhook + email inbound, IMAP integration +4. **HumbleFax** - Most widely used by non-HIPAA customers, Complex webhook + email inbound, IMAP integration 5. **SIP/Asterisk** - Self-hosted T.38, AMI interface 6. **FreeSWITCH** - ESL integration, mod_spandsp 7. **Test/Disabled** - Development and CI/CD diff --git a/v4_plans/implement/pr-gate.sh b/v4_plans/implement/pr-gate.sh new file mode 100644 index 00000000..2d0baaf0 --- /dev/null +++ b/v4_plans/implement/pr-gate.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +set -euo pipefail + +echo "[build]" +docker compose build + +echo "[up]" +docker compose up -d + +echo "[health check (best effort)]" +curl -fsS http://localhost:8080/health || true + +echo "[/metrics exists]" +curl -fsS http://localhost:8080/metrics | head -n 3 >/dev/null + +echo "[providers endpoint shape]" +curl -fsS -H "X-API-Key: ${API_KEY:-dev}" http://localhost:8080/admin/providers | jq -e 'type=="object"' >/dev/null + +echo "[callbacks return 202 (idempotent no-op is fine)]" +code=$(curl -sS -o /dev/null -w "%{http_code}" -X POST http://localhost:8080/phaxio-callback \ + -H "Content-Type: application/json" -H "X-Phaxio-Signature: test" -d '{"id":"smoke-pr","status":"success"}') +test "$code" -eq 202 + +code=$(curl -sS -o /dev/null -w "%{http_code}" -X POST http://localhost:8080/sinch-inbound \ + -H "Authorization: Basic dGVzdDp0ZXN0" -H "Content-Type: application/json" -d '{"id":"smoke-pr"}') 
+test "$code" -eq 202 + +echo "[openapi diff + traits schema are separate CI jobs]" +echo "OK"