From 885a3aef2a6be6ae7e1544a7a65436cc5ab21362 Mon Sep 17 00:00:00 2001
From: Max Burnette
Date: Tue, 28 Jan 2020 10:10:35 -0600
Subject: [PATCH] improve behavior and logic for complete coverage

---
 scripts/upload_directories_to_clowder.py | 112 ++++++++++++++++++-----
 1 file changed, 88 insertions(+), 24 deletions(-)

diff --git a/scripts/upload_directories_to_clowder.py b/scripts/upload_directories_to_clowder.py
index d1decfe..7e5f199 100644
--- a/scripts/upload_directories_to_clowder.py
+++ b/scripts/upload_directories_to_clowder.py
@@ -7,6 +7,8 @@
 Expects sensor-metadata git directory to be available locally for fixed metadata:
     git clone https://github.com/terraref/sensor-metadata.git
     export SENSOR_METADATA_CACHE=/path/to/sensor-metadata
+e.g. on terra-clowder:
+    export SENSOR_METADATA_CACHE=/home/clowder/sites/ua-mac/sensor-metadata
 
 Expected data directory structure:
     /root_path
@@ -35,6 +37,8 @@
 
 # Set to True to test reading & cleaning a dataset from each product without actually uploading
 dry_run = True
+# Set to True to actually upload a dataset from each product, then stop
+test_one = True
 
 # ---------------------------------
 # Clowder instance configuration
@@ -56,16 +60,30 @@
 root_path = "/home/clowder/sites/ua-mac"
 
 # Defines which products to upload (i.e. remove raw_data key to only upload Level_1 data)
-# TODO: Currently only supports products with a timestamp level (i.e. EnvironmentLogger will not work)
 products_to_upload = {
-    "raw_data": ["stereoTop", "flirIrCamera"],
-    "Level_1": ["rgb_geotiff", "ir_geotiff"]
+    "raw_data": ["stereoTop", "flirIrCamera", "VNIR", "SWIR", "EnvironmentLogger"],
+    "Level_1": ["scanner3DTop"]
 }
 
+# Products which only have a date level for each dataset
+no_timestamps = ["EnvironmentLogger"]
+
 # Defines start and end date to upload (inclusive)
-start_date = "2019-06-01"
-end_date = "2019-06-05"
+start_date = "2019-04-29"
+end_date = "2019-06-05"
+
+
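+# Return the dataset directory recorded on the last line of an upload log.
+# Data rows are written as: level,product,"directory",status
+# e.g. (illustrative values) raw_data,stereoTop,"/root_path/raw_data/stereoTop/2019-04-29/<timestamp>",OK: <dataset id>
+# The quoted directory is used by scan_product_directory() as the resume point after an interrupted run.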
+def get_last_dataset_path(logfile):
+    lastline = None
+    with open(logfile, 'r') as log:
+        currline = log.readline()
+        while len(currline) > 0:
+            lastline = currline
+            currline = log.readline()
+    contents = lastline.split(",")
+    ds_path = contents[2].replace('"', '')
+    return ds_path
 
 
 # Upload a dataset to Clowder including metadata, etc.
 def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
@@ -76,12 +94,27 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
 
     # Find and prepare the metadata
     clean_md = None
-    for f in contents:
-        if f.endswith("_metadata.json"):
-            md = load_json_file(os.path.join(dataset_path, f))
-            clean_md = clean_metadata(md, product)
-        elif f.endswith("_metadata_cleaned.json"):
-            clean_md = load_json_file(os.path.join(dataset_path, f))
+    if product == "scanner3DTop" and level == "Level_1":
+        # Special check between Level_1/raw_data for scanner3DTop only
+        path3d = dataset_path.replace("Level_1", "raw_data")
+        contents3d = os.listdir(path3d)
+        for f in contents3d:
+            if f.endswith("_metadata.json"):
+                md = load_json_file(os.path.join(path3d, f))
+                clean_md = clean_metadata(md, product)
+                if dry_run:
+                    print("...%s successfully cleaned." % os.path.join(path3d, f))
+    else:
+        for f in contents:
+            if f.endswith("_metadata.json"):
+                md = load_json_file(os.path.join(dataset_path, f))
+                clean_md = clean_metadata(md, product)
+                if dry_run:
+                    print("...%s successfully cleaned." % os.path.join(dataset_path, f))
+            elif f.endswith("_metadata_cleaned.json"):
+                clean_md = load_json_file(os.path.join(dataset_path, f))
+                if dry_run:
+                    print("...%s successfully loaded." % os.path.join(dataset_path, f))
 
     if clean_md is None:
         logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "ERR: No metadata found"))
@@ -96,7 +129,9 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
     if not dry_run:
         dsid = build_dataset_hierarchy_crawl(clowder_host, clowder_admin_key, clowder_user, clowder_pass, clowder_space,
                                              season_name, experiment_name, product, YYYY, MM, DD, dataset_name)
-        logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "OK: %s" % dsid))
+    else:
+        dsid = "JustPretend"
+    logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "OK: %s" % dsid))
 
     # Upload metadata
     if not dry_run:
@@ -111,8 +146,6 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
                 "user_id": "https://terraref.ncsa.illinois.edu/clowder/api/users/%s" % clowder_userid
             }
         }))
-    else:
-        print("%s metadata successfully cleaned." % dataset_path)
 
     # Add each file
     for f in contents:
@@ -126,12 +159,21 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
     return True
 
 # Walk a product directory and upload all datasets
-def scan_product_directory(product_path, level, product):
+def scan_product_directory(product_path, level, product, timestamped=True):
     if not dry_run:
-        logfile = open("uploaded_data_%s.csv" % product, 'w')
+        log_path = "log_%s.csv" % product
+    else:
+        log_path = "log_%s_DRYRUN.csv" % product
+
+    if os.path.isfile(log_path):
+        last_dataset = get_last_dataset_path(log_path)
+        found_last_dataset = False
+        print("...resuming from: %s" % last_dataset)
+        logfile = open(log_path, 'a')
     else:
-        logfile = open("uploaded_data_%s_DRYRUN.csv" % product, 'w')
-    logfile.write("level,product,directory,status\n")
+        found_last_dataset = True
+        logfile = open(log_path, 'w')
+        logfile.write("level,product,directory,status\n")
 
     # One session for all subsequent uploads
     sess = requests.Session()
@@ -149,11 +191,31 @@ def scan_product_directory(product_path, level, product):
         upload_count = 0
         failed_count = 0
 
-        timestamps = os.listdir(date_path)
-        timestamps.sort()
-        for ts in timestamps:
-            ts_path = os.path.join(date_path, ts)
-            result = upload_dataset(ts_path, level, product, ts, sess, logfile)
+        if timestamped:
+            timestamps = os.listdir(date_path)
+            timestamps.sort()
+            for ts in timestamps:
+                ts_path = os.path.join(date_path, ts)
+                if not found_last_dataset:
+                    if ts_path == last_dataset:
+                        print("...found resume point")
+                        found_last_dataset = True
+                    continue
+                result = upload_dataset(ts_path, level, product, ts, sess, logfile)
+                if result:
+                    upload_count += 1
+                    if upload_count % 500 == 0:
+                        print("......%s datasets uploaded" % upload_count)
+                else:
+                    failed_count += 1
+                if dry_run or test_one: break
+        else:
+            if not found_last_dataset:
+                if date_path == last_dataset:
+                    print("...found resume point")
+                    found_last_dataset = True
+                continue
+            result = upload_dataset(date_path, level, product, date, sess, logfile)
             if result:
                 upload_count += 1
                 if upload_count % 500 == 0:
                     print("......%s datasets uploaded" % upload_count)
             else:
                 failed_count += 1
 
         print("...done (%s datasets uploaded, %s datasets failed)" % (upload_count, failed_count))
+        if dry_run or test_one: break
 
     logfile.close()
 
@@ -185,6 +248,7 @@ def scan_product_directory(product_path, level, product):
             continue
 
         print("Processing %s" % product_path)
-        scan_product_directory(product_path, level, product)
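+        # Products listed in no_timestamps have only a date level, so they are uploaded one dataset per date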
+        timestamped = not (product in no_timestamps)
+        scan_product_directory(product_path, level, product, timestamped)
 
 print("Processing complete.")