Skip to content

Commit 4205592

Browse files
committed
fixes
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
1 parent 2f77302 commit 4205592

File tree

5 files changed

+63
-31
lines changed

5 files changed

+63
-31
lines changed

src/core/compact_object.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,10 @@ auto CompactObj::GetCool() const -> CoolItem {
12441244
return res;
12451245
}
12461246

1247+
void CompactObj::KeepExternal(size_t offset, size_t sz) {
1248+
SetExternal(offset, sz, GetExternalRep());
1249+
}
1250+
12471251
std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
12481252
DCHECK_EQ(EXTERNAL_TAG, taglen_);
12491253
auto& ext = u_.ext_ptr;

src/core/compact_object.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,10 @@ class CompactObj {
376376
// Returns the external data of the object including its ColdRecord.
377377
CoolItem GetCool() const;
378378

379+
// Prerequisite: IsCool() is true.
380+
// Keeps cool record only as external value.
381+
void KeepExternal(size_t offset, size_t sz);
382+
379383
std::pair<size_t, size_t> GetExternalSlice() const;
380384

381385
// Injects either the raw string (extracted with GetRawString()) or the usual string

src/server/hset_family.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
492492

493493
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
494494

495+
// TODO: Enable in final PR under flag
496+
// if (auto* ts = op_args.shard->tiered_storage(); ts)
497+
// ts->TryStash(op_args.db_cntx.db_index, key, &pv);
498+
495499
return created;
496500
}
497501

src/server/tiered_storage.cc

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
5252
ABSL_FLAG(float, tiered_upload_threshold, 0.1,
5353
"Ratio of free memory (free/max memory) below which uploading stops");
5454

55+
ABSL_FLAG(bool, tiered_experimental_hash_offload, false, "Experimental hash datatype offloading");
56+
5557
namespace dfly {
5658

5759
using namespace std;
@@ -79,32 +81,34 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
7981
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
8082
}
8183

82-
optional<size_t> EstimateSerializedSize(const PrimeValue& pv) {
84+
// Do NOT enforce rules depending on dynamic runtime values as this is called
85+
// when scheduling stash and just before succeeding, and is expected to return the same results
86+
optional<std::pair<size_t, CompactObj::ExternalRep>> EstimateSerializedSize(const PrimeValue& pv) {
8387
switch (pv.ObjType()) {
8488
case OBJ_STRING:
85-
return pv.GetRawString().view().size();
89+
return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
8690
case OBJ_HASH:
8791
if (pv.Encoding() == kEncodingListPack) {
8892
auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
8993
size_t bytes = lpBytes(lp);
9094
bytes += lpLength(lp) * 2 * 4;
91-
return bytes;
95+
return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
9296
}
9397
return {};
9498
default:
9599
return {};
96100
};
97101
}
98102

99-
size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
100-
DCHECK_LE(EstimateSerializedSize(pv), buffer.size());
101-
switch (pv.ObjType()) {
102-
case OBJ_STRING: {
103+
size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableBytes buffer) {
104+
DCHECK_LE(EstimateSerializedSize(pv)->first, buffer.size());
105+
switch (rep) {
106+
case CompactObj::ExternalRep::STRING: {
103107
auto sv = pv.GetRawString();
104108
memcpy(buffer.data(), sv.view().data(), sv.view().size());
105109
return sv.view().size();
106110
}
107-
case OBJ_HASH: {
111+
case CompactObj::ExternalRep::SERIALIZED_MAP: {
108112
DCHECK_EQ(pv.Encoding(), kEncodingListPack);
109113

110114
detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
@@ -113,15 +117,15 @@ size_t Serialize(const PrimeValue& pv, io::MutableBytes buffer) {
113117
return tiering::SerializedMap::Serialize(
114118
entries_sv, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
115119
}
116-
default:
117-
DCHECK(false);
118-
return 0;
119-
}
120+
};
121+
return 0;
120122
}
121123

122124
string SerializeString(const PrimeValue& pv) {
123-
string s(*EstimateSerializedSize(pv), 0);
124-
size_t written = Serialize(pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
125+
auto estimate = EstimateSerializedSize(pv);
126+
string s(estimate->first, 0);
127+
size_t written =
128+
Serialize(estimate->second, pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
125129
s.resize(written);
126130
return s;
127131
}
@@ -186,12 +190,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
186190
void RetireColdEntries(size_t additional_memory);
187191

188192
// Set value to be an in-memory type again. Update memory stats.
189-
void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
193+
void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
190194
DCHECK(!value.empty());
191195
DCHECK_EQ(uint8_t(pv->GetExternalRep()), uint8_t(CompactObj::ExternalRep::STRING));
192196

193-
pv->Materialize(value, is_raw);
194-
RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
197+
switch (pv->GetExternalRep()) {
198+
case CompactObj::ExternalRep::STRING:
199+
pv->Materialize(value, true);
200+
break;
201+
case CompactObj::ExternalRep::SERIALIZED_MAP:
202+
pv->InitRobj(OBJ_HASH, kEncodingListPack, nullptr);
203+
break;
204+
};
205+
206+
RecordDeleted(*pv, value.size(), GetDbTableStats(dbid));
195207
}
196208

197209
// Find entry by key in db_slice and store external segment in place of original value.
@@ -211,7 +223,8 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
211223
ts_->CoolDown(key.first, key.second, segment, pv);
212224
} else {
213225
stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
214-
pv->SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
226+
auto estimation = EstimateSerializedSize(*pv);
227+
pv->SetExternal(segment.offset, segment.length, estimation->second);
215228
}
216229
} else {
217230
LOG(DFATAL) << "Should not reach here";
@@ -268,7 +281,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
268281
} else {
269282
// Cut out relevant part of value and restore it to memory
270283
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
271-
Upload(dbid, value, true, item_segment.length, &pv);
284+
Upload(dbid, value, &pv);
272285
}
273286
}
274287
}
@@ -439,18 +452,18 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
439452
return {};
440453
}
441454

442-
optional<size_t> estimated = EstimateSerializedSize(*value);
455+
auto estimated = EstimateSerializedSize(*value);
443456
DCHECK(estimated);
444457

445458
tiering::OpManager::EntryId id;
446459
error_code ec;
447460

448461
value->SetStashPending(true);
449-
if (OccupiesWholePages(*estimated)) { // large enough for own page
462+
if (true /*OccupiesWholePages(*estimated)*/) { // large enough for own page
450463
id = KeyRef(dbid, key);
451-
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
464+
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
452465
auto [offset, buf] = *prepared;
453-
size_t written = Serialize(*value, buf.bytes);
466+
size_t written = Serialize(estimated->second, *value, buf.bytes);
454467
tiering::DiskSegment segment{offset, written};
455468
op_manager_->Stash(id, segment, buf);
456469
} else {
@@ -459,7 +472,7 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
459472
} else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
460473
id = bin->first;
461474
// TODO(vlad): Write bin to prepared buffer instead of allocating one
462-
if (auto prepared = op_manager_->PrepareStash(*estimated); prepared) {
475+
if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
463476
auto [offset, buf] = *prepared;
464477
memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
465478
tiering::DiskSegment segment{offset, bin->second.size()};
@@ -571,13 +584,14 @@ void TieredStorage::UpdateFromFlags() {
571584
.write_depth_limit = absl::GetFlag(FLAGS_tiered_storage_write_depth),
572585
.offload_threshold = absl::GetFlag(FLAGS_tiered_offload_threshold),
573586
.upload_threshold = absl::GetFlag(FLAGS_tiered_upload_threshold),
587+
.experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_offload),
574588
};
575589
}
576590

577591
std::vector<std::string> TieredStorage::GetMutableFlagNames() {
578592
return base::GetFlagNames(FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
579593
FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
580-
FLAGS_tiered_upload_threshold);
594+
FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_offload);
581595
}
582596

583597
bool TieredStorage::ShouldOffload() const {
@@ -654,10 +668,10 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
654668
->prime.FindFirst(record->key_hash, predicate);
655669
CHECK(IsValid(it));
656670
PrimeValue& pv = it->second;
657-
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
658671

659672
// Now the item is only in storage.
660-
pv.SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
673+
tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
674+
pv.KeepExternal(segment.offset, segment.length);
661675

662676
auto* stats = op_manager_->GetDbTableStats(record->db_index);
663677
stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
@@ -673,13 +687,18 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
673687
return false;
674688

675689
// Estimate value size
676-
optional<size_t> size = EstimateSerializedSize(pv);
677-
if (!size)
690+
auto estimation = EstimateSerializedSize(pv);
691+
if (!estimation)
692+
return false;
693+
694+
// For now, hash offloading is conditional
695+
if (pv.ObjType() == OBJ_HASH && !config_.experimental_hash_offload)
678696
return false;
679697

680698
const auto& disk_stats = op_manager_->GetStats().disk_stats;
681-
return *size >= config_.min_value_size &&
682-
disk_stats.allocated_bytes + tiering::kPageSize + *size < disk_stats.max_file_size;
699+
return estimation->first >= config_.min_value_size &&
700+
disk_stats.allocated_bytes + tiering::kPageSize + estimation->first <
701+
disk_stats.max_file_size;
683702
}
684703

685704
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,

src/server/tiered_storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ class TieredStorage {
138138
unsigned write_depth_limit;
139139
float offload_threshold;
140140
float upload_threshold;
141+
bool experimental_hash_offload;
141142
} config_;
142143
struct {
143144
uint64_t stash_overflow_cnt = 0;

0 commit comments

Comments
 (0)