Fix Dataproc system tests (apache#32807)
moiseenkov authored Jul 31, 2023
1 parent ce5eebd commit 42465c5
Showing 13 changed files with 96 additions and 59 deletions.
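Two changes recur across all 13 files. First, generated resource IDs now embed both ENV_ID and DAG_ID and strip underscores: Dataproc batch IDs and cluster names accept only lowercase letters, digits, and hyphens, while DAG IDs use underscores, and folding DAG_ID into the name also keeps concurrent test runs from colliding on shared resources. Second, each DAG's dependency chain is annotated into TEST SETUP / TEST BODY / TEST TEARDOWN stages, with independent tasks grouped into lists so they run in parallel. Along the way, unused ZONE and TIMEOUT constants are deleted and DAG tags are made more specific. A minimal sketch of the new naming convention (the helper function is hypothetical; the tests inline the expression):

    import os

    ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "dev")  # placeholder default
    DAG_ID = "dataproc_batch"

    def gcp_safe_id(prefix: str, suffix: str = "") -> str:
        # Hypothetical helper mirroring f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
        parts = [prefix, ENV_ID, DAG_ID] + ([suffix] if suffix else [])
        return "-".join(parts).replace("_", "-")

    BATCH_ID = gcp_safe_id("batch")         # -> "batch-dev-dataproc-batch"
    BATCH_ID_2 = gcp_safe_id("batch", "2")  # -> "batch-dev-dataproc-batch-2"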
==== File 1 of 13 ====

@@ -40,10 +40,10 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
 
-BATCH_ID = f"test-batch-id-{ENV_ID}"
-BATCH_ID_2 = f"test-batch-id-{ENV_ID}-2"
-BATCH_ID_3 = f"test-batch-id-{ENV_ID}-3"
-BATCH_ID_4 = f"test-batch-id-{ENV_ID}-4"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID_2 = f"batch-{ENV_ID}-{DAG_ID}-2".replace("_", "-")
+BATCH_ID_3 = f"batch-{ENV_ID}-{DAG_ID}-3".replace("_", "-")
+BATCH_ID_4 = f"batch-{ENV_ID}-{DAG_ID}-4".replace("_", "-")
 
 BATCH_CONFIG = {
     "spark_batch": {
@@ -154,19 +154,15 @@
     delete_batch_4.trigger_rule = TriggerRule.ALL_DONE
 
     (
-        create_batch
-        >> create_batch_2
-        >> create_batch_3
+        # TEST SETUP
+        [create_batch, create_batch_2, create_batch_3]
+        # TEST BODY
         >> batch_async_sensor
-        >> get_batch
-        >> get_batch_2
-        >> list_batches
+        >> [get_batch, get_batch_2, list_batches]
         >> create_batch_4
         >> cancel_operation
-        >> delete_batch
-        >> delete_batch_2
-        >> delete_batch_3
-        >> delete_batch_4
+        # TEST TEARDOWN
+        >> [delete_batch, delete_batch_2, delete_batch_3, delete_batch_4]
     )
 
     from tests.system.utils.watcher import watcher
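The bracketed groups above use Airflow's list-chaining semantics: a list on either side of >> fans the dependency out or in, so the three create_batch tasks run in parallel before the sensor and the four delete tasks run in parallel at the end. A self-contained sketch with placeholder tasks (not taken from this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    with DAG("chain_sketch", start_date=datetime(2021, 1, 1), schedule=None) as dag:
        setup = [EmptyOperator(task_id=f"create_{i}") for i in range(3)]
        body = EmptyOperator(task_id="sensor")
        teardown = [EmptyOperator(task_id=f"delete_{i}") for i in range(4)]

        # list >> task: the task waits on every task in the list;
        # task >> list: every task in the list waits on the task.
        setup >> body >> teardown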
==== File 2 of 13 ====

@@ -36,7 +36,7 @@
 DAG_ID = "dataproc_batch_deferrable"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
-BATCH_ID = f"test-def-batch-id-{ENV_ID}"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 BATCH_CONFIG = {
     "spark_batch": {
         "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
@@ -50,7 +50,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_batch_operator_async]
     create_batch = DataprocCreateBatchOperator(
@@ -75,7 +75,14 @@
     )
     delete_batch.trigger_rule = TriggerRule.ALL_DONE
 
-    (create_batch >> get_batch >> delete_batch)
+    (
+        # TEST SETUP
+        create_batch
+        # TEST BODY
+        >> get_batch
+        # TEST TEARDOWN
+        >> delete_batch
+    )
 
     from tests.system.utils.watcher import watcher
 
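Note that delete_batch keeps trigger_rule = TriggerRule.ALL_DONE, and the tail of every file (truncated in this view) wires the standard system-test watcher, list(dag.tasks) >> watcher(): ALL_DONE makes teardown run even when the body fails, and the watcher turns a failed teardown into a failed test run. A minimal sketch of the trigger-rule half, with placeholder tasks:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG("teardown_sketch", start_date=datetime(2021, 1, 1), schedule=None) as dag:
        body = EmptyOperator(task_id="body")
        # ALL_DONE fires once all upstream tasks have finished in any state,
        # so cleanup still happens when the test body fails.
        delete = EmptyOperator(task_id="delete", trigger_rule=TriggerRule.ALL_DONE)
        body >> delete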
==== File 3 of 13 ====

@@ -37,8 +37,8 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 REGION = "europe-west1"
-CLUSTER_NAME = f"dataproc-cluster-ps-{ENV_ID}"
-BATCH_ID = f"batch-ps-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 
 CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
     project_id=PROJECT_ID,
@@ -71,7 +71,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "persistent"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -108,7 +108,17 @@
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
-    create_bucket >> create_cluster >> create_batch >> delete_cluster >> delete_bucket
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_cluster
+        # TEST BODY
+        >> create_batch
+        # TEST TEARDOWN
+        >> delete_cluster
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
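For context, CLUSTER_GENERATOR_CONFIG_FOR_PHS above is built with the provider's ClusterGenerator helper, which assembles the cluster-config dict that DataprocCreateClusterOperator expects. A hedged sketch (machine types, worker count, and project are illustrative, not taken from this file):

    from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

    # .make() returns the plain cluster-config dict.
    CLUSTER_CONFIG = ClusterGenerator(
        project_id="my-project",  # placeholder
        master_machine_type="n1-standard-4",
        worker_machine_type="n1-standard-4",
        num_workers=2,
    ).make()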
==== File 4 of 13 ====

@@ -35,9 +35,8 @@
 DAG_ID = "dataproc_cluster_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-def-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -82,7 +81,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator_async]
     create_cluster = DataprocCreateClusterOperator(
@@ -119,7 +118,14 @@
     )
     # [END how_to_cloud_dataproc_delete_cluster_operator_async]
 
-    create_cluster >> update_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> update_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
==== File 5 of 13 ====

@@ -40,7 +40,7 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-cluster-gen-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
 ZONE = "europe-west1-b"
 INIT_FILE_SRC = str(Path(__file__).parent / "resources" / "pip-install.sh")
@@ -65,8 +65,6 @@

 # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 
 with models.DAG(
     DAG_ID,
@@ -110,7 +108,16 @@
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
-    create_bucket >> upload_file >> create_dataproc_cluster >> [delete_cluster, delete_bucket]
+    (
+        # TEST SETUP
+        create_bucket
+        >> upload_file
+        >>
+        # TEST BODY
+        create_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
 
     from tests.system.utils.watcher import watcher
 
==== File 6 of 13 ====

@@ -35,9 +35,8 @@
 DAG_ID = "dataproc_update"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-update-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -102,7 +101,14 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> scale_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> scale_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
==== File 7 of 13 ====

@@ -47,8 +47,8 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 REGION = "us-central1"
-CLUSTER_NAME = f"cluster-test-build-in-gke{ENV_ID}"
-GKE_CLUSTER_NAME = f"test-dataproc-gke-cluster-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+GKE_CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}-gke".replace("_", "-")
 WORKLOAD_POOL = f"{PROJECT_ID}.svc.id.goog"
 GKE_CLUSTER_CONFIG = {
     "name": GKE_CLUSTER_NAME,
@@ -89,7 +89,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "dataproc", "gke"],
 ) as dag:
     create_gke_cluster = GKECreateClusterOperator(
         task_id="create_gke_cluster",
@@ -132,11 +132,13 @@
     )
 
     (
+        # TEST SETUP
         create_gke_cluster
         >> add_iam_policy_binding
+        # TEST BODY
         >> create_cluster_in_gke
-        >> delete_gke_cluster
-        >> delete_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_gke_cluster, delete_dataproc_cluster]
     )
 
     from tests.system.utils.watcher import watcher
==== File 8 of 13 ====

@@ -37,9 +37,8 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-hadoop-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 OUTPUT_FOLDER = "wordcount"
 OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"
@@ -58,8 +57,6 @@
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_hadoop_config]
 HADOOP_JOB = {
==== File 9 of 13 ====

@@ -35,10 +35,8 @@
 DAG_ID = "dataproc_hive"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-hive-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
 
 # Cluster definition
 # [START how_to_cloud_dataproc_create_cluster]
@@ -68,8 +66,6 @@

 # [END how_to_cloud_dataproc_create_cluster]
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # [START how_to_cloud_dataproc_hive_config]
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
==== File 10 of 13 ====

@@ -35,9 +35,8 @@
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-pig-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -54,8 +53,6 @@
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_pig_config]
 PIG_JOB = {
==== File 11 of 13 ====

@@ -42,9 +42,8 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 # Cluster definition
 CLUSTER_CONFIG = {
==== File 12 of 13 ====

@@ -34,7 +34,7 @@
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -66,7 +66,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "workflow"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_workflow_template]
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
@@ -89,7 +89,14 @@
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
 
-    (create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template)
+    (
+        # TEST SETUP
+        create_workflow_template
+        # TEST BODY
+        >> trigger_workflow
+        # TEST TEARDOWN
+        >> instantiate_inline_workflow_template
+    )
 
     from tests.system.utils.watcher import watcher
 
==== File 13 of 13 ====

@@ -30,11 +30,11 @@
 )
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "dataproc_workflow"
+DAG_ID = "dataproc_workflow_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"{ENV_ID}-{DAG_ID}".replace("_", "-")
 CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -66,7 +66,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "workflow", "deferrable"],
 ) as dag:
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
         task_id="create_workflow_template",
@@ -94,7 +94,14 @@
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
 
-    (create_workflow_template >> trigger_workflow_async >> instantiate_inline_workflow_template_async)
+    (
+        # TEST SETUP
+        create_workflow_template
+        # TEST BODY
+        >> trigger_workflow_async
+        # TEST TEARDOWN
+        >> instantiate_inline_workflow_template_async
+    )
 
     from tests.system.utils.watcher import watcher
 
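The _async anchor names and the deferrable tags in the last few files refer to deferrable mode: the same operators hand their long poll to the triggerer instead of blocking a worker slot, typically enabled by passing deferrable=True. A hedged sketch (all values are placeholders, not taken from this commit):

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id="my-project",  # placeholder
        region="europe-west1",
        batch={
            "spark_batch": {
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "main_class": "org.apache.spark.examples.SparkPi",
            }
        },
        batch_id="batch-example",
        deferrable=True,  # assumption: poll via the triggerer, not a worker slot
    )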