Skip to content

Commit

Permalink
ENH: Improve output validation/upload logging
Browse files · Browse the repository at this point in the history
  • Loading branch information
cortadocodes committed Dec 17, 2024
1 parent 98109e1 commit 2a5a0f7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
40 changes: 29 additions & 11 deletions octue/resources/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ def set_up_periodic_monitor_message(self, create_monitor_message, period=60):
def finalise(self, upload_output_datasets_to=None, use_signed_urls=None):
    """Validate the output values and output manifest and, if the analysis produced an output manifest, upload its
    output datasets to a unique subdirectory within the analysis's output location. This output location can be
    overridden by providing a different cloud path via the `upload_output_datasets_to` parameter.

    :param str|None upload_output_datasets_to: If not provided but an output location was provided at instantiation, upload any output datasets into a unique subdirectory within this output location; if provided, upload into this location instead. The output manifest is updated with the upload locations.
    :param bool|None use_signed_urls: if `True`, use signed URLs instead of cloud URIs for dataset paths in the output manifest; if `None`, use the value of `use_signed_urls_for_output_datasets` given at instantiation
    :return None:
    """
    serialised_strands = {"output_values": None, "output_manifest": None}

    if self.output_values:
        logger.info("The analysis produced output values.")
        serialised_strands["output_values"] = json.dumps(self.output_values, cls=OctueJSONEncoder)
    else:
        logger.info("The analysis didn't produce output values.")

    if self.output_manifest:
        logger.info("The analysis produced an output manifest.")

        # Warn early (but still validate) when there's nowhere to upload the output datasets to.
        if not (self.output_location or upload_output_datasets_to):
            logger.info("No output location was set in the app configuration - can't upload output datasets.")

        serialised_strands["output_manifest"] = self.output_manifest.serialise()

    else:
        logger.info("The analysis didn't produce an output manifest.")

    self.twine.validate(**serialised_strands)
    self._finalised = True
    logger.info("Validated outputs against the twine.")

    # Only attempt an upload if there's both an output manifest and somewhere to upload its datasets to.
    if self.output_manifest and (self.output_location or upload_output_datasets_to):
        self._upload_output_datasets(upload_output_datasets_to, use_signed_urls)

def _upload_output_datasets(self, upload_output_datasets_to, use_signed_urls):
    """Upload the output manifest's datasets.

    :param str|None upload_output_datasets_to: If not provided but an output location was provided at instantiation, upload any output datasets into a unique subdirectory within this output location; if provided, upload into this location instead. The output manifest is updated with the upload locations.
    :param bool|None use_signed_urls: if `True`, use signed URLs instead of cloud URIs for dataset paths in the output manifest; if `None`, use the value of `use_signed_urls_for_output_datasets` given at instantiation
    :return None:
    """
    # Use a unique subdirectory within the output location given at instantiation if no
    # `upload_output_datasets_to` is provided. NOTE(review): the caller (`finalise`) only invokes this method
    # when an output location or an explicit upload location exists, so `self.output_location` is assumed
    # non-empty here — confirm against `finalise`.
    if not upload_output_datasets_to:
        upload_output_datasets_to = storage.path.join(self.output_location, coolname.generate_slug())

    if use_signed_urls is None:
        use_signed_urls = self.use_signed_urls_for_output_datasets

    logger.info("Beginning upload of output datasets to %r...", upload_output_datasets_to)

    # Each dataset is uploaded into a subdirectory named after its key in the manifest.
    for name, dataset in self.output_manifest.datasets.items():
        dataset.upload(cloud_path=storage.path.join(upload_output_datasets_to, name))

    if use_signed_urls:
        # Replace cloud URIs in the output manifest with signed (expiring) URLs for easier access.
        self.output_manifest.use_signed_urls_for_datasets()

    logger.info("Finished uploading output datasets to %r.", upload_output_datasets_to)

def _calculate_strand_hashes(self, strands):
"""Calculate the hashes of the strands specified in the HASH_FUNCTIONS constant.
Expand Down
9 changes: 5 additions & 4 deletions tests/cloud/pub_sub/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,11 @@ def test_child_messages_can_be_recorded_by_parent(self):

# Check that the child's messages have been recorded by the parent.
self.assertEqual(parent.received_events[0]["event"]["kind"], "delivery_acknowledgement")
self.assertEqual(parent.received_events[1]["event"]["kind"], "log_record")
self.assertEqual(parent.received_events[2]["event"]["kind"], "log_record")
self.assertEqual(parent.received_events[3]["event"]["kind"], "log_record")
self.assertEqual(parent.received_events[4]["event"], {"kind": "result", "output_values": "Hello! It worked!"})

for i in range(1, 6):
self.assertEqual(parent.received_events[i]["event"]["kind"], "log_record")

self.assertEqual(parent.received_events[6]["event"], {"kind": "result", "output_values": "Hello! It worked!"})

def test_child_exception_message_can_be_recorded_by_parent(self):
"""Test that the parent can record exceptions raised by the child."""
Expand Down

0 comments on commit 2a5a0f7

Please sign in to comment.