From e43e06448f21bc42f5c3d50cfcbe4ec2e6cadf3d Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 6 Nov 2025 13:44:58 +0200 Subject: [PATCH 1/6] chore: disk backpressure class utility Signed-off-by: Kostas Kyrimis --- src/facade/CMakeLists.txt | 3 +- src/facade/disk_connection_backpressure.cc | 132 +++++++++++++++++++++ src/facade/disk_connection_backpressure.h | 65 ++++++++++ src/facade/dragonfly_connection.cc | 17 ++- src/facade/dragonfly_connection.h | 6 +- 5 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 src/facade/disk_connection_backpressure.cc create mode 100644 src/facade/disk_connection_backpressure.h diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt index 3cfc0b361231..bf06b21d2db6 100644 --- a/src/facade/CMakeLists.txt +++ b/src/facade/CMakeLists.txt @@ -3,7 +3,8 @@ cxx_link(dfly_parser_lib base strings_lib) add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc - reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc) + reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc + disk_connection_backpressure.cc) if (DF_USE_SSL) set(TLS_LIB tls_lib) diff --git a/src/facade/disk_connection_backpressure.cc b/src/facade/disk_connection_backpressure.cc new file mode 100644 index 000000000000..78756fb5fe68 --- /dev/null +++ b/src/facade/disk_connection_backpressure.cc @@ -0,0 +1,132 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// +// See LICENSE for licensing terms. +// + +#include "facade/disk_connection_backpressure.h" + +#include + +#include + +#include "base/flags.h" +#include "base/logging.h" +#include "facade/facade_types.h" +#include "io/io.h" +#include "util/fibers/uring_file.h" + +using facade::operator""_MB; + +ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", + "Folder to store " + "disk backed connection backpressure"); + +ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, + "Maximum size of the backing file. When max size is reached, connection will " + "stop offloading backpressure to disk and block on client read."); + +ABSL_FLAG(size_t, disk_backpressure_load_size, 30, + "How many items to load in dispatch queue from the disk backed file."); + +namespace facade { + +DiskBackedBackpressureQueue::DiskBackedBackpressureQueue(uint32_t conn_id) + : max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), + max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)), + id_(conn_id) { +} + +std::error_code DiskBackedBackpressureQueue::Init() { + std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_); + { + // Kernel transparently handles buffering via the page cache. + auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */); + if (!res) { + return res.error(); + } + writer_.reset(*res); + } + + auto res = util::fb2::OpenRead(backing_name); + if (!res) { + return res.error(); + } + reader_.reset(*res); + + VLOG(3) << "Created backing for connection " << this << " " << backing_name; + + return {}; +} + +DiskBackedBackpressureQueue::~DiskBackedBackpressureQueue() { + auto ec = writer_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + ec = reader_->Close(); + LOG_IF(WARNING, ec) << ec.message(); +} + +// Check if backing file is empty, i.e. backing file has 0 bytes. 
+bool DiskBackedBackpressureQueue::Empty() const { + return total_backing_bytes_ == 0; +} + +bool DiskBackedBackpressureQueue::HasEnoughBackingSpaceFor(size_t bytes) const { + return (bytes + total_backing_bytes_) < max_backing_size_; +} + +size_t DiskBackedBackpressureQueue::TotalInMemoryBytes() const { + return offsets_.size() * sizeof(ItemOffset); +} + +void DiskBackedBackpressureQueue::OffloadToBacking(std::string_view blob) { + ItemOffset item; + item.offset = next_offset_; + item.total_bytes = blob.size(); + + // TODO we should truncate as the file grows. That way we never end up with large files + // on disk. + auto res = writer_->Write(blob); + if (res) { + VLOG(2) << "Failed to offload connection " << this << " backpressure with offset " + << item.offset << " of size " << item.total_bytes << " to backing with error: " << res; + return; + } + + total_backing_bytes_ += blob.size(); + offsets_.push_back(item); + next_offset_ += item.total_bytes; + + VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes + << " bytes to disk at offset: " << item.offset; + VLOG(3) << "Command offloaded: " << blob; +} + +void DiskBackedBackpressureQueue::LoadFromBacking(std::function f) { + std::string buffer; + size_t up_to = max_queue_load_size_; + + while (!offsets_.empty() && up_to--) { + ItemOffset item = offsets_.front(); + + buffer.resize(item.total_bytes); + + io::MutableBytes bytes{reinterpret_cast(buffer.data()), item.total_bytes}; + auto result = reader_->Read(item.offset, bytes); + if (!result) { + LOG(ERROR) << "Could not load item at offset " << item.offset << " of size " + << item.total_bytes << " from disk with error: " << result.error().value() << " " + << result.error().message(); + return; + } + + VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes + << " for connection " << this; + + f(bytes); + + offsets_.pop_front(); + total_backing_bytes_ -= item.total_bytes; + } +} + +} // namespace facade diff --git a/src/facade/disk_connection_backpressure.h b/src/facade/disk_connection_backpressure.h new file mode 100644 index 000000000000..29f53adaec42 --- /dev/null +++ b/src/facade/disk_connection_backpressure.h @@ -0,0 +1,65 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "io/io.h" + +namespace facade { + +class DiskBackedBackpressureQueue { + public: + explicit DiskBackedBackpressureQueue(uint32_t conn_id); + ~DiskBackedBackpressureQueue(); + + std::error_code Init(); + + // Check if we can offload bytes to backing file. + bool HasEnoughBackingSpaceFor(size_t bytes) const; + + // Total size of internal buffers/structures. + size_t TotalInMemoryBytes() const; + + void OffloadToBacking(std::string_view blob); + + // For each item loaded from disk it calls f(item) to consume it. + // Reads up to max_queue_load_size_ items on each call + void LoadFromBacking(std::function f); + + // Check if backing file is empty, i.e. backing file has 0 bytes. 
+ bool Empty() const; + + private: + // File Reader/Writer + std::unique_ptr writer_; + std::unique_ptr reader_; + + // In memory backed file map + struct ItemOffset { + size_t offset = 0; + size_t total_bytes = 0; + }; + + std::deque offsets_; + + size_t total_backing_bytes_ = 0; + size_t next_offset_ = 0; + + // Read only constants + const size_t max_backing_size_ = 0; + const size_t max_queue_load_size_ = 0; + + // unique id for the file backed + const size_t id_ = 0; +}; + +} // namespace facade diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index a4724aa69d44..38be88bc01a7 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2025, DragonflyDB authors. All rights reserved. // // See LICENSE for licensing terms. // @@ -13,6 +13,7 @@ #include #include +#include "absl/flags/internal/flag.h" #include "base/cycle_clock.h" #include "base/flag_utils.h" #include "base/flags.h" @@ -22,6 +23,7 @@ #include "base/stl_util.h" #include "core/heap_size.h" #include "facade/conn_context.h" +#include "facade/disk_connection_backpressure.h" #include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" #include "facade/redis_parser.h" @@ -112,6 +114,9 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0, "If non-zero, waits for this time for more I/O " " events to come for the connection in case there is only one command in the pipeline. "); +ABSL_FLAG(size_t, disk_backpressure_offload_watermark, 0, + "Offload backpressure to disk when dispatch queue size crosses the watermark."); + using namespace util; using namespace std; using absl::GetFlag; @@ -676,6 +681,16 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, #endif UpdateLibNameVerMap(lib_name_, lib_ver_, +1); + + backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_backpressure_offload_watermark); + if (backpressure_to_disk_watermark_ > 0) { + backing_queue_ = std::make_unique(id_); + auto ec = backing_queue_->Init(); + if (ec) { + LOG(ERROR) << "Error while initializing backpressure file " << ec.message(); + backing_queue_.reset(); + } + } } Connection::~Connection() { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 19dfef93f717..222c8d7d1100 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2025, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -45,6 +45,7 @@ class ConnectionContext; class RedisParser; class ServiceInterface; class SinkReplyBuilder; +class DiskBackedBackpressureQueue; // Connection represents an active connection for a client. 
// @@ -503,6 +504,9 @@ class Connection : public util::Connection { }; }; + std::unique_ptr backing_queue_; + size_t backpressure_to_disk_watermark_ = 0; + bool request_shutdown_ = false; }; From 92393698fcfe2c43bd99da98b38667c5d5ca311c Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 7 Nov 2025 17:31:42 +0200 Subject: [PATCH 2/6] comments --- helio | 2 +- src/facade/CMakeLists.txt | 3 +- src/facade/disk_backed_queue_test.cc | 65 ++++++++++ src/facade/disk_connection_backpressure.cc | 132 --------------------- src/facade/disk_connection_backpressure.h | 65 ---------- src/facade/dragonfly_connection.cc | 12 +- src/facade/dragonfly_connection.h | 4 +- 7 files changed, 76 insertions(+), 207 deletions(-) create mode 100644 src/facade/disk_backed_queue_test.cc delete mode 100644 src/facade/disk_connection_backpressure.cc delete mode 100644 src/facade/disk_connection_backpressure.h diff --git a/helio b/helio index 5c40a69430b9..087c88c76451 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 5c40a69430b9f9aaae30fc73f999a63bfc5d0f17 +Subproject commit 087c88c76451cbbc7870008ad2439d47f44a841b diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt index bf06b21d2db6..22c100bdae7a 100644 --- a/src/facade/CMakeLists.txt +++ b/src/facade/CMakeLists.txt @@ -4,7 +4,7 @@ cxx_link(dfly_parser_lib base strings_lib) add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc - disk_connection_backpressure.cc) + disk_backed_queue.cc) if (DF_USE_SSL) set(TLS_LIB tls_lib) @@ -21,6 +21,7 @@ cxx_test(memcache_parser_test dfly_facade LABELS DFLY) cxx_test(redis_parser_test facade_test LABELS DFLY) cxx_test(reply_builder_test facade_test LABELS DFLY) cxx_test(cmd_arg_parser_test facade_test LABELS DFLY) +cxx_test(disk_backed_queue_test facade_test LABELS DFLY) add_executable(ok_backend ok_main.cc) cxx_link(ok_backend dfly_facade) diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc new file mode 100644 index 000000000000..3eb8f7dd4ada --- /dev/null +++ b/src/facade/disk_backed_queue_test.cc @@ -0,0 +1,65 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#include "facade/disk_backed_queue.h" + +#include +#include + +#include +#include +#include + +#include "base/flags.h" +#include "base/gtest.h" +#include "base/logging.h" +#include "io/io.h" +#include "util/fibers/uring_proactor.h" + +namespace dfly { +namespace { + +using namespace facade; + +TEST(DiskBackedQueueTest, ReadWrite) { + auto proactor = std::make_unique(); + + auto pthread = std::thread{[ptr = proactor.get()] { + static_cast(ptr)->Init(0, 8); + ptr->Run(); + }}; + + proactor->Await([]() { + DiskBackedQueue backing(1 /* id */); + EXPECT_FALSE(backing.Init()); + + std::vector commands; + for (size_t i = 0; i < 100; ++i) { + commands.push_back(absl::StrCat("SET FOO", i, " BAR")); + auto ec = backing.Push(commands.back()); + EXPECT_FALSE(ec); + } + + std::vector results; + for (size_t i = 0; i < 4; ++i) { + auto ec = backing.PopN([&](io::MutableBytes read) { + std::string str(reinterpret_cast(read.data()), read.size()); + results.push_back(std::move(str)); + }); + EXPECT_FALSE(ec); + } + + EXPECT_EQ(results.size(), commands.size()); + + for (size_t i = 0; i < results.size(); ++i) { + EXPECT_EQ(results[i], commands[i]); + } + }); + + proactor->Stop(); + pthread.join(); +} + +} // namespace +} // namespace dfly diff --git a/src/facade/disk_connection_backpressure.cc b/src/facade/disk_connection_backpressure.cc deleted file mode 100644 index 78756fb5fe68..000000000000 --- a/src/facade/disk_connection_backpressure.cc +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2025, DragonflyDB authors. All rights reserved. -// -// See LICENSE for licensing terms. -// - -#include "facade/disk_connection_backpressure.h" - -#include - -#include - -#include "base/flags.h" -#include "base/logging.h" -#include "facade/facade_types.h" -#include "io/io.h" -#include "util/fibers/uring_file.h" - -using facade::operator""_MB; - -ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", - "Folder to store " - "disk backed connection backpressure"); - -ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, - "Maximum size of the backing file. When max size is reached, connection will " - "stop offloading backpressure to disk and block on client read."); - -ABSL_FLAG(size_t, disk_backpressure_load_size, 30, - "How many items to load in dispatch queue from the disk backed file."); - -namespace facade { - -DiskBackedBackpressureQueue::DiskBackedBackpressureQueue(uint32_t conn_id) - : max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), - max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)), - id_(conn_id) { -} - -std::error_code DiskBackedBackpressureQueue::Init() { - std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_); - { - // Kernel transparently handles buffering via the page cache. - auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */); - if (!res) { - return res.error(); - } - writer_.reset(*res); - } - - auto res = util::fb2::OpenRead(backing_name); - if (!res) { - return res.error(); - } - reader_.reset(*res); - - VLOG(3) << "Created backing for connection " << this << " " << backing_name; - - return {}; -} - -DiskBackedBackpressureQueue::~DiskBackedBackpressureQueue() { - auto ec = writer_->Close(); - LOG_IF(WARNING, ec) << ec.message(); - ec = reader_->Close(); - LOG_IF(WARNING, ec) << ec.message(); -} - -// Check if backing file is empty, i.e. backing file has 0 bytes. 
-bool DiskBackedBackpressureQueue::Empty() const { - return total_backing_bytes_ == 0; -} - -bool DiskBackedBackpressureQueue::HasEnoughBackingSpaceFor(size_t bytes) const { - return (bytes + total_backing_bytes_) < max_backing_size_; -} - -size_t DiskBackedBackpressureQueue::TotalInMemoryBytes() const { - return offsets_.size() * sizeof(ItemOffset); -} - -void DiskBackedBackpressureQueue::OffloadToBacking(std::string_view blob) { - ItemOffset item; - item.offset = next_offset_; - item.total_bytes = blob.size(); - - // TODO we should truncate as the file grows. That way we never end up with large files - // on disk. - auto res = writer_->Write(blob); - if (res) { - VLOG(2) << "Failed to offload connection " << this << " backpressure with offset " - << item.offset << " of size " << item.total_bytes << " to backing with error: " << res; - return; - } - - total_backing_bytes_ += blob.size(); - offsets_.push_back(item); - next_offset_ += item.total_bytes; - - VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes - << " bytes to disk at offset: " << item.offset; - VLOG(3) << "Command offloaded: " << blob; -} - -void DiskBackedBackpressureQueue::LoadFromBacking(std::function f) { - std::string buffer; - size_t up_to = max_queue_load_size_; - - while (!offsets_.empty() && up_to--) { - ItemOffset item = offsets_.front(); - - buffer.resize(item.total_bytes); - - io::MutableBytes bytes{reinterpret_cast(buffer.data()), item.total_bytes}; - auto result = reader_->Read(item.offset, bytes); - if (!result) { - LOG(ERROR) << "Could not load item at offset " << item.offset << " of size " - << item.total_bytes << " from disk with error: " << result.error().value() << " " - << result.error().message(); - return; - } - - VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes - << " for connection " << this; - - f(bytes); - - offsets_.pop_front(); - total_backing_bytes_ -= item.total_bytes; - } -} - -} // namespace facade diff --git a/src/facade/disk_connection_backpressure.h b/src/facade/disk_connection_backpressure.h deleted file mode 100644 index 29f53adaec42..000000000000 --- a/src/facade/disk_connection_backpressure.h +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2025, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. -// - -#pragma once - -#include - -#include -#include -#include -#include -#include - -#include "io/io.h" - -namespace facade { - -class DiskBackedBackpressureQueue { - public: - explicit DiskBackedBackpressureQueue(uint32_t conn_id); - ~DiskBackedBackpressureQueue(); - - std::error_code Init(); - - // Check if we can offload bytes to backing file. - bool HasEnoughBackingSpaceFor(size_t bytes) const; - - // Total size of internal buffers/structures. - size_t TotalInMemoryBytes() const; - - void OffloadToBacking(std::string_view blob); - - // For each item loaded from disk it calls f(item) to consume it. - // Reads up to max_queue_load_size_ items on each call - void LoadFromBacking(std::function f); - - // Check if backing file is empty, i.e. backing file has 0 bytes. 
- bool Empty() const; - - private: - // File Reader/Writer - std::unique_ptr writer_; - std::unique_ptr reader_; - - // In memory backed file map - struct ItemOffset { - size_t offset = 0; - size_t total_bytes = 0; - }; - - std::deque offsets_; - - size_t total_backing_bytes_ = 0; - size_t next_offset_ = 0; - - // Read only constants - const size_t max_backing_size_ = 0; - const size_t max_queue_load_size_ = 0; - - // unique id for the file backed - const size_t id_ = 0; -}; - -} // namespace facade diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 38be88bc01a7..03b4288bfffd 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -13,7 +13,6 @@ #include #include -#include "absl/flags/internal/flag.h" #include "base/cycle_clock.h" #include "base/flag_utils.h" #include "base/flags.h" @@ -23,7 +22,7 @@ #include "base/stl_util.h" #include "core/heap_size.h" #include "facade/conn_context.h" -#include "facade/disk_connection_backpressure.h" +#include "facade/disk_backed_queue.h" #include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" #include "facade/redis_parser.h" @@ -114,7 +113,7 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0, "If non-zero, waits for this time for more I/O " " events to come for the connection in case there is only one command in the pipeline. "); -ABSL_FLAG(size_t, disk_backpressure_offload_watermark, 0, +ABSL_FLAG(size_t, disk_queue_offload_watermark, 0, "Offload backpressure to disk when dispatch queue size crosses the watermark."); using namespace util; @@ -682,12 +681,13 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, UpdateLibNameVerMap(lib_name_, lib_ver_, +1); - backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_backpressure_offload_watermark); + backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_queue_offload_watermark); if (backpressure_to_disk_watermark_ > 0) { - backing_queue_ = std::make_unique(id_); + backing_queue_ = std::make_unique(id_); auto ec = backing_queue_->Init(); if (ec) { - LOG(ERROR) << "Error while initializing backpressure file " << ec.message(); + LOG(ERROR) << "Error initializing disk backpressure file for connection " << id_ << ": " + << ec.message(); backing_queue_.reset(); } } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 222c8d7d1100..546fe6a59321 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -45,7 +45,7 @@ class ConnectionContext; class RedisParser; class ServiceInterface; class SinkReplyBuilder; -class DiskBackedBackpressureQueue; +class DiskBackedQueue; // Connection represents an active connection for a client. 
// @@ -504,7 +504,7 @@ class Connection : public util::Connection { }; }; - std::unique_ptr backing_queue_; + std::unique_ptr backing_queue_; size_t backpressure_to_disk_watermark_ = 0; bool request_shutdown_ = false; From e4de405a11eb808ab80ff31f8104a590c69b1555 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 7 Nov 2025 17:33:32 +0200 Subject: [PATCH 3/6] fixes --- helio | 2 +- src/facade/disk_backed_queue.cc | 144 ++++++++++++++++++++++++++++++++ src/facade/disk_backed_queue.h | 57 +++++++++++++ 3 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 src/facade/disk_backed_queue.cc create mode 100644 src/facade/disk_backed_queue.h diff --git a/helio b/helio index 087c88c76451..5c40a69430b9 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 087c88c76451cbbc7870008ad2439d47f44a841b +Subproject commit 5c40a69430b9f9aaae30fc73f999a63bfc5d0f17 diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc new file mode 100644 index 000000000000..fcea164f04c4 --- /dev/null +++ b/src/facade/disk_backed_queue.cc @@ -0,0 +1,144 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// +// See LICENSE for licensing terms. +// + +#include "facade/disk_backed_queue.h" + +#include + +#include + +#include "base/flags.h" +#include "base/logging.h" +#include "facade/facade_types.h" +#include "io/io.h" +#include "util/fibers/uring_file.h" + +using facade::operator""_MB; + +ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", + "Folder to store disk-backed connection backpressure"); + +ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, + "Maximum size of the backing file. When max size is reached, connection will " + "stop offloading backpressure to disk and block on client read."); + +ABSL_FLAG(size_t, disk_backpressure_load_size, 30, + "How many items to load in dispatch queue from the disk-backed file."); + +namespace facade { + +DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) + : max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), + max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)), + id_(conn_id) { +} + +std::error_code DiskBackedQueue::Init() { + std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_); + { + // Kernel transparently handles buffering via the page cache. + auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non-direct io */); + if (!res) { + return res.error(); + } + writer_.reset(*res); + } + + auto res = util::fb2::OpenRead(backing_name); + if (!res) { + return res.error(); + } + reader_.reset(*res); + + VLOG(3) << "Created backing for connection " << this << " " << backing_name; + + return {}; +} + +DiskBackedQueue::~DiskBackedQueue() { + auto ec = writer_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + ec = reader_->Close(); + LOG_IF(WARNING, ec) << ec.message(); +} + +// Check if backing file is empty, i.e. backing file has 0 bytes. +bool DiskBackedQueue::Empty() const { + return total_backing_bytes_ == 0; +} + +bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { + return (bytes + total_backing_bytes_) < max_backing_size_; +} + +std::error_code DiskBackedQueue::Push(std::string_view blob) { + // TODO we should truncate as the file grows. That way we never end up with large files + // on disk. + uint32_t sz = blob.size(); + // We serialize the string as is and we prefix with 4 bytes denoting its size. 
The layout is: + // 4bytes(str size) + followed by blob.size() bytes + iovec offset_data[2]{{&sz, sizeof(uint32_t)}, {const_cast(blob.data()), blob.size()}}; + + auto ec = writer_->Write(offset_data, 2); + if (ec) { + VLOG(2) << "Failed to offload blob of size " << sz << " to backing with error: " << ec; + return ec; + } + + total_backing_bytes_ += blob.size(); + ++total_backing_items_; + + if (next_item_total_bytes_ == 0) { + next_item_total_bytes_ = blob.size(); + } + + VLOG(2) << "Offload connection " << this << " backpressure of " << blob.size(); + VLOG(3) << "Command offloaded: " << blob; + return {}; +} + +std::error_code DiskBackedQueue::PopN(std::function f) { + std::string buffer; + size_t up_to = max_queue_load_size_; + + while (total_backing_items_ > 0 && up_to--) { + // We read the next item and (if there are more) we also prefetch the next item's size. + uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? sizeof(uint32_t) : 0); + buffer.resize(read_sz); + + io::MutableBytes bytes{reinterpret_cast(buffer.data()), read_sz}; + auto result = reader_->Read(next_read_offset_, bytes); + if (!result) { + LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << read_sz + << " from disk with error: " << result.error().value() << " " + << result.error().message(); + return result.error(); + } + + VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << read_sz + << " for connection " << this; + + next_read_offset_ += bytes.size(); + + if (total_backing_items_ > 1) { + auto buf = bytes.subspan(bytes.size() - sizeof(uint32_t)); + uint32_t val = ((uint32_t)buf[0]) | ((uint32_t)buf[1] << 8) | ((uint32_t)buf[2] << 16) | + ((uint32_t)buf[3] << 24); + bytes = bytes.subspan(0, next_item_total_bytes_); + next_item_total_bytes_ = val; + } else { + next_item_total_bytes_ = 0; + } + + f(bytes); + + total_backing_bytes_ -= next_item_total_bytes_; + --total_backing_items_; + } + + return {}; +} + +} // namespace facade diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h new file mode 100644 index 000000000000..d257b03d7834 --- /dev/null +++ b/src/facade/disk_backed_queue.h @@ -0,0 +1,57 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include +#include +#include +#include +#include + +#include "io/io.h" + +namespace facade { + +class DiskBackedQueue { + public: + explicit DiskBackedQueue(uint32_t conn_id); + ~DiskBackedQueue(); + + std::error_code Init(); + + // Check if we can offload bytes to backing file. + bool HasEnoughBackingSpaceFor(size_t bytes) const; + + std::error_code Push(std::string_view blob); + + // For each item loaded from disk it calls f(item) to consume it. + // Reads up to max_queue_load_size_ items on each call + std::error_code PopN(std::function f); + + // Check if backing file is empty, i.e. backing file has 0 bytes. + bool Empty() const; + + private: + // File Reader/Writer + std::unique_ptr writer_; + std::unique_ptr reader_; + + size_t total_backing_bytes_ = 0; + size_t total_backing_items_ = 0; + + size_t next_read_offset_ = 4; + size_t next_item_total_bytes_ = 0; + + // Read only constants + const size_t max_backing_size_ = 0; + const size_t max_queue_load_size_ = 0; + + // same as connection id. 
Used to uniquely identify the backed file + const size_t id_ = 0; +}; + +} // namespace facade From 2f179c80844f2f230339d7be1eb9e813bb37cc08 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 7 Nov 2025 17:42:18 +0200 Subject: [PATCH 4/6] interface Signed-off-by: Kostas Kyrimis --- src/facade/disk_backed_queue.cc | 62 +++++++++++++--------------- src/facade/disk_backed_queue.h | 4 +- src/facade/disk_backed_queue_test.cc | 9 ++-- 3 files changed, 36 insertions(+), 39 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index fcea164f04c4..d4aa045f6944 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -99,44 +99,40 @@ std::error_code DiskBackedQueue::Push(std::string_view blob) { return {}; } -std::error_code DiskBackedQueue::PopN(std::function f) { - std::string buffer; - size_t up_to = max_queue_load_size_; - - while (total_backing_items_ > 0 && up_to--) { - // We read the next item and (if there are more) we also prefetch the next item's size. - uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? sizeof(uint32_t) : 0); - buffer.resize(read_sz); - - io::MutableBytes bytes{reinterpret_cast(buffer.data()), read_sz}; - auto result = reader_->Read(next_read_offset_, bytes); - if (!result) { - LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << read_sz - << " from disk with error: " << result.error().value() << " " - << result.error().message(); - return result.error(); - } +std::error_code DiskBackedQueue::Pop(std::string* out) { + // We read the next item and (if there are more) we also prefetch the next item's size. + uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? sizeof(uint32_t) : 0); + buffer.resize(read_sz); + + io::MutableBytes bytes{reinterpret_cast(buffer.data()), read_sz}; + auto result = reader_->Read(next_read_offset_, bytes); + if (!result) { + LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << read_sz + << " from disk with error: " << result.error().value() << " " + << result.error().message(); + return result.error(); + } - VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << read_sz - << " for connection " << this; + VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << read_sz + << " for connection " << this; - next_read_offset_ += bytes.size(); + next_read_offset_ += bytes.size(); - if (total_backing_items_ > 1) { - auto buf = bytes.subspan(bytes.size() - sizeof(uint32_t)); - uint32_t val = ((uint32_t)buf[0]) | ((uint32_t)buf[1] << 8) | ((uint32_t)buf[2] << 16) | - ((uint32_t)buf[3] << 24); - bytes = bytes.subspan(0, next_item_total_bytes_); - next_item_total_bytes_ = val; - } else { - next_item_total_bytes_ = 0; - } + if (total_backing_items_ > 1) { + auto buf = bytes.subspan(bytes.size() - sizeof(uint32_t)); + uint32_t val = ((uint32_t)buf[0]) | ((uint32_t)buf[1] << 8) | ((uint32_t)buf[2] << 16) | + ((uint32_t)buf[3] << 24); + bytes = bytes.subspan(0, next_item_total_bytes_); + next_item_total_bytes_ = val; + } else { + next_item_total_bytes_ = 0; + } - f(bytes); + std::string read(reinterpret_cast(bytes.data()), bytes.size()); + *out = std::move(read); - total_backing_bytes_ -= next_item_total_bytes_; - --total_backing_items_; - } + total_backing_bytes_ -= next_item_total_bytes_; + --total_backing_items_; return {}; } diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index d257b03d7834..656977634313 100644 
--- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -30,7 +30,7 @@ class DiskBackedQueue { // For each item loaded from disk it calls f(item) to consume it. // Reads up to max_queue_load_size_ items on each call - std::error_code PopN(std::function f); + std::error_code Pop(std::string* out); // Check if backing file is empty, i.e. backing file has 0 bytes. bool Empty() const; @@ -52,6 +52,8 @@ class DiskBackedQueue { // same as connection id. Used to uniquely identify the backed file const size_t id_ = 0; + + std::string buffer; }; } // namespace facade diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index 3eb8f7dd4ada..b36459377eab 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -42,12 +42,11 @@ TEST(DiskBackedQueueTest, ReadWrite) { } std::vector results; - for (size_t i = 0; i < 4; ++i) { - auto ec = backing.PopN([&](io::MutableBytes read) { - std::string str(reinterpret_cast(read.data()), read.size()); - results.push_back(std::move(str)); - }); + for (size_t i = 0; i < 100; ++i) { + std::string res; + auto ec = backing.Pop(&res); EXPECT_FALSE(ec); + results.push_back(std::move(res)); } EXPECT_EQ(results.size(), commands.size()); From bb24e539082620deb2cc92a49e8c1cd536ad3687 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 7 Nov 2025 17:49:27 +0200 Subject: [PATCH 5/6] do not close on destructor --- src/facade/disk_backed_queue.cc | 22 ++++++++++++++++++---- src/facade/disk_backed_queue.h | 5 +++-- src/facade/disk_backed_queue_test.cc | 2 ++ 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index d4aa045f6944..210b358a56e6 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -58,10 +58,24 @@ std::error_code DiskBackedQueue::Init() { } DiskBackedQueue::~DiskBackedQueue() { - auto ec = writer_->Close(); - LOG_IF(WARNING, ec) << ec.message(); - ec = reader_->Close(); - LOG_IF(WARNING, ec) << ec.message(); +} + +std::error_code DiskBackedQueue::CloseWriter() { + if (writer_) { + auto ec = writer_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + return ec; + } + return {}; +} + +std::error_code DiskBackedQueue::CloseReader() { + if (reader_) { + auto ec = reader_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + return ec; + } + return {}; } // Check if backing file is empty, i.e. backing file has 0 bytes. diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index 656977634313..753e13808b61 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -28,13 +28,14 @@ class DiskBackedQueue { std::error_code Push(std::string_view blob); - // For each item loaded from disk it calls f(item) to consume it. - // Reads up to max_queue_load_size_ items on each call std::error_code Pop(std::string* out); // Check if backing file is empty, i.e. backing file has 0 bytes. 
bool Empty() const; + std::error_code CloseReader(); + std::error_code CloseWriter(); + private: // File Reader/Writer std::unique_ptr writer_; diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index b36459377eab..494872540a4b 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -54,6 +54,8 @@ TEST(DiskBackedQueueTest, ReadWrite) { for (size_t i = 0; i < results.size(); ++i) { EXPECT_EQ(results[i], commands[i]); } + EXPECT_FALSE(backing.CloseReader()); + EXPECT_FALSE(backing.CloseWriter()); }); proactor->Stop(); From c7617ec797442aca070d4d704d14084127074abc Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 13 Nov 2025 12:22:05 +0200 Subject: [PATCH 6/6] push/pop only raw bytes --- src/facade/disk_backed_queue.cc | 46 ++++++---------------------- src/facade/disk_backed_queue.h | 8 ++--- src/facade/disk_backed_queue_test.cc | 18 +++++------ 3 files changed, 19 insertions(+), 53 deletions(-) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc index 210b358a56e6..bce606a8e2c9 100644 --- a/src/facade/disk_backed_queue.cc +++ b/src/facade/disk_backed_queue.cc @@ -12,7 +12,6 @@ #include "base/flags.h" #include "base/logging.h" #include "facade/facade_types.h" -#include "io/io.h" #include "util/fibers/uring_file.h" using facade::operator""_MB; @@ -88,25 +87,13 @@ bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { } std::error_code DiskBackedQueue::Push(std::string_view blob) { - // TODO we should truncate as the file grows. That way we never end up with large files - // on disk. - uint32_t sz = blob.size(); - // We serialize the string as is and we prefix with 4 bytes denoting its size. The layout is: - // 4bytes(str size) + followed by blob.size() bytes - iovec offset_data[2]{{&sz, sizeof(uint32_t)}, {const_cast(blob.data()), blob.size()}}; - - auto ec = writer_->Write(offset_data, 2); + auto ec = writer_->Write(blob); if (ec) { - VLOG(2) << "Failed to offload blob of size " << sz << " to backing with error: " << ec; + VLOG(2) << "Failed to offload blob of size " << blob.size() << " to backing with error: " << ec; return ec; } total_backing_bytes_ += blob.size(); - ++total_backing_items_; - - if (next_item_total_bytes_ == 0) { - next_item_total_bytes_ = blob.size(); - } VLOG(2) << "Offload connection " << this << " backpressure of " << blob.size(); VLOG(3) << "Command offloaded: " << blob; @@ -114,39 +101,24 @@ std::error_code DiskBackedQueue::Push(std::string_view blob) { } std::error_code DiskBackedQueue::Pop(std::string* out) { - // We read the next item and (if there are more) we also prefetch the next item's size. - uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? 
sizeof(uint32_t) : 0); - buffer.resize(read_sz); + const size_t k_read_size = 4096; + const size_t to_read = std::min(k_read_size, total_backing_bytes_); + out->resize(to_read); - io::MutableBytes bytes{reinterpret_cast(buffer.data()), read_sz}; + io::MutableBytes bytes{reinterpret_cast(out->data()), to_read}; auto result = reader_->Read(next_read_offset_, bytes); if (!result) { - LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << read_sz + LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << to_read << " from disk with error: " << result.error().value() << " " << result.error().message(); return result.error(); } - VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << read_sz + VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << to_read << " for connection " << this; next_read_offset_ += bytes.size(); - - if (total_backing_items_ > 1) { - auto buf = bytes.subspan(bytes.size() - sizeof(uint32_t)); - uint32_t val = ((uint32_t)buf[0]) | ((uint32_t)buf[1] << 8) | ((uint32_t)buf[2] << 16) | - ((uint32_t)buf[3] << 24); - bytes = bytes.subspan(0, next_item_total_bytes_); - next_item_total_bytes_ = val; - } else { - next_item_total_bytes_ = 0; - } - - std::string read(reinterpret_cast(bytes.data()), bytes.size()); - *out = std::move(read); - - total_backing_bytes_ -= next_item_total_bytes_; - --total_backing_items_; + total_backing_bytes_ -= bytes.size(); return {}; } diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h index 753e13808b61..6fb71b5334c1 100644 --- a/src/facade/disk_backed_queue.h +++ b/src/facade/disk_backed_queue.h @@ -4,16 +4,13 @@ #pragma once -#include +#include -#include #include #include #include #include -#include "io/io.h" - namespace facade { class DiskBackedQueue { @@ -44,8 +41,7 @@ class DiskBackedQueue { size_t total_backing_bytes_ = 0; size_t total_backing_items_ = 0; - size_t next_read_offset_ = 4; - size_t next_item_total_bytes_ = 0; + size_t next_read_offset_ = 0; // Read only constants const size_t max_backing_size_ = 0; diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc index 494872540a4b..e976364cada5 100644 --- a/src/facade/disk_backed_queue_test.cc +++ b/src/facade/disk_backed_queue_test.cc @@ -34,26 +34,24 @@ TEST(DiskBackedQueueTest, ReadWrite) { DiskBackedQueue backing(1 /* id */); EXPECT_FALSE(backing.Init()); - std::vector commands; + std::string commands; for (size_t i = 0; i < 100; ++i) { - commands.push_back(absl::StrCat("SET FOO", i, " BAR")); - auto ec = backing.Push(commands.back()); - EXPECT_FALSE(ec); + auto cmd = absl::StrCat("SET FOO", i, " BAR"); + EXPECT_FALSE(backing.Push(cmd)); + absl::StrAppend(&commands, cmd); } - std::vector results; - for (size_t i = 0; i < 100; ++i) { + std::string results; + while (!backing.Empty()) { std::string res; auto ec = backing.Pop(&res); EXPECT_FALSE(ec); - results.push_back(std::move(res)); + absl::StrAppend(&results, res); } EXPECT_EQ(results.size(), commands.size()); + EXPECT_EQ(results, commands); - for (size_t i = 0; i < results.size(); ++i) { - EXPECT_EQ(results[i], commands[i]); - } EXPECT_FALSE(backing.CloseReader()); EXPECT_FALSE(backing.CloseWriter()); });
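
Usage sketch: the series above adds DiskBackedQueue and constructs it in Connection when --disk_queue_offload_watermark is non-zero, but the dispatch-path wiring is not shown in these patches. The snippet below is only a minimal illustration of how a caller could spill blobs to disk once an in-memory queue crosses the watermark and later drain them back. SpillOrQueue, DrainFromDisk, the mem_q deque and the watermark parameter are hypothetical; DiskBackedQueue, Push, Pop, Empty and HasEnoughBackingSpaceFor are the names introduced by the patches.

#include <deque>
#include <string>
#include <string_view>

#include "facade/disk_backed_queue.h"

namespace facade {

// Hypothetical helper: keep at most `watermark` blobs in memory, spill the rest to disk.
void SpillOrQueue(DiskBackedQueue* disk, std::deque<std::string>* mem_q, size_t watermark,
                  std::string_view blob) {
  if (disk && mem_q->size() >= watermark && disk->HasEnoughBackingSpaceFor(blob.size())) {
    if (!disk->Push(blob)) {  // Push returns std::error_code; falsy means success.
      return;                 // Offloaded to disk, nothing kept in memory.
    }
  }
  // Fallback path: no disk queue, backing file is full, or the write failed.
  mem_q->emplace_back(blob);
}

// Hypothetical helper: refill the in-memory queue from disk while there is room.
void DrainFromDisk(DiskBackedQueue* disk, std::deque<std::string>* mem_q, size_t watermark) {
  while (disk && !disk->Empty() && mem_q->size() < watermark) {
    std::string chunk;
    if (disk->Pop(&chunk)) {  // Non-empty error_code: the queue already logged the read error.
      break;
    }
    // After PATCH 6 the file stores raw bytes, so a popped chunk may span several
    // pipelined commands and would still need to go through the parser.
    mem_q->emplace_back(std::move(chunk));
  }
}

}  // namespace facade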