diff --git a/CONFIGURE.md b/CONFIGURE.md index 62e8a8768e..40cb44b7b4 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -41,6 +41,7 @@ - [Configuring Repositories](#configuring-repositories) - [Configuring Volatile Repositories](#configuring-volatile-repositories) - [Configuring Repository storage locations](#configuring-repository-storage-locations) + - [Configuring cache size for rocksdb content repository](#configuring-cache-size-for-rocksdb-content-repository) - [Configuring compression for rocksdb database](#configuring-compression-for-rocksdb-database) - [Configuring compaction for rocksdb database](#configuring-compaction-for-rocksdb-database) - [Configuring synchronous or asynchronous writes for RocksDB content repository](#configuring-synchronous-or-asynchronous-writes-for-rocksdb-content-repository) @@ -674,6 +675,19 @@ In a Filesystem Hierarchy Standard (FHS) installation (from an RPM package), the nifi.flowfile.repository.directory.default=/var/lib/nifi-minifi-cpp/flowfile_repository nifi.database.content.repository.directory.default=/var/lib/nifi-minifi-cpp/content_repository +### Configuring cache size for rocksdb content repository + +The RocksDB content repository uses a cache to limit memory usage. The cache size can be configured using the following property. +This should limit the memory usage but may cause minimal processing overhead. + + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB + +You can disable this cache by setting it to an empty value. + + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size= + ### Configuring compression for rocksdb database diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in index 7ae002fbce..d4bcbd93b5 100644 --- a/conf/minifi.properties.in +++ b/conf/minifi.properties.in @@ -52,6 +52,7 @@ nifi.content.repository.class.name=DatabaseContentRepository ## Relates to the internal workings of the rocksdb backend # nifi.flowfile.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.rocksdb.compaction.period=2 min +# nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB # setting this value to "0" enables synchronous deletion # nifi.database.content.repository.purge.period = 1 sec diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index ddb7b0dc0f..95e8dbec5f 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -58,21 +58,51 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr& db_opts) { + const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size).or_else([] { return std::make_optional("8 MB"); }) + | utils::andThen([](const auto& cache_size_str) -> std::optional { + return parsing::parseDataSize(cache_size_str) | utils::toOptional(); + }); + + std::shared_ptr cache = nullptr; + std::shared_ptr wbm = nullptr; + if (cache_size) { + cache = rocksdb::NewLRUCache(*cache_size); + wbm = std::make_shared(0, cache); + logger_->log_trace("Using {} sized cache for DatabaseContentRepository", *cache_size); + } else { + logger_->log_trace("Cache limitation disabled for DatabaseContentRepository"); + } + + auto set_db_opts = [encrypted_env, &cache, &wbm] (minifi::internal::Writable& db_opts) { minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default()); } + db_opts.optimizeForSmallDb(cache, wbm); }; - auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) { + auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) { cf_opts.OptimizeForPointLookup(4); cf_opts.merge_operator = std::make_shared(); cf_opts.max_successive_merges = 0; - if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { + cf_opts.max_write_buffer_number = 2; + cf_opts.write_buffer_size = 4_MB; + if (const auto compression_type = internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { cf_opts.compression = *compression_type; } + if (cache) { + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = cache; + + table_options.cache_index_and_filter_blocks = true; + table_options.cache_index_and_filter_blocks_with_high_priority = true; + + table_options.pin_l0_filter_and_index_blocks_in_cache = false; + table_options.pin_top_level_index_and_filter = false; + + cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + } }; db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_, minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options)); diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index c48c4b62c6..9989ab3bd8 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -135,23 +135,23 @@ std::optional OpenRocksDb::getApproximateSizes() const { return std::nullopt; } -minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { - minifi::core::RepositoryMetricsSource::RocksDbStats stats; - std::string table_readers; - GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); +void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view property_name) { + std::string property_value; + GetProperty(property_name, &property_value); try { - stats.table_readers_size = std::stoull(table_readers); + member = std::stoull(property_value); } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + logger_->log_warn("Could not retrieve valid '{}' property value from rocksdb content repository!", property_name); } +} - std::string all_memtables; - GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); - } + +minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { + minifi::core::RepositoryMetricsSource::RocksDbStats stats; + fillU64FromProperty(stats.table_readers_size, "rocksdb.estimate-table-readers-mem"); + fillU64FromProperty(stats.all_memory_tables_size, "rocksdb.cur-size-all-mem-tables"); + fillU64FromProperty(stats.block_cache_usage, "rocksdb.block-cache-usage"); + fillU64FromProperty(stats.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); return stats; } diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index b97fe880a1..dfd93b28cd 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -81,6 +81,8 @@ class OpenRocksDb { void handleResult(const rocksdb::Status& result); void handleResult(const std::vector& results); + void fillU64FromProperty(uint64_t& member, std::string_view property_name); + gsl::not_null db_; gsl::not_null> impl_; gsl::not_null> column_; diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h index 627517577f..07eaed95cd 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.h +++ b/extensions/rocksdb-repos/database/RocksDbUtils.h @@ -49,6 +49,18 @@ class Writable { } } + template + decltype(auto) call(Method method, Args&&... args) { + return std::invoke(method, target_, std::forward(args)...); + } + + void optimizeForSmallDb(std::shared_ptr cache, std::shared_ptr wbm) { + if (!cache || !wbm) { return; } + target_.OptimizeForSmallDb(&cache); + target_.write_buffer_manager = wbm; + target_.max_open_files = 20; + } + template const F& get(F T::* member) { return target_.*member; diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp index 50195748b1..ac122fbd86 100644 --- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp +++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp @@ -355,3 +355,35 @@ TEST_CASE("DBContentRepository can clear orphan entries") { REQUIRE(getDbSize(dir) == 0); } + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size default") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 8388608 sized cache for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size override") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "100 MB"); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 104857600 sized cache for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size disable") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", ""); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Cache limitation disabled for DatabaseContentRepository")); +} diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 191705fac3..17dba361b0 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,6 +54,7 @@ const std::unordered_map RepositoryMetricsSourceStore::serialize() co if (auto rocksdb_stats = repo->getRocksDbStats()) { parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size}); parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size}); + parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = rocksdb_stats->block_cache_usage}); + parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", .value = rocksdb_stats->block_cache_pinned_usage}); } serialized.push_back(parent); @@ -68,6 +70,10 @@ std::vector RepositoryMetricsSourceStore::calculateMetrics() co {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_block_cache_usage_bytes", static_cast(rocksdb_stats->block_cache_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", static_cast(rocksdb_stats->block_cache_pinned_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); } } return metrics; diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index 2b5b70bca2..4da9cea147 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository { std::optional getRocksDbStats() const override { return RocksDbStats { .table_readers_size = 100, - .all_memory_tables_size = 200 + .all_memory_tables_size = 200, + .block_cache_usage = 85, + .block_cache_pinned_usage = 50 }; } }; diff --git a/libminifi/test/libtest/unit/TestUtils.cpp b/libminifi/test/libtest/unit/TestUtils.cpp index c56f44f755..6442259122 100644 --- a/libminifi/test/libtest/unit/TestUtils.cpp +++ b/libminifi/test/libtest/unit/TestUtils.cpp @@ -260,4 +260,43 @@ std::error_code sendMessagesViaSSL(const std::vector& contents return {}; } +std::vector extractLogMessageViews(const std::string& log_str) { + std::vector messages; + const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])"); + struct HeaderMarker { + size_t start; + std::string_view timestamp; + std::string_view logger_class; + std::string_view log_level; + size_t end; + }; + + std::vector markers = ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), + std::sregex_iterator()) | + ranges::views::transform([=](const std::smatch& m) { + return HeaderMarker{.start = static_cast(m.position(0)), + .timestamp = std::string_view{log_str.data() + m.position(1), static_cast(m.length(1))}, + .logger_class = std::string_view{log_str.data() + m.position(2), static_cast(m.length(2))}, + .log_level = std::string_view{log_str.data() + m.position(3), static_cast(m.length(3))}, + .end = static_cast(m.position(0) + m.length(0)) + }; + }) | ranges::to(); + + markers.push_back(HeaderMarker{.start = log_str.size(), + .timestamp = {}, + .logger_class = {}, + .log_level = {}, + .end = log_str.size() + }); + + for (auto window: markers | ranges::views::sliding(2)) { + messages.push_back(LogMessageView{.timestamp = window[0].timestamp, + .logger_class = window[0].logger_class, + .log_level = window[0].log_level, + .payload = {log_str.data() + window[0].end, window[1].start - window[0].end}}); + } + + return messages; +} + } // namespace org::apache::nifi::minifi::test::utils diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index 64af75c7c2..c658405406 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -37,9 +37,8 @@ #include "asio.hpp" #include "asio/ssl.hpp" #include "utils/net/Ssl.h" -#include "range/v3/algorithm/any_of.hpp" #include "core/Processor.h" -#include "core/logging/LoggerFactory.h" +#include #include "./ProcessorUtils.h" using namespace std::literals::chrono_literals; @@ -127,6 +126,12 @@ bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration +bool verifyLogMatchesRegexInPollTime(const std::chrono::duration& wait_duration, const std::string& regex) { + auto check = [®ex] { return LogTestController::getInstance().matchesRegex(regex); }; + return verifyEventHappenedInPollTime(wait_duration, check); +} + namespace internal { struct JsonContext { const JsonContext *parent{nullptr}; @@ -234,6 +239,15 @@ inline bool runningAsUnixRoot() { #endif } +struct LogMessageView { + std::string_view timestamp; + std::string_view logger_class; + std::string_view log_level; + std::string_view payload; +}; + +std::vector extractLogMessageViews(const std::string& log_str); + } // namespace org::apache::nifi::minifi::test::utils namespace Catch { diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index abf8d55092..d2250f5005 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -18,11 +18,11 @@ #include #include -#include "unit/TestBase.h" -#include "unit/Catch.h" +#include "core/RepositoryFactory.h" #include "core/state/LogMetricsPublisher.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "core/RepositoryFactory.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" #include "unit/TestUtils.h" #include "utils/file/FileUtils.h" @@ -33,12 +33,12 @@ namespace org::apache::nifi::minifi::test { class LogPublisherTestFixture { public: LogPublisherTestFixture() - : configuration_(std::make_shared()), - provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), - flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), - response_node_loader_(std::make_shared(configuration_, - std::vector>{provenance_repo_, flow_file_repo_}, nullptr)), - publisher_(std::make_unique("LogMetricsPublisher")) { + : configuration_(std::make_shared()), + provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), + flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), + response_node_loader_(std::make_shared(configuration_, + std::vector>{provenance_repo_, flow_file_repo_}, nullptr)), + publisher_(std::make_unique("LogMetricsPublisher")) { provenance_repo_->initialize(configuration_); flow_file_repo_->initialize(configuration_); } @@ -67,13 +67,13 @@ class LogPublisherTestFixture { TEST_CASE_METHOD(LogPublisherTestFixture, "Logging interval property is mandatory", "[LogMetricsPublisher]") { LogTestController::getInstance().setTrace(); SECTION("No logging interval is set") { - REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), "General Operation: Metrics logging interval not configured for log metrics publisher!"); + REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), + "General Operation: Metrics logging interval not configured for log metrics publisher!"); } SECTION("Logging interval is set to 2 seconds") { configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "2s"); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; publisher_->initialize(configuration_, response_node_loader_); - REQUIRE(verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); } } @@ -86,8 +86,43 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty metrics if no valid metr } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - REQUIRE(verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); +} + +bool check_exact_metrics_value(const rapidjson::Value& repo_metrics, const std::string_view key, const std::string_view expected_value) { + const auto key_ref = rapidjson::StringRef(key.data(), key.size()); + const auto member_it = repo_metrics.FindMember(key_ref); + if (member_it == repo_metrics.MemberEnd()) { return false; } + const auto actual_value = std::string_view{member_it->value.GetString(), member_it->value.GetStringLength()}; + return actual_value == expected_value; +} + +bool isExpectedRepositoryMetricsLogMessage(const utils::LogMessageView& message_view, std::string_view log_level) { + if (message_view.log_level != log_level) { return false; } + if (message_view.logger_class != "[org::apache::nifi::minifi::state::LogMetricsPublisher]") { return false; } + rapidjson::Document document; + if (const rapidjson::ParseResult res = document.Parse(message_view.payload.data(), message_view.payload.length()); !res) { return false; } + if (!document.HasMember("LogMetrics")) { return false; } + const auto& log_metrics = document["LogMetrics"].GetObject(); + if (!log_metrics.HasMember("RepositoryMetrics")) { return false; } + const auto& repository_metrics = log_metrics["RepositoryMetrics"].GetObject(); + if (!repository_metrics.HasMember("provenancerepository") || !repository_metrics.HasMember("flowfilerepository")) { return false; } + const rapidjson::Value& provenance_repo_metrics = repository_metrics["provenancerepository"]; + const rapidjson::Value& flow_file_repo_metrics = repository_metrics["flowfilerepository"]; + + const auto repo_is_okay = [](const rapidjson::Value& repo_metrics) -> bool { + return check_exact_metrics_value(repo_metrics, "full", "false") + && check_exact_metrics_value(repo_metrics, "running", "false") + && check_exact_metrics_value(repo_metrics, "size", "0") + && check_exact_metrics_value(repo_metrics, "maxSize", "0") + && check_exact_metrics_value(repo_metrics, "entryCount", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbTableReadersSize", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbAllMemoryTablesSize", "2048") + && repo_metrics.HasMember("rocksDbBlockCacheUsage") + && repo_metrics.HasMember("rocksDbBlockCachePinnedUsage"); + }; + + return repo_is_okay(provenance_repo_metrics) && repo_is_okay(flow_file_repo_metrics); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs", "[LogMetricsPublisher]") { @@ -96,34 +131,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics,DeviceInfoNode"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log_1 = R"([info] { - "LogMetrics": {)"; - std::string expected_log_2 = R"("RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - })"; - std::string expected_log_3 = R"("deviceInfo": { - "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") { @@ -132,42 +147,25 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); publisher_->clearMetricNodes(); LogTestController::getInstance().reset(); LogTestController::getInstance().setTrace(); configuration_->set(Configure::nifi_metrics_publisher_metrics, "DeviceInfoNode"); publisher_->loadMetricNodes(); - expected_log = R"([info] { + std::string expected_log = R"([info] { "LogMetrics": { "deviceInfo": { "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific metric properties", "[LogMetricsPublisher]") { @@ -185,32 +183,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property for logging", "[LogMetricsPublisher]") { @@ -220,32 +200,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([debug] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "debug"); }); + }, + 100ms)); } } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 25840d2e66..4fe02defd4 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -104,7 +104,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { SECTION("RocksDB repository") { repo = std::make_shared(); - expected_metric_count = 7; + expected_metric_count = 9; } @@ -125,6 +125,8 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { if (expected_metric_count > 5) { checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + checkSerializedValue(resp.children, "rocksDbBlockCacheUsage", "85"); + checkSerializedValue(resp.children, "rocksDbBlockCachePinnedUsage", "50"); } } diff --git a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h index ca55e461c6..21f2308d17 100644 --- a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h +++ b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h @@ -28,6 +28,8 @@ class RepositoryMetricsSource { struct RocksDbStats { uint64_t table_readers_size{}; uint64_t all_memory_tables_size{}; + uint64_t block_cache_usage{}; + uint64_t block_cache_pinned_usage{}; }; virtual ~RepositoryMetricsSource() = default; diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 3fccff2c28..0ea5f4ca53 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -76,6 +76,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums"; + static constexpr const char *nifi_dbcontent_optimize_for_small_db_cache_size = "nifi.database.content.repository.optimize.for.small.db.cache.size"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";