Skip to content

Commit

Permalink
Changing return value of putBlob/delBlob to include leader_id.
Browse files Browse the repository at this point in the history
we need to set leader_id in case we are not the leader. So that
client can retry.

Signed-off-by: Xiaoxi Chen <xiaoxchen@ebay.com>
  • Loading branch information
xiaoxichen committed Sep 3, 2024
1 parent 4d999d4 commit 180f3b9
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 35 deletions.
20 changes: 15 additions & 5 deletions src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ namespace homeobject {
ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR,
UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD);

struct Blob {
struct BlobResponseBase {
std::optional< peer_id_t > current_leader{std::nullopt};
};

struct Blob : BlobResponseBase {
Blob() = default;
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {}

Expand All @@ -21,15 +25,21 @@ struct Blob {
sisl::io_blob_safe body;
std::string user_key{};
uint64_t object_off{};
std::optional< peer_id_t > current_leader{std::nullopt};
};

struct PutBlobRes : BlobResponseBase {
blob_id_t blob_id;
};

struct DelBlobRes : BlobResponseBase {
};

class BlobManager : public Manager< BlobError > {
public:
virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) = 0;
virtual AsyncResult< PutBlobRes > put(shard_id_t shard, Blob&&) = 0;
virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0,
uint64_t len = 0) const = 0;
virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) = 0;
virtual AsyncResult< DelBlobRes > del(shard_id_t shard, blob_id_t const& blob) = 0;
};

} // namespace homeobject
} // namespace homeobject
8 changes: 4 additions & 4 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t
});
}

BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) {
BlobManager::AsyncResult< PutBlobRes > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > {
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< PutBlobRes > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::SEALED_SHARD);
if (blob.body.size() == 0) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value(), std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullAsyncResult {
BlobManager::AsyncResult< DelBlobRes > HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::AsyncResult< DelBlobRes > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _del_blob(e.value(), blob);
});
Expand Down
8 changes: 4 additions & 4 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ class HomeObjectImpl : public HomeObject,
virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) = 0;

virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0;
virtual BlobManager::AsyncResult< PutBlobRes > _put_blob(ShardInfo const&, Blob&&) = 0;
virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
uint64_t len = 0) const = 0;
virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) = 0;
virtual BlobManager::AsyncResult< DelBlobRes > _del_blob(ShardInfo const&, blob_id_t) = 0;
///

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0;
Expand Down Expand Up @@ -158,10 +158,10 @@ class HomeObjectImpl : public HomeObject,
uint64_t get_current_timestamp();

/// BlobManager
BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) final;
BlobManager::AsyncResult< PutBlobRes > put(shard_id_t shard, Blob&&) final;
BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off,
uint64_t len) const final;
BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) final;
BlobManager::AsyncResult< DelBlobRes > del(shard_id_t shard, blob_id_t const& blob) final;
};

} // namespace homeobject
15 changes: 9 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
sisl::io_blob_safe& blob_header_buf() { return data_bufs_[blob_header_idx_]; }
};

BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
BlobManager::AsyncResult< PutBlobRes > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
Expand Down Expand Up @@ -159,11 +159,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
req->blob_header()->to_string(), req->data_sgs_string());

repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), req->data_sgs(), req);
return req->result().deferValue([this, req](const auto& result) -> BlobManager::AsyncResult< blob_id_t > {
return req->result().deferValue([this, req](const auto& result) -> BlobManager::AsyncResult< PutBlobRes > {
PutBlobRes res;
if (result.hasError()) { return folly::makeUnexpected(result.error()); }
auto blob_info = result.value();
BLOGT(blob_info.shard_id, blob_info.blob_id, "Put blob success blkid=[{}]", blob_info.pbas.to_string());
return blob_info.blob_id;
res.blob_id = blob_info.blob_id;
return res;
});
}

Expand Down Expand Up @@ -340,7 +342,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return hints;
}

BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
BlobManager::AsyncResult< DelBlobRes > HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
Expand All @@ -367,11 +369,12 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
std::memcpy(req->key_buf().bytes(), &blob_id, sizeof(blob_id_t));

repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req);
return req->result().deferValue([](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
return req->result().deferValue([](const auto& result) -> folly::Expected< DelBlobRes, BlobError > {
DelBlobRes res;
if (result.hasError()) { return folly::makeUnexpected(result.error()); }
auto blob_info = result.value();
BLOGT(blob_info.shard_id, blob_info.blob_id, "Delete blob successful");
return folly::Unit();
return res;
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class HSHomeObject : public HomeObjectImpl {
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< PutBlobRes > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
uint64_t len = 0) const override;
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;
BlobManager::AsyncResult< DelBlobRes > _del_blob(ShardInfo const&, blob_id_t) override;

PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
Expand Down
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ class HomeObjectFixture : public ::testing::Test {
ASSERT_TRUE(false);
continue;
}
auto blob_id = b.value();
auto putBlobRes = b.value();

LOGINFO("Put blob pg {} shard {} blob {} data {}", pg_id, shard_id, blob_id,
LOGINFO("Put blob pg {} shard {} blob {} data {}", pg_id, shard_id, putBlobRes.blob_id,
hex_bytes(clone.body.cbytes(), std::min(10u, clone.body.size())));
blob_map.insert({{pg_id, shard_id, blob_id}, std::move(clone)});
blob_map.insert({{pg_id, shard_id, putBlobRes.blob_id}, std::move(clone)});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ TEST_F(HomeObjectFixture, SealShardWithRestart) {
EXPECT_EQ(shard_info.state, ShardInfo::State::OPEN);
auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!!b);
LOGINFO("Put blob {}", b.value());
LOGINFO("Put blob {}", b.value().blob_id);

s = _obj_inst->shard_manager()->seal_shard(shard_id).get();
ASSERT_TRUE(!!s);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/tests/hs_pg_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TEST_F(HomeObjectFixture, PGStatsTest) {
EXPECT_EQ(shard_info.state, ShardInfo::State::OPEN);
auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!!b);
LOGINFO("Put blob {}", b.value());
LOGINFO("Put blob {}", b.value().blob_id);

// create a shard
s = _obj_inst->shard_manager()->seal_shard(shard_id).get();
Expand Down
11 changes: 7 additions & 4 deletions src/lib/memory_backend/mem_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace homeobject {
} else

// Write (move) Blob to new BlobExt on heap and Insert BlobExt to Index
BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) {
BlobManager::AsyncResult< PutBlobRes > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) {
WITH_SHARD
blob_id_t new_blob_id;
{
Expand All @@ -32,7 +32,10 @@ BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo cons
auto [_, happened] =
shard.btree_.try_emplace(route, BlobExt{.state_ = BlobState::ALIVE, .blob_ = new Blob(std::move(_blob))});
RELEASE_ASSERT(happened, "Generated duplicate BlobRoute!");
return route.blob;

PutBlobRes res;
res.blob_id = new_blob_id;
return res;
}

// Lookup BlobExt and duplicate underyling Blob for user; only *safe* because we defer GC.
Expand All @@ -45,14 +48,14 @@ BlobManager::AsyncResult< Blob > MemoryHomeObject::_get_blob(ShardInfo const& _s
}

// Tombstone BlobExt entry
BlobManager::NullAsyncResult MemoryHomeObject::_del_blob(ShardInfo const& _shard, blob_id_t _blob) {
BlobManager::AsyncResult< DelBlobRes > MemoryHomeObject::_del_blob(ShardInfo const& _shard, blob_id_t _blob) {
WITH_SHARD
WITH_ROUTE(_blob)
IF_BLOB_ALIVE {
shard.btree_.assign_if_equal(route, blob_it->second,
BlobExt{.state_ = BlobState::DELETED, .blob_ = blob_it->second.blob_});
}
return folly::Unit();
return DelBlobRes{};
}

} // namespace homeobject
4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class MemoryHomeObject : public HomeObjectImpl {
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

// BlobManager
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< PutBlobRes > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
uint64_t len = 0) const override;
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;
BlobManager::AsyncResult< DelBlobRes > _del_blob(ShardInfo const&, blob_id_t) override;
///

// PGManager
Expand Down
4 changes: 2 additions & 2 deletions src/lib/tests/BlobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ TEST_F(TestFixture, BasicBlobTests) {
->put(_shard_1.id, Blob{sisl::io_blob_safe(4 * Ki, 512u), "test_blob", 4 * Mi})
.deferValue([this](auto const& e) {
EXPECT_TRUE(!!e);
e.then([this](auto const& blob_id) {
e.then([this](auto const& put_blob_res) {
LOGINFO("Successfully put blob, shard {}, blobID {}", _shard_1.id,
blob_id);
put_blob_res.blob_id);
});
}));
our_calls.push_back(
Expand Down
2 changes: 1 addition & 1 deletion src/lib/tests/fixture_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void TestFixture::SetUp() {
->put(_shard_1.id, homeobject::Blob{sisl::io_blob_safe(4 * Ki, 512u), "test_blob", 4 * Mi})
.get();
EXPECT_TRUE(!!o_e);
o_e.then([this](auto&& b) mutable { _blob_id = std::move(b); });
o_e.then([this](auto&& b) mutable { _blob_id = std::move(b.blob_id); });

g_e = homeobj_->blob_manager()->get(_shard_1.id, _blob_id).get();
EXPECT_TRUE(!!g_e);
Expand Down

0 comments on commit 180f3b9

Please sign in to comment.