Fix: Preprocessing of articles stops if curation folder does not exist for an article (Issue #105) #107

5 changes: 4 additions & 1 deletion README.md
@@ -33,7 +33,7 @@ ReBACH is run via the command line as outlined in the 'How to Run' section of th
- user - required: Your user email address on AP Trust
- token - required: Your user secret token on AP Trust
- items_per_page - Maximum number of objects to be returned per page by the API
- alt_identifier_starts_with - Prefix for alternate identifier in AP Trust
- alt_identifier_starts_with - Prefix for alternate identifier in AP Trust
- retries - required: Number of times the script should retry API or file system calls if it is unable to connect. Defaults to 3
- retries_wait - required: Number of seconds the script should wait between call retries if it is unable to connect. Defaults to 10
- preservation_storage_location - required: The file system location where the preservation folders/packages should be created
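
For orientation, the sketch below shows settings like these being read with Python's `configparser`. The section name `[aptrust_api]`, the file format, and the sample values are assumptions for illustration only; consult the project's example configuration for the real layout.

```python
# Illustrative only: ReBACH's real config layout and section names may differ.
import configparser

sample = """
[aptrust_api]
user = curator@example.edu
token = <your secret token>
items_per_page = 100
alt_identifier_starts_with = figshare_
retries = 3
retries_wait = 10
"""

config = configparser.ConfigParser()
config.read_string(sample)
print(config["aptrust_api"].getint("retries"))       # 3
print(config["aptrust_api"].getint("retries_wait"))  # 10
```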
@@ -54,9 +54,12 @@ These parameters are only available on the command line.
|`--xfg` | The path to the configuration file to use.|
|`--ids` | A comma-separated list of article IDs to process. E.g., 12345,12356|
|`--continue-on-error`| If there is an error during the item processing stage for a given item, skip it and continue to the next item.|
|`--dry-run` | Runs all operations, excluding any that involve writing to any storage medium. |

## Execution notes
- ReBACH will attempt to fetch all items in the institutional instance. Items that are not published (curation_status != 'approved') will be ignored.
- Items that are embargoed are also fetched; however, due to limitations in the API, only the latest version can be fetched until the embargo expires or is removed.
- While fetching, ReBACH checks the preservation remote storages for a preserved copy of each item. If a preservation copy of an item is found and confirmed, the item will be ignored in subsequent stages.
- Checking the preservation final remote storage for a preserved copy of an article requires the size of the article's curation storage folder. If an error occurs while calculating the size of an article's curation folder, the error is recorded and execution stops unless the `--continue-on-error` flag is set (a sketch of this decision follows this list).
- When processing collections, ReBACH records which items are part of the collection by appending them to the collection's JSON as returned by the Figshare API.
- If an item encounters errors, it will not be processed, and any partial files in preservation staging storage are deleted.
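
To make the size-calculation note above concrete, here is a minimal sketch of the skip-or-abort decision. The helper name and logging calls are illustrative, not ReBACH's actual functions.

```python
import sys

def handle_missing_curation_folder(article_id, version_no, continue_on_error, log):
    """Record a curation-folder error, then either abort or skip the version."""
    message = f"Curation folder for article {article_id} version {version_no} not found."
    if not continue_on_error:
        log("error", message)
        log("info", "Aborting execution.")
        sys.exit(1)
    # With --continue-on-error set, count the failure and move on to the next item.
    log("error", message + " Article version will be skipped.")
    return None
```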
25 changes: 21 additions & 4 deletions app.py
@@ -23,6 +23,8 @@ def get_args():
help='list of article and/or collection IDs to process. E.g., "2323,4353,5454"')
parser.add_argument('--continue-on-error', action='store_true',
help='If an item encounters an error during the processing stage, continue to the next item.')
parser.add_argument('--dry-run', action='store_true',
help='Fetch, match, and verify items only. Do not download, delete, or upload any files to preservation storage.')
args = parser.parse_args()
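
As a quick, standalone illustration of how the new flag parses (only a subset of ReBACH's actual options is shown here):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--ids', help='comma-separated list of article and/or collection IDs')
parser.add_argument('--continue-on-error', action='store_true')
parser.add_argument('--dry-run', action='store_true')

# store_true flags default to False and flip to True when present on the command line.
args = parser.parse_args(['--dry-run', '--ids', '12345,12356'])
print(args.dry_run, args.continue_on_error, args.ids)  # True False 12345,12356
```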


@@ -72,6 +74,7 @@ def main():
config_obj = Config(env_file)

config_obj.add_setting(name='continue-on-error', value=args.continue_on_error)
config_obj.add_setting(name='dry-run', value=args.dry_run)

figshare_config = config_obj.figshare_config()
system_config = config_obj.system_config()
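
The processing code later compares the stored setting to the string 'False', mirroring the existing continue-on-error checks, which suggests that `add_setting` stores values as strings. That is an inference from this diff, not a confirmed detail; the sketch below shows the comparison pattern in isolation.

```python
def writes_allowed(system_config: dict) -> bool:
    """Return True when a ReBACH-style string setting permits writes to storage."""
    # Assumption: settings round-trip as the strings 'True' / 'False'.
    return system_config.get('dry-run', 'False') == 'False'

print(writes_allowed({'dry-run': 'False'}))  # True  -> normal downloads/deletes/uploads
print(writes_allowed({'dry-run': 'True'}))   # False -> log what would happen and skip writes
```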
@@ -149,6 +152,8 @@ def main():

already_preserved_articles_count = len(already_preserved_counts_dict['already_preserved_article_ids'])
already_preserved_versions_count = already_preserved_counts_dict['already_preserved_versions']
articles_with_error_count = len(already_preserved_counts_dict['articles_with_error'])
article_versions_with_error_count = already_preserved_counts_dict['article_versions_with_error']
published_articles_count = 0
published_articles_versions_count = 0
published_unpublished_count = 0
@@ -161,8 +166,9 @@

log.write_log_in_file('info', "Fetched: "
+ f"Total articles: {published_unpublished_count}, "
+ f"Published articles: {published_articles_count + already_preserved_articles_count}, "
+ f"Published article versions: {published_articles_versions_count + already_preserved_versions_count}",
+ f"Published articles: {published_articles_count + already_preserved_articles_count + articles_with_error_count}, "
+ "Published article versions: "
+ f"{published_articles_versions_count + already_preserved_versions_count + article_versions_with_error_count}",
True)
print(" ")

@@ -199,15 +205,26 @@ def main():

log.write_log_in_file('info',
"Total published articles/article versions: \t\t\t\t\t"
+ f'{published_articles_count + already_preserved_articles_count} / '
+ f'{published_articles_versions_count + already_preserved_versions_count}',
+ f'{published_articles_count + already_preserved_articles_count + articles_with_error_count} / '
+ f'{published_articles_versions_count + already_preserved_versions_count + article_versions_with_error_count}',
True)

log.write_log_in_file('info',
"Total count of already preserved (skipped) articles / article versions: \t\t"
+ f'{already_preserved_articles_count} / {already_preserved_versions_count}',
True)

log.write_log_in_file('info',
"Total count of articles with fetch error / articles: \t\t\t\t"
+ f'{articles_with_error_count} / {published_unpublished_count}',
True)

log.write_log_in_file('info',
"Total count of article versions with fetch error / article versions: \t\t"
+ f'{article_versions_with_error_count} / '
+ f'{published_articles_versions_count + already_preserved_versions_count + article_versions_with_error_count}',
True)

if article_obj.processor.duplicate_bag_in_preservation_storage_count > 0:
log.write_log_in_file('warning',
f'Bagger found {article_obj.processor.duplicate_bag_in_preservation_storage_count} duplicate article(s)',
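
A small worked example of the revised summary arithmetic, with illustrative numbers only: articles that hit a fetch error are now counted toward the published totals instead of silently dropping out.

```python
# Illustrative numbers only, showing how the summary totals are composed.
published_articles_count = 40
already_preserved_articles_count = 7
articles_with_error_count = 3

total_published_articles = (published_articles_count
                            + already_preserved_articles_count
                            + articles_with_error_count)
print(total_published_articles)  # 50 -> errored articles are included in the total
```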
71 changes: 56 additions & 15 deletions figshare/Article.py
@@ -51,7 +51,8 @@ def __init__(self, config, log, ids):
self.no_matched = 0
self.no_unmatched = 0
self.already_preserved_counts_dict = {'already_preserved_article_ids': set(), 'already_preserved_versions': 0,
'wasabi_preserved_versions': 0, 'ap_trust_preserved_versions': 0}
'wasabi_preserved_versions': 0, 'ap_trust_preserved_versions': 0,
'articles_with_error': set(), 'article_versions_with_error': 0}
self.skipped_article_versions = {}
self.processor = Integration(self.config_obj, self.logs)

@@ -260,6 +261,21 @@ def __get_article_metadata_by_version(self, version, article_id):
if (get_response.status_code == 200):
version_data = get_response.json()
payload_size = calculate_payload_size(self.system_config, version_data)

if payload_size == 0:
if self.system_config['continue-on-error'] == "False":
self.logs.write_log_in_file("error",
f"Curation folder for article {article_id} version {version['version']} not found.",
True)
self.logs.write_log_in_file("info", "Aborting execution.", True, True)
self.already_preserved_counts_dict['articles_with_error'].add(article_id)
self.already_preserved_counts_dict['article_versions_with_error'] += 1
self.logs.write_log_in_file("error",
f"Curation folder for article {article_id} version {version['version']} not found."
+ " Article version will be skipped.",
True)
return None

total_file_size = version_data['size']
files = []
error = ""
@@ -284,6 +300,7 @@ def __get_article_metadata_by_version(self, version, article_id):
wasabi_preserved_version_md5, wasabi_preserved_size = check_wasabi(article_id, version['version'])

# Compare hashes
# Checking both remote storages
if compare_hash(version_md5, wasabi_preserved_version_md5) and compare_hash(version_md5, preserved_version_md5):
already_preserved = in_ap_trust = True
self.already_preserved_counts_dict['already_preserved_versions'] += 1
@@ -310,7 +327,7 @@ def __get_article_metadata_by_version(self, version, article_id):
self.already_preserved_counts_dict['ap_trust_preserved_versions'] += 1
self.logs.write_log_in_file("info",
f"Article {article_id} version {version['version']} "
+ "already preserved in preservation staging remote storage.",
+ "already preserved in preservation final remote storage.",
True)

if already_preserved:
@@ -660,7 +677,10 @@ def __check_file_hash(self, files, version_data, folder_path):
# delete directory if validation failed.
if (delete_folder is True):
self.logs.write_log_in_file("error", f"Validation failed, deleting {preservation_storage_location + folder_path}.", True)
self.delete_folder(preservation_storage_location + folder_path)
if self.system_config['dry-run'] == 'False':
self.delete_folder(preservation_storage_location + folder_path)
else:
self.logs.write_log_in_file("info", "*Dry Run* Folder not deleted.", True)
process_article = True

return process_article
@@ -889,8 +909,7 @@ def __final_process(self, check_files, copy_files, check_dir, version_data, fold
except Exception as e:
self.logs.write_log_in_file("error", f"{str(e)} for {'_'.join(os.path.basename(folder_name).split('_')[0:-1])}" , True)
if self.system_config['continue-on-error'] == "False":
self.logs.write_log_in_file("info", "Aborting execution.", True)
exit()
self.logs.write_log_in_file("info", "Aborting execution.", True, True)
delete_now = True

# check if download process has error or not.
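
The change above replaces the bare exit() with an extra True argument to write_log_in_file, which appears to ask the logger itself to stop execution once the message is recorded. The sketch below shows that kind of logger flag; the real method's signature is an assumption here.

```python
import sys

def write_log_in_file(level, message, show=False, stop=False):
    """Sketch of a logger whose final flag aborts execution after logging."""
    print(f"[{level.upper()}] {message}")  # stand-in for real file/console logging
    if stop:
        sys.exit(1)  # abort only after the message has been written
```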
@@ -1008,8 +1027,14 @@ def process_articles(self, articles):

if (version_data["matched"] is True):
self.logs.write_log_in_file("info", f"------- Processing article {article} version {version_data['version']}.", True)

# call pre process script function for each matched item.
value_pre_process = self.pre_process_script_function()
if self.system_config['dry-run'] == 'False':
value_pre_process = self.pre_process_script_function()
else:
value_pre_process = 0
self.logs.write_log_in_file("info", "*Dry Run* Skipping pre processing.", True)

if (value_pre_process == 0):
self.logs.write_log_in_file("info", "Pre-processing script finished successfully.", True)
# check main folder exists in preservation storage.
Expand All @@ -1026,24 +1051,40 @@ def process_articles(self, articles):
else:
self.logs.write_log_in_file("info", "Exists and is empty", True)
check_files = False
# delete folder if validation fails
self.delete_folder(check_dir)
# call post process script function for each matched item. Code 5 corresponds to step 5 of S4.4 in the spec.
value_post_process = self.processor.post_process_script_function("Article", check_dir, value_pre_process, 5)
if (value_post_process != 0):
self.logs.write_log_in_file("error", f"{version_data['id']} version {version_data['version']} - "
+ "Post-processing script error found.", True)

if self.system_config['dry-run'] == 'False':
# delete folder if validation fails
self.delete_folder(check_dir)
# call post process script function for each matched item. Code 5 corresponds to step 5 of S4.4 in the spec.
value_post_process = self.processor.post_process_script_function("Article", check_dir, value_pre_process, 5)
if (value_post_process != 0):
self.logs.write_log_in_file("error", f"{version_data['id']} version {version_data['version']} - "
+ "Post-processing script error found.", True)
else:
self.logs.write_log_in_file("info", "*Dry Run* File download and post-processing with "
+ f"{self.system_config['post_process_script_command']} skipped.", True)

break
else:
self.logs.write_log_in_file("info", "Does not exist. Folder will be created", True)
value_post_process = 0
if self.system_config['dry-run'] == 'False':
self.logs.write_log_in_file("info", "Does not exist. Folder will be created", True)
else:
self.logs.write_log_in_file("info", "*Dru Run* Does not exist. Folder will not be created", True)

# end check main folder exists in preservation storage.
# check required files exist in curation UAL_RDM folder
self.logs.write_log_in_file("info", "Checking required files exist in associated curation "
+ f"folder {curation_storage_location}.", True)
copy_files = self.__can_copy_files(version_data)
if self.__final_process(check_files, copy_files, check_dir, version_data, folder_name, version_no, value_pre_process):

if self.system_config['dry-run'] == 'False':
if self.__final_process(check_files, copy_files, check_dir, version_data, folder_name, version_no, value_pre_process):
processed_count += 1
else:
processed_count += 1
self.logs.write_log_in_file("info", "*Dry Run* File download and post-processing with "
+ f"{self.system_config['post_process_script_command']} skipped.", True)
else:
self.logs.write_log_in_file("error", "Pre-processing script failed. Running post-processing script.", True)
# call post process script function for each matched item.
18 changes: 13 additions & 5 deletions figshare/Collection.py
@@ -302,13 +302,21 @@ def process_collections(self, collections):
version["license"] = json.loads('{"value": 2,"name": "CC0","url": "https://creativecommons.org/publicdomain/zero/1.0/"}')

self.logs.write_log_in_file("info", f"------- Processing collection {collection} version {version['version']}.", True)
self.__save_json_in_metadata(collection, version, folder_name)
collection_preservation_path = self.preservation_storage_location + os.path.basename(os.path.dirname(os.path.dirname(folder_name)))
value_post_process = self.processor.post_process_script_function("Collection", collection_preservation_path)
if (value_post_process != 0):
self.logs.write_log_in_file("error", f"collection {collection} - post-processing script failed.", True)

if self.system_config['dry-run'] == 'False':
self.__save_json_in_metadata(collection, version, folder_name)
collection_preservation_path = self.preservation_storage_location + \
os.path.basename(os.path.dirname(os.path.dirname(folder_name)))
value_post_process = self.processor.post_process_script_function("Collection", collection_preservation_path)
if (value_post_process != 0):
self.logs.write_log_in_file("error", f"collection {collection} - post-processing script failed.", True)
else:
processed_count += 1
else:
self.logs.write_log_in_file("info", "*Dry Run* File download and post-processing with "
+ f"{self.system_config['post_process_script_command']} skipped.", True)
processed_count += 1

return processed_count, self.already_preserved_counts_dict

"""