Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ ___

Environment
===========
- FastAPI Guard Agent version: [e.g. 1.0.2]
- FastAPI Guard version: [e.g. 4.0.2]
- FastAPI Guard Agent version: [e.g. 1.1.0]
- FastAPI Guard version: [e.g. 4.1.2]
- Python version: [e.g. 3.11.10]
- FastAPI version: [e.g. 0.115.0]
- Redis version: [e.g. 7.0.12]
Expand Down
3 changes: 2 additions & 1 deletion .mike.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version_selector: true
title_switch: true
versions_file: docs/versions/versions.json
versions:
- 1.1.0
- 1.0.2
- 1.0.1
- 1.0.0
Expand All @@ -10,4 +11,4 @@ versions:
- 0.0.1
- latest
aliases:
latest: 1.0.2
latest: 1.1.0
21 changes: 20 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ Release Notes

___

v1.1.0 (2025-10-14)
-------------------

New Features (v1.1.0)
---------------------
- Added end-to-end payload encryption for secure telemetry transmission using AES-256-GCM.
- Implemented `PayloadEncryptor` class with project-specific encryption keys.
- Added encrypted endpoint support for events and metrics (`/api/v1/events/encrypted`).
- Integrated automatic datetime serialization in encrypted payloads via custom JSON handler.
- Added encryption key verification during transport initialization.

Technical Details (v1.1.0)
--------------------------
- Encryption uses AES-256-GCM with 96-bit nonces and 128-bit authentication tags.
- Pydantic models are serialized using `.model_dump(mode="json")` before encryption.
- Custom `_default_json_handler` ensures datetime objects are properly ISO-formatted.

___

v1.0.2 (2025-09-12)
-------------------

Enhancements (v1.0.2)
------------
---------------------
- Added dynamic rule updated event type.

___
Expand Down
21 changes: 20 additions & 1 deletion docs/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ Release Notes

___

v1.1.0 (2025-10-14)
-------------------

New Features (v1.1.0)
---------------------
- Added end-to-end payload encryption for secure telemetry transmission using AES-256-GCM.
- Implemented `PayloadEncryptor` class with project-specific encryption keys.
- Added encrypted endpoint support for events and metrics (`/api/v1/events/encrypted`).
- Integrated automatic datetime serialization in encrypted payloads via custom JSON handler.
- Added encryption key verification during transport initialization.

Technical Details (v1.1.0)
--------------------------
- Encryption uses AES-256-GCM with 96-bit nonces and 128-bit authentication tags.
- Pydantic models are serialized using `.model_dump(mode="json")` before encryption.
- Custom `_default_json_handler` ensures datetime objects are properly ISO-formatted.

___

v1.0.2 (2025-09-12)
-------------------

Enhancements (v1.0.2)
------------
---------------------
- Added dynamic rule updated event type.

___
Expand Down
3 changes: 2 additions & 1 deletion docs/versions/versions.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"1.1.0": "1.1.0",
"1.0.2": "1.0.2",
"1.0.1": "1.0.1",
"1.0.0": "1.0.0",
"0.1.1": "0.1.1",
"0.1.0": "0.1.0",
"0.0.1": "0.0.1",
"latest": "1.0.2"
"latest": "1.1.0"
}
2 changes: 1 addition & 1 deletion guard_agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
validate_config,
)

__version__ = "1.0.2"
__version__ = "1.1.0"

__all__ = [
# Main components
Expand Down
210 changes: 210 additions & 0 deletions guard_agent/encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import base64
import json
import os
from datetime import datetime
from typing import Any

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class EncryptionError(Exception):
"""Base exception for encryption-related errors."""

pass


def _default_json_handler(obj: Any) -> str:
"""Handle non-serializable objects during JSON encoding."""
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


class PayloadEncryptor:
"""
Encrypts telemetry payloads using project-specific encryption keys with AES-256-GCM.

This class provides symmetric encryption using AES-256-GCM (Authenticated
Encryption with Associated Data) to protect telemetry data in transit
between the agent and the core backend.

Security features:
- 256-bit keys for strong security
- Authenticated encryption (prevents tampering)
- Project-specific keys for isolation
- Automatic JSON serialization
- Base64 encoding for safe transmission
- 2-3x faster than CBC mode
- No padding oracle vulnerabilities

Example:
>>> encryptor = PayloadEncryptor(project_key="your_key_here")
>>> data = {"events": [...], "metrics": [...]}
>>> encrypted = encryptor.encrypt(data)
>>> # Send encrypted to core
"""

# GCM configuration
NONCE_SIZE = 12 # 96 bits (optimal for GCM)
TAG_SIZE = 16 # 128 bits (maximum security)
KEY_SIZE = 32 # 256 bits (AES-256)

def __init__(self, project_key: str) -> None:
"""
Initialize the encryptor with a project encryption key.

Args:
project_key: Base64-encoded 256-bit key from core backend

Raises:
EncryptionError: If the project key is invalid
"""
if not project_key:
raise EncryptionError("Project key cannot be empty")

try:
# Decode project key
key_bytes = base64.urlsafe_b64decode(project_key.encode())

# Validate key size
if len(key_bytes) != self.KEY_SIZE:
raise EncryptionError(
f"Invalid key size: {len(key_bytes)} bytes, "
f"expected {self.KEY_SIZE}"
)

self._cipher = AESGCM(key_bytes)

except EncryptionError:
raise
except Exception as e:
raise EncryptionError(f"Invalid project key format: {e}") from e

def encrypt(self, data: dict[str, Any], associated_data: str | None = None) -> str:
"""
Encrypt a telemetry payload with AES-256-GCM.

The data is serialized to JSON (with deterministic key ordering),
encrypted using AES-256-GCM, and base64-encoded for transmission.

Args:
data: Dictionary containing events and/or metrics
associated_data: Optional authenticated data (not encrypted)

Returns:
Base64-encoded encrypted payload string

Raises:
EncryptionError: If encryption fails

Example:
>>> data = {
... "events": [{"type": "rate_limit", "ip": "1.2.3.4"}],
... "metrics": [{"type": "request_count", "value": 100}]
... }
>>> encrypted = encryptor.encrypt(data)
"""
try:
# Serialize with deterministic ordering and datetime handling
json_data = json.dumps(
data,
separators=(",", ":"),
sort_keys=True,
default=_default_json_handler,
)

# Generate random nonce (MUST be unique per encryption)
nonce = os.urandom(self.NONCE_SIZE)

# Prepare associated data
aad = associated_data.encode() if associated_data else None

# Encrypt with authentication
encrypted = self._cipher.encrypt(nonce, json_data.encode(), aad)

# Combine nonce + ciphertext
combined = nonce + encrypted

# Base64 encode for transmission
return base64.urlsafe_b64encode(combined).decode()

except Exception as e:
raise EncryptionError(f"Failed to encrypt payload: {e}") from e

def decrypt(
self, encrypted_data: str, associated_data: str | None = None
) -> dict[str, Any]:
"""
Decrypt an encrypted payload (primarily for testing).

In normal operation, only the core backend decrypts payloads.
This method is provided for testing and validation purposes.

Args:
encrypted_data: Base64-encoded encrypted payload
associated_data: Optional authenticated data

Returns:
Decrypted dictionary

Raises:
EncryptionError: If decryption fails or data is tampered

Example:
>>> decrypted = encryptor.decrypt(encrypted_payload)
"""
try:
# Decode from base64
combined = base64.urlsafe_b64decode(encrypted_data.encode())

# Extract nonce and ciphertext
nonce = combined[: self.NONCE_SIZE]
ciphertext = combined[self.NONCE_SIZE :]

# Prepare associated data
aad = associated_data.encode() if associated_data else None

# Decrypt with authentication verification
decrypted = self._cipher.decrypt(nonce, ciphertext, aad)

# Parse JSON
return json.loads(decrypted.decode()) # type: ignore[no-any-return]

except Exception as e:
raise EncryptionError("Invalid or tampered payload") from e

def verify_key(self) -> bool:
"""
Verify that the encryption key is valid.

Performs a quick encryption/decryption round-trip test.

Returns:
True if key is valid, False otherwise
"""
try:
test_data = {"test": "verification"}
encrypted = self.encrypt(test_data)
decrypted = self.decrypt(encrypted)
return decrypted == test_data
except EncryptionError:
return False


def create_encryptor(project_key: str | None) -> PayloadEncryptor | None:
"""
Factory function to create an encryptor with AES-256-GCM.

Args:
project_key: Optional project encryption key (256-bit, base64-encoded)

Returns:
PayloadEncryptor instance if key is provided, None otherwise

Raises:
EncryptionError: If key is provided but invalid
"""
if not project_key:
return None

return PayloadEncryptor(project_key)
6 changes: 6 additions & 0 deletions guard_agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class AgentConfig(BaseModel):
default=1024, description="Maximum payload size to include in events (bytes)"
)

# Encryption
project_encryption_key: str | None = Field(
default=None,
description="Project-specific encryption key for secure telemetry transmission",
)

@field_validator("endpoint") # type: ignore
@classmethod
def validate_endpoint(cls, v: str) -> str:
Expand Down
Loading