From 3aadc82a60e7b44c95acb76055b4964c80d8f788 Mon Sep 17 00:00:00 2001
From: lchen <73617864+lchen-2101@users.noreply.github.com>
Date: Mon, 27 Jan 2025 13:07:10 -0800
Subject: [PATCH 1/2] test lighter processing past errors threshold

---
 src/regtech_data_validator/validator.py | 64 ++++++++++++++-----------
 1 file changed, 36 insertions(+), 28 deletions(-)

diff --git a/src/regtech_data_validator/validator.py b/src/regtech_data_validator/validator.py
index 6a3da0e..1f52099 100644
--- a/src/regtech_data_validator/validator.py
+++ b/src/regtech_data_validator/validator.py
@@ -71,7 +71,7 @@ def _add_validation_metadata(failed_check_fields_df: pl.DataFrame, check: SBLChe
 
 
 def validate(
-    schema: pa.DataFrameSchema, submission_df: pl.DataFrame, row_start: int
+    schema: pa.DataFrameSchema, submission_df: pl.LazyFrame, row_start: int, process_errors: bool
 ) -> pl.DataFrame:
     """
     validate received dataframe with schema and return list of
@@ -95,7 +95,7 @@ def validate(
         # `list[dict[str,Any]]`, but it's actually of type `SchemaError`
         schema_error: SchemaError
 
-        #if process_errors:
+        # if process_errors:
         for schema_error in err.schema_errors:
             check = schema_error.check
             column_name = schema_error.schema.name
@@ -116,17 +116,22 @@ def validate(
                     f'Check {check} type on {column_name} column not supported. Must be of type {SBLCheck}'
                 ) from schema_error
 
-            schema_error = gather_errors(schema_error)
+            # schema_error = gather_errors(schema_error)
+            # check_output: pl.Series = gather_errors(schema_error)
             fields = _get_check_fields(check, column_name)
-            check_output: pl.Series | None = schema_error.check_output
+            check_output = gather_check_errors(schema_error.check_output)
 
             if check_output is not None:
                 # Filter data not associated with failed Check, and update index for merging with findings_df
                 check_output = check_output.with_columns(pl.col('index').add(row_start))
-                failed_records_df = _filter_valid_records(submission_df, check_output, fields)
-                failed_record_fields_df = _records_to_fields(failed_records_df)
-                findings = _add_validation_metadata(failed_record_fields_df, check)
+                if process_errors:
+                    failed_records_df = _filter_valid_records(submission_df, check_output, fields)
+                    failed_record_fields_df = _records_to_fields(failed_records_df)
+                    findings = _add_validation_metadata(failed_record_fields_df, check)
+                else:
+                    findings = _add_validation_metadata(check_output, check)
+                    findings = findings.with_columns(pl.lit(check.scope).alias("scope"), pl.lit(check.severity.value).alias("validation_type"))
                 check_findings.append(findings)
             else:
                 # The above exception handling _should_ prevent this from ever happenin, but...just in case.
@@ -136,8 +141,9 @@ def validate(
     if check_findings:
         findings_df = pl.concat(check_findings)
 
-    updated_df = add_uid(findings_df, submission_df, row_start)
-    return updated_df
+    return add_uid(findings_df, submission_df, row_start) if process_errors else findings_df
+    # updated_df = add_uid(findings_df, submission_df, row_start)
+    # return updated_df
 
 
 # Add the uid for the record throwing the error/warning to the error dataframe
@@ -182,7 +188,7 @@ def validate_batch_csv(
 
     if not has_syntax_errors:
         register_schema = get_register_schema(context)
-        validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0)
+        validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True)
         if not validation_results.is_empty():
             validation_results = format_findings(
                 validation_results,
@@ -199,7 +205,6 @@
         )
         yield results
 
-    print("Processing other logic errors")
    for validation_results, _ in validate_chunks(
        logic_schema, real_path, batch_size, batch_count, max_errors, logic_checks
    ):
@@ -222,7 +227,7 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks):
     row_start = 0
     while batches:
         df = pl.concat(batches)
-        validation_results = validate(schema, df, row_start)
+        validation_results = validate(schema, df, row_start, process_errors)
         if not validation_results.is_empty():
             validation_results = format_findings(validation_results, schema.name.value, checks)
 
@@ -235,9 +240,9 @@
                 findings=validation_results if process_errors else pl.DataFrame(),
                 phase=schema.name,
             )
-            print(f"Findings height: {validation_results.height}", flush=True)
+            # print(f"Findings height: {validation_results.height}", flush=True)
         total_count += (error_counts.total_count + warning_counts.total_count)
-        print(f"Total Count: {total_count}", flush=True)
+        # print(f"Total Count: {total_count}", flush=True)
         if total_count > max_errors and process_errors:
             process_errors = False
             head_count = results.findings.height - (total_count - max_errors)
@@ -284,9 +289,9 @@ def validate_lazy_frame(
 
 
 def validate_register_level(context: Dict[str, str] | None, all_uids: List[str]):
-    print("Processing register logic errors")
+    # print("Processing register logic errors")
     register_schema = get_register_schema(context)
-    validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0)
+    validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True)
     if not validation_results.is_empty():
         validation_results = format_findings(
             validation_results,
@@ -301,14 +306,14 @@ def validate_register_level(context: Dict[str, str] | None, all_uids: List[str])
         findings=validation_results,
         phase=register_schema.name,
     )
-    print(f"Register counts: {error_counts} {warning_counts}", flush=True)
+    # print(f"Register counts: {error_counts} {warning_counts}", flush=True)
     return results
 
 
 def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks):
-    print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True)
-    validation_results = validate(schema, df, row_start)
-    if not validation_results.is_empty():
+    # print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True)
+    validation_results = validate(schema, df, row_start, process_errors)
+    if process_errors and not validation_results.is_empty():
         validation_results = format_findings(
             validation_results, schema.name.value, checks
         )
@@ -323,19 +328,19 @@ def validate_chunk(schema, df, total_count, row_start, max_errors, process_error
     )
     total_count += (error_counts.total_count + warning_counts.total_count)
-    print(f"Counts: {error_counts} {warning_counts}", flush=True)
+    # print(f"Counts: {error_counts} {warning_counts}", flush=True)
     if total_count > max_errors and process_errors:
-        print("Reached max errors, adjusting results", flush=True)
+        # print("Reached max errors, adjusting results", flush=True)
         process_errors = False
         head_count = results.findings.height - (total_count - max_errors)
-        print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True)
+        # print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True)
         results.findings = results.findings.head(head_count)
-        print(f"Results height after heading {results.findings.height}", flush=True)
+        # print(f"Results height after heading {results.findings.height}", flush=True)
 
-    if not results.findings.is_empty():
-        result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id")
-        result_dict = dict(zip(result["validation_id"], result["count"]))
-        print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True)
+    # if not results.findings.is_empty():
+    #     result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id")
+    #     result_dict = dict(zip(result["validation_id"], result["count"]))
+    #     print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True)
 
     return results, total_count, process_errors
 
 
@@ -364,6 +369,9 @@ def get_real_file_path(path):
             return f.name
     return path
 
+def gather_check_errors(check_output: pl.DataFrame):
+    return check_output.with_row_index().filter(~pl.col("check_output"))
+
 
 # This function adds an index column (polars dataframes do not normally have one), and filters out
 # any row that did not fail a check.
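
Note on PATCH 1/2: the new gather_check_errors helper takes over from the per-check gather_errors call. It attaches a row index to the check output and keeps only rows whose boolean check_output is False, which appears to be what lets the validator keep counting failures past the max-errors threshold without building the full failed-record frames. A minimal, self-contained sketch of that filtering step; the four-row sample frame and its values are hypothetical, not taken from the submission format:

    import polars as pl

    # Hypothetical pandera check output: one boolean per submitted record (True = passed).
    check_output = pl.DataFrame({"check_output": [True, False, True, False]})

    # Same shape as the patch's gather_check_errors: attach a row index, keep only failures.
    failed = check_output.with_row_index().filter(~pl.col("check_output"))

    print(failed)  # only the rows with index 1 and 3 remain

The "index" column produced by with_row_index is the one the later pl.col('index').add(row_start) call shifts, so chunk-local row numbers line up with positions in the full submission.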
From d6bd140cfa4289b28f8d49a795bec0ba052603b3 Mon Sep 17 00:00:00 2001
From: lchen <73617864+lchen-2101@users.noreply.github.com>
Date: Mon, 27 Jan 2025 16:00:41 -0800
Subject: [PATCH 2/2] keep track of record count

---
 src/regtech_data_validator/validation_results.py | 1 +
 src/regtech_data_validator/validator.py          | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/regtech_data_validator/validation_results.py b/src/regtech_data_validator/validation_results.py
index 40a6352..f9262f2 100644
--- a/src/regtech_data_validator/validation_results.py
+++ b/src/regtech_data_validator/validation_results.py
@@ -26,3 +26,4 @@ class ValidationResults(object):
     is_valid: bool
     findings: pl.DataFrame
     phase: ValidationPhase
+    record_count: int = 0

diff --git a/src/regtech_data_validator/validator.py b/src/regtech_data_validator/validator.py
index 1f52099..d9a5bc5 100644
--- a/src/regtech_data_validator/validator.py
+++ b/src/regtech_data_validator/validator.py
@@ -310,7 +310,7 @@ def validate_register_level(context: Dict[str, str] | None, all_uids: List[str])
     return results
 
 
-def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks):
+def validate_chunk(schema, df: pl.DataFrame, total_count, row_start, max_errors, process_errors, checks):
     # print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True)
     validation_results = validate(schema, df, row_start, process_errors)
     if process_errors and not validation_results.is_empty():
@@ -320,6 +320,7 @@ def validate_chunk(schema, df, total_count, row_start, max_errors, process_error
     error_counts, warning_counts = get_scope_counts(validation_results)
 
     results = ValidationResults(
+        record_count=df.height,
         error_counts=error_counts,
         warning_counts=warning_counts,
         is_valid=((error_counts.total_count + warning_counts.total_count) == 0),
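
Note on PATCH 2/2: record_count is populated from df.height, the number of rows in the chunk handed to validate_chunk, so each ValidationResults can still report how many records it covered once findings stop being fully materialized past the max-errors threshold. A rough sketch of how a caller might total it up; the helper function and the results_iter name are illustrative, not part of this patch:

    def total_record_count(results_iter):
        # Sum the per-chunk record_count fields (each item is a ValidationResults).
        return sum(results.record_count for results in results_iter)

Because record_count defaults to 0, ValidationResults built by code paths that never set it still sum cleanly.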