Skip to content

Commit

Permalink
Shift to using a semver string tracked in an airflow variable rather …
Browse files Browse the repository at this point in the history
…than relying on zenodo to auto-update the version
  • Loading branch information
jmelot committed Aug 1, 2024
1 parent 741d587 commit d6b12d6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
33 changes: 32 additions & 1 deletion cset_openalex_augmentation_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import os
from datetime import datetime

import semver
from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
Expand Down Expand Up @@ -34,6 +36,29 @@
args["retries"] = 1


def get_updated_version() -> str:
    """
    Get the latest semver string for the delivery, and update the corresponding airflow Variable.

    The ``cset_openalex_version`` Variable is read as a JSON object with two keys:
    ``current_version`` (the semver string to bump) and ``increment`` (which part of it to
    bump: ``"major"``, ``"minor"`` or ``"patch"``). By default we bump the minor version with
    each delivery, but ``increment`` can be manually set in the Variable to bump a different
    part. After bumping, the Variable is rewritten with the new version and ``increment`` reset
    to ``"minor"``, so a manual override only applies to a single delivery.

    :return: Updated semver string
    :raises ValueError: If ``increment`` is not one of "major", "minor" or "patch"
    """
    oa_variable = "cset_openalex_version"
    default_increment = "minor"
    version_config = Variable.get(oa_variable, deserialize_json=True)
    version_to_updater = {
        "major": semver.bump_major,
        "minor": semver.bump_minor,
        "patch": semver.bump_patch,
    }
    # A hand-edited Variable may omit "increment"; fall back to the documented default
    # instead of failing the delivery with a bare KeyError.
    increment = version_config.get("increment", default_increment)
    if increment not in version_to_updater:
        raise ValueError(
            f"Unsupported increment {increment!r} in airflow Variable {oa_variable!r}; "
            f"expected one of {sorted(version_to_updater)}"
        )
    new_version = version_to_updater[increment](version_config["current_version"])
    # Always reset the increment so a one-off major/patch bump does not stick.
    Variable.set(
        oa_variable,
        json.dumps({"current_version": new_version, "increment": default_increment}),
    )
    return new_version


with DAG(
"cset_openalex_updater",
default_args=args,
Expand Down Expand Up @@ -146,6 +171,11 @@
force_rerun=True,
)

# Bump the delivery's semver string before the Zenodo upload runs. The upload command
# reads this task's result back via xcom_pull('update_version', key='return_value'),
# so the task_id here must stay in sync with that template.
update_version = PythonOperator(
task_id="update_version",
python_callable=get_updated_version,
)

gce_instance_start = ComputeEngineStartInstanceOperator(
task_id=f"start-{gce_resource_id}",
project_id=PROJECT_ID,
Expand All @@ -160,7 +190,7 @@
f"gsutil -m cp -r gs://{DATA_BUCKET}/{tmp_dir}/{production_dataset} .",
f"gsutil -m cp -r gs://{DATA_BUCKET}/{production_dataset}/upload.py .",
f"zip -r {production_dataset}.zip {production_dataset}",
"python3 upload.py",
"python3 upload.py --version {{ task_instance.xcom_pull('update_version', key='return_value') }}",
]
update_zenodo_script = " && ".join(update_zenodo_sequence)

Expand All @@ -186,6 +216,7 @@
>> snapshot
>> pop_descriptions
>> export_metadata
>> update_version
>> gce_instance_start
>> update_zenodo
>> gce_instance_stop
Expand Down
7 changes: 5 additions & 2 deletions scripts/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import requests


def upload(data_fi: str) -> None:
def upload(data_fi: str, version: str) -> None:
"""
Update Zenodo bucket with new data version
:param data_fi: Path to data to upload
:param version: New version string for the delivery
:return: None
"""
with open(".settings.txt") as f:
Expand All @@ -20,6 +21,7 @@ def upload(data_fi: str) -> None:
params=params,
)
new_version_metadata = new_version_response.json()["metadata"]
new_version_metadata["version"] = version
record_id = new_version_response.json()["record_id"]

try:
Expand Down Expand Up @@ -65,6 +67,7 @@ def upload(data_fi: str) -> None:
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input_file", default="cset_openalex.zip")
parser.add_argument("--version", required=True)
args = parser.parse_args()

upload(args.input_file)
upload(args.input_file, args.version)

0 comments on commit d6b12d6

Please sign in to comment.