
Commit 3aadc82

test lighter processing past errors threshold
1 parent 14b0c98 commit 3aadc82

File tree

1 file changed (+36, -28 lines)


src/regtech_data_validator/validator.py

Lines changed: 36 additions & 28 deletions
@@ -71,7 +71,7 @@ def _add_validation_metadata(failed_check_fields_df: pl.DataFrame, check: SBLCheck


 def validate(
-    schema: pa.DataFrameSchema, submission_df: pl.DataFrame, row_start: int
+    schema: pa.DataFrameSchema, submission_df: pl.LazyFrame, row_start: int, process_errors: bool
 ) -> pl.DataFrame:
     """
     validate received dataframe with schema and return list of
@@ -95,7 +95,7 @@ def validate(
         # `list[dict[str,Any]]`, but it's actually of type `SchemaError`
         schema_error: SchemaError

-        #if process_errors:
+        # if process_errors:
         for schema_error in err.schema_errors:
             check = schema_error.check
             column_name = schema_error.schema.name
@@ -116,17 +116,22 @@ def validate(
                     f'Check {check} type on {column_name} column not supported. Must be of type {SBLCheck}'
                 ) from schema_error

-            schema_error = gather_errors(schema_error)
+            # schema_error = gather_errors(schema_error)
+            # check_output: pl.Series = gather_errors(schema_error)

             fields = _get_check_fields(check, column_name)
-            check_output: pl.Series | None = schema_error.check_output
+            check_output = gather_check_errors(schema_error.check_output)

             if check_output is not None:
                 # Filter data not associated with failed Check, and update index for merging with findings_df
                 check_output = check_output.with_columns(pl.col('index').add(row_start))
-                failed_records_df = _filter_valid_records(submission_df, check_output, fields)
-                failed_record_fields_df = _records_to_fields(failed_records_df)
-                findings = _add_validation_metadata(failed_record_fields_df, check)
+                if process_errors:
+                    failed_records_df = _filter_valid_records(submission_df, check_output, fields)
+                    failed_record_fields_df = _records_to_fields(failed_records_df)
+                    findings = _add_validation_metadata(failed_record_fields_df, check)
+                else:
+                    findings = _add_validation_metadata(check_output, check)
+                    findings = findings.with_columns(pl.lit(check.scope).alias("scope"), pl.lit(check.severity.value).alias("validation_type"))
                 check_findings.append(findings)
             else:
                 # The above exception handling _should_ prevent this from ever happenin, but...just in case.
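
When `process_errors` is False, `validate` now skips the expensive `_filter_valid_records` -> `_records_to_fields` join and builds findings straight from the failed check output, attaching `scope` and `validation_type` as literal columns. A minimal sketch of that branch shape; `full_findings`, the scope string, and the severity string below are hypothetical stand-ins, not the repo's helpers:

import polars as pl

# hypothetical stand-in for the expensive path: join failed indices back
# to the submission so each finding carries the offending record's values
def full_findings(submission: pl.DataFrame, failed: pl.DataFrame) -> pl.DataFrame:
    return failed.join(submission.with_row_index(), on="index")

def build_findings(submission: pl.DataFrame, failed: pl.DataFrame, process_errors: bool) -> pl.DataFrame:
    if process_errors:
        return full_findings(submission, failed)
    # lighter path: keep only failure positions plus literal metadata columns
    return failed.with_columns(
        pl.lit("single-field").alias("scope"),     # placeholder for check.scope
        pl.lit("Error").alias("validation_type"),  # placeholder for check.severity.value
    )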
@@ -136,8 +141,9 @@ def validate(
     if check_findings:
         findings_df = pl.concat(check_findings)

-        updated_df = add_uid(findings_df, submission_df, row_start)
-        return updated_df
+        return add_uid(findings_df, submission_df, row_start) if process_errors else findings_df
+        # updated_df = add_uid(findings_df, submission_df, row_start)
+        # return updated_df


 # Add the uid for the record throwing the error/warning to the error dataframe
@@ -182,7 +188,7 @@ def validate_batch_csv(

     if not has_syntax_errors:
         register_schema = get_register_schema(context)
-        validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0)
+        validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True)
         if not validation_results.is_empty():
             validation_results = format_findings(
                 validation_results,
@@ -199,7 +205,6 @@ def validate_batch_csv(
         )
         yield results

-    print("Processing other logic errors")
     for validation_results, _ in validate_chunks(
         logic_schema, real_path, batch_size, batch_count, max_errors, logic_checks
     ):
@@ -222,7 +227,7 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks):
     row_start = 0
     while batches:
         df = pl.concat(batches)
-        validation_results = validate(schema, df, row_start)
+        validation_results = validate(schema, df, row_start, process_errors)
         if not validation_results.is_empty():

             validation_results = format_findings(validation_results, schema.name.value, checks)
@@ -235,9 +240,9 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks):
                 findings=validation_results if process_errors else pl.DataFrame(),
                 phase=schema.name,
             )
-            print(f"Findings height: {validation_results.height}", flush=True)
+            # print(f"Findings height: {validation_results.height}", flush=True)
             total_count += (error_counts.total_count + warning_counts.total_count)
-            print(f"Total Count: {total_count}", flush=True)
+            # print(f"Total Count: {total_count}", flush=True)
             if total_count > max_errors and process_errors:
                 process_errors = False
                 head_count = results.findings.height - (total_count - max_errors)
@@ -284,9 +289,9 @@ def validate_lazy_frame(


 def validate_register_level(context: Dict[str, str] | None, all_uids: List[str]):
-    print("Processing register logic errors")
+    # print("Processing register logic errors")
     register_schema = get_register_schema(context)
-    validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0)
+    validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True)
     if not validation_results.is_empty():
         validation_results = format_findings(
             validation_results,
@@ -301,14 +306,14 @@ def validate_register_level(context: Dict[str, str] | None, all_uids: List[str]):
         findings=validation_results,
         phase=register_schema.name,
     )
-    print(f"Register counts: {error_counts} {warning_counts}", flush=True)
+    # print(f"Register counts: {error_counts} {warning_counts}", flush=True)
     return results


 def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks):
-    print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True)
-    validation_results = validate(schema, df, row_start)
-    if not validation_results.is_empty():
+    # print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True)
+    validation_results = validate(schema, df, row_start, process_errors)
+    if process_errors and not validation_results.is_empty():
         validation_results = format_findings(
             validation_results, schema.name.value, checks
         )
@@ -323,19 +328,19 @@ def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks):
     )

     total_count += (error_counts.total_count + warning_counts.total_count)
-    print(f"Counts: {error_counts} {warning_counts}", flush=True)
+    # print(f"Counts: {error_counts} {warning_counts}", flush=True)
     if total_count > max_errors and process_errors:
-        print("Reached max errors, adjusting results", flush=True)
+        # print("Reached max errors, adjusting results", flush=True)
        process_errors = False
        head_count = results.findings.height - (total_count - max_errors)
-        print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True)
+        # print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True)
        results.findings = results.findings.head(head_count)
-        print(f"Results height after heading {results.findings.height}", flush=True)
+        # print(f"Results height after heading {results.findings.height}", flush=True)

-    if not results.findings.is_empty():
-        result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id")
-        result_dict = dict(zip(result["validation_id"], result["count"]))
-        print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True)
+    # if not results.findings.is_empty():
+    #     result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id")
+    #     result_dict = dict(zip(result["validation_id"], result["count"]))
+    #     print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True)


     return results, total_count, process_errors
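
Once the running total passes `max_errors`, the overshoot (`total_count - max_errors`) is trimmed from the tail of the current chunk's findings so the cumulative findings stop exactly at the cap, and later chunks run count-only with `process_errors=False`. A small sketch of the trim arithmetic, with assumed numbers (cap 100, running total 130, 50 findings in this chunk, so `head_count = 50 - 30 = 20`):

import polars as pl

def trim_to_cap(findings: pl.DataFrame, total_count: int, max_errors: int) -> pl.DataFrame:
    # drop the overshoot from the tail so cumulative findings land on the cap
    head_count = findings.height - (total_count - max_errors)
    return findings.head(head_count)

chunk = pl.DataFrame({"validation_id": [f"E{i:04d}" for i in range(50)]})
print(trim_to_cap(chunk, total_count=130, max_errors=100).height)  # 20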
@@ -364,6 +369,9 @@ def get_real_file_path(path):
             return f.name
     return path

+def gather_check_errors(check_output: pl.DataFrame):
+    return check_output.with_row_index().filter(~pl.col("check_output"))
+

 # This function adds an index column (polars dataframes do not normally have one), and filters out
 # any row that did not fail a check.
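
The new helper leans on polars' `with_row_index` (which adds an `index` column) plus a negated boolean filter, so only failing rows survive and each keeps its original position. A quick behavioral check, assuming a recent polars and a boolean `check_output` column:

import polars as pl

df = pl.DataFrame({"check_output": [True, False, False, True]})
out = df.with_row_index().filter(~pl.col("check_output"))
print(out)
# shape: (2, 2): rows 1 and 2 only (the failed checks), with the original
# row position preserved in the new "index" column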
