Skip to content

Commit

Permalink
Add logging tracking id for logging messages
Browse files Browse the repository at this point in the history
  • Loading branch information
hsong-rh committed Feb 26, 2025
1 parent 8a4acec commit e31974a
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 5 deletions.
19 changes: 18 additions & 1 deletion src/aap_eda/api/serializers/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,14 @@ class Meta:
"event_streams",
"source_mappings",
"skip_audit_events",
"log_tracking_id",
]
read_only_fields = [
"id",
"created_at",
"modified_at",
"log_tracking_id",
]
read_only_fields = ["id", "created_at", "modified_at"]

def to_representation(self, activation):
rules_count, rules_fired_count = get_rules_count(
Expand Down Expand Up @@ -378,6 +384,7 @@ def to_representation(self, activation):
"event_streams": event_streams,
"source_mappings": activation.source_mappings,
"skip_audit_events": activation.skip_audit_events,
"log_tracking_id": activation.log_tracking_id,
"created_by": BasicUserSerializer(activation.created_by).data,
"modified_by": BasicUserSerializer(activation.modified_by).data,
}
Expand Down Expand Up @@ -471,6 +478,7 @@ def create(self, validated_data):
validated_data["rulebook_rulesets"] = rulebook.rulesets
validated_data["git_hash"] = rulebook.project.git_hash
validated_data["project_id"] = rulebook.project.id
validated_data["log_tracking_id"] = str(uuid.uuid4())

if settings.DEPLOYMENT_TYPE == "k8s":
validated_data["k8s_service_name"] = _update_k8s_service_name(
Expand Down Expand Up @@ -820,6 +828,12 @@ class ActivationReadSerializer(serializers.ModelSerializer):
allow_null=True,
child=EventStreamOutSerializer(),
)
log_tracking_id = serializers.CharField(
required=False,
allow_null=True,
allow_blank=True,
help_text="Log tracking ID of the activation",
)
created_by = BasicUserFieldSerializer()
modified_by = BasicUserFieldSerializer()

Expand Down Expand Up @@ -859,12 +873,14 @@ class Meta:
"event_streams",
"source_mappings",
"skip_audit_events",
"log_tracking_id",
]
read_only_fields = [
"id",
"created_at",
"modified_at",
"restarted_at",
"log_tracking_id",
]

def to_representation(self, activation):
Expand Down Expand Up @@ -960,6 +976,7 @@ def to_representation(self, activation):
"event_streams": event_streams,
"source_mappings": activation.source_mappings,
"skip_audit_events": activation.skip_audit_events,
"log_tracking_id": activation.log_tracking_id,
"created_by": BasicUserSerializer(activation.created_by).data,
"modified_by": BasicUserSerializer(activation.modified_by).data,
}
Expand Down
17 changes: 17 additions & 0 deletions src/aap_eda/core/migrations/0059_activation_log_tracking_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.16 on 2025-02-25 16:49

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("core", "0058_activation_edited_at"),
]

operations = [
migrations.AddField(
model_name="activation",
name="log_tracking_id",
field=models.TextField(blank=True),
),
]
1 change: 1 addition & 0 deletions src/aap_eda/core/models/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class Meta:
default=False,
help_text=("Skip audit events for activation"),
)
log_tracking_id = models.TextField(blank=True)

def get_parent_type(self) -> str:
return ProcessParentType.ACTIVATION
Expand Down
17 changes: 17 additions & 0 deletions src/aap_eda/services/activation/activation_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import contextlib
import logging
import typing as tp
import uuid
from datetime import timedelta

import rq
Expand All @@ -35,6 +36,7 @@
system_cancel_restart_activation,
system_restart_activation,
)
from aap_eda.utils.log_tracking_id_filter import assign_log_tracking_id

from .db_log_handler import DBLogger
from .engine.common import ContainerableInvalidError, ContainerEngine
Expand Down Expand Up @@ -68,6 +70,7 @@ def __init__(
container_engine: The container engine to use.
"""
super().__init__(db_instance)
self._set_log_tracking_id()
if container_engine:
self.container_engine = container_engine
else:
Expand Down Expand Up @@ -106,6 +109,12 @@ def _increase_restart_count(self):
self.db_instance.restart_count += 1
self.db_instance.save(update_fields=["restart_count", "modified_at"])

@run_with_lock
def _reset_log_tracking_id(self, log_tracking_id: str):
"""Reset the log_tracking_id of the activation."""
self.db_instance.log_tracking_id = log_tracking_id
self.db_instance.save(update_fields=["log_tracking_id"])

@run_with_lock
def _check_start_prerequirements(self) -> None:
"""Check if the activation can be started."""
Expand Down Expand Up @@ -662,6 +671,14 @@ def _error_activation(self, msg: str):
LOGGER.error(msg)
self.set_status(ActivationStatus.ERROR, msg)

def _set_log_tracking_id(self):
if not self.db_instance.log_tracking_id:
tracking_id = str(uuid.uuid4())
self._reset_log_tracking_id(tracking_id)
assign_log_tracking_id(tracking_id)
else:
assign_log_tracking_id(self.db_instance.log_tracking_id)

def start(self, is_restart: bool = False):
"""Start an activation.
Expand Down
2 changes: 2 additions & 0 deletions src/aap_eda/services/activation/engine/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class ContainerRequest(BaseModel):
env_vars: tp.Optional[dict] = None
extra_args: tp.Optional[dict] = None
k8s_service_name: tp.Optional[str] = None
log_tracking_id: tp.Optional[str] = None


class ContainerableMixinError(Exception):
Expand Down Expand Up @@ -167,6 +168,7 @@ def get_container_request(self) -> ContainerRequest:
mounts=settings.PODMAN_MOUNTS,
cmdline=self._build_cmdline(),
k8s_service_name=self.k8s_service_name,
log_tracking_id=self.log_tracking_id,
)

def get_restart_policy(self) -> str:
Expand Down
4 changes: 4 additions & 0 deletions src/aap_eda/services/activation/engine/podman.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ def _pull_image(
) -> Image:
try:
log_handler.write(f"Pulling image {request.image_url}", True)
log_handler.write(
f"Log tracking id: {request.log_tracking_id}",
True,
)
LOGGER.info(f"Pulling image : {request.image_url}")
kwargs = {}
if request.credential:
Expand Down
37 changes: 35 additions & 2 deletions src/aap_eda/settings/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,37 @@ def get_rq_queues() -> dict:
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"filters": {
"activation_id_filter": {
"()": "aap_eda.utils.log_tracking_id_filter.LogTrackingIdFilter",
},
},
"formatters": {
"simple": {
"format": "{asctime} {name} {levelname:<8} {message}",
"format": "{asctime} {levelname:<8} {name} {message}",
"style": "{",
},
"tracking": {
"format": "{asctime} {levelname:<8} [{log_tracking_id}] {name}"
" {message}",
"style": "{",
},
},
"handlers": {
"console": {"class": "logging.StreamHandler", "formatter": "simple"},
"console": {
"class": "logging.StreamHandler",
"formatter": "simple",
},
"websocket": {
"class": "logging.StreamHandler",
"formatter": "tracking",
"filters": ["activation_id_filter"],
},
"activation": {
"class": "logging.StreamHandler",
"formatter": "tracking",
"filters": ["activation_id_filter"],
},
},
"root": {"handlers": ["console"], "level": "WARNING"},
"loggers": {
Expand All @@ -618,6 +641,16 @@ def get_rq_queues() -> dict:
"level": APP_LOG_LEVEL,
"propagate": False,
},
"aap_eda.wsapi.consumers": {
"handlers": ["websocket"],
"level": APP_LOG_LEVEL,
"propagate": False,
},
"aap_eda.services.activation": {
"handlers": ["activation"],
"level": APP_LOG_LEVEL,
"propagate": False,
},
"ansible_base": {
"handlers": ["console"],
"level": APP_LOG_LEVEL,
Expand Down
31 changes: 31 additions & 0 deletions src/aap_eda/utils/log_tracking_id_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextvars
import logging

log_tracking_id_var = contextvars.ContextVar("log_tracking_id")


class LogTrackingIdFilter(logging.Filter):
def filter(self, record):
record.log_tracking_id = log_tracking_id_var.get(
"Log tracking id not set"
)

return True


def assign_log_tracking_id(log_tracking_id):
log_tracking_id_var.set(log_tracking_id)
12 changes: 12 additions & 0 deletions src/aap_eda/wsapi/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from aap_eda.core.utils.strings import extract_variables, substitute_variables
from aap_eda.tasks import orchestrator
from aap_eda.utils.log_tracking_id_filter import assign_log_tracking_id

from .messages import (
ActionMessage,
Expand Down Expand Up @@ -101,6 +102,8 @@ async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
logger.debug(f"AnsibleRulebookConsumer received: {data}")

await self._set_log_tracking_id(data)

msg_type = MessageType(data.get("type"))

try:
Expand Down Expand Up @@ -179,6 +182,15 @@ async def handle_actions(self, message: ActionMessage):
logger.info(f"Start to handle actions: {message}")
await self.insert_audit_rule_data(message)

async def _set_log_tracking_id(self, data: dict):
log_tracking_id = ""
activation_instance_id = data.get("activation_id")
if activation_instance_id:
activation = await self.get_activation(activation_instance_id)
log_tracking_id = activation.log_tracking_id

assign_log_tracking_id(log_tracking_id)

@database_sync_to_async
def handle_heartbeat(self, message: HeartbeatMessage) -> None:
logger.info(f"Start to handle heartbeat: {message}")
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/services/activation/engine/test_podman.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ def test_engine_start(
engine.start(request, log_handler)

engine.client.containers.run.assert_called_once()
assert models.RulebookProcessLog.objects.count() == 4
assert (
models.RulebookProcessLog.objects.count() == 5
) # new line for tracking id
for log in models.RulebookProcessLog.objects.all():
assert log.log_timestamp > 0
assert models.RulebookProcessLog.objects.last().log.endswith("is running.")
Expand Down
17 changes: 16 additions & 1 deletion tests/integration/services/activation/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# TODO(alex) dedup code and fixtures across all the tests

from unittest import mock
from unittest.mock import MagicMock, create_autospec
from unittest.mock import MagicMock, create_autospec, patch

import pytest
from _pytest.logging import LogCaptureFixture
Expand Down Expand Up @@ -852,3 +852,18 @@ def test_status_manager_set_status(process_parent):
)
assert process_parent.status == enums.ActivationStatus.PENDING
assert process_parent.status_message == "Activation is pending"


@pytest.mark.django_db
def test_assign_log_tracking_id_when_exists(container_engine_mock):
db_instance_mock = MagicMock()
db_instance_mock.log_tracking_id = "test-tracking-id-123"

manager = ActivationManager(db_instance_mock, container_engine_mock)

with patch(
"aap_eda.services.activation.activation_manager.assign_log_tracking_id"
) as mock_assign:
manager._set_log_tracking_id()

mock_assign.assert_called_once_with("test-tracking-id-123")
Loading

0 comments on commit e31974a

Please sign in to comment.