fix call cache bug and add better test (#76)
morsecodist authored Apr 28, 2022
1 parent 352ee79 commit 3c71b64
Showing 3 changed files with 92 additions and 12 deletions.
15 changes: 12 additions & 3 deletions miniwdl-plugins/s3upload/miniwdl_s3upload.py
@@ -121,11 +121,11 @@ def get(
        uri = urlparse(get_s3_get_prefix(self._cfg))
        bucket, prefix = uri.hostname, uri.path

        key = os.path.join(prefix, "cache", f"{key}.json")[1:]
        s3_key = os.path.join(prefix, "cache", f"{key}.json")[1:]
        abs_fn = os.path.join(self._cfg["call_cache"]["dir"], f"{key}.json")
        Path(abs_fn).parent.mkdir(parents=True, exist_ok=True)
        try:
            s3_client.download_file(bucket, key, abs_fn)
            s3_client.download_file(bucket, s3_key, abs_fn)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] != "404":
                raise e
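The cache bug fixed above is a name-shadowing problem: the old code rebound `key` to the S3 object key before building the local cache path, so `abs_fn` ended up derived from the already-mangled value instead of the original cache key. A minimal sketch of the before/after behavior (the prefix, key, and cache directory below are made-up values for illustration, not taken from the plugin config):

    import os

    prefix = "/out/test-1"         # hypothetical S3 key prefix (uri.path)
    cache_dir = "/mnt/call_cache"  # hypothetical local call-cache directory
    key = "add_goodbye/abc123"     # hypothetical call-cache key

    # Before: rebinding `key` corrupts the local filename computed next.
    key = os.path.join(prefix, "cache", f"{key}.json")[1:]
    broken_abs_fn = os.path.join(cache_dir, f"{key}.json")
    # -> /mnt/call_cache/out/test-1/cache/add_goodbye/abc123.json.json

    # After: a separate `s3_key` leaves the original key intact.
    key = "add_goodbye/abc123"
    s3_key = os.path.join(prefix, "cache", f"{key}.json")[1:]
    fixed_abs_fn = os.path.join(cache_dir, f"{key}.json")
    # -> /mnt/call_cache/add_goodbye/abc123.json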
@@ -238,7 +238,15 @@ def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
        workflow.name,
    )

    yield recv
    # HACK: Because of the way call caching works, if a step is call-cached its outputs
    # will be S3 paths. This is fine for inputs to other steps because the downloader
    # will download them, but for the last step of the pipeline it tries to link
    # the S3 paths when they are outputs of the global pipeline, and this results
    # in file-not-found errors. Technically, for swipe we don't need linking,
    # and our whole system works if we just stop here. Once we solve the linking
    # problem a bit better we may want to revisit this and return it to:
    # yield recv
    exit(0)


def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):
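For context on the hack above: miniwdl workflow plugins are generator coroutines; miniwdl sends workflow state into the generator at fixed points and the plugin hands it back with `yield recv`. Handing `recv` back after the outputs stage is what lets miniwdl go on to link output files locally, and exiting instead skips that linking. A rough sketch of the coroutine's shape under that assumption (stages abbreviated, not the full plugin):

    def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
        # inputs stage: pass the workflow inputs straight back
        recv = yield recv

        # outputs stage: upload outputs to S3, write outputs.s3.json, etc.
        # ... upload work happens here ...

        # Yielding recv here would let miniwdl link outputs locally;
        # the change above exits instead, since swipe never uses those links.
        exit(0)  # previously: yield recv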
@@ -267,6 +275,7 @@ def rewriter(fd):
    with open(fn, "w") as outfile:
        json.dump(outputs_s3_json, outfile, indent=2)
        outfile.write("\n")

    s3cp(logger, fn, os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json")))


87 changes: 79 additions & 8 deletions test/test_wdl.py
@@ -20,8 +20,15 @@
            docker_image_id = docker_image_id
    }
    call add_goodbye {
        input:
            hello_world = add_world.out,
            docker_image_id = docker_image_id
    }
    output {
        File out = add_world.out
        File out_goodbye = add_goodbye.out_goodbye
    }
}
@@ -44,6 +51,26 @@
        docker: docker_image_id
    }
}

task add_goodbye {
    input {
        File hello_world
        String docker_image_id
    }

    command <<<
        cat ~{hello_world} > out_goodbye.txt
        echo goodbye >> out_goodbye.txt
    >>>

    output {
        File out_goodbye = "out_goodbye.txt"
    }

    runtime {
        docker: docker_image_id
    }
}
"""

test_fail_wdl = """
@@ -141,6 +168,9 @@
class TestSFNWDL(unittest.TestCase):
    def setUp(self) -> None:
        self.s3 = boto3.resource("s3", endpoint_url="http://localhost:9000")
        self.s3_client = boto3.client("s3", endpoint_url="http://localhost:9000")
        self.batch = boto3.client("batch", endpoint_url="http://localhost:9000")
        self.logs = boto3.client("logs", endpoint_url="http://localhost:9000")
        self.sfn = boto3.client("stepfunctions", endpoint_url="http://localhost:8083")
        self.test_bucket = self.s3.create_bucket(Bucket="swipe-test")
        self.lamb = boto3.client("lambda", endpoint_url="http://localhost:9000")
@@ -196,8 +226,39 @@ def _wait_sfn(
            time.sleep(10)
            description = self.sfn.describe_execution(executionArn=arn)
        print("printing execution history", file=sys.stderr)
        for event in self.sfn.get_execution_history(executionArn=arn)["events"]:
            print(event, file=sys.stderr)

        seen_events = set()
        for event in sorted(self.sfn.get_execution_history(executionArn=arn)["events"], key=lambda x: x["id"]):
            if event["id"] not in seen_events:
                details = {}
                for key in event.keys():
                    if key.endswith("EventDetails") and event[key]:
                        details = event[key]
                print(
                    event["timestamp"],
                    event["type"],
                    details.get("resourceType", ""),
                    details.get("resource", ""),
                    details.get("name", ""),
                    json.loads(details.get("parameters", "{}")).get("FunctionName", ""),
                    file=sys.stderr,
                )
                if "taskSubmittedEventDetails" in event:
                    if event.get("taskSubmittedEventDetails", {}).get("resourceType") == "batch":
                        job_id = json.loads(event["taskSubmittedEventDetails"]["output"])["JobId"]
                        print(f"Batch job ID {job_id}", file=sys.stderr)
                        job_desc = self.batch.describe_jobs(jobs=[job_id])["jobs"][0]
                        try:
                            log_group_name = job_desc["container"]["logConfiguration"]["options"]["awslogs-group"]
                        except KeyError:
                            log_group_name = "/aws/batch/job"
                        response = self.logs.get_log_events(
                            logGroupName=log_group_name,
                            logStreamName=job_desc["container"]["logStreamName"]
                        )
                        for log_event in response["events"]:
                            print(log_event["message"], file=sys.stderr)
                seen_events.add(event["id"])

        resp = self.sqs.receive_message(
            QueueUrl=self.state_change_queue_url,
@@ -228,10 +289,10 @@ def test_simple_sfn_wdl_workflow(self):
        arn, description, messages = self._wait_sfn(sfn_input, self.single_sfn_arn)

        output = json.loads(description["output"])
        output_path = (
            f"s3://{self.input_obj.bucket_name}/{output_prefix}/test-1/out.txt"
        )
        self.assertEqual(output["Result"], {"swipe_test.out": output_path})
        self.assertEqual(output["Result"], {
            "swipe_test.out": f"s3://{self.input_obj.bucket_name}/{output_prefix}/test-1/out.txt",
            "swipe_test.out_goodbye": f"s3://{self.input_obj.bucket_name}/{output_prefix}/test-1/out_goodbye.txt",
        })

        outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out.txt")
        output_text = outputs_obj.get()["Body"].read().decode()
@@ -317,11 +378,21 @@ def test_call_cache(self):
        self.test_bucket.Object(f"{output_prefix}/test-1/out.txt").put(
            Body="cache_break\n".encode()
        )
        self.test_bucket.Object(f"{output_prefix}/test-1/out_goodbye.txt").delete()

        # clear the cache entry to simulate getting cut off at the step before this one
        objects = self.s3_client.list_objects_v2(
            Bucket=self.test_bucket.name,
            Prefix=f"{output_prefix}/test-1/cache/add_goodbye/",
        )["Contents"]
        self.test_bucket.Object(objects[0]["Key"]).delete()
        self.test_bucket.Object(f"{output_prefix}/test-1/run_output.json").delete()

        self._wait_sfn(sfn_input, self.single_sfn_arn)

        outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out.txt")
        outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out_goodbye.txt")
        output_text = outputs_obj.get()["Body"].read().decode()
        assert output_text == "cache_break\n", output_text
        assert output_text == "cache_break\ngoodbye\n", output_text


if __name__ == "__main__":
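What the rewritten cache test exercises, spelled out: the first run populates the call cache for both steps; the test then tampers with add_world's cached output (out.txt), deletes out_goodbye.txt, and removes only add_goodbye's cache entry plus run_output.json, simulating a run that was cut off before the final step. On the re-run, add_world should be a cache hit (so out.txt keeps the tampered contents), while add_goodbye actually re-executes against it. A tiny sketch of the expected data flow (values are just the test's own fixtures):

    cached_out_txt = "cache_break\n"                    # add_world is cache-hit, not recomputed
    regenerated_goodbye = cached_out_txt + "goodbye\n"  # add_goodbye re-runs: cat out.txt + "goodbye"
    assert regenerated_goodbye == "cache_break\ngoodbye\n"  # matches the test's final assertion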
2 changes: 1 addition & 1 deletion version
@@ -1 +1 @@
v0.19.4-beta
v0.19.5-beta
