From da343d294e83d6b9fe265a365e93376abffaabdf Mon Sep 17 00:00:00 2001
From: Eugene Miretsky
Date: Mon, 13 Jun 2022 12:09:14 -0400
Subject: [PATCH 1/5] remove unused imports

---
 .../operators/gcp/ods/ods_merge_table_operator.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py b/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
index 987e8aea..2c5f46df 100644
--- a/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/ods/ods_merge_table_operator.py
@@ -1,14 +1,11 @@
 from typing import Optional
-from datetime import datetime
 
-from airflow.models import BaseOperator, BaseOperatorLink
 from airflow.contrib.operators.bigquery_operator import (
     BigQueryOperator,
     BigQueryCreateEmptyTableOperator,
 )
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 
 from airflow.exceptions import AirflowException
 

From 5bd33d9bb18323cdcbcaa135ab628c900e2d797a Mon Sep 17 00:00:00 2001
From: Eugene Miretsky
Date: Mon, 13 Jun 2022 12:11:02 -0400
Subject: [PATCH 2/5] remove unused imports

---
 .../common/gcp/source_schema/bq.py            |  7 +++----
 .../common/gcp/source_schema/gcs.py           |  3 ---
 .../facebook/operators/facebook_ads_to_gcs.py |  6 ------
 .../schema_parsing/schema_parsing_operator.py | 22 +++++-----------------
 tests/test_utils/bq_test_utils.py             |  2 --
 5 files changed, 8 insertions(+), 32 deletions(-)

diff --git a/gcp_airflow_foundations/common/gcp/source_schema/bq.py b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
index d7d3af45..c9cf7d7f 100644
--- a/gcp_airflow_foundations/common/gcp/source_schema/bq.py
+++ b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
@@ -1,6 +1,3 @@
-import json
-import logging
-
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 
 
@@ -18,6 +15,8 @@ def read_schema_from_bq(
 
     bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None)
 
-    schema = bq_hook.get_schema(dataset_id=dataset_id, table_id=table_id, project_id=project_id)
+    schema = bq_hook.get_schema(
+        dataset_id=dataset_id, table_id=table_id, project_id=project_id
+    )
 
     return schema["fields"]
diff --git a/gcp_airflow_foundations/common/gcp/source_schema/gcs.py b/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
index 2e2f2825..33bdaeed 100644
--- a/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
+++ b/gcp_airflow_foundations/common/gcp/source_schema/gcs.py
@@ -1,7 +1,4 @@
 import json
-import logging
-
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from urllib.parse import urlparse
 
diff --git a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
index 60ed1169..b0169b0c 100644
--- a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
+++ b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
@@ -1,17 +1,11 @@
-import csv
-import tempfile
-import warnings
-import time
 from typing import Any, Dict, List, Optional, Sequence, Union
 from random import shuffle
 
 import pandas as pd
 from datetime import datetime
-from dateutil.relativedelta import relativedelta
 import pyarrow.parquet as pq
 import pyarrow
 
-from airflow.exceptions import AirflowException
 
 from gcp_airflow_foundations.operators.facebook.hooks.ads import (
     CustomFacebookAdsReportingHook,
diff --git a/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py b/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
index d5a85125..76b679bc 100644
--- a/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/schema_parsing/schema_parsing_operator.py
@@ -1,27 +1,13 @@
-from typing import Optional
-
 from airflow.models import BaseOperator, BaseOperatorLink
-from airflow.contrib.operators.bigquery_operator import (
-    BigQueryOperator,
-    BigQueryCreateEmptyTableOperator,
-)
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-
-from airflow.exceptions import AirflowException
 
 import logging
 
-from gcp_airflow_foundations.common.gcp.source_schema.gcs import read_schema_from_gcs
 from gcp_airflow_foundations.common.gcp.ods.schema_utils import parse_ods_schema
 from gcp_airflow_foundations.common.gcp.hds.schema_utils import parse_hds_schema
-from gcp_airflow_foundations.base_class.ods_table_config import OdsTableConfig
-from gcp_airflow_foundations.base_class.hds_table_config import HdsTableConfig
-
-
 
 
 class ParseSchema(BaseOperator):
@@ -45,8 +31,8 @@ def __init__(
         self.new_column_udfs = new_column_udfs
         self.data_source = data_source
         self.table_config = table_config
-        self.ods_table_id = ods_table_id,
-        self.hds_table_id = hds_table_id,
+        self.ods_table_id = (ods_table_id,)
+        self.hds_table_id = (hds_table_id,)
         self.ods_table_config = table_config.ods_config
         self.hds_table_config = table_config.hds_config
 
@@ -80,7 +66,9 @@ def execute(self, context):
             column_names = list(self.new_column_udfs.keys())
             for column_name in column_names:
                 field = self.new_column_udfs[column_name]
-                source_schema_fields.append({"name": column_name, "type": field["output_type"]})
+                source_schema_fields.append(
+                    {"name": column_name, "type": field["output_type"]}
+                )
 
         if self.ods_table_config:
             schema_xcom[
diff --git a/tests/test_utils/bq_test_utils.py b/tests/test_utils/bq_test_utils.py
index bcbcdce0..e1ed7e7b 100644
--- a/tests/test_utils/bq_test_utils.py
+++ b/tests/test_utils/bq_test_utils.py
@@ -1,5 +1,3 @@
-from google.cloud import bigquery
-from google.cloud.bigquery import SchemaField
 import pandas
 from time import sleep
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook

From 6212e628df9fa87bf11d53cc3386b5838022e121 Mon Sep 17 00:00:00 2001
From: Eugene Miretsky
Date: Mon, 13 Jun 2022 12:23:25 -0400
Subject: [PATCH 3/5] remove unused imports

---
 gcp_airflow_foundations/common/gcp/source_schema/bq.py  | 2 +-
 .../operators/api/operators/twilio_operator.py          | 2 +-
 .../operators/facebook/operators/facebook_ads_to_gcs.py | 2 +-
 .../operators/gcp/bigquery/custom_bq_hook.py            | 3 +--
 .../operators/gcp/hds/hds_merge_table_operator.py       | 6 +-----
 tests/integration/dlp/test_dlp.py                       | 1 -
 tests/test_utils/bq_test_utils.py                       | 2 +-
 7 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/gcp_airflow_foundations/common/gcp/source_schema/bq.py b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
index c9cf7d7f..52bc7443 100644
--- a/gcp_airflow_foundations/common/gcp/source_schema/bq.py
+++ b/gcp_airflow_foundations/common/gcp/source_schema/bq.py
@@ -1,4 +1,4 @@
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 
 def read_schema_from_bq(
diff --git a/gcp_airflow_foundations/operators/api/operators/twilio_operator.py b/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
index ea59320f..12004428 100644
--- a/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
+++ b/gcp_airflow_foundations/operators/api/operators/twilio_operator.py
@@ -1,7 +1,7 @@
 import json
 import logging
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.models.baseoperator import BaseOperator
 
 
diff --git a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
index b0169b0c..911ca7c9 100644
--- a/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
+++ b/gcp_airflow_foundations/operators/facebook/operators/facebook_ads_to_gcs.py
@@ -13,7 +13,7 @@
 from gcp_airflow_foundations.enums.facebook import AccountLookupScope, ApiObject
 
 from airflow.models import BaseOperator, Variable
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 from google.cloud import bigquery
 
diff --git a/gcp_airflow_foundations/operators/gcp/bigquery/custom_bq_hook.py b/gcp_airflow_foundations/operators/gcp/bigquery/custom_bq_hook.py
index bcabe21c..06e00143 100644
--- a/gcp_airflow_foundations/operators/gcp/bigquery/custom_bq_hook.py
+++ b/gcp_airflow_foundations/operators/gcp/bigquery/custom_bq_hook.py
@@ -1,9 +1,8 @@
 import pandas as pd
-import json
 import pyarrow.parquet as pq
 import pyarrow
 
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.utils.decorators import apply_defaults
 
 from google.cloud import bigquery
diff --git a/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py b/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
index 1d8d4ecf..57a9220a 100644
--- a/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/hds/hds_merge_table_operator.py
@@ -1,14 +1,12 @@
 from typing import Optional
 from datetime import datetime
 
-from airflow.models import BaseOperator, BaseOperatorLink
 from airflow.contrib.operators.bigquery_operator import (
     BigQueryOperator,
     BigQueryCreateEmptyTableOperator,
 )
 from airflow.utils.decorators import apply_defaults
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 
 from airflow.exceptions import AirflowException
 
 
@@ -171,9 +169,7 @@ def pre_execute(self, context) -> None:
             self.write_disposition = "WRITE_TRUNCATE"
             self.create_disposition = "CREATE_IF_NEEDED"
 
-            self.destination_dataset_table = (
-                f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"
-            )
+            self.destination_dataset_table = f"{self.project_id}.{self.data_dataset_name}.{self.data_table_name}${partition_id}"
 
         elif self.hds_table_config.hds_table_type == HdsTableType.SCD2:
             sql = sql_helper.create_scd2_sql_with_hash(
diff --git a/tests/integration/dlp/test_dlp.py b/tests/integration/dlp/test_dlp.py
index 67fef2da..4d5a2ed6 100644
--- a/tests/integration/dlp/test_dlp.py
+++ b/tests/integration/dlp/test_dlp.py
@@ -7,7 +7,6 @@
 from airflow.utils.types import DagRunType
 from bq_test_utils import insert_to_bq_from_dict
 from datetime import datetime
-from google.cloud import bigquery
 from pytest_testconfig import config
 from test_utils import cleanup_xcom, clear_db_dags
 
diff --git a/tests/test_utils/bq_test_utils.py b/tests/test_utils/bq_test_utils.py
index e1ed7e7b..67e25331 100644
--- a/tests/test_utils/bq_test_utils.py
+++ b/tests/test_utils/bq_test_utils.py
@@ -1,6 +1,6 @@
 import pandas
 from time import sleep
-from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
 
 def insert_to_bq_from_csv(csv, project_id, dataset_id, table_id):

From e90cbe6e98ae585182b83a805fa918bcbb2df622 Mon Sep 17 00:00:00 2001
From: Eugene Miretsky
Date: Tue, 14 Jun 2022 14:20:03 -0400
Subject: [PATCH 4/5] add location to BigQuery hook

---
 docker-compose.yaml                                    |  8 ++++----
 gcp_airflow_foundations/common/gcp/load_builder.py     |  7 +++++--
 .../operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py  | 11 ++++++++++-
 .../gcp/dlp/get_dlp_bq_inspection_results_operator.py  |  4 ++++
 tests/integration/dlp/test_dlp.py                      |  1 +
 5 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/docker-compose.yaml b/docker-compose.yaml
index 09fc87f2..802b914a 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -9,7 +9,7 @@ x-airflow-common:
     &airflow-common-env
     # uncomment to allow using user IAM for access
     #AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://user:password@host/schema?extra__google_cloud_platform__scope=${GCP_AUTH_SCOPE}&extra__google_cloud_platform__project=${GCP_PROJECT_ID}'
-    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow1!@postgres/airflow
+    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
   env_file:
     - ./variables/docker-env-vars
     - ./variables/docker-env-secrets # added to gitignore
@@ -34,7 +34,7 @@ services:
     image: postgres:13
    environment:
       POSTGRES_USER: airflow
-      POSTGRES_PASSWORD: airflow1!
+      POSTGRES_PASSWORD: airflow
       POSTGRES_DB: airflow
     volumes:
       - postgres-db-volume:/var/lib/postgresql/data
@@ -94,8 +94,8 @@ services:
       _AIRFLOW_DB_UPGRADE: 'true'
       _AIRFLOW_WWW_USER_CREATE: 'true'
       _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
-      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow1!}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
     user: "0:${AIRFLOW_GID:-0}"
 
 volumes:
-  postgres-db-volume:
\ No newline at end of file
+  postgres-db-volume:
diff --git a/gcp_airflow_foundations/common/gcp/load_builder.py b/gcp_airflow_foundations/common/gcp/load_builder.py
index f971fb4a..0b27a46c 100644
--- a/gcp_airflow_foundations/common/gcp/load_builder.py
+++ b/gcp_airflow_foundations/common/gcp/load_builder.py
@@ -19,7 +19,9 @@
     ParseSchema,
 )
 from gcp_airflow_foundations.source_class.schema_source_config import SchemaSourceConfig
-from gcp_airflow_foundations.operators.gcp.create_dataset import CustomBigQueryCreateEmptyDatasetOperator
+from gcp_airflow_foundations.operators.gcp.create_dataset import (
+    CustomBigQueryCreateEmptyDatasetOperator,
+)
 
 
 def load_builder(
@@ -137,7 +139,7 @@ def load_builder(
             dataset_id=data_source.dlp_config.results_dataset_id,
             location=location,
             exists_ok=True,
-            dag=dag
+            dag=dag,
         )
 
         dlp_tasks_configs = [
@@ -161,6 +163,7 @@ def load_builder(
         ods_dlp_task_groups = schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
             table_configs=dlp_tasks_configs,
             table_dlp_config=dlp_table_config,
+            location=location,
             next_task=done,
             dag=dag,
         )
diff --git a/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py b/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
index cd9f4f46..7b3e79da 100644
--- a/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
+++ b/gcp_airflow_foundations/operators/gcp/dlp/dlp_to_datacatalog_taskgroup.py
@@ -34,6 +34,7 @@ def schedule_dlp_to_datacatalog_taskgroup(
     project_id: str,
     table_id: str,
     dataset_id: str,
+    location: str,
     table_dlp_config: DlpTableConfig,
     next_task: BaseOperator,
     dag,
@@ -52,6 +53,7 @@ def schedule_dlp_to_datacatalog_taskgroup(
         project_id=project_id,
         table_id=table_id,
         dataset_id=dataset_id,
+        location=location,
         table_dlp_config=table_dlp_config,
         next_task=next_task,
         dag=dag,
@@ -72,7 +74,11 @@ def schedule_dlp_to_datacatalog_taskgroup(
 
 
 def schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
-    table_configs: list, table_dlp_config: DlpTableConfig, next_task: BaseOperator, dag
+    table_configs: list,
+    table_dlp_config: DlpTableConfig,
+    location: str,
+    next_task: BaseOperator,
+    dag,
 ):
     """
     Check if DLP should run, and run it on multiple tables
@@ -88,6 +94,7 @@ def schedule_dlp_to_datacatalog_taskgroup_multiple_tables(
     for table_config in table_configs:
         dlp_task = dlp_to_datacatalog_builder(
             taskgroup=taskgroup,
+            location=location,
             datastore=table_config["datastore"],
             project_id=table_config["project_id"],
             table_id=table_config["table_id"],
@@ -125,6 +132,7 @@ def dlp_to_datacatalog_builder(
     project_id: str,
     table_id: str,
     dataset_id: str,
+    location: str,
     table_dlp_config: DlpTableConfig,
     next_task: BaseOperator,
     dag,
@@ -181,6 +189,7 @@ def dlp_to_datacatalog_builder(
         project_id=dlp_results_table_ref.project,
         dataset_id=dlp_results_table_ref.dataset_id,
         table_id=dlp_results_table_ref.table_id,
+        location=location,
         do_xcom_push=True,
         min_match_count=table_dlp_config.get_min_match_count(),
         task_group=taskgroup,
diff --git a/gcp_airflow_foundations/operators/gcp/dlp/get_dlp_bq_inspection_results_operator.py b/gcp_airflow_foundations/operators/gcp/dlp/get_dlp_bq_inspection_results_operator.py
index 6df3fead..f8efa502 100644
--- a/gcp_airflow_foundations/operators/gcp/dlp/get_dlp_bq_inspection_results_operator.py
+++ b/gcp_airflow_foundations/operators/gcp/dlp/get_dlp_bq_inspection_results_operator.py
@@ -18,6 +18,8 @@ class DlpBQInspectionResultsOperator(BaseOperator):
     :type dataset_id: str
     :param min_match_count: Minimum number of findings per column/likelihood level pair
     :type min_match_count: int
+    :param location: Location of the source dataset
+    :type location: str
     """
 
     @apply_defaults
@@ -27,6 +29,7 @@ def __init__(
         self,
         dataset_id,
         table_id,
         project_id,
+        location,
         min_match_count=0,
         do_xcom_push=True,
         gcp_conn_id="google_cloud_default",
@@ -44,6 +47,7 @@ def __init__(
             gcp_conn_id=gcp_conn_id,
             use_legacy_sql=False,
             impersonation_chain=impersonation_chain,
+            location=location,
         )
         conn = self.hook.get_conn()
         self.cursor = conn.cursor()
diff --git a/tests/integration/dlp/test_dlp.py b/tests/integration/dlp/test_dlp.py
index 4d5a2ed6..79d105dd 100644
--- a/tests/integration/dlp/test_dlp.py
+++ b/tests/integration/dlp/test_dlp.py
@@ -147,6 +147,7 @@ def create_dlp_dag(self, dag, project_id, dataset_id, target_table_id):
             table_id=target_table_id,
             dataset_id=dataset_id,
             table_dlp_config=dlp_table_config,
+            location=config["gcp"]["location"],
             next_task=done,
             dag=dag,
         )

From 4530193e50086c9112e64f153744d420c5ab2aff Mon Sep 17 00:00:00 2001
From: Eugene Miretsky
Date: Tue, 14 Jun 2022 14:38:24 -0400
Subject: [PATCH 5/5] fix unit test

---
 tests/unit/operators/dlp/test_dlp.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/unit/operators/dlp/test_dlp.py b/tests/unit/operators/dlp/test_dlp.py
index 27ee15f0..e0168e32 100644
--- a/tests/unit/operators/dlp/test_dlp.py
+++ b/tests/unit/operators/dlp/test_dlp.py
@@ -138,6 +138,7 @@ def test_dlp_args(
             table_id=TEST_TABLE_ID,
             dataset_id=TEST_DATASET,
             table_dlp_config=dlp_table_config,
+            location="us",
             dag=self.dag,
         )
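
Usage sketch for the location argument introduced in PATCH 4/5. The values below (task id, project, dataset, table, region) are illustrative assumptions, not taken from these patches, and the import path is inferred from the repository layout; the diff only guarantees that location is forwarded to the underlying BigQueryHook.

    from gcp_airflow_foundations.operators.gcp.dlp.get_dlp_bq_inspection_results_operator import (
        DlpBQInspectionResultsOperator,
    )

    # Reads DLP inspection findings from a results table whose dataset lives
    # outside the default US location; `location` is passed through to the
    # BigQueryHook created by the operator (see get_dlp_bq_inspection_results_operator.py above).
    read_dlp_results = DlpBQInspectionResultsOperator(
        task_id="read_dlp_results",     # assumed task id
        project_id="my-project",        # assumed project
        dataset_id="dlp_results",       # assumed results dataset
        table_id="users_dlp_results",   # assumed results table
        location="europe-west1",        # dataset location, new argument in PATCH 4/5
        min_match_count=1,
        dag=dag,                        # assumes an existing DAG object in scope
    )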