Skip to content

Commit 2f77302

Browse files
committed
feat(tiering): Serialize hashes
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
1 parent 3a8ff25 commit 2f77302

File tree

1 file changed

+83
-17
lines changed

1 file changed

+83
-17
lines changed

src/server/tiered_storage.cc

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,22 @@
1717
#include "base/flag_utils.h"
1818
#include "base/flags.h"
1919
#include "base/logging.h"
20+
#include "core/detail/listpack_wrap.h"
2021
#include "server/common.h"
2122
#include "server/db_slice.h"
2223
#include "server/engine_shard_set.h"
2324
#include "server/snapshot.h"
2425
#include "server/table.h"
2526
#include "server/tiering/common.h"
2627
#include "server/tiering/op_manager.h"
28+
#include "server/tiering/serialized_map.h"
2729
#include "server/tiering/small_bins.h"
2830
#include "server/tx_base.h"
2931

32+
extern "C" {
33+
#include "redis/listpack.h"
34+
}
35+
3036
using namespace facade;
3137

3238
using AtLeast64 = base::ConstrainedNumericFlagValue<size_t, 64>; // ABSL_FLAG breaks with commas
@@ -73,6 +79,53 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
7379
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
7480
}
7581

82+
optional<size_t> EstimateSerializedSize(const PrimeValue& pv) {
83+
switch (pv.ObjType()) {
84+
case OBJ_STRING:
85+
return pv.GetRawString().view().size();
86+
case OBJ_HASH:
87+
if (pv.Encoding() == kEncodingListPack) {
88+
auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
89+
size_t bytes = lpBytes(lp);
90+
bytes += lpLength(lp) * 2 * 4;
91+
return bytes;
92+
}
93+
return {};
94+
default:
95+
return {};
96+
};
97+
}
98+
99+
size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
100+
DCHECK_LE(EstimateSerializedSize(pv), buffer.size());
101+
switch (pv.ObjType()) {
102+
case OBJ_STRING: {
103+
auto sv = pv.GetRawString();
104+
memcpy(buffer.data(), sv.view().data(), sv.view().size());
105+
return sv.view().size();
106+
}
107+
case OBJ_HASH: {
108+
DCHECK_EQ(pv.Encoding(), kEncodingListPack);
109+
110+
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
111+
vector<pair<string, string>> entries(lw.begin(), lw.end());
112+
vector<pair<string_view, string_view>> entries_sv(entries.begin(), entries.end());
113+
return tiering::SerializedMap::Serialize(
114+
entries_sv, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
115+
}
116+
default:
117+
DCHECK(false);
118+
return 0;
119+
}
120+
}
121+
122+
string SerializeString(const PrimeValue& pv) {
123+
string s(*EstimateSerializedSize(pv), 0);
124+
size_t written = Serialize(pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
125+
s.resize(written);
126+
return s;
127+
}
128+
76129
} // anonymous namespace
77130

78131
class TieredStorage::ShardOpManager : public tiering::OpManager {
@@ -386,36 +439,41 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
386439
return {};
387440
}
388441

389-
StringOrView raw_string = value->GetRawString();
390-
value->SetStashPending(true);
442+
optional<size_t> estimated = EstimateSerializedSize(*value);
443+
DCHECK(estimated);
391444

392445
tiering::OpManager::EntryId id;
393446
error_code ec;
394447

395-
// TODO(vlad): Replace with encoders for different types
396-
auto stash_string = [&](std::string_view str) {
397-
if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
448+
value->SetStashPending(true);
449+
if (OccupiesWholePages(*estimated)) { // large enough for own page
450+
id = KeyRef(dbid, key);
451+
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
398452
auto [offset, buf] = *prepared;
399-
memcpy(buf.bytes.data(), str.data(), str.size());
400-
tiering::DiskSegment segment{offset, str.size()};
453+
size_t written = Serialize(*value, buf.bytes);
454+
tiering::DiskSegment segment{offset, written};
401455
op_manager_->Stash(id, segment, buf);
402456
} else {
403457
ec = prepared.error();
404458
}
405-
};
406-
407-
if (OccupiesWholePages(value->Size())) { // large enough for own page
408-
id = KeyRef(dbid, key);
409-
stash_string(raw_string.view());
410-
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
459+
} else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
411460
id = bin->first;
412461
// TODO(vlad): Write bin to prepared buffer instead of allocating one
413-
stash_string(bin->second);
462+
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
463+
auto [offset, buf] = *prepared;
464+
memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
465+
tiering::DiskSegment segment{offset, bin->second.size()};
466+
op_manager_->Stash(id, segment, buf);
467+
} else {
468+
CHECK(false);
469+
ec = prepared.error();
470+
}
414471
} else {
415472
return {}; // silently added to bin
416473
}
417474

418475
if (ec) {
476+
value->SetStashPending(false);
419477
LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
420478
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
421479
return {};
@@ -610,10 +668,18 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
610668
}
611669

612670
bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
671+
// Check value state
672+
if (pv.IsExternal() || pv.HasStashPending())
673+
return false;
674+
675+
// Estimate value size
676+
optional<size_t> size = EstimateSerializedSize(pv);
677+
if (!size)
678+
return false;
679+
613680
const auto& disk_stats = op_manager_->GetStats().disk_stats;
614-
return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
615-
pv.Size() >= config_.min_value_size &&
616-
disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
681+
return *size >= config_.min_value_size &&
682+
disk_stats.allocated_bytes + tiering::kPageSize + *size < disk_stats.max_file_size;
617683
}
618684

619685
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,

0 commit comments

Comments
 (0)