From ce5eebd00403beabc23b4f0b4bedba5b5c397c42 Mon Sep 17 00:00:00 2001
From: max <42827971+moiseenkov@users.noreply.github.com>
Date: Mon, 31 Jul 2023 10:46:26 +0200
Subject: [PATCH] Fix system test for MetastoreHivePartitionSensor (#32861)

---
 airflow/providers/google/provider.yaml        |   2 +-
 generated/provider_dependencies.json          |   2 +-
 ...ataproc_metastore_hive_partition_sensor.py | 200 ++++++++++++++++--
 3 files changed, 190 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 7d4805925c3644..1b96ca972a6169 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -94,7 +94,7 @@ dependencies:
   - google-cloud-dataform>=0.5.0
   - google-cloud-dataplex>=1.4.2
   - google-cloud-dataproc>=5.4.0
-  - google-cloud-dataproc-metastore>=1.10.0
+  - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
   - google-cloud-language>=2.9.0
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index fadffea64a4cad..4b2fc9a63942bc 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -426,7 +426,7 @@
       "google-cloud-dataflow-client>=0.8.2",
       "google-cloud-dataform>=0.5.0",
       "google-cloud-dataplex>=1.4.2",
-      "google-cloud-dataproc-metastore>=1.10.0",
+      "google-cloud-dataproc-metastore>=1.12.0",
       "google-cloud-dataproc>=5.4.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index bd5b9a3736f562..134caff6dbe00b 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -19,7 +19,7 @@
 Example Airflow DAG that shows how to check Hive partition existence
 using Dataproc Metastore Sensor.
 
-Note that Metastore service must be configured to use gRPC endpoints,
+Note that Metastore service must be configured to use gRPC endpoints.
""" from __future__ import annotations @@ -27,37 +27,213 @@ import os from airflow import models +from airflow.decorators import task +from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url +from airflow.providers.google.cloud.operators.dataproc import ( + DataprocCreateClusterOperator, + DataprocDeleteClusterOperator, + DataprocSubmitJobOperator, +) +from airflow.providers.google.cloud.operators.dataproc_metastore import ( + DataprocMetastoreCreateServiceOperator, + DataprocMetastoreDeleteServiceOperator, +) +from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator from airflow.providers.google.cloud.sensors.dataproc_metastore import MetastoreHivePartitionSensor +from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator +from airflow.utils.trigger_rule import TriggerRule -DAG_ID = "dataproc_metastore_hive_partition_sensor" -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "") -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "hive_partition_sensor" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env") +REGION = "us-central1" +NETWORK = "default" -SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-") -REGION = "europe-west1" -TABLE_NAME = "test_table" -PARTITION_1 = "column1=value1" -PARTITION_2 = "column2=value2/column3=value3" +METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-") +METASTORE_TIMEOUT = 2400 +METASTORE_SERVICE = { + "name": METASTORE_SERVICE_ID, + "hive_metastore_config": { + "endpoint_protocol": "GRPC", + }, + "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}", +} +METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}" +DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-") +DATAPROC_CLUSTER_CONFIG = { + "master_config": { + "num_instances": 1, + "machine_type_uri": "n1-standard-2", + "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024}, + }, + "worker_config": { + "num_instances": 2, + "machine_type_uri": "n1-standard-2", + "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024}, + }, + "metastore_config": { + "dataproc_metastore_service": METASTORE_SERVICE_QFN, + }, + "gce_cluster_config": { + "service_account_scopes": [ + "https://www.googleapis.com/auth/cloud-platform", + ], + }, +} +TABLE_NAME = "transactions_partitioned" +COLUMN = "TransactionType" +PARTITION_1 = f"{COLUMN}=credit".lower() +PARTITION_2 = f"{COLUMN}=debit".lower() +SOURCE_DATA_BUCKET = "airflow-system-tests-resources" +SOURCE_DATA_PATH = "dataproc/hive" +SOURCE_DATA_FILE_NAME = "part-00000.parquet" +EXTERNAL_TABLE_BUCKET = "{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task', key='bucket')}}" +QUERY_CREATE_EXTERNAL_TABLE = f""" +CREATE EXTERNAL TABLE IF NOT EXISTS transactions +(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING) +STORED AS PARQUET +LOCATION 'gs://{EXTERNAL_TABLE_BUCKET}/{SOURCE_DATA_PATH}'; +""" +QUERY_CREATE_PARTITIONED_TABLE = f""" +CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME} +(SubmissionDate DATE, TransactionAmount DOUBLE) +PARTITIONED BY ({COLUMN} STRING); +""" +QUERY_COPY_DATA_WITH_PARTITIONS = f""" +SET hive.exec.dynamic.partition.mode=nonstrict; +INSERT INTO TABLE {TABLE_NAME} PARTITION ({COLUMN}) +SELECT SubmissionDate,TransactionAmount,TransactionType FROM transactions; +""" with models.DAG( DAG_ID, start_date=datetime.datetime(2021, 1, 1), 
schedule="@once", catchup=False, - tags=["example", "dataproc", "metastore"], + tags=["example", "dataproc", "metastore", "partition", "hive", "sensor"], ) as dag: + create_metastore_service = DataprocMetastoreCreateServiceOperator( + task_id="create_metastore_service", + region=REGION, + project_id=PROJECT_ID, + service=METASTORE_SERVICE, + service_id=METASTORE_SERVICE_ID, + timeout=METASTORE_TIMEOUT, + ) + + create_cluster = DataprocCreateClusterOperator( + task_id="create_cluster", + cluster_name=DATAPROC_CLUSTER_NAME, + project_id=PROJECT_ID, + cluster_config=DATAPROC_CLUSTER_CONFIG, + region=REGION, + ) + + @task(task_id="get_hive_warehouse_bucket_task") + def get_hive_warehouse_bucket(**kwargs): + """Returns Hive Metastore Warehouse GCS bucket name.""" + ti = kwargs["ti"] + metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service") + config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"] + destination_dir: str = config_overrides["hive.metastore.warehouse.dir"] + bucket, _ = _parse_gcs_url(destination_dir) + ti.xcom_push(key="bucket", value=bucket) + + get_hive_warehouse_bucket_task = get_hive_warehouse_bucket() + + copy_source_data = GCSToGCSOperator( + task_id="copy_source_data", + source_bucket=SOURCE_DATA_BUCKET, + source_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}", + destination_bucket=EXTERNAL_TABLE_BUCKET, + destination_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}", + ) + + create_external_table = DataprocSubmitJobOperator( + task_id="create_external_table", + job={ + "reference": {"project_id": PROJECT_ID}, + "placement": {"cluster_name": DATAPROC_CLUSTER_NAME}, + "hive_job": {"query_list": {"queries": [QUERY_CREATE_EXTERNAL_TABLE]}}, + }, + region=REGION, + project_id=PROJECT_ID, + ) + + create_partitioned_table = DataprocSubmitJobOperator( + task_id="create_partitioned_table", + job={ + "reference": {"project_id": PROJECT_ID}, + "placement": {"cluster_name": DATAPROC_CLUSTER_NAME}, + "hive_job": {"query_list": {"queries": [QUERY_CREATE_PARTITIONED_TABLE]}}, + }, + region=REGION, + project_id=PROJECT_ID, + ) + + partition_data = DataprocSubmitJobOperator( + task_id="partition_data", + job={ + "reference": {"project_id": PROJECT_ID}, + "placement": {"cluster_name": DATAPROC_CLUSTER_NAME}, + "hive_job": {"query_list": {"queries": [QUERY_COPY_DATA_WITH_PARTITIONS]}}, + }, + region=REGION, + project_id=PROJECT_ID, + ) + # [START how_to_cloud_dataproc_metastore_hive_partition_sensor] - sensor = MetastoreHivePartitionSensor( + hive_partition_sensor = MetastoreHivePartitionSensor( task_id="hive_partition_sensor", - service_id=SERVICE_ID, + service_id=METASTORE_SERVICE_ID, region=REGION, table=TABLE_NAME, partitions=[PARTITION_1, PARTITION_2], ) # [END how_to_cloud_dataproc_metastore_hive_partition_sensor] + delete_dataproc_cluster = DataprocDeleteClusterOperator( + task_id="delete_dataproc_cluster", + cluster_name=DATAPROC_CLUSTER_NAME, + project_id=PROJECT_ID, + region=REGION, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_metastore_service = DataprocMetastoreDeleteServiceOperator( + task_id="delete_metastore_service", + service_id=METASTORE_SERVICE_ID, + project_id=PROJECT_ID, + region=REGION, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_warehouse_bucket = GCSDeleteBucketOperator( + task_id="delete_warehouse_bucket", + bucket_name=EXTERNAL_TABLE_BUCKET, + trigger_rule=TriggerRule.ALL_DONE, + ) + + # TEST SETUP + ( + create_metastore_service + >> create_cluster + >> get_hive_warehouse_bucket_task + >> 
copy_source_data + >> create_external_table + >> create_partitioned_table + >> partition_data + ) + ( + create_metastore_service + # TEST BODY + >> hive_partition_sensor + # TEST TEARDOWN + >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket] + ) + from tests.system.utils.watcher import watcher # This test needs watcher in order to properly mark success/failure
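
Note (illustration, not part of the patch above): the dependency bump to google-cloud-dataproc-metastore>=1.12.0 and the "endpoint_protocol": "GRPC" entry in METASTORE_SERVICE reflect the requirement in the updated docstring that the Metastore service expose a gRPC endpoint for MetastoreHivePartitionSensor. The sketch below is one way a maintainer might double-check that protocol on a deployed service with the same client library; it assumes the v1 client surface (DataprocMetastoreClient.get_service and the hive_metastore_config.endpoint_protocol field) and reuses the demo project, region, and service id defaults from this test.

# Hedged sketch: inspect the created Metastore service's endpoint protocol.
# Assumes the google-cloud-dataproc-metastore v1 client exposes endpoint_protocol.
from google.cloud import metastore_v1

client = metastore_v1.DataprocMetastoreClient()
service = client.get_service(
    name="projects/demo-project/locations/us-central1/services/metastore-hive-partition-sensor-demo-env"
)
# The sensor requires a gRPC endpoint, per the DAG docstring; expect GRPC here.
print(service.hive_metastore_config.endpoint_protocol)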
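
Note (illustration, not part of the patch above): the get_hive_warehouse_bucket task reads the warehouse directory the created service reports under config_overrides["hive.metastore.warehouse.dir"], splits it with _parse_gcs_url, and pushes the bucket name to XCom; EXTERNAL_TABLE_BUCKET is a Jinja template that pulls that value, so templated fields such as GCSToGCSOperator.destination_bucket and GCSDeleteBucketOperator.bucket_name resolve to the warehouse bucket at run time. The sketch below only shows the URL splitting; the gs:// path is a made-up example, since the real value comes from the Metastore service.

# Illustrative sketch: how _parse_gcs_url splits a warehouse directory URL.
# The path below is a hypothetical value, not one produced by this test.
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url

warehouse_dir = "gs://example-warehouse-bucket/hive-warehouse"  # assumed example
bucket, prefix = _parse_gcs_url(warehouse_dir)
print(bucket)  # "example-warehouse-bucket" -> pushed to XCom under key "bucket"
print(prefix)  # "hive-warehouse"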
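
Note (illustration, not part of the patch above): the INSERT ... PARTITION (TransactionType) query uses Hive dynamic partitioning (hence the nonstrict mode setting), so each distinct TransactionType value becomes its own partition of transactions_partitioned, and PARTITION_1 / PARTITION_2 are the lower-cased partition specs the sensor then waits for. The directory layout below follows the conventional Hive warehouse convention and is shown only for orientation; the bucket name is hypothetical.

# Plain-Python illustration: the partition specs polled by the sensor and the
# conventional (assumed) warehouse directories they correspond to.
COLUMN = "TransactionType"
PARTITION_1 = f"{COLUMN}=credit".lower()  # "transactiontype=credit"
PARTITION_2 = f"{COLUMN}=debit".lower()   # "transactiontype=debit"

warehouse = "gs://example-warehouse-bucket/hive-warehouse"  # hypothetical bucket
table_dir = f"{warehouse}/transactions_partitioned"
for spec in (PARTITION_1, PARTITION_2):
    print(f"{table_dir}/{spec}/")  # one directory per dynamic partition value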