Skip to content

Commit

Permalink
Do not use a single transaction for reading
Browse files Browse the repository at this point in the history
  • Loading branch information
TheCharlatan committed Sep 19, 2024
1 parent fa224f3 commit 3f675fa
Showing 1 changed file with 30 additions and 52 deletions.
82 changes: 30 additions & 52 deletions src/dbwrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,10 @@ struct MDBXContext {
// MDBX environment handle
mdbx::env_managed env;

// MDBX txn and map
mdbx::txn_managed read_txn;
// MDBX map handle
mdbx::map_handle map;

~MDBXContext() {
if(read_txn){
read_txn.abort();
}
env.close();
}
};
Expand Down Expand Up @@ -442,12 +438,10 @@ MDBXWrapper::MDBXWrapper(const DBParams& params)

// initialize the mdbx environment.
DBContext().env = mdbx::env_managed(params.path, DBContext().create_params, DBContext().operate_params);

DBContext().read_txn = DBContext().env.start_read();
DBContext().map = DBContext().read_txn.open_map(nullptr, mdbx::key_mode::usual, mdbx::value_mode::single);

// Open readers induce really ugly append-only LMDB behavior
DBContext().read_txn.park_reading(true);

auto txn = DBContext().env.start_read();
DBContext().map = txn.open_map(nullptr, mdbx::key_mode::usual, mdbx::value_mode::single);
txn.commit();

if (params.obfuscate && WriteObfuscateKeyIfNotExists()){
LogInfo("Wrote new obfuscate key for %s: %s\n", fs::PathToString(params.path), HexStr(obfuscate_key));
Expand All @@ -466,8 +460,9 @@ std::optional<std::string> MDBXWrapper::ReadImpl(Span<const std::byte> key) cons
{
mdbx::slice slKey(CharCast(key.data()), key.size()), slValue;

DBContext().read_txn.unpark_reading();
slValue = DBContext().read_txn.get(DBContext().map, slKey, mdbx::slice::invalid());
auto read_txn = DBContext().env.start_read();
slValue = read_txn.get(DBContext().map, slKey, mdbx::slice::invalid());
read_txn.commit();

std::optional<std::string> ret;

Expand All @@ -477,26 +472,20 @@ std::optional<std::string> MDBXWrapper::ReadImpl(Span<const std::byte> key) cons
else {
ret = std::string(slValue.as_string());
}
DBContext().read_txn.park_reading(true);
return ret;
}

bool MDBXWrapper::ExistsImpl(Span<const std::byte> key) const {
mdbx::slice slKey(CharCast(key.data()), key.size()), slValue;

DBContext().read_txn.unpark_reading();
slValue = DBContext().read_txn.get(DBContext().map, slKey, mdbx::slice::invalid());
auto read_txn = DBContext().env.start_read();
slValue = read_txn.get(DBContext().map, slKey, mdbx::slice::invalid());
read_txn.commit();

if(slValue == mdbx::slice::invalid()) {
DBContext().read_txn.park_reading(true);
return false;
}
else {
DBContext().read_txn.park_reading(true);
return true;
}

assert(-1);
return true;
}

size_t MDBXWrapper::EstimateSizeImpl(Span<const std::byte> key1, Span<const std::byte> key2) const
Expand Down Expand Up @@ -530,17 +519,14 @@ size_t MDBXWrapper::DynamicMemoryUsage() const

struct MDBXBatch::MDBXWriteBatchImpl {
mdbx::txn_managed txn;
mdbx::map_handle map;
};

MDBXBatch::MDBXBatch (const CDBWrapperBase& _parent) : CDBBatchBase(_parent)
{
const MDBXWrapper& parent = static_cast<const MDBXWrapper&>(m_parent);
m_impl_batch = std::make_unique<MDBXWriteBatchImpl>();


m_impl_batch->txn = parent.DBContext().env.start_write();
m_impl_batch->map = parent.DBContext().map;
};

MDBXBatch::~MDBXBatch()
Expand All @@ -567,7 +553,7 @@ void MDBXBatch::WriteImpl(Span<const std::byte> key, DataStream& value)
mdbx::slice slValue(CharCast(value.data()), value.size());

try {
m_impl_batch->txn.put(parent.DBContext().map, slKey, slValue, mdbx::put_mode::upsert);
m_impl_batch->txn.put(parent.m_db_context->map, slKey, slValue, mdbx::put_mode::upsert);
}
catch (mdbx::error err) {
const std::string errmsg = "Fatal MDBX error: " + err.message();
Expand All @@ -581,7 +567,7 @@ void MDBXBatch::EraseImpl(Span<const std::byte> key)
auto &parent = static_cast<const MDBXWrapper&>(m_parent);

mdbx::slice slKey(CharCast(key.data()), key.size());
m_impl_batch->txn.erase(parent.DBContext().map, slKey);
m_impl_batch->txn.erase(parent.m_db_context->map, slKey);
}

size_t MDBXBatch::SizeEstimate() const
Expand All @@ -591,37 +577,38 @@ size_t MDBXBatch::SizeEstimate() const

struct MDBXIterator::IteratorImpl {
MDBXContext &db_context;
mdbx::txn_managed txn;
const std::unique_ptr<mdbx::cursor_managed> cursor;

IteratorImpl(MDBXContext& db_context, mdbx::cursor_managed&& cur)
: db_context(db_context)
, cursor(std::make_unique<mdbx::cursor_managed>(std::move(cur)))
{}
IteratorImpl(MDBXContext& db_context, mdbx::txn_managed&& tx, mdbx::cursor_managed&& cur)
: db_context(db_context),
txn(std::move(tx)),
cursor(std::make_unique<mdbx::cursor_managed>(std::move(cur)))
{
txn.park_reading();
}
};

MDBXIterator::MDBXIterator(const CDBWrapperBase& _parent, MDBXContext &db_context) : CDBIteratorBase(_parent)
{
db_context.read_txn.unpark_reading();
mdbx::cursor_managed cursor = db_context.read_txn.open_cursor(db_context.map);
auto txn{db_context.env.start_read()};
mdbx::cursor_managed cursor = txn.open_cursor(db_context.map);
txn.park_reading();

m_impl_iter = std::unique_ptr<IteratorImpl>(new IteratorImpl(db_context, std::move(cursor)));
db_context.read_txn.park_reading(true);
m_impl_iter = std::unique_ptr<IteratorImpl>(new IteratorImpl(db_context, std::move(txn), std::move(cursor)));
}

MDBXIterator::~MDBXIterator()
{
m_impl_iter->cursor->close();
m_impl_iter->txn.unpark_reading();
m_impl_iter->txn.commit();
}

void MDBXIterator::SeekImpl(Span<const std::byte> key)
{
m_impl_iter->db_context.read_txn.unpark_reading();

mdbx::slice slKey(CharCast(key.data()), key.size());
valid = m_impl_iter->cursor->seek(slKey);

// The below is probably dangerous since the slice could become invalidated.
m_impl_iter->db_context.read_txn.park_reading(true);
}

CDBIteratorBase* MDBXWrapper::NewIterator()
Expand All @@ -631,12 +618,11 @@ CDBIteratorBase* MDBXWrapper::NewIterator()

bool MDBXWrapper::IsEmpty()
{
DBContext().read_txn.unpark_reading();;
auto cursor{DBContext().read_txn.open_cursor(DBContext().map)};
auto read_txn = DBContext().env.start_read();
auto cursor{read_txn.open_cursor(DBContext().map)};

// the done parameter indicates whether or not the cursor move succeeded.
auto ret = !cursor.to_first(/*throw_notfound=*/false).done;
DBContext().read_txn.park_reading(true);
return ret;
}

Expand Down Expand Up @@ -677,23 +663,19 @@ void CDBIterator::Next() { m_impl_iter->iter->Next(); }

Span<const std::byte> MDBXIterator::GetKeyImpl() const
{
m_impl_iter->db_context.read_txn.unpark_reading();
// 'AsBytes(Span(...' is necessary since mdbx::slice::bytes() returns std::span<char8_t>
// Rather than Span<std::byte>
auto ret = AsBytes(Span(m_impl_iter->cursor->current().key.bytes()));

m_impl_iter->db_context.read_txn.park_reading(true);
return ret;
}

Span<const std::byte> MDBXIterator::GetValueImpl() const
{
m_impl_iter->db_context.read_txn.unpark_reading();
// 'AsBytes(Span(...' is necessary since mdbx::slice::bytes() returns std::span<char8_t>
// Rather than Span<std::byte>
auto ret = AsBytes(Span(m_impl_iter->cursor->current().value.bytes()));

m_impl_iter->db_context.read_txn.park_reading(true);
return ret;
}

Expand All @@ -703,16 +685,12 @@ bool MDBXIterator::Valid() const {

void MDBXIterator::SeekToFirst()
{
m_impl_iter->db_context.read_txn.unpark_reading();
valid = m_impl_iter->cursor->to_first(/*throw_notfound=*/false).done;
m_impl_iter->db_context.read_txn.park_reading(true);
}

void MDBXIterator::Next()
{
m_impl_iter->db_context.read_txn.unpark_reading();
valid = m_impl_iter->cursor->to_next(/*throw_notfound=*/false).done;
m_impl_iter->db_context.read_txn.park_reading(true);
}

const std::vector<unsigned char>& CDBWrapperBase::GetObfuscateKey() const
Expand Down

0 comments on commit 3f675fa

Please sign in to comment.