From 05ba6a55db1fbc4978afacb6991ea06fd0a99ff8 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Tue, 25 Nov 2025 17:49:20 +0100 Subject: [PATCH 1/5] MINIFICPP-2696 Add option for optimizationForSmallDB in DatabaseContentRepository --- .../DatabaseContentRepository.cpp | 21 ++- .../rocksdb-repos/database/OpenRocksDb.cpp | 48 ++++-- .../rocksdb-repos/database/RocksDbUtils.h | 6 + .../nodes/RepositoryMetricsSourceStore.cpp | 6 + .../test/libtest/unit/ProvenanceTestHelper.h | 4 +- libminifi/test/libtest/unit/TestUtils.h | 6 + .../test/unit/LogMetricsPublisherTests.cpp | 161 ++++++------------ libminifi/test/unit/MetricsTests.cpp | 4 +- .../minifi-cpp/core/RepositoryMetricsSource.h | 2 + .../minifi-cpp/properties/Configuration.h | 1 + 10 files changed, 135 insertions(+), 124 deletions(-) diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index ddb7b0dc0f..9367041da2 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -58,21 +58,38 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr& db_opts) { + const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size) + | utils::andThen([](const auto& cache_size_str) -> std::optional { + return parsing::parseDataSize(cache_size_str) | utils::toOptional(); + }); + + std::shared_ptr cache = nullptr; + if (cache_size) { + cache = rocksdb::NewLRUCache(*cache_size); + } + + auto set_db_opts = [encrypted_env, &cache] (minifi::internal::Writable& db_opts) { minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default()); } + if (cache) { + db_opts.call(&rocksdb::DBOptions::OptimizeForSmallDb, &cache); + } }; - auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) { + auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) { cf_opts.OptimizeForPointLookup(4); cf_opts.merge_operator = std::make_shared(); cf_opts.max_successive_merges = 0; if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { cf_opts.compression = *compression_type; } + if (cache) { + cf_opts.OptimizeForSmallDb(&cache); + cf_opts.compression_opts.max_dict_bytes = 0; + } }; db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_, minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options)); diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index c48c4b62c6..bedd82e02c 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -137,20 +137,44 @@ std::optional OpenRocksDb::getApproximateSizes() const { minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { minifi::core::RepositoryMetricsSource::RocksDbStats stats; - std::string table_readers; - GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - try { - stats.table_readers_size = std::stoull(table_readers); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + { + std::string table_readers; + GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); + try { + stats.table_readers_size = std::stoull(table_readers); + } catch (const std::exception&) { + logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + } + } + + { + std::string all_memtables; + GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); + try { + stats.all_memory_tables_size = std::stoull(all_memtables); + } catch (const std::exception&) { + logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + } } - std::string all_memtables; - GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); + { + std::string block_cache_usage; + GetProperty("rocksdb.block-cache-usage", &block_cache_usage); + try { + stats.block_cache_usage = std::stoull(block_cache_usage); + } catch (const std::exception&) { + logger_->log_warn("Could not retrieve valid 'rocksdb.block-cache-usage' property value from rocksdb content repository!"); + } + } + + { + std::string block_cache_pinned_usage; + GetProperty("rocksdb.block-cache-pinned-usage", &block_cache_pinned_usage); + try { + stats.block_cache_pinned_usage = std::stoull(block_cache_pinned_usage); + } catch (const std::exception&) { + logger_->log_warn("Could not retrieve valid 'rocksdb.block-cache-pinned-usage' property value from rocksdb content repository!"); + } } return stats; diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h index 627517577f..13687805d0 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.h +++ b/extensions/rocksdb-repos/database/RocksDbUtils.h @@ -49,6 +49,12 @@ class Writable { } } + template + decltype(auto) call(Method method, Args&&... args) { + is_modified_ = true; + return std::invoke(method, target_, std::forward(args)...); + } + template const F& get(F T::* member) { return target_.*member; diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp index 0d4789f8c5..ef414837ac 100644 --- a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp +++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp @@ -48,6 +48,8 @@ std::vector RepositoryMetricsSourceStore::serialize() co if (auto rocksdb_stats = repo->getRocksDbStats()) { parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size}); parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size}); + parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = rocksdb_stats->block_cache_usage}); + parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", .value = rocksdb_stats->block_cache_pinned_usage}); } serialized.push_back(parent); @@ -68,6 +70,10 @@ std::vector RepositoryMetricsSourceStore::calculateMetrics() co {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast(rocksdb_stats->all_memory_tables_size), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_block_cache_usage_bytes", static_cast(rocksdb_stats->block_cache_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", static_cast(rocksdb_stats->block_cache_pinned_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); } } return metrics; diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index 2b5b70bca2..4da9cea147 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository { std::optional getRocksDbStats() const override { return RocksDbStats { .table_readers_size = 100, - .all_memory_tables_size = 200 + .all_memory_tables_size = 200, + .block_cache_usage = 85, + .block_cache_pinned_usage = 50 }; } }; diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index 64af75c7c2..d113de8360 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -127,6 +127,12 @@ bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration +bool verifyLogMatchesRegexInPollTime(const std::chrono::duration& wait_duration, const std::string& regex) { + auto check = [®ex] { return LogTestController::getInstance().matchesRegex(regex); }; + return verifyEventHappenedInPollTime(wait_duration, check); +} + namespace internal { struct JsonContext { const JsonContext *parent{nullptr}; diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index abf8d55092..e9f2aae792 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -71,9 +71,8 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Logging interval property is mandator } SECTION("Logging interval is set to 2 seconds") { configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "2s"); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; publisher_->initialize(configuration_, response_node_loader_); - REQUIRE(verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); } } @@ -86,8 +85,49 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty metrics if no valid metr } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - REQUIRE(verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); +} + +std::string expectedRepositoryMetricsRegex(std::string level) { + std::string regex_pattern = + // 1. Log Header & Opening + R"(\[[\d\-\s\:\.]+\]\s*\[[^\]]+\]\s*\[)" + std::move(level) + R"(\]\s*\{\s*)" + + R"(\"LogMetrics\":\s*\{\s*)" + + + // 2. (skips other nodes, stops if a \n[ starts (other log line)) + R"((?:(?!\n\[)[\s\S])*?)" + + + // 3. RepositoryMetrics Node + R"(\"RepositoryMetrics\":\s*\{\s*)" + + + // 4. Provenance Repository + R"(\"provenancerepository\":\s*\{\s*)" + + R"(\"running\":\s*\"false\",\s*)" + + R"(\"full\":\s*\"false\",\s*)" + + R"(\"size\":\s*\"0\",\s*)" + + R"(\s*\"maxSize\":\s*\"0\",\s*)" + + R"(\"entryCount\":\s*\"0\",\s*)" + + R"(\"rocksDbTableReadersSize\":\s*\"0\",\s*)" + + R"(\"rocksDbAllMemoryTablesSize\":\s*\"2048\",\s*)" + + R"(\"rocksDbBlockCacheUsage\":\s*\"\d+\",\s*)" + + R"(\"rocksDbBlockCachePinnedUsage\":\s*\"\d+\"\s*)" + + R"(\},\s*)" + + + // 5. FlowFile Repository + R"(\"flowfilerepository\":\s*\{\s*)" + + R"(\"running\":\s*\"false\",\s*)" + + R"(\"full\":\s*\"false\",\s*)" + + R"(\"size\":\s*\"0\",\s*)" + + R"(\"maxSize\":\s*\"0\",\s*)" + + R"(\"entryCount\":\s*\"0\",\s*)" + + R"(\"rocksDbTableReadersSize\":\s*\"0\",\s*)" + + R"(\"rocksDbAllMemoryTablesSize\":\s*\"2048\",\s*)" + + R"(\"rocksDbBlockCacheUsage\":\s*\"\d+\",\s*)" + + R"(\"rocksDbBlockCachePinnedUsage\":\s*\"\d+\"\s*)" + + + // 6. Final Closures + R"(\}\s*\}\s*\}\s*\})"; + return regex_pattern; } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs", "[LogMetricsPublisher]") { @@ -96,34 +136,13 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics,DeviceInfoNode"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; std::string expected_log_1 = R"([info] { "LogMetrics": {)"; - std::string expected_log_2 = R"("RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - })"; std::string expected_log_3 = R"("deviceInfo": { "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3)); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log_1)); + REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log_3)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") { @@ -132,42 +151,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + + REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); publisher_->clearMetricNodes(); LogTestController::getInstance().reset(); LogTestController::getInstance().setTrace(); configuration_->set(Configure::nifi_metrics_publisher_metrics, "DeviceInfoNode"); publisher_->loadMetricNodes(); - expected_log = R"([info] { + std::string expected_log = R"([info] { "LogMetrics": { "deviceInfo": { "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific metric properties", "[LogMetricsPublisher]") { @@ -185,32 +180,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property for logging", "[LogMetricsPublisher]") { @@ -220,32 +190,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([debug] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("debug"))); } } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 25840d2e66..4fe02defd4 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -104,7 +104,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { SECTION("RocksDB repository") { repo = std::make_shared(); - expected_metric_count = 7; + expected_metric_count = 9; } @@ -125,6 +125,8 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { if (expected_metric_count > 5) { checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + checkSerializedValue(resp.children, "rocksDbBlockCacheUsage", "85"); + checkSerializedValue(resp.children, "rocksDbBlockCachePinnedUsage", "50"); } } diff --git a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h index ca55e461c6..21f2308d17 100644 --- a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h +++ b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h @@ -28,6 +28,8 @@ class RepositoryMetricsSource { struct RocksDbStats { uint64_t table_readers_size{}; uint64_t all_memory_tables_size{}; + uint64_t block_cache_usage{}; + uint64_t block_cache_pinned_usage{}; }; virtual ~RepositoryMetricsSource() = default; diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 3fccff2c28..0ea5f4ca53 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -76,6 +76,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums"; + static constexpr const char *nifi_dbcontent_optimize_for_small_db_cache_size = "nifi.database.content.repository.optimize.for.small.db.cache.size"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; From d8be6915a9a915c7a12a90649a444cbd01003c61 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Thu, 18 Dec 2025 13:34:30 +0100 Subject: [PATCH 2/5] extract fillU64FromProperty --- .../rocksdb-repos/database/OpenRocksDb.cpp | 52 +++++-------------- .../rocksdb-repos/database/OpenRocksDb.h | 2 + 2 files changed, 16 insertions(+), 38 deletions(-) diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index bedd82e02c..9989ab3bd8 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -135,47 +135,23 @@ std::optional OpenRocksDb::getApproximateSizes() const { return std::nullopt; } -minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { - minifi::core::RepositoryMetricsSource::RocksDbStats stats; - { - std::string table_readers; - GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); - try { - stats.table_readers_size = std::stoull(table_readers); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); - } +void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view property_name) { + std::string property_value; + GetProperty(property_name, &property_value); + try { + member = std::stoull(property_value); + } catch (const std::exception&) { + logger_->log_warn("Could not retrieve valid '{}' property value from rocksdb content repository!", property_name); } +} - { - std::string all_memtables; - GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); - } - } - { - std::string block_cache_usage; - GetProperty("rocksdb.block-cache-usage", &block_cache_usage); - try { - stats.block_cache_usage = std::stoull(block_cache_usage); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.block-cache-usage' property value from rocksdb content repository!"); - } - } - - { - std::string block_cache_pinned_usage; - GetProperty("rocksdb.block-cache-pinned-usage", &block_cache_pinned_usage); - try { - stats.block_cache_pinned_usage = std::stoull(block_cache_pinned_usage); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.block-cache-pinned-usage' property value from rocksdb content repository!"); - } - } +minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { + minifi::core::RepositoryMetricsSource::RocksDbStats stats; + fillU64FromProperty(stats.table_readers_size, "rocksdb.estimate-table-readers-mem"); + fillU64FromProperty(stats.all_memory_tables_size, "rocksdb.cur-size-all-mem-tables"); + fillU64FromProperty(stats.block_cache_usage, "rocksdb.block-cache-usage"); + fillU64FromProperty(stats.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); return stats; } diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index b97fe880a1..dfd93b28cd 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -81,6 +81,8 @@ class OpenRocksDb { void handleResult(const rocksdb::Status& result); void handleResult(const std::vector& results); + void fillU64FromProperty(uint64_t& member, std::string_view property_name); + gsl::not_null db_; gsl::not_null> impl_; gsl::not_null> column_; From f632b6ef58c855856d282c5337431a216e127bb3 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 5 Jan 2026 15:19:31 +0100 Subject: [PATCH 3/5] refactor LogMetricsPublisherTests so it doesn't really on huge regex_match --- libminifi/test/libtest/unit/TestUtils.h | 59 +++++++- .../test/unit/LogMetricsPublisherTests.cpp | 137 ++++++++++-------- 2 files changed, 133 insertions(+), 63 deletions(-) diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index d113de8360..f312fe00a3 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -37,10 +37,8 @@ #include "asio.hpp" #include "asio/ssl.hpp" #include "utils/net/Ssl.h" -#include "range/v3/algorithm/any_of.hpp" #include "core/Processor.h" -#include "core/logging/LoggerFactory.h" -#include "./ProcessorUtils.h" +#include using namespace std::literals::chrono_literals; @@ -240,6 +238,61 @@ inline bool runningAsUnixRoot() { #endif } +struct LogMessageView { + std::string_view timestamp; + std::string_view logger_class; + std::string_view log_level; + std::string_view payload; +}; + +inline std::vector extractLogMessageViews(const std::string& log_str) { + std::vector messages; + const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])"); + const auto search_range = std::ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), std::sregex_iterator()); + struct MsgMarker { + size_t start; + size_t timestamp_start; + size_t timestamp_length; + size_t logger_class_start; + size_t logger_class_length; + size_t log_level_start; + size_t log_level_length; + }; + + std::vector markers = ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), + std::sregex_iterator()) | + ranges::views::transform([=](const std::smatch& m) { + return MsgMarker{.start = static_cast(m.position(0)), + .timestamp_start = static_cast(m.position(1)), + .timestamp_length = static_cast(m.length(1)), + .logger_class_start = static_cast(m.position(2)), + .logger_class_length = static_cast(m.length(2)), + .log_level_start = static_cast(m.position(3)), + .log_level_length = static_cast(m.length(3))}; + }) | + ranges::to(); + + markers.push_back(MsgMarker{.start = log_str.size(), + .timestamp_start = log_str.size(), + .timestamp_length = 0, + .logger_class_start = log_str.size(), + .logger_class_length = 0, + .log_level_start = log_str.size(), + .log_level_length = 0}); + + for (auto window: markers | ranges::views::sliding(2)) { + const std::string_view timestamp{log_str.data() + window[0].timestamp_start, window[0].timestamp_length}; + const std::string_view logger_class{log_str.data() + window[0].logger_class_start, window[0].logger_class_length}; + const std::string_view log_level{log_str.data() + window[0].log_level_start, window[0].log_level_length}; + const size_t msg_start_pos = window[0].log_level_start + window[0].log_level_length + 2; + const size_t msg_length = window[1].start >= msg_start_pos ? window[1].start - msg_start_pos : 0; + const std::string_view message{log_str.data() + msg_start_pos, msg_length}; + messages.push_back(LogMessageView{.timestamp = timestamp, .logger_class = logger_class, .log_level = log_level, .payload = message}); + } + + return messages; +} + } // namespace org::apache::nifi::minifi::test::utils namespace Catch { diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index e9f2aae792..d2250f5005 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -18,11 +18,11 @@ #include #include -#include "unit/TestBase.h" -#include "unit/Catch.h" +#include "core/RepositoryFactory.h" #include "core/state/LogMetricsPublisher.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "core/RepositoryFactory.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" #include "unit/TestUtils.h" #include "utils/file/FileUtils.h" @@ -33,12 +33,12 @@ namespace org::apache::nifi::minifi::test { class LogPublisherTestFixture { public: LogPublisherTestFixture() - : configuration_(std::make_shared()), - provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), - flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), - response_node_loader_(std::make_shared(configuration_, - std::vector>{provenance_repo_, flow_file_repo_}, nullptr)), - publisher_(std::make_unique("LogMetricsPublisher")) { + : configuration_(std::make_shared()), + provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), + flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), + response_node_loader_(std::make_shared(configuration_, + std::vector>{provenance_repo_, flow_file_repo_}, nullptr)), + publisher_(std::make_unique("LogMetricsPublisher")) { provenance_repo_->initialize(configuration_); flow_file_repo_->initialize(configuration_); } @@ -67,7 +67,8 @@ class LogPublisherTestFixture { TEST_CASE_METHOD(LogPublisherTestFixture, "Logging interval property is mandatory", "[LogMetricsPublisher]") { LogTestController::getInstance().setTrace(); SECTION("No logging interval is set") { - REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), "General Operation: Metrics logging interval not configured for log metrics publisher!"); + REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), + "General Operation: Metrics logging interval not configured for log metrics publisher!"); } SECTION("Logging interval is set to 2 seconds") { configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "2s"); @@ -88,46 +89,40 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty metrics if no valid metr REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); } -std::string expectedRepositoryMetricsRegex(std::string level) { - std::string regex_pattern = - // 1. Log Header & Opening - R"(\[[\d\-\s\:\.]+\]\s*\[[^\]]+\]\s*\[)" + std::move(level) + R"(\]\s*\{\s*)" + - R"(\"LogMetrics\":\s*\{\s*)" + - - // 2. (skips other nodes, stops if a \n[ starts (other log line)) - R"((?:(?!\n\[)[\s\S])*?)" + - - // 3. RepositoryMetrics Node - R"(\"RepositoryMetrics\":\s*\{\s*)" + - - // 4. Provenance Repository - R"(\"provenancerepository\":\s*\{\s*)" + - R"(\"running\":\s*\"false\",\s*)" + - R"(\"full\":\s*\"false\",\s*)" + - R"(\"size\":\s*\"0\",\s*)" + - R"(\s*\"maxSize\":\s*\"0\",\s*)" + - R"(\"entryCount\":\s*\"0\",\s*)" + - R"(\"rocksDbTableReadersSize\":\s*\"0\",\s*)" + - R"(\"rocksDbAllMemoryTablesSize\":\s*\"2048\",\s*)" + - R"(\"rocksDbBlockCacheUsage\":\s*\"\d+\",\s*)" + - R"(\"rocksDbBlockCachePinnedUsage\":\s*\"\d+\"\s*)" + - R"(\},\s*)" + - - // 5. FlowFile Repository - R"(\"flowfilerepository\":\s*\{\s*)" + - R"(\"running\":\s*\"false\",\s*)" + - R"(\"full\":\s*\"false\",\s*)" + - R"(\"size\":\s*\"0\",\s*)" + - R"(\"maxSize\":\s*\"0\",\s*)" + - R"(\"entryCount\":\s*\"0\",\s*)" + - R"(\"rocksDbTableReadersSize\":\s*\"0\",\s*)" + - R"(\"rocksDbAllMemoryTablesSize\":\s*\"2048\",\s*)" + - R"(\"rocksDbBlockCacheUsage\":\s*\"\d+\",\s*)" + - R"(\"rocksDbBlockCachePinnedUsage\":\s*\"\d+\"\s*)" + - - // 6. Final Closures - R"(\}\s*\}\s*\}\s*\})"; - return regex_pattern; +bool check_exact_metrics_value(const rapidjson::Value& repo_metrics, const std::string_view key, const std::string_view expected_value) { + const auto key_ref = rapidjson::StringRef(key.data(), key.size()); + const auto member_it = repo_metrics.FindMember(key_ref); + if (member_it == repo_metrics.MemberEnd()) { return false; } + const auto actual_value = std::string_view{member_it->value.GetString(), member_it->value.GetStringLength()}; + return actual_value == expected_value; +} + +bool isExpectedRepositoryMetricsLogMessage(const utils::LogMessageView& message_view, std::string_view log_level) { + if (message_view.log_level != log_level) { return false; } + if (message_view.logger_class != "[org::apache::nifi::minifi::state::LogMetricsPublisher]") { return false; } + rapidjson::Document document; + if (const rapidjson::ParseResult res = document.Parse(message_view.payload.data(), message_view.payload.length()); !res) { return false; } + if (!document.HasMember("LogMetrics")) { return false; } + const auto& log_metrics = document["LogMetrics"].GetObject(); + if (!log_metrics.HasMember("RepositoryMetrics")) { return false; } + const auto& repository_metrics = log_metrics["RepositoryMetrics"].GetObject(); + if (!repository_metrics.HasMember("provenancerepository") || !repository_metrics.HasMember("flowfilerepository")) { return false; } + const rapidjson::Value& provenance_repo_metrics = repository_metrics["provenancerepository"]; + const rapidjson::Value& flow_file_repo_metrics = repository_metrics["flowfilerepository"]; + + const auto repo_is_okay = [](const rapidjson::Value& repo_metrics) -> bool { + return check_exact_metrics_value(repo_metrics, "full", "false") + && check_exact_metrics_value(repo_metrics, "running", "false") + && check_exact_metrics_value(repo_metrics, "size", "0") + && check_exact_metrics_value(repo_metrics, "maxSize", "0") + && check_exact_metrics_value(repo_metrics, "entryCount", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbTableReadersSize", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbAllMemoryTablesSize", "2048") + && repo_metrics.HasMember("rocksDbBlockCacheUsage") + && repo_metrics.HasMember("rocksDbBlockCachePinnedUsage"); + }; + + return repo_is_okay(provenance_repo_metrics) && repo_is_okay(flow_file_repo_metrics); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs", "[LogMetricsPublisher]") { @@ -136,13 +131,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics,DeviceInfoNode"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - std::string expected_log_1 = R"([info] { - "LogMetrics": {)"; - std::string expected_log_3 = R"("deviceInfo": { - "identifier":)"; - REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log_1)); - REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); - REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log_3)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") { @@ -152,7 +148,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); publisher_->clearMetricNodes(); LogTestController::getInstance().reset(); LogTestController::getInstance().setTrace(); @@ -180,7 +183,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("info"))); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property for logging", "[LogMetricsPublisher]") { @@ -190,7 +200,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - REQUIRE(utils::verifyLogMatchesRegexInPollTime(5s, expectedRepositoryMetricsRegex("debug"))); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "debug"); }); + }, + 100ms)); } } // namespace org::apache::nifi::minifi::test From a5a3cd119e5d1e7cc1ac91e9b3225e271363a185 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 5 Jan 2026 16:16:52 +0100 Subject: [PATCH 4/5] refactor extractLogMessageViews --- libminifi/test/libtest/unit/TestUtils.cpp | 39 ++++++++++++++++++ libminifi/test/libtest/unit/TestUtils.h | 48 +---------------------- 2 files changed, 40 insertions(+), 47 deletions(-) diff --git a/libminifi/test/libtest/unit/TestUtils.cpp b/libminifi/test/libtest/unit/TestUtils.cpp index c56f44f755..6442259122 100644 --- a/libminifi/test/libtest/unit/TestUtils.cpp +++ b/libminifi/test/libtest/unit/TestUtils.cpp @@ -260,4 +260,43 @@ std::error_code sendMessagesViaSSL(const std::vector& contents return {}; } +std::vector extractLogMessageViews(const std::string& log_str) { + std::vector messages; + const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])"); + struct HeaderMarker { + size_t start; + std::string_view timestamp; + std::string_view logger_class; + std::string_view log_level; + size_t end; + }; + + std::vector markers = ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), + std::sregex_iterator()) | + ranges::views::transform([=](const std::smatch& m) { + return HeaderMarker{.start = static_cast(m.position(0)), + .timestamp = std::string_view{log_str.data() + m.position(1), static_cast(m.length(1))}, + .logger_class = std::string_view{log_str.data() + m.position(2), static_cast(m.length(2))}, + .log_level = std::string_view{log_str.data() + m.position(3), static_cast(m.length(3))}, + .end = static_cast(m.position(0) + m.length(0)) + }; + }) | ranges::to(); + + markers.push_back(HeaderMarker{.start = log_str.size(), + .timestamp = {}, + .logger_class = {}, + .log_level = {}, + .end = log_str.size() + }); + + for (auto window: markers | ranges::views::sliding(2)) { + messages.push_back(LogMessageView{.timestamp = window[0].timestamp, + .logger_class = window[0].logger_class, + .log_level = window[0].log_level, + .payload = {log_str.data() + window[0].end, window[1].start - window[0].end}}); + } + + return messages; +} + } // namespace org::apache::nifi::minifi::test::utils diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index f312fe00a3..faeb196607 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -245,53 +245,7 @@ struct LogMessageView { std::string_view payload; }; -inline std::vector extractLogMessageViews(const std::string& log_str) { - std::vector messages; - const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])"); - const auto search_range = std::ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), std::sregex_iterator()); - struct MsgMarker { - size_t start; - size_t timestamp_start; - size_t timestamp_length; - size_t logger_class_start; - size_t logger_class_length; - size_t log_level_start; - size_t log_level_length; - }; - - std::vector markers = ranges::subrange(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), - std::sregex_iterator()) | - ranges::views::transform([=](const std::smatch& m) { - return MsgMarker{.start = static_cast(m.position(0)), - .timestamp_start = static_cast(m.position(1)), - .timestamp_length = static_cast(m.length(1)), - .logger_class_start = static_cast(m.position(2)), - .logger_class_length = static_cast(m.length(2)), - .log_level_start = static_cast(m.position(3)), - .log_level_length = static_cast(m.length(3))}; - }) | - ranges::to(); - - markers.push_back(MsgMarker{.start = log_str.size(), - .timestamp_start = log_str.size(), - .timestamp_length = 0, - .logger_class_start = log_str.size(), - .logger_class_length = 0, - .log_level_start = log_str.size(), - .log_level_length = 0}); - - for (auto window: markers | ranges::views::sliding(2)) { - const std::string_view timestamp{log_str.data() + window[0].timestamp_start, window[0].timestamp_length}; - const std::string_view logger_class{log_str.data() + window[0].logger_class_start, window[0].logger_class_length}; - const std::string_view log_level{log_str.data() + window[0].log_level_start, window[0].log_level_length}; - const size_t msg_start_pos = window[0].log_level_start + window[0].log_level_length + 2; - const size_t msg_length = window[1].start >= msg_start_pos ? window[1].start - msg_start_pos : 0; - const std::string_view message{log_str.data() + msg_start_pos, msg_length}; - messages.push_back(LogMessageView{.timestamp = timestamp, .logger_class = logger_class, .log_level = log_level, .payload = message}); - } - - return messages; -} +std::vector extractLogMessageViews(const std::string& log_str); } // namespace org::apache::nifi::minifi::test::utils From 5cd6f2a45c54b777a9906d20a567aa612bfcf2bf Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Tue, 6 Jan 2026 11:25:42 +0100 Subject: [PATCH 5/5] provide sensible default --- CONFIGURE.md | 14 ++++++++ conf/minifi.properties.in | 1 + .../DatabaseContentRepository.cpp | 29 ++++++++++++----- .../rocksdb-repos/database/RocksDbUtils.h | 8 ++++- .../tests/DBContentRepositoryTests.cpp | 32 +++++++++++++++++++ libminifi/src/Configuration.cpp | 1 + libminifi/test/libtest/unit/TestUtils.h | 1 + 7 files changed, 77 insertions(+), 9 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 62e8a8768e..40cb44b7b4 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -41,6 +41,7 @@ - [Configuring Repositories](#configuring-repositories) - [Configuring Volatile Repositories](#configuring-volatile-repositories) - [Configuring Repository storage locations](#configuring-repository-storage-locations) + - [Configuring cache size for rocksdb content repository](#configuring-cache-size-for-rocksdb-content-repository) - [Configuring compression for rocksdb database](#configuring-compression-for-rocksdb-database) - [Configuring compaction for rocksdb database](#configuring-compaction-for-rocksdb-database) - [Configuring synchronous or asynchronous writes for RocksDB content repository](#configuring-synchronous-or-asynchronous-writes-for-rocksdb-content-repository) @@ -674,6 +675,19 @@ In a Filesystem Hierarchy Standard (FHS) installation (from an RPM package), the nifi.flowfile.repository.directory.default=/var/lib/nifi-minifi-cpp/flowfile_repository nifi.database.content.repository.directory.default=/var/lib/nifi-minifi-cpp/content_repository +### Configuring cache size for rocksdb content repository + +The RocksDB content repository uses a cache to limit memory usage. The cache size can be configured using the following property. +This should limit the memory usage but may cause minimal processing overhead. + + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB + +You can disable this cache by setting it to an empty value. + + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size= + ### Configuring compression for rocksdb database diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in index 7ae002fbce..d4bcbd93b5 100644 --- a/conf/minifi.properties.in +++ b/conf/minifi.properties.in @@ -52,6 +52,7 @@ nifi.content.repository.class.name=DatabaseContentRepository ## Relates to the internal workings of the rocksdb backend # nifi.flowfile.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.rocksdb.compaction.period=2 min +# nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB # setting this value to "0" enables synchronous deletion # nifi.database.content.repository.purge.period = 1 sec diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index 9367041da2..95e8dbec5f 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -58,37 +58,50 @@ bool DatabaseContentRepository::initialize(const std::shared_ptrget(Configure::nifi_dbcontent_optimize_for_small_db_cache_size) + const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size).or_else([] { return std::make_optional("8 MB"); }) | utils::andThen([](const auto& cache_size_str) -> std::optional { return parsing::parseDataSize(cache_size_str) | utils::toOptional(); }); std::shared_ptr cache = nullptr; + std::shared_ptr wbm = nullptr; if (cache_size) { cache = rocksdb::NewLRUCache(*cache_size); + wbm = std::make_shared(0, cache); + logger_->log_trace("Using {} sized cache for DatabaseContentRepository", *cache_size); + } else { + logger_->log_trace("Cache limitation disabled for DatabaseContentRepository"); } - auto set_db_opts = [encrypted_env, &cache] (minifi::internal::Writable& db_opts) { + auto set_db_opts = [encrypted_env, &cache, &wbm] (minifi::internal::Writable& db_opts) { minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default()); } - if (cache) { - db_opts.call(&rocksdb::DBOptions::OptimizeForSmallDb, &cache); - } + db_opts.optimizeForSmallDb(cache, wbm); }; auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) { cf_opts.OptimizeForPointLookup(4); cf_opts.merge_operator = std::make_shared(); cf_opts.max_successive_merges = 0; - if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { + cf_opts.max_write_buffer_number = 2; + cf_opts.write_buffer_size = 4_MB; + if (const auto compression_type = internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { cf_opts.compression = *compression_type; } if (cache) { - cf_opts.OptimizeForSmallDb(&cache); - cf_opts.compression_opts.max_dict_bytes = 0; + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = cache; + + table_options.cache_index_and_filter_blocks = true; + table_options.cache_index_and_filter_blocks_with_high_priority = true; + + table_options.pin_l0_filter_and_index_blocks_in_cache = false; + table_options.pin_top_level_index_and_filter = false; + + cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); } }; db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_, diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h index 13687805d0..07eaed95cd 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.h +++ b/extensions/rocksdb-repos/database/RocksDbUtils.h @@ -51,10 +51,16 @@ class Writable { template decltype(auto) call(Method method, Args&&... args) { - is_modified_ = true; return std::invoke(method, target_, std::forward(args)...); } + void optimizeForSmallDb(std::shared_ptr cache, std::shared_ptr wbm) { + if (!cache || !wbm) { return; } + target_.OptimizeForSmallDb(&cache); + target_.write_buffer_manager = wbm; + target_.max_open_files = 20; + } + template const F& get(F T::* member) { return target_.*member; diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp index 50195748b1..ac122fbd86 100644 --- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp +++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp @@ -355,3 +355,35 @@ TEST_CASE("DBContentRepository can clear orphan entries") { REQUIRE(getDbSize(dir) == 0); } + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size default") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 8388608 sized cache for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size override") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "100 MB"); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 104857600 sized cache for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size disable") { + LogTestController::getInstance().setTrace(); + + const auto configuration = std::make_shared(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", ""); + const auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Cache limitation disabled for DatabaseContentRepository")); +} diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 191705fac3..17dba361b0 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,6 +54,7 @@ const std::unordered_map +#include "./ProcessorUtils.h" using namespace std::literals::chrono_literals;