improve behavior and logic for complete coverage
Max Burnette committed Jan 28, 2020
1 parent 428785f commit 885a3ae
Showing 1 changed file with 88 additions and 24 deletions.
112 changes: 88 additions & 24 deletions scripts/upload_directories_to_clowder.py
@@ -7,6 +7,8 @@
Expects the sensor-metadata git directory to be available locally for fixed metadata:
    git clone https://github.com/terraref/sensor-metadata.git
    export SENSOR_METADATA_CACHE=/path/to/sensor-metadata
+    e.g. on terra-clowder...
+    export SENSOR_METADATA_CACHE=/home/clowder/sites/ua-mac/sensor-metadata
Expected data directory structure:
    /root_path
@@ -35,6 +37,8 @@

# Set to True to test reading & cleaning a dataset from each product without actually uploading
dry_run = True
+# Set to True to actually upload a dataset from each product then stop
+test_one = True
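# Note: either flag cuts the run short -- dry_run exercises metadata cleaning
# without uploading anything, while test_one performs one real upload per
# product and then stops (see the "if dry_run or test_one: break" checks below).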

# ---------------------------------
# Clowder instance configuration
@@ -56,16 +60,30 @@
root_path = "/home/clowder/sites/ua-mac"

# Defines which products to upload (i.e. remove raw_data key to only upload Level_1 data)
-# TODO: Currently only supports products with a timestamp level (i.e. EnvironmentLogger will not work)
products_to_upload = {
-    "raw_data": ["stereoTop", "flirIrCamera"],
-    "Level_1": ["rgb_geotiff", "ir_geotiff"]
+    "raw_data": ["stereoTop", "flirIrCamera", "VNIR", "SWIR", "EnvironmentLogger"],
+    "Level_1": ["scanner3DTop"]
}

+# Products which only have a date level for each dataset
+no_timestamps = ["EnvironmentLogger"]
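# Assumed directory shapes (extrapolated from the docstring and the loops below):
#   /root_path/raw_data/stereoTop/2019-04-29/<timestamp>/...   (one dataset per timestamp)
#   /root_path/raw_data/EnvironmentLogger/2019-04-29/...       (date level only)
# For no_timestamps products the date directory itself is uploaded as one dataset.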

# Defines start and end date to upload (inclusive)
-start_date = "2019-06-01"
-end_date = "2019-06-05"
+start_date = "2019-04-29"
+end_date = "2019-96-05"


+def get_last_dataset_path(logfile):
+    lastline = None
+    with open(logfile, 'r') as log:
+        currline = log.readline()
+        while len(currline) > 0:
+            lastline = currline
+            currline = log.readline()
+
+    contents = lastline.split(",")
+    ds_path = contents[2].replace('"', '')
+    return ds_path
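# Assumes log lines shaped like the write calls below, e.g.
#   Level_1,scanner3DTop,"/root_path/Level_1/scanner3DTop/2019-04-29",OK: <dataset id>
# Field 2 of a plain comma split is the quoted directory, so this parse relies
# on dataset paths containing no commas (and on at least one data line
# existing after the CSV header).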

# Upload a dataset to Clowder including metadata, etc.
def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
@@ -76,12 +94,27 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):

    # Find and prepare the metadata
    clean_md = None
-    for f in contents:
-        if f.endswith("_metadata.json"):
-            md = load_json_file(os.path.join(dataset_path, f))
-            clean_md = clean_metadata(md, product)
-        elif f.endswith("_metadata_cleaned.json"):
-            clean_md = load_json_file(os.path.join(dataset_path, f))
+    if product == "scanner3DTop" and level == "Level_1":
+        # Special check between Level_1/raw_data for scanner3DTop only
+        path3d = dataset_path.replace("Level_1", "raw_data")
+        contents3d = os.listdir(path3d)
+        for f in contents3d:
+            if f.endswith("_metadata.json"):
+                md = load_json_file(os.path.join(path3d, f))
+                clean_md = clean_metadata(md, product)
+                if dry_run:
+                    print("...%s successfully cleaned." % os.path.join(path3d, f))
+    else:
+        for f in contents:
+            if f.endswith("_metadata.json"):
+                md = load_json_file(os.path.join(dataset_path, f))
+                clean_md = clean_metadata(md, product)
+                if dry_run:
+                    print("...%s successfully cleaned." % os.path.join(dataset_path, f))
+            elif f.endswith("_metadata_cleaned.json"):
+                clean_md = load_json_file(os.path.join(dataset_path, f))
+                if dry_run:
+                    print("...%s successfully loaded." % os.path.join(dataset_path, f))

    if clean_md is None:
        logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "ERR: No metadata found"))
@@ -96,7 +129,9 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
    if not dry_run:
        dsid = build_dataset_hierarchy_crawl(clowder_host, clowder_admin_key, clowder_user, clowder_pass, clowder_space,
                                             season_name, experiment_name, product, YYYY, MM, DD, dataset_name)
-        logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "OK: %s" % dsid))
+    else:
+        dsid = "JustPretend"
+    logfile.write('%s,%s,"%s",%s\n' % (level, product, dataset_path, "OK: %s" % dsid))

    # Upload metadata
    if not dry_run:
@@ -111,8 +146,6 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
"user_id": "https://terraref.ncsa.illinois.edu/clowder/api/users/%s" % clowder_userid
}
}))
else:
print("%s metadata successfully cleaned." % dataset_path)

    # Add each file
    for f in contents:
@@ -126,12 +159,21 @@ def upload_dataset(dataset_path, level, product, timestamp, sess, logfile):
    return True

# Walk a product directory and upload all datasets
-def scan_product_directory(product_path, level, product):
+def scan_product_directory(product_path, level, product, timestamped=True):
    if not dry_run:
-        logfile = open("uploaded_data_%s.csv" % product, 'w')
+        log_path = "log_%s.csv" % product
    else:
-        logfile = open("uploaded_data_%s_DRYRUN.csv" % product, 'w')
-    logfile.write("level,product,directory,status\n")
+        log_path = "log_%s_DRYRUN.csv" % product
+
+    if os.path.isfile(log_path):
+        last_dataset = get_last_dataset_path(log_path)
+        found_last_dataset = False
+        print("...resuming from: %s" % last_dataset)
+        logfile = open(log_path, 'a')
+    else:
+        found_last_dataset = True
+        logfile = open(log_path, 'w')
+        logfile.write("level,product,directory,status\n")

    # One session for all subsequent uploads
    sess = requests.Session()
@@ -149,11 +191,31 @@ def scan_product_directory(product_path, level, product):
        upload_count = 0
        failed_count = 0

-        timestamps = os.listdir(date_path)
-        timestamps.sort()
-        for ts in timestamps:
-            ts_path = os.path.join(date_path, ts)
-            result = upload_dataset(ts_path, level, product, ts, sess, logfile)
+        if timestamped:
+            timestamps = os.listdir(date_path)
+            timestamps.sort()
+            for ts in timestamps:
+                ts_path = os.path.join(date_path, ts)
+                if not found_last_dataset:
+                    if ts_path == last_dataset:
+                        print("...found resume point")
+                        found_last_dataset = True
+                    continue
+                result = upload_dataset(ts_path, level, product, ts, sess, logfile)
+                if result:
+                    upload_count += 1
+                    if upload_count % 500 == 0:
+                        print("......%s datasets uploaded" % upload_count)
+                else:
+                    failed_count += 1
+                if dry_run or test_one: break
+        else:
+            if not found_last_dataset:
+                if date_path == last_dataset:
+                    print("...found resume point")
+                    found_last_dataset = True
+                continue
+            result = upload_dataset(date_path, level, product, date, sess, logfile)
            if result:
                upload_count += 1
                if upload_count % 500 == 0:
@@ -162,6 +224,7 @@ def scan_product_directory(product_path, level, product):
                failed_count += 1

        print("...done (%s datasets uploaded, %s datasets failed)" % (upload_count, failed_count))
+        if dry_run or test_one: break
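        # With dry_run or test_one set, the loops above stop after one dataset
        # and this break stops after the first date, so each product directory
        # is exercised exactly once per run.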

    logfile.close()

@@ -185,6 +248,7 @@ def scan_product_directory(product_path, level, product):
            continue

        print("Processing %s" % product_path)
-        scan_product_directory(product_path, level, product)
+        timestamped = not (product in no_timestamps)
+        scan_product_directory(product_path, level, product, timestamped)

print("Processing complete.")
