Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 61 additions & 68 deletions extensions/azure/tests/ListAzureBlobStorageTests.cpp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ TEST_CASE("Test ControllerServicesMap", "[cs1]") {
auto service = std::make_shared<core::controller::ControllerService>("", utils::Identifier{}, std::make_unique<MockControllerService>());
auto testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, "ID", std::make_shared<minifi::ConfigureImpl>());

map.put("ID", testNode);
map.put("ID", testNode, nullptr);
REQUIRE(1 == map.getAllControllerServices().size());

REQUIRE(nullptr != map.get("ID"));

REQUIRE(false== map.put("", testNode));
REQUIRE(false== map.put("", nullptr));
REQUIRE(false== map.put("", testNode, nullptr));
REQUIRE(false== map.put("", nullptr, nullptr));

// ensure the pointer is the same

Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/FlowConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class FlowConfiguration : public CoreComponentImpl {
static std::unique_ptr<core::ProcessGroup> createSimpleProcessGroup(const std::string &name, const utils::Identifier &uuid, int version);
static std::unique_ptr<core::ProcessGroup> createRemoteProcessGroup(const std::string &name, const utils::Identifier &uuid);

std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier &uuid);
std::shared_ptr<core::controller::ControllerServiceNode> createControllerService(const std::string &class_name, const std::string &name, const utils::Identifier &uuid, ProcessGroup* parent);

// Create Connection
[[nodiscard]] std::unique_ptr<minifi::Connection> createConnection(const std::string &name, const utils::Identifier &uuid) const;
Expand Down
2 changes: 1 addition & 1 deletion libminifi/include/core/ProcessContextImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro

/* Function to help creating a state storage */
auto create_provider = [&](const std::string& type, const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> {
auto new_node = controller_service_provider->createControllerService(type, DefaultStateStorageName);
auto new_node = controller_service_provider->createControllerService(type, DefaultStateStorageName, nullptr, std::nullopt);
if (new_node == nullptr) { return nullptr; }
new_node->initialize();
auto storage = new_node->getControllerServiceImplementation();
Expand Down
19 changes: 13 additions & 6 deletions libminifi/include/core/controller/ControllerServiceNodeMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,25 @@ class ControllerServiceNodeMap {
ControllerServiceNode* get(const std::string &id) const;
ControllerServiceNode* get(const std::string &id, const utils::Identifier &processor_or_controller_uuid) const;

bool put(const std::string &id, const std::shared_ptr<ControllerServiceNode> &serviceNode);
bool put(const std::string &id, ProcessGroup* process_group);
bool put(std::string id, std::shared_ptr<ControllerServiceNode> controller_service_node, ProcessGroup* parent_group);

bool registerAlternativeKey(std::string primary_key, std::string alternative_key);

void clear();
std::vector<std::shared_ptr<ControllerServiceNode>> getAllControllerServices() const;

protected:
mutable std::mutex mutex_;
// Map of controller service id to the controller service node
std::map<std::string, std::shared_ptr<ControllerServiceNode>> controller_service_nodes_;
// Map of controller service id to the process group that contains it
std::map<std::string, gsl::not_null<ProcessGroup*>> process_groups_;

struct ServiceEntry {
std::shared_ptr<ControllerServiceNode> controller_service_node;
ProcessGroup* parent_group;
};

const ServiceEntry* getEntry(std::string_view primary_key, const std::scoped_lock<std::mutex>& mutex) const;

std::map<std::string, ServiceEntry, std::less<>> services_;
std::map<std::string, std::string, std::less<>> alternative_keys;
};

} // namespace controller
Expand Down
10 changes: 8 additions & 2 deletions libminifi/include/core/controller/ControllerServiceProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ class ControllerServiceProvider : public CoreComponentImpl, public ConfigurableC
return controller_map_->get(id, processor_or_controller_uuid);
}

virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) = 0;
virtual std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type,
const std::string &id,
ProcessGroup* parent,
const std::optional<std::string>& alternative_key) = 0;

virtual void clearControllerServices() = 0;

Expand All @@ -83,7 +86,10 @@ class ControllerServiceProvider : public CoreComponentImpl, public ConfigurableC
std::shared_ptr<ControllerService> getControllerService(const std::string &identifier) const override;
std::shared_ptr<ControllerService> getControllerService(const std::string &identifier, const utils::Identifier &processor_uuid) const override;

virtual void putControllerServiceNode(const std::string& identifier, const std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* process_group);
virtual void putControllerServiceNode(const std::string& primary_key,
const std::shared_ptr<ControllerServiceNode>& controller_service_node,
ProcessGroup* process_group,
const std::optional<std::string>& alternative_key);

bool supportsDynamicProperties() const final {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class ForwardingControllerServiceProvider : public ControllerServiceProvider {
public:
using ControllerServiceProvider::ControllerServiceProvider;

std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id) override {
return controller_service_provider_impl_->createControllerService(type, id);
std::shared_ptr<ControllerServiceNode> createControllerService(const std::string &type, const std::string &id, ProcessGroup* parent, const std::optional<std::string>& alternative_key) override {
return controller_service_provider_impl_->createControllerService(type, id, parent, alternative_key);
}

ControllerServiceNode* getControllerServiceNode(const std::string &id) const override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ class StandardControllerServiceProvider : public ControllerServiceProvider {
stopEnableRetryThread();
}

std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type, const std::string& id) override;
std::shared_ptr<ControllerServiceNode> createControllerService(const std::string& type,
const std::string& id,
ProcessGroup* parent_group,
const std::optional<std::string>& alternative_key) override;
void enableAllControllerServices() override;
void disableAllControllerServices() override;
void clearControllerServices() override;
Expand Down
4 changes: 2 additions & 2 deletions libminifi/src/core/FlowConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ std::unique_ptr<minifi::Connection> FlowConfiguration::createConnection(const st
}

std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name,
const utils::Identifier& uuid) {
std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name);
const utils::Identifier& uuid, ProcessGroup* parent) {
std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, parent, uuid.to_string());
if (nullptr != controllerServicesNode)
controllerServicesNode->setUUID(uuid);
return controllerServicesNode;
Expand Down
2 changes: 1 addition & 1 deletion libminifi/src/core/ProcessGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Processor* ProcessGroup::findProcessorByName(const std::string &processorName, T
}

void ProcessGroup::addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node) {
controller_service_map_.put(nodeId, node);
controller_service_map_.put(nodeId, node, this);
}

core::controller::ControllerServiceNode* ProcessGroup::findControllerService(const std::string &nodeId, Traverse traverse) const {
Expand Down
100 changes: 56 additions & 44 deletions libminifi/src/core/controller/ControllerServiceNodeMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,81 +17,93 @@
*/

#include "core/controller/ControllerServiceNodeMap.h"

#include <ranges>

#include "core/ProcessGroup.h"

namespace org::apache::nifi::minifi::core::controller {

ControllerServiceNode* ControllerServiceNodeMap::get(const std::string &id) const {
std::lock_guard<std::mutex> lock(mutex_);
auto exists = controller_service_nodes_.find(id);
if (exists != controller_service_nodes_.end())
return exists->second.get();
else
return nullptr;
const std::scoped_lock lock(mutex_);
if (const auto entry = getEntry(id, lock)) {
return entry->controller_service_node.get();
}
return nullptr;
}

ControllerServiceNode* ControllerServiceNodeMap::get(const std::string &id, const utils::Identifier& processor_or_controller_uuid) const {
std::lock_guard<std::mutex> lock(mutex_);
ControllerServiceNode* controller = nullptr;
auto exists = controller_service_nodes_.find(id);
if (exists != controller_service_nodes_.end()) {
controller = exists->second.get();
} else {
const std::scoped_lock lock(mutex_);
const auto entry = getEntry(id, lock);
if (!entry || !entry->parent_group) {
return nullptr;
}

auto process_group_of_controller_exists = process_groups_.find(id);
ProcessGroup* process_group = nullptr;
if (process_group_of_controller_exists != process_groups_.end()) {
process_group = process_group_of_controller_exists->second;
} else {
return nullptr;
}

if (process_group->findProcessorById(processor_or_controller_uuid, ProcessGroup::Traverse::IncludeChildren)) {
return controller;
if (entry->parent_group->findProcessorById(processor_or_controller_uuid, ProcessGroup::Traverse::IncludeChildren)) {
return entry->controller_service_node.get();
}

if (process_group->findControllerService(processor_or_controller_uuid.to_string(), ProcessGroup::Traverse::IncludeChildren)) {
return controller;
if (entry->parent_group->findControllerService(processor_or_controller_uuid.to_string(), ProcessGroup::Traverse::IncludeChildren)) {
return entry->controller_service_node.get();
}

return nullptr;
}

bool ControllerServiceNodeMap::put(const std::string &id, const std::shared_ptr<ControllerServiceNode> &serviceNode) {
if (id.empty() || serviceNode == nullptr)
bool ControllerServiceNodeMap::put(std::string id, std::shared_ptr<ControllerServiceNode> controller_service_node,
ProcessGroup* parent_group) {
std::scoped_lock lock(mutex_);
if (id.empty() || controller_service_node == nullptr || alternative_keys.contains(id)) {
return false;
std::lock_guard<std::mutex> lock(mutex_);
controller_service_nodes_[id] = serviceNode;
return true;
}
auto [_it, success] = services_.emplace(std::move(id), ServiceEntry{.controller_service_node = std::move(controller_service_node), .parent_group = parent_group});
return success;
}

bool ControllerServiceNodeMap::put(const std::string &id, ProcessGroup* process_group) {
if (id.empty() || process_group == nullptr)
return false;
std::lock_guard<std::mutex> lock(mutex_);
process_groups_.emplace(id, gsl::make_not_null(process_group));
return true;
}

void ControllerServiceNodeMap::clear() {
std::lock_guard<std::mutex> lock(mutex_);
for (const auto& [id, node] : controller_service_nodes_) {
node->disable();
std::scoped_lock lock(mutex_);
for (const auto& node: services_ | std::views::values) {
node.controller_service_node->disable();
}
controller_service_nodes_.clear();
process_groups_.clear();
services_.clear();
alternative_keys.clear();
}

std::vector<std::shared_ptr<ControllerServiceNode>> ControllerServiceNodeMap::getAllControllerServices() const {
std::lock_guard<std::mutex> lock(mutex_);
std::scoped_lock lock(mutex_);
std::vector<std::shared_ptr<ControllerServiceNode>> services;
services.reserve(controller_service_nodes_.size());
for (const auto& [id, node] : controller_service_nodes_) {
services.push_back(node);
services.reserve(services_.size());
for (const auto& [controller_service_node, _parent_group]: services_ | std::views::values) {
services.push_back(controller_service_node);
}
return services;
}

const ControllerServiceNodeMap::ServiceEntry* ControllerServiceNodeMap::getEntry(const std::string_view key, const std::scoped_lock<std::mutex>&) const {
const auto it = services_.find(key);
if (it != services_.end()) {
return &it->second;
}
const auto primary_key_it = alternative_keys.find(key);
if (primary_key_it == alternative_keys.end()) {
return nullptr;
}
const auto it_from_primary = services_.find(primary_key_it->second);
gsl_Expects(it_from_primary != services_.end());
return &it_from_primary->second;
}


bool ControllerServiceNodeMap::registerAlternativeKey(std::string primary_key, std::string alternative_key) {
std::scoped_lock lock(mutex_);
if (!services_.contains(primary_key) || services_.contains(alternative_key) || alternative_keys.contains(alternative_key)) {
return false;
}

auto [_it, success] = alternative_keys.emplace(std::move(alternative_key), std::move(primary_key));
return success;
}

} // namespace org::apache::nifi::minifi::core::controller
11 changes: 8 additions & 3 deletions libminifi/src/core/controller/ControllerServiceProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ std::shared_ptr<ControllerService> ControllerServiceProvider::getControllerServi
}
}

void ControllerServiceProvider::putControllerServiceNode(const std::string& identifier, const std::shared_ptr<ControllerServiceNode>& controller_service_node, ProcessGroup* process_group) {
void ControllerServiceProvider::putControllerServiceNode(const std::string& primary_key,
const std::shared_ptr<ControllerServiceNode>& controller_service_node,
ProcessGroup* process_group,
const std::optional<std::string>& alternative_key) {
gsl_Expects(controller_map_);
controller_map_->put(identifier, controller_service_node);
controller_map_->put(identifier, process_group);
controller_map_->put(primary_key, controller_service_node, process_group);
if (alternative_key) {
controller_map_->registerAlternativeKey(primary_key, *alternative_key);
}
}

} // namespace org::apache::nifi::minifi::core::controller
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::core::controller {

std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type, const std::string& id) {
std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::createControllerService(const std::string& type,
const std::string& id,
ProcessGroup* parent_group,
const std::optional<std::string>& alternative_key) {
std::shared_ptr<ControllerService> new_controller_service = extension_loader_.instantiate<ControllerService>(type, id);

if (!new_controller_service) {
Expand All @@ -35,7 +38,11 @@ std::shared_ptr<ControllerServiceNode> StandardControllerServiceProvider::create
sharedFromThis<ControllerServiceProvider>(), id,
configuration_);

controller_map_->put(id, new_service_node);
controller_map_->put(id, new_service_node, parent_group);
if (alternative_key) {
controller_map_->registerAlternativeKey(id, *alternative_key);
}

return new_service_node;
}

Expand Down
5 changes: 1 addition & 4 deletions libminifi/src/core/flow/StructuredConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser

utils::Identifier uuid;
uuid = id;
std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, name, uuid);
std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node = createControllerService(type, name, uuid, parent_group);
if (nullptr != controller_service_node) {
logger_->log_debug("Created Controller Service with UUID {} and name {}", id, name);
controller_service_node->initialize();
Expand All @@ -668,9 +668,6 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser
}
}

service_provider_->putControllerServiceNode(id, controller_service_node, parent_group);
service_provider_->putControllerServiceNode(name, controller_service_node, parent_group);

parent_group->addControllerService(controller_service_node->getName(), controller_service_node);
parent_group->addControllerService(controller_service_node->getUUIDStr(), controller_service_node);
} else {
Expand Down
5 changes: 1 addition & 4 deletions libminifi/test/libtest/unit/TestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo
minifi::utils::Identifier uuid = minifi::utils::IdGenerator::getIdGenerator()->generate();

std::shared_ptr<minifi::core::controller::ControllerServiceNode> controller_service_node =
controller_services_provider_->createControllerService(controller_name, name);
controller_services_provider_->createControllerService(controller_name, name, root_process_group_.get(), uuid.to_string());
if (controller_service_node == nullptr) {
return nullptr;
}
Expand All @@ -378,9 +378,6 @@ std::shared_ptr<minifi::core::controller::ControllerServiceNode> TestPlan::addCo
controller_service_node->setUUID(uuid);
controller_service_node->setName(name);

controller_services_provider_->putControllerServiceNode(uuid.to_string(), controller_service_node, root_process_group_.get());
controller_services_provider_->putControllerServiceNode(name, controller_service_node, root_process_group_.get());

root_process_group_->addControllerService(uuid.to_string(), controller_service_node);

return controller_service_node;
Expand Down
2 changes: 1 addition & 1 deletion libminifi/test/unit/ProcessorConfigUtilsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class WrongTestControllerService : public TestControllerService {};
class TestControllerServiceProvider : public controller::ControllerServiceProvider {
public:
using ControllerServiceProvider::ControllerServiceProvider;
std::shared_ptr<controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&) override { return nullptr; }
std::shared_ptr<controller::ControllerServiceNode> createControllerService(const std::string&, const std::string&, ProcessGroup*, const std::optional<std::string>&) override { return nullptr; }
void clearControllerServices() override {}
void enableAllControllerServices() override {}
void disableAllControllerServices() override {}
Expand Down
Loading