diff --git a/workflow/monitoring.py b/workflow/monitoring.py
new file mode 100644
index 0000000..12e248f
--- /dev/null
+++ b/workflow/monitoring.py
@@ -0,0 +1,126 @@
+import logging
+from typing import Any
+
+import requests
+from django.conf import settings
+
+from workflow_app import celery_app
+
+logger = logging.getLogger(__name__)
+
+
+class CeleryMonitoring:
+    def __init__(self):
+        self.insp = celery_app.control.inspect()
+        self.task_types = ["active_tasks", "scheduled_tasks", "pending_tasks"]
+
+    def get_tasks_data(self) -> dict[str, Any]:
+        try:
+            tasks_data = {
+                "active_tasks": self.insp.active() or {},
+                "scheduled_tasks": self.insp.scheduled() or {},
+                # Celery calls these "reserved": received by a worker but not yet started.
+                "pending_tasks": self.insp.reserved() or {},
+            }
+
+            data: dict[str, Any] = {}
+            for task_type in self.task_types:
+                for full_worker_name, tasks in tasks_data[task_type].items():
+                    # Worker hostnames look like "workflow@worker00"; split only on
+                    # the first "@" in case the node name itself contains one.
+                    worker_type, worker_name = full_worker_name.split("@", 1)
+
+                    if worker_type not in data:
+                        data[worker_type] = {}
+
+                    if worker_name not in data[worker_type]:
+                        data[worker_type][worker_name] = {
+                            "active_tasks": [],
+                            "scheduled_tasks": [],
+                            "pending_tasks": [],
+                            "active_tasks_count": 0,
+                            "pending_tasks_count": 0,
+                            "scheduled_tasks_count": 0,
+                            "total_tasks_count": 0,
+                        }
+
+                    for task in tasks:
+                        data[worker_type][worker_name][task_type].append(task)
+                        data[worker_type][worker_name][f"{task_type}_count"] += 1
+                        data[worker_type][worker_name]["total_tasks_count"] += 1
+
+            return data
+
+        except Exception as e:
+            logger.error(f"Error getting Celery tasks data: {e}")
+            return {}
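+
+
+# Illustrative shape of the structure returned by get_tasks_data(), assuming a
+# single worker named "workflow@worker00" (names and counts are examples only,
+# matching the fixture used in the tests below):
+#
+# {
+#     "workflow": {
+#         "worker00": {
+#             "active_tasks": [...],
+#             "scheduled_tasks": [],
+#             "pending_tasks": [],
+#             "active_tasks_count": 1,
+#             "pending_tasks_count": 0,
+#             "scheduled_tasks_count": 0,
+#             "total_tasks_count": 1,
+#         }
+#     }
+# }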
+
+
+class RabbitMQMonitoring:
+    def __init__(self):
+        self.host = settings.RABBITMQ_HOST
+        self.username = settings.RABBITMQ_USER
+        self.password = settings.RABBITMQ_PASSWORD
+        self.management_port = getattr(settings, "RABBITMQ_MANAGEMENT_PORT", 15672)
+        self.auth = (self.username, self.password)
+        self.base_url = f"http://{self.host}:{self.management_port}/api"
+
+    def _make_request(self, endpoint: str) -> dict[str, Any] | None:
+        try:
+            url = f"{self.base_url}/{endpoint}"
+            response = requests.get(url, auth=self.auth, timeout=10)
+
+            if response.status_code == 200:
+                return response.json()
+            else:
+                logger.error(f"Error requesting {endpoint}: {response.status_code}")
+                return None
+
+        except Exception as e:
+            logger.error(f"Error making request to {endpoint}: {e}")
+            return None
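+
+    # A single element of the management API's /api/queues response, trimmed to
+    # the fields consumed below (values are illustrative, not from a live broker):
+    #
+    # {
+    #     "name": "workflow",
+    #     "messages": 5,
+    #     "messages_ready": 3,
+    #     "messages_unacknowledged": 2,
+    #     "consumers": 1,
+    #     "consumer_utilisation": 0.8,
+    # }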
+
+    def get_main_queues_info(self) -> dict[str, dict[str, Any]]:
+        queues_data = self._make_request("queues")
+        if not queues_data:
+            return {}
+
+        main_queues: dict[str, dict[str, Any]] = {}
+        for queue in queues_data:
+            if not isinstance(queue, dict):
+                continue
+            queue_name = queue.get("name", "")
+
+            # Skip Celery's internal queues (remote-control pidbox and event queues).
+            if any(skip in queue_name for skip in [".celery.pidbox", "celeryev."]):
+                continue
+
+            if queue_name.startswith("backend-"):
+                group = "backend"
+                display_name = queue_name
+            elif queue_name == "workflow":
+                group = "workflow"
+                display_name = queue_name
+            else:
+                logger.warning(f"Unknown queue: {queue_name}")
+                continue
+
+            if group not in main_queues:
+                main_queues[group] = {}
+
+            main_queues[group][display_name] = {
+                "name": queue_name,
+                "messages": queue.get("messages", 0),
+                "messages_ready": queue.get("messages_ready", 0),
+                "messages_unacknowledged": queue.get("messages_unacknowledged", 0),
+                "consumers": queue.get("consumers", 0),
+                # The API reports null for consumer_utilisation on idle queues;
+                # coerce it to 0 so the serializer's FloatField accepts it.
+                "consumer_utilisation": queue.get("consumer_utilisation") or 0,
+            }
+
+        return main_queues
+
+
+def get_celery_tasks_data() -> dict[str, Any]:
+    monitor = CeleryMonitoring()
+    return monitor.get_tasks_data()
+
+
+def get_rabbitmq_queues_info() -> dict[str, dict[str, Any]]:
+    monitor = RabbitMQMonitoring()
+    return monitor.get_main_queues_info()
diff --git a/workflow/serializers.py b/workflow/serializers.py
index f5dc41f..3056753 100644
--- a/workflow/serializers.py
+++ b/workflow/serializers.py
@@ -276,3 +276,93 @@ class ResumeWorkflowSerializer(serializers.Serializer):
         required=False,
         help_text="Optional payload to update the workflow before resuming.",
     )
+
+
+class CeleryTaskSerializer(serializers.Serializer):
+    acknowledged = serializers.BooleanField()
+    args = serializers.ListField()
+    delivery_info = serializers.DictField()
+    hostname = serializers.CharField()
+    id = serializers.CharField()
+    kwargs = serializers.DictField()
+    name = serializers.CharField()
+    time_start = serializers.FloatField()
+    type = serializers.CharField()
+    worker_pid = serializers.IntegerField()
+
+    def to_representation(self, instance):
+        data = super().to_representation(instance)
+
+        kwargs = instance.get("kwargs", {})
+        context = kwargs.get("context", {})
+        realm_code = context.get("realm_code", "")
+        space_code = context.get("space_code", "")
+
+        hostname = instance.get("hostname", "")
+        if "workflow" in hostname:
+            workflow_id = kwargs.get("workflow_id")
+            if workflow_id and realm_code and space_code:
+                data["url"] = f"/{realm_code}/{space_code}/w/workflow/{workflow_id}"
+            else:
+                data["url"] = None
+        elif "backend" in hostname:
+            task_id = kwargs.get("task_id")
+            if task_id and realm_code and space_code:
+                data["url"] = f"/{realm_code}/{space_code}/api/v1/tasks/task/{task_id}"
+            else:
+                data["url"] = None
+        else:
+            data["url"] = None
+
+        return data
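+
+
+# For a task whose kwargs carry {"context": {"realm_code": "realm1",
+# "space_code": "space1"}, "workflow_id": 1}, the serializer above emits
+# url="/realm1/space1/w/workflow/1"; a backend task with task_id=1 would get
+# url="/realm1/space1/api/v1/tasks/task/1". (Codes here are illustrative.)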
+
+
+class CeleryWorkerSerializer(serializers.Serializer):
+    active_tasks = CeleryTaskSerializer(many=True)
+    scheduled_tasks = CeleryTaskSerializer(many=True)
+    pending_tasks = CeleryTaskSerializer(many=True)
+    active_tasks_count = serializers.IntegerField()
+    pending_tasks_count = serializers.IntegerField()
+    scheduled_tasks_count = serializers.IntegerField()
+    total_tasks_count = serializers.IntegerField()
+
+
+class CeleryWorkerTypeSerializer(serializers.Serializer):
+    def to_representation(self, instance):
+        result = {}
+        for worker_name, worker_data in instance.items():
+            result[worker_name] = CeleryWorkerSerializer(worker_data).data
+        return result
+
+
+class CeleryMonitoringSerializer(serializers.Serializer):
+    def to_representation(self, instance):
+        result = {}
+        for worker_type, workers in instance.items():
+            result[worker_type] = CeleryWorkerTypeSerializer(workers).data
+        return result
+
+
+class RabbitMQQueueSerializer(serializers.Serializer):
+    name = serializers.CharField()
+    messages = serializers.IntegerField()
+    messages_ready = serializers.IntegerField()
+    messages_unacknowledged = serializers.IntegerField()
+    consumers = serializers.IntegerField()
+    consumer_utilisation = serializers.FloatField()
+
+
+class RabbitMQQueueGroupSerializer(serializers.Serializer):
+    def to_representation(self, instance):
+        result = {}
+        for queue_name, queue_data in instance.items():
+            result[queue_name] = RabbitMQQueueSerializer(queue_data).data
+        return result
+
+
+class RabbitMQMonitoringSerializer(serializers.Serializer):
+    def to_representation(self, instance):
+        result = {}
+        for group_name, queues in instance.items():
+            result[group_name] = RabbitMQQueueGroupSerializer(queues).data
+        return result
diff --git a/workflow/tests/test_monitoring.py b/workflow/tests/test_monitoring.py
new file mode 100644
index 0000000..f43079c
--- /dev/null
+++ b/workflow/tests/test_monitoring.py
@@ -0,0 +1,149 @@
+from unittest.mock import patch
+
+from rest_framework.test import APIClient
+
+from workflow.models import Space, User
+from workflow.tests.base import BaseTestCase
+
+
+class MonitoringViewSetTestCase(BaseTestCase):
+    def setUp(self):
+        self.client = APIClient()
+        self.realm_code = f"realm{self.random_string(5)}"
+        self.space_code = f"space{self.random_string(5)}"
+        self.space = Space.objects.create(realm_code=self.realm_code, space_code=self.space_code)
+        self.user = User.objects.create(
+            username=self.random_string(5),
+            is_staff=True,
+            is_superuser=True,
+        )
+        self.client.force_authenticate(self.user)
+
+    @patch("workflow.views.get_celery_tasks_data")
+    def test_celery_monitoring_view_success(self, mock_get_tasks_data):
+        mock_tasks_data = {
+            "workflow": {
+                "worker00": {
+                    "active_tasks": [
+                        {
+                            "acknowledged": False,
+                            "args": [None],
+                            "delivery_info": {
+                                "exchange": "",
+                                "priority": None,
+                                "redelivered": False,
+                                "routing_key": "workflow",
+                            },
+                            "hostname": "workflow@worker00",
+                            "id": "task1",
+                            "kwargs": {"context": {"realm_code": "realm1", "space_code": "space1"}, "workflow_id": 1},
+                            "name": "workflow.tasks.test",
+                            "time_start": 1757891351.975915,
+                            "type": "workflow.tasks.test",
+                            "worker_pid": 78733,
+                        }
+                    ],
+                    "scheduled_tasks": [],
+                    "pending_tasks": [],
+                    "active_tasks_count": 1,
+                    "pending_tasks_count": 0,
+                    "scheduled_tasks_count": 0,
+                    "total_tasks_count": 1,
+                }
+            }
+        }
+        mock_get_tasks_data.return_value = mock_tasks_data
+
+        url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
+        response = self.client.get(url)
+
+        self.assertEqual(response.status_code, 200)
+        data = response.json()
+
+        self.assertIn("workflow", data)
+        self.assertIn("worker00", data["workflow"])
+        self.assertEqual(data["workflow"]["worker00"]["active_tasks_count"], 1)
+        self.assertEqual(data["workflow"]["worker00"]["total_tasks_count"], 1)
+
+    @patch("workflow.views.get_celery_tasks_data")
+    def test_celery_monitoring_view_error(self, mock_get_tasks_data):
+        mock_get_tasks_data.side_effect = Exception("Celery connection failed")
+
+        url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
+        response = self.client.get(url)
+
+        self.assertEqual(response.status_code, 500)
+        data = response.json()
+        self.assertIn("message", data)
+        self.assertEqual(data["message"], "Celery connection failed")
+
+    @patch("workflow.views.get_rabbitmq_queues_info")
+    def test_rabbitmq_monitoring_view_success(self, mock_get_queues_info):
+        mock_queues_data = {
+            "workflow": {
+                "workflow": {
+                    "name": "workflow",
+                    "messages": 5,
+                    "messages_ready": 3,
+                    "messages_unacknowledged": 2,
+                    "consumers": 1,
+                    "consumer_utilisation": 0.8,
+                }
+            },
+            "backend": {
+                "backend-task1": {
+                    "name": "backend-task1",
+                    "messages": 10,
+                    "messages_ready": 8,
+                    "messages_unacknowledged": 2,
+                    "consumers": 2,
+                    "consumer_utilisation": 0.6,
+                }
+            },
+        }
+        mock_get_queues_info.return_value = mock_queues_data
+
+        url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"
+        response = self.client.get(url)
+
+        self.assertEqual(response.status_code, 200)
+        data = response.json()
+
+        self.assertIn("workflow", data)
+        self.assertIn("backend", data)
+        self.assertIn("workflow", data["workflow"])
+        self.assertIn("backend-task1", data["backend"])
+
+        workflow_queue = data["workflow"]["workflow"]
+        self.assertEqual(workflow_queue["messages"], 5)
+        self.assertEqual(workflow_queue["consumers"], 1)
+
+    @patch("workflow.views.get_rabbitmq_queues_info")
+    def test_rabbitmq_monitoring_view_error(self, mock_get_queues_info):
+        mock_get_queues_info.side_effect = Exception("RabbitMQ connection failed")
+
+        url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"
+        response = self.client.get(url)
+
+        self.assertEqual(response.status_code, 500)
+        data = response.json()
+        self.assertIn("message", data)
+        self.assertEqual(data["message"], "RabbitMQ connection failed")
+
+    @patch("workflow.views.get_celery_tasks_data")
+    @patch("workflow.views.get_rabbitmq_queues_info")
+    def test_monitoring_endpoints_consistency(self, mock_rabbitmq, mock_celery):
+        mock_celery.return_value = {}
+        mock_rabbitmq.return_value = {}
+
+        celery_url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/celery/"
+        rabbitmq_url = f"/{self.realm_code}/{self.space_code}/workflow/api/monitoring/rabbitmq/"
+
+        celery_response = self.client.get(celery_url)
+        rabbitmq_response = self.client.get(rabbitmq_url)
+
+        self.assertEqual(celery_response.status_code, 200)
+        self.assertEqual(rabbitmq_response.status_code, 200)
+
+        mock_celery.assert_called_once()
+        mock_rabbitmq.assert_called_once()
- """ +class CeleryMonitoringViewSet(ViewSet): + serializer_class = CeleryMonitoringSerializer def list(self, request, *args, **kwargs): - insp = celery_app.control.inspect() - data = { - "workers": insp.stats() or {}, - "active": insp.active() or {}, - "reserved": insp.reserved() or {}, - "scheduled": insp.scheduled() or {}, - } + try: + tasks_data = get_celery_tasks_data() - return Response(data) + serializer = CeleryMonitoringSerializer(tasks_data) + return Response(serializer.data) + + except Exception as e: + _l.error(f"CeleryMonitoringViewSet error: {str(e)}") + return Response({"message": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + +class RabbitMQMonitoringViewSet(ViewSet): + serializer_class = RabbitMQMonitoringSerializer + + def list(self, request, *args, **kwargs): + try: + queues_info = get_rabbitmq_queues_info() + _l.info(f"RabbitMQMonitoringViewSet queues_info: {queues_info}") + serializer = RabbitMQMonitoringSerializer(queues_info) + return Response(serializer.data) + + except Exception as e: + _l.error(f"RabbitMQMonitoringViewSet error: {str(e)}") + return Response({"message": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/workflow_app/urls.py b/workflow_app/urls.py index b947667..628f55d 100644 --- a/workflow_app/urls.py +++ b/workflow_app/urls.py @@ -21,11 +21,12 @@ from rest_framework import routers from workflow.views import ( - CeleryStatusViewSet, + CeleryMonitoringViewSet, CodeExecutionViewSet, DefinitionViewSet, FileExecutionViewSet, LogFileViewSet, + RabbitMQMonitoringViewSet, RealmMigrateSchemeView, RefreshStorageViewSet, ScheduleViewSet, @@ -48,7 +49,8 @@ router.register(r"execute-code", CodeExecutionViewSet, basename="execute-code") router.register(r"execute-file", FileExecutionViewSet, basename="execute-file") router.register(r"authorizer/migrate", RealmMigrateSchemeView, "migrate") -router.register(r"celery-status", CeleryStatusViewSet, basename="celery-status") +router.register(r"monitoring/celery", CeleryMonitoringViewSet, basename="celery-monitoring") +router.register(r"monitoring/rabbitmq", RabbitMQMonitoringViewSet, basename="rabbitmq-monitoring") urlpatterns = [ # Old Approach (delete in 1.9.0)