diff --git a/liiatools/common/constants.py b/liiatools/common/constants.py
index 2c5b52e..2747981 100644
--- a/liiatools/common/constants.py
+++ b/liiatools/common/constants.py
@@ -9,7 +9,7 @@ class ProcessNames(StrEnum):
 
     SESSIONS_FOLDER = "sessions"
     ARCHIVE_FOLDER = "archive"
-    CURRENT_FOLDER = "cur"
+    CURRENT_FOLDER = "c"
     EXPORT_FOLDER = "exp"
 
 
diff --git a/liiatools/school_census_pipeline/pipeline.py b/liiatools/school_census_pipeline/pipeline.py
index 58bc1eb..5482c66 100644
--- a/liiatools/school_census_pipeline/pipeline.py
+++ b/liiatools/school_census_pipeline/pipeline.py
@@ -112,45 +112,45 @@ def process_session(source_fs: FS, output_fs: FS, la_code: str, filename: str):
     pipeline_config = load_pipeline_config()
 
     # Ensure all processing folders exist
-    # pl.create_process_folders(output_fs)
-    #
-    # # Create session folder
-    # session_folder, session_id = pl.create_session_folder(output_fs)
-    #
-    # # Move files into session folder
-    # locator_list = pl.move_files_for_processing(source_fs, session_folder)
-    #
-    # # Process each incoming file
-    # processed_files = [
-    #     process_file(locator, session_folder, pipeline_config, la_code, filename)
-    #     for locator in locator_list
-    # ]
+    pl.create_process_folders(output_fs)
+
+    # Create session folder
+    session_folder, session_id = pl.create_session_folder(output_fs)
+
+    # Move files into session folder
+    locator_list = pl.move_files_for_processing(source_fs, session_folder)
+
+    # Process each incoming file
+    processed_files = [
+        process_file(locator, session_folder, pipeline_config, la_code, filename)
+        for locator in locator_list
+    ]
 
     # Add processed files to archive
     archive = DataframeArchive(
         output_fs.opendir(ProcessNames.ARCHIVE_FOLDER), pipeline_config
     )
-    # for result in processed_files:
-    #     if result.data:
-    #         archive.add(result.data)
-    #
-    # # Write the error summary
-    # error_summary = ErrorContainer(
-    #     [error for result in processed_files for error in result.errors]
-    # )
-    # error_summary.set_property("session_id", session_id)
-    # with session_folder.open("error_summary.csv", "w") as FILE:
-    #     error_summary.to_dataframe().to_csv(FILE, index=False)
+    for result in processed_files:
+        if result.data:
+            archive.add(result.data)
+
+    # Write the error summary
+    error_summary = ErrorContainer(
+        [error for result in processed_files for error in result.errors]
+    )
+    error_summary.set_property("session_id", session_id)
+    with session_folder.open("error_summary.csv", "w") as FILE:
+        error_summary.to_dataframe().to_csv(FILE, index=False)
 
     # Export the current snapshot of the archive
     current_data = archive.current()
     current_data.export(
-        output_fs.opendir(ProcessNames.CURRENT_FOLDER), "sc_c_", "csv"
+        output_fs.opendir(ProcessNames.CURRENT_FOLDER), "s_", "csv"
     )
 
     # Create the different reports
-    export_folder = output_fs.opendir(ProcessNames.EXPORT_FOLDER)
-    for report in ["PAN"]:
-        report_data = prepare_export(current_data, pipeline_config, profile=report)
-        report_folder = export_folder.makedirs(report, recreate=True)
-        report_data.export(report_folder, "sc_", "csv")
+    # export_folder = output_fs.opendir(ProcessNames.EXPORT_FOLDER)
+    # for report in ["PAN"]:
+    #     report_data = prepare_export(current_data, pipeline_config, profile=report)
+    #     report_folder = export_folder.makedirs(report, recreate=True)
+    #     report_data.export(report_folder, "sc_", "csv")
diff --git a/liiatools/school_census_pipeline/spec/__init__.py b/liiatools/school_census_pipeline/spec/__init__.py
index ccdf527..be4b582 100644
--- a/liiatools/school_census_pipeline/spec/__init__.py
+++ b/liiatools/school_census_pipeline/spec/__init__.py
@@ -95,12 +95,3 @@ def load_schema(year: int, term: Term) -> DataSchema:
 
     # Now we can parse the full schema into a DataSchema object from the dict
     return DataSchema(**term_schema)
-
-
-year = 2018
-term = "Autumn"
-x = ['pupilonrolltableid', 'NativeId', 'pupilonrollorderseqcolumn', 'sourceid', 'schoolcensustableid', 'upn', 'formerupn', 'surname', 'forename', 'middlenames', 'preferredsurname', 'uniquelearnernumber', 'formersurname', 'dob', 'hoursatsetting', 'schoollunchtaken', 'parttime', 'entrydate', 'topupfunding', 'termlysessionspossible', 'termlysessionsauthorised', 'termlysessionsunauthorised', 'qualhrs', 'senunitindicator', 'resourcedprovisionindicator', 'nonqualhrs', 'ftemp', 'moveoffrollflag', 'fundedhours', 'missingaddress', 'duplicatenotfunded', 'summerhalfterm2sessionspossible', 'extendedhours', 'thirtyhourcode', 'summerhalfterm2sessionsauthorised', 'summerhalfterm2sessionsunauthorised', 'dafindicator', 'gender', 'yssa', 'servicechild', 'language', 'classtype', 'enrolstatus', 'boarder', 'plaa', 'senprovision', 'ncyearactual', 'mathsgcsehighestpriorattainment', 'mathsgcsepriorattainmentyeargroup', 'englishgcsehighestpriorattainment', 'englishgcsepriorattainmentyeargroup', 'mathsgcsefundingexemption', 'englishgcsefundingexemption']
-y = load_schema(year, term).column_map["pupilonroll"].keys()
-
-print(year, term, set(y) - set(x))
-print(year, term, set(x) - set(y))