Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class ProcessContext {

bool hasNonEmptyProperty(std::string_view name) const;

std::string getProcessorName() const;

private:
MinifiProcessContext* impl_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,37 @@ namespace org::apache::nifi::minifi::api::utils {

inline std::string parseProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::orThrow(fmt::format("Expected valid value from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected valid value from \"{}\"", property.name));
}

inline bool parseBoolProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::andThen(parsing::parseBool)
| minifi::utils::orThrow(fmt::format("Expected parsable bool from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable bool from \"{}\"", property.name));
}

inline uint64_t parseU64Property(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::andThen(parsing::parseIntegral<uint64_t>)
| minifi::utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}\"", property.name));
}

inline int64_t parseI64Property(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::andThen(parsing::parseIntegral<int64_t>)
| minifi::utils::orThrow(fmt::format("Expected parsable int64_t from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable int64_t from \"{}\"", property.name));
}

inline std::chrono::milliseconds parseDurationProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::andThen(parsing::parseDuration<std::chrono::milliseconds>)
| minifi::utils::orThrow(fmt::format("Expected parsable duration from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable duration from \"{}\"", property.name));
}

inline uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| minifi::utils::andThen(parsing::parseDataSize)
| minifi::utils::orThrow(fmt::format("Expected parsable data size from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable data size from \"{}\"", property.name));
}

inline std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
Expand All @@ -74,7 +74,7 @@ inline std::optional<std::string> parseOptionalProperty(const core::ProcessConte
inline std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const minifi::core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
if (const auto property_str = ctx.getProperty(property.name, flow_file)) {
return parsing::parseBool(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable bool from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable bool from \"{}\"", property.name));
}
return std::nullopt;
}
Expand All @@ -85,7 +85,7 @@ inline std::optional<uint64_t> parseOptionalU64Property(const core::ProcessConte
return std::nullopt;
}
return parsing::parseIntegral<uint64_t>(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -97,7 +97,7 @@ inline std::optional<int64_t> parseOptionalI64Property(const core::ProcessContex
return std::nullopt;
}
return parsing::parseIntegral<int64_t>(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable int64_t from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable int64_t from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -111,7 +111,7 @@ inline std::optional<std::chrono::milliseconds> parseOptionalDurationProperty(co
return std::nullopt;
}
return parsing::parseDuration(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable duration from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable duration from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -123,7 +123,7 @@ inline std::optional<uint64_t> parseOptionalDataSizeProperty(const core::Process
return std::nullopt;
}
return parsing::parseDataSize(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable data size from \"{}::{}\"", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable data size from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -135,7 +135,7 @@ inline std::optional<float> parseOptionalFloatProperty(const core::ProcessContex
return std::nullopt;
}
return parsing::parseFloat(*property_str)
| minifi::utils::orThrow(fmt::format("Expected parsable float from {}::{}", ctx.getProcessorName(), property.name));
| minifi::utils::orThrow(fmt::format("Expected parsable float from \"{}\"", property.name));
}
return std::nullopt;
}
Expand Down
10 changes: 0 additions & 10 deletions extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,4 @@ bool ProcessContext::hasNonEmptyProperty(std::string_view name) const {
return MinifiProcessContextHasNonEmptyProperty(impl_, utils::toStringView(name));
}

std::string ProcessContext::getProcessorName() const {
std::string result;
MinifiProcessContextGetProcessorName(impl_, [] (void* data, MinifiStringView name) {
*static_cast<std::string*>(data) = std::string(name.data, name.length);
}, &result);
return result;
}



} // namespace org::apache::nifi::minifi::api::core
22 changes: 15 additions & 7 deletions extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ FlowFile ProcessSession::create(const FlowFile* parent) {

void ProcessSession::transfer(FlowFile ff, const minifi::core::Relationship& relationship) {
const auto rel_name = relationship.getName();
MinifiProcessSessionTransfer(impl_, ff.release(), utils::toStringView(rel_name));
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionTransfer(impl_, ff.release(), utils::toStringView(rel_name))) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to transfer flowfile");
}
}

void ProcessSession::remove(FlowFile ff) {
MinifiProcessSessionRemove(impl_, ff.release());
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionRemove(impl_, ff.release())) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to remove flowfile");
}
}

void ProcessSession::write(FlowFile& flow_file, const io::OutputStreamCallback& callback) {
Expand All @@ -104,13 +108,17 @@ void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& ca
}
}

void ProcessSession::setAttribute(FlowFile& ff, std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param)
MinifiStringView value_ref = utils::toStringView(value);
MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref);
void ProcessSession::setAttribute(FlowFile& ff, const std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param)
const MinifiStringView value_ref = utils::toStringView(value);
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to set attribute");
}
}

void ProcessSession::removeAttribute(FlowFile& ff, std::string_view key) {
MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), nullptr);
void ProcessSession::removeAttribute(FlowFile& ff, const std::string_view key) {
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to remove attribute");
}
}

std::optional<std::string> ProcessSession::getAttribute(FlowFile& ff, std::string_view key) {
Expand Down
24 changes: 12 additions & 12 deletions extension-framework/include/utils/ProcessorConfigUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,37 @@ namespace org::apache::nifi::minifi::utils {

inline std::string parseProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| orThrow(fmt::format("Expected valid value from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected valid value from \"{}\"", property.name));
}

inline bool parseBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| andThen(parsing::parseBool)
| orThrow(fmt::format("Expected parsable bool from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected parsable bool from \"{}\"", property.name));
}

inline uint64_t parseU64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| andThen(parsing::parseIntegral<uint64_t>)
| orThrow(fmt::format("Expected parsable uint64_t from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected parsable uint64_t from \"{}\"", property.name));
}

inline int64_t parseI64Property(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| andThen(parsing::parseIntegral<int64_t>)
| orThrow(fmt::format("Expected parsable int64_t from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected parsable int64_t from \"{}\"", property.name));
}

inline std::chrono::milliseconds parseDurationProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| andThen(parsing::parseDuration<std::chrono::milliseconds>)
| orThrow(fmt::format("Expected parsable duration from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected parsable duration from \"{}\"", property.name));
}

inline uint64_t parseDataSizeProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
return ctx.getProperty(property.name, flow_file)
| andThen(parsing::parseDataSize)
| orThrow(fmt::format("Expected parsable data size from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| orThrow(fmt::format("Expected parsable data size from \"{}\"", property.name));
}

inline std::optional<std::string> parseOptionalProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
Expand All @@ -75,7 +75,7 @@ inline std::optional<std::string> parseOptionalProperty(const core::ProcessConte
inline std::optional<bool> parseOptionalBoolProperty(const core::ProcessContext& ctx, const core::PropertyReference& property, const core::FlowFile* flow_file = nullptr) {
if (const auto property_str = ctx.getProperty(property.name, flow_file)) {
return parsing::parseBool(*property_str)
| utils::orThrow(fmt::format("Expected parsable bool from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable bool from \"{}\"", property.name));
}
return std::nullopt;
}
Expand All @@ -86,7 +86,7 @@ inline std::optional<uint64_t> parseOptionalU64Property(const core::ProcessConte
return std::nullopt;
}
return parsing::parseIntegral<uint64_t>(*property_str)
| utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable uint64_t from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -98,7 +98,7 @@ inline std::optional<int64_t> parseOptionalI64Property(const core::ProcessContex
return std::nullopt;
}
return parsing::parseIntegral<int64_t>(*property_str)
| utils::orThrow(fmt::format("Expected parsable int64_t from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable int64_t from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -110,7 +110,7 @@ inline std::optional<std::chrono::milliseconds> parseOptionalDurationProperty(co
return std::nullopt;
}
return parsing::parseDuration(*property_str)
| utils::orThrow(fmt::format("Expected parsable duration from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable duration from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -122,7 +122,7 @@ inline std::optional<uint64_t> parseOptionalDataSizeProperty(const core::Process
return std::nullopt;
}
return parsing::parseDataSize(*property_str)
| utils::orThrow(fmt::format("Expected parsable data size from \"{}::{}\"", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable data size from \"{}\"", property.name));
}

return std::nullopt;
Expand All @@ -134,7 +134,7 @@ inline std::optional<float> parseOptionalFloatProperty(const core::ProcessContex
return std::nullopt;
}
return parsing::parseFloat(*property_str)
| utils::orThrow(fmt::format("Expected parsable float from {}::{}", ctx.getProcessorInfo().getName(), property.name));
| utils::orThrow(fmt::format("Expected parsable float from \"{}\"", property.name));
}
return std::nullopt;
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/azure/tests/FetchAzureDataLakeStorageTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Valid Number of Retries

TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Invalid Number of Retries is set via EL", "[azureDataLakeStorageFetch]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries, "${literal(\"asd\")}");
REQUIRE_THROWS_WITH(test_controller_.runSession(plan_, true), "Expected parsable uint64_t from \"AzureDataLakeStorageProcessor::Number of Retries\", but got GeneralParsingError (Parsing Error:0)");
REQUIRE_THROWS_WITH(test_controller_.runSession(plan_, true), "Expected parsable uint64_t from \"Number of Retries\", but got GeneralParsingError (Parsing Error:0)");
CHECK_FALSE(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries);
}

Expand Down
4 changes: 2 additions & 2 deletions extensions/llamacpp/tests/RunLlamaCppInferenceTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ TEST_CASE("Invalid values for optional double type properties throw exception")

REQUIRE_THROWS(controller.trigger(minifi::test::InputFlowFileData{.content = "42", .attributes = {}}));
CHECK(minifi::test::utils::verifyLogLinePresenceInPollTime(1s,
fmt::format("Expected parsable float from RunLlamaCppInference::{}, but got GeneralParsingError (Parsing Error:0)", property_name)));
fmt::format("Expected parsable float from \"{}\", but got GeneralParsingError (Parsing Error:0)", property_name)));
}

TEST_CASE("Top K property empty and invalid values are handled properly") {
Expand All @@ -252,7 +252,7 @@ TEST_CASE("Top K property empty and invalid values are handled properly") {
REQUIRE(controller.getProcessor()->setProperty(processors::RunLlamaCppInference::TopK.name, "invalid_value"));
REQUIRE_THROWS(controller.trigger(minifi::test::InputFlowFileData{.content = "42", .attributes = {}}));
CHECK(minifi::test::utils::verifyLogLinePresenceInPollTime(1s,
"Expected parsable int64_t from \"RunLlamaCppInference::Top K\", but got GeneralParsingError (Parsing Error:0)"));
"Expected parsable int64_t from \"Top K\", but got GeneralParsingError (Parsing Error:0)"));
}
}

Expand Down
4 changes: 2 additions & 2 deletions extensions/mqtt/tests/ConsumeMQTTTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ using namespace std::literals::chrono_literals;
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Topic\", but got PropertyNotSet (Property Error:2)"));
Catch::Matchers::EndsWith("Expected valid value from \"Topic\", but got PropertyNotSet (Property Error:2)"));
}

TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
Catch::Matchers::EndsWith("Expected valid value from \"Broker URI\", but got PropertyNotSet (Property Error:2)"));
}

TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
Expand Down
2 changes: 1 addition & 1 deletion extensions/mqtt/tests/PublishMQTTTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyTopic", "[publish
TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_),
Catch::Matchers::EndsWith("Expected valid value from \"TestPublishMQTTProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
Catch::Matchers::EndsWith("Expected valid value from \"Broker URI\", but got PropertyNotSet (Property Error:2)"));
}

TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyClientID_V_3", "[publishMQTTTest]") {
Expand Down
Loading
Loading