Add baseline resync read part in homeobject. #204

Merged: 1 commit, Sep 9, 2024
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.1"
version = "2.1.2"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
1 change: 1 addition & 0 deletions src/include/homeobject/shard_manager.hpp
@@ -22,6 +22,7 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn;
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
2 changes: 2 additions & 0 deletions src/lib/blob_route.hpp
@@ -1,3 +1,5 @@
#pragma once

#include <compare>
#include <functional>

1 change: 1 addition & 0 deletions src/lib/homestore_backend/CMakeLists.txt
@@ -22,6 +22,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_blob_manager.cpp
hs_shard_manager.cpp
hs_pg_manager.cpp
pg_blob_iterator.cpp
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/hs_backend_config.fbs
@@ -9,6 +9,12 @@ attribute "deprecated";
table HSBackendSettings {
// timer thread freq in us
backend_timer_us: uint64 = 60000000 (hotswap);

// Maximum number of blobs in a snapshot batch
max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap);

// Maximum size of a snapshot batch
max_snapshot_batch_size_mb: uint64 = 128 (hotswap);
}

root_type HSBackendSettings;
23 changes: 13 additions & 10 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -4,6 +4,7 @@
#include "lib/homeobject_impl.hpp"
#include "lib/blob_route.hpp"
#include <homestore/homestore.hpp>
#include <homestore/blkdata_service.hpp>

SISL_LOGGING_DECL(blobmgr)

@@ -162,9 +163,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
return req->result().deferValue([this, req, repl_dev](const auto& result) -> BlobManager::AsyncResult< blob_id_t > {
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) {
err.current_leader = repl_dev->get_leader_id();
}
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
@@ -259,18 +258,24 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
return folly::makeUnexpected(r.error());
}

auto const blkid = r.value();
return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/);
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< homestore::ReplDev >& repl_dev,
shard_id_t shard_id, blob_id_t blob_id,
uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const {
auto const total_size = blkid.blk_count() * repl_dev->get_blk_size();
sisl::io_blob_safe read_buf{total_size, io_align};

sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()});

BLOGT(shard.id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
BLOGT(shard_id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
return repl_dev->async_read(blkid, sgs, total_size)
.thenValue([this, blob_id, shard_id = shard.id, req_len, req_offset, blkid,
read_buf = std::move(read_buf), repl_dev](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
.thenValue([this, blob_id, shard_id, req_len, req_offset, blkid, repl_dev,
read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
if (result) {
BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value());
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
@@ -379,9 +384,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
return req->result().deferValue([repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) {
err.current_leader = repl_dev->get_leader_id();
}
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
36 changes: 36 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -11,6 +11,8 @@
#include "heap_chunk_selector.h"
#include "lib/homeobject_impl.hpp"
#include "replication_message.hpp"
#include "homeobject/common.hpp"
#include "index_kv.hpp"

namespace homestore {
struct meta_blk;
@@ -267,6 +269,10 @@ class HSHomeObject : public HomeObjectImpl {
homestore::MultiBlkId pbas;
};

struct BlobInfoData : public BlobInfo {
Blob blob;
};

enum class BlobState : uint8_t {
ALIVE = 0,
TOMBSTONE = 1,
@@ -275,6 +281,26 @@

inline const static homestore::MultiBlkId tombstone_pbas{0, 0, 0};

struct PGBlobIterator {
PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
PG* get_pg_metadata();
int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard);
bool end_of_scan() const;

uint64_t cur_shard_seq_num_{1};
int64_t cur_blob_id_{-1};
uint64_t max_shard_seq_num_{0};
uint64_t cur_snapshot_batch_num{0};
HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
};

private:
shared< HeapChunkSelector > chunk_selector_;
unique< HttpManager > http_mgr_;
@@ -286,6 +312,11 @@
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// blob related
BlobManager::AsyncResult< Blob > _get_blob_data(const shared< homestore::ReplDev >& repl_dev, shard_id_t shard_id,
blob_id_t blob_id, uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const;

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static std::string serialize_pg_info(const PGInfo& info);
@@ -414,6 +445,11 @@ class HSHomeObject : public HomeObjectImpl {
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

BlobManager::Result< std::vector< BlobInfo > >
query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, uint64_t max_num_in_batch);

// Zero padding buffer related.
size_t max_pad_size() const;
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);
6 changes: 5 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
@@ -67,6 +67,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) {
j["shard_info"]["shard_id_t"] = info.id;
j["shard_info"]["pg_id_t"] = info.placement_group;
j["shard_info"]["state"] = info.state;
j["shard_info"]["lsn"] = info.lsn;
j["shard_info"]["created_time"] = info.created_time;
j["shard_info"]["modified_time"] = info.last_modified_time;
j["shard_info"]["total_capacity"] = info.total_capacity_bytes;
@@ -81,6 +82,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_
shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >();
shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >();
shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >());
shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >();
shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >();
shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >();
shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >();
@@ -116,6 +118,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
sb->info = ShardInfo{.id = new_shard_id,
.placement_group = pg_owner,
.state = ShardInfo::State::OPEN,
.lsn = 0,
.created_time = create_time,
.last_modified_time = create_time,
.available_capacity_bytes = size_bytes,
@@ -313,7 +316,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
switch (header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;
auto shard_info = sb->info;
shard_info.lsn = lsn;

bool shard_exist = false;
{
37 changes: 37 additions & 0 deletions src/lib/homestore_backend/index_kv.cpp
@@ -111,4 +111,41 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) {
index_table->dump_tree_to_file();
}

shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) {
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG");
return hs_pg->index_table_;
}

BlobManager::Result< std::vector< HSHomeObject::BlobInfo > >
HSHomeObject::query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id,
uint64_t max_num_in_batch) {
// Query all blobs from start_blob_id to the maximum blob_id value.
std::vector< std::pair< BlobRouteKey, BlobRouteValue > > out_vector;
auto shard_id = make_new_shard_id(pg_id, cur_shard_seq_num);
auto start_key = BlobRouteKey{BlobRoute{shard_id, start_blob_id}};
auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}};
homestore::BtreeQueryRequest< BlobRouteKey > query_req{
homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key),
true /* inclusive */},
homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, static_cast< uint32_t >(max_num_in_batch)};
auto index_table = get_index_table(pg_id);
auto const ret = index_table->query(query_req, out_vector);
if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::has_more) {
LOGE("Failed to query blobs in index table for ret={} shard={} start_blob_id={}", ret, shard_id, start_blob_id);
return folly::makeUnexpected(BlobErrorCode::INDEX_ERROR);
}

std::vector< BlobInfo > blob_info_vec;
blob_info_vec.reserve(out_vector.size());
for (auto& [r, v] : out_vector) {
blob_info_vec.push_back(BlobInfo{r.key().shard, r.key().blob, v.pbas()});
}

return blob_info_vec;
}

} // namespace homeobject
129 changes: 129 additions & 0 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -0,0 +1,129 @@
#include "hs_homeobject.hpp"
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/settings/settings.hpp>
#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"

namespace homeobject {

HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id,
uint64_t upto_lsn) :
home_obj_(home_obj), group_id_(group_id) {
auto pg = get_pg_metadata();
pg_id_ = pg->pg_info_.id;
repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_;
if (upto_lsn != 0) {
// Iterate all shards and its blob which have lsn <= upto_lsn
for (auto& shard : pg->shards_) {
auto sequence_num = home_obj_.get_sequence_num_from_shard_id(shard->info.id);
if (shard->info.lsn <= upto_lsn) { max_shard_seq_num_ = std::max(max_shard_seq_num_, sequence_num); }
}
} else {
max_shard_seq_num_ = pg->shard_sequence_num_;
}
}

PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
std::scoped_lock lock_guard(home_obj_._pg_lock);
auto iter = home_obj_._pg_map.begin();
for (; iter != home_obj_._pg_map.end(); iter++) {
if (iter->second->pg_info_.replica_set_uuid == group_id_) { break; }
}

RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found replica_set_uuid={}",
boost::uuids::to_string(group_id_));
return iter->second.get();
}

void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
auto pg = get_pg_metadata();
auto& pg_info = pg->pg_info_;
auto& pg_shards = pg->shards_;

flatbuffers::FlatBufferBuilder builder;
std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));

std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
for (auto& shard : pg_shards) {
auto& shard_info = shard->info;
// TODO add lsn.
shard_entries.push_back(CreateShardInfoEntry(
builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
}
builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
meta_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) {
end_of_shard = false;
uint64_t total_bytes = 0, num_blobs = 0;
while (true) {
auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch);
if (!r) { return -1; }
auto& index_results_vec = r.value();
for (auto& info : index_results_vec) {
if (info.pbas == HSHomeObject::tombstone_pbas) {
// Skip deleted blobs
Collaborator:

Not sure we can do it like this.

Suppose the leader has {1,1} to {5,5} and {5,7} to {10,10} ({shard_id, blob_id}), its last lsn is 120, {5,6} was deleted at lsn 100, and the leader's log has been compacted up to 110.
The follower has {1,1} to {6,0} and its last lsn is 80. If a baseline resync occurs, the follower will never know that {5,6} has been deleted, since it is not aware of lsn 100.

So I think we should put some special marker for the tombstone_pbas into the blob_info_vec that is sent to the follower, so that the follower can identify that this blob has been deleted.

Another question: if GC happens and the tombstone is removed as well, how can the leader let the follower know this when a baseline resync happens?

Please correct me if I misunderstand anything.

Contributor:

We haven't put an LSN into each blob index, so at the moment this is a full resync, i.e. all existing data can be discarded. So there is no issue with deleted blobs and no need to transfer tombstones.

Extending the discussion further: assuming we had an LSN in the blob index, we could let the follower report its current LSN and the leader would only send the range [follower_lsn, snapshot_lsn] to the follower. In that case, as you said, we do care about blob deletion. The trivial approach is for the leader to send out the active blob list in <shard_id=S, batch=0>, and the follower marks every blob not on that list as deleted.

I don't think we have solid thinking yet regarding the "incremental snapshot", especially with a good amount of reserved log entries. Though personally I am loving it.
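
As a rough sketch, the leader side of that active-blob-list idea could be built on the query_blobs_in_shard() helper added in this PR; the paging loop, page size, and how the list is shipped are assumptions and not part of the change:

```cpp
// Sketch only: collect the ids of live (non-tombstoned) blobs in one shard by
// paging through the blob index with query_blobs_in_shard() from this PR.
std::vector< blob_id_t > collect_active_blob_ids(HSHomeObject& home_obj, pg_id_t pg_id,
                                                 uint64_t shard_seq_num) {
    std::vector< blob_id_t > active_blob_ids;
    blob_id_t start_blob_id = 0;
    while (true) {
        auto r = home_obj.query_blobs_in_shard(pg_id, shard_seq_num, start_blob_id, 1024 /* page size */);
        if (!r) { break; } // index error; a real implementation would surface this to the caller
        auto& infos = r.value();
        if (infos.empty()) { break; } // reached the end of this shard
        for (auto& info : infos) {
            start_blob_id = info.blob_id + 1;
            if (info.pbas == HSHomeObject::tombstone_pbas) { continue; } // deleted blob, not active
            active_blob_ids.push_back(info.blob_id);
        }
    }
    // The result would be packed into the <shard_id=S, batch=0> snapshot object.
    return active_blob_ids;
}
```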

Contributor Author:

Yes, this is a valid scenario. We can do it two ways:

  1. The follower sees a gap in the blob sequence and assumes it is a deletion.
  2. A safer approach is to use the scrubber: the leader sends the valid blob list and its CRC to the followers, and a follower can use this to delete.

A rough sketch of the follower side of option 2 follows below.
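
This sketch assumes hypothetical helpers for the follower's local state; nothing here exists in this PR:

```cpp
#include <cstdint>
#include <set>
#include <vector>

using shard_id_t = uint64_t; // matches homeobject's aliases
using blob_id_t = uint64_t;

// Hypothetical stand-ins for follower-local state and operations.
std::vector< blob_id_t > local_blob_ids_in_shard(shard_id_t shard_id);
void delete_local_blob(shard_id_t shard_id, blob_id_t blob_id);

// Any blob the follower holds that is absent from the leader's valid list is a
// deletion the follower missed (e.g. the delete log entry was already compacted).
void reconcile_shard(shard_id_t shard_id, const std::set< blob_id_t >& leader_valid_blobs) {
    for (blob_id_t blob_id : local_blob_ids_in_shard(shard_id)) {
        if (leader_valid_blobs.count(blob_id) == 0) { delete_local_blob(shard_id, blob_id); }
    }
}
```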

Contributor:

I think that until we have the incremental snapshot feature, i.e. transferring only the diff between two snapshots, we had better erase everything on the receiver side, since we start from scratch anyway.

Collaborator (@JacksonYao287, Sep 9, 2024):

I think what we will do in the baseline resync write part is an incremental snapshot, no?
As I mentioned here:
#204 (comment)
after the follower receives the pg and shard metadata in the first obj, it will ask for the shards and blobs that do not exist on this follower.

"The follower sees a gap in the blob sequence and assumes it is a deletion"

This does not work. For example, the leader has {1,10} to {3,10} and the follower has {1,10} to {2,5}. If {1,5} is deleted at the leader, the follower cannot see this blob-sequence gap, since it will start syncing shards and blobs from shard 2.

"A safer approach is to use the scrubber: the leader sends the valid blob list and its CRC to the followers, and a follower can use this to delete"

This seems to work. We should also send the open shard list, since some seal-shard log entries might also have been compacted.

continue;
}
auto result = home_obj_
._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/,
0 /* req_len */, info.pbas)
.get();
if (!result) {
LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
info.pbas.to_string(), result.error());
return -1;
}

auto& blob = result.value();
num_blobs++;
total_bytes += blob.body.size() + blob.user_key.size();
if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; }

BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)};
blob_data_vec.push_back(std::move(blob_data));
cur_blob_id_ = info.blob_id;
}

if (index_results_vec.empty()) {
// We got empty results from index, which means we read
// all the blobs in the current shard
end_of_shard = true;
cur_shard_seq_num_++;
cur_blob_id_ = -1;
break;
}
}

return 0;
}

void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard) {
std::vector< ::flatbuffers::Offset< BlobData > > blob_entries;
flatbuffers::FlatBufferBuilder builder;
for (auto& b : blob_data_vec) {
blob_entries.push_back(
CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(),
builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())),
b.blob.user_key.size()),
builder.CreateVector(b.blob.body.bytes(), b.blob.body.size())));
}
builder.FinishSizePrefixed(
CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */));
data_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

bool HSHomeObject::PGBlobIterator::end_of_scan() const {
return max_shard_seq_num_ == 0 || cur_shard_seq_num_ > max_shard_seq_num_;
}

} // namespace homeobject
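
For context, a minimal sketch of how a leader-side snapshot read could drive this iterator, based only on the interface added in hs_homeobject.hpp; send_snapshot_obj() and the two batch limits are stand-ins, and the actual snapshot transport hook is not part of this PR:

```cpp
// Sketch only: one way a snapshot-read loop could use PGBlobIterator.
void send_snapshot_obj(sisl::io_blob_safe&& obj); // hypothetical transport stand-in

void read_snapshot(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t snapshot_lsn,
                   uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes) {
    HSHomeObject::PGBlobIterator iter{home_obj, group_id, snapshot_lsn};

    // Object 0: PG metadata plus the shard list (ResyncPGShardInfo).
    sisl::io_blob_safe meta_blob;
    iter.create_pg_shard_snapshot_data(meta_blob);
    send_snapshot_obj(std::move(meta_blob));

    // Subsequent objects: batches of blob data (ResyncBlobDataBatch), shard by shard.
    while (!iter.end_of_scan()) {
        std::vector< HSHomeObject::BlobInfoData > blob_vec;
        bool end_of_shard{false};
        if (iter.get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, blob_vec, end_of_shard) < 0) {
            return; // read/index error; abort the snapshot transfer
        }
        sisl::io_blob_safe data_blob;
        iter.create_blobs_snapshot_data(blob_vec, data_blob, end_of_shard);
        send_snapshot_obj(std::move(data_blob));
    }
}
```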