diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index 8ae31e33f0f95..500443f674f67 100644
--- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -936,7 +936,7 @@ class CloudDataTransferServiceGCSToGCSOperator(GoogleCloudBaseOperator):
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
-        :ref:`howto/operator:GCSToGCSOperator`
+        :ref:`howto/operator:CloudDataTransferServiceGCSToGCSOperator`
 
     **Example**:
 
@@ -989,6 +989,7 @@ class CloudDataTransferServiceGCSToGCSOperator(GoogleCloudBaseOperator):
         If set to True, 'wait' must be set to True.
     """
 
+    # [START gcp_transfer_gcs_to_gcs_template_fields]
     template_fields: Sequence[str] = (
         "gcp_conn_id",
         "source_bucket",
@@ -999,6 +1000,8 @@ class CloudDataTransferServiceGCSToGCSOperator(GoogleCloudBaseOperator):
         "object_conditions",
         "google_impersonation_chain",
     )
+    # [END gcp_transfer_gcs_to_gcs_template_fields]
+
     ui_color = "#e09411"
 
     def __init__(
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
index a82a796e62da1..cdeb0ea71e6f9 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
@@ -370,6 +370,34 @@ Templating
     :start-after: [START gcp_transfer_job_sensor_template_fields]
     :end-before: [END gcp_transfer_job_sensor_template_fields]
 
+.. _howto/operator:CloudDataTransferServiceGCSToGCSOperator:
+
+CloudDataTransferServiceGCSToGCSOperator
+----------------------------------------
+
+Copy data from one GCS bucket to another.
+
+For parameter definition, take a look at
+:class:`~airflow.providers.google.cloud.operators.cloud_storage_transfer_service.CloudDataTransferServiceGCSToGCSOperator`.
+
+Using the operator
+""""""""""""""""""
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_transfer_gcs_to_gcs]
+    :end-before: [END howto_operator_transfer_gcs_to_gcs]
+
+Templating
+""""""""""
+
+.. literalinclude:: /../../airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_transfer_gcs_to_gcs_template_fields]
+    :end-before: [END gcp_transfer_gcs_to_gcs_template_fields]
+
 Reference
 ---------
 
diff --git a/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp_to_gcs.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp_to_gcs.py
new file mode 100644
index 0000000000000..01a6abeb958a0
--- /dev/null
+++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp_to_gcs.py
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables:
+
+* SYSTEM_TESTS_ENV_ID - unique suffix used to name the source and destination buckets.
+* SYSTEM_TESTS_GCP_PROJECT - Google Cloud project to use for the Google Cloud Transfer Service.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from airflow import models
+from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
+    CloudDataTransferServiceGCSToGCSOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_transfer_gcs_to_gcs"
+
+BUCKET_NAME_SRC = f"src-bucket-{DAG_ID}-{ENV_ID}"
+BUCKET_NAME_DST = f"dst-bucket-{DAG_ID}-{ENV_ID}"
+FILE_NAME = "file"
+FILE_URI = f"gs://{BUCKET_NAME_SRC}/{FILE_NAME}"
+
+CURRENT_FOLDER = Path(__file__).parent
+FILE_LOCAL_PATH = str(CURRENT_FOLDER / "resources" / FILE_NAME)
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",  # Override to match your needs
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "transfer", "gcs"],
+) as dag:
+
+    create_bucket_src = GCSCreateBucketOperator(
+        task_id="create_bucket_src",
+        bucket_name=BUCKET_NAME_SRC,
+        project_id=PROJECT_ID,
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=FILE_LOCAL_PATH,
+        dst=FILE_NAME,
+        bucket=BUCKET_NAME_SRC,
+    )
+
+    create_bucket_dst = GCSCreateBucketOperator(
+        task_id="create_bucket_dst",
+        bucket_name=BUCKET_NAME_DST,
+        project_id=PROJECT_ID,
+    )
+
+    # [START howto_operator_transfer_gcs_to_gcs]
+    transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
+        task_id="transfer_gcs_to_gcs",
+        source_bucket=BUCKET_NAME_SRC,
+        source_path=FILE_URI,
+        destination_bucket=BUCKET_NAME_DST,
+        destination_path=FILE_URI,
+        wait=True,
+    )
+    # [END howto_operator_transfer_gcs_to_gcs]
+
+    delete_bucket_dst = GCSDeleteBucketOperator(
+        task_id="delete_bucket_dst", bucket_name=BUCKET_NAME_DST, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    delete_bucket_src = GCSDeleteBucketOperator(
+        task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        [create_bucket_dst, create_bucket_src >> upload_file]
+        # TEST BODY
+        >> transfer_gcs_to_gcs
+        # TEST TEARDOWN
+        >> [delete_bucket_src, delete_bucket_dst]
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
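
Context for reviewers: unlike GCSToGCSOperator (which the old seealso link pointed to, and which copies objects itself via the GCS API), CloudDataTransferServiceGCSToGCSOperator delegates the copy to the Google Storage Transfer Service by creating a transfer job; with wait=True, as in the example DAG above, the task blocks until that job finishes. The snippet below is a hypothetical, minimal sketch of the kind of request body the Storage Transfer Service transferJobs.create REST API expects for such a GCS-to-GCS copy; the project and bucket names are illustrative placeholders, and the snippet is not part of this patch.

# Hypothetical sketch of a Storage Transfer Service job body for a
# GCS-to-GCS copy (transferJobs.create REST API, camelCase field names).
# All concrete values below are illustrative placeholders.
gcs_to_gcs_transfer_job = {
    "description": "copy objects from one GCS bucket to another",
    "projectId": "my-project",  # placeholder Google Cloud project
    "status": "ENABLED",
    "transferSpec": {
        "gcsDataSource": {"bucketName": "src-bucket"},  # objects are read from here
        "gcsDataSink": {"bucketName": "dst-bucket"},  # objects are written here
    },
}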