From 9bbffc05792a5d2fe710db912412388ce37154bb Mon Sep 17 00:00:00 2001 From: Madhu Kanoor Date: Thu, 15 Aug 2024 09:47:53 -0400 Subject: [PATCH] chore: remove old event stream from codebase https://issues.redhat.com/browse/AAP-27632 Remove the old EventStream from fanout --- src/aap_eda/api/constants.py | 32 - src/aap_eda/api/exceptions.py | 30 - src/aap_eda/api/filters/__init__.py | 3 - src/aap_eda/api/filters/event_stream.py | 29 - src/aap_eda/api/serializers/__init__.py | 9 - src/aap_eda/api/serializers/activation.py | 93 +-- src/aap_eda/api/serializers/event_stream.py | 218 ------- src/aap_eda/api/urls.py | 1 - src/aap_eda/api/views/__init__.py | 3 - src/aap_eda/api/views/event_stream.py | 330 ----------- src/aap_eda/core/enums.py | 2 - ...emove_activation_event_streams_and_more.py | 41 ++ src/aap_eda/core/models/__init__.py | 2 - src/aap_eda/core/models/activation.py | 4 - src/aap_eda/core/models/event_stream.py | 118 ---- src/aap_eda/core/models/rulebook_process.py | 20 +- src/aap_eda/core/utils/strings.py | 12 - src/aap_eda/core/validators.py | 11 - .../services/activation/activation_manager.py | 2 +- .../services/activation/status_manager.py | 2 +- src/aap_eda/tasks/activation_request_queue.py | 4 +- src/aap_eda/tasks/orchestrator.py | 13 +- .../api/test_activation_instance.py | 1 - tests/integration/api/test_event_stream.py | 558 ------------------ tests/integration/api/test_root.py | 2 - tests/integration/core/conftest.py | 16 - tests/integration/core/test_process_parent.py | 10 - .../integration/core/test_rulebook_process.py | 10 - tests/integration/core/test_status_handler.py | 12 - .../activation/engine/test_kubernetes.py | 40 -- .../services/activation/test_event_stream.py | 115 ---- .../services/activation/test_manager.py | 41 -- tests/unit/test_orchestrator.py | 16 +- 33 files changed, 55 insertions(+), 1745 deletions(-) delete mode 100644 src/aap_eda/api/filters/event_stream.py delete mode 100644 src/aap_eda/api/serializers/event_stream.py delete mode 100644 src/aap_eda/api/views/event_stream.py create mode 100644 src/aap_eda/core/migrations/0046_remove_activation_event_streams_and_more.py delete mode 100644 src/aap_eda/core/models/event_stream.py delete mode 100644 tests/integration/api/test_event_stream.py delete mode 100644 tests/integration/services/activation/test_event_stream.py diff --git a/src/aap_eda/api/constants.py b/src/aap_eda/api/constants.py index 113123de8..2b1fc47a6 100644 --- a/src/aap_eda/api/constants.py +++ b/src/aap_eda/api/constants.py @@ -14,36 +14,4 @@ # EDA_SERVER_VAULT_LABEL is reserved for system vault password identifiers EDA_SERVER_VAULT_LABEL = "EDA_SERVER" - -PG_NOTIFY_TEMPLATE_RULEBOOK_NAME = "_PG_NOTIFY_TEMPLATE_RULEBOOK_" -PG_NOTIFY_TEMPLATE_RULEBOOK_DATA = """ ---- -- name: PG Notify Template Event Stream - hosts: all - sources: - - name: my_range - ansible.eda.range: - limit: 5 - complementary_source: - type: ansible.eda.pg_listener - name: Postgres Listener - args: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channels: - - "{{ EDA_PG_NOTIFY_CHANNEL }}" - extra_vars: - EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" - EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" - encrypt_vars: - - EDA_PG_NOTIFY_DSN - rules: - - name: Post event - condition: true - action: - pg_notify: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" - event: "{{ event }}" -""" - SOURCE_MAPPING_ERROR_KEY = "rulebook" diff --git a/src/aap_eda/api/exceptions.py b/src/aap_eda/api/exceptions.py index 8aabd06dd..1d982a317 100644 --- a/src/aap_eda/api/exceptions.py +++ b/src/aap_eda/api/exceptions.py @@ -97,36 +97,6 @@ class InvalidWebsocketHost(APIException): ) -class MissingEventStreamRulebook(APIException): - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - default_detail = ( - "Configuration Error: Event stream template rulebook not found" - ) - - -class MissingEventStreamRulebookKeys(APIException): - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - default_detail = ( - "Configuration Error: Event stream template rulebook is missing " - "required keys in complementary_source: type, name and args" - ) - - -class MissingEventStreamRulebookSource(APIException): - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - default_detail = ( - "Configuration Error: Event stream template rulebook is missing " - "required complementary_source" - ) - - -class InvalidEventStreamRulebook(APIException): - status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - default_detail = ( - "Configuration Error: Event stream template rulebook is invalid" - ) - - class InvalidWebhookSource(APIException): status_code = status.HTTP_500_INTERNAL_SERVER_ERROR default_detail = ( diff --git a/src/aap_eda/api/filters/__init__.py b/src/aap_eda/api/filters/__init__.py index 0a993ef12..b8cfa4b38 100644 --- a/src/aap_eda/api/filters/__init__.py +++ b/src/aap_eda/api/filters/__init__.py @@ -20,7 +20,6 @@ from .credential_type import CredentialTypeFilter from .decision_environment import DecisionEnvironmentFilter from .eda_credential import EdaCredentialFilter -from .event_stream import EventStreamFilter from .organization import OrganizationFilter from .project import ProjectFilter from .rulebook import ( @@ -52,8 +51,6 @@ "ActivationInstanceLogFilter", # user "UserFilter", - # event_stream - "EventStreamFilter", # organization "OrganizationFilter", # team diff --git a/src/aap_eda/api/filters/event_stream.py b/src/aap_eda/api/filters/event_stream.py deleted file mode 100644 index 53455855c..000000000 --- a/src/aap_eda/api/filters/event_stream.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import django_filters - -from aap_eda.core import models - - -class EventStreamFilter(django_filters.FilterSet): - name = django_filters.CharFilter( - field_name="name", - lookup_expr="istartswith", - label="Filter by event source name.", - ) - - class Meta: - model = models.EventStream - fields = ["name"] diff --git a/src/aap_eda/api/serializers/__init__.py b/src/aap_eda/api/serializers/__init__.py index ce4939e41..ee8b66d81 100644 --- a/src/aap_eda/api/serializers/__init__.py +++ b/src/aap_eda/api/serializers/__init__.py @@ -38,11 +38,6 @@ EdaCredentialCreateSerializer, EdaCredentialSerializer, ) -from .event_stream import ( - EventStreamCreateSerializer, - EventStreamOutSerializer, - EventStreamSerializer, -) from .organization import ( OrganizationCreateSerializer, OrganizationRefSerializer, @@ -123,10 +118,6 @@ "EdaCredentialCreateSerializer", # decision environment "DecisionEnvironmentSerializer", - # event streams - "EventStreamSerializer", - "EventStreamCreateSerializer", - "EventStreamOutSerializer", # organizations "OrganizationSerializer", "OrganizationCreateSerializer", diff --git a/src/aap_eda/api/serializers/activation.py b/src/aap_eda/api/serializers/activation.py index 207f5b082..2c5c47341 100644 --- a/src/aap_eda/api/serializers/activation.py +++ b/src/aap_eda/api/serializers/activation.py @@ -24,18 +24,13 @@ from aap_eda.api.constants import ( EDA_SERVER_VAULT_LABEL, - PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, SOURCE_MAPPING_ERROR_KEY, ) -from aap_eda.api.exceptions import ( - InvalidEventStreamRulebook, - InvalidWebhookSource, -) +from aap_eda.api.exceptions import InvalidWebhookSource from aap_eda.api.serializers.decision_environment import ( DecisionEnvironmentRefSerializer, ) from aap_eda.api.serializers.eda_credential import EdaCredentialSerializer -from aap_eda.api.serializers.event_stream import EventStreamOutSerializer from aap_eda.api.serializers.organization import OrganizationRefSerializer from aap_eda.api.serializers.project import ( ANSIBLE_VAULT_STRING, @@ -58,12 +53,7 @@ get_rulebook_hash, swap_webhook_sources, ) -from aap_eda.core.utils.strings import ( - substitute_extra_vars, - substitute_source_args, - substitute_variables, - swap_sources, -) +from aap_eda.core.utils.strings import substitute_variables logger = logging.getLogger(__name__) REQUIRED_KEYS = ["webhook_id", "webhook_name", "source_name", "rulebook_hash"] @@ -106,43 +96,6 @@ def _update_webhook_source(validated_data: dict, vault_data: VaultData) -> str: raise InvalidWebhookSource(e) from e -def _updated_ruleset(validated_data: dict, vault_data: VaultData): - try: - sources_info = [] - - for event_stream_id in validated_data["event_streams"]: - event_stream = models.EventStream.objects.get(id=event_stream_id) - - if event_stream.rulebook: - rulesets = yaml.safe_load(event_stream.rulebook.rulesets) - else: - rulesets = yaml.safe_load(PG_NOTIFY_TEMPLATE_RULEBOOK_DATA) - - extra_vars = rulesets[0]["sources"][0].get("extra_vars", {}) - encrypt_vars = rulesets[0]["sources"][0].get("encrypt_vars", []) - - if bool(encrypt_vars): - vault_data.password_used = True - - extra_vars = substitute_extra_vars( - event_stream.__dict__, - extra_vars, - encrypt_vars, - vault_data.password, - ) - - source = rulesets[0]["sources"][0]["complementary_source"] - source = substitute_source_args( - event_stream.__dict__, source, extra_vars - ) - sources_info.append(source) - - return swap_sources(validated_data["rulebook_rulesets"], sources_info) - except Exception as e: - logger.error(f"Failed to update rulesets: {e}") - raise InvalidEventStreamRulebook(e) - - def _update_k8s_service_name(validated_data: dict) -> str: service_name = validated_data.get("k8s_service_name") return service_name or create_k8s_service_name(validated_data["name"]) @@ -246,12 +199,6 @@ def replace_vault_data(extra_var): class ActivationSerializer(serializers.ModelSerializer): """Serializer for the Activation model.""" - event_streams = serializers.ListField( - required=False, - allow_null=True, - child=EventStreamOutSerializer(), - ) - eda_credentials = serializers.ListField( required=False, allow_null=True, @@ -287,7 +234,6 @@ class Meta: "modified_at", "status_message", "awx_token_id", - "event_streams", "eda_credentials", "log_level", "webhooks", @@ -307,11 +253,6 @@ class ActivationListSerializer(serializers.ModelSerializer): rules_count = serializers.IntegerField() rules_fired_count = serializers.IntegerField() - event_streams = serializers.ListField( - required=False, - allow_null=True, - child=EventStreamOutSerializer(), - ) eda_credentials = serializers.ListField( required=False, allow_null=True, @@ -352,7 +293,6 @@ class Meta: "modified_at", "status_message", "awx_token_id", - "event_streams", "log_level", "eda_credentials", "k8s_service_name", @@ -366,10 +306,6 @@ def to_representation(self, activation): rules_count, rules_fired_count = get_rules_count( activation.ruleset_stats ) - event_streams = [ - EventStreamOutSerializer(event_stream).data - for event_stream in activation.event_streams.all() - ] eda_credentials = [ EdaCredentialSerializer(credential).data for credential in activation.eda_credentials.all() @@ -405,7 +341,6 @@ def to_representation(self, activation): "modified_at": activation.modified_at, "status_message": activation.status_message, "awx_token_id": activation.awx_token_id, - "event_streams": event_streams, "log_level": activation.log_level, "eda_credentials": eda_credentials, "k8s_service_name": activation.k8s_service_name, @@ -431,7 +366,6 @@ class Meta: "user", "restart_policy", "awx_token_id", - "event_streams", "log_level", "eda_credentials", "k8s_service_name", @@ -463,12 +397,6 @@ class Meta: validators=[validators.check_if_awx_token_exists], required=False, ) - event_streams = serializers.ListField( - required=False, - allow_null=True, - child=serializers.IntegerField(), - validators=[validators.check_if_event_streams_exists], - ) eda_credentials = serializers.ListField( required=False, allow_null=True, @@ -503,11 +431,6 @@ def create(self, validated_data): vault_data = VaultData() - if validated_data.get("event_streams"): - validated_data["rulebook_rulesets"] = _updated_ruleset( - validated_data, vault_data - ) - if validated_data.get("source_mappings", []): validated_data["rulebook_rulesets"] = _update_webhook_source( validated_data, vault_data @@ -555,7 +478,6 @@ class Meta: "git_hash", "status_message", "activation_id", - "event_stream_id", "organization_id", "started_at", "ended_at", @@ -586,11 +508,6 @@ class ActivationReadSerializer(serializers.ModelSerializer): rules_count = serializers.IntegerField() rules_fired_count = serializers.IntegerField() restarted_at = serializers.DateTimeField(required=False, allow_null=True) - event_streams = serializers.ListField( - required=False, - allow_null=True, - child=EventStreamOutSerializer(), - ) eda_credentials = serializers.ListField( required=False, allow_null=True, @@ -635,7 +552,6 @@ class Meta: "status_message", "awx_token_id", "eda_credentials", - "event_streams", "log_level", "k8s_service_name", "webhooks", @@ -682,10 +598,6 @@ def to_representation(self, activation): if activation.organization else None ) - event_streams = [ - EventStreamOutSerializer(event_stream).data - for event_stream in activation.event_streams.all() - ] eda_credentials = [ EdaCredentialSerializer(credential).data for credential in activation.eda_credentials.all() @@ -726,7 +638,6 @@ def to_representation(self, activation): "restarted_at": restarted_at, "status_message": activation.status_message, "awx_token_id": activation.awx_token_id, - "event_streams": event_streams, "log_level": activation.log_level, "eda_credentials": eda_credentials, "k8s_service_name": activation.k8s_service_name, diff --git a/src/aap_eda/api/serializers/event_stream.py b/src/aap_eda/api/serializers/event_stream.py deleted file mode 100644 index 57abaec9f..000000000 --- a/src/aap_eda/api/serializers/event_stream.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import secrets -import uuid - -import yaml -from django.conf import settings -from django.core.validators import RegexValidator -from rest_framework import serializers - -from aap_eda.api.constants import ( - PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, -) -from aap_eda.api.exceptions import ( - MissingEventStreamRulebook, - MissingEventStreamRulebookKeys, - MissingEventStreamRulebookSource, -) -from aap_eda.api.serializers.fields.yaml import YAMLSerializerField -from aap_eda.core import models, validators -from aap_eda.core.utils.strings import substitute_extra_vars, swap_sources - -logger = logging.getLogger(__name__) - -EDA_CHANNEL_PREFIX = "eda_" - - -def _get_rulebook(): - rulebook = None - name = settings.PG_NOTIFY_TEMPLATE_RULEBOOK - if name: - rulebook = models.Rulebook.objects.filter(name=name).first() - - if not rulebook: - logger.error( - "Missing Listener rulebook %s", - settings.PG_NOTIFY_TEMPLATE_RULEBOOK, - ) - raise MissingEventStreamRulebook - - required_keys = ["type", "name", "args"] - rulesets = yaml.safe_load(rulebook.rulesets) - for ruleset in rulesets: - sources = ruleset.get("sources", []) - for source in sources: - complementary_source = source.get("complementary_source") - - if not complementary_source: - raise MissingEventStreamRulebookSource - - for key in required_keys: - if key not in complementary_source.keys(): - raise MissingEventStreamRulebookKeys - - return rulebook - - -def _get_default_channel_name(): - stream_uuid = str(uuid.uuid4()) - return f"{EDA_CHANNEL_PREFIX}{stream_uuid.replace('-','_')}" - - -def _get_extra_var(validated_data: dict) -> dict: - rulesets = yaml.safe_load(validated_data["rulebook_rulesets"]) - extra_vars = rulesets[0]["sources"][0]["extra_vars"] - encrypt_vars = rulesets[0]["sources"][0].get("encrypt_vars", []) - - password = "" - - if bool(encrypt_vars): - password = secrets.token_urlsafe() - - extra_vars = substitute_extra_vars( - validated_data, extra_vars, encrypt_vars, password - ) - - return extra_vars - - -def _updated_listener_ruleset(validated_data): - sources_info = [ - { - "name": validated_data["name"], - "type": validated_data["source_type"], - "args": validated_data["source_args"], - } - ] - return swap_sources(validated_data["rulebook_rulesets"], sources_info) - - -class EventStreamSerializer(serializers.ModelSerializer): - decision_environment_id = serializers.IntegerField( - validators=[validators.check_if_de_exists] - ) - user = serializers.SerializerMethodField() - source_args = YAMLSerializerField(required=False, allow_null=True) - queue_name = serializers.CharField( - source="rulebookprocessqueue.queue_name", - read_only=True, - allow_null=True, - default=None, - ) - - class Meta: - model = models.EventStream - read_only_fields = [ - "id", - "created_at", - "modified_at", - ] - fields = [ - "name", - "source_args", - "source_type", - "channel_name", - "is_enabled", - "status", - "status_message", - "decision_environment_id", - "user", - "log_level", - "k8s_service_name", - "queue_name", - *read_only_fields, - ] - - def get_user(self, obj) -> str: - return f"{obj.user.username}" - - -class EventStreamCreateSerializer(serializers.ModelSerializer): - """Serializer for creating the EventStream.""" - - decision_environment_id = serializers.IntegerField( - validators=[validators.check_if_de_exists] - ) - user = serializers.HiddenField(default=serializers.CurrentUserDefault()) - source_args = YAMLSerializerField() - channel_name = serializers.CharField( - default=_get_default_channel_name, - validators=[ - RegexValidator( - regex=r"^\w+$", - message="Channel name can only contain alphanumeric and " - "underscore characters", - ), - ], - ) - k8s_service_name = serializers.CharField( - required=False, - allow_null=True, - allow_blank=True, - validators=[validators.check_if_rfc_1035_compliant], - ) - - class Meta: - model = models.EventStream - fields = [ - "name", - "description", - "is_enabled", - "source_type", - "source_args", - "channel_name", - "decision_environment_id", - "rulebook_id", - "extra_var", - "user", - "restart_policy", - "log_level", - "k8s_service_name", - ] - - def create(self, validated_data): - rulebook = _get_rulebook() - validated_data["user_id"] = validated_data["user"].id - if rulebook: - validated_data["rulebook_name"] = rulebook.name - validated_data["rulebook_id"] = rulebook.id - validated_data["rulebook_rulesets"] = rulebook.rulesets - else: - validated_data["rulebook_name"] = PG_NOTIFY_TEMPLATE_RULEBOOK_NAME - validated_data["rulebook_id"] = None - validated_data[ - "rulebook_rulesets" - ] = PG_NOTIFY_TEMPLATE_RULEBOOK_DATA - - validated_data["channel_name"] = validated_data.get( - "channel_name", _get_default_channel_name() - ) - extra_vars = _get_extra_var(validated_data) - validated_data["extra_var"] = yaml.dump(extra_vars) - validated_data["rulebook_rulesets"] = _updated_listener_ruleset( - validated_data - ) - return super().create(validated_data) - - -class EventStreamOutSerializer(serializers.ModelSerializer): - """Serializer for UI to show EventStream.""" - - class Meta: - model = models.EventStream - fields = ["id", "name"] diff --git a/src/aap_eda/api/urls.py b/src/aap_eda/api/urls.py index fa277a04a..dcf68e2df 100644 --- a/src/aap_eda/api/urls.py +++ b/src/aap_eda/api/urls.py @@ -44,7 +44,6 @@ ) router.register("audit-rules", views.AuditRuleViewSet) router.register("users", views.UserViewSet) -router.register("event-streams", views.EventStreamViewSet) router.register( "users/me/awx-tokens", views.CurrentUserAwxTokenViewSet, diff --git a/src/aap_eda/api/views/__init__.py b/src/aap_eda/api/views/__init__.py index 129066fbf..6971d5a37 100644 --- a/src/aap_eda/api/views/__init__.py +++ b/src/aap_eda/api/views/__init__.py @@ -18,7 +18,6 @@ from .credential_type import CredentialTypeViewSet from .decision_environment import DecisionEnvironmentViewSet from .eda_credential import EdaCredentialViewSet -from .event_stream import EventStreamViewSet from .external_webhook import ExternalWebhookViewSet from .organization import OrganizationViewSet from .project import ProjectViewSet @@ -48,8 +47,6 @@ "EdaCredentialViewSet", # decision_environment "DecisionEnvironmentViewSet", - # event_stream - "EventStreamViewSet", # organizations "OrganizationViewSet", # teams diff --git a/src/aap_eda/api/views/event_stream.py b/src/aap_eda/api/views/event_stream.py deleted file mode 100644 index 2f808ea75..000000000 --- a/src/aap_eda/api/views/event_stream.py +++ /dev/null @@ -1,330 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging - -from django.shortcuts import get_object_or_404 -from django_filters import rest_framework as defaultfilters -from drf_spectacular.utils import ( - OpenApiParameter, - OpenApiResponse, - extend_schema, - extend_schema_view, -) -from rest_framework import exceptions, mixins, status, viewsets -from rest_framework.decorators import action -from rest_framework.response import Response - -from aap_eda.api import exceptions as api_exc, filters, serializers -from aap_eda.api.serializers.activation import is_activation_valid -from aap_eda.core import models -from aap_eda.core.enums import ( - Action, - ActivationStatus, - ProcessParentType, - ResourceType, -) -from aap_eda.tasks.orchestrator import ( - delete_rulebook_process, - restart_rulebook_process, - start_rulebook_process, - stop_rulebook_process, -) - -logger = logging.getLogger(__name__) - - -@extend_schema_view( - destroy=extend_schema( - description="Delete an existing EventStream", - responses={ - status.HTTP_204_NO_CONTENT: OpenApiResponse( - None, - description="The EventStream has been deleted.", - ), - }, - ), -) -@extend_schema(exclude=True) -class EventStreamViewSet( - mixins.DestroyModelMixin, - viewsets.GenericViewSet, -): - queryset = models.EventStream.objects.select_related( - "rulebookprocessqueue", - ) - serializer_class = serializers.EventStreamSerializer - filter_backends = (defaultfilters.DjangoFilterBackend,) - filterset_class = filters.EventStreamFilter - rbac_resource_type = ResourceType.EVENT_STREAM - rbac_action = None - - @extend_schema( - request=serializers.EventStreamCreateSerializer, - responses={ - status.HTTP_201_CREATED: serializers.EventStreamSerializer, - status.HTTP_400_BAD_REQUEST: OpenApiResponse( - description="Invalid data to create event_stream." - ), - }, - ) - def create(self, request): - context = {"request": request} - serializer = serializers.EventStreamCreateSerializer( - data=request.data, context=context - ) - serializer.is_valid(raise_exception=True) - - event_stream = serializer.create(serializer.validated_data) - - if event_stream.is_enabled: - start_rulebook_process( - process_parent_type=ProcessParentType.EVENT_STREAM, - process_parent_id=event_stream.id, - ) - - return Response( - serializers.EventStreamSerializer(event_stream).data, - status=status.HTTP_201_CREATED, - ) - - @extend_schema( - description="Get an event_stream by id", - responses={ - status.HTTP_200_OK: OpenApiResponse( - serializers.EventStreamSerializer, - description="Return an event_stream by id.", - ), - }, - ) - def retrieve(self, request, pk: int): - event_stream = get_object_or_404(models.EventStream, pk=pk) - return Response(serializers.EventStreamSerializer(event_stream).data) - - @extend_schema( - description="List all EventStreams", - request=None, - responses={ - status.HTTP_200_OK: OpenApiResponse( - serializers.EventStreamSerializer(many=True), - description="Return a list of EventStreams.", - ), - }, - ) - def list(self, request): - event_streams = models.EventStream.objects.all() - event_streams = self.filter_queryset(event_streams) - - serializer = serializers.EventStreamSerializer( - event_streams, many=True - ) - result = self.paginate_queryset(serializer.data) - - return self.get_paginated_response(result) - - def perform_destroy(self, event_stream): - event_stream.status = ActivationStatus.DELETING - event_stream.save(update_fields=["status"]) - logger.info(f"Now deleting {event_stream.name} ...") - delete_rulebook_process( - process_parent_type=ProcessParentType.EVENT_STREAM, - process_parent_id=event_stream.id, - ) - - @extend_schema( - description="List all instances for the EventStream", - request=None, - responses={ - status.HTTP_200_OK: serializers.ActivationInstanceSerializer( - many=True - ), - }, - parameters=[ - OpenApiParameter( - name="id", - type=int, - location=OpenApiParameter.PATH, - description="A unique integer value identifying this rulebook.", # noqa: E501 - ) - ], - ) - @action( - detail=False, - queryset=models.RulebookProcess.objects.order_by("id"), - filterset_class=filters.ActivationInstanceFilter, - rbac_resource_type=ResourceType.ACTIVATION_INSTANCE, - rbac_action=Action.READ, - url_path="(?P[^/.]+)/instances", - ) - def instances(self, request, id): - event_stream_exists = models.EventStream.objects.filter(id=id).exists() - if not event_stream_exists: - raise api_exc.NotFound( - code=status.HTTP_404_NOT_FOUND, - detail=f"EventStream with ID={id} does not exist.", - ) - - event_stream_instances = models.RulebookProcess.objects.filter( - parent_type=ProcessParentType.EVENT_STREAM, - event_stream_id=id, - ) - filtered_instances = self.filter_queryset(event_stream_instances) - result = self.paginate_queryset(filtered_instances) - serializer = serializers.ActivationInstanceSerializer( - result, many=True - ) - return self.get_paginated_response(serializer.data) - - @extend_schema( - description="Enable the EventStream", - request=None, - responses={ - status.HTTP_204_NO_CONTENT: OpenApiResponse( - None, - description="EventStream has been enabled.", - ), - status.HTTP_400_BAD_REQUEST: OpenApiResponse( - None, - description="EventStream not enabled.", - ), - status.HTTP_409_CONFLICT: OpenApiResponse( - None, - description="EventStream not enabled do to current event" - " stream status", - ), - }, - ) - @action(methods=["post"], detail=True, rbac_action=Action.ENABLE) - def enable(self, request, pk): - event_stream = get_object_or_404(models.EventStream, pk=pk) - - if event_stream.is_enabled: - return Response(status=status.HTTP_204_NO_CONTENT) - - if event_stream.status in [ - ActivationStatus.STARTING, - ActivationStatus.STOPPING, - ActivationStatus.DELETING, - ActivationStatus.RUNNING, - ActivationStatus.UNRESPONSIVE, - ]: - return Response(status=status.HTTP_409_CONFLICT) - - valid, error = is_activation_valid(event_stream) - if not valid: - event_stream.status = ActivationStatus.ERROR - event_stream.status_message = error - event_stream.save(update_fields=["status", "status_message"]) - logger.error(f"Failed to enable {event_stream.name}: {error}") - - return Response( - {"errors": error}, status=status.HTTP_400_BAD_REQUEST - ) - - logger.info(f"Now enabling {event_stream.name} ...") - - event_stream.is_enabled = True - event_stream.failure_count = 0 - event_stream.status = ActivationStatus.PENDING - event_stream.save( - update_fields=[ - "is_enabled", - "failure_count", - "status", - "modified_at", - ] - ) - start_rulebook_process( - process_parent_type=ProcessParentType.EVENT_STREAM, - process_parent_id=pk, - ) - - return Response(status=status.HTTP_204_NO_CONTENT) - - @extend_schema( - description="Disable the EventStream", - request=None, - responses={ - status.HTTP_204_NO_CONTENT: OpenApiResponse( - None, - description="EventStream has been disabled.", - ), - }, - ) - @action(methods=["post"], detail=True, rbac_action=Action.DISABLE) - def disable(self, request, pk): - event_stream = get_object_or_404(models.EventStream, pk=pk) - - self._check_deleting(event_stream) - - if event_stream.is_enabled: - event_stream.status = ActivationStatus.STOPPING - event_stream.is_enabled = False - event_stream.save( - update_fields=["is_enabled", "status", "modified_at"] - ) - stop_rulebook_process( - process_parent_type=ProcessParentType.EVENT_STREAM, - process_parent_id=event_stream.id, - ) - - return Response(status=status.HTTP_204_NO_CONTENT) - - @extend_schema( - description="Restart the EventStream", - request=None, - responses={ - status.HTTP_204_NO_CONTENT: OpenApiResponse( - None, - description="EventStream restart was successful.", - ), - status.HTTP_400_BAD_REQUEST: OpenApiResponse( - None, - description="EventStream not enabled.", - ), - }, - ) - @action(methods=["post"], detail=True, rbac_action=Action.RESTART) - def restart(self, request, pk): - event_stream = get_object_or_404(models.EventStream, pk=pk) - - self._check_deleting(event_stream) - - if not event_stream.is_enabled: - raise api_exc.Forbidden( - detail="EventStream is disabled and cannot be run." - ) - - valid, error = is_activation_valid(event_stream) - if not valid: - event_stream.status = ActivationStatus.ERROR - event_stream.status_message = error - event_stream.save(update_fields=["status", "status_message"]) - logger.error(f"Failed to restart {event_stream.name}: {error}") - - return Response( - {"errors": error}, status=status.HTTP_400_BAD_REQUEST - ) - - restart_rulebook_process( - process_parent_type=ProcessParentType.EVENT_STREAM, - process_parent_id=event_stream.id, - ) - - return Response(status=status.HTTP_204_NO_CONTENT) - - def _check_deleting(self, event_stream): - if event_stream.status == ActivationStatus.DELETING: - raise exceptions.APIException( - detail="Object is being deleted", code=409 - ) diff --git a/src/aap_eda/core/enums.py b/src/aap_eda/core/enums.py index 8c7e1c577..ff8cb61ee 100644 --- a/src/aap_eda/core/enums.py +++ b/src/aap_eda/core/enums.py @@ -50,7 +50,6 @@ class ResourceType(DjangoStrEnum): CREDENTIAL = "credential" CREDENTIAL_TYPE = "credential_type" EDA_CREDENTIAL = "eda_credential" - EVENT_STREAM = "event_stream" ORGANIZATION = "organization" TEAM = "team" WEBHOOK = "webhook" @@ -130,7 +129,6 @@ class ProcessParentType(DjangoStrEnum): """Types of parent objects for a rulebook process.""" ACTIVATION = "activation" - EVENT_STREAM = "event_stream" class RulebookProcessLogLevel(DjangoStrEnum): diff --git a/src/aap_eda/core/migrations/0046_remove_activation_event_streams_and_more.py b/src/aap_eda/core/migrations/0046_remove_activation_event_streams_and_more.py new file mode 100644 index 000000000..da3fe0106 --- /dev/null +++ b/src/aap_eda/core/migrations/0046_remove_activation_event_streams_and_more.py @@ -0,0 +1,41 @@ +# Generated by Django 4.2.7 on 2024-08-15 14:04 + +from django.db import migrations, models + +import aap_eda.core.enums + + +class Migration(migrations.Migration): + dependencies = [ + ("core", "0045_activation_skip_audit_events"), + ] + + operations = [ + migrations.RemoveField( + model_name="activation", + name="event_streams", + ), + migrations.RemoveField( + model_name="rulebookprocess", + name="event_stream", + ), + migrations.AlterField( + model_name="activationrequestqueue", + name="process_parent_type", + field=models.TextField( + choices=[("activation", "activation")], + default=aap_eda.core.enums.ProcessParentType["ACTIVATION"], + ), + ), + migrations.AlterField( + model_name="rulebookprocess", + name="parent_type", + field=models.TextField( + choices=[("activation", "activation")], + default=aap_eda.core.enums.ProcessParentType["ACTIVATION"], + ), + ), + migrations.DeleteModel( + name="EventStream", + ), + ] diff --git a/src/aap_eda/core/models/__init__.py b/src/aap_eda/core/models/__init__.py index 865ac6272..04c9f6eb9 100644 --- a/src/aap_eda/core/models/__init__.py +++ b/src/aap_eda/core/models/__init__.py @@ -19,7 +19,6 @@ from .credential_type import CredentialType from .decision_environment import DecisionEnvironment from .eda_credential import EdaCredential -from .event_stream import EventStream from .job import ( ActivationInstanceJobInstance, Job, @@ -71,7 +70,6 @@ "EdaCredential", "DecisionEnvironment", "ActivationRequestQueue", - "EventStream", "Organization", "Team", "Webhook", diff --git a/src/aap_eda/core/models/activation.py b/src/aap_eda/core/models/activation.py index 90ffffaa0..7ddcac076 100644 --- a/src/aap_eda/core/models/activation.py +++ b/src/aap_eda/core/models/activation.py @@ -108,10 +108,6 @@ class Meta: null=True, default=None, ) - event_streams = models.ManyToManyField( - "EventStream", - default=None, - ) log_level = models.CharField( max_length=20, choices=RulebookProcessLogLevel.choices(), diff --git a/src/aap_eda/core/models/event_stream.py b/src/aap_eda/core/models/event_stream.py deleted file mode 100644 index b53cd8349..000000000 --- a/src/aap_eda/core/models/event_stream.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from django.db import models - -from aap_eda.core.enums import ( - ActivationStatus, - ProcessParentType, - RestartPolicy, - RulebookProcessLogLevel, -) -from aap_eda.core.utils import get_default_log_level -from aap_eda.services.activation.engine.common import ContainerableMixin - -from .base import UniqueNamedModel -from .mixins import OnDeleteProcessParentMixin, StatusHandlerModelMixin - - -class EventStream( - StatusHandlerModelMixin, - ContainerableMixin, - OnDeleteProcessParentMixin, - UniqueNamedModel, -): - """Model representing an event stream.""" - - router_basename = "eventstream" - - description = models.TextField(default="", blank=True) - is_enabled = models.BooleanField(default=True) - decision_environment = models.ForeignKey( - "DecisionEnvironment", - on_delete=models.SET_NULL, - null=True, - ) - rulebook = models.ForeignKey( - "Rulebook", - on_delete=models.SET_NULL, - null=True, - ) - extra_var = models.TextField(null=True, blank=True) - restart_policy = models.TextField( - choices=RestartPolicy.choices(), - default=RestartPolicy.ON_FAILURE, - ) - status = models.TextField( - choices=ActivationStatus.choices(), - default=ActivationStatus.PENDING, - ) - current_job_id = models.TextField(null=True) - restart_count = models.IntegerField(default=0) - failure_count = models.IntegerField(default=0) # internal, since last good - rulebook_name = models.TextField( - null=False, - help_text="Name of the referenced rulebook", - default="", - ) - rulebook_rulesets = models.TextField( - null=False, - help_text="Content of the last referenced rulebook", - default="", - ) - ruleset_stats = models.JSONField(default=dict) - user = models.ForeignKey("User", on_delete=models.CASCADE, null=False) - created_at = models.DateTimeField(auto_now_add=True, null=False) - modified_at = models.DateTimeField(auto_now=True, null=False) - status_updated_at = models.DateTimeField(null=True) - status_message = models.TextField(null=True, default=None) - latest_instance = models.OneToOneField( - "RulebookProcess", - null=True, - default=None, - on_delete=models.SET_NULL, - related_name="+", - ) - channel_name = models.TextField(null=True, default=None) - source_type = models.TextField(null=False) - source_args = models.JSONField(null=True, default=None) - log_level = models.CharField( - max_length=20, - choices=RulebookProcessLogLevel.choices(), - default=get_default_log_level, - ) - k8s_service_name = models.TextField( - null=True, - default=None, - blank=True, - help_text="Name of the kubernetes service", - ) - - class Meta: - db_table = "core_event_stream" - indexes = [ - models.Index(fields=["name"], name="ix_event_stream_name"), - ] - ordering = ("-created_at",) - - def __str__(self) -> str: - return f"EventStream {self.name} ({self.id})" - - # Implementation of the ContainerableMixin. - def _get_skip_audit_events(self) -> bool: - """Event stream skips audit events.""" - return True - - def get_parent_type(self) -> str: - return ProcessParentType.EVENT_STREAM diff --git a/src/aap_eda/core/models/rulebook_process.py b/src/aap_eda/core/models/rulebook_process.py index d463a9020..bcda9904e 100644 --- a/src/aap_eda/core/models/rulebook_process.py +++ b/src/aap_eda/core/models/rulebook_process.py @@ -58,13 +58,6 @@ class RulebookProcess(BaseOrgModel): blank=True, related_name="activation_processes", ) - event_stream = models.ForeignKey( - "EventStream", - on_delete=models.CASCADE, - null=True, - blank=True, - related_name="event_stream_processes", - ) parent_type = models.TextField( choices=ProcessParentType.choices(), null=False, @@ -130,14 +123,9 @@ def save(self, *args, **kwargs): def _check_parent(self): """Clean method for RulebookProcess model.""" - # Check that either activation or event_stream is set, but not both - if (self.activation is None and self.event_stream is None) or ( - self.activation and self.event_stream - ): - raise ValidationError( - "Either an activation or a event_stream must be set, " - "but not both." - ) + # Check that activation is set + if self.activation is None: + raise ValidationError("activation must be set") def get_parent(self): return getattr(self, self.parent_type.lower()) @@ -146,8 +134,6 @@ def _set_parent_type(self): self._check_parent() if self.activation: self.parent_type = ProcessParentType.ACTIVATION - if self.event_stream: - self.parent_type = ProcessParentType.EVENT_STREAM def _get_default_status_message(self): try: diff --git a/src/aap_eda/core/utils/strings.py b/src/aap_eda/core/utils/strings.py index c4eb68c12..8f2da212d 100644 --- a/src/aap_eda/core/utils/strings.py +++ b/src/aap_eda/core/utils/strings.py @@ -60,18 +60,6 @@ def substitute_variables( return value -def substitute_source_args(event_stream, source, extra_vars) -> dict: - context = { - "settings": settings.__dict__["_wrapped"].__dict__, - "event_stream": event_stream, - } - for key in extra_vars: - context[key] = extra_vars[key] - - source["args"] = substitute_variables(source.get("args", {}), context) - return source - - def substitute_extra_vars( event_stream, extra_vars, encrypt_keys, password ) -> dict: diff --git a/src/aap_eda/core/validators.py b/src/aap_eda/core/validators.py index 909e371a2..4426031c5 100644 --- a/src/aap_eda/core/validators.py +++ b/src/aap_eda/core/validators.py @@ -198,17 +198,6 @@ def is_extra_var_dict(extra_var: str): ) -def check_if_event_streams_exists(event_stream_ids: list[int]) -> list[int]: - for event_stream_id in event_stream_ids: - try: - models.EventStream.objects.get(pk=event_stream_id) - except models.EventStream.DoesNotExist: - raise serializers.ValidationError( - f"EventStream with id {event_stream_id} does not exist" - ) - return event_stream_ids - - def check_if_schema_valid(schema: dict): errors = validate_schema(schema) diff --git a/src/aap_eda/services/activation/activation_manager.py b/src/aap_eda/services/activation/activation_manager.py index 987287bb4..b83935bc9 100644 --- a/src/aap_eda/services/activation/activation_manager.py +++ b/src/aap_eda/services/activation/activation_manager.py @@ -57,7 +57,7 @@ class ActivationManager(StatusManager): def __init__( self, - db_instance: tp.Union[models.Activation, models.EventStream], + db_instance: models.Activation, container_engine: tp.Optional[ContainerEngine] = None, container_logger_class: type[DBLogger] = DBLogger, ): diff --git a/src/aap_eda/services/activation/status_manager.py b/src/aap_eda/services/activation/status_manager.py index 2438c8fb3..b81ed3f4d 100644 --- a/src/aap_eda/services/activation/status_manager.py +++ b/src/aap_eda/services/activation/status_manager.py @@ -62,7 +62,7 @@ class StatusManager: def __init__( self, - db_instance: tp.Union[models.Activation, models.EventStream], + db_instance: models.Activation, ): """Initialize the Process Parent Status Manager. diff --git a/src/aap_eda/tasks/activation_request_queue.py b/src/aap_eda/tasks/activation_request_queue.py index 56f1ec8f4..17c7124dc 100644 --- a/src/aap_eda/tasks/activation_request_queue.py +++ b/src/aap_eda/tasks/activation_request_queue.py @@ -17,7 +17,7 @@ from django.db.utils import IntegrityError from aap_eda.core.enums import ActivationRequest, ProcessParentType -from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream +from aap_eda.core.models import Activation, ActivationRequestQueue from .exceptions import UnknownProcessParentType @@ -26,8 +26,6 @@ def push(parent_type: str, parent_id: int, request: ActivationRequest) -> None: if parent_type == ProcessParentType.ACTIVATION: model = Activation - elif parent_type == ProcessParentType.EVENT_STREAM: - model = EventStream else: raise UnknownProcessParentType( f"Unknown parent type {parent_type}", diff --git a/src/aap_eda/tasks/orchestrator.py b/src/aap_eda/tasks/orchestrator.py index 9b71c6e37..71d3a00f2 100644 --- a/src/aap_eda/tasks/orchestrator.py +++ b/src/aap_eda/tasks/orchestrator.py @@ -16,7 +16,7 @@ import random from collections import Counter from datetime import datetime, timedelta -from typing import Optional, Union +from typing import Optional from django.conf import settings from django.core.exceptions import ObjectDoesNotExist @@ -29,7 +29,7 @@ ActivationStatus, ProcessParentType, ) -from aap_eda.core.models import Activation, ActivationRequestQueue, EventStream +from aap_eda.core.models import Activation, ActivationRequestQueue from aap_eda.core.tasking import Worker, unique_enqueue from aap_eda.services.activation import exceptions from aap_eda.services.activation.activation_manager import ( @@ -56,11 +56,9 @@ def _manage_process_job_id(process_parent_type: str, id: int) -> str: def get_process_parent( process_parent_type: str, parent_id: int, -) -> Union[Activation, EventStream]: +) -> Activation: if process_parent_type == ProcessParentType.ACTIVATION: klass = Activation - elif process_parent_type == ProcessParentType.EVENT_STREAM: - klass = EventStream else: raise UnknownProcessParentType( f"Unknown process parent type {process_parent_type}", @@ -108,7 +106,7 @@ def _manage(process_parent_type: str, id: int) -> None: def _run_request( - process_parent: Union[Activation, EventStream], + process_parent: Activation, request: ActivationRequestQueue, ) -> bool: """Attempt to run a request for an activation via the manager.""" @@ -475,8 +473,7 @@ def monitor_rulebook_processes() -> None: process_parent_type = str(process.parent_type) if process_parent_type == ProcessParentType.ACTIVATION: process_parent_id = process.activation_id - else: - process_parent_id = process.event_stream_id + dispatch( process_parent_type, process_parent_id, diff --git a/tests/integration/api/test_activation_instance.py b/tests/integration/api/test_activation_instance.py index 8c817642c..028ba5f3e 100644 --- a/tests/integration/api/test_activation_instance.py +++ b/tests/integration/api/test_activation_instance.py @@ -166,6 +166,5 @@ def assert_activation_instance_data( "started_at": instance.started_at.strftime(DATETIME_FORMAT), "ended_at": instance.ended_at, "status_message": enums.ACTIVATION_STATUS_MESSAGE_MAP[instance.status], - "event_stream_id": None, "queue_name": instance.rulebookprocessqueue.queue_name, } diff --git a/tests/integration/api/test_event_stream.py b/tests/integration/api/test_event_stream.py deleted file mode 100644 index c419b3e7d..000000000 --- a/tests/integration/api/test_event_stream.py +++ /dev/null @@ -1,558 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from unittest import mock - -import pytest -import yaml -from rest_framework import status -from rest_framework.test import APIClient - -from aap_eda.api.constants import ( - PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, -) -from aap_eda.core import models -from aap_eda.core.enums import Action, ProcessParentType, ResourceType -from tests.integration.constants import api_url_v1 - -pytestmark = pytest.mark.skip( - reason="EventStream views are currently hidden, thus no need to run tests" -) - -BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_TYPE = """ ---- -- name: PG Notify Template Event Stream - hosts: all - sources: - - name: my_range - ansible.eda.range: - limit: 5 - complementary_source: - name: Postgres Listener - args: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channels: - - "{{ EDA_PG_NOTIFY_CHANNEL }}" - extra_vars: - EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" - EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" - encrypt_vars: - - EDA_PG_NOTIFY_DSN - rules: - - name: Post event - condition: true - action: - pg_notify: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" - event: "{{ event }}" -""" -BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_NAME = """ ---- -- name: PG Notify Template Event Stream - hosts: all - sources: - - name: my_range - ansible.eda.range: - limit: 5 - complementary_source: - type: ansible.eda.pg_listener - args: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channels: - - "{{ EDA_PG_NOTIFY_CHANNEL }}" - extra_vars: - EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" - EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" - encrypt_vars: - - EDA_PG_NOTIFY_DSN - rules: - - name: Post event - condition: true - action: - pg_notify: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" - event: "{{ event }}" -""" -BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_ARGS = """ ---- -- name: PG Notify Template Event Stream - hosts: all - sources: - - name: my_range - ansible.eda.range: - limit: 5 - complementary_source: - type: ansible.eda.pg_listener - name: Postgres Listener - extra_vars: - EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" - EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" - encrypt_vars: - - EDA_PG_NOTIFY_DSN - rules: - - name: Post event - condition: true - action: - pg_notify: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" - event: "{{ event }}" -""" -BAD_PG_NOTIFY_NO_COMPLEMENTARY_SOURCE = """ ---- -- name: PG Notify Template Event Stream - hosts: all - sources: - - name: my_range - ansible.eda.range: - limit: 5 - extra_vars: - EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}" - EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}" - encrypt_vars: - - EDA_PG_NOTIFY_DSN - rules: - - name: Post event - condition: true - action: - pg_notify: - dsn: "{{ EDA_PG_NOTIFY_DSN }}" - channel: "{{ EDA_PG_NOTIFY_CHANNEL }}" - event: "{{ event }}" -""" - - -@pytest.mark.django_db -def test_list_event_streams( - admin_client: APIClient, - check_permission_mock: mock.Mock, - default_decision_environment: models.DecisionEnvironment, - default_user: models.User, -): - event_streams = models.EventStream.objects.bulk_create( - [ - models.EventStream( - name="test-event_stream-1", - source_type="ansible.eda.range", - source_args={"limit": 5, "delay": 1}, - user=default_user, - decision_environment_id=default_decision_environment.id, - ), - models.EventStream( - name="test-event_stream-2", - source_type="ansible.eda.range", - source_args={"limit": 6, "delay": 2}, - user=default_user, - decision_environment_id=default_decision_environment.id, - ), - ] - ) - - response = admin_client.get(f"{api_url_v1}/event-streams/") - assert response.status_code == status.HTTP_200_OK - assert len(response.data["results"]) == 2 - assert ( - response.data["results"][1]["source_type"] - == event_streams[0].source_type - ) - assert response.data["results"][1]["name"] == event_streams[0].name - assert response.data["results"][1]["user"] == "luke.skywalker" - - check_permission_mock.assert_called_once_with( - mock.ANY, mock.ANY, ResourceType.EVENT_STREAM, Action.READ - ) - - -@pytest.mark.django_db -def test_retrieve_event_stream( - admin_client: APIClient, - check_permission_mock: mock.Mock, - default_decision_environment: models.DecisionEnvironment, - default_user: models.User, -): - args = {"limit": 5, "delay": 1} - event_stream = models.EventStream.objects.create( - name="test-event_stream-1", - source_type="ansible.eda.range", - source_args=args, - user=default_user, - decision_environment_id=default_decision_environment.id, - ) - - response = admin_client.get( - f"{api_url_v1}/event-streams/{event_stream.id}/" - ) - assert response.status_code == status.HTTP_200_OK - assert response.data["name"] == event_stream.name - assert response.data["source_type"] == event_stream.source_type - assert yaml.safe_load(response.data["source_args"]) == args - assert response.data["user"] == "luke.skywalker" - - check_permission_mock.assert_called_once_with( - mock.ANY, mock.ANY, ResourceType.EVENT_STREAM, Action.READ - ) - - -@pytest.mark.django_db -def test_create_event_stream( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - models.Rulebook.objects.create( - name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, - rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - ) - - args = {"limit": 5, "delay": 1} - source_type = "ansible.eda.range" - data_in = { - "name": "test_event_stream", - "source_type": f"{source_type}", - "source_args": f"{args}", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_201_CREATED - result = response.data - assert result["name"] == "test_event_stream" - assert result["source_type"] == source_type - assert result["user"] == "test.admin" - assert yaml.safe_load(response.data["source_args"]) == args - - event_stream = models.EventStream.objects.first() - rulesets = yaml.safe_load(event_stream.rulebook_rulesets) - source = rulesets[0]["sources"][0] - assert source[source_type] == args - assert source["name"] == "test_event_stream" - - -@pytest.mark.django_db -def test_create_event_stream_blank_text( - client: APIClient, - default_de: models.DecisionEnvironment, -): - models.Rulebook.objects.create( - name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, - rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - ) - - args = {"limit": 5, "delay": 1} - source_type = "ansible.eda.range" - data_in = { - "name": "test_event_stream", - "source_type": f"{source_type}", - "source_args": f"{args}", - "decision_environment_id": default_de.id, - "description": "", - "extra_vars": "", - "k8s_service_name": "", - } - response = client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_201_CREATED - result = response.data - assert result["name"] == "test_event_stream" - assert result["source_type"] == source_type - assert result["user"] == "test.admin" - assert result["description"] == data_in["description"] - - # An extra_var empty string is outbound serialized as None; it's considered - # a "no value" situation. - assert result["extra_var"] is None - - assert result["k8s_service_name"] == data_in["k8s_service_name"] - assert yaml.safe_load(response.data["source_args"]) == args - - event_stream = models.EventStream.objects.first() - rulesets = yaml.safe_load(event_stream.rulebook_rulesets) - source = rulesets[0]["sources"][0] - assert source[source_type] == args - assert source["name"] == "test_event_stream" - - -@pytest.mark.django_db -def test_create_event_stream_with_different_default_channel_names( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - models.Rulebook.objects.create( - name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, - rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - ) - - args = {"limit": 5, "delay": 1} - source_type = "ansible.eda.range" - data_in_1 = { - "name": "test_event_stream", - "source_type": f"{source_type}", - "source_args": f"{args}", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post( - f"{api_url_v1}/event-streams/", data=data_in_1 - ) - assert response.status_code == status.HTTP_201_CREATED - - data_in_2 = { - "name": "test_event_stream_2", - "source_type": f"{source_type}", - "source_args": f"{args}", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post( - f"{api_url_v1}/event-streams/", data=data_in_2 - ) - - assert response.status_code == status.HTTP_201_CREATED - assert models.EventStream.objects.count() == 2 - assert ( - models.EventStream.objects.first().channel_name - != models.EventStream.objects.last().channel_name - ) - - -@pytest.mark.django_db -def test_create_event_stream_with_credential( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - models.Rulebook.objects.create( - name=PG_NOTIFY_TEMPLATE_RULEBOOK_NAME, - rulesets=PG_NOTIFY_TEMPLATE_RULEBOOK_DATA, - ) - - args = {"limit": 5, "delay": 1} - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.range", - "source_args": f"{args}", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_201_CREATED - result = response.data - assert result["name"] == "test_event_stream" - assert result["source_type"] == "ansible.eda.range" - assert yaml.safe_load(response.data["source_args"]) == args - - -@pytest.mark.parametrize( - "bad_rulebooks", - [ - {"bad_rulebook_1": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_TYPE}"}, - {"bad_rulebook_2": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_NAME}"}, - {"bad_rulebook_3": f"{BAD_PG_NOTIFY_TEMPLATE_RULEBOOK_NO_ARGS}"}, - {"bad_rulebook_4": f"{BAD_PG_NOTIFY_NO_COMPLEMENTARY_SOURCE}"}, - ], -) -@pytest.mark.django_db -def test_create_event_stream_with_bad_rulebook( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, - settings, - bad_rulebooks, -): - for key in bad_rulebooks: - settings.PG_NOTIFY_TEMPLATE_RULEBOOK = key - settings.PG_NOTIFY_DSN = ( - "host=localhost port=5432 dbname=eda user=postgres password=secret" - ) - models.Rulebook.objects.create( - name=key, - rulesets=bad_rulebooks[key], - ) - - args = {"limit": 5, "delay": 1} - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.range", - "source_args": f"{args}", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post( - f"{api_url_v1}/event-streams/", data=data_in - ) - assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR - assert response.data["detail"].startswith( - "Configuration Error: Event stream template rulebook is missing " - ) - - -@pytest.mark.django_db -def test_create_event_stream_bad_channel_name( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - args = {"limit": 5, "delay": 1} - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.range", - "source_args": f"{args}", - "channel_name": "abc-def", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert ( - str(response.data["channel_name"][0]) - == "Channel name can only contain alphanumeric and " - "underscore characters" - ) - - -@pytest.mark.django_db -def test_create_event_stream_bad_args( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.range", - "source_args": "gobbledegook", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_400_BAD_REQUEST - result = response.data - assert ( - str(result["source_args"][0]) - == "The input field must be a YAML object (dictionary)" - ) - - -@pytest.mark.django_db -def test_create_event_stream_empty_args( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, -): - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.generic", - "decision_environment_id": default_decision_environment.id, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert response.data["source_args"][0] == "This field is required." - - -@pytest.mark.django_db -def test_create_event_stream_bad_de(admin_client: APIClient): - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.generic", - "decision_environment_id": 99999, - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_400_BAD_REQUEST - result = response.data - assert ( - str(result["decision_environment_id"][0]) - == "DecisionEnvironment with id 99999 does not exist" - ) - - -@pytest.mark.django_db -def test_create_event_stream_no_de( - admin_client: APIClient, -): - data_in = { - "name": "test_event_stream", - "source_type": "ansible.eda.generic", - } - response = admin_client.post(f"{api_url_v1}/event-streams/", data=data_in) - assert response.status_code == status.HTTP_400_BAD_REQUEST - result = response.data - assert result["decision_environment_id"][0] == "This field is required." - - -@pytest.mark.django_db -def test_list_event_stream_instances( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, - default_user: models.User, -): - args = {"limit": 5, "delay": 1} - event_stream = models.EventStream.objects.create( - name="test-event_stream-1", - source_type="ansible.eda.range", - source_args=args, - user=default_user, - decision_environment_id=default_decision_environment.id, - ) - - instances = models.RulebookProcess.objects.bulk_create( - [ - models.RulebookProcess( - name="test-activation-instance-1", - event_stream=event_stream, - parent_type=ProcessParentType.EVENT_STREAM, - ), - models.RulebookProcess( - name="test-activation-instance-1", - event_stream=event_stream, - parent_type=ProcessParentType.EVENT_STREAM, - ), - ] - ) - models.RulebookProcessQueue.objects.create( - process=instances[0], - queue_name="activation", - ) - response = admin_client.get( - f"{api_url_v1}/event-streams/{event_stream.id}/instances/" - ) - data = response.data["results"] - assert response.status_code == status.HTTP_200_OK - assert len(data) == len(instances) - assert data[0]["name"] == instances[0].name - assert data[1]["name"] == instances[1].name - assert data[0]["queue_name"] == "activation" - assert data[1]["queue_name"] is None - - -@pytest.mark.django_db -def test_retrieve_event_stream_instance( - admin_client: APIClient, - default_decision_environment: models.DecisionEnvironment, - default_user: models.User, -): - args = {"limit": 5, "delay": 1} - event_stream = models.EventStream.objects.create( - name="test-event_stream-1", - source_type="ansible.eda.range", - source_args=args, - user=default_user, - decision_environment_id=default_decision_environment.id, - ) - - instance = models.RulebookProcess.objects.create( - name="test-activation-instance-1", - event_stream=event_stream, - parent_type=ProcessParentType.EVENT_STREAM, - ) - models.RulebookProcessQueue.objects.create( - process=instance, - queue_name="activation", - ) - response = admin_client.get( - ( - f"{api_url_v1}/event-streams/{event_stream.id}" - f"/instances/{instance.id}/" - ), - ) - data = response.data - assert response.status_code == status.HTTP_200_OK - assert data["name"] == instance.name - assert data["queue_name"] == "activation" diff --git a/tests/integration/api/test_root.py b/tests/integration/api/test_root.py index bf7490b98..098466335 100644 --- a/tests/integration/api/test_root.py +++ b/tests/integration/api/test_root.py @@ -38,7 +38,6 @@ "/activation-instances/", "/audit-rules/", "/users/", - "/event-streams/", "/users/me/awx-tokens/", "/credential-types/", "/eda-credentials/", @@ -77,7 +76,6 @@ "/activation-instances/", "/audit-rules/", "/users/", - "/event-streams/", "/users/me/awx-tokens/", "/credential-types/", "/eda-credentials/", diff --git a/tests/integration/core/conftest.py b/tests/integration/core/conftest.py index cf64786a1..04bb11307 100644 --- a/tests/integration/core/conftest.py +++ b/tests/integration/core/conftest.py @@ -24,25 +24,9 @@ def new_activation(new_user): ) -@pytest.fixture() -def new_event_stream(new_user): - return models.EventStream.objects.create( - name="event_stream", - user=new_user, - ) - - @pytest.fixture() def new_rulebook_process_with_activation(new_activation): return models.RulebookProcess.objects.create( name="test-instance", activation=new_activation, ) - - -@pytest.fixture() -def new_rulebook_process_with_event_stream(new_event_stream): - return models.RulebookProcess.objects.create( - name="test-instance", - event_stream=new_event_stream, - ) diff --git a/tests/integration/core/test_process_parent.py b/tests/integration/core/test_process_parent.py index cdeafb3e9..61052889b 100644 --- a/tests/integration/core/test_process_parent.py +++ b/tests/integration/core/test_process_parent.py @@ -27,10 +27,6 @@ lazy_fixture("new_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db @@ -45,8 +41,6 @@ def test_latest_instance_field(instance): if isinstance(instance, models.Activation): kwargs["activation"] = instance - else: - kwargs["event_stream"] = instance first_instance = models.RulebookProcess.objects.create(**kwargs) assert instance.latest_instance == first_instance @@ -67,10 +61,6 @@ def test_latest_instance_field(instance): lazy_fixture("new_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db diff --git a/tests/integration/core/test_rulebook_process.py b/tests/integration/core/test_rulebook_process.py index 8280ec8cb..2a07ee2d0 100644 --- a/tests/integration/core/test_rulebook_process.py +++ b/tests/integration/core/test_rulebook_process.py @@ -103,10 +103,6 @@ def test_rulebook_process_save(init_data): lazy_fixture("new_rulebook_process_with_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_rulebook_process_with_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db @@ -125,10 +121,6 @@ def test_rulebook_process_parent_type(instance): lazy_fixture("new_rulebook_process_with_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_rulebook_process_with_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db @@ -136,5 +128,3 @@ def test_rulebook_process_get_parent(instance): """Test get_parent method returns the correct parent instance.""" if instance.activation: assert instance.get_parent() == instance.activation - else: - assert instance.get_parent() == instance.event_stream diff --git a/tests/integration/core/test_status_handler.py b/tests/integration/core/test_status_handler.py index 577060c52..ba0fddd3b 100644 --- a/tests/integration/core/test_status_handler.py +++ b/tests/integration/core/test_status_handler.py @@ -31,10 +31,6 @@ lazy_fixture("new_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db @@ -53,10 +49,6 @@ def test_save_with_errors(instance): lazy_fixture("new_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db @@ -80,10 +72,6 @@ def test_save_with_invalid_status(instance): lazy_fixture("new_activation"), id="activation", ), - pytest.param( - lazy_fixture("new_event_stream"), - id="event_stream", - ), ], ) @pytest.mark.django_db diff --git a/tests/integration/services/activation/engine/test_kubernetes.py b/tests/integration/services/activation/engine/test_kubernetes.py index a37801f3e..02f057f5c 100644 --- a/tests/integration/services/activation/engine/test_kubernetes.py +++ b/tests/integration/services/activation/engine/test_kubernetes.py @@ -276,24 +276,10 @@ def kubernetes_engine(init_data): yield engine -@pytest.fixture -def kubernetes_event_stream_engine(init_data): - activation_id = init_data.activation.id - with mock.patch("builtins.open", mock.mock_open(read_data="aap-eda")): - engine = Engine( - activation_id=str(activation_id), - resource_prefix=ProcessParentType.EVENT_STREAM, - client=mock.Mock(), - ) - - yield engine - - @pytest.mark.parametrize( "resource_prefixes", [ {ProcessParentType.ACTIVATION: "activation"}, - {ProcessParentType.EVENT_STREAM: "event-stream"}, ], ) @pytest.mark.django_db @@ -379,32 +365,6 @@ def test_engine_start(init_data, kubernetes_engine): ) -@pytest.mark.django_db -def test_event_stream_engine_start(init_data, kubernetes_event_stream_engine): - engine = kubernetes_event_stream_engine - request = get_request(init_data) - log_handler = DBLogger(init_data.activation_instance.id) - - with mock.patch("aap_eda.services.activation.engine.kubernetes.watch"): - with mock.patch.object(engine.client, "core_api") as core_api_mock: - core_api_mock.list_namespaced_service.return_value.items = None - engine.start(request, log_handler) - - assert engine.job_name == ( - f"event-stream-job-{init_data.activation.id}-" - f"{init_data.activation_instance.id}" - ) - assert engine.pod_name == ( - f"event-stream-pod-{init_data.activation.id}-" - f"{init_data.activation_instance.id}" - ) - - assert models.RulebookProcessLog.objects.count() == 5 - assert models.RulebookProcessLog.objects.last().log.endswith( - f"Job {engine.job_name} is running" - ) - - @pytest.mark.django_db def test_engine_start_with_create_job_exception(init_data, kubernetes_engine): engine = kubernetes_engine diff --git a/tests/integration/services/activation/test_event_stream.py b/tests/integration/services/activation/test_event_stream.py deleted file mode 100644 index 99bcb7164..000000000 --- a/tests/integration/services/activation/test_event_stream.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2024 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""EventStream Containerable Interface tests.""" - -import pytest - -from aap_eda.core import models -from aap_eda.services.activation.engine.common import ( - AnsibleRulebookCmdLine, - ContainerableInvalidError, - ContainerRequest, -) - -PROJECT_GIT_HASH = "684f62df18ce5f8d5c428e53203b9b975426eed0" - - -@pytest.fixture -def event_stream_no_instance( - default_user: models.User, - default_decision_environment: models.DecisionEnvironment, - default_rulebook: models.Rulebook, -) -> models.EventStream: - """Return an event stream without associated RulebookProcess.""" - return models.EventStream.objects.create( - name="test-event-stream", - user=default_user, - decision_environment=default_decision_environment, - rulebook=default_rulebook, - # rulebook_rulesets is populated by the serializer - rulebook_rulesets=default_rulebook.rulesets, - ) - - -@pytest.fixture -def event_stream(event_stream_no_instance) -> models.EventStream: - """Return an event stream with associated RulebookProcess.""" - models.RulebookProcess.objects.create( - name="event-stream-instance-1", - event_stream=event_stream_no_instance, - git_hash=PROJECT_GIT_HASH, - ) - return event_stream_no_instance - - -@pytest.mark.django_db -def test_container_request_no_credential(event_stream): - """Test container params when no credential exists.""" - request = event_stream.get_container_request() - assert request.credential is None - - -@pytest.mark.django_db -def test_get_container_request(event_stream): - """Test the construction of a ContainerRequest.""" - request = event_stream.get_container_request() - - assert isinstance(request, ContainerRequest) - assert request.name is not None - assert request.image_url is not None - assert request.ports is not None - assert request.env_vars is not None - assert request.extra_args is not None - assert request.mem_limit is not None - assert request.mounts is not None - assert request.process_parent_id == event_stream.id - assert request.rulebook_process_id == event_stream.latest_instance.id - - cmdline = request.cmdline - assert (cmdline is not None) and isinstance( - cmdline, AnsibleRulebookCmdLine - ) - assert cmdline.id == str(event_stream.latest_instance.id) - assert cmdline.ws_url is not None - assert cmdline.log_level is None - assert cmdline.ws_ssl_verify is not None - assert cmdline.ws_token_url is not None - assert cmdline.ws_access_token is not None - assert cmdline.ws_refresh_token is not None - assert cmdline.heartbeat is not None - assert cmdline.skip_audit_events - assert "--skip-audit-events" in cmdline.get_args() - - -@pytest.mark.django_db -def test_get_container_request_no_instance(event_stream_no_instance): - """Test the construction of a ContainerRequest.""" - with pytest.raises(ContainerableInvalidError): - event_stream_no_instance.get_container_request() - - -@pytest.mark.parametrize( - "value,expected", - [ - ("debug", "-vv"), - ("info", "-v"), - ("error", None), - ], -) -@pytest.mark.django_db -def test_log_level_param_event_stream(event_stream, value, expected): - event_stream.log_level = value - event_stream.save(update_fields=["log_level"]) - request = event_stream.get_container_request() - assert request.cmdline.log_level == expected diff --git a/tests/integration/services/activation/test_manager.py b/tests/integration/services/activation/test_manager.py index 23ad63c69..e6dbf01be 100644 --- a/tests/integration/services/activation/test_manager.py +++ b/tests/integration/services/activation/test_manager.py @@ -139,28 +139,6 @@ def new_activation_with_instance( return activation -@pytest.fixture -def new_event_stream_with_instance( - default_user: models.User, - default_decision_environment: models.DecisionEnvironment, - default_rulebook: models.Rulebook, -) -> models.EventStream: - """Return an event stream with an instance.""" - event_stream = models.EventStream.objects.create( - name="new_event_stream_with_instance", - user=default_user, - decision_environment=default_decision_environment, - rulebook=default_rulebook, - # rulebook_rulesets is populated by the serializer - rulebook_rulesets=default_rulebook.rulesets, - ) - models.RulebookProcess.objects.create( - event_stream=event_stream, - status=enums.ActivationStatus.RUNNING, - ) - return event_stream - - @pytest.fixture def container_engine_mock() -> MagicMock: return create_autospec(ContainerEngine, instance=True) @@ -707,28 +685,12 @@ def test_init_status_manager_with_activation(basic_activation): ) -@pytest.mark.django_db -def test_init_status_manager_with_event_stream(new_event_stream_with_instance): - status_manager = StatusManager(new_event_stream_with_instance) - assert status_manager.db_instance == new_event_stream_with_instance - assert ( - status_manager.latest_instance - == new_event_stream_with_instance.latest_instance - ) - assert ( - status_manager.db_instance_type == enums.ProcessParentType.EVENT_STREAM - ) - - @pytest.mark.parametrize( "process_parent", [ pytest.param( lazy_fixture("new_activation_with_instance"), ), - pytest.param( - lazy_fixture("new_event_stream_with_instance"), - ), ], ) @pytest.mark.django_db @@ -752,9 +714,6 @@ def test_status_manager_set_latest_instance_status(process_parent): pytest.param( lazy_fixture("new_activation_with_instance"), ), - pytest.param( - lazy_fixture("new_event_stream_with_instance"), - ), ], ) @pytest.mark.django_db diff --git a/tests/unit/test_orchestrator.py b/tests/unit/test_orchestrator.py index 456d8c856..f663918ce 100644 --- a/tests/unit/test_orchestrator.py +++ b/tests/unit/test_orchestrator.py @@ -20,7 +20,7 @@ from django.conf import settings from aap_eda.core.enums import ProcessParentType -from aap_eda.core.models import Activation, EventStream, RulebookProcess +from aap_eda.core.models import Activation, RulebookProcess from aap_eda.settings import default from aap_eda.tasks import orchestrator from aap_eda.tasks.exceptions import UnknownProcessParentType @@ -369,20 +369,6 @@ def test_get_process_parent_activation(): assert result == activation_mock -def test_get_process_parent_event_stream(): - event_stream_id = 1 - event_stream_mock = mock.Mock(spec=EventStream) - EventStream.objects.get = mock.Mock(return_value=event_stream_mock) - - result = get_process_parent( - ProcessParentType.EVENT_STREAM, - event_stream_id, - ) - - EventStream.objects.get.assert_called_once_with(id=event_stream_id) - assert result == event_stream_mock - - def test_get_process_parent_unknown_type(): process_parent_type = "unknown" parent_id = 1