Add column containing ultimate parent #6

Merged · 2 commits · Mar 4, 2024
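For orientation, the idea behind the new column: every ROR record is mapped to the topmost organization in its chain of parent relationships. Below is a minimal sketch of that roll-up, mirroring the helpers this PR adds in ror_scripts/get_ultimate_parent.py; the toy ids and mapping are purely illustrative.

def ultimate_parent(ror_id: str, id_to_parent: dict) -> str:
    # Walk up the parent chain until an id maps to itself (a root organization).
    parent = id_to_parent[ror_id]
    return ror_id if parent == ror_id else ultimate_parent(parent, id_to_parent)

# Toy chain: A -> B -> C, where C is a root; E has no parent.
id_to_parent = {"A": "B", "B": "C", "C": "C", "E": "E"}
print({rid: ultimate_parent(rid, id_to_parent) for rid in id_to_parent})
# {'A': 'C', 'B': 'C', 'C': 'C', 'E': 'E'}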
26 changes: 13 additions & 13 deletions .github/workflows/main.yml
@@ -13,19 +13,19 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: 3.7
# - name: Install dependencies
# run: |
# python -m pip install --upgrade pip
# pip install -r requirements.txt
# - name: Test with pytest
# run: |
# coverage run -m pytest tests
# coverage xml -o coverage/python.xml
# - name: Report python coverage
# uses: orgoro/coverage@v3
# with:
# coverageFile: coverage/python.xml
# token: ${{ secrets.GITHUB_TOKEN }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Test with pytest
run: |
coverage run -m pytest tests
coverage xml -o coverage/python.xml
- name: Report python coverage
uses: orgoro/coverage@v3
with:
coverageFile: coverage/python.xml
token: ${{ secrets.GITHUB_TOKEN }}
# The next few steps only apply if you have javascript files
# - name: Setup node
# uses: actions/setup-node@v3
4 changes: 4 additions & 0 deletions requirements.txt
@@ -0,0 +1,4 @@
coverage
google-cloud-storage
pre-commit
pytest
51 changes: 47 additions & 4 deletions ror_dag.py
@@ -54,7 +54,7 @@
)

# Retrieve and expand the data
json_loc = tmp_dir + "/ror.jsonl"
raw_jsonl_loc = tmp_dir + "/ror.jsonl"
working_dir = "ror_working_dir"
setup_commands = f"rm -rf {working_dir};" + " && ".join(
[
@@ -68,7 +68,7 @@
)
download_data = GKEStartPodOperator(
task_id="download_data",
name="1790_er_download_data",
name="ror-download",
project_id=PROJECT_ID,
location=GCP_ZONE,
cluster_name="cc2-task-pool",
@@ -78,7 +78,49 @@
"-c",
(
setup_commands
+ f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{json_loc}'"
+ f" && python3 fetch.py --output_bucket '{DATA_BUCKET}' --output_loc '{raw_jsonl_loc}'"
),
],
namespace="default",
image=f"gcr.io/{PROJECT_ID}/cc2-task-pool",
get_logs=True,
startup_timeout_seconds=300,
on_finish_action="delete_pod",
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": [
"default-pool",
],
}
]
}
]
}
}
},
)

jsonl_with_up = tmp_dir + "/ror_json_with_up.jsonl"
add_ultimate_parent = GKEStartPodOperator(
task_id="add_ultimate_parent",
name="ror-ultimate-parent",
project_id=PROJECT_ID,
location=GCP_ZONE,
cluster_name="cc2-task-pool",
do_xcom_push=True,
cmds=["/bin/bash"],
arguments=[
"-c",
(
setup_commands
+ f" && python3 get_ultimate_parent.py --bucket '{DATA_BUCKET}' --input_loc '{raw_jsonl_loc}' --output_loc '{jsonl_with_up}'"
),
],
namespace="default",
@@ -111,7 +153,7 @@
load_staging = GCSToBigQueryOperator(
task_id="load_staging",
bucket=DATA_BUCKET,
source_objects=[json_loc],
source_objects=[jsonl_with_up],
schema_object=f"{schema_dir}/ror.json",
destination_project_dataset_table=f"{staging_dataset}.ror",
source_format="NEWLINE_DELIMITED_JSON",
@@ -174,6 +216,7 @@
(
clear_tmp_dir
>> download_data
>> add_ultimate_parent
>> load_staging
>> checks
>> load_production
90 changes: 90 additions & 0 deletions ror_scripts/get_ultimate_parent.py
@@ -0,0 +1,90 @@
import argparse
import json
import os
import tempfile

from google.cloud import storage


def traverse_parents(ror_id: str, id_to_parent: dict) -> str:
"""
Find the ultimate ancestor of a ror id
:param ror_id: ror id to find the ultimate ancestor for
:param id_to_parent: dict mapping ror id to parent ror id
:return: Ultimate ancestor of a ror id
"""
parent = id_to_parent[ror_id]
if ror_id == parent:
return ror_id
return traverse_parents(parent, id_to_parent)


def roll_up(id_to_parent: dict) -> dict:
"""
Roll up ror id to its ultimate parent.
:param id_to_parent: ror id to immediate parent mapping. If a ROR id has no parent, it is mapped to itself
:return: dict containing ror id to ultimate parent mapping. Ultimate parents are mapped to themselves
"""
id_to_ultimate_parent = {}
for ror_id in id_to_parent:
ultimate_parent = traverse_parents(ror_id, id_to_parent)
id_to_ultimate_parent[ror_id] = ultimate_parent
return id_to_ultimate_parent


def run(bucket_name: str, input_loc: str, output_loc: str) -> None:
"""
With thanks to https://ror.readme.io/docs/data-dump, fetches ROR data,
converts to jsonl, and uploads to GCS
:param bucket_name: GCS bucket where json of data should be read and written
:param input_loc: Location of input data within `bucket`
:param output_loc: Location where output data should be written to `bucket`
:return: None
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
with tempfile.TemporaryDirectory() as td:
blob = bucket.blob(input_loc)
input_fi = os.path.join(td, "input.jsonl")
blob.download_to_filename(input_fi)
id_to_parent = {}
rows = []
with open(input_fi) as f:
for line in f:
js = json.loads(line)
ror_id = js["id"]
parent_id = ror_id
for relationship in js["relationships"]:
if relationship["type"].lower() == "parent":
parent_id = relationship["id"]
assert ror_id not in id_to_parent, f"Duplicate ID: {ror_id}"
id_to_parent[ror_id] = parent_id
rows.append(js)
id_to_ultimate_parent = roll_up(id_to_parent)
output_fi = os.path.join(td, "output.jsonl")
with open(output_fi, mode="w") as out:
for row in rows:
row["ultimate_parent"] = id_to_ultimate_parent[row["id"]]
out.write(json.dumps(row) + "\n")
blob = bucket.blob(output_loc)
blob.upload_from_filename(output_fi)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bucket",
        help="GCS bucket where data should be read and written",
        required=True,
    )
    parser.add_argument(
        "--input_loc", help="Location of input data within `bucket`", required=True
    )
    parser.add_argument(
        "--output_loc",
        help="Location where output data should be written to `bucket`",
        required=True,
    )
    args = parser.parse_args()

    run(args.bucket, args.input_loc, args.output_loc)
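A possible local invocation of the new script, assuming GCP credentials are already configured in the environment; the bucket name and object paths below are placeholders rather than values from this repo (in the DAG they come from DATA_BUCKET and tmp_dir):

from ror_scripts.get_ultimate_parent import run

run(
    bucket_name="example-data-bucket",
    input_loc="ror_tmp_dir/ror.jsonl",
    output_loc="ror_tmp_dir/ror_json_with_up.jsonl",
)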
6 changes: 6 additions & 0 deletions schemas/ror.json
@@ -460,5 +460,11 @@
"mode": "NULLABLE",
"name": "external_ids",
"type": "RECORD"
},
{
"mode": "REQUIRED",
"name": "ultimate_parent",
"type": "STRING",
"description": "Added by CSET through recursive traversal of `relationship`. The most distant ancestor of this organization."
}
]
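To make the schema description concrete, here is a rough sketch of how `ultimate_parent` relates to the raw relationships on each record, following the parsing done in get_ultimate_parent.py; the record shapes and ids below are invented for illustration.

# Two illustrative records: a child pointing at its parent, and a root with no parent.
rows = [
    {"id": "https://ror.org/child0",
     "relationships": [{"type": "Parent", "id": "https://ror.org/root00"}]},
    {"id": "https://ror.org/root00", "relationships": []},
]

id_to_parent = {}
for row in rows:
    parent_id = row["id"]  # an organization with no parent maps to itself
    for relationship in row["relationships"]:
        if relationship["type"].lower() == "parent":
            parent_id = relationship["id"]
    id_to_parent[row["id"]] = parent_id

# After roll_up(), both records get ultimate_parent == "https://ror.org/root00".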
21 changes: 21 additions & 0 deletions tests/test_get_ultimate_parent.py
@@ -0,0 +1,21 @@
import unittest

from ror_scripts.get_ultimate_parent import roll_up, traverse_parents


class TestGetUltimateParent(unittest.TestCase):
    def test_roll_up(self) -> None:
        id_to_parent = {"A": "B", "B": "C", "C": "D", "D": "D", "E": "E"}
        expected_id_to_ultimate_parent = {
            "A": "D",
            "B": "D",
            "C": "D",
            "D": "D",
            "E": "E",
        }
        self.assertEqual(expected_id_to_ultimate_parent, roll_up(id_to_parent))

    def test_traverse_parents(self) -> None:
        id_to_parent = {"A": "B", "B": "C", "C": "D", "D": "D"}
        self.assertEqual("D", traverse_parents("A", id_to_parent))
        self.assertEqual("D", traverse_parents("D", id_to_parent))