Commit 3428d25: pt 3
keegansmith21 committed Jun 7, 2024
1 parent df8d3c2
Showing 3 changed files with 82 additions and 18 deletions.
@@ -17,7 +17,7 @@

import logging
import os
from typing import List, Union
from typing import Union
from concurrent.futures import ThreadPoolExecutor, as_completed

import pendulum
98 changes: 81 additions & 17 deletions dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py
@@ -29,6 +29,7 @@

from oaebu_workflows.config import sql_folder
from oaebu_workflows.oaebu_partners import OaebuPartner, partner_from_str
from oaebu_workflows.onix_workflow.onix_workflow import copy_latest_export_tables
from observatory_platform.dataset_api import DatasetAPI, DatasetRelease
from observatory_platform.files import save_jsonl_gz, load_jsonl
from observatory_platform.google.gcs import gcs_blob_uri, gcs_upload_files, gcs_blob_name_from_path, gcs_download_blob
@@ -61,7 +62,6 @@ def __init__(
super().__init__(dag_id=dag_id, run_id=run_id, partition_date=partition_date)
self.data_interval_start = data_interval_start
self.data_interval_end = data_interval_end
self.transform_file_name = "ucl_sales.jsonl.gz"

@property
def download_path(self):
@@ -71,6 +71,10 @@ def download_path(self):
def transform_path(self):
return os.path.join(self.transform_folder, "ucl_sales.jsonl.gz")

@property
def aggregate_path(self):
return os.path.join(self.transform_folder, "ucl_sales_aggregated.jsonl.gz")

@property
def download_blob_name(self):
return gcs_blob_name_from_path(self.download_path)
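For orientation, the release now carries three artefacts. A minimal sketch of the layout these properties produce (the download file name is collapsed out of this hunk, so it is left unspecified):

```python
# Assumed layout of a release's local files (download name not visible in this hunk):
#   release.download_path   -> <download_folder>/<download file>        (collapsed in diff)
#   release.transform_path  -> <transform_folder>/ucl_sales.jsonl.gz
#   release.aggregate_path  -> <transform_folder>/ucl_sales_aggregated.jsonl.gz
# download_blob_name mirrors download_path into GCS via gcs_blob_name_from_path.
```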
@@ -105,8 +109,9 @@ def create_dag(
cloud_workspace: CloudWorkspace,
sheet_id: str,
data_partner: Union[str, OaebuPartner] = "ucl_sales",
bq_dataset_description: str = "UCL Sales Dataset",
bq_dataset_description: str = "UCL Dataset",
bq_table_description: str = "UCL Sales Table",
bq_agg_table_description: str = "UCL Sales Aggregated",
api_bq_dataset_id: str = "dataset_api",
oaebu_service_account_conn_id: str = "oaebu_service_account",
schedule: str = "0 0 4 * *", # run on the 4th of every month TODO: confirm
@@ -123,7 +128,8 @@
:param sheet_id: The ID of the google sheet containing the sales data
:param data_partner: The name of the data partner
:param bq_dataset_description: Description for the BigQuery dataset
:param bq_table_description: Description for the biguery table
:param bq_table_description: Description for the raw data BigQuery table
:param bq_agg_table_description: Description for the aggregated BigQuery table
:param api_bq_dataset_id: The name of the Bigquery dataset to store the API release(s)
:param oaebu_service_account_conn_id: Airflow connection ID for the oaebu service account
:param max_threads: The maximum number of threads to utilise for parallel processes
@@ -184,10 +190,8 @@ def _transform(release: dict, **context) -> None:
if not success:
raise FileNotFoundError(f"Error downloading file: {release.download_blob_name}")

client = Client(project=cloud_workspace.project_id)
book_list = get_ucl_book_list(client=client)
data = load_jsonl(release.download_path)
data = transform(data, book_list)
data = transform(data)

save_jsonl_gz(release.transform_path, data)
success = gcs_upload_files(
@@ -196,7 +200,7 @@
set_task_state(success, context["ti"].task_id, release=release)

@task()
def _bq_load(release: dict, **context) -> None:
def _bq_load_transformed(release: dict, **context) -> None:
"""Loads the transformed data into BigQuery"""

release = UclSalesRelease.from_dict(release)
@@ -225,6 +229,59 @@ def _bq_load(release: dict, **context) -> None:
)
set_task_state(state, context["ti"].task_id, release=release)

@task()
def _aggregate(release: dict, **context) -> None:
"""Aggregates the sales data into a partitioned bigquery table"""

release = UclSalesRelease.from_dict(release)
client = Client(project=cloud_workspace.project_id)
data = run_aggregation_query(client)

save_jsonl_gz(release.aggregate_path, data)
success = gcs_upload_files(
bucket_name=cloud_workspace.transform_bucket, file_paths=[release.aggregate_path]
)
set_task_state(success, context["ti"].task_id, release=release)

@task()
def _bq_load_aggregation(release: dict, **context) -> None:
"""Loads the aggregated sales data into a sharded biqquery table"""

release = UclSalesRelease.from_dict(release)
bq_create_dataset(
project_id=cloud_workspace.project_id,
dataset_id=data_partner.bq_dataset_id,
location=cloud_workspace.data_location,
description=bq_dataset_description,
)

uri = gcs_blob_uri(cloud_workspace.transform_bucket, gcs_blob_name_from_path(release.aggregate_path))
table_id = bq_table_id(cloud_workspace.project_id, data_partner.bq_dataset_id, data_partner.bq_table_name)
client = Client(project=cloud_workspace.project_id)
state = bq_load_table(
uri=uri,
table_id=table_id,
schema_file_path=data_partner.schema_path,
source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
table_description=bq_agg_table_description,
ignore_unknown_values=True,
client=client,
)
set_task_state(state, context["ti"].task_id, release=release)

@task()
def _update_latest_aggregate_tables(release: dict, **context) -> None:
"""Create copies of the latest aggregate table in bigquery"""

release = UclSalesRelease.from_dict(release)
copy_latest_export_tables(
project_id=cloud_workspace.project_id,
from_dataset=data_partner.bq_dataset_id,
to_dataset=data_partner.bq_dataset_id,
date_match=release.partition_date.strftime("%Y%m%d"),
data_location=cloud_workspace.data_location,
)
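The semantics of copy_latest_export_tables are not shown in this commit (it is imported from the onix workflow above); a hedged reading of the call, inferred from the argument names only:

```python
# Hedged reading, inferred from argument names (not confirmed by this commit):
# for each sharded table in `from_dataset` whose YYYYMMDD suffix equals
# `date_match`, refresh a corresponding "latest" copy in `to_dataset`.
```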

@task()
def _add_new_dataset_releases(release: dict, **context) -> None:
"""Adds release information to API."""
@@ -256,7 +313,10 @@ def _cleanup_workflow(release: dict, **context) -> None:
xcom_release = _make_release()
task_download = _download(xcom_release)
task_transform = _transform(xcom_release)
task_bq_load = _bq_load(xcom_release)
task_bq_load_transformed = _bq_load_transformed(xcom_release)
task_aggregate = _aggregate(xcom_release)
task_bq_load_aggregation = _bq_load_aggregation(xcom_release)
task_update_latest_aggregate_table = _update_latest_aggregate_tables(xcom_release)
task_add_new_dataset_releases = _add_new_dataset_releases(xcom_release)
task_cleanup_workflow = _cleanup_workflow(xcom_release)

@@ -265,7 +325,10 @@
>> xcom_release
>> task_download
>> task_transform
>> task_bq_load
>> task_bq_load_transformed
>> task_aggregate
>> task_bq_load_aggregation
>> task_update_latest_aggregate_table
>> task_add_new_dataset_releases
>> task_cleanup_workflow
)
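As a usage sketch, the factory might be wired up like this (all values are placeholders, not taken from this commit):

```python
# Placeholder values for illustration only:
#
#   dag = create_dag(
#       dag_id="ucl_sales",
#       cloud_workspace=cloud_workspace,  # a configured CloudWorkspace instance
#       sheet_id="<google-sheet-id>",
#   )
```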
@@ -327,30 +390,28 @@ def download(
return items


def get_ucl_book_list(client: Client = None) -> List[dict]:
"""Retrieves the ucl book list from the data export dataset
def run_aggregation_query(client: Client = None) -> List[dict]:
"""Retrieves runs the ucl sales aggregation query and returns the results
:param client: The bigquery client to use
:return: The resulting rows from the query
"""

sql_file = os.path.join(sql_folder("ucl_sales_telescope"), "book_list_query.sql")
sql_file = os.path.join(sql_folder("ucl_sales_telescope"), "ucl_sales_aggregation.sql")
with open(sql_file, "r") as f:
sql = f.read()
return bq_run_query(sql, client=client)
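The SQL file itself is not part of this diff; purely as a hypothetical illustration of the kind of query it might hold (table and column names assumed from the transform step below):

```python
# Hypothetical sketch only; the real query lives in
# sql/ucl_sales_telescope/ucl_sales_aggregation.sql and is not shown in this commit.
HYPOTHETICAL_AGGREGATION_SQL = """
SELECT isbn, release_date, SUM(CAST(qty AS INT64)) AS total_qty
FROM `<project>.<dataset>.<ucl_sales_table>`
GROUP BY isbn, release_date
"""
```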


def transform_raw(data: List[dict]) -> List[dict]:
"""Transforms the ucl sales data. Aggregates each sale and matches the product using the book list
def transform(data: List[dict]) -> List[dict]:
"""Transforms the ucl sales data. Adds the release date partition.
:param data: The UCL sales data
:return: The transformed data
"""

# Convert the keys/headings to standard format
converted_data = {}
for k, v in data:
converted_data[k.strip().lower()] = v
converted_data = [{k.strip().lower(): v for k, v in row.items()} for row in data]

# Check that all required headings are present
expected_headings = ("isbn", "qty", "year", "month", "free/paid/return?", "country", "book", "pub date", "format")
@@ -360,6 +421,9 @@ def transform_raw(data: List[dict]) -> List[dict]:
transformed = []
for row in converted_data:
new_row = {h: row[h] for h in expected_headings}
# Add release date. Not all entries in the sheet are necessarily from the same month
release_date = pendulum.datetime(int(row["year"]), int(row["month"]), 1).end_of("month")
new_row["release_date"] = release_date.strftime("%Y-%m-%d")
transformed.append(new_row)

return transformed
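A small usage sketch of the transform (sample values assumed; the heading-validation code is collapsed out of this hunk):

```python
row = {
    "ISBN ": "9781787350007",  # stray whitespace/case is normalised by transform
    "Qty": "3",
    "Year": "2024",
    "Month": "5",
    "Free/Paid/Return?": "Paid",
    "Country": "GB",
    "Book": "An Example Title",
    "Pub Date": "2020-01-01",
    "Format": "OA PDF",
}
out = transform([row])
# out[0]["release_date"] == "2024-05-31"  (end of May 2024)
```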
