From 42465c5a9465fd77f3000117721e0ed1cc51c166 Mon Sep 17 00:00:00 2001
From: max <42827971+moiseenkov@users.noreply.github.com>
Date: Mon, 31 Jul 2023 10:46:53 +0200
Subject: [PATCH] Fix Dataproc system tests (#32807)

---
 .../cloud/dataproc/example_dataproc_batch.py  | 24 ++++++++-----------
 .../example_dataproc_batch_deferrable.py      | 13 +++++++---
 .../example_dataproc_batch_persistent.py      | 18 ++++++++++----
 .../example_dataproc_cluster_deferrable.py    | 14 +++++++----
 .../example_dataproc_cluster_generator.py     | 15 ++++++++----
 .../example_dataproc_cluster_update.py        | 12 +++++++---
 .../cloud/dataproc/example_dataproc_gke.py    | 12 ++++++----
 .../cloud/dataproc/example_dataproc_hadoop.py |  5 +---
 .../cloud/dataproc/example_dataproc_hive.py   |  6 +----
 .../cloud/dataproc/example_dataproc_pig.py    |  5 +---
 .../dataproc/example_dataproc_pyspark.py      |  3 +--
 .../dataproc/example_dataproc_workflow.py     | 13 +++++++---
 .../example_dataproc_workflow_deferrable.py   | 15 ++++++++----
 13 files changed, 96 insertions(+), 59 deletions(-)

diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 160422d197cd7..3e9aed3502926 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -40,10 +40,10 @@
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
-BATCH_ID = f"test-batch-id-{ENV_ID}"
-BATCH_ID_2 = f"test-batch-id-{ENV_ID}-2"
-BATCH_ID_3 = f"test-batch-id-{ENV_ID}-3"
-BATCH_ID_4 = f"test-batch-id-{ENV_ID}-4"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID_2 = f"batch-{ENV_ID}-{DAG_ID}-2".replace("_", "-")
+BATCH_ID_3 = f"batch-{ENV_ID}-{DAG_ID}-3".replace("_", "-")
+BATCH_ID_4 = f"batch-{ENV_ID}-{DAG_ID}-4".replace("_", "-")
 
 BATCH_CONFIG = {
     "spark_batch": {
@@ -154,19 +154,15 @@
     delete_batch_4.trigger_rule = TriggerRule.ALL_DONE
 
     (
-        create_batch
-        >> create_batch_2
-        >> create_batch_3
+        # TEST SETUP
+        [create_batch, create_batch_2, create_batch_3]
+        # TEST BODY
         >> batch_async_sensor
-        >> get_batch
-        >> get_batch_2
-        >> list_batches
+        >> [get_batch, get_batch_2, list_batches]
         >> create_batch_4
         >> cancel_operation
-        >> delete_batch
-        >> delete_batch_2
-        >> delete_batch_3
-        >> delete_batch_4
+        # TEST TEARDOWN
+        >> [delete_batch, delete_batch_2, delete_batch_3, delete_batch_4]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
index 2d363328c4585..8ed1893a5b3fd 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
@@ -36,7 +36,7 @@
 DAG_ID = "dataproc_batch_deferrable"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
-BATCH_ID = f"test-def-batch-id-{ENV_ID}"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 BATCH_CONFIG = {
     "spark_batch": {
         "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
@@ -50,7 +50,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_batch_operator_async]
     create_batch = DataprocCreateBatchOperator(
@@ -75,7 +75,14 @@
     )
     delete_batch.trigger_rule = TriggerRule.ALL_DONE
 
-    (create_batch >> get_batch >> delete_batch)
+    (
+        # TEST SETUP
+        create_batch
+        # TEST BODY
+        >> get_batch
+        # TEST TEARDOWN
+        >> delete_batch
+    )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
index 011b91d24594d..2a8aaf4975360 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
@@ -37,8 +37,8 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 REGION = "europe-west1"
-CLUSTER_NAME = f"dataproc-cluster-ps-{ENV_ID}"
-BATCH_ID = f"batch-ps-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 
 CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
     project_id=PROJECT_ID,
@@ -71,7 +71,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "persistent"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
@@ -108,7 +108,17 @@
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
-    create_bucket >> create_cluster >> create_batch >> delete_cluster >> delete_bucket
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_cluster
+        # TEST BODY
+        >> create_batch
+        # TEST TEARDOWN
+        >> delete_cluster
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index b86ae0f0c2490..b14c5c00f129b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -35,9 +35,8 @@
 
 DAG_ID = "dataproc_cluster_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-def-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 # Cluster definition
 
@@ -82,7 +81,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator_async]
     create_cluster = DataprocCreateClusterOperator(
@@ -119,7 +118,14 @@
     )
     # [END how_to_cloud_dataproc_delete_cluster_operator_async]
 
-    create_cluster >> update_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> update_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index f3f68291b2867..f9942472949ac 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -40,7 +40,7 @@
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-cluster-gen-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
 ZONE = "europe-west1-b"
 INIT_FILE_SRC = str(Path(__file__).parent / "resources" / "pip-install.sh")
@@ -65,8 +65,6 @@
 
 # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 
 with models.DAG(
     DAG_ID,
@@ -110,7 +108,16 @@
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
-    create_bucket >> upload_file >> create_dataproc_cluster >> [delete_cluster, delete_bucket]
+    (
+        # TEST SETUP
+        create_bucket
+        >> upload_file
+        >>
+        # TEST BODY
+        create_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
index 1607e714f5e76..ed8e9532390fe 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
@@ -35,9 +35,8 @@
 
 DAG_ID = "dataproc_update"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-update-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 # Cluster definition
 
@@ -102,7 +101,14 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> scale_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> scale_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index d2e5f0fd3d2f7..b5451d0d44770 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -47,8 +47,8 @@
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 REGION = "us-central1"
-CLUSTER_NAME = f"cluster-test-build-in-gke{ENV_ID}"
-GKE_CLUSTER_NAME = f"test-dataproc-gke-cluster-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+GKE_CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}-gke".replace("_", "-")
 WORKLOAD_POOL = f"{PROJECT_ID}.svc.id.goog"
 GKE_CLUSTER_CONFIG = {
     "name": GKE_CLUSTER_NAME,
@@ -89,7 +89,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "dataproc", "gke"],
 ) as dag:
     create_gke_cluster = GKECreateClusterOperator(
         task_id="create_gke_cluster",
@@ -132,11 +132,13 @@
     )
 
     (
+        # TEST SETUP
         create_gke_cluster
         >> add_iam_policy_binding
+        # TEST BODY
         >> create_cluster_in_gke
-        >> delete_gke_cluster
-        >> delete_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_gke_cluster, delete_dataproc_cluster]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index 7af2654bfed7a..4cd612c8ea36f 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -37,9 +37,8 @@
 
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-hadoop-{ENV_ID}" +CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-") REGION = "europe-west1" -ZONE = "europe-west1-b" OUTPUT_FOLDER = "wordcount" OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/" @@ -58,8 +57,6 @@ }, } -TIMEOUT = {"seconds": 1 * 24 * 60 * 60} - # Jobs definitions # [START how_to_cloud_dataproc_hadoop_config] HADOOP_JOB = { diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py index 37ebe56dc0645..99312b2011627 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py @@ -35,10 +35,8 @@ DAG_ID = "dataproc_hive" PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") -CLUSTER_NAME = f"cluster-dataproc-hive-{ENV_ID}" +CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-") REGION = "europe-west1" -ZONE = "europe-west1-b" - # Cluster definition # [START how_to_cloud_dataproc_create_cluster] @@ -68,8 +66,6 @@ # [END how_to_cloud_dataproc_create_cluster] -TIMEOUT = {"seconds": 1 * 24 * 60 * 60} - # [START how_to_cloud_dataproc_hive_config] HIVE_JOB = { "reference": {"project_id": PROJECT_ID}, diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py index a449437cb31aa..1a0299bd10add 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py @@ -35,9 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") -CLUSTER_NAME = f"cluster-dataproc-pig-{ENV_ID}" +CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-") REGION = "europe-west1" -ZONE = "europe-west1-b" # Cluster definition @@ -54,8 +53,6 @@ }, } -TIMEOUT = {"seconds": 1 * 24 * 60 * 60} - # Jobs definitions # [START how_to_cloud_dataproc_pig_config] PIG_JOB = { diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py index ff3e619d7d8e5..fe3884daddabd 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py @@ -42,9 +42,8 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" -CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}" +CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-") REGION = "europe-west1" -ZONE = "europe-west1-b" # Cluster definition CLUSTER_CONFIG = { diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py index 52a9f3094f655..35d3ee0ebec45 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py @@ -34,7 +34,7 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") REGION = "europe-west1" -CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}" +CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-") CLUSTER_CONFIG = { "master_config": { "num_instances": 1, @@ -66,7 +66,7 @@ schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "dataproc"], + tags=["example", "dataproc", "workflow"], ) as dag: # [START 
how_to_cloud_dataproc_create_workflow_template] create_workflow_template = DataprocCreateWorkflowTemplateOperator( @@ -89,7 +89,14 @@ ) # [END how_to_cloud_dataproc_instantiate_inline_workflow_template] - (create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template) + ( + # TEST SETUP + create_workflow_template + # TEST BODY + >> trigger_workflow + # TEST TEARDOWN + >> instantiate_inline_workflow_template + ) from tests.system.utils.watcher import watcher diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py index d8dfab2267fef..843148bf9baf0 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py @@ -30,11 +30,11 @@ ) ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -DAG_ID = "dataproc_workflow" +DAG_ID = "dataproc_workflow_def" PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") REGION = "europe-west1" -CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}" +CLUSTER_NAME = f"{ENV_ID}-{DAG_ID}".replace("_", "-") CLUSTER_CONFIG = { "master_config": { "num_instances": 1, @@ -66,7 +66,7 @@ schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example", "dataproc"], + tags=["example", "dataproc", "workflow", "deferrable"], ) as dag: create_workflow_template = DataprocCreateWorkflowTemplateOperator( task_id="create_workflow_template", @@ -94,7 +94,14 @@ ) # [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async] - (create_workflow_template >> trigger_workflow_async >> instantiate_inline_workflow_template_async) + ( + # TEST SETUP + create_workflow_template + # TEST BODY + >> trigger_workflow_async + # TEST TEARDOWN + >> instantiate_inline_workflow_template_async + ) from tests.system.utils.watcher import watcher
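
Note on the recurring change above: every Dataproc resource id is now derived from both ENV_ID and DAG_ID, with .replace("_", "-") applied because Dataproc batch ids and cluster names only accept lowercase letters, digits, and hyphens, while Airflow DAG ids conventionally use underscores; deriving the id from DAG_ID also makes ids unique per test DAG within one environment. A minimal standalone sketch of the convention follows; the "dev" fallback and the demo DAG id are illustrative, not part of the patch.

    import os

    # ENV_ID distinguishes test environments; the "dev" fallback is only for
    # running this sketch outside CI (the real tests read it from the env).
    ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "dev")
    DAG_ID = "dataproc_batch"  # underscores are idiomatic in DAG ids ...

    # ... but invalid in Dataproc ids, hence the .replace("_", "-"):
    BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
    CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")

    print(BATCH_ID)      # batch-dev-dataproc-batch
    print(CLUSTER_NAME)  # cluster-dev-dataproc-batch

The other recurring change, rewriting the task chains as parenthesized expressions with # TEST SETUP / # TEST BODY / # TEST TEARDOWN comments, regroups the existing dependencies without altering them; teardown tasks keep trigger_rule=TriggerRule.ALL_DONE so cleanup runs even when the test body fails.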