diff --git a/PROCESSORS.md b/PROCESSORS.md
index 521e5aa05c..4d02b9e760 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -609,15 +609,16 @@ Deletes an object from a Google Cloud Bucket.
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-|--------------------------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|
-| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
-| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
-| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
-| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
-| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
-| Server Side Encryption Key | | | The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.<br/>**Supports Expression Language: true** |
-| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br/>**Supports Expression Language: true** |
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
+| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
+| Proxy Configuration Service | | | Specifies the Proxy Configuration Controller Service to proxy network requests. When used, this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties. |
+| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
+| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
+| Server Side Encryption Key | | | The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.<br/>**Supports Expression Language: true** |
+| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br/>**Supports Expression Language: true** |
 
 ### Relationships
 
@@ -909,15 +910,16 @@ Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with Li
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-|--------------------------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
-| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
-| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
-| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
-| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
-| Server Side Encryption Key | | | The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.<br/>**Sensitive Property: true**<br/>**Supports Expression Language: true** |
-| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br/>**Supports Expression Language: true** |
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
+| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
+| Proxy Configuration Service | | | Specifies the Proxy Configuration Controller Service to proxy network requests. When used, this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties. |
+| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
+| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
+| Server Side Encryption Key | | | The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.<br/>**Sensitive Property: true**<br/>**Supports Expression Language: true** |
+| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br/>**Supports Expression Language: true** |
 
 ### Relationships
 
@@ -1632,13 +1634,14 @@ Retrieves a listing of objects from an GCS bucket. For each object that is liste
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-|--------------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------|
-| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
-| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
-| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
-| **Bucket** | | | Bucket of the object.<br/>**Supports Expression Language: true** |
-| List all versions | false | true<br/>false | Set this option to `true` to get all the previous versions separately. |
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
+| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
+| Proxy Configuration Service | | | Specifies the Proxy Configuration Controller Service to proxy network requests. When used, this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties. |
+| **Bucket** | | | Bucket of the object.<br/>**Supports Expression Language: true** |
+| List all versions | false | true<br/>false | Set this option to `true` to get all the previous versions separately. |
 
 ### Relationships
 
@@ -2447,19 +2450,20 @@ Puts flow files to a Google Cloud Storage Bucket.
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description |
-|--------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
-| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
-| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints
**Supports Expression Language: true** |
-| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
-| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
-| Content Type | ${mime.type} | | Content Type for the file, i.e. text/plain<br/>**Supports Expression Language: true** |
-| MD5 Hash | | | MD5 Hash (encoded in Base64) of the file for server-side validation.<br/>**Supports Expression Language: true** |
-| CRC32C Checksum | | | CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.<br/>**Supports Expression Language: true** |
-| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br/>**Sensitive Property: true**<br/>**Supports Expression Language: true** |
-| Object ACL | | authenticatedRead<br/>bucketOwnerFullControl<br/>bucketOwnerRead<br/>private<br/>projectPrivate<br/>publicRead<br/>publicReadWrite | Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults. |
-| Overwrite Object | true | true<br/>false | If false, the upload to GCS will succeed only if the object does not exist. |
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **GCP Credentials Provider Service** | | | The Controller Service used to obtain Google Cloud Platform credentials. Should be the name of a GCPCredentialsControllerService. |
+| **Number of retries** | 6 | | How many retry attempts should be made before routing to the failure relationship. |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints<br/>**Supports Expression Language: true** |
+| Proxy Configuration Service | | | Specifies the Proxy Configuration Controller Service to proxy network requests. When used, this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties. |
+| Bucket | ${gcs.bucket} | | Bucket of the object.<br/>**Supports Expression Language: true** |
+| Key | ${filename} | | Name of the object.<br/>**Supports Expression Language: true** |
+| Content Type | ${mime.type} | | Content Type for the file, i.e. text/plain<br/>**Supports Expression Language: true** |
+| MD5 Hash | | | MD5 Hash (encoded in Base64) of the file for server-side validation.<br/>**Supports Expression Language: true** |
+| CRC32C Checksum | | | CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.<br/>**Supports Expression Language: true** |
+| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br/>**Sensitive Property: true**<br/>**Supports Expression Language: true** |
+| Object ACL | | authenticatedRead<br/>bucketOwnerFullControl<br/>bucketOwnerRead<br/>private<br/>projectPrivate<br/>publicRead<br/>publicReadWrite | Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults. |
+| Overwrite Object | true | true
false | If false, the upload to GCS will succeed only if the object does not exist. |
 
 ### Relationships
 
diff --git a/behave_framework/src/minifi_test_framework/containers/container.py b/behave_framework/src/minifi_test_framework/containers/container.py
index 8c59586f5d..24b048898c 100644
--- a/behave_framework/src/minifi_test_framework/containers/container.py
+++ b/behave_framework/src/minifi_test_framework/containers/container.py
@@ -47,6 +47,9 @@ def __init__(self, image_name: str, container_name: str, network: Network, comma
         self.ports: dict[str, int] | None = None
         self.environment: list[str] = []
 
+    def is_deployed(self) -> bool:
+        return self.container is not None
+
     def add_host_file(self, host_path: str, container_path: str, mode: str = "ro"):
         self.host_files.append(HostFile(container_path, host_path, mode))
 
@@ -75,6 +78,10 @@ def _configure_volumes_of_container_dirs(self):
             self.volumes[temp_path] = {"bind": directory.path, "mode": directory.mode}
 
     def deploy(self) -> bool:
+        if self.is_deployed():
+            logging.info(f"Container '{self.container_name}' is already deployed.")
+            return True
+
         self._temp_dir = tempfile.TemporaryDirectory()
         self._configure_volumes_of_container_files()
         self._configure_volumes_of_container_dirs()
diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp
index 91d48d64a6..cdb2bcdc13 100644
--- a/extensions/gcp/processors/GCSProcessor.cpp
+++ b/extensions/gcp/processors/GCSProcessor.cpp
@@ -48,6 +48,23 @@ void GCSProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessio
   endpoint_url_ = context.getProperty(EndpointOverrideURL) | utils::toOptional();
   if (endpoint_url_)
     logger_->log_debug("Endpoint overwritten: {}", *endpoint_url_);
+
+  auto proxy_controller_service = minifi::utils::parseOptionalControllerService<minifi::controllers::ProxyConfigurationServiceInterface>(context, ProxyConfigurationService, getUUID());
+  if (proxy_controller_service) {
+    logger_->log_debug("Proxy configuration is set for GCS processor");
+
+    proxy_ = google::cloud::ProxyConfig{};
+    proxy_->set_hostname(proxy_controller_service->getHost()).set_scheme(proxy_controller_service->getProxyType() == minifi::controllers::ProxyType::HTTPS ? "https" : "http");
+    if (proxy_controller_service->getPort()) {
+      proxy_->set_port(std::to_string(*proxy_controller_service->getPort()));
+    }
+    if (proxy_controller_service->getUsername()) {
+      proxy_->set_username(*proxy_controller_service->getUsername());
+    }
+    if (proxy_controller_service->getPassword()) {
+      proxy_->set_password(*proxy_controller_service->getPassword());
+    }
+  }
 }
 
 gcs::Client GCSProcessor::getClient() const {
@@ -55,6 +72,10 @@ gcs::Client GCSProcessor::getClient() const {
       .set<google::cloud::UnifiedCredentialsOption>(gcp_credentials_)
       .set<google::cloud::storage::RetryPolicyOption>(retry_policy_);
 
+  if (proxy_) {
+    options.set<google::cloud::ProxyOption>(*proxy_);
+  }
+
   if (endpoint_url_) {
     options.set<gcs::RestEndpointOption>(*endpoint_url_);
   }
diff --git a/extensions/gcp/processors/GCSProcessor.h b/extensions/gcp/processors/GCSProcessor.h
index 1ec2b6641a..3fb86ef304 100644
--- a/extensions/gcp/processors/GCSProcessor.h
+++ b/extensions/gcp/processors/GCSProcessor.h
@@ -30,6 +30,8 @@
 #include "google/cloud/credentials.h"
 #include "google/cloud/storage/client.h"
 #include "google/cloud/storage/retry_policy.h"
+#include "minifi-cpp/controllers/ProxyConfigurationServiceInterface.h"
+#include "controllers/ProxyConfiguration.h"
 
 namespace org::apache::nifi::minifi::extensions::gcp {
 class GCSProcessor : public core::ProcessorImpl {
@@ -53,10 +55,16 @@ class GCSProcessor : public core::ProcessorImpl {
       .isRequired(false)
       .supportsExpressionLanguage(true)
       .build();
+  EXTENSIONAPI static constexpr auto ProxyConfigurationService = core::PropertyDefinitionBuilder<>::createProperty("Proxy Configuration Service")
+      .withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. When used, "
+          "this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties.")
+      .withAllowedTypes<minifi::controllers::ProxyConfigurationServiceInterface>()
+      .build();
   EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
       GCPCredentials,
       NumberOfRetries,
-      EndpointOverrideURL
+      EndpointOverrideURL,
+      ProxyConfigurationService
   });
 
 
@@ -68,6 +76,7 @@ class GCSProcessor : public core::ProcessorImpl {
 
   std::optional<std::string> endpoint_url_;
   std::shared_ptr<google::cloud::Credentials> gcp_credentials_;
+  std::optional<google::cloud::ProxyConfig> proxy_;
   google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
 };
 
diff --git a/extensions/gcp/tests/features/google_cloud_storage.feature b/extensions/gcp/tests/features/google_cloud_storage.feature
index 2c19abcf33..ac0a6c40db 100644
--- a/extensions/gcp/tests/features/google_cloud_storage.feature
+++ b/extensions/gcp/tests/features/google_cloud_storage.feature
@@ -35,11 +35,47 @@ Feature: Sending data to Google Cloud Storage using PutGCSObject
     And the "failure" relationship of the PutGCSObject processor is connected to the PutGCSObject
     And PutFile's success relationship is auto-terminated
 
-    When the MiNiFi instance starts up
+    When all instances start up
 
     Then a single file with the content "hello_gcs" is placed in the "/tmp/output" directory in less than 45 seconds
     And an object with the content "hello_gcs" is present in the Google Cloud storage
 
+  Scenario Outline: A MiNiFi instance can upload data to Google Cloud storage through a http proxy
+    Given the http proxy server is set up
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to "true"
+    And the scheduling period of the GetFile processor is set to "60 sec"
+    And a file with the content "hello_gcs" is present in "/tmp/input"
+    And a Google Cloud storage server is set up
+    And a PutGCSObject processor
+    And the "Proxy Configuration Service" property of the PutGCSObject processor is set to "ProxyConfigurationService"
+    And PutGCSObject is EVENT_DRIVEN
+    And a GCPCredentialsControllerService controller service is set up
+    And the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials"
+    And the "GCP Credentials Provider Service" property of the PutGCSObject processor is set to "GCPCredentialsControllerService"
+    And the "Bucket" property of the PutGCSObject processor is set to "test-bucket"
+    And the "Number of retries" property of the PutGCSObject processor is set to "2"
+    And the "Endpoint Override URL" property of the PutGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
+    And a ProxyConfigurationService controller service is set up with proxy configuration
+    And the "success" relationship of the GetFile processor is connected to the PutGCSObject
+    And the "success" relationship of the PutGCSObject processor is connected to the PutFile
+    And the "failure" relationship of the PutGCSObject processor is connected to the PutGCSObject
+    And PutFile's success relationship is auto-terminated
+    And PutFile's failure relationship is auto-terminated
+
+    When all instances start up
+
+    Then a single file with the content "hello_gcs" is placed in the "/tmp/output" directory in less than 60 seconds
+    And an object with the content "hello_gcs" is present in the Google Cloud storage
+    And no errors were generated on the http-proxy regarding "http://fake-gcs-server-${scenario_id}:4443/"
+
+    Examples: Proxy Type
+      | proxy type |
+      | HTTP |
+      | HTTPS |
+
   Scenario: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket
     Given a Google Cloud storage server is set up and a single object with contents "preloaded data" is present
     And a GCPCredentialsControllerService controller service is set up
@@ -65,6 +101,42 @@ Feature: Sending data to Google Cloud Storage using PutGCSObject
 
     Then a single file with the content "preloaded data" is placed in the "/tmp/output" directory in less than 10 seconds
 
+  Scenario Outline: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket through a http proxy
+    Given the http proxy server is set up
+    And a Google Cloud storage server is set up and a single object with contents "preloaded data" is present
+    And a GCPCredentialsControllerService controller service is set up
+    And the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials"
+    And a ListGCSBucket processor
+    And the "Bucket" property of the ListGCSBucket processor is set to "test-bucket"
+    And the "Number of retries" property of the ListGCSBucket processor is set to "2"
+    And the "Endpoint Override URL" property of the ListGCSBucket processor is set to "fake-gcs-server-${scenario_id}:4443"
+    And the "GCP Credentials Provider Service" property of the ListGCSBucket processor is set to "GCPCredentialsControllerService"
+    And the "Proxy Configuration Service" property of the ListGCSBucket processor is set to "ProxyConfigurationService"
+    And a FetchGCSObject processor
+    And FetchGCSObject is EVENT_DRIVEN
+    And the "Bucket" property of the FetchGCSObject processor is set to "test-bucket"
+    And the "Number of retries" property of the FetchGCSObject processor is set to "2"
+    And the "Endpoint Override URL" property of the FetchGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443"
+    And the "GCP Credentials Provider Service" property of the FetchGCSObject processor is set to "GCPCredentialsControllerService"
+    And the "Proxy Configuration Service" property of the FetchGCSObject processor is set to "ProxyConfigurationService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
+    And a ProxyConfigurationService controller service is set up with proxy configuration
+    And the "success" relationship of the ListGCSBucket processor is connected to the FetchGCSObject
+    And the "success" relationship of the FetchGCSObject processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated
+    And PutFile's failure relationship is auto-terminated
+
+    When all instances start up
+
+    Then a single file with the content "preloaded data" is placed in the "/tmp/output" directory in less than 20 seconds
+    And no errors were generated on the http-proxy regarding "http://fake-gcs-server-${scenario_id}:4443/"
+
+    Examples: Proxy Type
+      | proxy type |
+      | HTTP |
+      | HTTPS |
+
   Scenario: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket
     Given a Google Cloud storage server is set up with some test data
     And a GCPCredentialsControllerService controller service is set up
@@ -89,3 +161,39 @@ Feature: Sending data to Google Cloud Storage using PutGCSObject
 
     Then the test bucket of Google Cloud Storage is empty
     And at least one empty file is placed in the "/tmp/output" directory in less than 10 seconds
+
+  Scenario Outline: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket through a http proxy
+    Given the http proxy server is set up
+    And a Google Cloud storage server is set up with some test data
+    And a GCPCredentialsControllerService controller service is set up
+    And the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials"
+    And a ListGCSBucket processor
+    And the "Bucket" property of the ListGCSBucket processor is set to "test-bucket"
+    And the "Number of retries" property of the ListGCSBucket processor is set to "2"
+    And the "Endpoint Override URL" property of the ListGCSBucket processor is set to "fake-gcs-server-${scenario_id}:4443"
+    And the "GCP Credentials Provider Service" property of the ListGCSBucket processor is set to "GCPCredentialsControllerService"
+    And the "Proxy Configuration Service" property of the ListGCSBucket processor is set to "ProxyConfigurationService"
+    And a DeleteGCSObject processor
+    And DeleteGCSObject is EVENT_DRIVEN
+    And the "Bucket" property of the DeleteGCSObject processor is set to "test-bucket"
+    And the "Number of retries" property of the DeleteGCSObject processor is set to "2"
+    And the "Endpoint Override URL" property of the DeleteGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443"
+    And the "GCP Credentials Provider Service" property of the DeleteGCSObject processor is set to "GCPCredentialsControllerService"
+    And the "Proxy Configuration Service" property of the DeleteGCSObject processor is set to "ProxyConfigurationService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a ProxyConfigurationService controller service is set up with proxy configuration
+    And the "success" relationship of the ListGCSBucket processor is connected to the DeleteGCSObject
+    And the "success" relationship of the DeleteGCSObject processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated
+    And PutFile's failure relationship is auto-terminated
+
+    When all instances start up
+
+    Then the test bucket of Google Cloud Storage is empty
+    And at least one empty file is placed in the "/tmp/output" directory in less than 10 seconds
+    And no errors were generated on the http-proxy regarding "http://fake-gcs-server-${scenario_id}:4443/"
+
+    Examples: Proxy Type
+      | proxy type |
+      | HTTP |
+      | HTTPS |