Skip to content

Commit 4f30ae0

Browse files
committed
Combine event streams with activations
1 parent 1006529 commit 4f30ae0

File tree

23 files changed

+1466
-21
lines changed

23 files changed

+1466
-21
lines changed

src/aap_eda/api/constants.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,34 @@
1414

1515
# EDA_SERVER_VAULT_LABEL is reserved for system vault password identifiers
1616
EDA_SERVER_VAULT_LABEL = "EDA_SERVER"
17+
18+
PG_NOTIFY_TEMPLATE_RULEBOOK_NAME = "_PG_NOTIFY_TEMPLATE_RULEBOOK_"
19+
PG_NOTIFY_TEMPLATE_RULEBOOK_DATA = """
20+
---
21+
- name: PG Notify Template Event Stream
22+
hosts: all
23+
sources:
24+
- name: my_range
25+
ansible.eda.range:
26+
limit: 5
27+
complementary_source:
28+
type: ansible.eda.pg_listener
29+
name: Postgres Listener
30+
args:
31+
dsn: "{{ EDA_PG_NOTIFY_DSN }}"
32+
channels:
33+
- "{{ EDA_PG_NOTIFY_CHANNEL }}"
34+
extra_vars:
35+
EDA_PG_NOTIFY_DSN: "{{ settings.PG_NOTIFY_DSN }}"
36+
EDA_PG_NOTIFY_CHANNEL: "{{ event_stream.channel_name }}"
37+
encrypt_vars:
38+
- EDA_PG_NOTIFY_DSN
39+
rules:
40+
- name: Post event
41+
condition: true
42+
action:
43+
pg_notify:
44+
dsn: "{{ EDA_PG_NOTIFY_DSN }}"
45+
channel: "{{ EDA_PG_NOTIFY_CHANNEL }}"
46+
event: "{{ event }}"
47+
"""

src/aap_eda/api/exceptions.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,33 @@ class InvalidWebsocketHost(APIException):
9595
default_detail = (
9696
"Connection Error: WebSocket URL must have a valid host address."
9797
)
98+
99+
100+
class MissingEventStreamRulebook(APIException):
101+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
102+
default_detail = (
103+
"Configuration Error: Event stream template rulebook not found"
104+
)
105+
106+
107+
class MissingEventStreamRulebookKeys(APIException):
108+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
109+
default_detail = (
110+
"Configuration Error: Event stream template rulebook is missing "
111+
"required keys in complementary_source: type, name and args"
112+
)
113+
114+
115+
class MissingEventStreamRulebookSource(APIException):
116+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
117+
default_detail = (
118+
"Configuration Error: Event stream template rulebook is missing "
119+
"required complementary_source"
120+
)
121+
122+
123+
class InvalidEventStreamRulebook(APIException):
124+
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
125+
default_detail = (
126+
"Configuration Error: Event stream template rulebook is invalid"
127+
)

src/aap_eda/api/filters/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
)
2020
from .credential import CredentialFilter
2121
from .decision_environment import DecisionEnvironmentFilter
22+
from .event_stream import EventStreamFilter
2223
from .project import ProjectFilter
2324
from .role import RoleFilter
2425
from .rulebook import (
@@ -51,4 +52,6 @@
5152
"UserFilter",
5253
# role
5354
"RoleFilter",
55+
# event_stream
56+
"EventStreamFilter",
5457
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright 2024 Red Hat, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import django_filters
16+
17+
from aap_eda.core import models
18+
19+
20+
class EventStreamFilter(django_filters.FilterSet):
21+
name = django_filters.CharFilter(
22+
field_name="name",
23+
lookup_expr="istartswith",
24+
label="Filter by event source name.",
25+
)
26+
27+
class Meta:
28+
model = models.EventStream
29+
fields = ["name"]

src/aap_eda/api/serializers/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
DecisionEnvironmentRefSerializer,
3939
DecisionEnvironmentSerializer,
4040
)
41+
from .event_stream import (
42+
EventStreamCreateSerializer,
43+
EventStreamOutSerializer,
44+
EventStreamSerializer,
45+
)
4146
from .project import (
4247
ExtraVarCreateSerializer,
4348
ExtraVarRefSerializer,
@@ -122,4 +127,8 @@
122127
"RoleSerializer",
123128
"RoleListSerializer",
124129
"RoleDetailSerializer",
130+
# event streams
131+
"EventStreamSerializer",
132+
"EventStreamCreateSerializer",
133+
"EventStreamOutSerializer",
125134
)

src/aap_eda/api/serializers/activation.py

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,65 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import logging
15+
from typing import Union
16+
17+
import yaml
1418
from rest_framework import serializers
1519

20+
from aap_eda.api.constants import PG_NOTIFY_TEMPLATE_RULEBOOK_DATA
21+
from aap_eda.api.exceptions import InvalidEventStreamRulebook
1622
from aap_eda.api.serializers.credential import CredentialSerializer
1723
from aap_eda.api.serializers.decision_environment import (
1824
DecisionEnvironmentRefSerializer,
1925
)
26+
from aap_eda.api.serializers.event_stream import EventStreamOutSerializer
2027
from aap_eda.api.serializers.project import (
2128
ExtraVarRefSerializer,
2229
ProjectRefSerializer,
2330
)
2431
from aap_eda.api.serializers.rulebook import RulebookRefSerializer
32+
from aap_eda.api.serializers.utils import (
33+
substitute_extra_vars,
34+
substitute_source_args,
35+
swap_sources,
36+
)
2537
from aap_eda.core import models, validators
38+
from aap_eda.core.enums import ProcessParentType
39+
40+
logger = logging.getLogger(__name__)
41+
42+
43+
def _updated_ruleset(validated_data):
44+
try:
45+
sources_info = []
46+
47+
for event_stream_id in validated_data["event_streams"]:
48+
event_stream = models.EventStream.objects.get(id=event_stream_id)
49+
50+
if event_stream.rulebook:
51+
rulesets = yaml.safe_load(event_stream.rulebook.rulesets)
52+
else:
53+
rulesets = yaml.safe_load(PG_NOTIFY_TEMPLATE_RULEBOOK_DATA)
54+
55+
extra_vars = rulesets[0]["sources"][0].get("extra_vars", {})
56+
encrypt_vars = rulesets[0]["sources"][0].get("encrypt_vars", [])
57+
58+
# TODO: encrypt password when engine is ready for vaulted data
59+
extra_vars = substitute_extra_vars(
60+
event_stream.__dict__, extra_vars, encrypt_vars, "password"
61+
)
62+
63+
source = rulesets[0]["sources"][0]["complementary_source"]
64+
source = substitute_source_args(
65+
event_stream.__dict__, source, extra_vars
66+
)
67+
sources_info.append(source)
68+
69+
return swap_sources(validated_data["rulebook_rulesets"], sources_info)
70+
except Exception as e:
71+
logger.error(f"Failed to update rulesets: {e}")
72+
raise InvalidEventStreamRulebook(e)
2673

2774

2875
class ActivationSerializer(serializers.ModelSerializer):
@@ -34,6 +81,12 @@ class ActivationSerializer(serializers.ModelSerializer):
3481
child=CredentialSerializer(),
3582
)
3683

84+
event_streams = serializers.ListField(
85+
required=False,
86+
allow_null=True,
87+
child=EventStreamOutSerializer(),
88+
)
89+
3790
class Meta:
3891
model = models.Activation
3992
fields = [
@@ -57,6 +110,7 @@ class Meta:
57110
"status_message",
58111
"awx_token_id",
59112
"credentials",
113+
"event_streams",
60114
]
61115
read_only_fields = [
62116
"id",
@@ -77,6 +131,12 @@ class ActivationListSerializer(serializers.ModelSerializer):
77131
child=CredentialSerializer(),
78132
)
79133

134+
event_streams = serializers.ListField(
135+
required=False,
136+
allow_null=True,
137+
child=EventStreamOutSerializer(),
138+
)
139+
80140
class Meta:
81141
model = models.Activation
82142
fields = [
@@ -100,6 +160,7 @@ class Meta:
100160
"status_message",
101161
"awx_token_id",
102162
"credentials",
163+
"event_streams",
103164
]
104165
read_only_fields = ["id", "created_at", "modified_at"]
105166

@@ -111,6 +172,10 @@ def to_representation(self, activation):
111172
CredentialSerializer(credential).data
112173
for credential in activation.credentials.all()
113174
]
175+
event_streams = [
176+
EventStreamOutSerializer(event_stream).data
177+
for event_stream in activation.event_streams.all()
178+
]
114179

115180
return {
116181
"id": activation.id,
@@ -133,6 +198,7 @@ def to_representation(self, activation):
133198
"status_message": activation.status_message,
134199
"awx_token_id": activation.awx_token_id,
135200
"credentials": credentials,
201+
"event_streams": event_streams,
136202
}
137203

138204

@@ -152,6 +218,7 @@ class Meta:
152218
"restart_policy",
153219
"awx_token_id",
154220
"credentials",
221+
"event_streams",
155222
]
156223

157224
rulebook_id = serializers.IntegerField(
@@ -177,6 +244,12 @@ class Meta:
177244
allow_null=True,
178245
child=serializers.IntegerField(),
179246
)
247+
event_streams = serializers.ListField(
248+
required=False,
249+
allow_null=True,
250+
child=serializers.IntegerField(),
251+
validators=[validators.check_if_event_streams_exists],
252+
)
180253

181254
def validate(self, data):
182255
user = data["user"]
@@ -200,6 +273,10 @@ def create(self, validated_data):
200273
validated_data["rulebook_rulesets"] = rulebook.rulesets
201274
validated_data["git_hash"] = rulebook.project.git_hash
202275
validated_data["project_id"] = rulebook.project.id
276+
if validated_data.get("event_streams"):
277+
validated_data["rulebook_rulesets"] = _updated_ruleset(
278+
validated_data
279+
)
203280
return super().create(validated_data)
204281

205282

@@ -215,6 +292,7 @@ class Meta:
215292
"git_hash",
216293
"status_message",
217294
"activation_id",
295+
"event_stream_id",
218296
"started_at",
219297
"ended_at",
220298
]
@@ -243,6 +321,11 @@ class ActivationReadSerializer(serializers.ModelSerializer):
243321
rules_count = serializers.IntegerField()
244322
rules_fired_count = serializers.IntegerField()
245323
restarted_at = serializers.DateTimeField(required=False, allow_null=True)
324+
event_streams = serializers.ListField(
325+
required=False,
326+
allow_null=True,
327+
child=EventStreamOutSerializer(),
328+
)
246329

247330
class Meta:
248331
model = models.Activation
@@ -270,6 +353,7 @@ class Meta:
270353
"status_message",
271354
"awx_token_id",
272355
"credentials",
356+
"event_streams",
273357
]
274358
read_only_fields = ["id", "created_at", "modified_at", "restarted_at"]
275359

@@ -297,7 +381,8 @@ def to_representation(self, activation):
297381
else None
298382
)
299383
activation_instances = models.RulebookProcess.objects.filter(
300-
activation_id=activation.id
384+
activation_id=activation.id,
385+
parent_type=ProcessParentType.ACTIVATION,
301386
)
302387
rules_count, rules_fired_count = get_rules_count(
303388
activation.ruleset_stats
@@ -315,6 +400,11 @@ def to_representation(self, activation):
315400
for credential in activation.credentials.all()
316401
]
317402

403+
event_streams = [
404+
EventStreamOutSerializer(event_stream).data
405+
for event_stream in activation.event_streams.all()
406+
]
407+
318408
return {
319409
"id": activation.id,
320410
"name": activation.name,
@@ -341,6 +431,7 @@ def to_representation(self, activation):
341431
"status_message": activation.status_message,
342432
"awx_token_id": activation.awx_token_id,
343433
"credentials": credentials,
434+
"event_streams": event_streams,
344435
}
345436

346437

@@ -356,7 +447,9 @@ class PostActivationSerializer(serializers.ModelSerializer):
356447
allow_null=True,
357448
validators=[validators.check_if_extra_var_exists],
358449
)
450+
# TODO: is_activation_valid needs to tell event stream/activation
359451
awx_token_id = serializers.IntegerField(
452+
required=False,
360453
allow_null=True,
361454
validators=[validators.check_if_awx_token_exists],
362455
)
@@ -418,8 +511,11 @@ def parse_validation_errors(errors: dict) -> str:
418511
return str(messages)
419512

420513

421-
def validate_rulebook_token(rulebook_id: int) -> None:
514+
def validate_rulebook_token(rulebook_id: Union[int, None]) -> None:
422515
"""Validate if the rulebook requires an Awx Token."""
516+
if rulebook_id is None:
517+
return
518+
423519
rulebook = models.Rulebook.objects.get(id=rulebook_id)
424520

425521
# TODO: rulesets are stored as a string in the rulebook model

0 commit comments

Comments
 (0)