From 9d7513d042054c76b441f2a5990e6268fa41c769 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Thu, 3 Oct 2024 16:50:37 +0800 Subject: [PATCH] xref events removal --- dags/oaebu_workflows/airflow_pools.py | 11 - .../onix_workflow/onix_workflow.py | 295 +--------- .../onix_workflow/schema/book.json | 80 --- .../onix_workflow/schema/book_metrics.json | 14 - .../schema/book_metrics_author.json | 17 +- .../schema/book_metrics_events.json | 170 ------ .../schema/book_metrics_subject_bic.json | 17 +- .../schema/book_metrics_subject_bisac.json | 17 +- .../schema/book_metrics_subject_thema.json | 17 +- .../onix_workflow/schema/book_product.json | 42 +- .../onix_workflow/schema/crossref_events.json | 552 ------------------ .../onix_workflow/sql/book.sql.jinja2 | 56 +- .../onix_workflow/sql/book_metrics.sql.jinja2 | 8 +- .../sql/book_metrics_author.sql.jinja2 | 8 +- .../sql/book_metrics_events.sql.jinja2 | 74 --- .../sql/book_metrics_subject_bic.sql.jinja2 | 8 +- .../sql/book_metrics_subject_bisac.sql.jinja2 | 8 +- .../sql/book_metrics_subject_thema.sql.jinja2 | 7 +- .../onix_workflow/sql/book_product.sql.jinja2 | 24 +- .../sql/crossref_events_filter_doi.sql.jinja2 | 33 -- .../fixtures/crossref_events_api_calls.yaml | 3 - .../fixtures/crossref_events_request.yaml | 3 - .../fixtures/e2e_outputs/book.json | 4 +- .../fixtures/e2e_outputs/book_list.json | 4 +- .../fixtures/e2e_outputs/book_product.json | 4 +- tests/onix_workflow/test_onix_workflow.py | 200 +------ 26 files changed, 51 insertions(+), 1625 deletions(-) delete mode 100644 dags/oaebu_workflows/onix_workflow/schema/book_metrics_events.json delete mode 100644 dags/oaebu_workflows/onix_workflow/schema/crossref_events.json delete mode 100644 dags/oaebu_workflows/onix_workflow/sql/book_metrics_events.sql.jinja2 delete mode 100644 dags/oaebu_workflows/onix_workflow/sql/crossref_events_filter_doi.sql.jinja2 delete mode 100644 tests/onix_workflow/fixtures/crossref_events_api_calls.yaml delete mode 100644 
tests/onix_workflow/fixtures/crossref_events_request.yaml diff --git a/dags/oaebu_workflows/airflow_pools.py b/dags/oaebu_workflows/airflow_pools.py index 9bc911cc..5553f90f 100644 --- a/dags/oaebu_workflows/airflow_pools.py +++ b/dags/oaebu_workflows/airflow_pools.py @@ -41,14 +41,3 @@ def create_or_get_pool(self): return self.get_pool() except PoolNotFound: return self.create_pool() - - -class CrossrefEventsPool(AirflowPool): - def __init__(self, pool_slots: int = 15): - """Constructor CrossrefEventsPool instance - - :param pool_slots: The number of slots assigned to this pool - """ - super().__init__( - pool_name="crossref_events_pool", pool_slots=pool_slots, pool_description="Crossref Events API Pool" - ) diff --git a/dags/oaebu_workflows/onix_workflow/onix_workflow.py b/dags/oaebu_workflows/onix_workflow/onix_workflow.py index bf36722c..da9c1c25 100644 --- a/dags/oaebu_workflows/onix_workflow/onix_workflow.py +++ b/dags/oaebu_workflows/onix_workflow/onix_workflow.py @@ -19,23 +19,17 @@ import logging import os import re -from typing import Iterable, List, Optional, Tuple, Union, Dict +from typing import Iterable, List, Optional, Union, Dict -import jsonlines import pendulum from airflow.decorators import dag, task, task_group -from airflow.exceptions import AirflowSkipException from airflow.models.baseoperator import chain from airflow.utils.trigger_rule import TriggerRule from airflow.timetables.base import Timetable from google.cloud.bigquery import Client, SourceFormat from jinja2 import Environment, FileSystemLoader -from ratelimit import limits, sleep_and_retry -from requests import Request, Session -from tenacity import wait_exponential_jitter -from oaebu_workflows.airflow_pools import CrossrefEventsPool -from oaebu_workflows.config import oaebu_user_agent_header, schema_folder as default_schema_folder, sql_folder +from oaebu_workflows.config import schema_folder as default_schema_folder, sql_folder from oaebu_workflows.oaebu_partners import DataPartner, 
OaebuPartner, partner_from_str, create_bespoke_data_partners from oaebu_workflows.onix_workflow.onix_work_aggregation import BookWorkAggregator, BookWorkFamilyAggregator from onix_workflow_schedule import OnixWorkflowTimetable @@ -45,7 +39,7 @@ from observatory_platform.airflow.tasks import check_dependencies from observatory_platform.airflow.workflow import cleanup, CloudWorkspace from observatory_platform.dataset_api import DatasetAPI, DatasetRelease -from observatory_platform.files import save_jsonl_gz, yield_jsonl +from observatory_platform.files import save_jsonl_gz from observatory_platform.google.bigquery import ( bq_copy_table, bq_create_dataset, @@ -53,22 +47,12 @@ bq_find_schema, bq_load_table, bq_run_query, - bq_select_latest_table, bq_select_table_shard_dates, bq_sharded_table_id, bq_table_id, ) from observatory_platform.google.gcs import gcs_blob_name_from_path, gcs_blob_uri, gcs_upload_files from observatory_platform.jinja2_utils import render_template -from observatory_platform.url_utils import retry_get_url - -# Crossref Events rate limit settings -# These settings mean that, for instance, if there are n Crossref Events tasks queued, then at most -# 15 tasks will run at once (due to the pool slots) and each task will only ever call Crossref Events -# at a rate of once per second. 
-MAX_POOL_SLOTS = 15 # Matches 15 requests -CALLS_PER_PERIOD = 1 # 1 call per second -PERIOD = 1 # 1 second class OnixWorkflowRelease(SnapshotRelease): @@ -104,7 +88,6 @@ def __init__( self.workslookup_errors_file_name = "worksid_errors.jsonl.gz" self.worksfamilylookup_file_name = "workfamilyid.jsonl.gz" self.crossref_metadata_file_name = "crossref_metadata.jsonl.gz" - self.crossref_events_file_name = "crossref_events.jsonl" # Generated Schemas self.book_product_schema_file_name = "book_product_schema.json" @@ -132,14 +115,6 @@ def worksfamilylookup_path(self): def crossref_metadata_path(self): return os.path.join(self.transform_folder, self.crossref_metadata_file_name) - @property - def download_crossref_events_path(self): - return os.path.join(self.download_folder, self.crossref_events_file_name) - - @property - def transformed_crossref_events_path(self): - return os.path.join(self.transform_folder, self.crossref_events_file_name) - @property def book_product_schema_path(self): return os.path.join(self.transform_folder, self.book_product_schema_file_name) @@ -185,10 +160,6 @@ def worksfamilylookup_blob_name(self): def crossref_metadata_blob_name(self): return gcs_blob_name_from_path(self.crossref_metadata_path) - @property - def crossref_events_blob_name(self): - return gcs_blob_name_from_path(self.transformed_crossref_events_path) - @staticmethod def from_dict(dict_: dict): return OnixWorkflowRelease( @@ -220,7 +191,6 @@ def create_dag( bq_oaebu_crossref_dataset_id: str = "crossref", bq_master_crossref_metadata_table_name: str = "crossref_metadata", bq_oaebu_crossref_metadata_table_name: str = "crossref_metadata", - bq_crossref_events_table_name: str = "crossref_events", bq_country_project_id: str = "oaebu-public-data", bq_country_dataset_id: str = "oaebu_reference", bq_subject_project_id: str = "oaebu-public-data", @@ -235,7 +205,6 @@ def create_dag( bq_worksid_table_name: str = "onix_workid_isbn", bq_worksid_error_table_name: str = "onix_workid_isbn_errors", 
bq_workfamilyid_table_name: str = "onix_workfamilyid_isbn", - skip_downloading_crossref_events: bool = True, # Run parameters data_partners: List[Union[str, OaebuPartner]] = None, bespoke_data_partners: List[Dict] = None, @@ -265,7 +234,6 @@ def create_dag( :param bq_oaebu_crossref_dataset_id: GCP dataset ID of crossref OAeBU data :param bq_master_crossref_metadata_table_name: The name of the master crossref metadata table :param bq_oaebu_crossref_metadata_table_name: The name of the OAeBU crossref metadata table - :param bq_crossref_events_table_name: The name of the crossref events table :param bq_country_project_id: GCP project ID of the country table :param bq_country_dataset_id: GCP dataset containing the country table :param bq_subject_project_id: GCP project ID of the subject tables @@ -280,7 +248,6 @@ def create_dag( :param bq_worksid_table_name: table ID of the worksid table :param bq_worksid_error_table_name: table ID of the worksid error table :param bq_workfamilyid_table_name: table ID of the workfamilyid table - :param skip_downloading_crossref_events: skip fetching new data for Crossref Events, when True, falls back to using a previous version of the table. :param data_partners: OAEBU data sources. 
@@ -311,11 +278,6 @@ def create_dag( if bespoke_data_partners: data_partners.extend(create_bespoke_data_partners(bespoke_data_partners)) - # Create pool for crossref API calls (if they don't exist) - # Pools are necessary to throttle the maxiumum number of requests we can make per second and avoid 429 errors - crossref_events_pool = CrossrefEventsPool(pool_slots=MAX_POOL_SLOTS) - crossref_events_pool.create_pool() - @dag( dag_id=dag_id, schedule=schedule, @@ -500,58 +462,9 @@ def create_crossref_metadata_table(release: dict, **context) -> None: ) set_task_state(state, context["ti"].task_id, release=release) - @task(pool="crossref_events_pool") - def create_crossref_events_table(release: dict, **context) -> None: - """Download, transform, upload and create a table for crossref events""" - - release = OnixWorkflowRelease.from_dict(release) - - if skip_downloading_crossref_events: - raise AirflowSkipException("create_crossref_events_table: skipping fetching new Crossref Events data") - - # Get the unique dois from the metadata table - metadata_table_id = bq_sharded_table_id( - cloud_workspace.project_id, - bq_oaebu_crossref_dataset_id, - bq_oaebu_crossref_metadata_table_name, - release.snapshot_date, - ) - client = Client(project=cloud_workspace.project_id) - dois = dois_from_table(metadata_table_id, doi_column_name="DOI", client=client) - doi_prefixes = get_doi_prefixes(dois) - logging.info(f"Found DOI prefixes: {doi_prefixes}") - - # Download and transform all events - start_date = crossref_start_date - end_date = release.snapshot_date - input_path = release.download_crossref_events_path - output_path = release.transformed_crossref_events_path - download_crossref_events(input_path, doi_prefixes, start_date, end_date, mailto) - transform_crossref_events(input_path, dois, output_path) - - # Upload to google cloud and load into BigQuery - gcs_upload_files( - bucket_name=cloud_workspace.transform_bucket, file_paths=[release.transformed_crossref_events_path] - ) - 
table_id = bq_sharded_table_id( - cloud_workspace.project_id, - bq_oaebu_crossref_dataset_id, - bq_crossref_events_table_name, - release.snapshot_date, - ) - state = bq_load_table( - uri=gcs_blob_uri(cloud_workspace.transform_bucket, release.crossref_events_blob_name), - table_id=table_id, - schema_file_path=bq_find_schema(path=schema_folder, table_name=bq_crossref_events_table_name), - source_format=SourceFormat.NEWLINE_DELIMITED_JSON, - write_disposition="WRITE_TRUNCATE", - client=client, - ) - set_task_state(state, context["ti"].task_id, release=release) - @task(trigger_rule=TriggerRule.ALL_DONE) def create_book_table(release: dict, **context) -> None: - """Create the oaebu book table using the crossref event and metadata tables""" + """Create the oaebu book table using the crossref metadata table""" release = OnixWorkflowRelease.from_dict(release) bq_create_dataset( @@ -570,17 +483,8 @@ def create_book_table(release: dict, **context) -> None: release.snapshot_date, ) client = Client(project=cloud_workspace.project_id) - crossref_events_table_id = bq_select_latest_table( - table_id=bq_table_id( - cloud_workspace.project_id, bq_oaebu_crossref_dataset_id, bq_crossref_events_table_name - ), - end_date=release.snapshot_date, - sharded=True, - client=client, - ) sql = render_template( os.path.join(sql_folder(workflow_module="onix_workflow"), "book.sql.jinja2"), - crossref_events_table_id=crossref_events_table_id, crossref_metadata_table_id=crossref_metadata_table_id, ) logging.info(sql) @@ -755,11 +659,6 @@ def create_tasks_export_tables(release): "query_template": os.path.join(sql_folder("onix_workflow"), "book_list.sql.jinja2"), "schema": os.path.join(default_schema_folder("onix_workflow"), "book_list.json"), }, - { - "output_table": "book_metrics_events", - "query_template": os.path.join(sql_folder("onix_workflow"), "book_metrics_events.sql.jinja2"), - "schema": os.path.join(default_schema_folder("onix_workflow"), "book_metrics_events.json"), - }, ] if 
"jstor_institution" in [dp.type_id for dp in data_partners]: generic_export_tables.append( @@ -1034,7 +933,6 @@ def cleanup_workflow(release: dict, **context): xcom_release = make_release() task_aggregate_works = aggregate_works(xcom_release) task_create_crossref_metadata_table = create_crossref_metadata_table(xcom_release) - task_create_crossref_events_table = create_crossref_events_table(xcom_release) task_create_book_table = create_book_table(xcom_release) task_group_create_intermediate_tables = _create_tasks_intermediate_tables(xcom_release) task_create_book_product_table = create_book_product_table(xcom_release) @@ -1049,7 +947,6 @@ def cleanup_workflow(release: dict, **context): >> xcom_release >> task_aggregate_works >> task_create_crossref_metadata_table - >> task_create_crossref_events_table >> task_create_book_table >> task_group_create_intermediate_tables >> task_create_book_product_table @@ -1156,194 +1053,10 @@ def dois_from_table(table_id: str, doi_column_name: str = "DOI", client: Client return dois -def download_crossref_events( - file_path: str, - doi_prefixes: Iterable[str], - start_date: pendulum.DateTime, - end_date: pendulum.DateTime, - mailto: str, -): - """ - Spawns multiple threads to download event data (DOI and publisher only) for each doi supplied. - The url template was made with reference to the crossref event api: - https://www.eventdata.crossref.org/guide/service/query-api/ - Note that the max_threads will cap at 15 because the events API will return a 429 if more than 15 requests are made - per second. Each API request happens to take roughly 1 second. Having more threadsthan necessary slows down the - download process as the retry script will wait a minimum of two seconds between each attempt. - - :param file_path: the path where the events should be saved. - :param doi_prefixes: the prefixes of DOIs to download data for. - :param dois: the prefixes of DOIs to download data for. 
- :param start_date: The start date for events we're interested in. - :param end_date: The end date for events we're interested in. - :param mailto: The email to use as a reference for who is requesting the data. - :return: All events for the input DOIs. - """ - - # Create queries per year - # requests = [] - # for prefix in doi_prefixes: - # for dt in pendulum.Period(start=start_date, end=end_date).range("years"): - # period_start = dt.start_of("year") - # period_end = dt.end_of("year") - # - # # Make sure the end date does not exceed the original end date - # if period_end > end_date: - # period_end = end_date - # - # request = Request( - # method="GET", - # url="https://api.eventdata.crossref.org/v1/events", - # params={ - # "mailto": mailto, - # "from-collected-date": period_start.strftime("%Y-%m-%d"), - # "until-collected-date": period_end.strftime("%Y-%m-%d"), - # "rows": 1000, - # "obj-id.prefix": prefix, - # }, - # ) - # requests.append(request) - requests = [ - Request( - method="GET", - url="https://api.eventdata.crossref.org/v1/events", - params={ - "mailto": mailto, - "from-collected-date": start_date.strftime("%Y-%m-%d"), - "until-collected-date": end_date.strftime("%Y-%m-%d"), - "rows": 1000, - "obj-id.prefix": prefix, - }, - ) - for prefix in doi_prefixes - ] - - logging.info(f"Beginning Crossref Event data download with {len(requests)} requests") - for request in requests: - paginate_crossref_events(request, file_path) - - -def paginate_crossref_events(request: Request, file_path: str): - """Downloads all crossref events from a url, iterating through pages if there is more than one - - :param request: the request. 
- :param file_path: Worker number - :return: The events from this URL - """ - - url = Session().prepare_request(request).url - logging.info(f"Fetching pages for {url} and saving to {file_path}") - next_cursor = None - total_counts = 0 - headers = oaebu_user_agent_header() - - while True: - # Update request parameters only if next_cursor is not None - if next_cursor is not None: - request.params["cursor"] = next_cursor - - # Download page - next_cursor, page_counts, _, page_events = download_crossref_events_page(request, headers) - - # Accumulate the results - total_counts += page_counts - - # Append data to file - with open(file_path, mode="a") as f: - with jsonlines.Writer(f) as writer: - writer.write_all(page_events) - - # Break the loop if no more cursor is provided - if not next_cursor: - break - - logging.info(f"Successful: {url} ") - logging.info(f"Total no. events: {total_counts}") - - -def download_crossref_events_page(request: Request, headers: dict) -> Tuple[str, int, int, List[dict]]: - """Download crossref events from a single page - - :param request: the request. 
- :param headers: Headers to send with the request - :return: The cursor, event counter, total number of events and the events for the URL - """ - - crossref_events_limiter() - url = Session().prepare_request(request).url - response = retry_get_url(url, num_retries=5, wait=wait_exponential_jitter(initial=0.5, max=60), headers=headers) - response_json = response.json() - total_events = response_json["message"]["total-results"] - events = response_json["message"]["events"] - next_cursor = response_json["message"]["next-cursor"] - counter = len(events) - - return next_cursor, counter, total_events, events - - -@sleep_and_retry -@limits(calls=CALLS_PER_PERIOD, period=PERIOD) -def crossref_events_limiter(): - """Task to throttle the calls to the crossref events API""" - return - - def clean_doi(doi: str) -> str: return re.sub(r"^https?://doi\.org/", "", doi).upper() -def transform_crossref_events(input_path: str, dois_filter: set[str], output_path: str): - """Transforms crossref events. - - :param input_path: A list of the events to transform - :param dois_filter: a set of DOIs to filter the input by. - :param output_path: The maximum number of threads to utilise for the transforming process - """ - - logging.info(f"Beginning crossref event transform") - - with open(output_path, mode="w") as f: - with jsonlines.Writer(f) as writer: - for event in yield_jsonl(input_path): - doi = clean_doi(event["obj_id"]) - if doi in dois_filter: - row = transform_event(event) - writer.write(row) - - logging.info("Crossref event transformation complete") - - -def transform_event(event: dict) -> dict: - """Transform the dictionary with event data by replacing '-' with '_' in key names, converting all int values to - string except for the 'total' field and parsing datetime columns for a valid datetime. 
- - :param event: The event dictionary - :return: The transformed event dictionary - """ - - if isinstance(event, (str, int, float)): - return event - if isinstance(event, dict): - new = event.__class__() - for k, v in event.items(): - if isinstance(v, int) and k != "total": - v = str(v) - if k in ["timestamp", "occurred_at", "issued", "dateModified", "updated_date"]: - try: - v = str(pendulum.parse(v)) - except ValueError: - v = "0001-01-01T00:00:00Z" - - # Replace hyphens with underscores for BigQuery compatibility - k = k.replace("-", "_") - - # Replace @ symbol in keys left by DataCite between the 15 and 22 March 2019 - k = k.replace("@", "") - - new[k] = transform_event(v) - return new - - def copy_latest_export_tables( project_id: str, from_dataset: str, to_dataset: str, date_match: str, data_location: str, description: str = None ) -> None: diff --git a/dags/oaebu_workflows/onix_workflow/schema/book.json b/dags/oaebu_workflows/onix_workflow/schema/book.json index 99798484..8f68df0d 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book.json @@ -80,85 +80,5 @@ "name": "chapters", "type": "RECORD", "description": "Crossref Objects (that are of type chapter) associated with the primary ISBN" - }, - { - "fields": [ - { - "fields": [ - { - "mode": "NULLABLE", - "name": "source", - "type": "STRING", - "description": "Event Source" - }, - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of Events" - } - ], - "mode": "REPEATED", - "name": "overall", - "type": "RECORD", - "description": "Overall Event Count" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "month", - "type": "STRING", - "description": "Month of the count" - }, - { - "mode": "NULLABLE", - "name": "source", - "type": "STRING", - "description": "Event Source" - }, - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Event Count in time period" - } - ], - "mode": 
"REPEATED", - "name": "months", - "type": "RECORD", - "description": "Event counts broken down by month" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "year", - "type": "INTEGER", - "description": "Year of count" - }, - { - "mode": "NULLABLE", - "name": "source", - "type": "STRING", - "description": "Event Source" - }, - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Event count in time period" - } - ], - "mode": "REPEATED", - "name": "years", - "type": "RECORD", - "description": "Event counts broken down by month" - } - ], - "mode": "NULLABLE", - "name": "events", - "type": "RECORD", - "description": "Crossref events assoicated with ISBN" } ] diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics.json index dba84459..554277cc 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_metrics.json @@ -146,19 +146,5 @@ "name": "month", "type": "DATE", "description": "The month in which the metrics took place" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": "RECORD", - "description": "Metrics from Crossref Events" } ] diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_author.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_author.json index f8fb76e2..8c887243 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_author.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_author.json @@ -28,19 +28,6 @@ "name": "month", "type": "DATE", "description": "Month in which metrics took place" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": 
"RECORD", - "description": "Metrics from Crossref events" } -] \ No newline at end of file +] + diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_events.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_events.json deleted file mode 100644 index 40140bdf..00000000 --- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_events.json +++ /dev/null @@ -1,170 +0,0 @@ -[ - { - "mode": "NULLABLE", - "name": "product_id", - "type": "STRING", - "description": "Book Product ID" - }, - { - "mode": "NULLABLE", - "name": "work_id", - "type": "STRING", - "description": "Book Work ID" - }, - { - "mode": "NULLABLE", - "name": "work_family_id", - "type": "STRING", - "description": "Book Work Family ID" - }, - { - "mode": "NULLABLE", - "name": "title", - "type": "STRING", - "description": "The title of the book" - }, - { - "mode": "NULLABLE", - "name": "subtitle", - "type": "STRING", - "description": "Subtitle of the Book" - }, - { - "mode": "NULLABLE", - "name": "title_subtitle", - "type": "STRING", - "description": "The concatenated title and subtitle of the Book" - }, - { - "mode": "NULLABLE", - "name": "published_year", - "type": "INTEGER", - "description": "The publisher year of the book" - }, - { - "mode": "NULLABLE", - "name": "published_date", - "type": "DATE", - "description": "The publisher Date of the book" - }, - { - "mode": "NULLABLE", - "name": "publisher_name", - "type": "STRING", - "description": "The name of the publisher" - }, - { - "mode": "NULLABLE", - "name": "month", - "type": "DATE", - "description": "The month for which the metrics apply to" - }, - { - "mode": "NULLABLE", - "name": "event_source", - "type": "STRING", - "description": "Event Source" - }, - { - "fields": [ - { - "mode": "REPEATED", - "name": "bic_codes", - "type": "STRING", - "description": "A list of BIC subject codes" - }, - { - "mode": "REPEATED", - "name": "bic_top", - "type": "STRING", - "description": "A list of BIC subject codes for the top level 
subject" - }, - { - "mode": "REPEATED", - "name": "bic_names", - "type": "STRING", - "description": "A list of BIC subject names" - }, - { - "mode": "REPEATED", - "name": "bisac_codes", - "type": "STRING", - "description": "A list of BISAC subject codes" - }, - { - "mode": "REPEATED", - "name": "bisac_top", - "type": "STRING", - "description": "A list of BISAC subject codes for the top level subject" - }, - { - "mode": "REPEATED", - "name": "bisac_names", - "type": "STRING", - "description": "A list of BISAC subject names" - }, - { - "mode": "REPEATED", - "name": "thema_codes", - "type": "STRING", - "description": "A list of Thema subject codes" - }, - { - "mode": "REPEATED", - "name": "thema_top", - "type": "STRING", - "description": "A list of Thema subject codes for the top level subject" - }, - { - "mode": "REPEATED", - "name": "thema_names", - "type": "STRING", - "description": "A list of Thema subject names" - } - ], - "mode": "NULLABLE", - "name": "subjects", - "type": "RECORD", - "description": "Subjects associated with this product" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "person_name", - "type": "STRING", - "description": "Name of the author" - }, - { - "mode": "NULLABLE", - "name": "person_name_inverted", - "type": "STRING", - "description": "Inverted name of the author" - }, - { - "mode": "NULLABLE", - "name": "ORCID", - "type": "STRING", - "description": "Author's ORCID identifier" - } - ], - "mode": "REPEATED", - "name": "authors", - "type": "RECORD", - "description": "Author information" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of Events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": "RECORD", - "description": "Metrics from Crossref Events" - } -] diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bic.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bic.json index 41da248c..e7a345d2 100644 
--- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bic.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bic.json @@ -22,19 +22,6 @@ "name": "month", "type": "DATE", "description": "The month in which the metrics occurred" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": "RECORD", - "description": "Metrics from Crossref events" } -] \ No newline at end of file +] + diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bisac.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bisac.json index 73178873..bf2a5688 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bisac.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_bisac.json @@ -22,19 +22,6 @@ "name": "month", "type": "DATE", "description": "The month in which the metrics occurred" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": "RECORD", - "description": "Metrics from Crossref events" } -] \ No newline at end of file +] + diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_thema.json b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_thema.json index 090ac190..669bea9e 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_thema.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_metrics_subject_thema.json @@ -22,19 +22,6 @@ "name": "month", "type": "DATE", "description": "The month in which the metrics occurred" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "NULLABLE", - "name": "crossref_events", - "type": "RECORD", - "description": 
"Metrics from Crossref events" } -] \ No newline at end of file +] + diff --git a/dags/oaebu_workflows/onix_workflow/schema/book_product.json b/dags/oaebu_workflows/onix_workflow/schema/book_product.json index 65e9918f..fbc722cf 100644 --- a/dags/oaebu_workflows/onix_workflow/schema/book_product.json +++ b/dags/oaebu_workflows/onix_workflow/schema/book_product.json @@ -277,26 +277,6 @@ "name": "chapters", "type": "RECORD", "description": "Linked Objects from Crossref where they are of type book-chapter only" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "source", - "type": "STRING", - "description": "Event Source Type" - }, - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "Count of events" - } - ], - "mode": "REPEATED", - "name": "events", - "type": "RECORD", - "description": "Count of events from Crossref Events" } ], "mode": "NULLABLE", @@ -311,31 +291,11 @@ "name": "month", "type": "DATE", "description": "Month of Recorded Metrics" - }, - { - "fields": [ - { - "mode": "NULLABLE", - "name": "source", - "type": "STRING", - "description": "The event source" - }, - { - "mode": "NULLABLE", - "name": "count", - "type": "INTEGER", - "description": "The count of events" - } - ], - "mode": "REPEATED", - "name": "crossref_events", - "type": "RECORD", - "description": "Metrics Derived From Crossref Events" } ], "mode": "REPEATED", "name": "months", "type": "RECORD", - "description": "Linked Metrics from all sources, organised by month of occurance" + "description": "Linked Metrics from all sources, organised by month of occurrence" } ] diff --git a/dags/oaebu_workflows/onix_workflow/schema/crossref_events.json b/dags/oaebu_workflows/onix_workflow/schema/crossref_events.json deleted file mode 100644 index 95fd1995..00000000 --- a/dags/oaebu_workflows/onix_workflow/schema/crossref_events.json +++ /dev/null @@ -1,552 +0,0 @@ -[ - { - "description": "Unique ID for the Event.", - "mode": "REQUIRED", - "name": "id", - "type": 
"STRING" - }, - { - "description": "Subject persistent ID.", - "mode": "NULLABLE", - "name": "subj_id", - "type": "STRING" - }, - { - "description": "Type of the relationship between the subject and object.", - "mode": "NULLABLE", - "name": "relation_type_id", - "type": "STRING" - }, - { - "description": "Object persistent ID.", - "mode": "NULLABLE", - "name": "obj_id", - "type": "STRING" - }, - { - "description": "Timestamp of when the Event was created.", - "mode": "REQUIRED", - "name": "timestamp", - "type": "TIMESTAMP" - }, - { - "description": "Timestamp of when the Event is reported to have occurred.", - "mode": "REQUIRED", - "name": "occurred_at", - "type": "TIMESTAMP" - }, - { - "description": "", - "mode": "NULLABLE", - "name": "experimental", - "type": "BOOL" - }, - { - "description": "", - "mode": "NULLABLE", - "name": "total", - "type": "INTEGER" - }, - { - "description": "A name for the source.", - "mode": "REQUIRED", - "name": "source_id", - "type": "STRING" - }, - { - "description": "Unique ID that identifies the Agent that generated the Event.", - "mode": "NULLABLE", - "name": "source_token", - "type": "STRING" - }, - { - "description": "Terms of use for using the API at the point that you acquire the Event.", - "mode": "NULLABLE", - "name": "terms", - "type": "STRING" - }, - { - "description": "A license under which the Event is made available.", - "mode": "NULLABLE", - "name": "license", - "type": "STRING" - }, - { - "description": "Link to an Evidence Record for this Event.", - "mode": "NULLABLE", - "name": "evidence_record", - "type": "STRING" - }, - { - "description": "Subject metadata.", - "mode": "NULLABLE", - "name": "subj", - "type": "RECORD", - "fields": [ - { - "description": "The persistent ID. 
Must correspond to 'subj_id' or 'obj_id'", - "mode": "NULLABLE", - "name": "pid", - "type": "STRING" - }, - { - "description": "Publication date.", - "mode": "NULLABLE", - "name": "issued", - "type": "TIMESTAMP" - }, - { - "description": "The title of the webpage, comment, etc.", - "mode": "NULLABLE", - "name": "title", - "type": "STRING" - }, - { - "description": "Author of the comment, blog etc.", - "mode": "REPEATED", - "name": "author", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - } - ] - }, - { - "description": "URL where this was found. May be different to 'pid'", - "mode": "NULLABLE", - "name": "url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "alternative_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "original_tweet_author", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "original_tweet_url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "work_type_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "work_subtype_id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "jurisdiction", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "api_url", - "type": "STRING" - }, - { - "mode": "REPEATED", - "name": "publisher", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "json_url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "datePublished", - 
"type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "registrantId", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "dateModified", - "type": "TIMESTAMP" - }, - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "proxyIdentifiers", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "funder", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "issueNumber", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "periodical", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "issn", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "pagination", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "version", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "volumeNumber", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "includedInDataCatalog", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - } - ] - }, - { - "description": "Object metadata.", - "mode": "REPEATED", - "name": "obj", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "pid", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "method", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "verification", - "type": "STRING" - }, - { - "mode": 
"NULLABLE", - "name": "work_type_id", - "type": "STRING" - }, - { - "mode": "REPEATED", - "name": "publisher", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "url", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "datePublished", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "registrantId", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "dateModified", - "type": "TIMESTAMP" - }, - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "proxyIdentifiers", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "author", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "funder", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "issueNumber", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "periodical", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "issn", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - }, - { - "mode": "NULLABLE", - "name": "pagination", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "version", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "volumeNumber", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": 
"includedInDataCatalog", - "type": "RECORD", - "fields": [ - { - "mode": "NULLABLE", - "name": "id", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "type", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "name", - "type": "STRING" - } - ] - } - ] - }, - { - "description": "will have a value of 'deleted' or 'edited'", - "mode": "NULLABLE", - "name": "updated", - "type": "STRING" - }, - { - "description": "optional, may point to an announcement page explaining the edit", - "mode": "NULLABLE", - "name": "updated_reason", - "type": "STRING" - }, - { - "description": "ISO8601 date string for when the event was updated", - "mode": "NULLABLE", - "name": "updated_date", - "type": "TIMESTAMP" - }, - { - "mode": "NULLABLE", - "name": "message_action", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "action", - "type": "STRING" - }, - { - "mode": "NULLABLE", - "name": "jwt", - "type": "STRING" - } -] \ No newline at end of file diff --git a/dags/oaebu_workflows/onix_workflow/sql/book.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book.sql.jinja2 index 32ce4bf7..1a871c3e 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book.sql.jinja2 @@ -109,31 +109,6 @@ RETURNS STRING WITH --- Intermediate crossref events table -temp_intermediate_crossref_events as ( -SELECT - doi, - ARRAY(SELECT as STRUCT source, SUM(count) as count FROM UNNEST(months) GROUP BY source) as events, - months, - ARRAY(SELECT as STRUCT CAST(SPLIT(month, "-")[SAFE_OFFSET(0)] as int64) as year, source, SUM(count) as count FROM UNNEST(months) GROUP BY year, source) as years -FROM ( -SELECT - doi, - ARRAY_AGG(STRUCT(month, source, count)) as months -FROM ( - SELECT - (UPPER(TRIM(SUBSTR(obj_id, 17)))) as doi, - safe.FORMAT_TIMESTAMP('%Y-%m', occurred_at) as month, - source_id as source, - COUNT(id) as count - FROM `{{ crossref_events_table_id }}` - WHERE safe.FORMAT_TIMESTAMP('%Y-%m', occurred_at) is not null - 
GROUP BY - doi, source_id, month) -GROUP BY doi -)), - - -- Temp DOI table dois_temp_table as ( SELECT @@ -147,8 +122,7 @@ FROM CONCAT(issued.date_parts[offset(0)], "-", CASE WHEN ARRAY_LENGTH(issued.date_parts) > 1 THEN issued.date_parts[offset(1)] ELSE 13 END) as published_year_month, type, ISSN, ISBN, issn_type, publisher_location, publisher, member, prefix, container_title, short_container_title, group_title, references_count, is_referenced_by_count, subject, published_print, license, volume, funder, page, author, link, clinical_trial_number, alternative_id - ) as crossref, - (SELECT as STRUCT * from temp_intermediate_crossref_events as events WHERE events.doi = UPPER(TRIM(ref.doi))) as events + ) as crossref FROM `{{ crossref_metadata_table_id }}` as ref WHERE ARRAY_LENGTH(issued.date_parts) > 0)), @@ -190,38 +164,14 @@ SELECT ) as book_part on book.isbn = book_part.isbn), --- Temp aggregated events ISBN_DOI as (SELECT ISBN, ARRAY_CONCAT(ARRAY(SELECT doi FROM book.crossref_objects), ARRAY(SELECT doi FROM book.chapters)) as dois -FROM books as book), - -events_matched as (SELECT - ISBN, - ARRAY_CONCAT_AGG(crossref_events.events) as events, - ARRAY_CONCAT_AGG(crossref_events.months) as months, - ARRAY_CONCAT_AGG(crossref_events.years) as years, -FROM ISBN_DOI, UNNEST(dois) as doi -LEFT JOIN temp_intermediate_crossref_events as crossref_events ON crossref_events.doi = doi -GROUP BY ISBN), - -events_aggregated as (SELECT - ISBN, - ARRAY(SELECT as STRUCT source, SUM(count) as count FROM UNNEST(events) GROUP BY source) as overall, - ARRAY(SELECT as STRUCT month, source, SUM(count) as count FROM UNNEST(months) GROUP BY month, source ORDER BY month DESC) as months, - ARRAY(SELECT as STRUCT year, source, SUM(count) as count FROM UNNEST(years) GROUP BY year, source ORDER BY year DESC) as years -FROM events_matched) - +FROM books as book) -- Main Query SELECT book.isbn, book.crossref_objects, - book.chapters, - STRUCT( - events.overall, - events.months, - events.years - 
) as events + book.chapters FROM books as book -LEFT join events_aggregated as events on events.ISBN = book.ISBN \ No newline at end of file diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics.sql.jinja2 index c59c0019..9dcb9d4f 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_metrics.sql.jinja2 @@ -37,12 +37,12 @@ SELECT (SELECT p.publisher_name as publisher_name FROM UNNEST(onix.publisher) as p WHERE p.publishing_role = "Publisher" LIMIT 1) as publisher_name, -- pull the publisher name from the onix.publisher field month.month, {% for dp in data_partners | selectattr("export_book_metrics", "equalto", True) %} - {% include dp.files.book_metrics_sql %}, + {% include dp.files.book_metrics_sql %} + {% if not loop.last %},{% endif %} {% endfor %} - STRUCT((SELECT SUM(count) FROM UNNEST(month.crossref_events)) as count) as crossref_events FROM `{{ book_product_table_id }}`, UNNEST(months) AS month WHERE {% for dp in data_partners | selectattr("export_book_metrics", "equalto", True) %} - {% include dp.files.month_null_sql %} OR + {% include dp.files.month_null_sql %} + {% if not loop.last %} OR{% endif %} {% endfor %} -ARRAY_LENGTH(month.crossref_events) > 0 diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_author.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_author.sql.jinja2 index 9c496172..ce7f12a5 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_author.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_author.sql.jinja2 @@ -31,15 +31,15 @@ SELECT count(ISBN13) as unique_books, month.month, {% for dp in data_partners | selectattr("export_author", "equalto", True) %} - {% include dp.files.month_metrics_sum_sql %}, + {% include dp.files.month_metrics_sum_sql %} + {% if not loop.last %},{% endif %} {% endfor %} - 
STRUCT(group_counts(ARRAY_CONCAT_AGG(month.crossref_events)) as count) as crossref_events FROM `{{ book_product_table_id }}`, UNNEST(onix.authors) as author, UNNEST(months) as month WHERE {% for dp in data_partners | selectattr("export_author", "equalto", True) %} - {% include dp.files.month_null_sql %} OR + {% include dp.files.month_null_sql %} + {% if not loop.last %} OR{% endif %} {% endfor %} -ARRAY_LENGTH(month.crossref_events) > 0 GROUP BY author.PersonName, month ORDER BY author.PersonName ASC, month DESC diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_events.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_events.sql.jinja2 deleted file mode 100644 index 76dec6c9..00000000 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_events.sql.jinja2 +++ /dev/null @@ -1,74 +0,0 @@ -{# Copyright 2020-2024 Curtin University -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# Author: Richard Hosking, Keegan Smith #} -{# -The purpose of this script it to export the book event metrics section from the book_product table -#} - -WITH publisher_names as ( - SELECT - publisher_name as publisher_name, - ISBN13 as product_id - FROM - `{{ book_product_table_id }}`, UNNEST(onix.publisher) - WHERE - publishing_role = "Publisher" -), - -authors as ( - SELECT - ISBN13 as product_id, - ARRAY( - SELECT STRUCT( - a.PersonName as person_name, - a.PersonNameInverted as person_name_inverted, - a.ORCID as ORCID - ) - FROM UNNEST(onix.authors) as a) as authors - FROM `{{ book_product_table_id }}` -), - -body as ( - SELECT - ISBN13 as product_id, - MAX(work_id) as work_id, - MAX(work_family_id) as work_family_id, - MAX(onix.title) as title, - MAX(onix.subtitle) as subtitle, - IFNULL(MAX(CONCAT(onix.title, ": ", onix.subtitle)), MAX(onix.title)) as title_subtitle, - CAST(MAX(onix.published_year) as INT64) as published_year, - MAX(onix.published_date) as published_date, - month.month, - events.source as event_source, - ANY_VALUE(subjects) as subjects, -- All subject structs created are the same for the group so take any value - STRUCT( - SUM(events.count) as count - ) as crossref_events, - FROM - `{{ book_product_table_id }}`, UNNEST(months) as month, UNNEST(month.crossref_events) as events - WHERE - events IS NOT NULL - GROUP BY - ISBN13, month, event_source -) - -SELECT - b.*, - p.publisher_name as publisher_name, - a.authors as authors -FROM - body AS b -LEFT JOIN publisher_names as p ON b.product_id = p.product_id -LEFT JOIN authors as a ON a.product_id = p.product_id diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bic.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bic.sql.jinja2 index 9f3ae2ac..59e28835 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bic.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bic.sql.jinja2 @@ -43,19 +43,17 @@ FROM( 
month.month, {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_metrics_sum_sql %}, + {% include dp.files.month_metrics_sum_sql %} + {% if not loop.last %},{% endif %} {% endfor %} - STRUCT(group_counts(ARRAY_CONCAT_AGG(month.crossref_events)) as count) as crossref_events - FROM `{{ book_product_table_id }}`, UNNEST(top_level_subjects(onix.bic_subjects)) as subject, UNNEST(months) as month WHERE {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_null_sql %} OR + {% include dp.files.month_null_sql %}{% if not loop.last %} OR {% endif %} {% endfor %} - ARRAY_LENGTH(month.crossref_events) > 0 GROUP BY subject, month ORDER BY subject ASC, month DESC) as metrics LEFT JOIN `{{ bic_table_id }}` as bic on bic.code = subject diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bisac.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bisac.sql.jinja2 index e35bc0f3..9c1f656f 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bisac.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_bisac.sql.jinja2 @@ -43,19 +43,17 @@ FROM( month.month, {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_metrics_sum_sql %}, + {% include dp.files.month_metrics_sum_sql %} + {% if not loop.last %},{% endif %} {% endfor %} - STRUCT(group_counts(ARRAY_CONCAT_AGG(month.crossref_events)) as count) as crossref_events, - FROM `{{ book_product_table_id }}`, UNNEST(top_level_subjects(onix.bisac_subjects)) as subject, UNNEST(months) as month WHERE {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_null_sql %} OR + {% include dp.files.month_null_sql %}{% if not loop.last %} OR{% endif %} {% endfor %} - ARRAY_LENGTH(month.crossref_events) > 0 GROUP BY subject, month ORDER BY subject ASC, month DESC) as 
metrics LEFT JOIN `{{ bisac_table_id }}` as bisac on bisac.code = subject diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_thema.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_thema.sql.jinja2 index 7dc0e076..a9d734d2 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_thema.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_metrics_subject_thema.sql.jinja2 @@ -43,19 +43,16 @@ FROM( month.month, {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_metrics_sum_sql %}, + {% include dp.files.month_metrics_sum_sql %}{% if not loop.last %},{% endif %} {% endfor %} - STRUCT(group_counts(ARRAY_CONCAT_AGG(month.crossref_events)) as count) as crossref_events - FROM `{{ book_product_table_id }}`, UNNEST(top_level_subjects(onix.thema_subjects)) as subject, UNNEST(months) as month WHERE {% for dp in data_partners | selectattr("export_subject", "equalto", True) %} - {% include dp.files.month_null_sql %} OR + {% include dp.files.month_null_sql %}{% if not loop.last %} OR{% endif %} {% endfor %} - ARRAY_LENGTH(month.crossref_events) > 0 GROUP BY subject, month ORDER BY subject ASC, month DESC) as metrics LEFT JOIN `{{ thema_table_id }}` as thema on thema.code = subject diff --git a/dags/oaebu_workflows/onix_workflow/sql/book_product.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/book_product.sql.jinja2 index 8fec7bb9..3d02ec79 100644 --- a/dags/oaebu_workflows/onix_workflow/sql/book_product.sql.jinja2 +++ b/dags/oaebu_workflows/onix_workflow/sql/book_product.sql.jinja2 @@ -166,15 +166,6 @@ subjects AS ( FROM onix_ebook_titles), -crossref_events as ( - SELECT - public_data.isbn as ISBN13, - LAST_DAY(DATE(CAST(SPLIT(month_source.month, "-")[OFFSET(0)] as INT64), CAST(SPLIT(month_source.month, "-")[OFFSET(1)] as INT64), 1), MONTH) as release_date, - ARRAY_AGG(STRUCT(month_source.source, month_source.count)) as metrics - FROM `{{ book_table_id }}` 
as public_data, UNNEST(public_data.events.months) as month_source - GROUP BY public_data.isbn, month_source.month -), - {# The purpose of the block of SQL is to select a unique set of release dates (also read as unique months). The secondary purpose of this block is to not include months for which no metrics source has any data for a particular release. @@ -186,9 +177,8 @@ unique_releases as ( FROM UNNEST(ARRAY_CONCAT( {% for dp in data_partners %} - ARRAY(SELECT DISTINCT(release_date) FROM {{ dp.type_id + "_metrics" }}), + ARRAY(SELECT DISTINCT(release_date) FROM {{ dp.type_id + "_metrics" }}){% if not loop.last %},{% endif %} {% endfor %} - ARRAY(SELECT DISTINCT(release_date) FROM crossref_events) ) ) as release_date ORDER BY release_date DESC @@ -221,18 +211,15 @@ metrics as ( {% for dp in data_partners %} {{ dp.type_id + ".metrics" }} AS {{ dp.type_id }}, {% endfor %} - ebook_months.release_date as month, - crossref_events.metrics as crossref_events - + ebook_months.release_date as month ) ORDER BY ebook_months.release_date DESC) as months FROM ebook_months {% for dp in data_partners %} LEFT JOIN {{ dp.type_id + "_metrics" }} AS {{ dp.type_id }} ON ebook_months.ISBN13 = {{ dp.type_id + ".ISBN13" }} AND ebook_months.release_date = {{ dp.type_id + ".release_date" }} {% endfor %} - LEFT JOIN crossref_events as crossref_events ON ebook_months.ISBN13 = crossref_events.ISBN13 AND ebook_months.release_date = crossref_events.release_date - WHERE crossref_events.metrics IS NOT NULL + WHERE {% for dp in data_partners %} - OR {{ dp.type_id + ".metrics" }} IS NOT NULL + {{ dp.type_id + ".metrics" }} IS NOT NULL {% if not loop.last %} OR {% endif %} {% endfor %} GROUP BY ebook_months.ISBN13 ) @@ -263,7 +250,8 @@ SELECT {% for dp in data_partners | selectattr("has_metadata", "equalto", True) %} {{ dp.type_id + "_metadata" }} AS {{ dp.type_id }}, {% endfor %} - crossref_objects, chapters, events.overall AS events + crossref_objects, + chapters ) AS metadata, metrics.months 
FROM onix_ebook_titles diff --git a/dags/oaebu_workflows/onix_workflow/sql/crossref_events_filter_doi.sql.jinja2 b/dags/oaebu_workflows/onix_workflow/sql/crossref_events_filter_doi.sql.jinja2 deleted file mode 100644 index 0ca93d55..00000000 --- a/dags/oaebu_workflows/onix_workflow/sql/crossref_events_filter_doi.sql.jinja2 +++ /dev/null @@ -1,33 +0,0 @@ -/* - Copyright 2020 Curtin University - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - Author: Keegan Smith - */ -/* -This script creates a temporary table full of the supplied DOIs. -It then joins these DOIs on the Crossref Events table. Also filters for events ocurring between start and end date. -This gives the same result as using a WHERE statement for each DOI. 
-*/ -CREATE TEMPORARY TABLE doi_table (DOI STRING); - -{% for doi in dois %} -INSERT INTO doi_table (DOI) VALUES ('{{ doi }}'); -{% endfor %} - -SELECT - xref.* -FROM - `{{ project_id }}.{{ dataset_id }}.{{ crossref_events_table_id }}` xref - JOIN doi_table ON xref.DOI = doi_table.DOI - WHERE xref.ocurred_at >= {{ start_date }} AND xref.ocurred_at <= {{ end_date }} \ No newline at end of file diff --git a/tests/onix_workflow/fixtures/crossref_events_api_calls.yaml b/tests/onix_workflow/fixtures/crossref_events_api_calls.yaml deleted file mode 100644 index 0f217cf7..00000000 --- a/tests/onix_workflow/fixtures/crossref_events_api_calls.yaml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:ccbea9f763b958410daa46fe585039c9be2fa358e732a8a8c8abcab77e3390dd -size 4856419 diff --git a/tests/onix_workflow/fixtures/crossref_events_request.yaml b/tests/onix_workflow/fixtures/crossref_events_request.yaml deleted file mode 100644 index 1592e713..00000000 --- a/tests/onix_workflow/fixtures/crossref_events_request.yaml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:cb730f8464bc82b07113515eb4c097d31002167703d5f0918e11e80bf86dbe0e -size 11274 diff --git a/tests/onix_workflow/fixtures/e2e_outputs/book.json b/tests/onix_workflow/fixtures/e2e_outputs/book.json index e9ba9565..f29e589a 100644 --- a/tests/onix_workflow/fixtures/e2e_outputs/book.json +++ b/tests/onix_workflow/fixtures/e2e_outputs/book.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:1a46c8d7528be43aed9ba80fc06367fc9a756d6b1ac99b7140640ad6db0b16ee -size 3183 +oid sha256:8ab707a7d13e5391ce88c63c7c4a6ec3c3a1b4bcb0717b76d2853649b286fa10 +size 1040 diff --git a/tests/onix_workflow/fixtures/e2e_outputs/book_list.json b/tests/onix_workflow/fixtures/e2e_outputs/book_list.json index 8c785b7c..72227aca 100644 --- a/tests/onix_workflow/fixtures/e2e_outputs/book_list.json +++ 
b/tests/onix_workflow/fixtures/e2e_outputs/book_list.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:9d4819c22beb2be183463f88a09c552529cee4855e94baecf67a4ab64427a184 -size 2985 +oid sha256:cb7de8b4a6804d2dde1cbcd6e9ebf0b12077346c26a42c72dcf7e75386c9cf64 +size 2986 diff --git a/tests/onix_workflow/fixtures/e2e_outputs/book_product.json b/tests/onix_workflow/fixtures/e2e_outputs/book_product.json index aea51d4b..de58f885 100644 --- a/tests/onix_workflow/fixtures/e2e_outputs/book_product.json +++ b/tests/onix_workflow/fixtures/e2e_outputs/book_product.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:85c83efcf63955cb6c4b9884bb950bc3c4d4fa94421825dfe39fa528ac25a799 -size 43233 +oid sha256:533855ff7abb603c8797384c6829b8e0aac51a2993da610a51b681dcccecf03d +size 41633 diff --git a/tests/onix_workflow/test_onix_workflow.py b/tests/onix_workflow/test_onix_workflow.py index 62092224..26b9464b 100644 --- a/tests/onix_workflow/test_onix_workflow.py +++ b/tests/onix_workflow/test_onix_workflow.py @@ -33,12 +33,9 @@ copy_latest_export_tables, create_dag, dois_from_table, - download_crossref_events, get_onix_records, insert_into_schema, OnixWorkflowRelease, - transform_crossref_events, - transform_event, ) from observatory_platform.airflow.workflow import Workflow from observatory_platform.config import module_file_path @@ -137,8 +134,6 @@ def __init__(self, *args, **kwargs): # fixtures folder location self.fixtures_folder = test_fixtures_folder(workflow_module="onix_workflow") - # vcrpy cassettes for http request mocking - self.events_cassette = os.path.join(self.fixtures_folder, "crossref_events_request.yaml") @patch("oaebu_workflows.onix_workflow.onix_workflow.bq_select_table_shard_dates") def test_make_release_sharded(self, mock_sel_table_suffixes): @@ -186,10 +181,6 @@ def test_make_release_sharded(self, mock_sel_table_suffixes): release.crossref_metadata_path, os.path.join(release.transform_folder, 
"crossref_metadata.jsonl.gz"), ) - self.assertEqual( - release.transformed_crossref_events_path, - os.path.join(release.transform_folder, "crossref_events.jsonl"), - ) # Test that the onix and crossref snapshots are as expected self.assertEqual(expected_onix_table, release.onix_table_id) @@ -249,10 +240,6 @@ def test_make_release_unsharded(self, mock_sel_table_suffixes): release.crossref_metadata_path, os.path.join(release.transform_folder, "crossref_metadata.jsonl.gz"), ) - self.assertEqual( - release.transformed_crossref_events_path, - os.path.join(release.transform_folder, "crossref_events.jsonl"), - ) # Test that the onix table and crossref snapshots are as expected self.assertEqual(expected_onix_table, release.onix_table_id) @@ -327,29 +314,24 @@ def test_dag_structure(self): "aggregate_works", "export_tables.export_book_metrics", "create_book_product_table", - "export_tables.export_book_metrics_events", "add_new_dataset_releases", "create_crossref_metadata_table", "create_book_table", "update_latest_export_tables", "cleanup_workflow", "export_tables.export_book_metrics_subjects", - "create_crossref_events_table", ], "aggregate_works": ["create_crossref_metadata_table"], - "create_crossref_metadata_table": ["create_crossref_events_table"], - "create_crossref_events_table": ["create_book_table"], + "create_crossref_metadata_table": ["create_book_table"], "create_book_table": [], "create_book_product_table": [ "export_tables.export_book_metrics_author", "export_tables.export_book_list", "export_tables.export_book_metrics_country", "export_tables.export_book_metrics", - "export_tables.export_book_metrics_events", "export_tables.export_book_metrics_subjects", ], "export_tables.export_book_list": ["update_latest_export_tables"], - "export_tables.export_book_metrics_events": ["update_latest_export_tables"], "export_tables.export_book_metrics": ["update_latest_export_tables"], "export_tables.export_book_metrics_country": ["update_latest_export_tables"], 
"export_tables.export_book_metrics_author": ["update_latest_export_tables"], @@ -390,7 +372,6 @@ def test_dag_structure(self): "export_tables.export_book_metrics_author", "intermediate_tables.intermediate_google_books_sales", "export_tables.export_book_institution_list", - "create_crossref_events_table", "export_tables.export_book_metrics", "add_new_dataset_releases", "intermediate_tables.intermediate_jstor_country", @@ -398,7 +379,6 @@ def test_dag_structure(self): "intermediate_tables.intermediate_irus_oapen", "intermediate_tables.intermediate_google_books_traffic", "create_book_table", - "export_tables.export_book_metrics_events", "export_tables.export_book_metrics_subjects", "update_latest_export_tables", "export_tables.export_book_list", @@ -414,8 +394,7 @@ def test_dag_structure(self): "export_tables.export_book_metrics_institution", ], "aggregate_works": ["create_crossref_metadata_table"], - "create_crossref_metadata_table": ["create_crossref_events_table"], - "create_crossref_events_table": ["create_book_table"], + "create_crossref_metadata_table": ["create_book_table"], "create_book_table": [ "intermediate_tables.intermediate_worldreader", "intermediate_tables.intermediate_jstor_country", @@ -442,7 +421,6 @@ def test_dag_structure(self): "export_tables.export_book_metrics_author", "export_tables.export_book_institution_list", "export_tables.export_book_metrics_country", - "export_tables.export_book_metrics_events", "export_tables.export_book_metrics_city", "export_tables.export_book_metrics_institution", "export_tables.export_book_metrics_subjects", @@ -450,7 +428,6 @@ def test_dag_structure(self): "export_tables.export_book_metrics", ], "export_tables.export_book_list": ["update_latest_export_tables"], - "export_tables.export_book_metrics_events": ["update_latest_export_tables"], "export_tables.export_book_institution_list": ["update_latest_export_tables"], "export_tables.export_book_metrics_institution": ["update_latest_export_tables"], 
"export_tables.export_book_metrics_city": ["update_latest_export_tables"], @@ -562,26 +539,6 @@ def test_create_and_load_aggregate_works_table(self, mock_bq_query): self.assert_table_content(table_id, load_jsonl(release.worksfamilylookup_path), primary_key="isbn13") self.assert_table_content(table_id, worksfamilylookup_expected, primary_key="isbn13") - def test_crossref_events_api_calls(self): - """Test the functions that query the crossref event and metadata APIs""" - - with CliRunner().isolated_filesystem() as t: - input_path = os.path.join(t, "input.jsonl") - doi_prefixes = ["10.5555", "10.2222"] - events_start = pendulum.date(2020, 1, 1) - events_end = pendulum.date(2021, 1, 1) - mailto = "agent@observatory.academy" - - with vcr.use_cassette( - os.path.join(self.fixtures_folder, "crossref_events_api_calls.yaml"), - record_mode=vcr.record_mode.RecordMode.ONCE, - ): - download_crossref_events(input_path, doi_prefixes, events_start, events_end, mailto) - - actual_events = load_jsonl(input_path) - expected_num_events = 242 + 3620 - self.assertEqual(expected_num_events, len(actual_events)) - @patch("oaebu_workflows.onix_workflow.onix_workflow.bq_run_query") def test_get_onix_records(self, mock_bq_query): mock_bq_query.return_value = TestOnixWorkflow.onix_data @@ -589,116 +546,6 @@ def test_get_onix_records(self, mock_bq_query): self.assertEqual(len(records), 3) self.assertEqual(records[0]["ISBN13"], "111") - def test_crossref_transform(self): - """Test the function that transforms the crossref events data""" - - input_events = [ - { - "license": "https://creativecommons.org/publicdomain/zero/1.0/", - "obj_id": "https://doi.org/10.5555/12345678", - "source_token": "36c35e23-8757-4a9d-aacf-345e9b7eb50d", - "occurred_at": "2021-07-21T13:27:50Z", - "subj_id": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - "id": "c76bbfd9-5122-4859-82c8-505b1eb845fd", - "evidence_record": 
"https://evidence.eventdata.crossref.org/evidence/20210721-wikipedia-83789066-1794-4c4f-bb2c-ed47ccd82264", - "terms": "https://doi.org/10.13003/CED-terms-of-use", - "action": "add", - "subj": { - "pid": "https://en.wikipedia.org/wiki/Josiah_S._Carberry", - "url": "https://en.wikipedia.org/w/index.php?title=Josiah_S._Carberry&oldid=1034726541", - "title": "Josiah S. Carberry", - "work_type_id": "entry-encyclopedia", - "api-url": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - }, - "source_id": "wikipedia", - "obj": { - "pid": "https://doi.org/10.5555/12345678", - "url": "http://psychoceramics.labs.crossref.org/10.5555-12345678.html", - "method": "landing-page-meta-tag", - "verification": "checked-url-exact", - }, - "timestamp": "2021-07-21T14:00:58Z", - "relation_type_id": "references", - }, - { - "license": "https://creativecommons.org/publicdomain/zero/1.0/", - "obj_id": "https://doi.org/10.5555/12345679", - "source_token": "36c35e23-8757-4a9d-aacf-345e9b7eb50e", - "occurred_at": "2021-07-21T13:27:50Z", - "subj_id": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - "id": "c76bbfd9-5122-4859-82c8-505b1eb845fd", - "evidence_record": "https://evidence.eventdata.crossref.org/evidence/20210721-wikipedia-83789066-1794-4c4f-bb2c-ed47ccd82264", - "terms": "https://doi.org/10.13003/CED-terms-of-use", - "action": "add", - "subj": { - "pid": "https://en.wikipedia.org/wiki/Josiah_S._Carberry", - "url": "https://en.wikipedia.org/w/index.php?title=Josiah_S._Carberry&oldid=1034726541", - "title": "Josiah S. 
Carberry", - "work_type_id": "entry-encyclopedia", - "api-url": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - }, - "source_id": "wikipedia", - "obj": { - "pid": "https://doi.org/10.5555/12345678", - "url": "http://psychoceramics.labs.crossref.org/10.5555-12345678.html", - "method": "landing-page-meta-tag", - "verification": "checked-url-exact", - }, - "timestamp": "2021-07-21T14:00:58Z", - "relation_type_id": "references", - }, - ] - expected = [ - { - "license": "https://creativecommons.org/publicdomain/zero/1.0/", - "obj_id": "https://doi.org/10.5555/12345678", - "source_token": "36c35e23-8757-4a9d-aacf-345e9b7eb50d", - "occurred_at": "2021-07-21T13:27:50+00:00", - "subj_id": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - "id": "c76bbfd9-5122-4859-82c8-505b1eb845fd", - "evidence_record": "https://evidence.eventdata.crossref.org/evidence/20210721-wikipedia-83789066-1794-4c4f-bb2c-ed47ccd82264", - "terms": "https://doi.org/10.13003/CED-terms-of-use", - "action": "add", - "subj": { - "pid": "https://en.wikipedia.org/wiki/Josiah_S._Carberry", - "url": "https://en.wikipedia.org/w/index.php?title=Josiah_S._Carberry&oldid=1034726541", - "title": "Josiah S. 
Carberry", - "work_type_id": "entry-encyclopedia", - "api_url": "https://en.wikipedia.org/api/rest_v1/page/html/Josiah_S._Carberry/1034726541", - }, - "source_id": "wikipedia", - "obj": { - "pid": "https://doi.org/10.5555/12345678", - "url": "http://psychoceramics.labs.crossref.org/10.5555-12345678.html", - "method": "landing-page-meta-tag", - "verification": "checked-url-exact", - }, - "timestamp": "2021-07-21T14:00:58+00:00", - "relation_type_id": "references", - } - ] - - # Standalone transform - actual = transform_event(input_events[0]) - self.assertEqual(expected[0], actual) - - # List transform - with CliRunner().isolated_filesystem() as t: - # Save input - input_path = os.path.join(t, "input.jsonl") - save_jsonl(input_path, input_events) - - # Transform - # Also tests filtering of events - dois = {"10.5555/12345678"} - output_path = os.path.join(t, "output.jsonl") - transform_crossref_events(input_path, dois, output_path) - - # Load actual and test - actual = load_jsonl(output_path) - self.assertEqual(len(actual), 1) - self.assertEqual(expected, actual) - def test_insert_into_schema(self): """Tests the instert_into_schema function""" @@ -932,15 +779,6 @@ def setup_input_data( def test_workflow(self): """End to end test of the ONIX workflow""" - def vcr_ignore_condition(request): - """This function is used by vcrpy to allow requests to sources not in the cassette file. 
- At time of writing, the only mocked requests are the ones to crossref events.""" - allowed_domains = ["https://api.eventdata.crossref.org"] - allow_request = any([request.url.startswith(i) for i in allowed_domains]) - if not allow_request: - return None - return request - # Setup Observatory environment env = SandboxEnvironment(self.gcp_project_id, self.data_location) @@ -988,7 +826,6 @@ def vcr_ignore_condition(request): start_date = pendulum.datetime(year=2021, month=5, day=9) # Sunday dag_id = "onix_workflow_test" bq_oaebu_crossref_metadata_table_name = "crossref_metadata" - bq_crossref_events_table_name = "crossref_events" bq_book_table_name = "book" bq_book_product_table_name = "book_product" bq_worksid_table_name = "onix_workid_isbn" @@ -1004,7 +841,6 @@ def vcr_ignore_condition(request): bq_oaebu_crossref_dataset_id=oaebu_crossref_dataset_id, bq_oaebu_crossref_metadata_table_name=bq_oaebu_crossref_metadata_table_name, bq_master_crossref_metadata_table_name="crossref_metadata_master", # Set in setup_input_data() - bq_crossref_events_table_name=bq_crossref_events_table_name, bq_book_table_name=bq_book_table_name, bq_book_product_table_name=bq_book_product_table_name, bq_country_project_id=env.cloud_workspace.project_id, @@ -1024,7 +860,6 @@ def vcr_ignore_condition(request): sensor_dag_ids=sensor_dag_ids, start_date=start_date, crossref_start_date=pendulum.datetime(year=2018, month=5, day=14), - skip_downloading_crossref_events=False, ) # Skip dag existence check in sensor. 
@@ -1125,25 +960,6 @@ def vcr_ignore_condition(request): primary_key="DOI", ) - # Load crossref event table into bigquery - # Even though this is using VCR, this file is manually created, which is not ideal - with vcr.use_cassette( - self.events_cassette, record_mode="none", before_record_request=vcr_ignore_condition - ): - ti = env.run_task("create_crossref_events_table") - self.assertEqual(ti.state, State.SUCCESS) - table_id = bq_sharded_table_id( - self.gcp_project_id, - oaebu_crossref_dataset_id, - bq_crossref_events_table_name, - release.snapshot_date, - ) - crossref_fixture_table = load_and_parse_json( - os.path.join(self.fixtures_folder, "e2e_outputs", "crossref_events.json"), - timestamp_fields=["timestamp", "occurred_at", "updated_date", "issued", "dateModified"], - ) - self.assert_table_content(table_id, crossref_fixture_table, primary_key="id") - # Create book table in bigquery ti = env.run_task("create_book_table") self.assertEqual(ti.state, State.SUCCESS) @@ -1189,21 +1005,19 @@ def vcr_ignore_condition(request): "export_tables.export_book_metrics_institution", "export_tables.export_book_metrics_author", "export_tables.export_book_metrics_city", - "export_tables.export_book_metrics_events", "export_tables.export_book_metrics_subjects", ] export_tables = [ ("book_list", 4), ("book_institution_list", 1), - ("book_metrics", 6), + ("book_metrics", 3), ("book_metrics_country", 32), ("book_metrics_institution", 1), - ("book_metrics_author", 3), + ("book_metrics_author", 1), ("book_metrics_city", 39), - ("book_metrics_events", 3), - ("book_metrics_subject_bic", 2), - ("book_metrics_subject_bisac", 1), - ("book_metrics_subject_thema", 2), + ("book_metrics_subject_bic", 1), + ("book_metrics_subject_bisac", 0), + ("book_metrics_subject_thema", 1), ] # Create the export tables