Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions cc/src/core/f2.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,15 +741,26 @@ inline bool F2Kv<K, V, D, HHI, CHI>::CompactLog(S& store, StoreType store_type,
bool shift_begin_address, int n_threads, bool checkpoint) {
const bool is_hot_store = (store_type == StoreType::HOT);

uint64_t tail_address = store.hlog.GetTailAddress().control();
uint64_t begin_address{ store.hlog.begin_address.control() };
uint64_t tail_address{ store.hlog.GetTailAddress().control() };
uint64_t safe_read_only_address{ store.hlog.safe_read_only_address.control() };

log_debug("Compact %s: {%.2lf GB} {Goal %.2lf GB} [%lu %lu] -> [%lu %lu]",
is_hot_store ? "HOT" : "COLD",
static_cast<double>(store.Size()) / (1 << 30),
static_cast<double>(tail_address - until_address) / (1 << 30),
store.hlog.begin_address.control(), tail_address,
until_address, tail_address);
if (until_address > store.hlog.safe_read_only_address.control()) {
throw std::invalid_argument{ "Can only compact until safe read-only region" };

if (until_address <= begin_address) {
log_warn("Skipping log compaction due to: until_address <= begin_address. "
"until_address should be larger than hlog.begin_address");
return false;
}
if (until_address > safe_read_only_address) {
log_warn("Skipping log compaction due to: until_address > safe_read_only_address. "
"Can only compact until safe read-only region");
return false;
}

StoreCheckpointStatus status;
Expand Down
6 changes: 4 additions & 2 deletions cc/src/core/faster.h
Original file line number Diff line number Diff line change
Expand Up @@ -3591,7 +3591,9 @@ inline bool FasterKv<K, V, D, H, OH>::CompactWithLookup(uint64_t until_address,
template <class K, class V, class D, class H, class OH>
bool FasterKv<K, V, D, H, OH>::InternalCompactWithLookup(uint64_t until_address, bool shift_begin_address, int n_threads,
bool to_other_store, bool checkpoint, Guid& checkpoint_token) {
if (hlog.begin_address.load() > until_address) {
Address begin_address = hlog.begin_address.load();

if (begin_address >= until_address) {
throw std::invalid_argument {"Invalid until address; should be larger than hlog.begin_address"};
}
if (until_address > hlog.safe_read_only_address.control()) {
Expand All @@ -3610,7 +3612,7 @@ bool FasterKv<K, V, D, H, OH>::InternalCompactWithLookup(uint64_t until_address,

std::deque<std::thread> threads;

ConcurrentLogPageIterator<faster_t> iter(&hlog, &disk, &epoch_, hlog.begin_address.load(), Address(until_address));
ConcurrentLogPageIterator<faster_t> iter(&hlog, &disk, &epoch_, begin_address, Address(until_address));
compaction_context_.Initialize(&iter, n_threads-1, to_other_store);

// Spawn the threads first
Expand Down
45 changes: 27 additions & 18 deletions cc/test/f2_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ TEST_P(HotColdParameterizedTestParam, UpsertRead) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read.
Expand Down Expand Up @@ -332,8 +333,9 @@ TEST_P(HotColdParameterizedTestParam, UpsertRead) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 4);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 4)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read existing again (in random order), plus non-existing ones
Expand Down Expand Up @@ -433,8 +435,9 @@ TEST_P(HotColdParameterizedTestParam, HotColdCompaction) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read existing again (in random order), plus non-existing ones
Expand Down Expand Up @@ -530,8 +533,9 @@ TEST_P(HotColdParameterizedTestParam, UpsertDelete) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 1);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 1)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}
// Read both existent and non-existent keys
for(size_t idx = 1; idx <= num_records; idx++) {
Expand Down Expand Up @@ -574,8 +578,9 @@ TEST_P(HotColdParameterizedTestParam, UpsertDelete) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 1);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true, 1)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read all keys -- all should return NOT_FOUND
Expand Down Expand Up @@ -709,8 +714,9 @@ TEST_P(HotColdParameterizedTestParam, Rmw) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Rmw, decrement by 1, 8 times -- random order
Expand Down Expand Up @@ -895,8 +901,9 @@ TEST_P(HotColdParameterizedTestParam, ConcurrentOps) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Perform reads for all keys (and more non-existent ones) in random order
Expand Down Expand Up @@ -1281,8 +1288,9 @@ TEST_P(HotColdParameterizedTestParam, VariableLengthKey) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read again.
Expand Down Expand Up @@ -1609,8 +1617,9 @@ TEST_P(HotColdParameterizedTestParam, VariableLengthValue) {
if (!auto_compaction) {
// perform hot-cold compaction
uint64_t hot_size = store.hot_store.Size(), cold_size = store.cold_store.Size();
store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true);
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
if (store.CompactHotLog(store.hot_store.hlog.safe_read_only_address.control(), true)) {
ASSERT_TRUE(store.hot_store.Size() < hot_size && store.cold_store.Size() > cold_size);
}
}

// Read again.
Expand Down