
Automated Versioning of Checks#1044

Open
STEFANOVIVAS wants to merge 25 commits into databrickslabs:main from STEFANOVIVAS:feature/versioning_check_rules

Conversation


@STEFANOVIVAS STEFANOVIVAS commented Feb 24, 2026

Changes

Adding automated versioning of rules.

Linked issues

Resolves #672

Tests

  • manually tested
  • added unit tests
  • added integration tests
  • added end-to-end tests
  • added performance tests

@STEFANOVIVAS STEFANOVIVAS requested a review from a team as a code owner February 24, 2026 00:50
@STEFANOVIVAS STEFANOVIVAS requested review from gergo-databricks and removed request for a team February 24, 2026 00:50

alexott commented Feb 26, 2026

Code review

Found 2 issues:

  1. spark.catalog.tableExists is re-introduced in the save path, regressing the fix from PR #1035 ("Use WorkspaceClient to check table existence"), which replaced it with ws.tables.get() because spark.catalog.tableExists fails for tables with special characters and is blocked in Spark Declarative Pipelines. The load path still correctly uses ws.tables.get(), leaving the two methods inconsistent.

rule_set_fingerprint = first_row[0] if first_row else None
if self.spark.catalog.tableExists(config.location) and rule_set_fingerprint is not None:
    if (
        not self.spark.read.table(config.location)
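A minimal sketch of the API-based existence check that PR #1035 introduced on the load path could look like the following. The helper name is hypothetical, and the real code would catch databricks.sdk.errors.NotFound rather than a bare Exception:

```python
# Hypothetical helper (not the PR's code): check table existence through the
# Workspace API instead of spark.catalog.tableExists, which fails on special
# characters and is blocked in Spark Declarative Pipelines.
def table_exists(ws, full_name: str) -> bool:
    """`ws` is assumed to be a databricks.sdk.WorkspaceClient; production code
    should catch databricks.sdk.errors.NotFound instead of a bare Exception."""
    try:
        ws.tables.get(full_name)  # Unity Catalog Tables API lookup
        return True
    except Exception:
        return False
```

Using the same call on both the save and load paths keeps the two methods consistent.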

  2. In _load_checks_from_lakebase, the "load latest version" path fetches the most recent rule_set_fingerprint via a subquery filtered by run_config_name, but the outer SELECT filters only by rule_set_fingerprint, not also by run_config_name. If two different run configs share identical check content (and therefore the same fingerprint), the query returns rows from both configs mixed together.

latest_rule_set_fingerprint = (
    select(table.c.rule_set_fingerprint)
    .where(table.c.run_config_name == config.run_config_name)
    .order_by(table.c.created_at.desc())
    .limit(1)
    .scalar_subquery()
)
stmt = select(table).where(table.c.rule_set_fingerprint == latest_rule_set_fingerprint)
with engine.connect() as conn:
    result = conn.execute(stmt)
    checks = result.mappings().all()
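The cross-config leak is easy to reproduce against an in-memory SQLite table standing in for Lakebase. The table shape mirrors the snippet above; everything else is illustrative:

```python
# Sketch (not the PR's code): two run configs with identical check content share
# a fingerprint, so filtering the outer SELECT by fingerprint alone leaks rows
# across configs. In-memory SQLite stands in for Lakebase.
import datetime
from sqlalchemy import Column, DateTime, MetaData, String, Table, create_engine, insert, select

engine = create_engine("sqlite://")
metadata = MetaData()
table = Table(
    "checks",
    metadata,
    Column("run_config_name", String),
    Column("rule_set_fingerprint", String),
    Column("created_at", DateTime),
)
metadata.create_all(engine)

now = datetime.datetime(2026, 2, 24)
with engine.begin() as conn:
    conn.execute(insert(table), [
        {"run_config_name": "default", "rule_set_fingerprint": "abc", "created_at": now},
        {"run_config_name": "nightly", "rule_set_fingerprint": "abc", "created_at": now},
    ])

run_config_name = "default"
latest = (
    select(table.c.rule_set_fingerprint)
    .where(table.c.run_config_name == run_config_name)
    .order_by(table.c.created_at.desc())
    .limit(1)
    .scalar_subquery()
)

with engine.connect() as conn:
    # Buggy outer query: filters by fingerprint only, so both configs match.
    buggy = conn.execute(select(table).where(table.c.rule_set_fingerprint == latest)).all()
    # Fixed outer query: also filter by run_config_name.
    fixed = conn.execute(
        select(table).where(
            table.c.rule_set_fingerprint == latest,
            table.c.run_config_name == run_config_name,
        )
    ).all()

print(len(buggy), len(fixed))  # 2 1
```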

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.


alexott commented Feb 26, 2026

Additional code review findings (lower confidence)

Found 4 additional issues (scored below primary threshold):

  1. Lakebase deduplication dead code when mode="overwrite" — in _save_checks_to_lakebase, the overwrite DELETE runs first, then the fingerprint existence check queries for a row that was just deleted. The SELECT ... WHERE rule_set_fingerprint == ... always returns empty in overwrite mode, so the early-return skip can never trigger. The existence check should run before the delete.

)
self._ensure_rule_version_columns_exist(conn, config)
logger.info("Rule version columns exist or added.")
if config.mode == "overwrite":
    delete_stmt = delete(table).where(table.c.run_config_name == config.run_config_name)
    result = conn.execute(delete_stmt)
    logger.info(f"Deleted {result.rowcount} existing checks for run_config_name '{config.run_config_name}'")
normalized_checks = self._normalize_checks(checks, config)
rule_set_fingerprint = normalized_checks[0].get("rule_set_fingerprint")
exists_rule_set = (
    select(table.c.rule_set_fingerprint)
    .where(
        table.c.run_config_name == config.run_config_name,
        table.c.rule_set_fingerprint == rule_set_fingerprint,
    )
    .limit(1)
)
if conn.execute(exists_rule_set).first():
    logger.info(f"Checks with rule_set_fingerprint {rule_set_fingerprint} already exist — skipping")
    return
insert_stmt = insert(table)
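A sketch of the corrected ordering, with in-memory SQLite standing in for Lakebase: run the fingerprint existence check first, and only delete once we know the incoming rule set is new. The save helper is illustrative, not the PR's API:

```python
# Sketch (illustrative, not the PR's code): in overwrite mode, check whether the
# incoming rule set's fingerprint already exists BEFORE deleting, so the
# skip-on-duplicate early return can actually fire.
from sqlalchemy import Column, MetaData, String, Table, create_engine, delete, insert, select

engine = create_engine("sqlite://")
metadata = MetaData()
table = Table(
    "checks",
    metadata,
    Column("run_config_name", String),
    Column("rule_set_fingerprint", String),
)
metadata.create_all(engine)


def save(run_config_name: str, fingerprint: str, mode: str = "overwrite") -> str:
    with engine.begin() as conn:
        # 1. Existence check first: if this exact rule set is already stored,
        #    skip without touching existing rows.
        exists_stmt = (
            select(table.c.rule_set_fingerprint)
            .where(
                table.c.run_config_name == run_config_name,
                table.c.rule_set_fingerprint == fingerprint,
            )
            .limit(1)
        )
        if conn.execute(exists_stmt).first():
            return "skipped"
        # 2. Only now clear the old rows for this run config.
        if mode == "overwrite":
            conn.execute(delete(table).where(table.c.run_config_name == run_config_name))
        conn.execute(
            insert(table),
            [{"run_config_name": run_config_name, "rule_set_fingerprint": fingerprint}],
        )
        return "saved"


print(save("default", "abc"))  # saved
print(save("default", "abc"))  # skipped (would be "saved" if the DELETE ran first)
print(save("default", "def"))  # saved: new fingerprint replaces the old rows
```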

  2. rule_set_fingerprint = 'None' on schema migration — when latest_row.rule_set_fingerprint is Python None (pre-existing rows whose column was backfilled as NULL via mergeSchema), the f-string produces the Spark SQL predicate rule_set_fingerprint = 'None' — the literal string "None", not SQL NULL. This silently returns zero rows for any table that has pre-versioning data, breaking load after a schema migration.

if not rule_set_fingerprint:
    max_created_at = filtered_df.agg(F.max("created_at")).collect()[0][0]
    latest_row = filtered_df.where(F.col("created_at") == max_created_at).first()
    if latest_row:
        rule_set_fingerprint = latest_row.rule_set_fingerprint
filtered_df = filtered_df.where(f"rule_set_fingerprint = '{rule_set_fingerprint}'")
check_rows = filtered_df.collect()
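The NULL-vs-'None' confusion reproduces in plain Python, no Spark required:

```python
# Formatting a Python None into a quoted SQL predicate yields the literal
# string 'None', which matches nothing when the column is actually SQL NULL.
rule_set_fingerprint = None  # e.g. a pre-versioning row backfilled as NULL
predicate = f"rule_set_fingerprint = '{rule_set_fingerprint}'"
print(predicate)  # rule_set_fingerprint = 'None'

# Sketch of a Column-based fix (assuming pyspark.sql.functions as F): branch on
# None and use .isNull(), which compiles to IS NULL instead of a string match.
#     condition = (
#         F.col("rule_set_fingerprint").isNull()
#         if rule_set_fingerprint is None
#         else F.col("rule_set_fingerprint") == rule_set_fingerprint
#     )
#     filtered_df = filtered_df.where(condition)
```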

  3. Docstring contradicts the new default — both TableChecksStorageConfig and LakebaseChecksStorageConfig change mode from "overwrite" to "append", but LakebaseChecksStorageConfig's docstring still explicitly states (default is 'overwrite'). This is also a breaking behavioral change for existing callers that relied on the old default.

run_config_name: Name of the run configuration to use for checks (default is 'default').
mode: The mode for writing checks to a table (e.g., 'append' or 'overwrite'). The *overwrite* mode
only replaces checks for the specific run config and not all checks in the table (default is 'overwrite').
rule_set_fingerprint: Optional SHA-256 fingerprint of the rule set to load.
When provided, loads rules matching this specific fingerprint instead of the latest batch.
When None (default), loads the latest batch.
"""
location: str
instance_name: str | None = None
client_id: str | None = None
port: str = "5432"
run_config_name: str = "default"
mode: str = "append"
rule_set_fingerprint: str | None = None

  4. SQL injection via f-string for rule_set_fingerprint — the new filter in TableChecksStorageHandler.save embeds the user-supplied rule_set_fingerprint config field directly into a Spark SQL string. When the fingerprint is computed internally it is always a safe hex SHA-256, but since TableChecksStorageConfig.rule_set_fingerprint is an externally supplied field, a crafted value can bypass the filter. The Column API (F.col("rule_set_fingerprint") == rule_set_fingerprint) avoids this and is consistent with the rest of the codebase.

first_row = rules_df.select("rule_set_fingerprint").first()
rule_set_fingerprint = first_row[0] if first_row else None
if self.spark.catalog.tableExists(config.location) and rule_set_fingerprint is not None:
    if (
        not self.spark.read.table(config.location)
        .filter(
            f"run_config_name = '{config.run_config_name}' and rule_set_fingerprint = '{rule_set_fingerprint}'"
        )
        .isEmpty()
    ):
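A plain-Python sketch shows how a crafted value rewrites the predicate; the Column-API fix is sketched in the comments, assuming pyspark.sql.functions as F:

```python
# A crafted "fingerprint" closes the quoted literal early and appends an OR
# clause, so the dedup filter matches every row instead of one rule set.
run_config_name = "default"
rule_set_fingerprint = "x' OR '1'='1"  # externally supplied via config
predicate = (
    f"run_config_name = '{run_config_name}' "
    f"and rule_set_fingerprint = '{rule_set_fingerprint}'"
)
print(predicate)
# run_config_name = 'default' and rule_set_fingerprint = 'x' OR '1'='1'

# Column-API equivalent (sketch) treats the value as data, not SQL:
#     df.filter(
#         (F.col("run_config_name") == run_config_name)
#         & (F.col("rule_set_fingerprint") == rule_set_fingerprint)
#     )
```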

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

"""
check_rows = df.where(f"run_config_name = '{run_config_name}'").collect()
filtered_df = df.where(f"run_config_name = '{run_config_name}'")
if filtered_df.isEmpty():

filtered_df.isEmpty() triggers a full Spark job just to check for zero rows, before you do the actual .collect() a few lines below. If the DataFrame is empty, collect() already returns [] naturally. This early check doubles the number of Spark actions in the common (non-empty) path.

Remove the guard entirely, or if an early log is important, keep it but note the extra cost:

check_rows = filtered_df.collect()
if not check_rows:
    logger.info(f"No checks found for run_config_name '{run_config_name}'.")
    return []

filtered_df = filtered_df.where((F.col("rule_set_fingerprint") == rule_set_fingerprint)&(F.col("run_config_name") == run_config_name))

else:
rule_set_fingerprint=filtered_df.select(F.col("rule_set_fingerprint")).where(F.col("run_config_name") == run_config_name).orderBy(F.col("created_at").desc()).limit(1).collect()[0][0]

Three problems on this line:

1. Crash risk: .collect()[0][0] raises IndexError if created_at is NULL for all rows or filtered_df is empty at this point.

2. Redundant filter: .where(F.col("run_config_name") == run_config_name) is applied a second time here even though filtered_df was already filtered by run_config_name earlier (line 374).

3. Line length: This is 200+ characters — well over the project's style limit.

Suggested rewrite:

result = (
    filtered_df
    .select(F.col("rule_set_fingerprint"))
    .orderBy(F.col("created_at").desc())
    .limit(1)
    .collect()
)
if not result:
    return []
rule_set_fingerprint = result[0][0]

"criticality": check_dict.get("criticality", "error"),
"function": check_dict.get("check", {}).get("function"),
"arguments": check_dict.get("check", {}).get("arguments"),
"filter": check_dict.get("filter"),

for_each_column is not included in fingerprint_data. Two rules that differ only in for_each_column (e.g. applying the same check to different column lists) will produce the same fingerprint — incorrect deduplication and missed version changes.

fingerprint_data = {
    ...
    "filter": check_dict.get("filter"),
    "for_each_column": check_dict.get("check", {}).get("for_each_column"),  # add this
}
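The collision reproduces with a stand-in fingerprint function; the hashing scheme below (sorted-key JSON fed into SHA-256) is assumed for illustration, not taken from the PR:

```python
# Two rules that differ only in for_each_column collide when that field is
# omitted from the fingerprint input, and diverge once it is included.
import hashlib
import json


def fingerprint(check_dict: dict, include_for_each_column: bool) -> str:
    data = {
        "criticality": check_dict.get("criticality", "error"),
        "function": check_dict.get("check", {}).get("function"),
        "arguments": check_dict.get("check", {}).get("arguments"),
        "filter": check_dict.get("filter"),
    }
    if include_for_each_column:
        data["for_each_column"] = check_dict.get("check", {}).get("for_each_column")
    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()


a = {"check": {"function": "is_not_null", "for_each_column": ["col1"]}}
b = {"check": {"function": "is_not_null", "for_each_column": ["col2"]}}

# Without for_each_column the two distinct rules produce the same fingerprint:
assert fingerprint(a, False) == fingerprint(b, False)
# Including it restores distinct fingerprints:
assert fingerprint(a, True) != fingerprint(b, True)
```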

logger.info("Rule version columns exist or added.")

normalized_checks = self._normalize_checks(checks, config)
rule_set_fingerprint = normalized_checks[0].get("rule_set_fingerprint")

normalized_checks[0] raises IndexError if checks is empty. The same pattern appears in TableChecksStorageHandler.save, where rules_df.select("rule_set_fingerprint").first() is slightly safer but still asymmetric.

rule_set_fingerprint = normalized_checks[0].get("rule_set_fingerprint") if normalized_checks else None

Or guard earlier:

if not checks:
    logger.info("No checks to save — skipping.")
    return


config_load = TableChecksStorageConfig(
location=table_name,
rule_set_fingerprint="e27b1748e670c8bceeb8449ac494f22bd80a934a30a3c86919547de56790bc00",

Hardcoded hash values in tests are brittle. If the fingerprinting algorithm, input data, or canonical serialisation ever changes, this test will either raise a confusing error or silently pass on the wrong data (wrong fingerprint → empty result → vacuous assertion).

Compute the expected fingerprint from the same input data instead:

from databricks.labs.dqx.checks_serializer import compute_rule_set_fingerprint

config_load = TableChecksStorageConfig(
    location=table_name,
    rule_set_fingerprint=compute_rule_set_fingerprint(INPUT_CHECKS[:1]),
)

Same applies to the equivalent hardcoded hash in test_save_and_load_checks_from_lakebase_table.py.

@mwojtyczka mwojtyczka changed the title Feature/versioning check rules Automated versioning of checks Mar 3, 2026
@mwojtyczka mwojtyczka changed the title Automated versioning of checks Automated Versioning of Checks Mar 3, 2026
@STEFANOVIVAS STEFANOVIVAS requested a review from mwojtyczka March 4, 2026 02:12

Development

Successfully merging this pull request may close these issues.

[FEATURE]: Versioning of rules
