From 94a8480aca6d24a0c9445ac98664b3c01d41a80c Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 12 Feb 2026 17:12:47 +0100 Subject: [PATCH 1/3] MINIFICPP-2703 Update GCP extension to use unified credentials Closes #2086 Signed-off-by: Marton Szasz --- cmake/GoogleCloudCpp.cmake | 4 +- .../GCPCredentialsControllerService.cpp | 37 ++++++------------- .../GCPCredentialsControllerService.h | 9 ++--- extensions/gcp/processors/GCSProcessor.cpp | 14 ++++--- extensions/gcp/processors/GCSProcessor.h | 6 +-- .../GCPCredentialsControllerServiceTests.cpp | 17 +++++---- 6 files changed, 39 insertions(+), 48 deletions(-) diff --git a/cmake/GoogleCloudCpp.cmake b/cmake/GoogleCloudCpp.cmake index 071f83d5fe..0d34e78568 100644 --- a/cmake/GoogleCloudCpp.cmake +++ b/cmake/GoogleCloudCpp.cmake @@ -49,8 +49,8 @@ set(GOOGLE_CLOUD_CPP_ENABLE_MACOS_OPENSSL_CHECK OFF CACHE INTERNAL macos-openssl set(BUILD_TESTING OFF CACHE INTERNAL testing-off) set(GOOGLE_CLOUD_CPP_ENABLE_WERROR OFF CACHE INTERNAL warnings-off) FetchContent_Declare(google-cloud-cpp - URL https://github.com/googleapis/google-cloud-cpp/archive/refs/tags/v2.38.0.tar.gz - URL_HASH SHA256=f1493b2dce9b379714342f2be7ccb483d70d13aac09d4a90ae3b4756693b72fc + URL https://github.com/googleapis/google-cloud-cpp/archive/refs/tags/v2.45.0.tar.gz + URL_HASH SHA256=3d1b5eb696832f9071bf7ef0b3f0c9fd27c1a39d5edcb8a9976c65193319fd01 PATCH_COMMAND "${PC}" SYSTEM) if (WIN32) diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp index eb9842fc45..9e93c29422 100644 --- a/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp @@ -21,8 +21,7 @@ #include "core/Resource.h" #include "google/cloud/storage/client.h" #include "utils/ProcessorConfigUtils.h" - -namespace gcs = ::google::cloud::storage; +#include "utils/file/FileUtils.h" namespace org::apache::nifi::minifi::extensions::gcp { @@ -30,43 +29,29 @@ void GCPCredentialsControllerService::initialize() { setSupportedProperties(Properties); } -std::shared_ptr GCPCredentialsControllerService::createDefaultCredentials() const { - auto default_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromDefaultPaths(); - if (!default_credentials.ok()) { - logger_->log_error("{}", default_credentials.status().message()); - return nullptr; - } - return *default_credentials; -} - -std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonPath() const { +std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonPath() const { const auto json_path = getProperty(JsonFilePath.name); if (!json_path) { logger_->log_error("Missing or invalid {}", JsonFilePath.name); return nullptr; } - auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonFilePath(*json_path); - if (!json_path_credentials.ok()) { - logger_->log_error("{}", json_path_credentials.status().message()); + if (!utils::file::exists(*json_path)) { + logger_->log_error("JSON file for GCP credentials '{}' does not exist", *json_path); return nullptr; } - return *json_path_credentials; + + return google::cloud::MakeServiceAccountCredentials(utils::file::get_content(*json_path)); } -std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonContents() const { +std::shared_ptr GCPCredentialsControllerService::createCredentialsFromJsonContents() const { auto json_contents = getProperty(JsonContents.name); if (!json_contents) { logger_->log_error("Missing or invalid {}", JsonContents.name); return nullptr; } - auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonContents(*json_contents); - if (!json_path_credentials.ok()) { - logger_->log_error("{}", json_path_credentials.status().message()); - return nullptr; - } - return *json_path_credentials; + return google::cloud::MakeServiceAccountCredentials(*json_contents); } void GCPCredentialsControllerService::onEnable() { @@ -79,15 +64,15 @@ void GCPCredentialsControllerService::onEnable() { credentials_location = CredentialsLocation::USE_DEFAULT_CREDENTIALS; } if (*credentials_location == CredentialsLocation::USE_DEFAULT_CREDENTIALS) { - credentials_ = createDefaultCredentials(); + credentials_ = google::cloud::MakeGoogleDefaultCredentials(); } else if (*credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) { - credentials_ = gcs::oauth2::CreateComputeEngineCredentials(); + credentials_ = google::cloud::MakeComputeEngineCredentials(); } else if (*credentials_location == CredentialsLocation::USE_JSON_FILE) { credentials_ = createCredentialsFromJsonPath(); } else if (*credentials_location == CredentialsLocation::USE_JSON_CONTENTS) { credentials_ = createCredentialsFromJsonContents(); } else if (*credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) { - credentials_ = gcs::oauth2::CreateAnonymousCredentials(); + credentials_ = google::cloud::MakeInsecureCredentials(); } if (!credentials_) logger_->log_error("Couldn't create valid credentials"); diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h index 20d7c65b21..4f5fc219b4 100644 --- a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h @@ -28,7 +28,7 @@ #include "core/PropertyDefinitionBuilder.h" #include "utils/Enum.h" -#include "google/cloud/storage/oauth2/credentials.h" +#include "google/cloud/credentials.h" namespace org::apache::nifi::minifi::extensions::gcp { enum class CredentialsLocation { @@ -113,12 +113,11 @@ class GCPCredentialsControllerService : public core::controller::ControllerServi [[nodiscard]] const auto& getCredentials() const { return credentials_; } protected: - [[nodiscard]] std::shared_ptr createDefaultCredentials() const; - [[nodiscard]] std::shared_ptr createCredentialsFromJsonPath() const; - [[nodiscard]] std::shared_ptr createCredentialsFromJsonContents() const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonPath() const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonContents() const; - std::shared_ptr credentials_; + std::shared_ptr credentials_; std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(uuid_); }; } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp index ded36f8a23..91d48d64a6 100644 --- a/extensions/gcp/processors/GCSProcessor.cpp +++ b/extensions/gcp/processors/GCSProcessor.cpp @@ -27,7 +27,7 @@ namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -std::shared_ptr GCSProcessor::getCredentials(core::ProcessContext& context) const { +std::shared_ptr GCSProcessor::getCredentials(core::ProcessContext& context) const { auto gcp_credentials_controller_service = utils::parseOptionalControllerService(context, GCSProcessor::GCPCredentials, getUUID()); if (gcp_credentials_controller_service) { return gcp_credentials_controller_service->getCredentials(); @@ -51,10 +51,14 @@ void GCSProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessio } gcs::Client GCSProcessor::getClient() const { - auto options = gcs::ClientOptions(gcp_credentials_); - if (endpoint_url_) - options.set_endpoint(*endpoint_url_); - return gcs::Client(options, *retry_policy_); + auto options = google::cloud::Options{} + .set(gcp_credentials_) + .set(retry_policy_); + + if (endpoint_url_) { + options.set(*endpoint_url_); + } + return gcs::Client(options); } } // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/GCSProcessor.h b/extensions/gcp/processors/GCSProcessor.h index e3361a786c..1ec2b6641a 100644 --- a/extensions/gcp/processors/GCSProcessor.h +++ b/extensions/gcp/processors/GCSProcessor.h @@ -27,7 +27,7 @@ #include "minifi-cpp/core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" -#include "google/cloud/storage/oauth2/credentials.h" +#include "google/cloud/credentials.h" #include "google/cloud/storage/client.h" #include "google/cloud/storage/retry_policy.h" @@ -64,10 +64,10 @@ class GCSProcessor : public core::ProcessorImpl { protected: virtual google::cloud::storage::Client getClient() const; - std::shared_ptr getCredentials(core::ProcessContext& context) const; + std::shared_ptr getCredentials(core::ProcessContext& context) const; std::optional endpoint_url_; - std::shared_ptr gcp_credentials_; + std::shared_ptr gcp_credentials_; google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared(6); }; diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp index 969865a746..6c33a402e6 100644 --- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp +++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp @@ -80,13 +80,6 @@ class GCPCredentialsTests : public ::testing::Test { std::shared_ptr gcp_credentials_ = std::dynamic_pointer_cast(gcp_credentials_node_->getControllerServiceImplementation()); }; -TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithoutEnv) { - minifi::utils::Environment::unsetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS"); - plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_DEFAULT_CREDENTIALS)); - ASSERT_NO_THROW(test_controller_.runSession(plan_)); - EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); -} - TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithEnv) { auto temp_directory = test_controller_.createTempDirectory(); auto path = create_mock_json_file(temp_directory); @@ -113,6 +106,16 @@ TEST_F(GCPCredentialsTests, CredentialsFromJsonWithProperty) { EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); } +TEST_F(GCPCredentialsTests, CredentialsFromJsonWithInvalidPath) { + auto temp_directory = test_controller_.createTempDirectory(); + auto path = create_mock_json_file(temp_directory); + ASSERT_TRUE(path.has_value()); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE)); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, "/invalid/path/to/credentials.json"); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); +} + TEST_F(GCPCredentialsTests, CredentialsFromComputeEngineVM) { plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS)); ASSERT_NO_THROW(test_controller_.runSession(plan_)); From 18380fc7d3bd8328e58d7e4553cc944f4ae5b248 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 12 Feb 2026 17:12:58 +0100 Subject: [PATCH 2/3] MINIFICPP-2707 Catch exceptions during C2 Start/Stop commands Closes #2093 Signed-off-by: Marton Szasz --- libminifi/src/c2/C2Agent.cpp | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 7f5dc679ac..239c86ea71 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -890,21 +890,27 @@ void C2Agent::handle_start_stop(const C2ContentResponse& resp) { }); }; - if (lowered_response_name == "flow") { - executeStartStopOnComponent("FlowController"); - } else if (lowered_response_name == "processor") { - auto processor_id = resp.getStringArgument("processorId"); - if (processor_id) { - executeStartStopOnComponent(processor_id.value()); + auto update_state = state::UpdateState::FULLY_APPLIED; + try { + if (lowered_response_name == "flow") { + executeStartStopOnComponent("FlowController"); + } else if (lowered_response_name == "processor") { + if (const auto processor_id = resp.getStringArgument("processorId")) { + executeStartStopOnComponent(processor_id.value()); + } else { + update_state = state::UpdateState::NO_OPERATION; + logger_->log_warn("Processor start/stop request missing 'processorId' argument"); + } } else { - logger_->log_warn("Processor start/stop request missing 'processorId' argument"); + executeStartStopOnComponent(resp.name); } - } else { - executeStartStopOnComponent(resp.name); + } catch (const std::exception& err) { + update_state = state::UpdateState::NOT_APPLIED; + logger_->log_warn("Failed to execute StartStopOnComponent command due to \"{}\"", err.what()); } if (!resp.ident.empty()) { - C2Payload response(Operation::acknowledge, resp.ident, true); + C2Payload response(Operation::acknowledge, update_state, resp.ident, true); enqueue_c2_response(std::move(response)); } } From 9187c2efd5c26c7c7b654d9346de0959dc53610a Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Thu, 12 Feb 2026 17:13:12 +0100 Subject: [PATCH 3/3] MINIFICPP-2709 Fix site to site receive using HTTP protocol Closes #2094 Signed-off-by: Marton Szasz --- core-framework/include/http/HTTPStream.h | 69 ++++++++----------- core-framework/src/http/HTTPStream.cpp | 34 ++++----- .../Minifi_flow_json_serializer.py | 1 + .../Minifi_flow_yaml_serializer.py | 1 + .../include/sitetosite/HttpSiteToSiteClient.h | 3 + .../include/sitetosite/SiteToSiteClient.h | 7 +- .../src/sitetosite/HttpSiteToSiteClient.cpp | 40 +++++++++-- libminifi/src/sitetosite/SiteToSiteClient.cpp | 40 +++++------ 8 files changed, 103 insertions(+), 92 deletions(-) diff --git a/core-framework/include/http/HTTPStream.h b/core-framework/include/http/HTTPStream.h index 7a513bd31d..cd3e08da25 100644 --- a/core-framework/include/http/HTTPStream.h +++ b/core-framework/include/http/HTTPStream.h @@ -48,54 +48,42 @@ class HttpStream : public io::BaseStreamImpl { } const std::shared_ptr& getClient() { - http_client_future_.get(); + if (http_client_read_future_.valid()) { + http_client_read_future_.get(); + } + if (http_client_write_future_.valid()) { + http_client_write_future_.get(); + } return http_client_; } void forceClose() { - if (started_) { - // lock shouldn't be needed here as call paths currently guarantee - // flow, but we should be safe anyway. - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); + if (read_started_ || write_started_) { close(); http_client_->forceClose(); - if (http_client_future_.valid()) { - http_client_future_.get(); - } else { - logger_->log_warn("Future status already cleared for {}, continuing", http_client_->getURL()); - } + read_started_ = false; + write_started_ = false; + } - started_ = false; + if (http_client_read_future_.valid()) { + http_client_read_future_.get(); + } + if (http_client_write_future_.valid()) { + http_client_write_future_.get(); } } - /** - * Skip to the specified offset. - * @param offset offset to which we will skip - */ void seek(size_t offset) override; [[nodiscard]] size_t tell() const override; - [[nodiscard]] size_t size() const override { - return written; - } + [[nodiscard]] size_t size() const override; using BaseStream::write; using BaseStream::read; - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ size_t read(std::span buf) override; - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ size_t write(const uint8_t* value, size_t size) override; static bool submit_client(const std::shared_ptr& client) { @@ -114,19 +102,22 @@ class HttpStream : public io::BaseStreamImpl { } inline bool isFinished(int seconds = 0) { - return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready + if (!http_client_read_future_.valid()) { + return false; + } + return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready && getByteOutputReadCallback() && getByteOutputReadCallback()->getSize() == 0 && getByteOutputReadCallback()->waitingOps(); } - /** - * Waits for more data to become available. - */ bool waitForDataAvailable() { + if (!http_client_read_future_.valid()) { + return false; + } do { logger_->log_trace("Waiting for more data"); - } while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready + } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready && getByteOutputReadCallback() && getByteOutputReadCallback()->getSize() == 0); @@ -135,16 +126,14 @@ class HttpStream : public io::BaseStreamImpl { } protected: - std::vector array; - std::shared_ptr http_client_; - std::future http_client_future_; - - size_t written{0}; + std::future http_client_read_future_; + std::future http_client_write_future_; std::mutex mutex_; - std::atomic started_{false}; + std::atomic read_started_{false}; + std::atomic write_started_{false}; private: utils::ByteOutputCallback* getByteOutputReadCallback() { diff --git a/core-framework/src/http/HTTPStream.cpp b/core-framework/src/http/HTTPStream.cpp index 61f076e456..69a0bbb7e4 100644 --- a/core-framework/src/http/HTTPStream.cpp +++ b/core-framework/src/http/HTTPStream.cpp @@ -40,30 +40,28 @@ void HttpStream::close() { } void HttpStream::seek(size_t /*offset*/) { - // seek is an unnecessary part of this implementation throw std::logic_error{"HttpStream::seek is unimplemented"}; } size_t HttpStream::tell() const { - // tell is an unnecessary part of this implementation throw std::logic_error{"HttpStream::tell is unimplemented"}; } -// data stream overrides +[[nodiscard]] size_t HttpStream::size() const { + throw std::logic_error{"HttpStream::size is unimplemented"}; +} size_t HttpStream::write(const uint8_t* value, size_t size) { if (size == 0) return 0; if (IsNullOrEmpty(value)) { return io::STREAM_ERROR; } - if (!started_) { - std::lock_guard lock(mutex_); - if (!started_) { - auto callback = std::make_unique(); - http_client_->setUploadCallback(std::move(callback)); - http_client_future_ = std::async(std::launch::async, submit_client, http_client_); - started_ = true; - } + std::lock_guard lock(mutex_); + if (!write_started_) { + auto callback = std::make_unique(); + http_client_->setUploadCallback(std::move(callback)); + http_client_write_future_ = std::async(std::launch::async, submit_client, http_client_); + write_started_ = true; } if (auto http_callback = dynamic_cast(http_client_->getUploadCallback())) http_callback->process(value, size); @@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) { size_t HttpStream::read(std::span buf) { if (buf.empty()) { return 0; } if (!IsNullOrEmpty(buf)) { - if (!started_) { - std::lock_guard lock(mutex_); - if (!started_) { - auto read_callback = std::make_unique(66560, true); - http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get()); - http_client_->setReadCallback(std::move(read_callback)); - started_ = true; - } + std::lock_guard lock(mutex_); + if (!read_started_) { + auto read_callback = std::make_unique(66560, true); + http_client_read_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get()); + http_client_->setReadCallback(std::move(read_callback)); + read_started_ = true; } return gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast(buf.data()), buf.size()); } else { diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index 4a64a982e1..7082d78209 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -133,6 +133,7 @@ def serialize_node(self, connectable, root, visited): 'targetUri': group.url, 'communicationsTimeout': '30 sec', 'yieldDuration': '3 sec', + 'transportProtocol': group.transport_protocol, 'outputPorts': [] } diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index 4c951a051e..c3e733d26e 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -136,6 +136,7 @@ def serialize_node(self, connectable, res=None, visited=None): 'url': group.url, 'timeout': '30 sec', 'yield period': '3 sec', + 'transport protocol': group.transport_protocol, 'Output Ports': [] } diff --git a/libminifi/include/sitetosite/HttpSiteToSiteClient.h b/libminifi/include/sitetosite/HttpSiteToSiteClient.h index 1d8446ae50..a235b6b9d9 100644 --- a/libminifi/include/sitetosite/HttpSiteToSiteClient.h +++ b/libminifi/include/sitetosite/HttpSiteToSiteClient.h @@ -77,6 +77,9 @@ class HttpSiteToSiteClient final : public SiteToSiteClient { void deleteTransaction(const utils::Identifier& transaction_id) override; void tearDown() override; + protected: + std::pair readFlowFiles(const std::shared_ptr& transaction, core::ProcessSession& session) override; + private: void setSiteToSiteHeaders(minifi::http::HTTPClient& client); void closeTransaction(const utils::Identifier &transaction_id); diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index ab4987943d..656f71e654 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -25,6 +25,7 @@ #include #include #include +#include #include "Peer.h" #include "SiteToSite.h" @@ -135,6 +136,7 @@ class SiteToSiteClient { virtual void deleteTransaction(const utils::Identifier &transaction_id); virtual std::optional readResponse(const std::shared_ptr &transaction); virtual bool writeResponse(const std::shared_ptr &transaction, const SiteToSiteResponse& response); + virtual std::pair readFlowFiles(const std::shared_ptr& transaction, core::ProcessSession& session); bool initializeSend(const std::shared_ptr& transaction); bool writeAttributesInSendTransaction(io::OutputStream& stream, const std::string& transaction_id_str, const std::map& attributes); @@ -187,9 +189,8 @@ class SiteToSiteClient { bool completeReceive(const std::shared_ptr& transaction, const utils::Identifier& transaction_id); bool completeSend(const std::shared_ptr& transaction, const utils::Identifier& transaction_id, core::ProcessContext& context); - bool readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result); - std::optional receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr& transaction); - std::pair readFlowFiles(const std::shared_ptr& transaction, core::ProcessSession& session); + std::expected readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result); + std::expected receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr& transaction); std::shared_ptr logger_{core::logging::LoggerFactory::getLogger()}; }; diff --git a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp index 29ea227b47..aacc706099 100644 --- a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp @@ -141,7 +141,6 @@ std::shared_ptr HttpSiteToSiteClient::createTransaction(TransferDir } else { transaction_client = openConnectionForReceive(transaction); transaction->setDataAvailable(true); - // 201 tells us that data is available. 200 would mean that nothing is available. } gsl_Assert(transaction_client); @@ -352,13 +351,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction logger_->log_debug("Received {} response code from delete", client->getResponseCode()); - if (client->getResponseCode() == 400) { + if (client->getResponseCode() >= 400) { std::string error(client->getResponseBody().data(), client->getResponseBody().size()); - logger_->log_warn("400 received: {}", error); - std::stringstream message; - message << "Received " << client->getResponseCode() << " from " << uri.str(); - throw Exception(SITE2SITE_EXCEPTION, message.str()); + logger_->log_warn("{} received: {}", client->getResponseCode(), error); + throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", client->getResponseCode(), uri.str())); } transaction->close(); @@ -388,4 +385,35 @@ void HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client } } +std::pair HttpSiteToSiteClient::readFlowFiles(const std::shared_ptr& transaction, core::ProcessSession& session) { + auto http_stream = dynamic_cast(peer_->getStream()); + if (!http_stream) { + throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream cannot be cast to HTTP stream"); + } + + std::pair read_result; + try { + read_result = SiteToSiteClient::readFlowFiles(transaction, session); + } catch (const Exception&) { + auto response_code = http_stream->getClientRef()->getResponseCode(); + + // 200 tells us that there is no content to read, so we should not treat it as an error. + // The read fails in this case because the stream does not contain a valid response body with the expected format. + // Unfortunately there is no way to get the response code before trying to read, so we have to let it fail and check the response code afterwards. + if (response_code == 200) { + logger_->log_debug("Response code 200, no content to read"); + transaction->setDataAvailable(false); + transaction->setState(TransactionState::TRANSACTION_CANCELED); + current_code_ = ResponseCode::CANCEL_TRANSACTION; + return {0, 0}; + } + throw; + } + + if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) { + throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received while reading flow files: {}", response_code)); + } + return read_result; +} + } // namespace org::apache::nifi::minifi::sitetosite diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index e34c94858e..290097b409 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -559,11 +559,10 @@ bool SiteToSiteClient::sendPacket(const DataPacket& packet) { return true; } -bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& result) { +std::expected SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& result) { uint32_t num_attributes = 0; if (const auto ret = stream.read(num_attributes); ret == 0 || io::isError(ret) || num_attributes > MAX_NUM_ATTRIBUTES) { - logger_->log_error("Site2Site failed to read number of attributes with return code {}, or number of attributes is invalid: {}", ret, num_attributes); - return false; + return std::unexpected(fmt::format("Site2Site failed to read number of attributes with return code {}, or number of attributes is invalid: {}", ret, num_attributes)); } logger_->log_debug("Site2Site transaction {} receives {} attributes", transaction_id_str, num_attributes); @@ -571,13 +570,11 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std std::string key; std::string value; if (const auto ret = stream.read(key, true); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read attribute key", transaction_id_str); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read attribute key", transaction_id_str)); } if (const auto ret = stream.read(value, true); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read attribute value for key {}", transaction_id_str, key); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read attribute value for key {}", transaction_id_str, key)); } result.attributes[key] = value; @@ -586,32 +583,29 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std uint64_t len = 0; if (const auto ret = stream.read(len); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read flow file data size", transaction_id_str); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read flow file data size", transaction_id_str)); } result.flow_file_data_size = len; - return true; + return {}; } -std::optional SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr& transaction) { +std::expected SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr& transaction) { if (peer_state_ != PeerState::READY) { bootstrap(); } if (peer_state_ != PeerState::READY) { - return std::nullopt; + return std::unexpected("Peer state is not ready"); } const auto transaction_id_str = transaction->getUUIDStr(); if (transaction->getState() != TransactionState::TRANSACTION_STARTED && transaction->getState() != TransactionState::DATA_EXCHANGED) { - logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transaction_id_str); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} is not at started or exchanged state", transaction_id_str)); } if (transaction->getDirection() != TransferDirection::RECEIVE) { - logger_->log_warn("Site2Site transaction {} direction is wrong", transaction_id_str); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} direction is wrong", transaction_id_str)); } ReceiveFlowFileHeaderResult result; @@ -624,7 +618,7 @@ std::optional SiteToSiteClient::r // if we already have transferred a flow file before, check to see whether another one is available auto response = readResponse(transaction); if (!response) { - return std::nullopt; + return std::unexpected("Failed to read response"); } if (response->code == ResponseCode::CONTINUE_TRANSACTION) { logger_->log_debug("Site2Site transaction {} peer indicate continue transaction", transaction_id_str); @@ -635,8 +629,7 @@ std::optional SiteToSiteClient::r result.eof = true; return result; } else { - logger_->log_debug("Site2Site transaction {} peer indicate wrong response code {}", transaction_id_str, magic_enum::enum_underlying(response->code)); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} peer indicate wrong response code {}", transaction_id_str, magic_enum::enum_underlying(response->code))); } } @@ -646,9 +639,8 @@ std::optional SiteToSiteClient::r return result; } - if (!readFlowFileHeaderData(stream, transaction_id_str, result)) { - logger_->log_error("Site2Site transaction {} failed to read flow file header data", transaction_id_str); - return std::nullopt; + if (auto ret = readFlowFileHeaderData(stream, transaction_id_str, result); !ret.has_value()) { + return std::unexpected(fmt::format("Site2Site transaction {} failed to read flow file header data: {}", transaction_id_str, ret.error())); } if (result.flow_file_data_size > 0 || !result.attributes.empty()) { @@ -679,14 +671,14 @@ std::pair SiteToSiteClient::readFlowFiles(const std::shared_ compression_stream = std::make_unique(transaction->getStream()); compression_wrapper_crc_stream = std::make_unique>(gsl::make_not_null(compression_stream.get())); } - io::InputStream& stream = use_compression_ ? static_cast(*compression_wrapper_crc_stream) : static_cast(transaction->getStream()); + io::InputStream& stream = use_compression_ ? static_cast(*compression_wrapper_crc_stream) : transaction->getStream(); while (true) { auto start_time = std::chrono::steady_clock::now(); auto receive_header_result = receiveFlowFileHeader(stream, transaction); if (!receive_header_result) { - throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transaction->getUUIDStr()); + throw Exception(SITE2SITE_EXCEPTION, fmt::format("Receive Failed for {}: {}", transaction->getUUIDStr(), receive_header_result.error())); } if (receive_header_result->eof) {