From 2a5a0f7c1e5d25d0bdc1eb1d2dca6eb49ec6b1d5 Mon Sep 17 00:00:00 2001 From: cortadocodes Date: Tue, 17 Dec 2024 12:46:59 +0000 Subject: [PATCH] ENH: Improve output validation/upload logging --- octue/resources/analysis.py | 40 +++++++++++++++++++++-------- tests/cloud/pub_sub/test_service.py | 9 ++++--- 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/octue/resources/analysis.py b/octue/resources/analysis.py index c401db7cd..37f8ccb85 100644 --- a/octue/resources/analysis.py +++ b/octue/resources/analysis.py @@ -140,8 +140,7 @@ def set_up_periodic_monitor_message(self, create_monitor_message, period=60): def finalise(self, upload_output_datasets_to=None, use_signed_urls=None): """Validate the output values and output manifest and, if the analysis produced an output manifest, upload its output datasets to a unique subdirectory within the analysis's output location. This output location can be - overridden by providing a different cloud path via the `upload_output_datasets_to` parameter. Either way, the - dataset paths in the output manifest are replaced with signed URLs for easier, expiring access. + overridden by providing a different cloud path via the `upload_output_datasets_to` parameter. :param str|None upload_output_datasets_to: If not provided but an output location was provided at instantiation, upload any output datasets into a unique subdirectory within this output location; if provided, upload into this location instead. The output manifest is updated with the upload locations. :param bool|None use_signed_urls: if `True`, use signed URLs instead of cloud URIs for dataset paths in the output manifest; if `None`, use the value of `use_signed_urls_for_output_datasets` given at instantiation @@ -150,34 +149,53 @@ def finalise(self, upload_output_datasets_to=None, use_signed_urls=None): serialised_strands = {"output_values": None, "output_manifest": None} if self.output_values: + logger.info("The analysis produced output values.") serialised_strands["output_values"] = json.dumps(self.output_values, cls=OctueJSONEncoder) + else: + logger.info("The analysis didn't produce output values.") if self.output_manifest: + logger.info("The analysis produced an output manifest.") + + if not (self.output_location or upload_output_datasets_to): + logger.info("No output location was set in the app configuration - can't upload output datasets.") + serialised_strands["output_manifest"] = self.output_manifest.serialise() + else: + logger.info("The analysis didn't produce an output manifest.") + self.twine.validate(**serialised_strands) self._finalised = True - logger.info("Validated output values and output manifest against the twine.") + logger.info("Validated outputs against the twine.") + if self.output_manifest and (self.output_location or upload_output_datasets_to): + self._upload_output_datasets(upload_output_datasets_to, use_signed_urls) + + def _upload_output_datasets(self, upload_output_datasets_to, use_signed_urls): + """Upload the output manifest's datasets. + + :param str|None upload_output_datasets_to: If not provided but an output location was provided at instantiation, upload any output datasets into a unique subdirectory within this output location; if provided, upload into this location instead. The output manifest is updated with the upload locations. + :param bool|None use_signed_urls: if `True`, use signed URLs instead of cloud URIs for dataset paths in the output manifest; if `None`, use the value of `use_signed_urls_for_output_datasets` given at instantiation + :return None: + """ # Use a unique subdirectory in the output location given at instantiation (if given) if no # `upload_output_datasets_to` is provided. - if self.output_location and not upload_output_datasets_to: + if not upload_output_datasets_to: upload_output_datasets_to = storage.path.join(self.output_location, coolname.generate_slug()) - if use_signed_urls is None: - use_signed_urls = self.use_signed_urls_for_output_datasets - - # If there isn't both a non-None output manifest and upload location, nothing is uploaded. - if not (upload_output_datasets_to and getattr(self, "output_manifest")): - return + logger.info("Beginning upload of output datasets to %r...", upload_output_datasets_to) for name, dataset in self.output_manifest.datasets.items(): dataset.upload(cloud_path=storage.path.join(upload_output_datasets_to, name)) + if use_signed_urls is None: + use_signed_urls = self.use_signed_urls_for_output_datasets + if use_signed_urls: self.output_manifest.use_signed_urls_for_datasets() - logger.info("Uploaded output datasets to %r.", upload_output_datasets_to) + logger.info("Finished uploading output datasets to %r.", upload_output_datasets_to) def _calculate_strand_hashes(self, strands): """Calculate the hashes of the strands specified in the HASH_FUNCTIONS constant. diff --git a/tests/cloud/pub_sub/test_service.py b/tests/cloud/pub_sub/test_service.py index 1922c6762..6e5e6a1b1 100644 --- a/tests/cloud/pub_sub/test_service.py +++ b/tests/cloud/pub_sub/test_service.py @@ -708,10 +708,11 @@ def test_child_messages_can_be_recorded_by_parent(self): # Check that the child's messages have been recorded by the parent. self.assertEqual(parent.received_events[0]["event"]["kind"], "delivery_acknowledgement") - self.assertEqual(parent.received_events[1]["event"]["kind"], "log_record") - self.assertEqual(parent.received_events[2]["event"]["kind"], "log_record") - self.assertEqual(parent.received_events[3]["event"]["kind"], "log_record") - self.assertEqual(parent.received_events[4]["event"], {"kind": "result", "output_values": "Hello! It worked!"}) + + for i in range(1, 6): + self.assertEqual(parent.received_events[i]["event"]["kind"], "log_record") + + self.assertEqual(parent.received_events[6]["event"], {"kind": "result", "output_values": "Hello! It worked!"}) def test_child_exception_message_can_be_recorded_by_parent(self): """Test that the parent can record exceptions raised by the child."""