Skip to content

Commit

Permalink
Merge branch 'elementary-data:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Nats-hope authored Nov 5, 2024
2 parents 3b7610d + 9c66df0 commit 2d280d9
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 101 deletions.
2 changes: 2 additions & 0 deletions elementary/monitor/alerts/alerts_groups/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from .alerts_group import AlertsGroup
from .base_alerts_group import BaseAlertsGroup
from .grouped_by_table import GroupedByTableAlerts

__all__ = [
"AlertsGroup",
"BaseAlertsGroup",
"GroupedByTableAlerts",
]
49 changes: 14 additions & 35 deletions elementary/monitor/alerts/alerts_groups/alerts_group.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,28 @@
from datetime import datetime
from typing import Dict, List, Union
from typing import List, Union

from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel


class AlertsGroup:
class AlertsGroup(BaseAlertsGroup):
test_errors: List[Union[TestAlertModel, SourceFreshnessAlertModel]]
test_warnings: List[Union[TestAlertModel, SourceFreshnessAlertModel]]
test_failures: List[Union[TestAlertModel, SourceFreshnessAlertModel]]
model_errors: List[ModelAlertModel]

def __init__(
self,
alerts: List[Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]],
) -> None:
self.alerts = alerts
self.test_errors: List[Union[TestAlertModel, SourceFreshnessAlertModel]] = []
self.test_warnings: List[Union[TestAlertModel, SourceFreshnessAlertModel]] = []
self.test_failures: List[Union[TestAlertModel, SourceFreshnessAlertModel]] = []
self.model_errors: List[ModelAlertModel] = []
super().__init__(alerts)
self.test_errors = []
self.test_warnings = []
self.test_failures = []
self.model_errors = []
self._sort_alerts()

@property
def summary(self) -> str:
return f"{len(self.alerts)} issues detected"

@property
def detected_at(self) -> datetime:
return min(alert.detected_at or datetime.max for alert in self.alerts)

@property
def status(self) -> str:
if self.model_errors or self.test_errors:
Expand All @@ -35,25 +32,7 @@ def status(self) -> str:
else:
return "warn"

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def unified_meta(self) -> Dict:
model_unified_meta = dict()
test_unified_meta = dict()
for alert in self.alerts:
alert_unified_meta = alert.unified_meta
if alert_unified_meta:
if isinstance(alert, ModelAlertModel):
model_unified_meta = alert_unified_meta
break

test_unified_meta = alert_unified_meta
return model_unified_meta or test_unified_meta

def _sort_alerts(self):
def _sort_alerts(self) -> None:
for alert in self.alerts:
if isinstance(alert, ModelAlertModel):
self.model_errors.append(alert)
Expand Down
38 changes: 38 additions & 0 deletions elementary/monitor/alerts/alerts_groups/base_alerts_group.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Sequence, Union

from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel


class BaseAlertsGroup(ABC):
def __init__(
self,
alerts: Sequence[
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
],
) -> None:
self.alerts = alerts

@property
def summary(self) -> str:
return f"{len(self.alerts)} issues detected"

@property
def detected_at(self) -> datetime:
return min(alert.detected_at or datetime.max for alert in self.alerts)

@property
@abstractmethod
def status(self) -> str:
...

@property
def data(self) -> List[Dict]:
return [alert.data for alert in self.alerts]

@property
def unified_meta(self) -> Dict:
return {}
17 changes: 16 additions & 1 deletion elementary/monitor/alerts/alerts_groups/grouped_by_table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from typing import Dict, Optional

from elementary.monitor.alerts.alerts_groups.alerts_group import AlertsGroup
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
get_model_test_runs_link,
Expand Down Expand Up @@ -30,3 +31,17 @@ def get_report_link(self) -> Optional[ReportLinkData]:
return get_model_test_runs_link(self.report_url, self.model_unique_id)

return None

@property
def unified_meta(self) -> Dict:
model_unified_meta = {}
test_unified_meta = {}
for alert in self.alerts:
alert_unified_meta = alert.unified_meta
if alert_unified_meta:
if isinstance(alert, ModelAlertModel):
model_unified_meta = alert_unified_meta
break

test_unified_meta = alert_unified_meta
return model_unified_meta or test_unified_meta
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from alive_progress import alive_it

from elementary.config.config import Config
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups import GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
from elementary.monitor.alerts.grouping_type import GroupingType
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
Expand Down Expand Up @@ -256,12 +257,12 @@ def _send_alerts(
alerts_with_progress_bar, self.config.group_alerts_threshold
):
if sent_successfully:
if isinstance(alert, AlertsGroup):
if isinstance(alert, BaseAlertsGroup):
sent_successfully_alerts.extend(alert.alerts)
else:
sent_successfully_alerts.append(alert)
else:
if isinstance(alert, AlertsGroup):
if isinstance(alert, BaseAlertsGroup):
for inner_alert in alert.alerts:
logger.error(
f"Could not send the alert - {inner_alert.id}. Full alert: {json.dumps(inner_alert.data)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Generator, List, Sequence, Tuple, Union

from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
Expand All @@ -25,7 +26,7 @@ def _get_alert_template(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand All @@ -44,7 +45,7 @@ def _get_alert_template(
return self._get_source_freshness_template(alert)
elif isinstance(alert, GroupedByTableAlerts):
return self._get_group_by_table_template(alert)
elif isinstance(alert, AlertsGroup):
elif isinstance(alert, BaseAlertsGroup):
return self._get_alerts_group_template(alert)

@abstractmethod
Expand Down Expand Up @@ -76,7 +77,7 @@ def _get_group_by_table_template(
raise NotImplementedError

@abstractmethod
def _get_alerts_group_template(self, alert: AlertsGroup, *args, **kwargs):
def _get_alerts_group_template(self, alert: BaseAlertsGroup, *args, **kwargs):
raise NotImplementedError

@abstractmethod
Expand All @@ -101,7 +102,7 @@ def send_alert(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand Down Expand Up @@ -132,7 +133,7 @@ def _group_alerts(
Union[TestAlertModel, ModelAlertModel, SourceFreshnessAlertModel]
] = []
for alert in alerts:
if isinstance(alert, AlertsGroup):
if isinstance(alert, BaseAlertsGroup):
flattened_alerts.extend(alert.alerts)
else:
flattened_alerts.append(alert)
Expand Down Expand Up @@ -171,7 +172,7 @@ def send_alerts(
]:
grouped_alerts = self._group_alerts(alerts, group_alerts_threshold)
for alert in grouped_alerts:
if isinstance(alert, AlertsGroup):
if isinstance(alert, BaseAlertsGroup):
sent_successfully = self.send_alert(alert, *args, **kwargs)
for inner_alert in alert.alerts:
yield inner_alert, sent_successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from elementary.clients.slack.slack_message_builder import MessageColor
from elementary.config.config import Config
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
Expand All @@ -21,9 +22,7 @@
SlackAlertMessageSchema,
)
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
get_model_runs_link,
get_model_test_runs_link,
get_test_runs_link,
)
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.json_utils import (
Expand Down Expand Up @@ -100,7 +99,7 @@ def _get_alert_template(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand Down Expand Up @@ -147,9 +146,8 @@ def _get_dbt_test_template(
),
)

test_runs_report_link = get_test_runs_link(
alert.report_url, alert.elementary_unique_id
)
test_runs_report_link = alert.get_report_link()

if test_runs_report_link:
report_link = self.message_builder.create_context_block(
[
Expand Down Expand Up @@ -313,9 +311,8 @@ def _get_elementary_test_template(
),
)

test_runs_report_link = get_test_runs_link(
alert.report_url, alert.elementary_unique_id
)
test_runs_report_link = alert.get_report_link()

if test_runs_report_link:
report_link = self.message_builder.create_context_block(
[
Expand Down Expand Up @@ -460,9 +457,8 @@ def _get_model_template(
),
)

model_runs_report_link = get_model_runs_link(
alert.report_url, alert.model_unique_id
)
model_runs_report_link = alert.get_report_link()

if model_runs_report_link:
report_link = self.message_builder.create_context_block(
[
Expand Down Expand Up @@ -564,9 +560,8 @@ def _get_snapshot_template(
),
)

model_runs_report_link = get_model_runs_link(
alert.report_url, alert.model_unique_id
)
model_runs_report_link = alert.get_report_link()

if model_runs_report_link:
report_link = self.message_builder.create_context_block(
[
Expand Down Expand Up @@ -652,9 +647,8 @@ def _get_source_freshness_template(
),
)

test_runs_report_link = get_test_runs_link(
alert.report_url, alert.source_freshness_execution_id
)
test_runs_report_link = alert.get_report_link()

if test_runs_report_link:
report_link = self.message_builder.create_context_block(
[
Expand Down Expand Up @@ -964,8 +958,8 @@ def _add_sub_group_details_block(
section_text_rows = [f"*{sub_title}*"]
for alert in alerts:
text = f":{bullet_icon}: {alert.summary}"
if alert.report_url:
text = " - ".join([text, f"<{alert.get_report_link()}|View Details>"])
if report_link := alert.get_report_link():
text = " - ".join([text, f"<{report_link.url}|{report_link.text}>"])
section_text_rows.append(text)

section = self.message_builder.create_text_section_block(
Expand Down Expand Up @@ -1004,19 +998,19 @@ def _get_sub_group_details_blocks(
return details_blocks

def _get_alerts_group_template(
self, alert: AlertsGroup, *args, **kwargs
self, alert: BaseAlertsGroup, *args, **kwargs
) -> SlackAlertMessageSchema:
if len(alert.alerts) >= self.COMPACT_SCHEMA_THRESHOLD:
return self._get_alerts_group_compact_template(alert)
return self._get_alerts_group_compact_template(alert) # type: ignore[arg-type]

self.message_builder.add_message_color(self._get_color(alert.status))
title_blocks = [
self.message_builder.create_header_block(
f"{self._get_display_name(alert.status)}: {alert.summary}"
),
self._get_alert_type_counters_block(alert),
self._get_alert_type_counters_block(alert), # type: ignore[arg-type]
]
details_blocks = self._get_sub_group_details_blocks(alert)
details_blocks = self._get_sub_group_details_blocks(alert) # type: ignore[arg-type]
return SlackAlertMessageSchema(title=title_blocks, details=details_blocks)

@staticmethod
Expand Down Expand Up @@ -1047,7 +1041,7 @@ def _get_fallback_template(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand Down Expand Up @@ -1090,10 +1084,10 @@ def _fix_owners_and_subscribers(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
):
if isinstance(alert, AlertsGroup):
if isinstance(alert, BaseAlertsGroup):
for inner_alert in alert.alerts:
inner_alert.owners = self._parse_emails_to_ids(inner_alert.owners)
inner_alert.subscribers = self._parse_emails_to_ids(
Expand All @@ -1110,7 +1104,7 @@ def send_alert(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand Down Expand Up @@ -1155,7 +1149,7 @@ def _get_integration_params(
ModelAlertModel,
SourceFreshnessAlertModel,
GroupedByTableAlerts,
AlertsGroup,
BaseAlertsGroup,
],
*args,
**kwargs,
Expand Down
Loading

0 comments on commit 2d280d9

Please sign in to comment.