Skip to content
Closed
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ __pycache__/
# C extensions
*.so

# .csv files
*.csv

# Distribution / packaging
.Python
build/
Expand Down
199 changes: 175 additions & 24 deletions keep/api/bl/enrichments_bl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import json5
from fastapi import HTTPException
from sqlalchemy import func
from sqlalchemy.orm import defer
from sqlalchemy_utils import UUIDType
from sqlmodel import Session, select

from keep.api.bl.mapping_rule_matcher import MappingRuleMatcher
from keep.api.core.config import config
from keep.api.core.db import batch_enrich, get_incidents_by_alert_fingerprint
from keep.api.core.db import enrich_entity as enrich_alert_db, get_last_alert_by_fingerprint, \
Expand Down Expand Up @@ -101,7 +103,7 @@ def __init__(self, tenant_id: str, db: Session | None = None):
self.db_session = None
self.elastic_client = None

def run_mapping_rule_by_id(self, rule_id: int, alert_id: UUID) -> AlertDto:
def run_mapping_rule_by_id(self, rule_id: int, alert_id: UUID) -> bool:
rule = get_mapping_rule_by_id(self.tenant_id, rule_id, session=self.db_session)
if not rule:
raise HTTPException(status_code=404, detail="Mapping rule not found")
Expand Down Expand Up @@ -316,6 +318,7 @@ def run_mapping_rules(self, alert: AlertDto) -> AlertDto:
self.db_session.query(MappingRule)
.filter(MappingRule.tenant_id == self.tenant_id)
.filter(MappingRule.disabled == False)
.options(defer(MappingRule.rows))
.order_by(MappingRule.priority.desc())
.all()
)
Expand Down Expand Up @@ -419,14 +422,37 @@ def check_if_match_and_enrich(self, alert: AlertDto, rule: MappingRule) -> bool:
enrichments.pop("id", None)
elif rule.type == "csv":
if not rule.is_multi_level:
for row in rule.rows:
if any(
self._check_matcher(alert, row, matcher)
for matcher in rule.matchers
# Create an alert values dictionary for SQL-based matching
alert_dict = self._extract_alert_values_for_matchers(
alert, rule.matchers
)

# Use direct SQL-based matching to find matching row
try:
dialect_name = None
if (
self.db_session
and hasattr(self.db_session, "bind")
and self.db_session.bind
):
dialect_name = self.db_session.bind.dialect.name

matcher = MappingRuleMatcher(
dialect_name=dialect_name, session=self.db_session
)

self._add_enrichment_log(
f"Using SQL-based matching with dialect: {dialect_name or 'fallback'}",
"debug",
{"rule_id": rule.id},
)

matched_row = matcher.get_matching_row(rule, alert_dict)

if matched_row:
# Extract enrichments from the matched row
enrichments = {}
for key, value in row.items():
for key, value in matched_row.items():
if value is not None:
is_matcher = False
for matcher in rule.matchers:
Expand All @@ -439,7 +465,33 @@ def check_if_match_and_enrich(self, alert: AlertDto, rule: MappingRule) -> bool:
if isinstance(value, str):
value = value.strip()
enrichments[key.strip()] = value
break
except Exception as e:
self._add_enrichment_log(
f"Error using SQL matcher, falling back to in-memory iteration: {str(e)}",
"warning",
{"rule_id": rule.id},
)
# Fall back to the original in-memory matching
if rule.rows:
for row in rule.rows:
if any(
self._check_matcher(alert, row, matcher)
for matcher in rule.matchers
):
# Extract enrichments from the matched row
enrichments = {}
for key, value in row.items():
if value is not None:
is_matcher = False
for matcher in rule.matchers:
if key in matcher:
is_matcher = True
break
if not is_matcher:
if isinstance(value, str):
value = value.strip()
enrichments[key.strip()] = value
break
else:
# Multi-level mapping
# We can assume that the matcher is only a single key. i.e., [['customers']]
Expand All @@ -451,23 +503,93 @@ def check_if_match_and_enrich(self, alert: AlertDto, rule: MappingRule) -> bool:
else:
if isinstance(matcher_values, str):
matcher_values = json5.loads(matcher_values)
for matcher in matcher_values:
if rule.prefix_to_remove:
matcher = matcher.replace(rule.prefix_to_remove, "")
for row in rule.rows:
if self._check_explicit_match(row, key, matcher):
if rule.new_property_name not in enrichments:
enrichments[rule.new_property_name] = {}

if matcher not in enrichments[rule.new_property_name]:
enrichments[rule.new_property_name][matcher] = {}

for enrichment_key, enrichment_value in row.items():
if enrichment_value is not None:
enrichments[rule.new_property_name][matcher][
enrichment_key.strip()
] = enrichment_value.strip()
break

# Try SQL-based multi-level matching
try:
dialect_name = None
if (
self.db_session
and hasattr(self.db_session, "bind")
and self.db_session.bind
):
dialect_name = self.db_session.bind.dialect.name

matcher = MappingRuleMatcher(
dialect_name=dialect_name, session=self.db_session
)

self._add_enrichment_log(
f"Using SQL-based multi-level matching with dialect: {dialect_name or 'fallback'}",
"debug",
{"rule_id": rule.id},
)

# Convert matcher_values to list of strings if needed
string_values = []
for val in matcher_values:
if isinstance(val, str):
string_values.append(val)
else:
string_values.append(str(val))

matches = matcher.get_matching_rows_multi_level(
rule, key, string_values
)

if matches:
if rule.new_property_name not in enrichments:
enrichments[rule.new_property_name] = {}

for matcher_key, match_data in matches.items():
enrichments[rule.new_property_name][
matcher_key
] = match_data
except Exception as e:
self._add_enrichment_log(
f"Error using SQL multi-level matcher, falling back to in-memory iteration: {str(e)}",
"warning",
{"rule_id": rule.id},
)
# Fall back to the original implementation
if rule.rows:
for matcher in matcher_values:
matcher_str = (
str(matcher)
if not isinstance(matcher, str)
else matcher
)
if rule.prefix_to_remove:
matcher_str = matcher_str.replace(
rule.prefix_to_remove, ""
)
for row in rule.rows:
if self._check_explicit_match(
row, key, matcher_str
):
if rule.new_property_name not in enrichments:
enrichments[rule.new_property_name] = {}

if (
matcher_str
not in enrichments[rule.new_property_name]
):
enrichments[rule.new_property_name][
matcher_str
] = {}

for (
enrichment_key,
enrichment_value,
) in row.items():
if enrichment_value is not None:
enrichments[rule.new_property_name][
matcher_str
][enrichment_key.strip()] = (
enrichment_value.strip()
if isinstance(enrichment_value, str)
else enrichment_value
)
break
if enrichments:
# Enrich the alert with the matched data from the row
for key, matcher in enrichments.items():
Expand Down Expand Up @@ -962,3 +1084,32 @@ def check_incident_resolution(self, alert: Alert | AlertDto):
incident.status = IncidentStatus.RESOLVED.value
self.db_session.add(incident)
self.db_session.commit()

def _extract_alert_values_for_matchers(
self, alert: AlertDto, matchers: list[list[str]]
) -> dict:
"""
Extract alert values that match the matchers for SQL-based matching.

Args:
alert: AlertDto to extract values from
matchers: List of matcher rules

Returns:
Dictionary of alert values needed for matching
"""
alert_values = {}

# Get unique attributes across all matchers
all_attributes = set()
for matcher_group in matchers:
for attr in matcher_group:
all_attributes.add(attr.strip())

# Extract values for each attribute
for attr in all_attributes:
value = get_nested_attribute(alert, attr)
if value is not None:
alert_values[attr] = value

return alert_values
Loading
Loading