-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
light processing on over threshold errors, and keep track of record count #330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,7 @@ def _add_validation_metadata(failed_check_fields_df: pl.DataFrame, check: SBLChe | |
|
||
|
||
def validate( | ||
schema: pa.DataFrameSchema, submission_df: pl.DataFrame, row_start: int | ||
schema: pa.DataFrameSchema, submission_df: pl.LazyFrame, row_start: int, process_errors: bool | ||
) -> pl.DataFrame: | ||
""" | ||
validate received dataframe with schema and return list of | ||
|
@@ -95,7 +95,7 @@ def validate( | |
# `list[dict[str,Any]]`, but it's actually of type `SchemaError` | ||
schema_error: SchemaError | ||
|
||
#if process_errors: | ||
# if process_errors: | ||
for schema_error in err.schema_errors: | ||
check = schema_error.check | ||
column_name = schema_error.schema.name | ||
|
@@ -116,17 +116,22 @@ def validate( | |
f'Check {check} type on {column_name} column not supported. Must be of type {SBLCheck}' | ||
) from schema_error | ||
|
||
schema_error = gather_errors(schema_error) | ||
# schema_error = gather_errors(schema_error) | ||
# check_output: pl.Series = gather_errors(schema_error) | ||
|
||
fields = _get_check_fields(check, column_name) | ||
check_output: pl.Series | None = schema_error.check_output | ||
check_output = gather_check_errors(schema_error.check_output) | ||
|
||
if check_output is not None: | ||
# Filter data not associated with failed Check, and update index for merging with findings_df | ||
check_output = check_output.with_columns(pl.col('index').add(row_start)) | ||
failed_records_df = _filter_valid_records(submission_df, check_output, fields) | ||
failed_record_fields_df = _records_to_fields(failed_records_df) | ||
findings = _add_validation_metadata(failed_record_fields_df, check) | ||
if process_errors: | ||
failed_records_df = _filter_valid_records(submission_df, check_output, fields) | ||
failed_record_fields_df = _records_to_fields(failed_records_df) | ||
findings = _add_validation_metadata(failed_record_fields_df, check) | ||
else: | ||
findings = _add_validation_metadata(check_output, check) | ||
findings = findings.with_columns(pl.lit(check.scope).alias("scope"), pl.lit(check.severity.value).alias("validation_type")) | ||
check_findings.append(findings) | ||
else: | ||
# The above exception handling _should_ prevent this from ever happenin, but...just in case. | ||
|
@@ -136,8 +141,9 @@ def validate( | |
if check_findings: | ||
findings_df = pl.concat(check_findings) | ||
|
||
updated_df = add_uid(findings_df, submission_df, row_start) | ||
return updated_df | ||
return add_uid(findings_df, submission_df, row_start) if process_errors else findings_df | ||
# updated_df = add_uid(findings_df, submission_df, row_start) | ||
# return updated_df | ||
|
||
|
||
# Add the uid for the record throwing the error/warning to the error dataframe | ||
|
@@ -182,7 +188,7 @@ def validate_batch_csv( | |
|
||
if not has_syntax_errors: | ||
register_schema = get_register_schema(context) | ||
validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0) | ||
validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True) | ||
if not validation_results.is_empty(): | ||
validation_results = format_findings( | ||
validation_results, | ||
|
@@ -199,7 +205,6 @@ def validate_batch_csv( | |
) | ||
yield results | ||
|
||
print("Processing other logic errors") | ||
for validation_results, _ in validate_chunks( | ||
logic_schema, real_path, batch_size, batch_count, max_errors, logic_checks | ||
): | ||
|
@@ -222,7 +227,7 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks): | |
row_start = 0 | ||
while batches: | ||
df = pl.concat(batches) | ||
validation_results = validate(schema, df, row_start) | ||
validation_results = validate(schema, df, row_start, process_errors) | ||
if not validation_results.is_empty(): | ||
|
||
validation_results = format_findings(validation_results, schema.name.value, checks) | ||
|
@@ -235,9 +240,9 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks): | |
findings=validation_results if process_errors else pl.DataFrame(), | ||
phase=schema.name, | ||
) | ||
print(f"Findings height: {validation_results.height}", flush=True) | ||
# print(f"Findings height: {validation_results.height}", flush=True) | ||
total_count += (error_counts.total_count + warning_counts.total_count) | ||
print(f"Total Count: {total_count}", flush=True) | ||
# print(f"Total Count: {total_count}", flush=True) | ||
if total_count > max_errors and process_errors: | ||
process_errors = False | ||
head_count = results.findings.height - (total_count - max_errors) | ||
|
@@ -284,9 +289,9 @@ def validate_lazy_frame( | |
|
||
|
||
def validate_register_level(context: Dict[str, str] | None, all_uids: List[str]): | ||
print("Processing register logic errors") | ||
# print("Processing register logic errors") | ||
register_schema = get_register_schema(context) | ||
validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0) | ||
validation_results = validate(register_schema, pl.DataFrame({"uid": all_uids}), 0, True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this will probably never happen, but say a bank submits a sblar with over 1mil entries and they accidentally use the same UID for every row. We'd end up with register errors beyond our max limit, as well moving on to processing other logic errors beyond the max limit. Doesn't need to take place here but think we should look at treating these the same. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe, although this part isn't doing it on the submitted lf/df; it's on the information we already gathered from the previous pass (syntax errors), so it's a lot less intensive, and it's currently not counting towards the limit. |
||
if not validation_results.is_empty(): | ||
validation_results = format_findings( | ||
validation_results, | ||
|
@@ -301,20 +306,21 @@ def validate_register_level(context: Dict[str, str] | None, all_uids: List[str]) | |
findings=validation_results, | ||
phase=register_schema.name, | ||
) | ||
print(f"Register counts: {error_counts} {warning_counts}", flush=True) | ||
# print(f"Register counts: {error_counts} {warning_counts}", flush=True) | ||
return results | ||
|
||
|
||
def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks): | ||
print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True) | ||
validation_results = validate(schema, df, row_start) | ||
if not validation_results.is_empty(): | ||
def validate_chunk(schema, df: pl.DataFrame, total_count, row_start, max_errors, process_errors, checks): | ||
# print(f"Start UID: {df['uid'][0]}, Last UID: {df['uid'][-1]}", flush=True) | ||
validation_results = validate(schema, df, row_start, process_errors) | ||
if process_errors and not validation_results.is_empty(): | ||
validation_results = format_findings( | ||
validation_results, schema.name.value, checks | ||
) | ||
|
||
error_counts, warning_counts = get_scope_counts(validation_results) | ||
results = ValidationResults( | ||
record_count=df.height, | ||
error_counts=error_counts, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the intended use of this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gonna guess u meant to comment on the |
||
warning_counts=warning_counts, | ||
is_valid=((error_counts.total_count + warning_counts.total_count) == 0), | ||
|
@@ -323,19 +329,19 @@ def validate_chunk(schema, df, total_count, row_start, max_errors, process_error | |
) | ||
|
||
total_count += (error_counts.total_count + warning_counts.total_count) | ||
print(f"Counts: {error_counts} {warning_counts}", flush=True) | ||
# print(f"Counts: {error_counts} {warning_counts}", flush=True) | ||
if total_count > max_errors and process_errors: | ||
print("Reached max errors, adjusting results", flush=True) | ||
# print("Reached max errors, adjusting results", flush=True) | ||
process_errors = False | ||
head_count = results.findings.height - (total_count - max_errors) | ||
print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True) | ||
# print(f"Results height: {results.findings.height}, total count: {total_count}, head count: {head_count}", flush=True) | ||
results.findings = results.findings.head(head_count) | ||
print(f"Results height after heading {results.findings.height}", flush=True) | ||
# print(f"Results height after heading {results.findings.height}", flush=True) | ||
|
||
if not results.findings.is_empty(): | ||
result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id") | ||
result_dict = dict(zip(result["validation_id"], result["count"])) | ||
print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True) | ||
# if not results.findings.is_empty(): | ||
# result = results.findings.group_by("validation_id").agg([pl.count().alias("count")]).sort("validation_id") | ||
# result_dict = dict(zip(result["validation_id"], result["count"])) | ||
# print(f"{result_dict}\nTotal Results: {results.findings.height}", flush=True) | ||
|
||
|
||
return results, total_count, process_errors | ||
|
@@ -364,6 +370,9 @@ def get_real_file_path(path): | |
return f.name | ||
return path | ||
|
||
def gather_check_errors(check_output: pl.DataFrame): | ||
return check_output.with_row_index().filter(~pl.col("check_output")) | ||
|
||
|
||
# This function adds an index column (polars dataframes do not normally have one), and filters out | ||
# any row that did not fail a check. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I say we take out any of the none validate lf/parquet stuff because I think we've proven the batch csv approach isn't the way to go. Either here or in a new story.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's deal with it in another story; I think I replicated parts of that code, and abstracted out to what we have now for lazyframes; the thought is to still be able to do cli csv, but it would then fall into using the lazyframe batch processing.