Add analytics collector to EDA
hsong-rh committed Sep 17, 2024
1 parent a80ee8e commit 6f53b3f
Showing 10 changed files with 1,355 additions and 5 deletions.
170 changes: 165 additions & 5 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -56,6 +56,8 @@ psycopg = "^3.1.17"
xxhash = "*"
pyjwt = { version = "*", extras = ["crypto"] }
ecdsa = "*"
insights-analytics-collector = "^0.3.2"
distro = "^1.9.0"

[tool.poetry.group.test.dependencies]
pytest = "*"
329 changes: 329 additions & 0 deletions src/aap_eda/analytics/analytics_collectors.py
@@ -0,0 +1,329 @@
import os
import platform
from datetime import datetime

import distro
from ansible_base.resource_registry.models.service_identifier import service_id
from django.conf import settings
from django.db.models import Manager, Q
from insights_analytics_collector import CsvFileSplitter, register

from aap_eda.analytics.collector import AnalyticsCollector
from aap_eda.core import models
from aap_eda.utils import get_eda_version


@register(
    "config",
    "1.0",
    description="General platform configuration.",
    config=True,
)
def config(**kwargs) -> dict:
    install_type = "traditional"
    if os.environ.get("container") == "oci":
        install_type = "openshift"
    elif "KUBERNETES_SERVICE_PORT" in os.environ:
        install_type = "k8s"
    return {
        "install_uuid": service_id(),
        "platform": {
            "system": platform.system(),
            "dist": distro.linux_distribution(),
            "release": platform.release(),
            "type": install_type,
        },
        # skip license related info so far
        "eda_log_level": settings.APP_LOG_LEVEL,
        "eda_version": get_eda_version(),
        "eda_deployment_type": settings.DEPLOYMENT_TYPE,
    }
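
# A hedged sketch of the payload the "config" collector above produces;
# every value below is illustrative, not taken from a real deployment:
#
#   {
#       "install_uuid": "<service-id>",
#       "platform": {
#           "system": "Linux",
#           "dist": ("centos", "9", ""),
#           "release": "5.14.0-362.el9.x86_64",
#           "type": "k8s",
#       },
#       "eda_log_level": "INFO",
#       "eda_version": "1.0.0",
#       "eda_deployment_type": "podman",
#   }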


@register(
    "activations_table",
    "1.0",
    format="csv",
    description="Data on activations",
)
def activations_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.Activation.objects, since, until)

    return _copy_table("activations", query, full_path)


@register(
    "audit_action_table",
    "1.0",
    format="csv",
    description="Data on audit_actions",
)
def audit_actions_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    audit_actions = _get_audit_action_qs(since, until)

    audit_action_query = (
        f"COPY ({audit_actions.query}) TO STDOUT WITH CSV HEADER"
    )

    return _copy_table("audit_actions", audit_action_query, full_path)


@register(
    "audit_event_table",
    "1.0",
    format="csv",
    description="Data on audit_events",
)
def audit_events_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    audit_actions = _get_audit_action_qs(since, until)
    audit_event_query = _get_audit_event_query(audit_actions)

    return _copy_table("audit_events", audit_event_query, full_path)


@register(
    "audit_rule_table",
    "1.0",
    format="csv",
    description="Data on audit_rules",
)
def audit_rules_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    audit_rules = _get_audit_rule_qs(since, until)
    audit_rule_query = f"COPY ({audit_rules.query}) TO STDOUT WITH CSV HEADER"

    return _copy_table("audit_rules", audit_rule_query, full_path)


@register(
    "eda_credential_table",
    "1.0",
    format="csv",
    description="Data on eda_credentials",
)
def eda_credentials_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.EdaCredential.objects, since, until)

    return _copy_table("eda_credentials", query, full_path)


@register(
    "credential_type_table",
    "1.0",
    format="csv",
    description="Data on credential_types",
)
def credential_types_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.CredentialType.objects, since, until)

    return _copy_table("credential_types", query, full_path)


@register(
    "decision_environment_table",
    "1.0",
    format="csv",
    description="Data on decision_environments",
)
def decision_environments_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.DecisionEnvironment.objects, since, until)
    return _copy_table("decision_environments", query, full_path)


@register(
    "event_stream_table",
    "1.0",
    format="csv",
    description="Data on event_streams",
)
def event_streams_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.EventStream.objects, since, until)
    return _copy_table("event_streams", query, full_path)


@register(
    "project_table",
    "1.0",
    format="csv",
    description="Data on projects",
)
def projects_table(since: datetime, full_path: str, until: datetime, **kwargs):
    query = _get_query(models.Project.objects, since, until)
    return _copy_table("projects", query, full_path)


@register(
    "rulebook_table",
    "1.0",
    format="csv",
    description="Data on rulebooks",
)
def rulebooks_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    query = _get_query(models.Rulebook.objects, since, until)
    return _copy_table("rulebooks", query, full_path)


@register(
    "rulebook_process_table",
    "1.0",
    format="csv",
    description="Data on rulebook_processes",
)
def rulebook_processes_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    args = {"started_at": True}
    query = _get_query(models.RulebookProcess.objects, since, until, **args)
    return _copy_table("rulebook_processes", query, full_path)


@register(
    "organization_table",
    "1.0",
    format="csv",
    description="Data on organizations",
)
def organizations_table(
    since: datetime, full_path: str, until: datetime, **kwargs
):
    args = {"created": True}
    query = _get_query(models.Organization.objects, since, until, **args)
    return _copy_table("organizations", query, full_path)


@register(
    "team_table",
    "1.0",
    format="csv",
    description="Data on teams",
)
def teams_table(since: datetime, full_path: str, until: datetime, **kwargs):
    args = {"created": True}
    query = _get_query(models.Team.objects, since, until, **args)

    return _copy_table("teams", query, full_path)


def _datetime_format(timestamp: datetime) -> str:
    """Format a datetime to match how it appears in str(queryset.query)."""
    iso_format = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f%z")
    return iso_format[:-2] + ":" + iso_format[-2:]
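
# Worked example for _datetime_format (assumes a timezone-aware datetime;
# timezone.utc comes from the standard datetime module):
#
#   >>> ts = datetime(2024, 9, 17, 8, 30, 0, 123456, tzinfo=timezone.utc)
#   >>> _datetime_format(ts)
#   '2024-09-17 08:30:00.123456+00:00'
#
# strftime renders the offset as "+0000"; the slicing splices in a colon
# ("+00:00") so the string matches how Django inlines the datetime in
# str(queryset.query), which _get_query below then replaces.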


def _get_query(
    objects: Manager, since: datetime, until: datetime, **kwargs
) -> str:
    """Construct a COPY ... TO STDOUT query with datetime params inlined."""
    if kwargs.get("started_at"):
        qs = objects.filter(
            Q(started_at__gt=since, started_at__lte=until)
            | Q(updated_at__gt=since, updated_at__lte=until)
        ).order_by("id")
    elif kwargs.get("created"):
        qs = objects.filter(
            Q(created__gt=since, created__lte=until)
            | Q(modified__gt=since, modified__lte=until)
        ).order_by("id")
    else:
        qs = objects.filter(
            Q(created_at__gt=since, created_at__lte=until)
            | Q(modified_at__gt=since, modified_at__lte=until)
        ).order_by("id")
    query = (
        str(qs.query)
        .replace(_datetime_format(since), f"'{since.isoformat()}'")
        .replace(_datetime_format(until), f"'{until.isoformat()}'")
    )

    return f"COPY ({query}) TO STDOUT WITH CSV HEADER"


def _get_audit_event_query(actions: list[models.AuditAction]):
    events = models.AuditEvent.objects.none()
    for action in actions:
        events |= action.audit_events.all()

    query = str(events.distinct().query)

    for action in actions:
        query = query.replace(str(action.id), f"'{action.id}'")

    return f"COPY ({query}) TO STDOUT WITH CSV HEADER"


def _get_audit_rule_qs(since: datetime, until: datetime):
    activation_instance_ids = (
        models.RulebookProcess.objects.filter(
            Q(
                started_at__gt=since.isoformat(),
                started_at__lte=until.isoformat(),
            )
            | Q(
                updated_at__gt=since.isoformat(),
                updated_at__lte=until.isoformat(),
            )
        )
        .values_list("id", flat=True)
        .distinct()
    )

    if len(activation_instance_ids) == 0:
        # Return an empty queryset rather than a list so callers such as
        # _get_audit_action_qs can keep using the queryset API.
        return models.AuditRule.objects.none()

    if len(activation_instance_ids) == 1:
        audit_rules = models.AuditRule.objects.filter(
            activation_instance_id=activation_instance_ids[0]
        ).order_by("id")
    else:
        audit_rules = models.AuditRule.objects.filter(
            activation_instance_id__in=tuple(activation_instance_ids)
        ).order_by("id")

    return audit_rules


def _get_audit_action_qs(since: datetime, until: datetime):
    audit_rules = _get_audit_rule_qs(since, until)
    audit_rule_ids = audit_rules.values_list("id").distinct()

    if len(audit_rule_ids) == 0:
        # Mirror _get_audit_rule_qs: an empty queryset keeps the return
        # type consistent for callers.
        return models.AuditAction.objects.none()

    if len(audit_rule_ids) == 1:
        audit_actions = models.AuditAction.objects.filter(
            audit_rule_id=audit_rule_ids[0],
        ).order_by("id")
    else:
        audit_actions = models.AuditAction.objects.filter(
            audit_rule_id__in=tuple(audit_rule_ids)
        ).order_by("id")

    return audit_actions


def _copy_table(table, query, path):
    file_path = os.path.join(path, table + "_table.csv")
    file = CsvFileSplitter(filespec=file_path)
    with AnalyticsCollector.db_connection().cursor() as cursor:
        with cursor.copy(query) as copy:
            while data := copy.read():
                byte_data = bytes(data)
                file.write(byte_data.decode())
    return file.file_list()
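
# Hedged usage sketch for _copy_table: cursor.copy() is the psycopg 3
# streaming COPY API, and CsvFileSplitter (from insights-analytics-collector)
# splits the CSV output across files once it exceeds its size cap.
# The directory below is hypothetical:
#
#   files = _copy_table(
#       "projects",
#       'COPY (SELECT * FROM "core_project") TO STDOUT WITH CSV HEADER',
#       "/tmp/gather",
#   )
#   # -> e.g. ["/tmp/gather/projects_table.csv"]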
60 changes: 60 additions & 0 deletions src/aap_eda/analytics/collector.py
@@ -0,0 +1,60 @@
import json

from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connection
from insights_analytics_collector import Collector

from aap_eda.analytics.package import Package
from aap_eda.analytics.utils import datetime_hook


class AnalyticsCollector(Collector):
    @staticmethod
    def db_connection():
        return connection

    @staticmethod
    def _package_class():
        return Package

    def get_last_gathering(self):
        return self._last_gathering()

    def _is_shipping_configured(self):
        if not settings.INSIGHTS_TRACKING_STATE:
            self.logger.warning(
                "Insights for Event Driven Ansible is not enabled."
            )
            return False

        return True

    def _is_valid_license(self):
        # ignore license information checking for now
        return True

    def _last_gathering(self):
        return settings.AUTOMATION_ANALYTICS_LAST_GATHER

    def _load_last_gathered_entries(self):
        last_entries = settings.AUTOMATION_ANALYTICS_LAST_ENTRIES

        return json.loads(
            last_entries.value
            if last_entries and last_entries.value
            else "{}",  # noqa: P103
            object_hook=datetime_hook,
        )

    def _save_last_gathered_entries(self, last_gathered_entries):
        self.logger.info(f"Save last_entries: {last_gathered_entries}")

        settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(
            last_gathered_entries, cls=DjangoJSONEncoder
        )

    def _save_last_gather(self):
        self.logger.info(f"Save last_gather: {self.gather_until}")

        settings.AUTOMATION_ANALYTICS_LAST_GATHER = self.gather_until
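
# Hedged usage sketch: the constructor and gather() signatures below follow
# the insights-analytics-collector conventions used by AWX; they are an
# assumption, not part of this commit:
#
#   from aap_eda.analytics import analytics_collectors
#
#   collector = AnalyticsCollector(
#       collector_module=analytics_collectors,
#       collection_type="dry-run",  # gather locally without shipping
#   )
#   tarballs = collector.gather()  # paths of the generated tarballs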
