-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
enhancement — New feature or request
Description
JSON formatter içinde bulunan hata takip ve loglama sistemini geliştir.
class JSONFormatter(logging.Formatter):
    """Format log records as structured JSON strings.

    Intended for log aggregation systems (ELK, Datadog, Splunk).

    Example output (field values are illustrative):
        {"timestamp":"2025-01-07T12:00:00.123456+00:00","level":"INFO",
         "service":"order-service","message":"Order created","order_id":"ORD-123"}

    Features:
        - ISO 8601 or Unix timestamp.
        - Extra fields are merged into the output automatically.
        - Exception details (type, message, traceback), taken from
          ``record.exc_info`` when present, otherwise parsed from a traceback
          embedded in the message text.
        - Unix timestamps come straight from ``record.created``; ISO
          timestamps are delegated to ``get_record_timestamp``.
    """

    # Header line that starts every Python traceback; used to detect a
    # traceback that was concatenated into the log message itself.
    _TRACEBACK_HEADER = "Traceback (most recent call last):"

    def __init__(
        self,
        service_name: Optional[str] = None,
        include_extra: bool = True,
        timestamp_format: str = "iso",
        include_location: bool = False,
        include_exception: bool = True,
    ):
        """Configure the formatter.

        Args:
            service_name: Value of the "service" field (default: logger name).
            include_extra: Merge extra record fields into the output.
            timestamp_format: "unix" for the raw epoch value; any other value
                produces an ISO 8601 timestamp.
            include_location: Add file / line / function information.
            include_exception: Add exception information (type, message,
                traceback).
        """
        super().__init__()
        self.service_name = service_name
        self.include_extra = include_extra
        self.timestamp_format = timestamp_format
        self.include_location = include_location
        self.include_exception = include_exception

    def format(self, record: logging.LogRecord) -> str:
        """Convert a log record into a JSON string.

        Args:
            record: The log record to serialize.

        Returns:
            A JSON document containing timestamp, level, service and message,
            plus optional location, extra and exception fields.
        """
        message = record.getMessage()

        exception_data = None
        if self.include_exception:
            # A traceback pasted into the message is always stripped from it,
            # but the structured exc_info (set by logger.exception() or
            # exc_info=True) is the authoritative source when available.
            message, embedded = self._split_embedded_traceback(message)
            exception_data = self._exception_from_exc_info(record) or embedded

        # Core fields
        log_data: Dict[str, Any] = {
            "timestamp": self._format_timestamp(record),
            "level": record.levelname,
            "service": self.service_name or record.name,
            "message": message,
        }

        # Location information (optional)
        if self.include_location:
            log_data["location"] = {
                "file": record.filename,
                "line": record.lineno,
                "function": record.funcName,
            }

        # Extra fields; merged after the core fields, so a colliding key
        # returned by get_extra_fields replaces the core value.
        if self.include_extra:
            log_data.update(get_extra_fields(record))

        # Exception information (optional); written last so it wins over an
        # extra field that happens to be called "exception".
        if exception_data:
            log_data["exception"] = exception_data

        # Serialize, falling back to a minimal payload if that fails.
        try:
            return json.dumps(log_data, ensure_ascii=False, default=str)
        except (TypeError, ValueError) as e:
            return json.dumps({
                "timestamp": self._format_timestamp(record),
                "level": record.levelname,
                "service": self.service_name or record.name,
                "message": record.getMessage(),
                "_serialization_error": str(e)
            })

    def _exception_from_exc_info(
        self, record: logging.LogRecord
    ) -> Optional[Dict[str, str]]:
        """Build the exception payload from ``record.exc_info``, if any."""
        exc_info = record.exc_info
        # exc_info may be None, or (None, None, None) when logger.exception()
        # is called outside an ``except`` block.
        if not isinstance(exc_info, tuple) or not exc_info or exc_info[0] is None:
            return None

        exc_type, exc_value = exc_info[0], exc_info[1]
        # Reuse a traceback already rendered by another formatter if present.
        traceback_text = record.exc_text or self.formatException(exc_info)
        return {
            "type": exc_type.__name__,
            "message": "" if exc_value is None else str(exc_value),
            "traceback": traceback_text,
        }

    def _split_embedded_traceback(self, message: str) -> tuple:
        """Separate a traceback embedded in *message* from the message text.

        Returns:
            ``(clean_message, exception_data)``; ``exception_data`` is None
            and the message is returned untouched when no traceback is found.
        """
        header = self._TRACEBACK_HEADER
        head, marker, tail = message.partition("\n" + header)
        if not marker:
            return message, None

        traceback_text = header + tail
        # The final traceback line has the form "ExcType: message" or just
        # "ExcType" when the exception carries no message.
        last_line = traceback_text.strip().rsplit("\n", 1)[-1]
        exc_type, _, exc_message = last_line.partition(": ")
        exception_data = {
            "type": exc_type or "Exception",
            "message": exc_message,
            "traceback": traceback_text,
        }
        return head.strip(), exception_data

    def _format_timestamp(self, record: logging.LogRecord) -> str:
        """Return the record's timestamp in the configured format."""
        if self.timestamp_format == "unix":
            # No datetime conversion is needed for the raw epoch value.
            return str(record.created)
        dt = get_record_timestamp(record, use_utc=True)
        return dt.isoformat(timespec="microseconds")
Metadata
Assignees
Labels
enhancement — New feature or request