diff --git a/api/alembic/versions/0002_hierarchical_config.py b/api/alembic/versions/0002_hierarchical_config.py
new file mode 100644
index 00000000..2a38d2b5
--- /dev/null
+++ b/api/alembic/versions/0002_hierarchical_config.py
@@ -0,0 +1,131 @@
+"""Hierarchical configuration tables
+
+Revision ID: 0002_hierarchical_config
+Revises: 0001_initial
+Create Date: 2025-09-26
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '0002_hierarchical_config'
+down_revision = '0001_initial'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # Global configuration
+    op.create_table(
+        'config_global',
+        sa.Column('key', sa.String(200), primary_key=True, nullable=False),
+        sa.Column('value_encrypted', sa.Text(), nullable=False),
+        sa.Column('value_type', sa.String(20), nullable=False, server_default='string'),
+        sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()),
+        sa.Column('description', sa.Text(), nullable=True),
+        sa.Column('category', sa.String(50), nullable=True),
+        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+    )
+
+    # Tenant-level configuration
+    op.create_table(
+        'config_tenant',
+        sa.Column('tenant_id', sa.String(100), nullable=False),
+        sa.Column('key', sa.String(200), nullable=False),
+        sa.Column('value_encrypted', sa.Text(), nullable=False),
+        sa.Column('value_type', sa.String(20), nullable=False, server_default='string'),
+        sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()),
+        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.PrimaryKeyConstraint('tenant_id', 'key'),
+    )
+    op.create_index('idx_tenant_key', 'config_tenant', ['tenant_id', 'key'])
+
+    # Department-level configuration
+    op.create_table(
+        'config_department',
+        sa.Column('tenant_id', sa.String(100), nullable=False),
+        sa.Column('department', sa.String(100), nullable=False),
+        sa.Column('key', sa.String(200), nullable=False),
+        sa.Column('value_encrypted', sa.Text(), nullable=False),
+        sa.Column('value_type', sa.String(20), nullable=False, server_default='string'),
+        sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()),
+        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.PrimaryKeyConstraint('tenant_id', 'department', 'key'),
+    )
+    op.create_index('idx_dept_key', 'config_department', ['tenant_id', 'department', 'key'])
+
+    # Group-level configuration
+    op.create_table(
+        'config_group',
+        sa.Column('group_id', sa.String(100), nullable=False),
+        sa.Column('key', sa.String(200), nullable=False),
+        sa.Column('value_encrypted', sa.Text(), nullable=False),
+        sa.Column('value_type', sa.String(20), nullable=False, server_default='string'),
+        sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()),
+        sa.Column('priority', sa.Integer(), nullable=False, server_default='0'),
+        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.PrimaryKeyConstraint('group_id', 'key'),
+    )
+    op.create_index('idx_group_key', 'config_group', ['group_id', 'key'])
+    op.create_index('idx_group_priority', 'config_group', ['group_id', 'priority'])
+
+    # User-level configuration
+    op.create_table(
+        'config_user',
+        sa.Column('user_id', sa.String(100), nullable=False),
+        sa.Column('key', sa.String(200), nullable=False),
+        sa.Column('value_encrypted', sa.Text(), nullable=False),
+        sa.Column('value_type', sa.String(20), nullable=False, server_default='string'),
+        sa.Column('encrypted', sa.Boolean(), nullable=False, server_default=sa.true()),
+        sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.PrimaryKeyConstraint('user_id', 'key'),
+    )
+    op.create_index('idx_user_key', 'config_user', ['user_id', 'key'])
+
+    # Configuration audit trail (masked values only; integrity fingerprint)
+    op.create_table(
+        'config_audit',
+        sa.Column('id', sa.String(40), primary_key=True, nullable=False),
+        sa.Column('level', sa.String(20), nullable=False),
+        sa.Column('level_id', sa.String(200), nullable=True),
+        sa.Column('key', sa.String(200), nullable=False),
+        sa.Column('old_value_masked', sa.Text(), nullable=True),
+        sa.Column('new_value_masked', sa.Text(), nullable=False),
+        sa.Column('value_hmac', sa.String(64), nullable=False),  # hex sha256
+        sa.Column('value_type', sa.String(20), nullable=False),
+        sa.Column('changed_by', sa.String(100), nullable=False),
+        sa.Column('changed_at', sa.DateTime(), nullable=False, server_default=sa.func.now()),
+        sa.Column('reason', sa.Text(), nullable=True),
+        sa.Column('ip_address', sa.String(45), nullable=True),
+        sa.Column('user_agent', sa.Text(), nullable=True),
+    )
+    op.create_index('idx_audit_level', 'config_audit', ['level', 'level_id'])
+    op.create_index('idx_audit_key', 'config_audit', ['key'])
+    op.create_index('idx_audit_time', 'config_audit', ['changed_at'])
+    op.create_index('idx_audit_user', 'config_audit', ['changed_by'])
+
+
+def downgrade() -> None:
+    op.drop_index('idx_audit_user', table_name='config_audit')
+    op.drop_index('idx_audit_time', table_name='config_audit')
+    op.drop_index('idx_audit_key', table_name='config_audit')
+    op.drop_index('idx_audit_level', table_name='config_audit')
+    op.drop_table('config_audit')
+    op.drop_index('idx_user_key', table_name='config_user')
+    op.drop_table('config_user')
+    op.drop_index('idx_group_priority', table_name='config_group')
+    op.drop_index('idx_group_key', table_name='config_group')
+    op.drop_table('config_group')
+    op.drop_index('idx_dept_key', table_name='config_department')
+    op.drop_table('config_department')
+    op.drop_index('idx_tenant_key', table_name='config_tenant')
+    op.drop_table('config_tenant')
+    op.drop_table('config_global')
+
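Note on the schema above: every value_encrypted cell is expected to hold Fernet-encrypted JSON that the provider below decrypts with CONFIG_MASTER_KEY. A minimal sketch of producing a compatible key and ciphertext (the key handling and the fax.timeout_seconds seed are illustrative, not part of this PR):

    # Sketch only: generate a CONFIG_MASTER_KEY and a payload that fits the
    # config_global.value_encrypted column created by the migration above.
    import json
    from cryptography.fernet import Fernet

    master_key = Fernet.generate_key()   # 44-char urlsafe base64, as the provider requires
    print(master_key.decode())           # export as CONFIG_MASTER_KEY

    fernet = Fernet(master_key)
    ciphertext = fernet.encrypt(json.dumps(30).encode()).decode()  # e.g. fax.timeout_seconds = 30
    print(ciphertext)                    # store as value_encrypted with encrypted=True
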
"default", "cache"] + + +@dataclass +class UserContext: + user_id: Optional[str] + tenant_id: Optional[str] = None + department: Optional[str] = None + groups: List[str] = None # type: ignore + + def __post_init__(self) -> None: + if self.groups is None: + self.groups = [] + + +@dataclass +class ConfigValue: + value: Any + source: ConfigSource + level: Optional[ConfigLevel] = None + level_id: Optional[str] = None + encrypted: bool = False + updated_at: Optional[datetime] = None + + +class ConfigEncryption: + def __init__(self, master_key: str): + if not master_key or len(master_key) != 44: + raise ValueError("CONFIG_MASTER_KEY must be a 44-char base64 Fernet key") + self.fernet = Fernet(master_key.encode()) + + def encrypt(self, value: Any, encrypt_flag: bool) -> str: + as_json = json.dumps(value) + return ( + self.fernet.encrypt(as_json.encode()).decode() if encrypt_flag else as_json + ) + + def decrypt(self, stored: str, is_encrypted: bool) -> Any: + try: + if is_encrypted: + dec = self.fernet.decrypt(stored.encode()).decode() + return json.loads(dec) + return json.loads(stored) + except Exception: + return stored + + +class HierarchicalConfigProvider: + BUILT_IN_DEFAULTS: Dict[str, Any] = { + "fax.timeout_seconds": 30, + "fax.max_pages": 100, + "fax.retry_attempts": 3, + "api.rate_limit_rpm": 60, + "notifications.enable_sse": True, + } + + ALWAYS_ENCRYPT_KEYS = { + "fax.provider.api_key", + "fax.provider.secret", + "oauth.client_secret", + } + + SAFE_EDIT_KEYS: Dict[str, Dict[str, Any]] = { + "fax.timeout_seconds": {"type": "integer", "min": 5, "max": 300}, + "fax.retry_attempts": {"type": "integer", "min": 0, "max": 10}, + "api.rate_limit_rpm": {"type": "integer", "min": 1, "max": 10000}, + "notifications.enable_sse": {"type": "boolean"}, + } + + def __init__(self, encryption_key: str, cache_manager=None): + self.enc = ConfigEncryption(encryption_key) + self.cache = cache_manager + + async def get_effective( + self, key: str, user_ctx: UserContext, default: Any = None + ) -> ConfigValue: + # Cache first + if self.cache: + ckey = self._cache_key("eff", user_ctx, key) + cached = await self.cache.get(ckey) + if cached: + return ConfigValue(**cached, source="cache") + + val = await self._resolve(key, user_ctx, default) + + if self.cache and val.source == "db": + ckey = self._cache_key("eff", user_ctx, key) + await self.cache.set( + ckey, + { + "value": val.value, + "source": "db", + "level": val.level, + "level_id": val.level_id, + "encrypted": val.encrypted, + "updated_at": val.updated_at.isoformat() if val.updated_at else None, + }, + ttl=300, + ) + + return val + + async def _resolve( + self, key: str, user_ctx: UserContext, default: Any = None + ) -> ConfigValue: + async with AsyncSessionLocal() as db: # type: ignore + # 1. User + if user_ctx.user_id: + q = await db.execute( + select(ConfigUser).where( + ConfigUser.user_id == user_ctx.user_id, + ConfigUser.key == key, + ) + ) + row = q.scalar_one_or_none() + if row: + return ConfigValue( + value=self.enc.decrypt(row.value_encrypted, row.encrypted), + source="db", + level="user", + level_id=user_ctx.user_id, + encrypted=row.encrypted, + updated_at=row.updated_at, + ) + + # 2. 
Group (first by priority desc) + if user_ctx.groups: + q = await db.execute( + select(ConfigGroup) + .where(ConfigGroup.group_id.in_(user_ctx.groups), ConfigGroup.key == key) + .order_by(ConfigGroup.priority.desc()) + ) + grp = q.first() + if grp: + grp = grp[0] + return ConfigValue( + value=self.enc.decrypt(grp.value_encrypted, grp.encrypted), + source="db", + level="group", + level_id=grp.group_id, + encrypted=grp.encrypted, + updated_at=grp.updated_at, + ) + + # 3. Department + if user_ctx.tenant_id and user_ctx.department: + q = await db.execute( + select(ConfigDepartment).where( + ConfigDepartment.tenant_id == user_ctx.tenant_id, + ConfigDepartment.department == user_ctx.department, + ConfigDepartment.key == key, + ) + ) + dep = q.scalar_one_or_none() + if dep: + return ConfigValue( + value=self.enc.decrypt(dep.value_encrypted, dep.encrypted), + source="db", + level="department", + level_id=f"{user_ctx.tenant_id}:{user_ctx.department}", + encrypted=dep.encrypted, + updated_at=dep.updated_at, + ) + + # 4. Tenant + if user_ctx.tenant_id: + q = await db.execute( + select(ConfigTenant).where( + ConfigTenant.tenant_id == user_ctx.tenant_id, + ConfigTenant.key == key, + ) + ) + ten = q.scalar_one_or_none() + if ten: + return ConfigValue( + value=self.enc.decrypt(ten.value_encrypted, ten.encrypted), + source="db", + level="tenant", + level_id=user_ctx.tenant_id, + encrypted=ten.encrypted, + updated_at=ten.updated_at, + ) + + # 5. Global + q = await db.execute(select(ConfigGlobal).where(ConfigGlobal.key == key)) + glob = q.scalar_one_or_none() + if glob: + return ConfigValue( + value=self.enc.decrypt(glob.value_encrypted, glob.encrypted), + source="db", + level="global", + encrypted=glob.encrypted, + updated_at=glob.updated_at, + ) + + # 6. Built-in defaults + if key in self.BUILT_IN_DEFAULTS: + return ConfigValue(value=self.BUILT_IN_DEFAULTS[key], source="default") + + # 7. 
fallback (unset) + if default is not None: + return ConfigValue(value=default, source="default") + raise KeyError(key) + + def _cache_key(self, prefix: str, u: UserContext, key: str) -> str: + parts = [u.tenant_id or "null", u.department or "null", u.user_id or "null", ",".join(sorted(u.groups)) or "null"] + return f"cfg:{prefix}:{':'.join(parts)}:{key}" + + async def get_hierarchy(self, key: str, user_ctx: UserContext) -> List[ConfigValue]: + """Return layered values from highest to lowest priority present + default.""" + layers: List[ConfigValue] = [] + try: + # user + if user_ctx.user_id: + async with AsyncSessionLocal() as db: # type: ignore + q = await db.execute(select(ConfigUser).where(ConfigUser.user_id == user_ctx.user_id, ConfigUser.key == key)) + row = q.scalar_one_or_none() + if row: + layers.append( + ConfigValue( + value=self.enc.decrypt(row.value_encrypted, row.encrypted), + source="db", + level="user", + level_id=user_ctx.user_id, + encrypted=row.encrypted, + updated_at=row.updated_at, + ) + ) + # group (first only reported) + if user_ctx.groups: + async with AsyncSessionLocal() as db: # type: ignore + q = await db.execute( + select(ConfigGroup) + .where(ConfigGroup.group_id.in_(user_ctx.groups), ConfigGroup.key == key) + .order_by(ConfigGroup.priority.desc()) + ) + grp = q.first() + if grp: + grp = grp[0] + layers.append( + ConfigValue( + value=self.enc.decrypt(grp.value_encrypted, grp.encrypted), + source="db", + level="group", + level_id=grp.group_id, + encrypted=grp.encrypted, + updated_at=grp.updated_at, + ) + ) + # department + if user_ctx.tenant_id and user_ctx.department: + async with AsyncSessionLocal() as db: # type: ignore + q = await db.execute( + select(ConfigDepartment).where( + ConfigDepartment.tenant_id == user_ctx.tenant_id, + ConfigDepartment.department == user_ctx.department, + ConfigDepartment.key == key, + ) + ) + dep = q.scalar_one_or_none() + if dep: + layers.append( + ConfigValue( + value=self.enc.decrypt(dep.value_encrypted, dep.encrypted), + source="db", + level="department", + level_id=f"{user_ctx.tenant_id}:{user_ctx.department}", + encrypted=dep.encrypted, + updated_at=dep.updated_at, + ) + ) + # tenant + if user_ctx.tenant_id: + async with AsyncSessionLocal() as db: # type: ignore + q = await db.execute(select(ConfigTenant).where(ConfigTenant.tenant_id == user_ctx.tenant_id, ConfigTenant.key == key)) + ten = q.scalar_one_or_none() + if ten: + layers.append( + ConfigValue( + value=self.enc.decrypt(ten.value_encrypted, ten.encrypted), + source="db", + level="tenant", + level_id=user_ctx.tenant_id, + encrypted=ten.encrypted, + updated_at=ten.updated_at, + ) + ) + # global + async with AsyncSessionLocal() as db: # type: ignore + q = await db.execute(select(ConfigGlobal).where(ConfigGlobal.key == key)) + glob = q.scalar_one_or_none() + if glob: + layers.append( + ConfigValue( + value=self.enc.decrypt(glob.value_encrypted, glob.encrypted), + source="db", + level="global", + encrypted=glob.encrypted, + updated_at=glob.updated_at, + ) + ) + except Exception: + pass + if key in self.BUILT_IN_DEFAULTS: + layers.append(ConfigValue(value=self.BUILT_IN_DEFAULTS[key], source="default")) + return layers + + async def get_safe_edit_keys(self) -> Dict[str, Dict[str, Any]]: + return dict(self.SAFE_EDIT_KEYS) diff --git a/api/app/main.py b/api/app/main.py index bbc00db7..ed3112b3 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -56,6 +56,7 @@ from .middleware.traits import requires_traits from .security.permissions import require_permissions from 
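For reference, resolution above is strictly user → group (highest priority first) → department → tenant → global → built-in default. A minimal usage sketch (assumes the API's AsyncSessionLocal and a valid CONFIG_MASTER_KEY are configured; the IDs and the fax.cover_page key are illustrative):

    import asyncio
    import os

    from api.app.config.hierarchical_provider import HierarchicalConfigProvider, UserContext

    async def demo() -> None:
        provider = HierarchicalConfigProvider(os.environ["CONFIG_MASTER_KEY"])
        ctx = UserContext(user_id="u-123", tenant_id="acme", department="radiology", groups=["fax-admins"])

        # Falls through user/group/department/tenant/global rows to the built-in default (30).
        timeout = await provider.get_effective("fax.timeout_seconds", ctx)
        print(timeout.value, timeout.source, timeout.level)

        # Unknown keys raise KeyError unless a default is supplied.
        missing = await provider.get_effective("fax.cover_page", ctx, default=False)
        print(missing.value, missing.source)

    asyncio.run(demo())
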
diff --git a/api/app/main.py b/api/app/main.py
index bbc00db7..ed3112b3 100644
--- a/api/app/main.py
+++ b/api/app/main.py
@@ -56,6 +56,7 @@ from .middleware.traits import requires_traits
 from .security.permissions import require_permissions
 from .security.user_traits import pack_user_traits
+from fastapi import APIRouter
 
 
 app = FastAPI(
@@ -116,10 +117,117 @@ def _inbound_dedupe(provider_id: str, external_id: str, window_sec: int = 600) -> bool:
     _inbound_seen.pop(k, None)
     key = f"{provider_id}:{external_id}"
     ts = _inbound_seen.get(key)
-    if ts and ts >= cutoff:
-        return True
+    if ts and ts >= cutoff:
+        return True
     _inbound_seen[key] = now
-    return False
+    return False
+
+# ===== Phase 3: optional hierarchical config bootstrap (lazy) =====
+try:
+    from .services.cache_manager import CacheManager  # type: ignore
+    from .config.hierarchical_provider import HierarchicalConfigProvider  # type: ignore
+
+    _REDIS_URL = os.getenv("REDIS_URL")
+    _cmk = os.getenv("CONFIG_MASTER_KEY", "")
+    _cache = CacheManager(_REDIS_URL) if _REDIS_URL else None
+    if _cmk and len(_cmk) == 44:
+        app.state.hierarchical_config = HierarchicalConfigProvider(_cmk, cache_manager=_cache)  # type: ignore[attr-defined]
+    else:
+        # Expose None if not configured; Admin endpoints will be added in Phase 3 PRs
+        app.state.hierarchical_config = None  # type: ignore[attr-defined]
+except Exception:
+    # Do not block startup; Phase 3 endpoints will check availability
+    app.state.hierarchical_config = None  # type: ignore[attr-defined]
+
+# ===== Phase 3: Admin Config (v4) endpoints (read-only for now) =====
+router_cfg_v4 = APIRouter(prefix="/admin/config/v4", tags=["ConfigurationV4"], dependencies=[Depends(require_admin)])
+
+
+@router_cfg_v4.get("/effective")
+async def v4_config_effective(request: Request):
+    hc = getattr(app.state, "hierarchical_config", None)
+    if not hc:
+        raise HTTPException(503, "Hierarchical configuration not initialized")
+
+    # Use a minimal system context for now; later we can derive it from auth
+    from .config import settings as _s
+    user_ctx = {
+        "user_id": "admin",
+        "tenant_id": None,
+        "department": None,
+        "groups": [],
+    }
+    # Common keys for the initial surface
+    keys = [
+        "fax.timeout_seconds",
+        "fax.retry_attempts",
+        "api.rate_limit_rpm",
+        "notifications.enable_sse",
+    ]
+    out: dict[str, dict[str, Any]] = {}
+    # Resolve values
+    from .config.hierarchical_provider import UserContext  # type: ignore
+    for k in keys:
+        try:
+            cv = await hc.get_effective(k, UserContext(**user_ctx))
+            out[k] = {
+                "value": cv.value,
+                "source": cv.source,
+                "level": cv.level,
+                "level_id": cv.level_id,
+                "encrypted": cv.encrypted,
+                "updated_at": (cv.updated_at.isoformat() if cv.updated_at else None),
+            }
+        except Exception:
+            pass
+    # Cache stats if present
+    cache_stats = {}
+    if getattr(hc, "cache", None):
+        cache_stats = await hc.cache.get_stats()
+    return {"values": out, "cache_stats": cache_stats}
+
+
+@router_cfg_v4.get("/hierarchy")
+async def v4_config_hierarchy(key: str):
+    hc = getattr(app.state, "hierarchical_config", None)
+    if not hc:
+        raise HTTPException(503, "Hierarchical configuration not initialized")
+    from .config.hierarchical_provider import UserContext  # type: ignore
+    layers = await hc.get_hierarchy(key, UserContext(user_id="admin", tenant_id=None, department=None, groups=[]))
+    return {
+        "key": key,
+        "layers": [
+            {
+                "level": v.level,
+                "level_id": v.level_id,
+                "value": v.value,
+                "encrypted": v.encrypted,
+                "updated_at": (v.updated_at.isoformat() if v.updated_at else None),
+            }
+            for v in layers
+        ],
+    }
+
+
+@router_cfg_v4.get("/safe-keys")
+async def v4_config_safe_keys():
+    hc = getattr(app.state, "hierarchical_config", None)
+    if not hc:
+        raise HTTPException(503, "Hierarchical configuration not initialized")
+    return await hc.get_safe_edit_keys()
+
+
+@router_cfg_v4.post("/flush-cache")
+async def v4_config_flush_cache(scope: Optional[str] = None):
+    hc = getattr(app.state, "hierarchical_config", None)
+    if not hc or not getattr(hc, "cache", None):
+        raise HTTPException(503, "Cache manager not available")
+    # Basic flush-all for now; scope handling in later PRs
+    await hc.cache.flush_all()
+    return {"success": True, "scope": scope or "all"}
+
+
+app.include_router(router_cfg_v4)
     except Exception:
         return False
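The v4 router above is read-only plus a cache flush. A quick smoke-test sketch against a running instance might look like the following (uses httpx, which this PR does not add; the base URL and admin bearer token are placeholders for whatever require_admin expects in a given deployment):

    # Illustrative only: exercises the new read-only endpoints and the cache flush.
    import httpx

    BASE = "http://localhost:8000"                       # assumed local API
    HEADERS = {"Authorization": "Bearer <admin-token>"}  # whatever require_admin accepts

    print(httpx.get(f"{BASE}/admin/config/v4/effective", headers=HEADERS).json())
    print(httpx.get(f"{BASE}/admin/config/v4/hierarchy", params={"key": "fax.timeout_seconds"}, headers=HEADERS).json())
    print(httpx.get(f"{BASE}/admin/config/v4/safe-keys", headers=HEADERS).json())
    print(httpx.post(f"{BASE}/admin/config/v4/flush-cache", headers=HEADERS).json())
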
configuration.""" + __tablename__ = "config_user" + + user_id = Column(String(100), primary_key=True, nullable=False) + key = Column(String(200), primary_key=True, nullable=False) + value_encrypted = Column(Text(), nullable=False) + value_type = Column(String(20), nullable=False, default="string") + encrypted = Column(Boolean(), nullable=False, default=True) + created_at = Column(DateTime(), nullable=False, server_default=func.now()) + updated_at = Column(DateTime(), nullable=False, server_default=func.now()) + + +class ConfigAudit(Base): # type: ignore + """Configuration audit trail (masked values; integrity fingerprint only).""" + __tablename__ = "config_audit" + + id = Column(String(40), primary_key=True, nullable=False) + level = Column(String(20), nullable=False) # global|tenant|department|group|user + level_id = Column(String(200), nullable=True) + key = Column(String(200), nullable=False) + old_value_masked = Column(Text(), nullable=True) + new_value_masked = Column(Text(), nullable=False) + value_hmac = Column(String(64), nullable=False) + value_type = Column(String(20), nullable=False) + changed_by = Column(String(100), nullable=False) + changed_at = Column(DateTime(), nullable=False, server_default=func.now()) + reason = Column(Text(), nullable=True) + ip_address = Column(String(45), nullable=True) + user_agent = Column(Text(), nullable=True) + diff --git a/api/app/services/cache_manager.py b/api/app/services/cache_manager.py new file mode 100644 index 00000000..67a91a2d --- /dev/null +++ b/api/app/services/cache_manager.py @@ -0,0 +1,118 @@ +import json +from datetime import datetime, timedelta +from typing import Any, Optional, Dict + +try: + import redis.asyncio as redis # type: ignore +except Exception: # pragma: no cover - optional runtime dep + redis = None # type: ignore + + +class CacheManager: + """Redis-backed cache with a simple local fallback. + + - If REDIS_URL is provided and redis library is available, uses Redis for get/set. + - Always mirrors values into a local in-process cache with a short TTL so we + remain resilient if Redis blips. 
+ """ + + def __init__(self, redis_url: Optional[str] = None): + self.redis_available = False + self.redis_client = None + self.local_cache: Dict[str, Any] = {} + self.local_expiry: Dict[str, datetime] = {} + + if redis_url and redis is not None: + try: + self.redis_client = redis.from_url(redis_url) + self.redis_available = True + except Exception: + # Degrade gracefully; local cache only + self.redis_available = False + + async def get(self, key: str) -> Optional[Any]: + # Try Redis first + if self.redis_available and self.redis_client: + try: + raw = await self.redis_client.get(key) + if raw: + return json.loads(raw) + except Exception: + pass + + # Fallback to local + exp = self.local_expiry.get(key) + if exp and exp > datetime.utcnow(): + return self.local_cache.get(key) + # Expired + self.local_cache.pop(key, None) + self.local_expiry.pop(key, None) + return None + + async def set(self, key: str, value: Any, ttl: int = 300) -> bool: + # Redis + if self.redis_available and self.redis_client: + try: + await self.redis_client.setex(key, ttl, json.dumps(value, default=str)) + except Exception: + pass + # Local mirror with capped TTL (≤ 60s) + self.local_cache[key] = value + self.local_expiry[key] = datetime.utcnow() + timedelta(seconds=min(ttl, 60)) + return True + + async def delete(self, key: str) -> bool: + if self.redis_available and self.redis_client: + try: + await self.redis_client.delete(key) + except Exception: + pass + self.local_cache.pop(key, None) + self.local_expiry.pop(key, None) + return True + + async def delete_pattern(self, pattern: str) -> int: + deleted = 0 + # Redis pattern delete (SCAN based) + if self.redis_available and self.redis_client: + try: + async for key in self.redis_client.scan_iter(match=pattern): + await self.redis_client.delete(key) + deleted += 1 + except Exception: + pass + # Local pattern delete + for key in list(self.local_cache.keys()): + if _pattern_match(key, pattern): + self.local_cache.pop(key, None) + self.local_expiry.pop(key, None) + deleted += 1 + return deleted + + async def flush_all(self) -> None: + # Clear local + self.local_cache.clear() + self.local_expiry.clear() + # Best-effort Redis flush for this DB only + if self.redis_available and self.redis_client: + try: + await self.redis_client.flushdb() + except Exception: + pass + + async def get_stats(self) -> Dict[str, Any]: + return { + "redis_available": bool(self.redis_available), + "local_size": len(self.local_cache), + } + + +def _pattern_match(key: str, pattern: str) -> bool: + # Very small helper; Redis pattern only used with '*' wildcards in our usage + if pattern == key: + return True + if "*" not in pattern: + return False + prefix = pattern.split("*")[0] + return key.startswith(prefix) + diff --git a/api/requirements.txt b/api/requirements.txt index e35cfcbe..5b4e5453 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -19,3 +19,6 @@ pexpect==4.9.0 segno==1.6.1 qrcode==7.4.2 Pillow==10.4.0 +cryptography==43.0.1 +redis==5.0.8 +sse-starlette==3.0.2