Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmake/GoogleCloudCpp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ set(GOOGLE_CLOUD_CPP_ENABLE_MACOS_OPENSSL_CHECK OFF CACHE INTERNAL macos-openssl
set(BUILD_TESTING OFF CACHE INTERNAL testing-off)
set(GOOGLE_CLOUD_CPP_ENABLE_WERROR OFF CACHE INTERNAL warnings-off)
FetchContent_Declare(google-cloud-cpp
URL https://github.com/googleapis/google-cloud-cpp/archive/refs/tags/v2.38.0.tar.gz
URL_HASH SHA256=f1493b2dce9b379714342f2be7ccb483d70d13aac09d4a90ae3b4756693b72fc
URL https://github.com/googleapis/google-cloud-cpp/archive/refs/tags/v2.45.0.tar.gz
URL_HASH SHA256=3d1b5eb696832f9071bf7ef0b3f0c9fd27c1a39d5edcb8a9976c65193319fd01
PATCH_COMMAND "${PC}"
SYSTEM)
if (WIN32)
Expand Down
69 changes: 29 additions & 40 deletions core-framework/include/http/HTTPStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,54 +48,42 @@ class HttpStream : public io::BaseStreamImpl {
}

const std::shared_ptr<HTTPClient>& getClient() {
http_client_future_.get();
if (http_client_read_future_.valid()) {
http_client_read_future_.get();
}
if (http_client_write_future_.valid()) {
http_client_write_future_.get();
}
return http_client_;
}

void forceClose() {
if (started_) {
// lock shouldn't be needed here as call paths currently guarantee
// flow, but we should be safe anyway.
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
if (read_started_ || write_started_) {
close();
http_client_->forceClose();
if (http_client_future_.valid()) {
http_client_future_.get();
} else {
logger_->log_warn("Future status already cleared for {}, continuing", http_client_->getURL());
}
read_started_ = false;
write_started_ = false;
}

started_ = false;
if (http_client_read_future_.valid()) {
http_client_read_future_.get();
}
if (http_client_write_future_.valid()) {
http_client_write_future_.get();
}
}

/**
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
void seek(size_t offset) override;

[[nodiscard]] size_t tell() const override;

[[nodiscard]] size_t size() const override {
return written;
}
[[nodiscard]] size_t size() const override;

using BaseStream::write;
using BaseStream::read;

/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
* @param buflen
*/
size_t read(std::span<std::byte> buf) override;

/**
* writes value to stream
* @param value value to write
* @param size size of value
*/
size_t write(const uint8_t* value, size_t size) override;

static bool submit_client(const std::shared_ptr<HTTPClient>& client) {
Expand All @@ -114,19 +102,22 @@ class HttpStream : public io::BaseStreamImpl {
}

inline bool isFinished(int seconds = 0) {
return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
if (!http_client_read_future_.valid()) {
return false;
}
return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0
&& getByteOutputReadCallback()->waitingOps();
}

/**
* Waits for more data to become available.
*/
bool waitForDataAvailable() {
if (!http_client_read_future_.valid()) {
return false;
}
do {
logger_->log_trace("Waiting for more data");
} while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
} while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0);

Expand All @@ -135,16 +126,14 @@ class HttpStream : public io::BaseStreamImpl {
}

protected:
std::vector<uint8_t> array;

std::shared_ptr<HTTPClient> http_client_;
std::future<bool> http_client_future_;

size_t written{0};
std::future<bool> http_client_read_future_;
std::future<bool> http_client_write_future_;

std::mutex mutex_;

std::atomic<bool> started_{false};
std::atomic<bool> read_started_{false};
std::atomic<bool> write_started_{false};

private:
utils::ByteOutputCallback* getByteOutputReadCallback() {
Expand Down
34 changes: 15 additions & 19 deletions core-framework/src/http/HTTPStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,28 @@ void HttpStream::close() {
}

void HttpStream::seek(size_t /*offset*/) {
// seek is an unnecessary part of this implementation
throw std::logic_error{"HttpStream::seek is unimplemented"};
}

size_t HttpStream::tell() const {
// tell is an unnecessary part of this implementation
throw std::logic_error{"HttpStream::tell is unimplemented"};
}

// data stream overrides
[[nodiscard]] size_t HttpStream::size() const {
throw std::logic_error{"HttpStream::size is unimplemented"};
}

size_t HttpStream::write(const uint8_t* value, size_t size) {
if (size == 0) return 0;
if (IsNullOrEmpty(value)) {
return io::STREAM_ERROR;
}
if (!started_) {
std::lock_guard<std::mutex> lock(mutex_);
if (!started_) {
auto callback = std::make_unique<HttpStreamingCallback>();
http_client_->setUploadCallback(std::move(callback));
http_client_future_ = std::async(std::launch::async, submit_client, http_client_);
started_ = true;
}
std::lock_guard<std::mutex> lock(mutex_);
if (!write_started_) {
auto callback = std::make_unique<HttpStreamingCallback>();
http_client_->setUploadCallback(std::move(callback));
http_client_write_future_ = std::async(std::launch::async, submit_client, http_client_);
write_started_ = true;
}
if (auto http_callback = dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback()))
http_callback->process(value, size);
Expand All @@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) {
size_t HttpStream::read(std::span<std::byte> buf) {
if (buf.empty()) { return 0; }
if (!IsNullOrEmpty(buf)) {
if (!started_) {
std::lock_guard<std::mutex> lock(mutex_);
if (!started_) {
auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true);
http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get());
http_client_->setReadCallback(std::move(read_callback));
started_ = true;
}
std::lock_guard<std::mutex> lock(mutex_);
if (!read_started_) {
auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true);
http_client_read_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get());
http_client_->setReadCallback(std::move(read_callback));
read_started_ = true;
}
return gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()), buf.size());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def serialize_node(self, connectable, root, visited):
'targetUri': group.url,
'communicationsTimeout': '30 sec',
'yieldDuration': '3 sec',
'transportProtocol': group.transport_protocol,
'outputPorts': []
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def serialize_node(self, connectable, res=None, visited=None):
'url': group.url,
'timeout': '30 sec',
'yield period': '3 sec',
'transport protocol': group.transport_protocol,
'Output Ports': []
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,52 +21,37 @@
#include "core/Resource.h"
#include "google/cloud/storage/client.h"
#include "utils/ProcessorConfigUtils.h"

namespace gcs = ::google::cloud::storage;
#include "utils/file/FileUtils.h"

namespace org::apache::nifi::minifi::extensions::gcp {

void GCPCredentialsControllerService::initialize() {
setSupportedProperties(Properties);
}

std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createDefaultCredentials() const {
auto default_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromDefaultPaths();
if (!default_credentials.ok()) {
logger_->log_error("{}", default_credentials.status().message());
return nullptr;
}
return *default_credentials;
}

std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath() const {
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath() const {
const auto json_path = getProperty(JsonFilePath.name);
if (!json_path) {
logger_->log_error("Missing or invalid {}", JsonFilePath.name);
return nullptr;
}

auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonFilePath(*json_path);
if (!json_path_credentials.ok()) {
logger_->log_error("{}", json_path_credentials.status().message());
if (!utils::file::exists(*json_path)) {
logger_->log_error("JSON file for GCP credentials '{}' does not exist", *json_path);
return nullptr;
}
return *json_path_credentials;

return google::cloud::MakeServiceAccountCredentials(utils::file::get_content(*json_path));
}

std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents() const {
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents() const {
auto json_contents = getProperty(JsonContents.name);
if (!json_contents) {
logger_->log_error("Missing or invalid {}", JsonContents.name);
return nullptr;
}

auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonContents(*json_contents);
if (!json_path_credentials.ok()) {
logger_->log_error("{}", json_path_credentials.status().message());
return nullptr;
}
return *json_path_credentials;
return google::cloud::MakeServiceAccountCredentials(*json_contents);
}

void GCPCredentialsControllerService::onEnable() {
Expand All @@ -79,15 +64,15 @@ void GCPCredentialsControllerService::onEnable() {
credentials_location = CredentialsLocation::USE_DEFAULT_CREDENTIALS;
}
if (*credentials_location == CredentialsLocation::USE_DEFAULT_CREDENTIALS) {
credentials_ = createDefaultCredentials();
credentials_ = google::cloud::MakeGoogleDefaultCredentials();
} else if (*credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) {
credentials_ = gcs::oauth2::CreateComputeEngineCredentials();
credentials_ = google::cloud::MakeComputeEngineCredentials();
} else if (*credentials_location == CredentialsLocation::USE_JSON_FILE) {
credentials_ = createCredentialsFromJsonPath();
} else if (*credentials_location == CredentialsLocation::USE_JSON_CONTENTS) {
credentials_ = createCredentialsFromJsonContents();
} else if (*credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) {
credentials_ = gcs::oauth2::CreateAnonymousCredentials();
credentials_ = google::cloud::MakeInsecureCredentials();
}
if (!credentials_)
logger_->log_error("Couldn't create valid credentials");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "core/PropertyDefinitionBuilder.h"
#include "utils/Enum.h"

#include "google/cloud/storage/oauth2/credentials.h"
#include "google/cloud/credentials.h"

namespace org::apache::nifi::minifi::extensions::gcp {
enum class CredentialsLocation {
Expand Down Expand Up @@ -113,12 +113,11 @@ class GCPCredentialsControllerService : public core::controller::ControllerServi
[[nodiscard]] const auto& getCredentials() const { return credentials_; }

protected:
[[nodiscard]] std::shared_ptr<google::cloud::storage::oauth2::Credentials> createDefaultCredentials() const;
[[nodiscard]] std::shared_ptr<google::cloud::storage::oauth2::Credentials> createCredentialsFromJsonPath() const;
[[nodiscard]] std::shared_ptr<google::cloud::storage::oauth2::Credentials> createCredentialsFromJsonContents() const;
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonPath() const;
[[nodiscard]] std::shared_ptr<google::cloud::Credentials> createCredentialsFromJsonContents() const;


std::shared_ptr<google::cloud::storage::oauth2::Credentials> credentials_;
std::shared_ptr<google::cloud::Credentials> credentials_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GCPCredentialsControllerService>::getLogger(uuid_);
};
} // namespace org::apache::nifi::minifi::extensions::gcp
14 changes: 9 additions & 5 deletions extensions/gcp/processors/GCSProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace gcs = ::google::cloud::storage;

namespace org::apache::nifi::minifi::extensions::gcp {

std::shared_ptr<google::cloud::storage::oauth2::Credentials> GCSProcessor::getCredentials(core::ProcessContext& context) const {
std::shared_ptr<google::cloud::Credentials> GCSProcessor::getCredentials(core::ProcessContext& context) const {
auto gcp_credentials_controller_service = utils::parseOptionalControllerService<GCPCredentialsControllerService>(context, GCSProcessor::GCPCredentials, getUUID());
if (gcp_credentials_controller_service) {
return gcp_credentials_controller_service->getCredentials();
Expand All @@ -51,10 +51,14 @@ void GCSProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessio
}

gcs::Client GCSProcessor::getClient() const {
auto options = gcs::ClientOptions(gcp_credentials_);
if (endpoint_url_)
options.set_endpoint(*endpoint_url_);
return gcs::Client(options, *retry_policy_);
auto options = google::cloud::Options{}
.set<google::cloud::UnifiedCredentialsOption>(gcp_credentials_)
.set<google::cloud::storage::RetryPolicyOption>(retry_policy_);

if (endpoint_url_) {
options.set<gcs::RestEndpointOption>(*endpoint_url_);
}
return gcs::Client(options);
}

} // namespace org::apache::nifi::minifi::extensions::gcp
6 changes: 3 additions & 3 deletions extensions/gcp/processors/GCSProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include "minifi-cpp/core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "google/cloud/storage/oauth2/credentials.h"
#include "google/cloud/credentials.h"
#include "google/cloud/storage/client.h"
#include "google/cloud/storage/retry_policy.h"

Expand Down Expand Up @@ -64,10 +64,10 @@ class GCSProcessor : public core::ProcessorImpl {

protected:
virtual google::cloud::storage::Client getClient() const;
std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) const;
std::shared_ptr<google::cloud::Credentials> getCredentials(core::ProcessContext& context) const;

std::optional<std::string> endpoint_url_;
std::shared_ptr<google::cloud::storage::oauth2::Credentials> gcp_credentials_;
std::shared_ptr<google::cloud::Credentials> gcp_credentials_;
google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
};

Expand Down
17 changes: 10 additions & 7 deletions extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ class GCPCredentialsTests : public ::testing::Test {
std::shared_ptr<GCPCredentialsControllerService> gcp_credentials_ = std::dynamic_pointer_cast<GCPCredentialsControllerService>(gcp_credentials_node_->getControllerServiceImplementation());
};

TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithoutEnv) {
minifi::utils::Environment::unsetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS");
plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_DEFAULT_CREDENTIALS));
ASSERT_NO_THROW(test_controller_.runSession(plan_));
EXPECT_EQ(nullptr, gcp_credentials_->getCredentials());
}

TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithEnv) {
auto temp_directory = test_controller_.createTempDirectory();
auto path = create_mock_json_file(temp_directory);
Expand All @@ -113,6 +106,16 @@ TEST_F(GCPCredentialsTests, CredentialsFromJsonWithProperty) {
EXPECT_NE(nullptr, gcp_credentials_->getCredentials());
}

TEST_F(GCPCredentialsTests, CredentialsFromJsonWithInvalidPath) {
auto temp_directory = test_controller_.createTempDirectory();
auto path = create_mock_json_file(temp_directory);
ASSERT_TRUE(path.has_value());
plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_JSON_FILE));
plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath, "/invalid/path/to/credentials.json");
ASSERT_NO_THROW(test_controller_.runSession(plan_));
EXPECT_EQ(nullptr, gcp_credentials_->getCredentials());
}

TEST_F(GCPCredentialsTests, CredentialsFromComputeEngineVM) {
plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc, magic_enum::enum_name(minifi_gcp::CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS));
ASSERT_NO_THROW(test_controller_.runSession(plan_));
Expand Down
Loading
Loading