From c0cb5c19d314e85e4978c70daa1869c7b8e51d45 Mon Sep 17 00:00:00 2001
From: patrick-troy
Date: Fri, 30 Aug 2024 14:38:58 +0100
Subject: [PATCH] add filename argument to school census cli

---
 liiatools/school_census_pipeline/cli.py      |  10 +-
 liiatools/school_census_pipeline/pipeline.py | 125 ++++++++++---------
 2 files changed, 73 insertions(+), 62 deletions(-)

diff --git a/liiatools/school_census_pipeline/cli.py b/liiatools/school_census_pipeline/cli.py
index 4826a444..b452f670 100644
--- a/liiatools/school_census_pipeline/cli.py
+++ b/liiatools/school_census_pipeline/cli.py
@@ -38,8 +38,14 @@ def school_census():
     "-i",
     type=click.Path(exists=True, file_okay=False, readable=True),
 )
+@click.option(
+    "--filename",
+    "-f",
+    type=str,
+    help="Name of files you want to clean"
+)
 @click_log.simple_verbosity_option(log)
-def pipeline(input, la_code, output):
+def pipeline(input, la_code, output, filename):
     """Runs the full pipeline on a file or folder"""
 
     # Source FS is the filesystem containing the input files
@@ -48,4 +54,4 @@ def pipeline(input, la_code, output):
     # Get the output filesystem
     output_fs = open_fs(output)
 
-    process_session(source_fs, output_fs, la_code)
+    process_session(source_fs, output_fs, la_code, filename)
diff --git a/liiatools/school_census_pipeline/pipeline.py b/liiatools/school_census_pipeline/pipeline.py
index c129f728..f0cbefef 100644
--- a/liiatools/school_census_pipeline/pipeline.py
+++ b/liiatools/school_census_pipeline/pipeline.py
@@ -24,78 +24,83 @@ def process_file(
     session_folder: FS,
     pipeline_config: PipelineConfig,
     la_code: str,
+    filename: str = None,
 ) -> ProcessResult:
     errors = ErrorContainer()
 
-    year = pl.discover_year(file_locator)
-    if year is None:
-        errors.append(
-            dict(
-                type="MissingYear",
-                message="Could not find a year in the filename or path",
-                filename=file_locator.name,
+    if filename is None or filename in file_locator.name:
+        year = pl.discover_year(file_locator)
+        if year is None:
+            errors.append(
+                dict(
+                    type="MissingYear",
+                    message="Could not find a year in the filename or path",
+                    filename=file_locator.name,
+                )
             )
-        )
-        return ProcessResult(data=None, errors=errors)
-
-    term = pl.discover_term(file_locator)
-    if term is None:
-        errors.append(
-            dict(
-                type="MissingTerm",
-                message="Could not find a term in the filename or path",
-                filename=file_locator.name,
+            return ProcessResult(data=None, errors=errors)
+
+        term = pl.discover_term(file_locator)
+        if term is None:
+            errors.append(
+                dict(
+                    type="MissingTerm",
+                    message="Could not find a term in the filename or path",
+                    filename=file_locator.name,
+                )
             )
-        )
-        return ProcessResult(data=None, errors=errors)
-
-    # We save these files based on the session UUID - so UUID must exist
-    uuid = file_locator.meta["uuid"]
-
-    # Load schema and set on processing metadata
-    schema = load_schema(year, term)
-    metadata = dict(year=year, schema=schema, la_code=la_code, term=term)
-
-    # Normalise the data and export to the session 'cleaned' folder
-    try:
-        cleanfile_result = task_cleanfile(file_locator, schema)
-    except Exception as e:
-        logger.exception(f"Error cleaning file {file_locator.name}")
-        errors.append(
-            dict(
-                type="StreamError",
-                message="Failed to clean file. Check log files for technical errors.",
-                filename=file_locator.name,
+            return ProcessResult(data=None, errors=errors)
+
+        # We save these files based on the session UUID - so UUID must exist
+        uuid = file_locator.meta["uuid"]
+
+        # Load schema and set on processing metadata
+        schema = load_schema(year, term)
+        metadata = dict(year=year, schema=schema, la_code=la_code, term=term)
+
+        # Normalise the data and export to the session 'cleaned' folder
+        try:
+            cleanfile_result = task_cleanfile(file_locator, schema)
+        except Exception as e:
+            logger.exception(f"Error cleaning file {file_locator.name}")
+            errors.append(
+                dict(
+                    type="StreamError",
+                    message="Failed to clean file. Check log files for technical errors.",
+                    filename=file_locator.name,
+                )
            )
+            return ProcessResult(data=None, errors=errors)
+
+        # Export the cleaned data to the session 'cleaned' folder
+        cleanfile_result.data.export(
+            session_folder, f"{SessionNames.CLEANED_FOLDER}/{uuid}_", "parquet"
         )
-        return ProcessResult(data=None, errors=errors)
+        errors.extend(cleanfile_result.errors)
 
-    # Export the cleaned data to the session 'cleaned' folder
-    cleanfile_result.data.export(
-        session_folder, f"{SessionNames.CLEANED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(cleanfile_result.errors)
+        # Enrich the data and export to the session 'enriched' folder
+        enrich_result = enrich_data(cleanfile_result.data, pipeline_config, metadata)
+        enrich_result.data.export(
+            session_folder, f"{SessionNames.ENRICHED_FOLDER}/{uuid}_", "parquet"
+        )
+        errors.extend(enrich_result.errors)
 
-    # Enrich the data and export to the session 'enriched' folder
-    enrich_result = enrich_data(cleanfile_result.data, pipeline_config, metadata)
-    enrich_result.data.export(
-        session_folder, f"{SessionNames.ENRICHED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(enrich_result.errors)
+        # Degrade the data and export to the session 'degraded' folder
+        degraded_result = degrade_data(enrich_result.data, pipeline_config, metadata)
+        degraded_result.data.export(
+            session_folder, f"{SessionNames.DEGRADED_FOLDER}/{uuid}_", "parquet"
+        )
+        errors.extend(degraded_result.errors)
 
-    # Degrade the data and export to the session 'degraded' folder
-    degraded_result = degrade_data(enrich_result.data, pipeline_config, metadata)
-    degraded_result.data.export(
-        session_folder, f"{SessionNames.DEGRADED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(degraded_result.errors)
+        errors.set_property("filename", file_locator.name)
+        errors.set_property("uuid", uuid)
 
-    errors.set_property("filename", file_locator.name)
-    errors.set_property("uuid", uuid)
+        return ProcessResult(data=degraded_result.data, errors=errors)
 
-    return ProcessResult(data=degraded_result.data, errors=errors)
+    else:
+        return ProcessResult(data=None, errors=errors)
 
 
-def process_session(source_fs: FS, output_fs: FS, la_code: str):
+def process_session(source_fs: FS, output_fs: FS, la_code: str, filename: str):
     # Before we start - load configuration for this dataset
     pipeline_config = load_pipeline_config()
@@ -110,7 +115,7 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str):
 
     # Process each incoming file
     processed_files = [
-        process_file(locator, session_folder, pipeline_config, la_code)
+        process_file(locator, session_folder, pipeline_config, la_code, filename)
         for locator in locator_list
     ]
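
Usage note (not part of the patch): a minimal sketch of how the new filename argument is expected to flow into process_session, based only on the calls visible in the diff above. The folder paths, la_code value and filename fragment are purely illustrative.

    from fs import open_fs

    from liiatools.school_census_pipeline.pipeline import process_session

    # Hypothetical input and output locations - any PyFilesystem URL or OS path
    source_fs = open_fs("/data/school_census/incoming")
    output_fs = open_fs("/data/school_census/output")

    # Only files whose names contain the given string are processed; passing
    # filename=None keeps the previous behaviour of cleaning every file found.
    process_session(source_fs, output_fs, la_code="822", filename="autumn_term")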