From 4e2639a1dfe96811cbc8112ccbe7ab289f81100b Mon Sep 17 00:00:00 2001 From: Patrick Troy Date: Fri, 14 Jun 2024 14:53:05 +0100 Subject: [PATCH] add CIN csv pipeline --- liiatools/cin_census_pipeline/pipeline.py | 54 ++-- .../spec/CIN_schema_2016.yml | 3 +- .../spec/CIN_schema_2017.diff.yml | 5 + .../spec/CIN_schema_2021.diff.yml | 87 +++++++ .../spec/CIN_schema_2022.diff.yml | 89 +++++++ .../spec/CIN_schema_2023.diff.yml | 23 ++ .../cin_census_pipeline/spec/__init__.py | 78 +++++- .../cin_census_pipeline/spec/pipeline.yml | 237 ++++++++++++++++++ .../cin_census_pipeline/stream_pipeline.py | 38 ++- 9 files changed, 585 insertions(+), 29 deletions(-) create mode 100644 liiatools/cin_census_pipeline/spec/CIN_schema_2017.diff.yml create mode 100644 liiatools/cin_census_pipeline/spec/CIN_schema_2021.diff.yml create mode 100644 liiatools/cin_census_pipeline/spec/CIN_schema_2022.diff.yml create mode 100644 liiatools/cin_census_pipeline/spec/CIN_schema_2023.diff.yml diff --git a/liiatools/cin_census_pipeline/pipeline.py b/liiatools/cin_census_pipeline/pipeline.py index fb853c52..704a4094 100644 --- a/liiatools/cin_census_pipeline/pipeline.py +++ b/liiatools/cin_census_pipeline/pipeline.py @@ -15,10 +15,14 @@ from liiatools.cin_census_pipeline.spec import ( load_pipeline_config, - load_schema, + load_xml_schema, + load_csv_schema, load_schema_path, ) -from liiatools.cin_census_pipeline.stream_pipeline import task_cleanfile +from liiatools.cin_census_pipeline.stream_pipeline import ( + task_xml_cleanfile, + task_csv_cleanfile, +) from liiatools.cin_census_pipeline.reports import reports @@ -55,13 +59,21 @@ def process_file( uuid = file_locator.meta["uuid"] # Load schema and set on processing metadata - schema = load_schema(year=year) - schema_path = load_schema_path(year=year) + if file_locator.name.endswith(".xml"): + schema = load_xml_schema(year=year) + schema_path = load_schema_path(year=year) + elif file_locator.name.endswith(".csv"): + schema = load_csv_schema(year=year) + metadata = dict(year=year, schema=schema, la_code=la_code) # Normalise the data and export to the session 'cleaned' folder try: - cleanfile_result = task_cleanfile(file_locator, schema, schema_path) + cleanfile_result = ( + task_xml_cleanfile(file_locator, schema, schema_path) + if file_locator.name.endswith(".xml") + else task_csv_cleanfile(file_locator, schema) + ) except Exception as e: logger.exception(f"Error cleaning file {file_locator.name}") errors.append( @@ -155,19 +167,19 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str): report_data.data.export(report_folder, "cin_census_", "csv") # Run report analysis - analysis_data = report_data.data["CIN"] - - expanded_assessment_factors = reports.expanded_assessment_factors(analysis_data) - referral_outcomes = reports.referral_outcomes(analysis_data) - s47_journeys = reports.s47_journeys(analysis_data) - - analysis_data = DataContainer( - { - "factors": expanded_assessment_factors, - "referrals": referral_outcomes, - "S47_journeys": s47_journeys, - } - ) - - analysis_folder = export_folder.makedirs("REPORTS", recreate=True) - analysis_data.export(analysis_folder, "cin_census_", "csv") + # analysis_data = report_data.data["CIN"] + # + # expanded_assessment_factors = reports.expanded_assessment_factors(analysis_data) + # referral_outcomes = reports.referral_outcomes(analysis_data) + # s47_journeys = reports.s47_journeys(analysis_data) + # + # analysis_data = DataContainer( + # { + # "factors": expanded_assessment_factors, + # "referrals": referral_outcomes, + # "S47_journeys": s47_journeys, + # } + # ) + # + # analysis_folder = export_folder.makedirs("REPORTS", recreate=True) + # analysis_data.export(analysis_folder, "cin_census_", "csv") diff --git a/liiatools/cin_census_pipeline/spec/CIN_schema_2016.yml b/liiatools/cin_census_pipeline/spec/CIN_schema_2016.yml index bb99cfea..391f6f28 100644 --- a/liiatools/cin_census_pipeline/spec/CIN_schema_2016.yml +++ b/liiatools/cin_census_pipeline/spec/CIN_schema_2016.yml @@ -38,7 +38,7 @@ column_map: upn: &upn string: "regex" - cell_regex: "[A-Za-z]\d{11}(\d|[A-Za-z])" + cell_regex: "[A-Za-z]\\d{11}(\\d|[A-Za-z])" canbeblank: yes formerupn: *upn @@ -128,7 +128,6 @@ column_map: cppenddate: *date-blank numberofpreviouscpp: - &int-not-blank numeric: type: "integer" min_value: 0 diff --git a/liiatools/cin_census_pipeline/spec/CIN_schema_2017.diff.yml b/liiatools/cin_census_pipeline/spec/CIN_schema_2017.diff.yml new file mode 100644 index 00000000..32d43d37 --- /dev/null +++ b/liiatools/cin_census_pipeline/spec/CIN_schema_2017.diff.yml @@ -0,0 +1,5 @@ +column_map.childprotectionplans: + type: remove + description: Remove column from childprotectionplans + value: + - seensocialworker diff --git a/liiatools/cin_census_pipeline/spec/CIN_schema_2021.diff.yml b/liiatools/cin_census_pipeline/spec/CIN_schema_2021.diff.yml new file mode 100644 index 00000000..3f3eacde --- /dev/null +++ b/liiatools/cin_census_pipeline/spec/CIN_schema_2021.diff.yml @@ -0,0 +1,87 @@ +column_map.factorsidentifiedatassessment.assessmentfactors: + type: modify + description: Updated assessmentfactors codes for 2021 schema + value: + category: + - code: "1A" + name: "Alcohol misuse: concerns about alcohol misuse by the child" + - code: "1B" + name: "Alcohol misuse: concerns about alcohol misuse by the parent(s)/carer(s)" + - code: "1C" + name: "Alcohol misuse: concerns about alcohol misuse by another person living in the household" + - code: "2A" + name: "Drug misuse: concerns about drug misuse by the child" + - code: "2B" + name: "Drug misuse: concerns about drug misuse by the parent(s)/carer(s)" + - code: "2C" + name: "Drug misuse: concerns about drug misuse by another person living in the household" + - code: "3A" + name: "Domestic violence: concerns about the child being the subject of domestic violence" + - code: "3B" + name: "Domestic violence: concerns about the child’s parent(s)/carer(s) being the subject of domestic violence" + - code: "3C" + name: "Domestic violence: concerns about another person living in the household being the subject of domestic violence" + - code: "4A" + name: "Mental health: concerns about the mental health of the child" + - code: "4B" + name: "Mental health: concerns about the mental health of the parent(s)/carer(s)" + - code: "4C" + name: "Mental health: concerns about the mental health of another person in the family/household" + - code: "5A" + name: "Learning disability: concerns about the child’s learning disability" + - code: "5B" + name: "Learning disability: concerns about the parent(s)/carer(s) learning disability" + - code: "5C" + name: "Learning disability: concerns about another person in the family/household’s learning disability" + - code: "6A" + name: "Physical disability or illness: concerns about a physical disability or illness of the child" + - code: "6B" + name: "Physical disability or illness: concerns about a physical disability or illness of the parent(s)/carer(s)" + - code: "6C" + name: "Physical disability or illness: concerns about a physical disability or illness of another person in the family/household" + - code: "7A" + name: "Young carer: concerns that services may be required or the child’s health or development may be impaired due to their caring responsibilities" + - code: "8B" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - overseas children who intend to return" + - code: "8C" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - overseas children who intend to stay" + - code: "8D" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - UK children in educational placements" + - code: "8E" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - UK children making alternative family arrangements" + - code: "8F" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - other" + - code: "9A" + name: "UASC: concerns that services may be required or the child may be at risk of harm as an unaccompanied asylum-seeking child" + - code: "10A" + name: "Missing: concerns that services may be required or the child may be at risk of harm due to going/being missing" + - code: "11A" + name: "Child sexual exploitation: concerns that services may be required or the child may be at risk of harm due to child sexual exploitation" + - code: "12A" + name: "Trafficking: concerns that services may be required or the child may be at risk of harm due to trafficking" + - code: "13A" + name: "Gangs: concerns that services may be required or the child may be at risk of harm because of involvement in/with gangs" + - code: "14A" + name: "Socially unacceptable behaviour: concerns that services may be required or the child may be at risk due to their socially unacceptable behaviour" + - code: "15A" + name: "Self-harm: concerns that services may be required or due to suspected/actual self-harming child may be at risk of harm" + - code: "16A" + name: "Abuse or neglect – ‘NEGLECT’: concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect" + - code: "17A" + name: "Abuse or neglect – ‘EMOTIONAL ABUSE’: concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect" + - code: "18B" + name: "Abuse or neglect – ‘PHYSICAL ABUSE’ (child on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by another child" + - code: "18C" + name: "Abuse or neglect – ‘PHYSICAL ABUSE’ (adult on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by an adult" + - code: "19B" + name: "Abuse or neglect – ‘SEXUAL ABUSE’ (child on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by another child" + - code: "19C" + name: "Abuse or neglect – ‘SEXUAL ABUSE’ (adult on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by an adult" + - code: "20" + name: "Other" + - code: "21" + name: "No factors identified - only use this if there is no evidence of any of the factors above and no further action is being taken" + - code: "22A" + name: "Female genital mutilation (FGM) - concerns that services may be required or the child may be at risk due to female genital mutilation" + - code: "23A" + name: "Abuse linked to faith or belief - concerns that services may be required or the child may be at risk due to abuse linked to faith or belief" \ No newline at end of file diff --git a/liiatools/cin_census_pipeline/spec/CIN_schema_2022.diff.yml b/liiatools/cin_census_pipeline/spec/CIN_schema_2022.diff.yml new file mode 100644 index 00000000..b3999658 --- /dev/null +++ b/liiatools/cin_census_pipeline/spec/CIN_schema_2022.diff.yml @@ -0,0 +1,89 @@ +column_map.factorsidentifiedatassessment.assessmentfactors: + type: modify + description: Updated assessmentfactors codes for 2022 schema + value: + category: + - code: "1A" + name: "Alcohol misuse: concerns about alcohol misuse by the child" + - code: "1B" + name: "Alcohol misuse: concerns about alcohol misuse by the parent(s)/carer(s)" + - code: "1C" + name: "Alcohol misuse: concerns about alcohol misuse by another person living in the household" + - code: "2A" + name: "Drug misuse: concerns about drug misuse by the child" + - code: "2B" + name: "Drug misuse: concerns about drug misuse by the parent(s)/carer(s)" + - code: "2C" + name: "Drug misuse: concerns about drug misuse by another person living in the household" + - code: "3A" + name: "Domestic violence: concerns about the child being the subject of domestic violence" + - code: "3B" + name: "Domestic violence: concerns about the child’s parent(s)/carer(s) being the subject of domestic violence" + - code: "3C" + name: "Domestic violence: concerns about another person living in the household being the subject of domestic violence" + - code: "4A" + name: "Mental health: concerns about the mental health of the child" + - code: "4B" + name: "Mental health: concerns about the mental health of the parent(s)/carer(s)" + - code: "4C" + name: "Mental health: concerns about the mental health of another person in the family/household" + - code: "5A" + name: "Learning disability: concerns about the child’s learning disability" + - code: "5B" + name: "Learning disability: concerns about the parent(s)/carer(s) learning disability" + - code: "5C" + name: "Learning disability: concerns about another person in the family/household’s learning disability" + - code: "6A" + name: "Physical disability or illness: concerns about a physical disability or illness of the child" + - code: "6B" + name: "Physical disability or illness: concerns about a physical disability or illness of the parent(s)/carer(s)" + - code: "6C" + name: "Physical disability or illness: concerns about a physical disability or illness of another person in the family/household" + - code: "7A" + name: "Young carer: concerns that services may be required or the child’s health or development may be impaired due to their caring responsibilities" + - code: "8B" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - overseas children who intend to return" + - code: "8C" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - overseas children who intend to stay" + - code: "8D" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - UK children in educational placements" + - code: "8E" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - UK children making alternative family arrangements" + - code: "8F" + name: "Privately fostered: concerns that services may be required or the child may be at risk as a privately fostered child - other" + - code: "9A" + name: "UASC: concerns that services may be required or the child may be at risk of harm as an unaccompanied asylum-seeking child" + - code: "10A" + name: "Missing: concerns that services may be required or the child may be at risk of harm due to going/being missing" + - code: "11A" + name: "Child sexual exploitation: concerns that services may be required or the child may be at risk of harm due to child sexual exploitation" + - code: "12A" + name: "Trafficking: concerns that services may be required or the child may be at risk of harm due to trafficking" + - code: "13A" + name: "Gangs: concerns that services may be required or the child may be at risk of harm because of involvement in/with gangs" + - code: "14A" + name: "Socially unacceptable behaviour: concerns that services may be required or the child may be at risk due to their socially unacceptable behaviour" + - code: "15A" + name: "Self-harm: concerns that services may be required or due to suspected/actual self-harming child may be at risk of harm" + - code: "16A" + name: "Abuse or neglect – ‘NEGLECT’: concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect" + - code: "17A" + name: "Abuse or neglect – ‘EMOTIONAL ABUSE’: concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect" + - code: "18B" + name: "Abuse or neglect – ‘PHYSICAL ABUSE’ (child on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by another child" + - code: "18C" + name: "Abuse or neglect – ‘PHYSICAL ABUSE’ (adult on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by an adult" + - code: "19B" + name: "Abuse or neglect – ‘SEXUAL ABUSE’ (child on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by another child" + - code: "19C" + name: "Abuse or neglect – ‘SEXUAL ABUSE’ (adult on child): concerns that services may be required or the child may be suffering or likely to suffer significant harm due to abuse or neglect by an adult" + - code: "20" + name: "Other" + - code: "21" + name: "No factors identified - only use this if there is no evidence of any of the factors above and no further action is being taken" + - code: "22A" + name: "Female genital mutilation (FGM) - concerns that services may be required or the child may be at risk due to female genital mutilation" + - code: "23A" + name: "Abuse linked to faith or belief - concerns that services may be required or the child may be at risk due to abuse linked to faith or belief" + - code: "24A" + name: "Child criminal exploitation: concerns that services may be required or the child may be at risk of harm due to child criminal exploitation" \ No newline at end of file diff --git a/liiatools/cin_census_pipeline/spec/CIN_schema_2023.diff.yml b/liiatools/cin_census_pipeline/spec/CIN_schema_2023.diff.yml new file mode 100644 index 00000000..99d90a89 --- /dev/null +++ b/liiatools/cin_census_pipeline/spec/CIN_schema_2023.diff.yml @@ -0,0 +1,23 @@ +column_map.cindetails.reasonforclosure: + type: modify + description: Updated reasonforclosure codes for 2023 schema + value: + category: + - code: "RC1" + name: "Adopted" + - code: "RC2" + name: "Died" + - code: "RC3" + name: "Child arrangements order" + - code: "RC4" + name: "Special guardianship order" + - code: "RC5" + name: "Transferred to services of another local authority" + - code: "RC6" + name: "Transferred to adult social care services" + - code: "RC7" + name: "Services ceased for any other reason, including child no longer in need" + - code: "RC8" + name: "Case closed after assessment, no further action" + - code: "RC9" + name: "Case closed after assessment, referred to early help" \ No newline at end of file diff --git a/liiatools/cin_census_pipeline/spec/__init__.py b/liiatools/cin_census_pipeline/spec/__init__.py index f670a2f7..26bc3f64 100644 --- a/liiatools/cin_census_pipeline/spec/__init__.py +++ b/liiatools/cin_census_pipeline/spec/__init__.py @@ -1,12 +1,16 @@ from functools import lru_cache from pathlib import Path import yaml - +import re +import logging import xmlschema from pydantic_yaml import parse_yaml_file_as from liiatools.common.data import PipelineConfig +from liiatools.common.spec.__data_schema import DataSchema + +logger = logging.getLogger(__name__) SCHEMA_DIR = Path(__file__).parent @@ -18,7 +22,7 @@ def load_pipeline_config(): @lru_cache -def load_schema(year: int) -> xmlschema.XMLSchema: +def load_xml_schema(year: int) -> xmlschema.XMLSchema: return xmlschema.XMLSchema(SCHEMA_DIR / f"CIN_schema_{year:04d}.xsd") @@ -31,3 +35,73 @@ def load_schema_path(year: int) -> Path: def load_reports(): with open(SCHEMA_DIR / "reports.yml", "rt") as FILE: return yaml.load(FILE, Loader=yaml.FullLoader) + + +@lru_cache +def load_csv_schema(year: int) -> DataSchema: + pattern = re.compile(r"CIN_schema_(\d{4})(\.diff)?\.yml") + + # Build index of all schema files + all_schema_files = list(SCHEMA_DIR.glob("CIN_schema_*.yml")) + all_schema_files.sort() + schema_lookup = [] + for fn in all_schema_files: + match = pattern.match(fn.name) + assert match, f"Unexpected schema name {fn}" + schema_lookup.append((fn, int(match.group(1)), match.group(2) is not None)) + + # Filter only those earlier than the year we're looking for + schema_lookup = [x for x in schema_lookup if x[1] <= year] + + # If we have no schema files, raise an error + if not schema_lookup: + raise ValueError(f"No schema files found for year {year}") + + # Find the latest complete schema + last_complete_schema = [x for x in schema_lookup if not x[2]][-1] + + # Now filter down to only include last complete and any diff files after that + schema_lookup = [x for x in schema_lookup if x[1] >= last_complete_schema[1]] + + # We load the full schema + logger.debug("Loading schema from %s", schema_lookup[0][0]) + full_schema = yaml.safe_load(schema_lookup[0][0].read_text()) + + # Now loop over diff files and apply them + for fn, _, _ in schema_lookup[1:]: + logger.debug("Loading partial schema from %s", fn) + try: + diff = yaml.safe_load(fn.read_text()) + except yaml.YAMLError as e: + raise ValueError(f"Error parsing diff file {fn}") from e + + for key, diff_obj in diff.items(): + diff_type = diff_obj["type"] + assert diff_type in ( + "add", + "modify", + "rename", + "remove", + ), f"Unknown diff type {diff_type}" + path = key.split(".") + parent = full_schema + + if diff_type in ["add", "modify"]: + for item in path[:-1]: + parent = parent[item] + parent[path[-1]] = diff_obj["value"] + + elif diff_type == "rename": + dict = parent[path[0]][path[1]] + dict[diff_obj["value"]] = dict.pop(path[-1]) + + elif diff_type == "remove": + if len(path) == 2: # Remove columns + dict = parent[path[0]][path[1]] + [dict.pop(key) for key in diff_obj["value"]] + elif len(path) == 1: # Remove files + dict = parent[path[0]] + [dict.pop(key) for key in diff_obj["value"]] + + # Now we can parse the full schema into a DataSchema object from the dict + return DataSchema(**full_schema) diff --git a/liiatools/cin_census_pipeline/spec/pipeline.yml b/liiatools/cin_census_pipeline/spec/pipeline.yml index 1754d31f..553d10c7 100644 --- a/liiatools/cin_census_pipeline/spec/pipeline.yml +++ b/liiatools/cin_census_pipeline/spec/pipeline.yml @@ -85,6 +85,243 @@ table_list: - id: LA type: string enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: assessments + retain: + - PAN + columns: + - id: assessmentstableid + type: string + - id: NativeId + type: string + - id: assessmentsorderseqcolumn + type: string + - id: sourceid + type: string + - id: cindetailstableid + type: string + - id: assessmentactualstartdate + type: date + - id: assessmentinternalreviewdate + type: date + - id: assessmentauthorisationdate + type: date + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: child + retain: + - PAN + columns: + - id: childtableid + type: string + - id: NativeId + type: string + - id: childorderseqcolumn + type: string + - id: sourceid + type: string + - id: childreninneedtableid + type: string + - id: lachildid + type: string + unique_key: true + enrich: [integer, add_la_suffix] + - id: upn + type: string + - id: formerupn + type: string + - id: dob + type: date + degrade: first_of_month + - id: expecteddob + type: date + degrade: first_of_month + - id: persondeathdate + type: date + degrade: first_of_month + - id: upnunknown + type: category + - id: gender + type: category + - id: ethnicity + type: category + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: childprotectionplans + retain: + - PAN + columns: + - id: childprotectionplanstableid + type: string + - id: NativeId + type: string + - id: childprotectionplansorderseqcolumn + type: string + - id: sourceid + type: string + - id: cindetailstableid + type: string + - id: cppstartdate + type: date + - id: cppenddate + type: date + - id: numberofpreviouscpp + type: numeric + - id: seensocialworker + type: category + - id: initialcategoryofabuse + type: category + - id: latestcategoryofabuse + type: category + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: cindetails + retain: + - PAN + columns: + - id: cindetailstableid + type: string + - id: NativeId + type: string + - id: cindetailsorderseqcolumn + type: string + - id: sourceid + type: string + - id: childtableid + type: string + - id: cinreferraldate + type: date + - id: cinclosuredate + type: date + - id: dateofinitialcpc + type: date + - id: referralnfa + type: category + - id: referralsource + type: category + - id: primaryneedcode + type: category + - id: reasonforclosure + type: category + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: disabilities + retain: + - PAN + columns: + - id: disabilitiestableid + type: string + - id: NativeId + type: string + - id: disabilitiesorderseqcolumn + type: string + - id: sourceid + type: string + - id: childtableid + type: string + - id: disability + type: category + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: factorsidentifiedatassessment + retain: + - PAN + columns: + - id: factorsidentifiedatassessmenttableid + type: string + - id: NativeId + type: string + - id: factorsidentifiedatassessmentorderseqcolumn + type: string + - id: sourceid + type: string + - id: assessmentstableid + type: string + - id: assessmentfactors + type: category + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: reviews + retain: + - PAN + columns: + - id: reviewstableid + type: string + - id: NativeId + type: string + - id: reviewsorderseqcolumn + type: string + - id: sourceid + type: string + - id: childprotectionplanstableid + type: string + - id: cppreviewdate + type: date + - id: LA + type: string + enrich: la_name + - id: Year + type: numeric + enrich: year + sort: 0 +- id: section47 + retain: + - PAN + columns: + - id: section47tableid + type: string + - id: NativeId + type: string + - id: section47orderseqcolumn + type: string + - id: sourceid + type: string + - id: cindetailstableid + type: string + - id: s47actualstartdate + type: date + - id: initialcpctargetdate + type: date + - id: dateofinitialcpc + type: date + - id: icpcnotrequired + type: category + - id: LA + type: string + enrich: la_name - id: Year type: numeric enrich: year diff --git a/liiatools/cin_census_pipeline/stream_pipeline.py b/liiatools/cin_census_pipeline/stream_pipeline.py index e66bc0c7..da13b040 100644 --- a/liiatools/cin_census_pipeline/stream_pipeline.py +++ b/liiatools/cin_census_pipeline/stream_pipeline.py @@ -1,20 +1,20 @@ from pathlib import Path from xmlschema import XMLSchema -import pandas as pd from sfdata_stream_parser.filters import generic from liiatools.common.data import FileLocator, ProcessResult, DataContainer from liiatools.common import stream_filters as stream_functions +from liiatools.common.spec.__data_schema import DataSchema from liiatools.common.stream_parse import dom_parse -from liiatools.common.stream_pipeline import to_dataframe_xml +from liiatools.common.stream_pipeline import to_dataframe_xml, to_dataframe from liiatools.cin_census_pipeline import stream_record from . import stream_filters as filters -def task_cleanfile( +def task_xml_cleanfile( src_file: FileLocator, schema: XMLSchema, schema_path: Path ) -> ProcessResult: """ @@ -50,9 +50,39 @@ def task_cleanfile( dataset = dataset_holder.value errors = error_holder.value - # dataset = DataContainer({k: pd.DataFrame(v) for k, v in dataset.items()}) dataset = DataContainer( {k: to_dataframe_xml(v, schema_path) for k, v in dataset.items()} ) return ProcessResult(data=dataset, errors=errors) + + +def task_csv_cleanfile(src_file: FileLocator, schema: DataSchema) -> ProcessResult: + # Open & Parse file + stream = stream_functions.tablib_parse(src_file) + + # Configure stream + stream = stream_functions.add_table_name(stream, schema=schema) + stream = stream_functions.inherit_property(stream, ["table_name", "table_spec"]) + stream = stream_functions.match_config_to_cell(stream, schema=schema) + + # Clean stream + stream = stream_functions.log_blanks(stream) + stream = stream_functions.conform_cell_types(stream) + + # Create dataset + stream = stream_functions.collect_cell_values_for_row(stream) + dataset_holder, stream = stream_functions.collect_tables(stream) + error_holder, stream = stream_functions.collect_errors(stream) + + # Consume stream so we know it's been processed + generic.consume(stream) + + dataset = dataset_holder.value + errors = error_holder.value + + dataset = DataContainer( + {k: to_dataframe(v, schema.table[k]) for k, v in dataset.items()} + ) + + return ProcessResult(data=dataset, errors=errors)