126 changes: 126 additions & 0 deletions workflow/monitoring.py
@@ -0,0 +1,126 @@
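"""Collect Celery worker and RabbitMQ queue statistics for the monitoring endpoints."""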
import logging
from typing import Any

import requests
from django.conf import settings

from workflow_app import celery_app

logger = logging.getLogger(__name__)


class CeleryMonitoring:
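    """Snapshot active, scheduled, and reserved tasks across all Celery workers."""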
def __init__(self):
self.insp = celery_app.control.inspect()
self.task_types = ["active_tasks", "scheduled_tasks", "pending_tasks"]

def get_tasks_data(self) -> dict[str, Any]:
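        """Group per-worker task lists into {worker_type: {worker_name: stats}}."""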
try:
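            # inspect() broadcasts to every worker: active() = tasks running
            # now, scheduled() = tasks with an ETA/countdown, reserved() =
            # tasks prefetched by a worker but not yet started ("pending").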
tasks_data = {
"active_tasks": self.insp.active() or {},
"scheduled_tasks": self.insp.scheduled() or {},
"pending_tasks": self.insp.reserved() or {},
}

data: dict[str, Any] = {}
for task_type in self.task_types:
for full_worker_name, tasks in tasks_data[task_type].items():
                    # Worker hostnames look like "workflow@worker00"; partition()
                    # cannot raise, unlike unpacking a bare split("@").
                    worker_type, _, worker_name = full_worker_name.partition("@")

if worker_type not in data:
data[worker_type] = {}

if worker_name not in data[worker_type]:
data[worker_type][worker_name] = {
"active_tasks": [],
"scheduled_tasks": [],
"pending_tasks": [],
"active_tasks_count": 0,
"pending_tasks_count": 0,
"scheduled_tasks_count": 0,
"total_tasks_count": 0,
}

for task in tasks:
data[worker_type][worker_name][task_type].append(task)
data[worker_type][worker_name][f"{task_type}_count"] += 1
data[worker_type][worker_name]["total_tasks_count"] += 1

return data

        except Exception:
            # logger.exception keeps the traceback that a formatted message alone drops.
            logger.exception("Error getting Celery tasks data")
return {}


class RabbitMQMonitoring:
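    """Read queue statistics from the RabbitMQ management HTTP API."""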
def __init__(self):
self.host = settings.RABBITMQ_HOST
self.username = settings.RABBITMQ_USER
self.password = settings.RABBITMQ_PASSWORD
self.management_port = getattr(settings, "RABBITMQ_MANAGEMENT_PORT", 15672)
self.auth = (self.username, self.password)
self.base_url = f"http://{self.host}:{self.management_port}/api"

    def _make_request(self, endpoint: str) -> Any:
        """GET a management-API endpoint; the JSON payload may be a dict or, as for /queues, a list."""
try:
url = f"{self.base_url}/{endpoint}"
response = requests.get(url, auth=self.auth, timeout=10)

if response.status_code == 200:
return response.json()
else:
logger.error(f"Error requesting {endpoint}: {response.status_code}")
return None

        except Exception:
            logger.exception(f"Error making request to {endpoint}")
return None

def get_main_queues_info(self) -> dict[str, dict[str, Any]]:
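        """Return {group: {queue_name: stats}} for the workflow and backend-* queues."""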
queues_data = self._make_request("queues")
if not queues_data:
return {}

main_queues: dict[str, dict[str, Any]] = {}
for queue in queues_data:
if not isinstance(queue, dict):
continue
queue_name = queue.get("name", "")

            # Skip Celery's internal remote-control (pidbox) and event (celeryev) queues.
            if any(skip in queue_name for skip in [".celery.pidbox", "celeryev."]):
                continue

if queue_name.startswith("backend-"):
group = "backend"
display_name = queue_name
elif queue_name == "workflow":
group = "workflow"
display_name = queue_name
else:
logger.warning(f"Unknown queue: {queue_name}")
continue

if group not in main_queues:
main_queues[group] = {}

main_queues[group][display_name] = {
"name": queue_name,
"messages": queue.get("messages", 0),
"messages_ready": queue.get("messages_ready", 0),
"messages_unacknowledged": queue.get("messages_unacknowledged", 0),
"consumers": queue.get("consumers", 0),
"consumer_utilisation": queue.get("consumer_utilisation", 0),
}

return main_queues


def get_celery_tasks_data() -> dict[str, Any]:
monitor = CeleryMonitoring()
return monitor.get_tasks_data()


def get_rabbitmq_queues_info() -> dict[str, dict[str, Any]]:
monitor = RabbitMQMonitoring()
return monitor.get_main_queues_info()
90 changes: 90 additions & 0 deletions workflow/serializers.py
@@ -276,3 +276,93 @@ class ResumeWorkflowSerializer(serializers.Serializer):
required=False,
help_text="Optional payload to update the workflow before resuming.",
)


class CeleryTaskSerializer(serializers.Serializer):
    # Mirrors the task dicts reported by celery inspect; scheduled and
    # reserved entries may omit some of these keys, hence required=False.
    acknowledged = serializers.BooleanField(required=False)
    args = serializers.ListField(required=False)
    delivery_info = serializers.DictField(required=False)
    hostname = serializers.CharField(required=False)
    id = serializers.CharField(required=False)
    kwargs = serializers.DictField(required=False)
    name = serializers.CharField(required=False)
    time_start = serializers.FloatField(required=False, allow_null=True)
    type = serializers.CharField(required=False)
    worker_pid = serializers.IntegerField(required=False, allow_null=True)

def to_representation(self, instance):
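        # Derive a per-task link from the reporting worker's hostname:
        # workflow workers point at the workflow resource, backend workers at
        # the task API; anything else gets url=None.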
data = super().to_representation(instance)

kwargs = instance.get("kwargs", {})
context = kwargs.get("context", {})
realm_code = context.get("realm_code", "")
space_code = context.get("space_code", "")

hostname = instance.get("hostname", "")
if "workflow" in hostname:
workflow_id = kwargs.get("workflow_id")
if workflow_id and realm_code and space_code:
data["url"] = f"/{realm_code}/{space_code}/w/workflow/{workflow_id}"
else:
data["url"] = None
elif "backend" in hostname:
task_id = kwargs.get("task_id")
if task_id and realm_code and space_code:
data["url"] = f"/{realm_code}/{space_code}/api/v1/tasks/task/{task_id}"
else:
data["url"] = None
else:
data["url"] = None

return data


class CeleryWorkerSerializer(serializers.Serializer):
active_tasks = CeleryTaskSerializer(many=True)
scheduled_tasks = CeleryTaskSerializer(many=True)
pending_tasks = CeleryTaskSerializer(many=True)
active_tasks_count = serializers.IntegerField()
pending_tasks_count = serializers.IntegerField()
scheduled_tasks_count = serializers.IntegerField()
total_tasks_count = serializers.IntegerField()


class CeleryWorkerTypeSerializer(serializers.Serializer):
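    # Worker names (and, in the wrappers below, worker types and queue
    # groups) are dynamic dict keys, so these serializers re-serialize each
    # value in to_representation instead of declaring fixed fields.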
def to_representation(self, instance):
result = {}
for worker_name, worker_data in instance.items():
result[worker_name] = CeleryWorkerSerializer(worker_data).data
return result


class CeleryMonitoringSerializer(serializers.Serializer):
def to_representation(self, instance):
result = {}
for worker_type, workers in instance.items():
result[worker_type] = CeleryWorkerTypeSerializer(workers).data
return result


class RabbitMQQueueSerializer(serializers.Serializer):
name = serializers.CharField()
messages = serializers.IntegerField()
messages_ready = serializers.IntegerField()
messages_unacknowledged = serializers.IntegerField()
consumers = serializers.IntegerField()
consumer_utilisation = serializers.FloatField()


class RabbitMQQueueGroupSerializer(serializers.Serializer):
def to_representation(self, instance):
result = {}
for queue_name, queue_data in instance.items():
result[queue_name] = RabbitMQQueueSerializer(queue_data).data
return result


class RabbitMQMonitoringSerializer(serializers.Serializer):
def to_representation(self, instance):
result = {}
for group_name, queues in instance.items():
result[group_name] = RabbitMQQueueGroupSerializer(queues).data
return result
149 changes: 149 additions & 0 deletions workflow/tests/test_monitoring.py
@@ -0,0 +1,149 @@
from unittest.mock import patch

from rest_framework.test import APIClient

from workflow.models import Space, User
from workflow.tests.base import BaseTestCase


class MonitoringViewSetTestCase(BaseTestCase):
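    """Endpoint tests for the monitoring API, with the data-collection helpers mocked."""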
def setUp(self):
self.client = APIClient()
self.realm_code = f"realm{self.random_string(5)}"
self.space_code = f"space{self.random_string(5)}"
self.space = Space.objects.create(realm_code=self.realm_code, space_code=self.space_code)
self.user = User.objects.create(
username=self.random_string(5),
is_staff=True,
is_superuser=True,
)
self.client.force_authenticate(self.user)

@patch("workflow.views.get_celery_tasks_data")
def test_celery_monitoring_view_success(self, mock_get_tasks_data):
mock_tasks_data = {
"workflow": {
"worker00": {
"active_tasks": [
{
"acknowledged": False,
"args": [None],
"delivery_info": {
"exchange": "",
"priority": None,
"redelivered": False,
"routing_key": "workflow",
},
"hostname": "workflow@worker00",
"id": "task1",
"kwargs": {"context": {"realm_code": "realm1", "space_code": "space1"}, "workflow_id": 1},
"name": "workflow.tasks.test",
"time_start": 1757891351.975915,
"type": "workflow.tasks.test",
"worker_pid": 78733,
}
],
"scheduled_tasks": [],
"pending_tasks": [],
"active_tasks_count": 1,
"pending_tasks_count": 0,
"scheduled_tasks_count": 0,
"total_tasks_count": 1,
}
}
}
mock_get_tasks_data.return_value = mock_tasks_data

url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
response = self.client.get(url)

self.assertEqual(response.status_code, 200)
data = response.json()

self.assertIn("workflow", data)
self.assertIn("worker00", data["workflow"])
self.assertEqual(data["workflow"]["worker00"]["active_tasks_count"], 1)
self.assertEqual(data["workflow"]["worker00"]["total_tasks_count"], 1)

@patch("workflow.views.get_celery_tasks_data")
def test_celery_monitoring_view_error(self, mock_get_tasks_data):
mock_get_tasks_data.side_effect = Exception("Celery connection failed")

url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
response = self.client.get(url)

self.assertEqual(response.status_code, 500)
data = response.json()
self.assertIn("message", data)
self.assertEqual(data["message"], "Celery connection failed")

@patch("workflow.views.get_rabbitmq_queues_info")
def test_rabbitmq_monitoring_view_success(self, mock_get_queues_info):
mock_queues_data = {
"workflow": {
"workflow": {
"name": "workflow",
"messages": 5,
"messages_ready": 3,
"messages_unacknowledged": 2,
"consumers": 1,
"consumer_utilisation": 0.8,
}
},
"backend": {
"backend-task1": {
"name": "backend-task1",
"messages": 10,
"messages_ready": 8,
"messages_unacknowledged": 2,
"consumers": 2,
"consumer_utilisation": 0.6,
}
},
}
mock_get_queues_info.return_value = mock_queues_data

url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"
response = self.client.get(url)

self.assertEqual(response.status_code, 200)
data = response.json()

self.assertIn("workflow", data)
self.assertIn("backend", data)
self.assertIn("workflow", data["workflow"])
self.assertIn("backend-task1", data["backend"])

workflow_queue = data["workflow"]["workflow"]
self.assertEqual(workflow_queue["messages"], 5)
self.assertEqual(workflow_queue["consumers"], 1)

@patch("workflow.views.get_rabbitmq_queues_info")
def test_rabbitmq_monitoring_view_error(self, mock_get_queues_info):
mock_get_queues_info.side_effect = Exception("RabbitMQ connection failed")

url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"
response = self.client.get(url)

self.assertEqual(response.status_code, 500)
data = response.json()
self.assertIn("message", data)
self.assertEqual(data["message"], "RabbitMQ connection failed")

@patch("workflow.views.get_celery_tasks_data")
@patch("workflow.views.get_rabbitmq_queues_info")
def test_monitoring_endpoints_consistency(self, mock_rabbitmq, mock_celery):
mock_celery.return_value = {}
mock_rabbitmq.return_value = {}

celery_url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
rabbitmq_url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"

celery_response = self.client.get(celery_url)
rabbitmq_response = self.client.get(rabbitmq_url)

self.assertEqual(celery_response.status_code, 200)
self.assertEqual(rabbitmq_response.status_code, 200)

mock_celery.assert_called_once()
mock_rabbitmq.assert_called_once()
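
The tests patch get_celery_tasks_data and get_rabbitmq_queues_info at workflow.views, but views.py itself is not part of this diff. Below is a minimal sketch of view functions consistent with those tests; the function names, routing, and DRF decorator style are assumptions, while the {"message": str(exc)} 500 body is taken from the assertions above.

# Hypothetical sketch, not part of the diff: workflow/views.py as the tests
# above exercise it. Routing and permissions are assumptions; the error
# shape ({"message": str(exc)}, HTTP 500) is taken from the tests.
from rest_framework.decorators import api_view
from rest_framework.response import Response

from workflow.monitoring import get_celery_tasks_data, get_rabbitmq_queues_info
from workflow.serializers import CeleryMonitoringSerializer, RabbitMQMonitoringSerializer


@api_view(["GET"])
def celery_monitoring(request, realm_code, space_code):
    try:
        data = get_celery_tasks_data()
    except Exception as exc:
        return Response({"message": str(exc)}, status=500)
    return Response(CeleryMonitoringSerializer(data).data)


@api_view(["GET"])
def rabbitmq_monitoring(request, realm_code, space_code):
    try:
        data = get_rabbitmq_queues_info()
    except Exception as exc:
        return Response({"message": str(exc)}, status=500)
    return Response(RabbitMQMonitoringSerializer(data).data)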