Skip to content

Commit

Permalink
basic cin pipeline working
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick-troy committed Jan 23, 2024
1 parent a9d3f73 commit 290adb9
Show file tree
Hide file tree
Showing 40 changed files with 1,179 additions and 1,162 deletions.
4 changes: 2 additions & 2 deletions liiatools/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from liiatools.annex_a_pipeline.cli import annex_a

# from liiatools.datasets.cin_census.cin_cli import cin_census
from liiatools.cin_census_pipeline.cli import cin_census
from liiatools.csww_pipeline.cli import csww
from liiatools.ssda903_pipeline.cli import s903
from liiatools.s251_pipeline.cli import s251
Expand All @@ -14,7 +14,7 @@ def cli():


cli.add_command(annex_a)
# cli.add_command(cin_census)
cli.add_command(cin_census)
cli.add_command(s903)
cli.add_command(csww)
cli.add_command(s251)
Expand Down
4 changes: 2 additions & 2 deletions liiatools/cin_census_pipeline/_reports_assessment_factors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


def expanded_assessment_factors(
data: pd.DataFrame, column_name="AssessmentFactor", prefix: str = ""
data: pd.DataFrame, column_name="Factors", prefix: str = ""
) -> pd.DataFrame:
"""
Expects to receive a dataframe with a column named 'AssessmentFactor' containing a comma-separated list of values.
Expects to receive a dataframe with a column named "Factors" containing a comma-separated list of values.
Expands these values into a "one-hot" encoding of the values. Can optionally prefix the column names with a
prefix string.
Expand Down
Empty file.
57 changes: 57 additions & 0 deletions liiatools/cin_census_pipeline/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging

import click as click
import click_log
from fs import open_fs

from liiatools.common.reference import authorities

from .pipeline import process_session

log = logging.getLogger()
click_log.basic_config(log)


@click.group()
def cin_census():
"""Functions for cleaning, minimising and aggregating CIN census files"""
pass


@cin_census.command()
@click.option(
"--la-code",
"-c",
required=True,
type=click.Choice(authorities.codes, case_sensitive=False),
help="Local authority code",
)
@click.option(
"--output",
"-o",
required=True,
type=click.Path(file_okay=False, writable=True),
help="Output folder",
)
@click.option(
"--input",
"-i",
type=click.Path(exists=True, file_okay=False, readable=True),
)
@click_log.simple_verbosity_option(log)
def pipeline(input, la_code, output):
"""
Runs the full pipeline on a file or folder
:param input: The path to the input folder
:param la_code: A three-letter string for the local authority depositing the file
:param output: The path to the output folder
:return: None
"""

# Source FS is the filesystem containing the input files
source_fs = open_fs(input)

# Get the output filesystem
output_fs = open_fs(output)

process_session(source_fs, output_fs, la_code)
80 changes: 76 additions & 4 deletions liiatools/cin_census_pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,44 @@
from fs import FS
import logging
from fs import open_fs
from fs.base import FS

from liiatools.common import pipeline as pl
from liiatools.common.archive import DataframeArchive
from liiatools.common.constants import ProcessNames, SessionNames
from liiatools.common.data import (
DataContainer,
ErrorContainer,
FileLocator,
PipelineConfig,
ProcessResult,
TableConfig,
)
from liiatools.common.transform import degrade_data, enrich_data, prepare_export

from liiatools.cin_census_pipeline.spec import (
load_pipeline_config,
load_schema,
load_schema_path,
)

from liiatools.cin_census_pipeline.stream_pipeline import task_cleanfile


logger = logging.getLogger()


def process_file(
file_locator: FileLocator,
session_folder: FS,
pipeline_config: PipelineConfig,
la_code: str,
) -> ProcessResult:
"""
Clean, enrich and degrade data
:param file_locator: The pointer to a file in a virtual filesystem
:param session_folder: The path to the session folder
:param pipeline_config: The pipeline configuration
:param la_code: A three-letter string for the local authority depositing the file
:return: A class containing a DataContainer and ErrorContainer
"""
errors = ErrorContainer()
year = pl.discover_year(file_locator)
if year is None:
Expand All @@ -35,10 +54,14 @@ def process_file(
# We save these files based on the session UUID - so UUID must exist
uuid = file_locator.meta["uuid"]

# Load schema and set on processing metadata
schema = load_schema(year=year)
schema_path = load_schema_path(year=year)
metadata = dict(year=year, schema=schema, la_code=la_code)

# Normalise the data and export to the session 'cleaned' folder
try:
cleanfile_result = task_cleanfile(file_locator, schema)
cleanfile_result = task_cleanfile(file_locator, schema, schema_path)
except Exception as e:
logger.exception(f"Error cleaning file {file_locator.name}")
errors.append(
Expand All @@ -50,8 +73,43 @@ def process_file(
)
return ProcessResult(data=None, errors=errors)

# Export the cleaned data to the session 'cleaned' folder
cleanfile_result.data.export(
session_folder, f"{SessionNames.CLEANED_FOLDER}/{uuid}_", "parquet"
)
errors.extend(cleanfile_result.errors)

# Enrich the data and export to the session 'enriched' folder
enrich_result = enrich_data(cleanfile_result.data, pipeline_config, metadata)
enrich_result.data.export(
session_folder, f"{SessionNames.ENRICHED_FOLDER}/{uuid}_", "parquet"
)
errors.extend(enrich_result.errors)

# Degrade the data and export to the session 'degraded' folder
degraded_result = degrade_data(enrich_result.data, pipeline_config, metadata)
degraded_result.data.export(
session_folder, f"{SessionNames.DEGRADED_FOLDER}/{uuid}_", "parquet"
)
errors.extend(degraded_result.errors)

errors.set_property("filename", file_locator.name)
errors.set_property("uuid", uuid)

return ProcessResult(data=degraded_result.data, errors=errors)


def process_session(source_fs: FS, output_fs: FS, la_code: str):
"""
Runs the full pipeline on a file or folder
:param source_fs: File system containing the input files
:param output_fs: File system for the output files
:param la_code: A three-letter string for the local authority depositing the file
:return: None
"""
# Before we start - load configuration for this dataset
pipeline_config = load_pipeline_config()

# Ensure all processing folders exist
pl.create_process_folders(output_fs)

Expand Down Expand Up @@ -88,3 +146,17 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str):
current_data.export(
output_fs.opendir(ProcessNames.CURRENT_FOLDER), "cin_cencus_current_", "csv"
)

# Create the different reports
export_folder = output_fs.opendir(ProcessNames.EXPORT_FOLDER)
for report in ["PAN"]:
report_data = prepare_export(current_data, pipeline_config, profile=report)
report_folder = export_folder.makedirs(report, recreate=True)
report_data.data.export(report_folder, "cin_census_", "csv")


process_session(
open_fs(r"C:\Users\patrick.troy\OneDrive - Social Finance Ltd\Work\LIIA\LIIA tests\CIN\pipeline\input"),
open_fs(r"C:\Users\patrick.troy\OneDrive - Social Finance Ltd\Work\LIIA\LIIA tests\CIN\pipeline\output"),
la_code="BAR"
)
27 changes: 14 additions & 13 deletions liiatools/cin_census_pipeline/spec/CIN_schema_2017.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

<xs:complexType name="childidentifierstype">
<xs:sequence>
<xs:element name="LAchildID" type="nonEmptyString" minOccurs="1" maxOccurs="1"/>
<xs:element name="LAchildID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="UPN" type="upntype" minOccurs="0" maxOccurs="1"/>
<xs:element name="FormerUPN" type="upntype" minOccurs="0" maxOccurs="1"/>
<xs:element name="UPNunknown" type="unknownupntype" minOccurs="0" maxOccurs="1"/>
Expand Down Expand Up @@ -143,9 +143,9 @@
<xs:sequence>
<xs:element name="CPPstartDate" type="xs:date" minOccurs="1" maxOccurs="1"/>
<xs:element name="CPPendDate" type="xs:date" minOccurs="0" maxOccurs="1"/>
<xs:element name="InitialCategoryOfAbuse" type="categoryofabuse" minOccurs="1" maxOccurs="1"/>
<xs:element name="LatestCategoryOfAbuse" type="categoryofabuse" minOccurs="1" maxOccurs="1"/>
<xs:element name="NumberOfPreviousCPP" type="xs:positiveInteger" minOccurs="1" maxOccurs="1"/>
<xs:element name="InitialCategoryOfAbuse" type="categoryofabusetype" minOccurs="1" maxOccurs="1"/>
<xs:element name="LatestCategoryOfAbuse" type="categoryofabusetype" minOccurs="1" maxOccurs="1"/>
<xs:element name="NumberOfPreviousCPP" type="positiveintegertype" minOccurs="1" maxOccurs="1"/>
<xs:element name="Reviews" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:sequence>
Expand All @@ -161,22 +161,23 @@
<xs:minLength value="1"/>
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="positiveintegertype">
<xs:restriction base="xs:integer">
<xs:minInclusive value="0"/>
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="yesnotype">
<xs:restriction base="nonEmptyString">
<xs:enumeration value="true" />
<xs:enumeration value="false" />
<xs:enumeration value="TRUE" />
<xs:enumeration value="FALSE" />
<xs:enumeration value="0" />
<xs:enumeration value="1" />
<xs:enumeration value="0"><xs:annotation><xs:documentation>False</xs:documentation></xs:annotation></xs:enumeration>
<xs:enumeration value="1"><xs:annotation><xs:documentation>True</xs:documentation></xs:annotation></xs:enumeration>
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="upntype">
<xs:restriction base="xs:string">
<xs:pattern value="[A-Za-z]\d{12}"/>
<xs:pattern value="[A-Za-z]\d{11}[A-Za-z]"/>
<xs:pattern value="[A-Za-z]\d{11}(\d|[A-Za-z])"/>
</xs:restriction>
</xs:simpleType>

Expand Down Expand Up @@ -279,7 +280,7 @@
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="categoryofabuse">
<xs:simpleType name="categoryofabusetype">
<xs:restriction base="nonEmptyString">
<xs:enumeration value="NEG"><xs:annotation><xs:documentation>Neglect</xs:documentation></xs:annotation></xs:enumeration>
<xs:enumeration value="PHY"><xs:annotation><xs:documentation>Physical abuse</xs:documentation></xs:annotation></xs:enumeration>
Expand Down
27 changes: 14 additions & 13 deletions liiatools/cin_census_pipeline/spec/CIN_schema_2018.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

<xs:complexType name="childidentifierstype">
<xs:sequence>
<xs:element name="LAchildID" type="nonEmptyString" minOccurs="1" maxOccurs="1"/>
<xs:element name="LAchildID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="UPN" type="upntype" minOccurs="0" maxOccurs="1"/>
<xs:element name="FormerUPN" type="upntype" minOccurs="0" maxOccurs="1"/>
<xs:element name="UPNunknown" type="unknownupntype" minOccurs="0" maxOccurs="1"/>
Expand Down Expand Up @@ -143,9 +143,9 @@
<xs:sequence>
<xs:element name="CPPstartDate" type="xs:date" minOccurs="1" maxOccurs="1"/>
<xs:element name="CPPendDate" type="xs:date" minOccurs="0" maxOccurs="1"/>
<xs:element name="InitialCategoryOfAbuse" type="categoryofabuse" minOccurs="1" maxOccurs="1"/>
<xs:element name="LatestCategoryOfAbuse" type="categoryofabuse" minOccurs="1" maxOccurs="1"/>
<xs:element name="NumberOfPreviousCPP" type="xs:positiveInteger" minOccurs="1" maxOccurs="1"/>
<xs:element name="InitialCategoryOfAbuse" type="categoryofabusetype" minOccurs="1" maxOccurs="1"/>
<xs:element name="LatestCategoryOfAbuse" type="categoryofabusetype" minOccurs="1" maxOccurs="1"/>
<xs:element name="NumberOfPreviousCPP" type="positiveintegertype" minOccurs="1" maxOccurs="1"/>
<xs:element name="Reviews" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:sequence>
Expand All @@ -162,21 +162,22 @@
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="positiveintegertype">
<xs:restriction base="xs:integer">
<xs:minInclusive value="0"/>
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="yesnotype">
<xs:restriction base="nonEmptyString">
<xs:enumeration value="true" />
<xs:enumeration value="false" />
<xs:enumeration value="TRUE" />
<xs:enumeration value="FALSE" />
<xs:enumeration value="0" />
<xs:enumeration value="1" />
<xs:enumeration value="0"><xs:annotation><xs:documentation>False</xs:documentation></xs:annotation></xs:enumeration>
<xs:enumeration value="1"><xs:annotation><xs:documentation>True</xs:documentation></xs:annotation></xs:enumeration>
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="upntype">
<xs:restriction base="xs:string">
<xs:pattern value="[A-Za-z]\d{12}"/>
<xs:pattern value="[A-Za-z]\d{11}[A-Za-z]"/>
<xs:pattern value="[A-Za-z]\d{11}(\d|[A-Za-z])"/>
</xs:restriction>
</xs:simpleType>

Expand Down Expand Up @@ -279,7 +280,7 @@
</xs:restriction>
</xs:simpleType>

<xs:simpleType name="categoryofabuse">
<xs:simpleType name="categoryofabusetype">
<xs:restriction base="nonEmptyString">
<xs:enumeration value="NEG"><xs:annotation><xs:documentation>Neglect</xs:documentation></xs:annotation></xs:enumeration>
<xs:enumeration value="PHY"><xs:annotation><xs:documentation>Physical abuse</xs:documentation></xs:annotation></xs:enumeration>
Expand Down
Loading

0 comments on commit 290adb9

Please sign in to comment.