Skip to content

Commit

Permalink
Feature: Add bq functions to the platform (#646)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmassen-hane authored Nov 23, 2023
1 parent 44f6e80 commit 2b34ebf
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 47 deletions.
61 changes: 39 additions & 22 deletions observatory-platform/observatory/platform/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2020 Curtin University
# Copyright 2020-2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: James Diprose, Aniek Roelofs
# Author: James Diprose, Aniek Roelofs, Alex Massen-Hane

import datetime
import glob
Expand All @@ -28,7 +28,15 @@
import pendulum
from google.api_core.exceptions import BadRequest, Conflict
from google.cloud import bigquery
from google.cloud.bigquery import LoadJob, LoadJobConfig, QueryJob, SourceFormat, CopyJobConfig, CopyJob
from google.cloud.bigquery import (
LoadJob,
LoadJobConfig,
QueryJob,
SourceFormat,
CopyJobConfig,
CopyJob,
Table as BQTable,
)
from google.cloud.bigquery import dataset
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.table import Table
Expand Down Expand Up @@ -84,33 +92,24 @@ def bq_sharded_table_id(
return f"{project_id}.{dataset_id}.{table_name}{date.strftime('%Y%m%d')}"


def bq_table_id_parts(table_id: str) -> Tuple[str, str, str, str, Optional[pendulum.Date]]:
def bq_table_id_parts(table_id: str) -> Tuple[str, str, str, Optional[pendulum.Date]]:
"""Convert a BigQuery fully qualified table identifier into its parts which consist of project_id, dataset_id,
and table_id, table_name and shard_date.
table_id (with any trailing shard date removed) and shard_date.
:param table_id: the fully qualified BigQuery table identifier.
:return: project_id, dataset_id and table_id and the table_name and optional shard date.
:return: project_id, dataset_id, table_id with any trailing shard date removed, and the shard date (None when the table is not sharded).
"""

assert_table_id(table_id)
parts = table_id.split(".")
project_id = parts[0]
dataset_id = parts[1]
table_id = parts[2]
table_name, shard_date = bq_table_shard_info(table_id)

return project_id, dataset_id, table_id, table_name, shard_date
table_id = parts[2] # Changes table_id
shard_date = None
if bool(re.search(r"\d{8}$", table_id)):
table_id, shard_date = bq_table_shard_info(parts[2])


def bq_table_name(table_id: str):
"""Remove the date from a table_id.
:param table_id: the table_id including a shard date.
:return: the table name.
"""

# -8 is removing the date from the string.
return table_id[:-8]
return project_id, dataset_id, table_id, shard_date


def bq_table_shard_info(table_id: str) -> Tuple[str, Optional[pendulum.Date]]:
Expand All @@ -125,7 +124,7 @@ def bq_table_shard_info(table_id: str) -> Tuple[str, Optional[pendulum.Date]]:
if results is None:
return table_id, None

return bq_table_name(table_id), pendulum.parse(results.group(0))
return table_id[:-8], pendulum.parse(results.group(0))


def bq_table_exists(table_id: str) -> bool:
Expand Down Expand Up @@ -783,6 +782,21 @@ def bq_list_tables(project_id: str, dataset_id: str) -> List[str]:
return table_ids


def bq_get_table(table_id: str) -> Optional[BQTable]:
    """Fetch a single BigQuery table object from the Google BigQuery API.

    :param table_id: the fully qualified table id.
    :return: the table object returned by the BigQuery API, or None if the table could not be found.
    """

    client = bigquery.Client()
    try:
        return client.get_table(table_id)
    except NotFound:
        # A missing table is reported in the log and signalled to the caller with None.
        logging.info(f"Table is not found! {table_id}")
    return None


def bq_export_table(*, table_id: str, file_type: str, destination_uri: str) -> bool:
"""Export a BigQuery table.
Expand Down Expand Up @@ -937,7 +951,10 @@ def bq_select_columns(
"""

assert_table_id(table_id)
project_id, dataset_id, table_id, _, _ = bq_table_id_parts(table_id)
project_id, dataset_id, table_id, shard_date = bq_table_id_parts(table_id)
if shard_date is not None:
table_id = f"{table_id}{shard_date.format('YYYYMMDD')}"

template_path = os.path.join(sql_templates_path(), "select_columns.sql.jinja2")
query = render_template(
template_path,
Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/utils/people_schema.json
Git LFS file not shown
Loading

0 comments on commit 2b34ebf

Please sign in to comment.