diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1b82d87..53b07e1 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -13,19 +13,19 @@ jobs:
         uses: actions/setup-python@v1
         with:
           python-version: 3.7
-#      - name: Install dependencies
-#        run: |
-#          python -m pip install --upgrade pip
-#          pip install -r requirements.txt
-#      - name: Test with pytest
-#        run: |
-#          coverage run -m pytest tests
-#          coverage xml -o coverage/python.xml
-#      - name: Report python coverage
-#        uses: orgoro/coverage@v3
-#        with:
-#          coverageFile: coverage/python.xml
-#          token: ${{ secrets.GITHUB_TOKEN }}
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+      - name: Test with pytest
+        run: |
+          coverage run -m pytest tests
+          coverage xml -o coverage/python.xml
+      - name: Report python coverage
+        uses: orgoro/coverage@v3
+        with:
+          coverageFile: coverage/python.xml
+          token: ${{ secrets.GITHUB_TOKEN }}
 # The next few steps only apply if you have javascript files
 #      - name: Setup node
 #        uses: actions/setup-node@v3
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c51b32e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+coverage
+google-cloud-storage
+pre-commit
+pytest
diff --git a/ror_dag.py b/ror_dag.py
index 938364c..89b2ef3 100644
--- a/ror_dag.py
+++ b/ror_dag.py
@@ -54,7 +54,7 @@
     )

     # Retrieve and expand the data
-    json_loc = tmp_dir + "/ror.jsonl"
+    raw_jsonl_loc = tmp_dir + "/ror.jsonl"
     working_dir = "ror_working_dir"
     setup_commands = f"rm -rf {working_dir};" + " && ".join(
         [
@@ -68,7 +68,7 @@
     )
     download_data = GKEStartPodOperator(
         task_id="download_data",
-        name="1790_er_download_data",
+        name="ror-download",
         project_id=PROJECT_ID,
         location=GCP_ZONE,
         cluster_name="cc2-task-pool",
@@ -78,7 +78,49 @@
             "-c",
             (
                 setup_commands
-                + f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{json_loc}'"
+                + f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{raw_jsonl_loc}'"
+            ),
+        ],
+        namespace="default",
+        image=f"gcr.io/{PROJECT_ID}/cc2-task-pool",
+        get_logs=True,
+        startup_timeout_seconds=300,
+        on_finish_action="delete_pod",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": [
+                                        "default-pool",
+                                    ],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+    )
+
+    jsonl_with_up = tmp_dir + "/ror_json_with_up.jsonl"
+    add_ultimate_parent = GKEStartPodOperator(
+        task_id="add_ultimate_parent",
+        name="ror-ultimate-parent",
+        project_id=PROJECT_ID,
+        location=GCP_ZONE,
+        cluster_name="cc2-task-pool",
+        do_xcom_push=True,
+        cmds=["/bin/bash"],
+        arguments=[
+            "-c",
+            (
+                setup_commands
+                + f" && python3 get_ultimate_parent.py --bucket '{DATA_BUCKET}' --input_loc '{raw_jsonl_loc}' --output_loc '{jsonl_with_up}'"
             ),
         ],
         namespace="default",
@@ -111,7 +153,7 @@
     load_staging = GCSToBigQueryOperator(
         task_id="load_staging",
         bucket=DATA_BUCKET,
-        source_objects=[json_loc],
+        source_objects=[jsonl_with_up],
         schema_object=f"{schema_dir}/ror.json",
         destination_project_dataset_table=f"{staging_dataset}.ror",
         source_format="NEWLINE_DELIMITED_JSON",
@@ -174,6 +216,7 @@
     (
         clear_tmp_dir
         >> download_data
+        >> add_ultimate_parent
         >> load_staging
         >> checks
         >> load_production
diff --git a/ror_scripts/get_ultimate_parent.py b/ror_scripts/get_ultimate_parent.py
new file mode 100644
index 0000000..37b43f0
--- /dev/null
+++ b/ror_scripts/get_ultimate_parent.py
@@ -0,0 +1,90 @@
+import argparse
+import json
+import os
+import tempfile
+
+from google.cloud import storage
+
+
+def traverse_parents(ror_id: str, id_to_parent: dict) -> str:
+    """
+    Find the ultimate ancestor of a ror id
+    :param ror_id: ror id to find the ultimate ancestor for
+    :param id_to_parent: dict mapping ror id to parent ror id
+    :return: Ultimate ancestor of a ror id
+    """
+    parent = id_to_parent[ror_id]
+    if ror_id == parent:
+        return ror_id
+    return traverse_parents(parent, id_to_parent)
+
+
+def roll_up(id_to_parent: dict) -> dict:
+    """
+    Roll up ror id to its ultimate parent.
+    :param id_to_parent: ror id to immediate parent mapping. If a ROR id has no parent, it is mapped to itself
+    :return: dict containing ror id to ultimate parent mapping. Ultimate parents are mapped to themselves
+    """
+    id_to_ultimate_parent = {}
+    for ror_id in id_to_parent:
+        ultimate_parent = traverse_parents(ror_id, id_to_parent)
+        id_to_ultimate_parent[ror_id] = ultimate_parent
+    return id_to_ultimate_parent
+
+
+def run(bucket_name: str, input_loc: str, output_loc: str) -> None:
+    """
+    Reads ROR jsonl data from GCS, adds an `ultimate_parent` field to each record,
+    and writes the result back to GCS
+    :param bucket_name: GCS bucket where jsonl data should be read and written
+    :param input_loc: Location of input data within `bucket`
+    :param output_loc: Location where output data should be written to `bucket`
+    :return: None
+    """
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    with tempfile.TemporaryDirectory() as td:
+        blob = bucket.blob(input_loc)
+        input_fi = os.path.join(td, "input.jsonl")
+        blob.download_to_filename(input_fi)
+        id_to_parent = {}
+        rows = []
+        with open(input_fi) as f:
+            for line in f:
+                js = json.loads(line)
+                ror_id = js["id"]
+                parent_id = ror_id  # default to self if no parent relationship is found
+                for relationship in js["relationships"]:
+                    if relationship["type"].lower() == "parent":
+                        parent_id = relationship["id"]
+                assert ror_id not in id_to_parent, f"Duplicate ID: {ror_id}"
+                id_to_parent[ror_id] = parent_id
+                rows.append(js)
+        id_to_ultimate_parent = roll_up(id_to_parent)
+        output_fi = os.path.join(td, "output.jsonl")
+        with open(output_fi, mode="w") as out:
+            for row in rows:
+                row["ultimate_parent"] = id_to_ultimate_parent[row["id"]]
+                out.write(json.dumps(row) + "\n")
+        blob = bucket.blob(output_loc)
+        blob.upload_from_filename(output_fi)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--bucket",
+        help="GCS bucket where data should be read and written",
+        required=True,
+    )
+    parser.add_argument(
+        "--input_loc", help="Location of input data within `bucket`", required=True
+    )
+    parser.add_argument(
+        "--output_loc",
+        help="Location where output data should be written to `bucket`",
+        required=True,
+    )
+    args = parser.parse_args()
+
+    run(args.bucket, args.input_loc, args.output_loc)
diff --git a/schemas/ror.json b/schemas/ror.json
index 614c256..f77287a 100644
--- a/schemas/ror.json
+++ b/schemas/ror.json
@@ -460,5 +460,11 @@
         "mode": "NULLABLE",
         "name": "external_ids",
         "type": "RECORD"
+    },
+    {
+        "mode": "REQUIRED",
+        "name": "ultimate_parent",
+        "type": "STRING",
+        "description": "Added by CSET through recursive traversal of `relationships`. The most distant ancestor of this organization."
     }
 ]
diff --git a/tests/test_get_ultimate_parent.py b/tests/test_get_ultimate_parent.py
new file mode 100644
index 0000000..7860452
--- /dev/null
+++ b/tests/test_get_ultimate_parent.py
@@ -0,0 +1,21 @@
+import unittest
+
+from ror_scripts.get_ultimate_parent import roll_up, traverse_parents
+
+
+class TestGetUltimateParent(unittest.TestCase):
+    def test_roll_up(self) -> None:
+        id_to_parent = {"A": "B", "B": "C", "C": "D", "D": "D", "E": "E"}
+        expected_id_to_ultimate_parent = {
+            "A": "D",
+            "B": "D",
+            "C": "D",
+            "D": "D",
+            "E": "E",
+        }
+        self.assertEqual(expected_id_to_ultimate_parent, roll_up(id_to_parent))
+
+    def test_traverse_parents(self) -> None:
+        id_to_parent = {"A": "B", "B": "C", "C": "D", "D": "D"}
+        self.assertEqual("D", traverse_parents("A", id_to_parent))
+        self.assertEqual("D", traverse_parents("D", id_to_parent))