diff --git a/liiatools/cin_census_pipeline/_reports_referrals.py b/liiatools/cin_census_pipeline/_reports_referrals.py
deleted file mode 100644
index 1228e6d6..00000000
--- a/liiatools/cin_census_pipeline/_reports_referrals.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import numpy as np
-import pandas as pd
-
-
-def referral_outcomes(data: pd.DataFrame) -> pd.DataFrame:
-    s17_dates = data[data["AssessmentActualStartDate"].notna()][
-        ["LAchildID", "CINreferralDate", "AssessmentActualStartDate"]
-    ].drop_duplicates()
-    s17_dates["days_to_s17"] = (
-        s17_dates["CINreferralDate"] - s17_dates["AssessmentActualStartDate"]
-    )
-    s17_dates["days_to_s17"] = s17_dates["days_to_s17"].dt.days
-
-    # Remove any that are less than zero - it shouldn't happen, but just in case
-    s17_dates = s17_dates[s17_dates["days_to_s17"] >= 0]
-
-    s47_dates = data[data["S47ActualStartDate"].notna()][
-        ["LAchildID", "CINreferralDate", "S47ActualStartDate"]
-    ].drop_duplicates()
-    s47_dates["days_to_s47"] = (
-        s47_dates["CINreferralDate"] - s47_dates["S47ActualStartDate"]
-    )
-    s47_dates["days_to_s47"] = s47_dates["days_to_s47"].dt.days
-
-    # Remove any that are less than zero - it shouldn't happen, but just in case
-    s47_dates = s47_dates[s47_dates["days_to_s47"] >= 0]
-
-    merged = data[["LAchildID", "CINreferralDate"]].drop_duplicates()
-    merged = merged.merge(s17_dates, how="left", on=["LAchildID", "CINreferralDate"])
-    merged = merged.merge(s47_dates, how="left", on=["LAchildID", "CINreferralDate"])
-
-    neither = (
-        merged["AssessmentActualStartDate"].isna() & merged["S47ActualStartDate"].isna()
-    )
-    s17_set = (
-        merged["AssessmentActualStartDate"].notna()
-        & merged["S47ActualStartDate"].isna()
-    )
-    s47_set = (
-        merged["AssessmentActualStartDate"].isna()
-        & merged["S47ActualStartDate"].notna()
-    )
-    both_set = (
-        merged["AssessmentActualStartDate"].notna()
-        & merged["S47ActualStartDate"].notna()
-    )
-
-    merged["referral_outcome"] = np.select(
-        [neither, s17_set, s47_set, both_set],
-        ["NFA", "S17", "S47", "BOTH"],
-        default=None,
-    )
-
-    return merged
diff --git a/liiatools/cin_census_pipeline/_reports_s47_journeys.py b/liiatools/cin_census_pipeline/_reports_s47_journeys.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/liiatools/cin_census_pipeline/pipeline.py b/liiatools/cin_census_pipeline/pipeline.py
index b61c4be6..3d4664a5 100644
--- a/liiatools/cin_census_pipeline/pipeline.py
+++ b/liiatools/cin_census_pipeline/pipeline.py
@@ -1,11 +1,11 @@
 import logging
-from fs import open_fs
 from fs.base import FS
 
 from liiatools.common import pipeline as pl
 from liiatools.common.archive import DataframeArchive
 from liiatools.common.constants import ProcessNames, SessionNames
 from liiatools.common.data import (
+    DataContainer,
     ErrorContainer,
     FileLocator,
     PipelineConfig,
@@ -18,8 +18,8 @@
     load_schema,
     load_schema_path,
 )
-
 from liiatools.cin_census_pipeline.stream_pipeline import task_cleanfile
+from liiatools.cin_census_pipeline.reports import reports
 
 logger = logging.getLogger()
 
@@ -58,7 +58,7 @@ def process_file(
     schema = load_schema(year=year)
     schema_path = load_schema_path(year=year)
     metadata = dict(year=year, schema=schema, la_code=la_code)
-    
+
     # Normalise the data and export to the session 'cleaned' folder
     try:
         cleanfile_result = task_cleanfile(file_locator, schema, schema_path)
@@ -154,9 +154,16 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str):
         report_folder = export_folder.makedirs(report, recreate=True)
         report_data.data.export(report_folder, "cin_census_", "csv")
 
-
-process_session(
-    open_fs(r"C:\Users\patrick.troy\OneDrive - Social Finance Ltd\Work\LIIA\LIIA tests\CIN\pipeline\input"),
-    open_fs(r"C:\Users\patrick.troy\OneDrive - Social Finance Ltd\Work\LIIA\LIIA tests\CIN\pipeline\output"),
-    la_code="BAR"
-)
+    # Run report analysis
+    analysis_data = report_data.data["CIN"]
+
+    expanded_assessment_factors = reports.expanded_assessment_factors(analysis_data)
+    referral_outcomes = reports.referral_outcomes(analysis_data)
+    s47_journeys = reports.s47_journeys(analysis_data)
+
+    analysis_data = DataContainer(
+        {"factors": expanded_assessment_factors, "referrals": referral_outcomes, "S47_journeys": s47_journeys}
+    )
+
+    analysis_folder = export_folder.makedirs("REPORTS", recreate=True)
+    analysis_data.export(analysis_folder, "cin_census_", "csv")
diff --git a/liiatools/cin_census_pipeline/reports/__init__.py b/liiatools/cin_census_pipeline/reports/__init__.py
new file mode 100644
index 00000000..920ddef6
--- /dev/null
+++ b/liiatools/cin_census_pipeline/reports/__init__.py
@@ -0,0 +1,43 @@
+import pandas as pd
+import numpy as np
+
+
+def _time_between_date_series(
+    later_date: pd.Series,
+    earlier_date: pd.Series,
+    years: bool = False,
+    days: bool = False,
+) -> pd.Series:
+    """
+    Returns the time between two date series, as whole days or whole years.
+
+    :param later_date: The later date.
+    :param earlier_date: The earlier date.
+    :param years: If True, returns the number of years between the two dates. The default is False.
+    :param days: If True, returns the number of days between the two dates. The default is False.
+    :returns: The number of days or years between the dates.
+    """
+    time = later_date - earlier_date
+    time = time.dt.days
+
+    if days:
+        time = time.astype("Int64")
+        return time
+
+    elif years:
+        time = (time / 365).apply(np.floor)
+        time = time.astype("Int64")
+        return time
+
+
+def _filter_events(data: pd.DataFrame, day_column: str, max_days: int) -> pd.DataFrame:
+    """
+    Filters the data to only include events whose day count falls between zero and the specified maximum.
+
+    :param data: The data to filter.
+    :param day_column: The column containing the number of days between two events.
+    :param max_days: The maximum number of days to include.
+    :returns: The filtered data.
+    """
+    data = data[((data[day_column] <= max_days) & (data[day_column] >= 0))]
+    return data
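Review note: the two helpers above are easiest to see together. A minimal sketch with made-up values (not pipeline data), assuming only the helpers as defined in this file:

    import pandas as pd
    from liiatools.cin_census_pipeline.reports import _time_between_date_series, _filter_events

    events = pd.DataFrame(
        {
            "AssessmentActualStartDate": pd.to_datetime(["2022-01-31", "2022-03-01"]),
            "CINreferralDate": pd.to_datetime(["2022-01-01", "2022-01-01"]),
        }
    )
    # Whole days between the two date columns: [30, 59]
    events["days_to_s17"] = _time_between_date_series(
        events["AssessmentActualStartDate"], events["CINreferralDate"], days=True
    )
    # Keep only events inside a 30-day window: the first row survives, the second is dropped
    events = _filter_events(events, "days_to_s17", max_days=30)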
+ """ + reports_config = load_reports() + + s17_dates = data[data["AssessmentActualStartDate"].notna()][ + ["LAchildID", "CINreferralDate", "AssessmentActualStartDate"] + ].drop_duplicates() + + s17_dates["days_to_s17"] = _time_between_date_series( + s17_dates["CINreferralDate"], s17_dates["AssessmentActualStartDate"], days=True + ) + + # Only assessments within config-specified period following referral are valid + s17_dates = _filter_events( + s17_dates, "days_to_s17", max_days=reports_config["ref_assessment"] + ) + + s47_dates = data[data["S47ActualStartDate"].notna()][ + ["LAchildID", "CINreferralDate", "S47ActualStartDate"] + ].drop_duplicates() + + s47_dates["days_to_s47"] = _time_between_date_series( + s47_dates["CINreferralDate"], s47_dates["S47ActualStartDate"], days=True + ) + + # Only S47s within config-specified period following referral are valid + s47_dates = _filter_events( + s47_dates, "days_to_s47", max_days=reports_config["ref_assessment"] + ) + + merged = data[["LAchildID", "CINreferralDate", "PersonBirthDate"]].drop_duplicates() + merged = merged.merge(s17_dates, how="left", on=["LAchildID", "CINreferralDate"]) + merged = merged.merge(s47_dates, how="left", on=["LAchildID", "CINreferralDate"]) + + neither = ( + merged["AssessmentActualStartDate"].isna() & merged["S47ActualStartDate"].isna() + ) + s17_set = ( + merged["AssessmentActualStartDate"].notna() + & merged["S47ActualStartDate"].isna() + ) + s47_set = ( + merged["AssessmentActualStartDate"].isna() + & merged["S47ActualStartDate"].notna() + ) + both_set = ( + merged["AssessmentActualStartDate"].notna() + & merged["S47ActualStartDate"].notna() + ) + + merged["referral_outcome"] = np.select( + [neither, s17_set, s47_set, both_set], + ["NFA", "S17", "S47", "BOTH"], + default=None, + ) + + merged["Age at referral"] = _time_between_date_series( + merged["CINreferralDate"], merged["PersonBirthDate"], years=True + ) + + return merged diff --git a/liiatools/cin_census_pipeline/reports/_reports_s47_journeys.py b/liiatools/cin_census_pipeline/reports/_reports_s47_journeys.py new file mode 100644 index 00000000..c184a39d --- /dev/null +++ b/liiatools/cin_census_pipeline/reports/_reports_s47_journeys.py @@ -0,0 +1,107 @@ +import pandas as pd +import numpy as np +from datetime import datetime + +from liiatools.cin_census_pipeline.spec import load_reports +from liiatools.cin_census_pipeline.reports import ( + _time_between_date_series, +) + + +def s47_journeys(data: pd.DataFrame) -> pd.DataFrame: + """ + Creates an output that can generate a Sankey diagram of outcomes from S47 events + + :param data: The data to calculate S47 event outcomes. + :return: The data with S47 outcomes attached. 
+ """ + reports_config = load_reports() + + s47_dates = data[data["S47ActualStartDate"].notna()][ + ["LAchildID", "CINreferralDate", "S47ActualStartDate"] + ].drop_duplicates() + + cpp_dates = data[data["CPPstartDate"].notna()][ + ["LAchildID", "CINreferralDate", "CPPstartDate"] + ].drop_duplicates() + + merged = data[ + [ + "LAchildID", + "CINreferralDate", + "PersonBirthDate", + "DateOfInitialCPC", + "Year", + ] + ].drop_duplicates() + + merged = merged.merge(s47_dates, how="left", on=["LAchildID", "CINreferralDate"]) + merged = merged.merge(cpp_dates, how="left", on=["LAchildID", "CINreferralDate"]) + + merged["icpc_to_cpp"] = _time_between_date_series( + merged["CPPstartDate"], merged["DateOfInitialCPC"], days=True + ) + + merged["s47_to_cpp"] = _time_between_date_series( + merged["CPPstartDate"], merged["S47ActualStartDate"], days=True + ) + + # Only keep logically consistent events (as defined in config variables) + merged = merged[ + ( + (merged["icpc_to_cpp"] >= 0) + & (merged["icpc_to_cpp"] <= reports_config["icpc_cpp_days"]) + ) + | ( + (merged["s47_to_cpp"] >= 0) + & (merged["s47_to_cpp"] <= reports_config["s47_cpp_days"]) + ) + ] + + # Dates used to define window for S47 events where outcome may not be known because CIN Census is too recent + for y in merged["Year"]: + merged["cin_census_close"] = datetime(int(y), 3, 31) + + merged["s47_max_date"] = merged["cin_census_close"] - pd.Timedelta( + reports_config["s47_day_limit"] + ) + merged["icpc_max_date"] = merged["cin_census_close"] - pd.Timedelta( + reports_config["icpc_day_limit"] + ) + + merged["Source"] = "S47 strategy discussion" + + icpc = merged["DateOfInitialCPC"].notna() + + cpp_start = merged["DateOfInitialCPC"].isna() & merged["CPPstartDate"].notna() + + # TODO: Check if this (and the default=No ICPC or CPP) ever actually comes up + # (I think they're removed when checking for logical events) + tbd = merged["S47ActualStartDate"] >= merged["s47_max_date"] + + merged["Destination"] = np.select( + [icpc, cpp_start, tbd], + ["ICPC", "CPP Start", "TBD - S47 too recent"], + default="No ICPC or CPP", + ) + + icpc_destination = merged[merged["Destination"] == "ICPC"] + icpc_destination["Source"] = "ICPC" + + cpp_start_2 = icpc_destination["CPPstartDate"].notna() + + tbd_2 = icpc_destination["DateOfInitialCPC"] >= icpc_destination["icpc_max_date"] + + icpc_destination["Destination"] = np.select( + [cpp_start_2, tbd_2], + ["CPP Start", "TBD - ICPC too recent"], + default="No CPP", + ) + + s47_journey = pd.concat([merged, icpc_destination]) + + s47_journey["Age at S47"] = _time_between_date_series( + s47_journey["S47ActualStartDate"], s47_journey["PersonBirthDate"], years=True + ) + + return s47_journey diff --git a/liiatools/cin_census_pipeline/reports.py b/liiatools/cin_census_pipeline/reports/reports.py similarity index 54% rename from liiatools/cin_census_pipeline/reports.py rename to liiatools/cin_census_pipeline/reports/reports.py index 21822ebd..e3a4752b 100644 --- a/liiatools/cin_census_pipeline/reports.py +++ b/liiatools/cin_census_pipeline/reports/reports.py @@ -1,7 +1,13 @@ from ._reports_assessment_factors import expanded_assessment_factors from ._reports_referrals import referral_outcomes +from ._reports_s47_journeys import s47_journeys +from liiatools.cin_census_pipeline.reports import _time_between_date_series, _filter_events __ALL__ = [ "expanded_assessment_factors", "referral_outcomes", + "s47_journeys", ] + + + diff --git a/liiatools/cin_census_pipeline/spec/__init__.py 
index 61375aa2..f670a2f7 100644
--- a/liiatools/cin_census_pipeline/spec/__init__.py
+++ b/liiatools/cin_census_pipeline/spec/__init__.py
@@ -1,5 +1,6 @@
 from functools import lru_cache
 from pathlib import Path
+import yaml
 
 import xmlschema
 
@@ -21,5 +22,12 @@ def load_schema(year: int) -> xmlschema.XMLSchema:
     return xmlschema.XMLSchema(SCHEMA_DIR / f"CIN_schema_{year:04d}.xsd")
 
 
+@lru_cache
 def load_schema_path(year: int) -> Path:
     return Path(SCHEMA_DIR, f"CIN_schema_{year:04d}.xsd")
+
+
+@lru_cache
+def load_reports():
+    with open(SCHEMA_DIR / "reports.yml", "rt") as FILE:
+        return yaml.load(FILE, Loader=yaml.FullLoader)
diff --git a/liiatools/cin_census_pipeline/spec/agg.yml b/liiatools/cin_census_pipeline/spec/agg.yml
deleted file mode 100644
index 7a47611d..00000000
--- a/liiatools/cin_census_pipeline/spec/agg.yml
+++ /dev/null
@@ -1,40 +0,0 @@
-dates:
-  - Date
-  - CINreferralDate
-  - CINclosureDate
-  - DateOfInitialCPC
-  - CINPlanStartDate
-  - CINPlanEndDate
-  - S47ActualStartDate
-  - InitialCPCtarget
-  - AssessmentActualStartDate
-  - AssessmentInternalReviewDate
-  - AssessmentAuthorisationDate
-  - CPPstartDate
-  - CPPendDate
-  - PersonBirthDate
-  - ExpectedPersonBirthDate
-  - PersonDeathDate
-
-sort_order:
-  - InitialCPCtarget
-  - AssessmentInternalReviewDate
-  - YEAR
-
-dedup:
-  - Date
-  - Type
-  - LAchildID
-  - ReferralSource
-
-# Assumptions used in producing analytical outputs of CIN Census
-
-ref_assessment: 30
-
-icpc_cpp_days: 45
-
-s47_cpp_days: 60
-
-s47_day_limit: "60 days"
-
-icpc_day_limit: "45 days"
\ No newline at end of file
diff --git a/liiatools/cin_census_pipeline/spec/reports.yml b/liiatools/cin_census_pipeline/spec/reports.yml
new file mode 100644
index 00000000..589cf43c
--- /dev/null
+++ b/liiatools/cin_census_pipeline/spec/reports.yml
@@ -0,0 +1,6 @@
+# Assumptions used in producing analytical outputs of CIN Census
+ref_assessment: 30
+icpc_cpp_days: 45
+s47_cpp_days: 60
+s47_day_limit: "60 days"
+icpc_day_limit: "45 days"
\ No newline at end of file
diff --git a/liiatools/cin_census_pipeline/spec/samples/cin-2022.xml b/liiatools/cin_census_pipeline/spec/samples/cin-2022.xml
index 7c00c564..6dbbf70d 100644
--- a/liiatools/cin_census_pipeline/spec/samples/cin-2022.xml
+++ b/liiatools/cin_census_pipeline/spec/samples/cin-2022.xml
@@ -39,7 +39,7 @@
       N4
       1971-02-27
       RC1
-      1970-12-06
+      1970-02-06
       1970-06-03
       1970-06-22
@@ -54,9 +54,9 @@
       1971-01-26
-      1970-06-02
+      1970-02-02
       1970-06-23
-      1970-06-17
+      1970-02-11
       false
       false
diff --git a/liiatools/cin_census_pipeline/stream_filters.py b/liiatools/cin_census_pipeline/stream_filters.py
index efe648a9..aa435eae 100644
--- a/liiatools/cin_census_pipeline/stream_filters.py
+++ b/liiatools/cin_census_pipeline/stream_filters.py
@@ -6,7 +6,11 @@
 from sfdata_stream_parser.filters.generic import pass_event, streamfilter
 
 from liiatools.common.spec.__data_schema import Column, Numeric
-from liiatools.common.stream_filters import _create_category_spec, _create_regex_spec, _create_numeric_spec
+from liiatools.common.stream_filters import (
+    _create_category_spec,
+    _create_regex_spec,
+    _create_numeric_spec,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -38,18 +42,14 @@ def add_column_spec(event, schema_path: Path):
     if config_type in ["upntype"]:
         column_spec.string = "regex"
         column_spec.cell_regex = _create_regex_spec(config_type, schema_path)
-    if (
-        config_type == "{http://www.w3.org/2001/XMLSchema}date"
-    ):
+    if config_type == "{http://www.w3.org/2001/XMLSchema}date":
         column_spec.date = "%Y-%m-%d"
"%Y-%m-%d" - if ( - config_type == "{http://www.w3.org/2001/XMLSchema}dateTime" - ): + if config_type == "{http://www.w3.org/2001/XMLSchema}dateTime": column_spec.date = "%Y-%m-%dT%H:%M:%S" if config_type in [ - "{http://www.w3.org/2001/XMLSchema}integer", - "{http://www.w3.org/2001/XMLSchema}gYear" - ]: + "{http://www.w3.org/2001/XMLSchema}integer", + "{http://www.w3.org/2001/XMLSchema}gYear", + ]: column_spec.numeric = Numeric(type="integer") if config_type == "{http://www.w3.org/2001/XMLSchema}string": column_spec.string = "alphanumeric" diff --git a/liiatools/cin_census_pipeline/stream_record.py b/liiatools/cin_census_pipeline/stream_record.py index 02bc1f9e..15efc87f 100644 --- a/liiatools/cin_census_pipeline/stream_record.py +++ b/liiatools/cin_census_pipeline/stream_record.py @@ -12,7 +12,7 @@ class CINEvent(events.ParseEvent): @staticmethod def name(): return "CIN" - + pass @@ -145,14 +145,6 @@ def _maybe_list(value): - If the input value is None, the function returns an empty list. - If the input value is already a list, it is returned as is. - For any other value, the function wraps it in a list and returns it. - - Examples: - >>> _maybe_list(None) - [] - >>> _maybe_list(42) - [42] - >>> _maybe_list([1, 2, 3]) - [1, 2, 3] """ if value is None: value = [] @@ -161,7 +153,7 @@ def _maybe_list(value): return value -def cin_event(record, property, event_name=None): +def cin_event(record, property, event_name=None, export_headers=__EXPORT_HEADERS): """ Create an event record based on the given property from the original record. @@ -169,36 +161,29 @@ def cin_event(record, property, event_name=None): `property`. If the property exists and is non-empty, it creates a new dictionary with keys "Date" and "Type" where "Date" is the value of the specified property and "Type" is the name of the event. The new dictionary is then filtered based - on the keys specified in the global variable `__EXPORT_HEADERS`. + on the keys specified in export_headers. Parameters: - record (dict): The original record containing various key-value pairs. - property (str): The key in the `record` dictionary to look for. - event_name (str, optional): The name of the event. Defaults to the value of `property` if not specified. + - export_headers (list, optional): A list of keys to include in the returned dictionary. + Defaults to `__EXPORT_HEADERS`. Returns: - - tuple: A single-element tuple containing the new filtered dictionary, or an empty tuple if `property` is not found or its value is empty. - - Example: - >>> record = {'Name': 'John', 'DOB': '2000-01-01'} - >>> property = 'DOB' - >>> event_name = 'Date of Birth' - >>> cin_event(record, property, event_name) - ({'Date': '2000-01-01', 'Type': 'Date of Birth'},) - - >>> cin_event(record, 'UnknownProperty') - () + - tuple: A single-element tuple containing the new filtered dictionary, or an empty tuple if `property` is not + found or its value is empty. Note: - - Assumes that a global variable `__EXPORT_HEADERS` exists that specifies which keys to include in the returned dictionary. - - The reason this returns a tuple is that when called, it is used with 'yield from' which expects an iterable. An empty tuple results in no records being yielded. + - The reason this returns a tuple is that when called, it is used with 'yield from' which expects an iterable. + An empty tuple results in no records being yielded. 
""" if event_name is None: event_name = property value = record.get(property) if value: new_record = {**record, "Date": value, "Type": event_name} - return {k: new_record.get(k) for k in __EXPORT_HEADERS}, + return {k: new_record.get(k) for k in export_headers}, return () @@ -216,13 +201,11 @@ def event_to_records(event: CINEvent) -> Iterator[dict]: - Iterator[dict]: An iterator that yields dictionaries representing individual event records. Behavior: - - The function first creates a 'child' dictionary by merging the "ChildIdentifiers" and "ChildCharacteristics" from the original event record. - - It then processes various sub-records within the event, including "CINdetails", "Assessments", "CINPlanDates", "Section47", and "ChildProtectionPlans". + - The function first creates a 'child' dictionary by merging the "ChildIdentifiers" and "ChildCharacteristics" + from the original event record. + - It then processes various sub-records within the event, including "CINdetails", "Assessments", "CINPlanDates", + "Section47", and "ChildProtectionPlans". - Each sub-record is further processed and emitted as an individual event record. - - Examples: - >>> list(event_to_records(CINEvent(...))) - [{'field1': 'value1', 'field2': 'value2'}, ...] """ record = event.record child = { diff --git a/liiatools/common/stream_record.py b/liiatools/common/stream_record.py index 1e5b4f24..97e34a4d 100644 --- a/liiatools/common/stream_record.py +++ b/liiatools/common/stream_record.py @@ -1,6 +1,5 @@ from sfdata_stream_parser import events from sfdata_stream_parser.collectors import xml_collector -from sfdata_stream_parser.filters.generic import generator_with_value class HeaderEvent(events.ParseEvent): @@ -44,22 +43,3 @@ def text_collector(stream): if isinstance(event, events.TextNode) and event.cell: data_dict.setdefault(current_element, []).append(event.cell) return _reduce_dict(data_dict) - - -@generator_with_value -def export_table(stream): - """ - Collects all the records into a dictionary of lists of rows - - This filter requires that the stream has been processed by `message_collector` first - - :param stream: An iterator of events from message_collector - :yield: All events - :return: A dictionary of lists of rows, keyed by record name - """ - dataset = {} - for event in stream: - event_type = type(event) - dataset.setdefault(event_type.name(), []).append(event.as_dict()["record"]) - yield event - return dataset diff --git a/liiatools/csww_pipeline/stream_pipeline.py b/liiatools/csww_pipeline/stream_pipeline.py index 24059f55..113e0bfb 100644 --- a/liiatools/csww_pipeline/stream_pipeline.py +++ b/liiatools/csww_pipeline/stream_pipeline.py @@ -7,7 +7,6 @@ from liiatools.common.data import FileLocator, ProcessResult, DataContainer from liiatools.common import stream_filters as stream_functions from liiatools.common.stream_parse import dom_parse -from liiatools.common.stream_record import export_table from liiatools.csww_pipeline import stream_record @@ -42,7 +41,7 @@ def task_cleanfile( # Create dataset error_holder, stream = stream_functions.collect_errors(stream) stream = stream_record.message_collector(stream) - dataset_holder, stream = export_table(stream) + dataset_holder, stream = stream_record.export_table(stream) # Consume stream so we know it's been processed generic.consume(stream) diff --git a/liiatools/csww_pipeline/stream_record.py b/liiatools/csww_pipeline/stream_record.py index 6b5bb2a5..b6346297 100644 --- a/liiatools/csww_pipeline/stream_record.py +++ 
index 6b5bb2a5..b6346297 100644
--- a/liiatools/csww_pipeline/stream_record.py
+++ b/liiatools/csww_pipeline/stream_record.py
@@ -1,6 +1,7 @@
 from more_itertools import peekable
 from sfdata_stream_parser import events
+from sfdata_stream_parser.filters.generic import generator_with_value
 
 from liiatools.common.stream_record import text_collector, HeaderEvent
 
@@ -46,3 +47,22 @@ def message_collector(stream):
             yield LALevelEvent(record=lalevel_record)
         else:
             next(stream)
+
+
+@generator_with_value
+def export_table(stream):
+    """
+    Collects all the records into a dictionary of lists of rows
+
+    This filter requires that the stream has been processed by `message_collector` first
+
+    :param stream: An iterator of events from message_collector
+    :yield: All events
+    :return: A dictionary of lists of rows, keyed by record name
+    """
+    dataset = {}
+    for event in stream:
+        event_type = type(event)
+        dataset.setdefault(event_type.name(), []).append(event.as_dict()["record"])
+        yield event
+    return dataset
diff --git a/tests/cin_census/test_agg_processes.py b/tests/cin_census/test_agg_processes.py
deleted file mode 100644
index 00907ed8..00000000
--- a/tests/cin_census/test_agg_processes.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# import pandas as pd
-# from liiatools.datasets.cin_census.lds_cin_la_agg import process as agg_process
-#
-#
-# def test_deduplicate():
-#     test_df_1 = pd.DataFrame(
-#         {
-#             "Column 1": [1, 2],
-#             "Column 2": ["a", "a"],
-#             "Column 3": ["Answer 1", "Answer 2"],
-#         }
-#     )
-#     sort_order = ["Column 1"]
-#     dedup_1 = ["Column 2"]
-#     output_df_1 = agg_process.deduplicate(test_df_1, sort_order, dedup_1)
-#     assert len(output_df_1) == 1
-#     assert output_df_1["Column 3"][0] == "Answer 2"
-#     dedup_2 = ["Column 1", "Column 2"]
-#     output_df_2 = agg_process.deduplicate(test_df_1, sort_order, dedup_2)
-#     assert len(output_df_2) == 2
-#     assert output_df_2["Column 3"][0] == "Answer 2"
-#     sort_order_2 = ["Column 2"]
-#     output_df_3 = agg_process.deduplicate(test_df_1, sort_order_2, dedup_2)
-#     assert len(output_df_3) == 2
-#     assert output_df_3["Column 3"][0] == "Answer 1"
-#
-#
-# def test_remove_old_data():
-#     this_year = pd.to_datetime("today").year
-#     month = pd.to_datetime("today").month
-#     last_year = this_year - 1
-#     two_ya = this_year - 2
-#     three_ya = this_year - 3
-#     test_df_1 = pd.DataFrame({"YEAR": [this_year, last_year, two_ya, three_ya]})
-#     output_df_1 = agg_process.remove_old_data(test_df_1, 1)
-#     if month <= 6:
-#         assert len(output_df_1) == 3
-#     else:
-#         assert len(output_df_1) == 2
-#
-#
-# def test_filter_flatfile():
-#     test_df_1 = pd.DataFrame(
-#         {
-#             "Type": ["Type 1", "Type 2"],
-#             "Type 1 data": ["a", None],
-#             "Type 2 data": [None, "b"],
-#         }
-#     )
-#     output_1 = agg_process.filter_flatfile(test_df_1, "Type 1")
-#     assert len(output_1) == 1
-#     assert output_1["Type 1 data"][0] == "a"
-#     output_2 = agg_process.filter_flatfile(test_df_1, "Type 2")
-#     assert len(output_2) == 1
-#     assert output_2["Type 2 data"][1] == "b"
-#
-#
-# def test_split_factors():
-#     test_df_1 = pd.DataFrame({"Factors": ["a,b", "b,c"]})
-#     assert test_df_1.shape == (2, 1)
-#     assert list(test_df_1.columns) == ["Factors"]
-#     assert test_df_1.iloc[0, 0] == "a,b"
-#     output_1 = agg_process.split_factors(test_df_1)
-#     assert output_1.shape == (2, 4)
-#     assert list(output_1.columns) == ["Factors", "a", "b", "c"]
-#     assert output_1.iloc[0, 1] == 1
-#
-#
-# def test_time_between_date_series():
-#     test_df_1 = pd.DataFrame(
-#         {
-#             "date_series_1": ["2022-01-01", "2022-01-01"],
-#             "date_series_2": ["2021-01-01", "2020-01-01"],
-#         },
-#         dtype="datetime64[ns]",
-#     )
-#     output_series_1 = agg_process._time_between_date_series(
-#         test_df_1["date_series_1"], test_df_1["date_series_2"], years=1
-#     )
-#     assert list(output_series_1) == [1, 2]
-#     output_series_2 = agg_process._time_between_date_series(
-#         test_df_1["date_series_1"], test_df_1["date_series_2"], days=1
-#     )
-#     assert list(output_series_2) == [365, 731]
-#
-#
-# def test_filter_event_series():
-#     test_df_1 = pd.DataFrame({"day_series": [1, -1, 30]})
-#     output_1 = agg_process._filter_event_series(test_df_1, "day_series", 25)
-#     output_2 = agg_process._filter_event_series(test_df_1, "day_series", 30)
-#     assert output_1.shape == (1, 1)
-#     assert output_2.shape == (2, 1)
diff --git a/tests/cin_census/test_reports.py b/tests/cin_census/test_reports.py
index 6c6d2634..f078a72c 100644
--- a/tests/cin_census/test_reports.py
+++ b/tests/cin_census/test_reports.py
@@ -1,8 +1,13 @@
 import pandas as pd
+import numpy as np
+from datetime import date
 
-from liiatools.cin_census_pipeline.reports import (
+from liiatools.cin_census_pipeline.reports.reports import (
     expanded_assessment_factors,
-    referral_outcomes
+    referral_outcomes,
+    _time_between_date_series,
+    _filter_events,
+    s47_journeys,
 )
 
 
@@ -20,4 +25,183 @@ def test_assessment_factors():
 
     df = expanded_assessment_factors(df)
 
-    print(df)
+    assert list(df.A.values) == [1, 0, 0, 1, 1]
+    assert list(df.B.values) == [1, 0, 0, 0, 0]
+    assert list(df.C.values) == [1, 0, 0, 0, 0]
+    assert list(df.D.values) == [0, 0, 0, 0, 1]
+
+
+def test_time_between_date_series():
+    test_df_1 = pd.DataFrame(
+        [
+            [date(2022, 1, 1), date(2021, 1, 1)],
+            [date(2022, 1, 1), date(2020, 1, 1)],
+        ],
+        columns=["date_series_1", "date_series_2"],
+    )
+
+    output_series_1 = _time_between_date_series(
+        test_df_1["date_series_1"], test_df_1["date_series_2"], years=True
+    )
+    assert list(output_series_1) == [1, 2]
+
+    output_series_2 = _time_between_date_series(
+        test_df_1["date_series_1"], test_df_1["date_series_2"], days=True
+    )
+    assert list(output_series_2) == [365, 731]
+
+
+def test_filter_events():
+    test_df_1 = pd.DataFrame(
+        [
+            1,
+            -1,
+            30,
+        ],
+        columns=["day_series"],
+    )
+
+    output_1 = _filter_events(test_df_1, "day_series", 25)
+    output_2 = _filter_events(test_df_1, "day_series", 30)
+    assert output_1.shape == (1, 1)
+    assert output_2.shape == (2, 1)
+
+
+def test_referral_outcomes():
+    df = pd.DataFrame(
+        [
+            [
+                "CHILD1",
+                date(1965, 6, 15),
+                date(1970, 10, 6),
+                date(1970, 6, 3),
+                date(1970, 6, 2),
+            ],
+            [
+                "CHILD1",
+                date(1965, 6, 15),
+                date(1970, 10, 6),
+                date(1970, 6, 3),
+                date(1970, 6, 2),
+            ],
+            [
+                "CHILD2",
+                date(1992, 1, 2),
+                date(2001, 11, 7),
+                date(2001, 10, 25),
+                date(2001, 10, 20),
+            ],
+            [
+                "CHILD3",
+                date(1995, 7, 21),
+                date(2003, 9, 5),
+                date(2003, 8, 28),
+                date(2003, 8, 26),
+            ],
+        ],
+        columns=[
+            "LAchildID",
+            "PersonBirthDate",
+            "CINreferralDate",
+            "AssessmentActualStartDate",
+            "S47ActualStartDate",
+        ],
+    )
+
+    df = referral_outcomes(df)
+
+    assert list(df["AssessmentActualStartDate"]) == [
+        np.nan,
+        date(2001, 10, 25),
+        date(2003, 8, 28),
+    ]
+    assert list(df["days_to_s17"]) == [pd.NA, 13, 8]
+    assert list(df["S47ActualStartDate"]) == [
+        np.nan,
+        date(2001, 10, 20),
+        date(2003, 8, 26),
+    ]
+    assert list(df["days_to_s47"]) == [pd.NA, 18, 10]
+    assert list(df["referral_outcome"]) == ["NFA", "BOTH", "BOTH"]
+    assert list(df["Age at referral"]) == [5, 9, 8]
+
+
+def test_s47_journeys():
+    df = pd.DataFrame(
+        [
+            [
+                "CHILD1",
+                date(1965, 6, 15),
+                date(1970, 10, 6),
+                date(1970, 3, 3),
+                date(1970, 5, 25),
+                date(1970, 4, 5),
+                2022,
+            ],
+            [
+                "CHILD1",
+                date(1965, 6, 15),
+                date(1970, 10, 6),
+                date(1970, 3, 3),
+                date(1970, 5, 25),
+                date(1970, 4, 5),
+                2022,
+            ],
+            [
+                "CHILD2",
+                date(1992, 1, 2),
+                date(2001, 11, 7),
+                date(2001, 8, 2),
+                date(2001, 10, 12),
+                date(2001, 9, 29),
+                2022,
+            ],
+            [
+                "CHILD3",
+                date(2015, 7, 21),
+                date(2022, 9, 5),
+                date(2022, 7, 27),
+                pd.NA,
+                pd.NA,
+                2022,
+            ],
+            [
+                "CHILD4",
+                date(1997, 7, 21),
+                date(2006, 9, 5),
+                date(2006, 7, 28),
+                date(2006, 8, 16),
+                pd.NA,
+                2022,
+            ],
+            [
+                "CHILD5",
+                date(1993, 4, 22),
+                date(2001, 9, 2),
+                date(2001, 7, 22),
+                pd.NA,
+                pd.NA,
+                2022,
+            ],
+        ],
+        columns=[
+            "LAchildID",
+            "PersonBirthDate",
+            "CINreferralDate",
+            "S47ActualStartDate",
+            "CPPstartDate",
+            "DateOfInitialCPC",
+            "Year",
+        ],
+    )
+
+    df = s47_journeys(df)
+
+    assert list(df["icpc_to_cpp"]) == [13, pd.NA, 13]
+    assert list(df["s47_to_cpp"]) == [71, 19, 71]
+    assert list(df["cin_census_close"]) == [date(2022, 3, 31), date(2022, 3, 31), date(2022, 3, 31)]
+    assert list(df["s47_max_date"]) == [date(2022, 1, 30), date(2022, 1, 30), date(2022, 1, 30)]
+    assert list(df["icpc_max_date"]) == [date(2022, 2, 14), date(2022, 2, 14), date(2022, 2, 14)]
+    assert list(df["Source"]) == ["S47 strategy discussion", "S47 strategy discussion", "ICPC"]
+    assert list(df["Destination"]) == ["ICPC", "CPP Start", "CPP Start"]
+    assert list(df["Age at S47"]) == [9, 9, 9]
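Review note: the Source/Destination pairs this test asserts are exactly the edge list a Sankey diagram needs. A hedged sketch of one way downstream code might aggregate them — the grouping is an assumption about consumers, not part of the pipeline:

    import pandas as pd

    journeys = pd.DataFrame(
        {
            "Source": ["S47 strategy discussion", "S47 strategy discussion", "ICPC"],
            "Destination": ["ICPC", "No ICPC or CPP", "CPP Start"],
        }
    )
    # One weighted link per Source/Destination pair, ready for a Sankey plotting library
    links = journeys.groupby(["Source", "Destination"]).size().reset_index(name="value")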
diff --git a/tests/cin_census/test_stream_filters.py b/tests/cin_census/test_stream_filters.py
new file mode 100644
index 00000000..457007e5
--- /dev/null
+++ b/tests/cin_census/test_stream_filters.py
@@ -0,0 +1,159 @@
+from collections import namedtuple
+
+from sfdata_stream_parser.events import TextNode
+from liiatools.common.spec.__data_schema import (
+    Column,
+    Numeric,
+    Category,
+)
+from liiatools.cin_census_pipeline.spec import load_schema_path
+from liiatools.cin_census_pipeline.stream_filters import add_column_spec
+
+
+def test_add_column_spec():
+    Schema = namedtuple("schema", "occurs type")
+    Name = namedtuple("type", "name")
+
+    category_schema = Schema((0, 1), Name("yesnotype"))
+    numeric_schema = Schema((0, 1), Name("positiveintegertype"))
+    regex_schema = Schema((0, 1), Name("upntype"))
+    date_schema = Schema((0, 1), Name("{http://www.w3.org/2001/XMLSchema}date"))
+    date_time_schema = Schema((0, 1), Name("{http://www.w3.org/2001/XMLSchema}dateTime"))
+    integer_schema = Schema((0, 1), Name("{http://www.w3.org/2001/XMLSchema}integer"))
+    string_schema = Schema((0, 1), Name("{http://www.w3.org/2001/XMLSchema}string"))
+    alphanumeric_schema = Schema((1, 1), Name(None))
+
+    schema_path = load_schema_path(2022)
+    stream = [
+        TextNode(text=None, schema=category_schema),
+        TextNode(text=None, schema=numeric_schema),
+        TextNode(text=None, schema=regex_schema),
+        TextNode(text=None, schema=date_schema),
+        TextNode(text=None, schema=date_time_schema),
+        TextNode(text=None, schema=integer_schema),
+        TextNode(text=None, schema=string_schema),
+        TextNode(text=None, schema=alphanumeric_schema),
+    ]
+
+    column_spec = list(add_column_spec(stream, schema_path=schema_path))
+
+    assert column_spec[0].column_spec == Column(
+        string=None,
+        numeric=None,
+        date=None,
+        dictionary=None,
+        category=[
+            Category(
+                code="0",
+                name="False",
+                cell_regex=None,
+                model_config={"extra": "forbid"},
+            ),
+            Category(
+                code="1",
+                name="True",
+                cell_regex=None,
+                model_config={"extra": "forbid"},
+            ),
+        ],
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[1].column_spec == Column(
+        string=None,
+        numeric=Numeric(
+            type="integer",
+            min_value=0,
+            max_value=None,
+            decimal_places=None,
+            model_config={"extra": "forbid"},
+        ),
+        date=None,
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[2].column_spec == Column(
+        string="regex",
+        numeric=None,
+        date=None,
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex="[A-Za-z]\\d{11}(\\d|[A-Za-z])",
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[3].column_spec == Column(
+        string=None,
+        numeric=None,
+        date="%Y-%m-%d",
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[4].column_spec == Column(
+        string=None,
+        numeric=None,
+        date="%Y-%m-%dT%H:%M:%S",
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[5].column_spec == Column(
+        string=None,
+        numeric=Numeric(
+            type="integer",
+            min_value=None,
+            max_value=None,
+            decimal_places=None,
+            model_config={"extra": "forbid"},
+        ),
+        date=None,
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[6].column_spec == Column(
+        string="alphanumeric",
+        numeric=None,
+        date=None,
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=True,
+        model_config={"extra": "forbid"},
+    )
+
+    assert column_spec[7].column_spec == Column(
+        string="alphanumeric",
+        numeric=None,
+        date=None,
+        dictionary=None,
+        category=None,
+        header_regex=None,
+        cell_regex=None,
+        canbeblank=False,
+        model_config={"extra": "forbid"},
+    )
diff --git a/tests/cin_census/test_stream_record.py b/tests/cin_census/test_stream_record.py
new file mode 100644
index 00000000..9fc13c6e
--- /dev/null
+++ b/tests/cin_census/test_stream_record.py
@@ -0,0 +1,342 @@
+import unittest
+from datetime import date
+
+from sfdata_stream_parser.events import StartElement, EndElement, TextNode
+
+from liiatools.cin_census_pipeline.stream_record import (
+    CINEvent,
+    HeaderEvent,
+    cin_collector,
+    child_collector,
+    message_collector,
+    _maybe_list,
+    cin_event,
+    event_to_records,
+    export_table,
+)
+
+
+def test_maybe_list():
+    assert _maybe_list(None) == []
+    assert _maybe_list(42) == [42]
+    assert _maybe_list([1, 2, 3]) == [1, 2, 3]
+
+
+def test_cin_event():
+    record = {"Name": "John", "DOB": "2000-01-01"}
+    property = "DOB"
+    event_name = "Date of Birth"
+    assert cin_event(
+        record, property, event_name, export_headers=["DOB", "Date", "Type"]
+    ) == (
+        {"DOB": "2000-01-01", "Date": "2000-01-01", "Type": "Date of Birth"},
+    )
+    assert cin_event(record, "UnknownProperty") == ()
+
+
+def test_event_to_records():
+    cin_record = {
+        "ChildIdentifiers": {
+            "LAchildID": "DfEX0000001",
+            "UPN": "A123456789123",
+            "PersonBirthDate": "2004-03-24",
+        },
+        "ChildCharacteristics": {"Ethnicity": "WBRI"},
+        "CINdetails": {
+            "CINreferralDate": "2009-03-15",
+        },
+    }
+
+    assert list(event_to_records(CINEvent(record=cin_record))) == [
+        {
+            "LAchildID": "DfEX0000001",
+            "Date": "2009-03-15",
+            "Type": "CINreferralDate",
+            "CINreferralDate": "2009-03-15",
+            "ReferralSource": None,
+            "PrimaryNeedCode": None,
+            "CINclosureDate": None,
+            "ReasonForClosure": None,
+            "DateOfInitialCPC": None,
+            "ReferralNFA": None,
+            "CINPlanStartDate": None,
+            "CINPlanEndDate": None,
+            "S47ActualStartDate": None,
+            "InitialCPCtarget": None,
+            "ICPCnotRequired": None,
+            "AssessmentActualStartDate": None,
+            "AssessmentInternalReviewDate": None,
+            "AssessmentAuthorisationDate": None,
+            "Factors": None,
+            "CPPstartDate": None,
+            "CPPendDate": None,
+            "InitialCategoryOfAbuse": None,
+            "LatestCategoryOfAbuse": None,
+            "NumberOfPreviousCPP": None,
+            "UPN": "A123456789123",
+            "FormerUPN": None,
+            "UPNunknown": None,
+            "PersonBirthDate": "2004-03-24",
+            "ExpectedPersonBirthDate": None,
+            "GenderCurrent": None,
+            "PersonDeathDate": None,
+            "Ethnicity": "WBRI",
+            "Disabilities": "",
+        },
+    ]
+
+
+class TestRecord(unittest.TestCase):
+    def generate_text_element(self, tag: str, cell):
+        """
+        Create a complete TextNode sandwiched between a StartElement and EndElement
+
+        :param tag: XML tag
+        :param cell: text to be stored in the given XML tag, could be a string, integer, float etc.
+        :return: StartElement and EndElement with given tags and TextNode with given text
+        """
+        yield StartElement(tag=tag)
+        yield TextNode(cell=str(cell), text=None)
+        yield EndElement(tag=tag)
+
+    def generate_test_child(self):
+        """
+        Generate a sample child
+
+        :return: Stream of generators containing information required to create a child
+        """
+        yield StartElement(tag="Child")
+        yield StartElement(tag="ChildIdentifiers")
+        yield from self.generate_text_element(tag="LAchildID", cell="DfEX0000001")
+        yield from self.generate_text_element(tag="UPN", cell="A123456789123")
+        yield from self.generate_text_element(
+            tag="PersonBirthDate", cell=date(2004, 3, 24)
+        )
+        yield EndElement(tag="ChildIdentifiers")
+        yield StartElement(tag="ChildCharacteristics")
+        yield from self.generate_text_element(tag="Ethnicity", cell="WBRI")
+        yield EndElement(tag="ChildCharacteristics")
+        yield StartElement(tag="CINdetails")
+        yield from self.generate_text_element(
+            tag="CINreferralDate", cell=date(2009, 3, 15)
+        )
+        yield from self.generate_text_element(tag="ReferralSource", cell="1A")
+        yield from self.generate_text_element(tag="ReferralNFA", cell="false")
+        yield StartElement(tag="Assessments")
+        yield from self.generate_text_element(
+            tag="AssessmentActualStartDate", cell=date(2009, 2, 21)
+        )
+        yield EndElement(tag="Assessments")
+        yield StartElement(tag="Section47")
+        yield from self.generate_text_element(
+            tag="S47ActualStartDate", cell=date(2009, 2, 17)
+        )
+        yield EndElement(tag="Section47")
+        yield StartElement(tag="ChildProtectionPlans")
+        yield from self.generate_text_element(tag="NumberOfPreviousCPP", cell=10)
+        yield EndElement(tag="ChildProtectionPlans")
+        yield EndElement(tag="CINdetails")
+        yield EndElement(tag="Child")
+
+    def generate_test_cin_census_file(self):
+        """
+        Generate a sample children in need census file
+
+        :return: Stream of generators containing information required to create an XML file
+        """
+        yield StartElement(tag="Message")
+        yield StartElement(tag="Header")
+        yield from self.generate_text_element(tag="Version", cell=1)
+        yield EndElement(tag="Header")
+        yield from self.generate_test_child()
+        yield EndElement(tag="Message")
+
+    def test_cin_collector(self):
+        test_stream = self.generate_test_cin_census_file()
+        test_event = cin_collector(test_stream)
+        self.assertEqual(
+            test_event,
+            {
+                "Version": "1",
+                "LAchildID": "DfEX0000001",
+                "UPN": "A123456789123",
+                "PersonBirthDate": "2004-03-24",
+                "Ethnicity": "WBRI",
+                "CINreferralDate": "2009-03-15",
+                "ReferralSource": "1A",
+                "ReferralNFA": "false",
+                "Assessments": {"AssessmentActualStartDate": "2009-02-21"},
+                "Section47": {"S47ActualStartDate": "2009-02-17"},
"ChildProtectionPlans": {"NumberOfPreviousCPP": "10"}, + }, + ) + + def test_child_collector(self): + test_stream = self.generate_test_child() + test_event = child_collector(test_stream) + self.assertEqual( + test_event, + { + "ChildIdentifiers": { + "LAchildID": "DfEX0000001", + "UPN": "A123456789123", + "PersonBirthDate": "2004-03-24", + }, + "ChildCharacteristics": {"Ethnicity": "WBRI"}, + "CINdetails": { + "CINreferralDate": "2009-03-15", + "ReferralSource": "1A", + "ReferralNFA": "false", + "Assessments": {"AssessmentActualStartDate": "2009-02-21"}, + "Section47": {"S47ActualStartDate": "2009-02-17"}, + "ChildProtectionPlans": {"NumberOfPreviousCPP": "10"}, + }, + }, + ) + + def test_message_collector(self): + test_stream = self.generate_test_cin_census_file() + test_events = list(message_collector(test_stream)) + self.assertEqual(len(test_events), 2) + self.assertIsInstance(test_events[0], HeaderEvent) + self.assertEqual(test_events[0].record, {"Version": "1"}) + self.assertIsInstance(test_events[1], CINEvent) + self.assertEqual( + test_events[1].record, + { + "ChildIdentifiers": { + "LAchildID": "DfEX0000001", + "UPN": "A123456789123", + "PersonBirthDate": "2004-03-24", + }, + "ChildCharacteristics": {"Ethnicity": "WBRI"}, + "CINdetails": { + "CINreferralDate": "2009-03-15", + "ReferralSource": "1A", + "ReferralNFA": "false", + "Assessments": {"AssessmentActualStartDate": "2009-02-21"}, + "Section47": {"S47ActualStartDate": "2009-02-17"}, + "ChildProtectionPlans": {"NumberOfPreviousCPP": "10"}, + }, + }, + ) + + def test_export_table(self): + test_stream = self.generate_test_cin_census_file() + test_events = list(message_collector(test_stream)) + dataset_holder, stream = export_table(test_events) + + self.assertEqual(len(list(stream)), 2) + + data = dataset_holder.value + self.assertEqual(len(data), 1) + self.assertEqual( + data["CIN"], + [ + { + "LAchildID": "DfEX0000001", + "Date": "2009-03-15", + "Type": "CINreferralDate", + "CINreferralDate": "2009-03-15", + "ReferralSource": "1A", + "PrimaryNeedCode": None, + "CINclosureDate": None, + "ReasonForClosure": None, + "DateOfInitialCPC": None, + "ReferralNFA": "false", + "CINPlanStartDate": None, + "CINPlanEndDate": None, + "S47ActualStartDate": None, + "InitialCPCtarget": None, + "ICPCnotRequired": None, + "AssessmentActualStartDate": None, + "AssessmentInternalReviewDate": None, + "AssessmentAuthorisationDate": None, + "Factors": None, + "CPPstartDate": None, + "CPPendDate": None, + "InitialCategoryOfAbuse": None, + "LatestCategoryOfAbuse": None, + "NumberOfPreviousCPP": None, + "UPN": "A123456789123", + "FormerUPN": None, + "UPNunknown": None, + "PersonBirthDate": "2004-03-24", + "ExpectedPersonBirthDate": None, + "GenderCurrent": None, + "PersonDeathDate": None, + "Ethnicity": "WBRI", + "Disabilities": "", + }, + { + "LAchildID": "DfEX0000001", + "Date": "2009-02-21", + "Type": "AssessmentActualStartDate", + "CINreferralDate": "2009-03-15", + "ReferralSource": "1A", + "PrimaryNeedCode": None, + "CINclosureDate": None, + "ReasonForClosure": None, + "DateOfInitialCPC": None, + "ReferralNFA": "false", + "CINPlanStartDate": None, + "CINPlanEndDate": None, + "S47ActualStartDate": None, + "InitialCPCtarget": None, + "ICPCnotRequired": None, + "AssessmentActualStartDate": "2009-02-21", + "AssessmentInternalReviewDate": None, + "AssessmentAuthorisationDate": None, + "Factors": "", + "CPPstartDate": None, + "CPPendDate": None, + "InitialCategoryOfAbuse": None, + "LatestCategoryOfAbuse": None, + "NumberOfPreviousCPP": None, + "UPN": 
"A123456789123", + "FormerUPN": None, + "UPNunknown": None, + "PersonBirthDate": "2004-03-24", + "ExpectedPersonBirthDate": None, + "GenderCurrent": None, + "PersonDeathDate": None, + "Ethnicity": "WBRI", + "Disabilities": "", + }, + { + "LAchildID": "DfEX0000001", + "Date": "2009-02-17", + "Type": "S47ActualStartDate", + "CINreferralDate": "2009-03-15", + "ReferralSource": "1A", + "PrimaryNeedCode": None, + "CINclosureDate": None, + "ReasonForClosure": None, + "DateOfInitialCPC": None, + "ReferralNFA": "false", + "CINPlanStartDate": None, + "CINPlanEndDate": None, + "S47ActualStartDate": "2009-02-17", + "InitialCPCtarget": None, + "ICPCnotRequired": None, + "AssessmentActualStartDate": None, + "AssessmentInternalReviewDate": None, + "AssessmentAuthorisationDate": None, + "Factors": None, + "CPPstartDate": None, + "CPPendDate": None, + "InitialCategoryOfAbuse": None, + "LatestCategoryOfAbuse": None, + "NumberOfPreviousCPP": None, + "UPN": "A123456789123", + "FormerUPN": None, + "UPNunknown": None, + "PersonBirthDate": "2004-03-24", + "ExpectedPersonBirthDate": None, + "GenderCurrent": None, + "PersonDeathDate": None, + "Ethnicity": "WBRI", + "Disabilities": "", + }, + ], + ) diff --git a/tests/common/test_stream_record.py b/tests/common/test_stream_record.py index fe157136..24e90b8f 100644 --- a/tests/common/test_stream_record.py +++ b/tests/common/test_stream_record.py @@ -5,11 +5,8 @@ from liiatools.common.stream_record import ( _reduce_dict, text_collector, - export_table, ) -from liiatools.csww_pipeline.stream_record import message_collector - def test_reduce_dict(): sample_dict = { @@ -74,18 +71,3 @@ def test_text_collector(self): "Agency": "0", }, ) - - def test_export_table(self): - test_stream = self.generate_test_csww_file() - test_events = list(message_collector(test_stream)) - dataset_holder, stream = export_table(test_events) - - self.assertEqual(len(list(stream)), 3) - - data = dataset_holder.value - self.assertEqual(len(data), 3) - self.assertEqual(data["Header"], [{"Version": "1"}]) - self.assertEqual(data["LA_Level"], [{"NumberOfVacancies": "100"}]) - self.assertEqual( - data["Worker"], [{"ID": "100", "SWENo": "AB123456789", "Agency": "0"}] - ) diff --git a/tests/social_work_workforce/test_stream_record.py b/tests/social_work_workforce/test_stream_record.py index 71479fbf..280476c5 100644 --- a/tests/social_work_workforce/test_stream_record.py +++ b/tests/social_work_workforce/test_stream_record.py @@ -7,6 +7,7 @@ LALevelEvent, HeaderEvent, message_collector, + export_table, ) @@ -56,3 +57,18 @@ def test_message_collector(self): self.assertEqual( test_events[2].record, {"ID": "100", "SWENo": "AB123456789", "Agency": "0"} ) + + def test_export_table(self): + test_stream = self.generate_test_csww_file() + test_events = list(message_collector(test_stream)) + dataset_holder, stream = export_table(test_events) + + self.assertEqual(len(list(stream)), 3) + + data = dataset_holder.value + self.assertEqual(len(data), 3) + self.assertEqual(data["Header"], [{"Version": "1"}]) + self.assertEqual(data["LA_Level"], [{"NumberOfVacancies": "100"}]) + self.assertEqual( + data["Worker"], [{"ID": "100", "SWENo": "AB123456789", "Agency": "0"}] + )