Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions pr_review/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
Custom exceptions for the PR Review System
"""


class PRReviewError(Exception):
    """Root of the PR Review System exception hierarchy.

    Catching this single type handles every error raised by the package.
    """


class GitHubAPIError(PRReviewError):
    """Raised when GitHub API requests fail.

    Attributes:
        status_code: HTTP status code of the failed request, if available.
        response: Parsed response body of the failed request, if available.
    """

    # Annotations are string-quoted ("int | None") so no typing import is
    # needed; defaults of None mean both fields are optional.
    def __init__(self, message: str, status_code: "int | None" = None, response: "dict | None" = None) -> None:
        # Stash the HTTP details so handlers can inspect them.
        self.status_code = status_code
        self.response = response
        super().__init__(message)


class MetricsCalculationError(PRReviewError):
    """Signals that computing metrics for a pull request failed."""


class AILabelingError(PRReviewError):
    """Signals that the AI-based labeling step failed."""


class ReviewerAssignmentError(PRReviewError):
    """Signals that a reviewer could not be assigned."""


class DiscordNotificationError(PRReviewError):
    """Signals that sending a Discord notification failed."""


class ConfigurationError(PRReviewError):
    """Signals invalid or missing configuration."""


class ValidationError(PRReviewError):
    """Signals that input validation failed."""
112 changes: 112 additions & 0 deletions pr_review/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Centralized logging configuration for the PR Review System
"""

import logging
import sys
from pathlib import Path
from logging.handlers import RotatingFileHandler
from typing import Optional


def setup_logging(
    name: str = "pr_review",
    level: str = "INFO",
    log_file: Optional[str] = None,
    max_bytes: int = 10485760,  # 10MB
    backup_count: int = 5,
    format_string: Optional[str] = None
) -> logging.Logger:
    """
    Configure logging with console and optional rotating file output.

    Args:
        name: Logger name
        level: Logging level name (DEBUG, INFO, WARNING, ERROR, CRITICAL),
            case-insensitive
        log_file: Optional log file path; parent directories are created
            as needed
        max_bytes: Maximum log file size in bytes before rotation
        backup_count: Number of rotated backup files to keep
        format_string: Custom format string (defaults to a timestamped
            format that includes file name and line number)

    Returns:
        Configured logger instance

    Raises:
        ValueError: If ``level`` is not a recognized logging level name.
    """
    # Validate the level name up front so a typo fails with a clear
    # ValueError instead of an AttributeError from getattr().
    numeric_level = getattr(logging, level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Invalid log level: {level!r}")

    logger = logging.getLogger(name)
    logger.setLevel(numeric_level)

    # Remove existing handlers so repeated setup calls don't duplicate output.
    logger.handlers.clear()

    # Default format
    if format_string is None:
        format_string = (
            '%(asctime)s - %(name)s - %(levelname)s - '
            '[%(filename)s:%(lineno)d] - %(message)s'
        )

    formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')

    # Console handler: passes everything through and lets the logger's own
    # level do the filtering.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler (optional), with rotation to cap disk usage.
    if log_file:
        log_path = Path(log_file)
        log_path.parent.mkdir(parents=True, exist_ok=True)

        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=max_bytes,
            backupCount=backup_count,
            # Explicit encoding avoids platform-dependent defaults.
            encoding='utf-8',
        )
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger


def get_logger(name: str) -> logging.Logger:
    """
    Return the logger registered under *name*.

    Thin convenience wrapper around :func:`logging.getLogger`; repeated
    calls with the same name yield the same logger object.

    Args:
        name: Logger name

    Returns:
        Logger instance
    """
    return logging.getLogger(name)


class LoggerMixin:
    """Mixin that gives any class a `logger` property named after it."""

    @property
    def logger(self) -> logging.Logger:
        """Logger named ``<module>.<ClassName>`` of the concrete class."""
        cls = type(self)
        return logging.getLogger(f"{cls.__module__}.{cls.__name__}")


# Context manager for temporary log level changes
class LogLevel:
    """Context manager that temporarily overrides a logger's level.

    On entry the logger's current level is saved and replaced with the
    requested one; on exit the saved level is restored. Exceptions raised
    inside the ``with`` block are never suppressed.
    """

    def __init__(self, logger: logging.Logger, level: str):
        self.logger = logger
        self.new_level = getattr(logging, level.upper())
        self.old_level = None  # filled in by __enter__

    def __enter__(self):
        # Remember the current level so __exit__ can put it back.
        self.old_level = self.logger.level
        self.logger.setLevel(self.new_level)
        return self.logger

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.logger.setLevel(self.old_level)
        # Returning False propagates any exception from the with-block.
        return False
205 changes: 205 additions & 0 deletions pr_review/utils/retry_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""
Retry decorator and utilities for resilient API calls
"""

import time
import functools
import logging
from typing import Callable, Type, Tuple, Optional

logger = logging.getLogger(__name__)


def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    on_retry: Optional[Callable] = None
):
    """
    Decorator to retry a function on failure with exponential backoff.

    Args:
        max_attempts: Total number of attempts, including the first call
            (must be >= 1)
        delay: Initial delay between retries in seconds
        backoff: Multiplier applied to the delay after each failed attempt
        exceptions: Tuple of exception types to catch and retry
        on_retry: Optional callback invoked as ``on_retry(attempt, error)``
            before each retry sleep

    Returns:
        Decorated function with retry logic

    Raises:
        ValueError: If ``max_attempts`` is less than 1.

    Example:
        @retry(max_attempts=3, delay=1.0, backoff=2.0, exceptions=(requests.RequestException,))
        def fetch_data():
            response = requests.get('https://api.example.com/data')
            return response.json()
    """
    if max_attempts < 1:
        # Without this check the wrapper's loop would run zero times and
        # silently return None instead of calling the function.
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable) -> Callable:
        # Same logger object as this module's `logger`, resolved here so the
        # decorator is self-contained.
        log = logging.getLogger(__name__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: log at ERROR and re-raise the
                        # original exception with its traceback intact.
                        log.error(
                            f"Function {func.__name__} failed after {max_attempts} attempts. "
                            f"Last error: {str(e)}"
                        )
                        raise

                    log.warning(
                        f"Function {func.__name__} failed (attempt {attempt}/{max_attempts}). "
                        f"Retrying in {current_delay:.1f}s... Error: {str(e)}"
                    )

                    if on_retry:
                        on_retry(attempt, e)

                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


def retry_async(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    on_retry: Optional[Callable] = None
):
    """
    Async version of retry decorator for async functions.

    Args:
        max_attempts: Total number of attempts, including the first call
            (must be >= 1)
        delay: Initial delay between retries in seconds
        backoff: Multiplier applied to the delay after each failed attempt
        exceptions: Tuple of exception types to catch and retry
        on_retry: Optional callback (sync or async) invoked as
            ``on_retry(attempt, error)`` before each retry sleep

    Returns:
        Decorated async function with retry logic

    Raises:
        ValueError: If ``max_attempts`` is less than 1.

    Example:
        @retry_async(max_attempts=3, delay=1.0, exceptions=(aiohttp.ClientError,))
        async def fetch_data():
            async with aiohttp.ClientSession() as session:
                async with session.get('https://api.example.com/data') as response:
                    return await response.json()
    """
    if max_attempts < 1:
        # Without this check the wrapper's loop would run zero times and
        # silently return None instead of awaiting the function.
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable) -> Callable:
        # Imported at decoration time (not on every call) but still locally,
        # keeping asyncio out of the module's hard dependencies.
        import asyncio

        # Same logger object as this module's `logger`.
        log = logging.getLogger(__name__)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: log at ERROR and re-raise the
                        # original exception with its traceback intact.
                        log.error(
                            f"Async function {func.__name__} failed after {max_attempts} attempts. "
                            f"Last error: {str(e)}"
                        )
                        raise

                    log.warning(
                        f"Async function {func.__name__} failed (attempt {attempt}/{max_attempts}). "
                        f"Retrying in {current_delay:.1f}s... Error: {str(e)}"
                    )

                    if on_retry:
                        # Support both sync and async callbacks.
                        if asyncio.iscoroutinefunction(on_retry):
                            await on_retry(attempt, e)
                        else:
                            on_retry(attempt, e)

                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator


class CircuitBreaker:
    """
    Circuit breaker pattern implementation for preventing cascading failures.

    States:
        - CLOSED: Normal operation, requests pass through
        - OPEN: Too many failures, requests fail immediately
        - HALF_OPEN: Testing if service recovered

    NOTE(review): this implementation is not thread-safe; guard `call` with
    a lock if one breaker is shared across threads.
    """

    class OpenError(Exception):
        """Raised when a call is rejected because the circuit is OPEN.

        Subclasses ``Exception`` so callers that previously caught the
        generic ``Exception`` keep working unchanged.
        """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Type[Exception] = Exception
    ):
        """
        Args:
            failure_threshold: Consecutive failures before the circuit opens
            recovery_timeout: Seconds to wait in OPEN before probing recovery
            expected_exception: Exception type that counts as a failure
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0         # consecutive failures since last success
        self.last_failure_time = None  # time.time() of the most recent failure
        self.state = "CLOSED"

    def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection.

        Raises:
            CircuitBreaker.OpenError: If the circuit is OPEN and the
                recovery timeout has not yet elapsed.
        """
        if self.state == "OPEN":
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = "HALF_OPEN"
                logging.getLogger(__name__).info(
                    f"Circuit breaker entering HALF_OPEN state for {func.__name__}"
                )
            else:
                # Dedicated exception type lets callers distinguish
                # "circuit open" from errors raised by func itself.
                raise CircuitBreaker.OpenError(
                    f"Circuit breaker is OPEN for {func.__name__}"
                )

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise  # bare raise preserves the original traceback
        self._on_success()
        return result

    def _on_success(self):
        """Handle successful call: reset failures and close a HALF_OPEN circuit."""
        self.failure_count = 0
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            logging.getLogger(__name__).info("Circuit breaker recovered, state: CLOSED")

    def _on_failure(self):
        """Handle failed call: count it and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            logging.getLogger(__name__).error(
                f"Circuit breaker opened after {self.failure_count} failures. "
                f"Will retry after {self.recovery_timeout}s"
            )