add filename argument to school census cli
patrick-troy committed Aug 30, 2024
1 parent 7ec88e1 commit c0cb5c1
Showing 2 changed files with 73 additions and 62 deletions.
10 changes: 8 additions & 2 deletions liiatools/school_census_pipeline/cli.py
@@ -38,8 +38,14 @@ def school_census():
     "-i",
     type=click.Path(exists=True, file_okay=False, readable=True),
 )
+@click.option(
+    "--filename",
+    "-f",
+    type=str,
+    help="Name of files you want to clean"
+)
 @click_log.simple_verbosity_option(log)
-def pipeline(input, la_code, output):
+def pipeline(input, la_code, output, filename):
     """Runs the full pipeline on a file or folder"""
 
     # Source FS is the filesystem containing the input files
@@ -48,4 +54,4 @@ def pipeline(input, la_code, output):
     # Get the output filesystem
     output_fs = open_fs(output)
 
-    process_session(source_fs, output_fs, la_code)
+    process_session(source_fs, output_fs, la_code, filename)
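
For context, a rough sketch of how the new option might be exercised via click's test runner. Only --filename/-f is introduced by this commit; the subcommand name "pipeline" and the group invocation assume school_census is the click group that registers pipeline, and the -o/--la-code flags are assumptions inferred from the pipeline(input, la_code, output, filename) signature, since their @click.option definitions sit outside the visible hunks.

# A minimal sketch, not taken verbatim from the repository.
from click.testing import CliRunner

from liiatools.school_census_pipeline.cli import school_census

runner = CliRunner()
result = runner.invoke(
    school_census,
    [
        "pipeline",
        "-i", "path/to/input",    # --input/-i, shown truncated in the hunk above
        "-o", "path/to/output",   # assumed flag for the `output` parameter
        "--la-code", "822",       # assumed flag for `la_code`; value is illustrative
        "-f", "spring_census",    # new in this commit: only clean matching files
    ],
)
print(result.exit_code, result.output)

Because the option is a plain type=str with no default shown, omitting -f leaves filename as None and the pipeline processes every file, as the process_file guard below makes explicit.
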
125 changes: 65 additions & 60 deletions liiatools/school_census_pipeline/pipeline.py
@@ -24,78 +24,83 @@ def process_file(
     session_folder: FS,
     pipeline_config: PipelineConfig,
     la_code: str,
+    filename: str = None,
 ) -> ProcessResult:
     errors = ErrorContainer()
-    year = pl.discover_year(file_locator)
-    if year is None:
-        errors.append(
-            dict(
-                type="MissingYear",
-                message="Could not find a year in the filename or path",
-                filename=file_locator.name,
-            )
-        )
-        return ProcessResult(data=None, errors=errors)
+    if filename is None or filename in file_locator.name:
+        year = pl.discover_year(file_locator)
+        if year is None:
+            errors.append(
+                dict(
+                    type="MissingYear",
+                    message="Could not find a year in the filename or path",
+                    filename=file_locator.name,
+                )
+            )
+            return ProcessResult(data=None, errors=errors)

-    term = pl.discover_term(file_locator)
-    if term is None:
-        errors.append(
-            dict(
-                type="MissingTerm",
-                message="Could not find a term in the filename or path",
-                filename=file_locator.name,
-            )
-        )
-        return ProcessResult(data=None, errors=errors)
+        term = pl.discover_term(file_locator)
+        if term is None:
+            errors.append(
+                dict(
+                    type="MissingTerm",
+                    message="Could not find a term in the filename or path",
+                    filename=file_locator.name,
+                )
+            )
+            return ProcessResult(data=None, errors=errors)

-    # We save these files based on the session UUID - so UUID must exist
-    uuid = file_locator.meta["uuid"]
+        # We save these files based on the session UUID - so UUID must exist
+        uuid = file_locator.meta["uuid"]

-    # Load schema and set on processing metadata
-    schema = load_schema(year, term)
-    metadata = dict(year=year, schema=schema, la_code=la_code, term=term)
+        # Load schema and set on processing metadata
+        schema = load_schema(year, term)
+        metadata = dict(year=year, schema=schema, la_code=la_code, term=term)

-    # Normalise the data and export to the session 'cleaned' folder
-    try:
-        cleanfile_result = task_cleanfile(file_locator, schema)
-    except Exception as e:
-        logger.exception(f"Error cleaning file {file_locator.name}")
-        errors.append(
-            dict(
-                type="StreamError",
-                message="Failed to clean file. Check log files for technical errors.",
-                filename=file_locator.name,
-            )
-        )
-        return ProcessResult(data=None, errors=errors)
+        # Normalise the data and export to the session 'cleaned' folder
+        try:
+            cleanfile_result = task_cleanfile(file_locator, schema)
+        except Exception as e:
+            logger.exception(f"Error cleaning file {file_locator.name}")
+            errors.append(
+                dict(
+                    type="StreamError",
+                    message="Failed to clean file. Check log files for technical errors.",
+                    filename=file_locator.name,
+                )
+            )
+            return ProcessResult(data=None, errors=errors)

-    # Export the cleaned data to the session 'cleaned' folder
-    cleanfile_result.data.export(
-        session_folder, f"{SessionNames.CLEANED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(cleanfile_result.errors)
+        # Export the cleaned data to the session 'cleaned' folder
+        cleanfile_result.data.export(
+            session_folder, f"{SessionNames.CLEANED_FOLDER}/{uuid}_", "parquet"
+        )
+        errors.extend(cleanfile_result.errors)

-    # Enrich the data and export to the session 'enriched' folder
-    enrich_result = enrich_data(cleanfile_result.data, pipeline_config, metadata)
-    enrich_result.data.export(
-        session_folder, f"{SessionNames.ENRICHED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(enrich_result.errors)
+        # Enrich the data and export to the session 'enriched' folder
+        enrich_result = enrich_data(cleanfile_result.data, pipeline_config, metadata)
+        enrich_result.data.export(
+            session_folder, f"{SessionNames.ENRICHED_FOLDER}/{uuid}_", "parquet"
+        )
+        errors.extend(enrich_result.errors)

-    # Degrade the data and export to the session 'degraded' folder
-    degraded_result = degrade_data(enrich_result.data, pipeline_config, metadata)
-    degraded_result.data.export(
-        session_folder, f"{SessionNames.DEGRADED_FOLDER}/{uuid}_", "parquet"
-    )
-    errors.extend(degraded_result.errors)
+        # Degrade the data and export to the session 'degraded' folder
+        degraded_result = degrade_data(enrich_result.data, pipeline_config, metadata)
+        degraded_result.data.export(
+            session_folder, f"{SessionNames.DEGRADED_FOLDER}/{uuid}_", "parquet"
+        )
+        errors.extend(degraded_result.errors)

-    errors.set_property("filename", file_locator.name)
-    errors.set_property("uuid", uuid)
+        errors.set_property("filename", file_locator.name)
+        errors.set_property("uuid", uuid)

-    return ProcessResult(data=degraded_result.data, errors=errors)
+        return ProcessResult(data=degraded_result.data, errors=errors)
+    else:
+        return ProcessResult(data=None, errors=errors)


-def process_session(source_fs: FS, output_fs: FS, la_code: str):
+def process_session(source_fs: FS, output_fs: FS, la_code: str, filename: str):
     # Before we start - load configuration for this dataset
     pipeline_config = load_pipeline_config()

@@ -110,7 +115,7 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str):

     # Process each incoming file
     processed_files = [
-        process_file(locator, session_folder, pipeline_config, la_code)
+        process_file(locator, session_folder, pipeline_config, la_code, filename)
         for locator in locator_list
     ]