diff --git a/_codeql_detected_source_root b/_codeql_detected_source_root new file mode 120000 index 00000000..945c9b46 --- /dev/null +++ b/_codeql_detected_source_root @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index bedf4e57..77ecdb9b 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -2,8 +2,8 @@ # # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -file(GLOB core_sources CONFIGURE_DEPENDS "*.cpp") -file(GLOB core_headers CONFIGURE_DEPENDS "*.hpp") +file(GLOB core_sources CONFIGURE_DEPENDS "*.cpp" "logsdb/*.cpp") +file(GLOB core_headers CONFIGURE_DEPENDS "*.hpp" "logsdb/*.hpp") add_library(core ${core_sources} ${core_headers}) add_dependencies(core thirdparty) diff --git a/cpp/core/LogsDB.cpp b/cpp/core/LogsDB.cpp index 442e8fe6..4f7fd0ec 100644 --- a/cpp/core/LogsDB.cpp +++ b/cpp/core/LogsDB.cpp @@ -18,10 +18,18 @@ #include #include "Assert.hpp" -#include "LogsDBData.hpp" #include "Msgs.hpp" #include "RocksDBUtils.hpp" #include "Time.hpp" +#include "logsdb/Appender.hpp" +#include "logsdb/BatchWriter.hpp" +#include "logsdb/CatchupReader.hpp" +#include "logsdb/DataPartitions.hpp" +#include "logsdb/LeaderElection.hpp" +#include "logsdb/LogMetadata.hpp" +#include "logsdb/LogsDBCommon.hpp" +#include "logsdb/LogsDBData.hpp" +#include "logsdb/ReqResp.hpp" std::ostream& operator<<(std::ostream& out, const LogsDBLogEntry& entry) { out << entry.idx << ":"; @@ -36,28 +44,6 @@ std::ostream& operator<<(std::ostream& out, const LogsDBResponse& entry) { return out << "replicaId: " << entry.replicaId << "[ " << entry.msg << "]"; } -static bool tryGet(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key, std::string& value) { - auto status = db->Get({}, cf, key, &value); - if (status.IsNotFound()) { - return false; - } - ROCKS_DB_CHECKED(status); - return true; -}; - -static constexpr auto METADATA_CF_NAME = "logMetadata"; -static constexpr auto DATA_PARTITION_0_NAME 
= "logTimePartition0"; -static constexpr auto DATA_PARTITION_1_NAME = "logTimePartition1"; - - -static void update_atomic_stat_ema(std::atomic& stat, double newValue) { - stat.store((stat.load(std::memory_order_relaxed)* 0.95 + newValue * 0.05), std::memory_order_relaxed); -} - -static void update_atomic_stat_ema(std::atomic& stat, Duration newValue) { - stat.store((Duration)((double)stat.load(std::memory_order_relaxed).ns * 0.95 + (double)newValue.ns * 0.05), std::memory_order_relaxed); -} - std::vector LogsDB::getColumnFamilyDescriptors() { return { {METADATA_CF_NAME,{}}, @@ -73,1518 +59,7 @@ void LogsDB::clearAllData(SharedRocksDB &shardDB) { shardDB.db()->FlushWAL(true); } -struct LogPartition { - std::string name; - LogsDBMetadataKey firstWriteKey; - rocksdb::ColumnFamilyHandle* cf; - TernTime firstWriteTime{0}; - LogIdx minKey{0}; - LogIdx maxKey{0}; - - void reset(rocksdb::ColumnFamilyHandle* cf_, LogIdx minMaxKey, TernTime firstWriteTime_) { - cf = cf_; - minKey = maxKey = minMaxKey; - firstWriteTime = firstWriteTime_; - } -}; - -class DataPartitions { -public: - class Iterator { - public: - Iterator(const DataPartitions& partitions) : _partitions(partitions), _rotationCount(_partitions._rotationCount), _smaller(nullptr) { - _iterators = _partitions._getPartitionIterators(); - } - - void seek(LogIdx idx) { - if (unlikely(_rotationCount != _partitions._rotationCount)) { - _iterators = _partitions._getPartitionIterators(); - } - auto key = U64Key::Static(idx.u64); - for (auto& it : _iterators) { - it->Seek(key.toSlice()); - } - _updateSmaller(); - } - - bool valid() const { - return _smaller != nullptr; - } - - void next() { - if (_smaller != nullptr) { - _smaller->Next(); - } - _updateSmaller(); - } - Iterator& operator++() { - this->next(); - return *this; - } - - LogIdx key() const { - return LogIdx(ExternalValue::FromSlice(_smaller->key())().u64()); - } - - LogsDBLogEntry entry() const { - auto value = _smaller->value(); - return LogsDBLogEntry{key(), 
{(const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()}}; - } - - void dropEntry() { - ALWAYS_ASSERT(_rotationCount == _partitions._rotationCount); - auto cfIdx = _cfIndexForCurrentIterator(); - ROCKS_DB_CHECKED(_partitions._sharedDb.db()->Delete({}, _partitions._partitions[cfIdx].cf, _smaller->key())); - } - - private: - void _updateSmaller() { - _smaller = nullptr; - for (auto& it : _iterators) { - if (!it->Valid()) { - continue; - } - if (_smaller == nullptr || (rocksdb::BytewiseComparator()->Compare(it->key(),_smaller->key()) < 0)) { - _smaller = it.get(); - } - } - } - size_t _cfIndexForCurrentIterator() const { - for (size_t i = 0; i < _iterators.size(); ++i) { - if (_smaller == _iterators[i].get()) { - return i; - } - } - return -1; - } - const DataPartitions& _partitions; - size_t _rotationCount; - rocksdb::Iterator* _smaller; - std::vector> _iterators; - }; - - DataPartitions(Env& env, SharedRocksDB& sharedDB) - : - _env(env), - _sharedDb(sharedDB), - _rotationCount(0), - _partitions({ - LogPartition{ - DATA_PARTITION_0_NAME, - LogsDBMetadataKey::PARTITION_0_FIRST_WRITE_TIME, - sharedDB.getCF(DATA_PARTITION_0_NAME), - 0, - 0, - 0 - }, - LogPartition{ - DATA_PARTITION_1_NAME, - LogsDBMetadataKey::PARTITION_1_FIRST_WRITE_TIME, - sharedDB.getCF(DATA_PARTITION_1_NAME), - 0, - 0, - 0 - } - }) - {} - - bool isInitialStart() { - auto it1 = std::unique_ptr(_sharedDb.db()->NewIterator({},_partitions[0].cf)); - auto it2 = std::unique_ptr(_sharedDb.db()->NewIterator({},_partitions[1].cf)); - it1->SeekToFirst(); - it2->SeekToFirst(); - return !(it1->Valid() || it2->Valid()); - } - - bool init(bool initialStart) { - bool initSuccess = true; - auto metadataCF = _sharedDb.getCF(METADATA_CF_NAME); - auto db = _sharedDb.db(); - std::string value; - for (auto& partition : _partitions) { - if (tryGet(db, metadataCF, logsDBMetadataKey(partition.firstWriteKey), value)) { - partition.firstWriteTime = ExternalValue::FromSlice(value)().u64(); - LOG_INFO(_env, 
"Loaded partition %s first write time %s", partition.name, partition.firstWriteTime); - } else if (initialStart) { - LOG_INFO(_env, "Partition %s first write time not found. Using %s", partition.name, partition.firstWriteTime); - _updatePartitionFirstWriteTime(partition, partition.firstWriteTime); - } else { - initSuccess = false; - LOG_ERROR(_env, "Partition %s first write time not found. Possible DB corruption!", partition.name); - } - } - { - auto partitionIterators = _getPartitionIterators(); - for (size_t i = 0; i < partitionIterators.size(); ++i) { - auto it = partitionIterators[i].get(); - auto& partition = _partitions[i]; - it->SeekToFirst(); - if (!it->Valid()) { - if (partition.firstWriteTime != 0) { - LOG_ERROR(_env, "No keys found in partition %s, but first write time is %s. DB Corruption!", partition.name, partition.firstWriteTime); - initSuccess = false; - } else { - LOG_INFO(_env, "Partition %s empty.", partition.name); - } - continue; - } - partition.minKey = ExternalValue::FromSlice(it->key())().u64(); - it->SeekToLast(); - // If at least one key exists seeking to last should never fail. 
- ROCKS_DB_CHECKED(it->status()); - partition.maxKey = ExternalValue::FromSlice(it->key())().u64(); - } - } - return initSuccess; - } - - Iterator getIterator() const { - return Iterator(*this); - } - - TernError readLogEntry(LogIdx logIdx, LogsDBLogEntry& entry) const { - auto& partition = _getPartitionForIdx(logIdx); - if (unlikely(logIdx < partition.minKey)) { - return TernError::LOG_ENTRY_TRIMMED; - } - - auto key = U64Key::Static(logIdx.u64); - rocksdb::PinnableSlice value; - auto status = _sharedDb.db()->Get({}, partition.cf, key.toSlice(), &value); - if (status.IsNotFound()) { - return TernError::LOG_ENTRY_MISSING; - } - ROCKS_DB_CHECKED(status); - entry.idx = logIdx; - entry.value.assign((const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()); - return TernError::NO_ERROR; - } - - void readIndexedEntries(const std::vector& indices, std::vector& entries) const { - entries.clear(); - if (indices.empty()) { - return; - } - // TODO: This is not very efficient as we're doing a lookup for each index. 
- entries.reserve(indices.size()); - for (auto idx : indices) { - LogsDBLogEntry& entry = entries.emplace_back(); - if (readLogEntry(idx, entry) != TernError::NO_ERROR) { - entry.idx = 0; - } - } - } - - void writeLogEntries(const std::vector& entries) { - _maybeRotate(); - - rocksdb::WriteBatch batch; - std::vector> keys; - keys.reserve(entries.size()); - for (const auto& entry : entries) { - auto& partition = _getPartitionForIdx(entry.idx); - keys.emplace_back(U64Key::Static(entry.idx.u64)); - batch.Put(partition.cf, keys.back().toSlice(), rocksdb::Slice((const char*)entry.value.data(), entry.value.size())); - _partitionKeyInserted(partition, entry.idx); - } - ROCKS_DB_CHECKED(_sharedDb.db()->Write({}, &batch)); - } - - void writeLogEntry(const LogsDBLogEntry& entry) { - _maybeRotate(); - - auto& partition = _getPartitionForIdx(entry.idx); - _sharedDb.db()->Put({}, partition.cf, U64Key::Static(entry.idx.u64).toSlice(), rocksdb::Slice((const char*)entry.value.data(), entry.value.size())); - _partitionKeyInserted(partition, entry.idx); - - } - - void dropEntriesAfterIdx(LogIdx start) { - auto iterator = getIterator(); - size_t droppedEntriesCount = 0; - for (iterator.seek(start), iterator.next(); iterator.valid(); ++iterator) { - iterator.dropEntry(); - ++droppedEntriesCount; - } - LOG_INFO(_env,"Dropped %s entries after %s", droppedEntriesCount, start); - } - - LogIdx getLowestKey() const { - return std::min(_partitions[0].firstWriteTime == 0 ? MAX_LOG_IDX : _partitions[0].minKey, _partitions[1].firstWriteTime == 0 ? 
MAX_LOG_IDX : _partitions[1].minKey); - } - -private: - void _updatePartitionFirstWriteTime(LogPartition& partition, TernTime time) { - ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _sharedDb.getCF(METADATA_CF_NAME), logsDBMetadataKey(partition.firstWriteKey), U64Value::Static(time.ns).toSlice())); - partition.firstWriteTime = time; - } - - std::vector> _getPartitionIterators() const { - std::vector cfHandles; - cfHandles.reserve(_partitions.size()); - for (const auto& partition : _partitions) { - cfHandles.emplace_back(partition.cf); - } - std::vector> iterators; - iterators.reserve(_partitions.size()); - ROCKS_DB_CHECKED(_sharedDb.db()->NewIterators({}, cfHandles, (std::vector*)(&iterators))); - return iterators; - } - - void _maybeRotate() { - auto& partition = _getPartitionForIdx(MAX_LOG_IDX); - if (likely(partition.firstWriteTime == 0 || (partition.firstWriteTime + LogsDB::PARTITION_TIME_SPAN > ternNow()))) { - return; - } - // we only need to drop older partition and reset it's info. - // picking partition for writes/reads takes care of rest - auto& olderPartition = _partitions[0].minKey < _partitions[1].minKey ? _partitions[0] : _partitions[1]; - LOG_INFO(_env, "Rotating partions. Dropping partition %s, firstWriteTime: %s, minKey: %s, maxKey: %s", olderPartition.name, olderPartition.firstWriteTime, olderPartition.minKey, olderPartition.maxKey); - - _sharedDb.deleteCF(olderPartition.name); - olderPartition.reset(_sharedDb.createCF({olderPartition.name,{}}),0,0); - _updatePartitionFirstWriteTime(olderPartition, 0); - ++_rotationCount; - } - - LogPartition& _getPartitionForIdx(LogIdx key) { - return const_cast(static_cast(this)->_getPartitionForIdx(key)); - } - - const LogPartition& _getPartitionForIdx(LogIdx key) const { - // This is a bit of a mess of ifs but I (mcrnic) am unsure how to do it better at this point. - // Logic is roughly: - // 1. If both are empty we return partition 0. - // 2. 
If only 1 is empty then it's likely we just rotated and key will be larger than range of old partition so we return new one, - // if it fits in old partition (we are backfilling missed data) we returned the old one - // 3. Both contain data, likely the key is in newer partition (newerPartition.minKey) <= key - // Note that there is inefficiency in case of empty DB where first key will be written in partition 0 and second one will immediately go to partition 1 - // This is irrelevant from correctness of rotation/retention perspective and will be ignored. - if (unlikely(_partitions[0].firstWriteTime == 0 && _partitions[1].firstWriteTime == 0)) { - return _partitions[0]; - } - if (unlikely(_partitions[0].firstWriteTime == 0)) { - if (likely(_partitions[1].maxKey < key)) { - return _partitions[0]; - } - return _partitions[1]; - } - if (unlikely(_partitions[1].firstWriteTime == 0)) { - if (likely(_partitions[0].maxKey < key)) { - return _partitions[1]; - } - return _partitions[0]; - } - int newerPartitionIdx = _partitions[0].minKey < _partitions[1].minKey ? 
1 : 0; - if (likely(_partitions[newerPartitionIdx].minKey <= key)) { - return _partitions[newerPartitionIdx]; - } - - return _partitions[newerPartitionIdx ^ 1]; - } - - void _partitionKeyInserted(LogPartition& partition, LogIdx idx) { - if (unlikely(partition.minKey == 0)) { - partition.minKey = idx; - _updatePartitionFirstWriteTime(partition, ternNow()); - } - partition.minKey = std::min(partition.minKey, idx); - partition.maxKey = std::max(partition.maxKey, idx); - } - - Env& _env; - SharedRocksDB& _sharedDb; - size_t _rotationCount; - std::array _partitions; -}; - -class LogMetadata { -public: - LogMetadata(Env& env, LogsDBStats& stats, SharedRocksDB& sharedDb, ReplicaId replicaId, DataPartitions& data) : - _env(env), - _stats(stats), - _sharedDb(sharedDb), - _cf(sharedDb.getCF(METADATA_CF_NAME)), - _replicaId(replicaId), - _data(data), - _nomineeToken(LeaderToken(0,0)) - {} - - bool isInitialStart() { - auto it = std::unique_ptr(_sharedDb.db()->NewIterator({},_cf)); - it->SeekToFirst(); - return !it->Valid(); - } - - bool init(bool initialStart) { - bool initSuccess = true; - std::string value; - if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), value)) { - _leaderToken.u64 = ExternalValue::FromSlice(value)().u64(); - LOG_INFO(_env, "Loaded leader token %s", _leaderToken); - } else if (initialStart) { - _leaderToken = LeaderToken(0,0); - LOG_INFO(_env, "Leader token not found. Using %s", _leaderToken); - ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), U64Value::Static(_leaderToken.u64).toSlice())); - } else { - initSuccess = false; - LOG_ERROR(_env, "Leader token not found! Possible DB corruption!"); - } - - if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), value)) { - _lastReleased = ExternalValue::FromSlice(value)().u64(); - LOG_INFO(_env, "Loaded last released %s", _lastReleased); - } else if (initialStart) { - LOG_INFO(_env, "Last released not found. 
Using %s", 0); - setLastReleased(0); - } else { - initSuccess = false; - LOG_ERROR(_env, "Last released not found! Possible DB corruption!"); - } - - if (tryGet(_sharedDb.db(),_cf, logsDBMetadataKey(LAST_RELEASED_TIME_KEY), value)) { - _lastReleasedTime = ExternalValue::FromSlice(value)().u64(); - LOG_INFO(_env, "Loaded last released time %s", _lastReleasedTime); - } else { - initSuccess = false; - LOG_ERROR(_env, "Last released time not found! Possible DB corruption!"); - } - _stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed); - return initSuccess; - } - - ReplicaId getReplicaId() const { - return _replicaId; - } - - LogIdx assignLogIdx() { - ALWAYS_ASSERT(_leaderToken.replica() ==_replicaId); - return ++_lastAssigned; - } - - LeaderToken getLeaderToken() const { - return _leaderToken; - } - - TernError updateLeaderToken(LeaderToken token) { - if (unlikely(token < _leaderToken || token < _nomineeToken)) { - return TernError::LEADER_PREEMPTED; - } - if (likely(token == _leaderToken)) { - return TernError::NO_ERROR; - } - _data.dropEntriesAfterIdx(_lastReleased); - ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), U64Value::Static(token.u64).toSlice())); - if (_leaderToken != token && token.replica() == _replicaId) { - // We just became leader, at this point last released should be the last known entry - _lastAssigned = _lastReleased; - } - _leaderToken = token; - _stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed); - _nomineeToken = LeaderToken(0,0); - return TernError::NO_ERROR; - } - - LeaderToken getNomineeToken() const { - return _nomineeToken; - } - - void setNomineeToken(LeaderToken token) { - if (++_leaderToken.idx() < _nomineeToken.idx()) { - LOG_INFO(_env, "Got a nominee token for epoch %s, last leader epoch is %s, we must have skipped leader election.", _nomineeToken.idx(), _leaderToken.idx()); - _data.dropEntriesAfterIdx(_lastReleased); - } - _nomineeToken = token; - } - 
- LeaderToken generateNomineeToken() const { - auto lastEpoch = _leaderToken.idx(); - return LeaderToken(_replicaId, ++lastEpoch); - } - - LogIdx getLastReleased() const { - return _lastReleased; - } - - TernTime getLastReleasedTime() const { - return _lastReleasedTime; - } - - void setLastReleased(LogIdx lastReleased) { - ALWAYS_ASSERT(_lastReleased <= lastReleased, "Moving release point backwards is not possible. It would cause data inconsistency"); - auto now = ternNow(); - rocksdb::WriteBatch batch; - batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), U64Value::Static(lastReleased.u64).toSlice()); - batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_TIME_KEY),U64Value::Static(now.ns).toSlice()); - ROCKS_DB_CHECKED(_sharedDb.db()->Write({}, &batch)); - update_atomic_stat_ema(_stats.entriesReleased, lastReleased.u64 - _lastReleased.u64); - _lastReleased = lastReleased; - _lastReleasedTime = now; - } - - bool isPreempting(LeaderToken token) const { - return _leaderToken < token && _nomineeToken < token; - } - -private: - Env& _env; - LogsDBStats& _stats; - SharedRocksDB& _sharedDb; - rocksdb::ColumnFamilyHandle* _cf; - const ReplicaId _replicaId; - DataPartitions& _data; - - LogIdx _lastAssigned; - LogIdx _lastReleased; - TernTime _lastReleasedTime; - LeaderToken _leaderToken; - LeaderToken _nomineeToken; -}; - -class ReqResp { - public: - static constexpr size_t UNUSED_REQ_ID = std::numeric_limits::max(); - static constexpr size_t CONFIRMED_REQ_ID = 0; - - using QuorumTrackArray = std::array; - - ReqResp(LogsDBStats& stats) : _stats(stats), _lastAssignedRequest(CONFIRMED_REQ_ID) {} - - LogsDBRequest& newRequest(ReplicaId targetReplicaId) { - auto& request = _requests[++_lastAssignedRequest]; - request.replicaId = targetReplicaId; - request.msg.id = _lastAssignedRequest; - return request; - } - - LogsDBRequest* getRequest(uint64_t requestId) { - auto it = _requests.find(requestId); - if (it == _requests.end()) { - return nullptr; - } - return &it->second; - } - 
- void eraseRequest(uint64_t requestId) { - _requests.erase(requestId); - } - - void cleanupRequests(QuorumTrackArray& requestIds) { - for (auto& reqId : requestIds) { - if (reqId == CONFIRMED_REQ_ID || reqId == UNUSED_REQ_ID) { - continue; - } - eraseRequest(reqId); - reqId = ReqResp::UNUSED_REQ_ID; - } - } - - void resendTimedOutRequests() { - auto now = ternNow(); - auto defaultCutoffTime = now - LogsDB::RESPONSE_TIMEOUT; - auto releaseCutoffTime = now - LogsDB::SEND_RELEASE_INTERVAL; - auto readCutoffTime = now - LogsDB::READ_TIMEOUT; - auto cutoffTime = now; - uint64_t timedOutCount{0}; - for (auto& r : _requests) { - switch (r.second.msg.body.kind()) { - case LogMessageKind::RELEASE: - cutoffTime = releaseCutoffTime; - break; - case LogMessageKind::LOG_READ: - cutoffTime = readCutoffTime; - break; - default: - cutoffTime = defaultCutoffTime; - } - if (r.second.sentTime < cutoffTime) { - r.second.sentTime = now; - _requestsToSend.emplace_back(&r.second); - if (r.second.msg.body.kind() != LogMessageKind::RELEASE) { - ++timedOutCount; - } - } - } - update_atomic_stat_ema(_stats.requestsTimedOut, timedOutCount); - } - - void getRequestsToSend(std::vector& requests) { - requests.swap(_requestsToSend); - update_atomic_stat_ema(_stats.requestsSent, requests.size()); - _requestsToSend.clear(); - } - - LogsDBResponse& newResponse(ReplicaId targetReplicaId, uint64_t requestId) { - _responses.emplace_back(); - auto& response = _responses.back(); - response.replicaId = targetReplicaId; - response.msg.id = requestId; - return response; - } - - void getResponsesToSend(std::vector& responses) { - responses.swap(_responses); - update_atomic_stat_ema(_stats.responsesSent, responses.size()); - _responses.clear(); - } - - Duration getNextTimeout() const { - if (_requests.empty()) { - return LogsDB::LEADER_INACTIVE_TIMEOUT; - } - return LogsDB::RESPONSE_TIMEOUT; - } - - static bool isQuorum(const QuorumTrackArray& requestIds) { - size_t numResponses = 0; - for (auto reqId : 
requestIds) { - if (reqId == CONFIRMED_REQ_ID) { - ++numResponses; - } - } - return numResponses > requestIds.size() / 2; - } - -private: - LogsDBStats& _stats; - uint64_t _lastAssignedRequest; - std::unordered_map _requests; - std::vector _requestsToSend; - - std::vector _responses; -}; - -enum class LeadershipState : uint8_t { - FOLLOWER, - BECOMING_NOMINEE, - DIGESTING_ENTRIES, - CONFIRMING_REPLICATION, - CONFIRMING_LEADERSHIP, - LEADER -}; - -std::ostream& operator<<(std::ostream& out, LeadershipState state) { - switch (state) { - case LeadershipState::FOLLOWER: - out << "FOLLOWER"; - break; - case LeadershipState::BECOMING_NOMINEE: - out << "BECOMING_NOMINEE"; - break; - case LeadershipState::DIGESTING_ENTRIES: - out << "DIGESTING_ENTRIES"; - break; - case LeadershipState::CONFIRMING_REPLICATION: - out << "CONFIRMING_REPLICATION"; - break; - case LeadershipState::CONFIRMING_LEADERSHIP: - out << "CONFIRMING_LEADERSHIP"; - break; - case LeadershipState::LEADER: - out << "LEADER"; - break; - } - return out; -} - -struct LeaderElectionState { - ReqResp::QuorumTrackArray requestIds; - LogIdx lastReleased; - std::array recoveryRequests; - std::array recoveryEntries; -}; - -class LeaderElection { -public: - LeaderElection(Env& env, LogsDBStats& stats, bool noReplication, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp) : - _env(env), - _stats(stats), - _noReplication(noReplication), - _avoidBeingLeader(avoidBeingLeader), - _replicaId(replicaId), - _metadata(metadata), - _data(data), - _reqResp(reqResp), - _state(LeadershipState::FOLLOWER), - _leaderLastActive(_noReplication ? 
0 :ternNow()) {} - - bool isLeader() const { - return _state == LeadershipState::LEADER; - } - - void maybeStartLeaderElection() { - if (unlikely(_avoidBeingLeader)) { - return; - } - auto now = ternNow(); - if (_state != LeadershipState::FOLLOWER || - (_leaderLastActive + LogsDB::LEADER_INACTIVE_TIMEOUT > now)) { - update_atomic_stat_ema(_stats.leaderLastActive, now - _leaderLastActive); - return; - } - auto nomineeToken = _metadata.generateNomineeToken(); - LOG_INFO(_env,"Starting new leader election round with token %s", nomineeToken); - _metadata.setNomineeToken(nomineeToken); - _state = LeadershipState::BECOMING_NOMINEE; - - _electionState.reset(new LeaderElectionState()); - _electionState->lastReleased = _metadata.getLastReleased(); - _leaderLastActive = now; - - //if (unlikely(_noReplication)) { - { - LOG_INFO(_env,"ForceLeader set, skipping to confirming leader phase"); - _electionState->requestIds.fill(ReqResp::CONFIRMED_REQ_ID); - _tryBecomeLeader(); - return; - } - auto& newLeaderRequestIds = _electionState->requestIds; - for (ReplicaId replicaId = 0; replicaId.u8 < newLeaderRequestIds.size(); ++replicaId.u8) { - if (replicaId == _replicaId) { - newLeaderRequestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; - continue; - } - auto& request = _reqResp.newRequest(replicaId); - newLeaderRequestIds[replicaId.u8] = request.msg.id; - - auto& newLeaderRequest = request.msg.body.setNewLeader(); - newLeaderRequest.nomineeToken = nomineeToken; - } - } - - void proccessNewLeaderResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderResp& response) { - LOG_DEBUG(_env, "Received NEW_LEADER response %s from replicaId %s", response, fromReplicaId); - ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE, "In state %s Received NEW_LEADER response %s", _state, response); - auto& state = *_electionState; - ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); - auto result = TernError(response.result); - switch (result) { - 
case TernError::NO_ERROR: - _electionState->requestIds[request.replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; - _electionState->lastReleased = std::max(_electionState->lastReleased, response.lastReleased); - _reqResp.eraseRequest(request.msg.id); - _tryProgressToDigest(); - break; - case TernError::LEADER_PREEMPTED: - resetLeaderElection(); - break; - default: - LOG_ERROR(_env, "Unexpected result %s in NEW_LEADER message, %s", result, response); - break; - } - } - - void proccessNewLeaderConfirmResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderConfirmResp& response) { - ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_LEADERSHIP, "In state %s Received NEW_LEADER_CONFIRM response %s", _state, response); - ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); - - auto result = TernError(response.result); - switch (result) { - case TernError::NO_ERROR: - _electionState->requestIds[request.replicaId.u8] = 0; - _reqResp.eraseRequest(request.msg.id); - LOG_DEBUG(_env,"trying to become leader"); - _tryBecomeLeader(); - break; - case TernError::LEADER_PREEMPTED: - resetLeaderElection(); - break; - default: - LOG_ERROR(_env, "Unexpected result %s in NEW_LEADER_CONFIRM message, %s", result, response); - break; - } - } - - void proccessRecoveryReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryReadResp& response) { - ALWAYS_ASSERT(_state == LeadershipState::DIGESTING_ENTRIES, "In state %s Received LOG_RECOVERY_READ response %s", _state, response); - auto& state = *_electionState; - auto result = TernError(response.result); - switch (result) { - case TernError::NO_ERROR: - case TernError::LOG_ENTRY_MISSING: - { - ALWAYS_ASSERT(state.lastReleased < request.msg.body.getLogRecoveryRead().idx); - auto entryOffset = request.msg.body.getLogRecoveryRead().idx.u64 - state.lastReleased.u64 - 1; - ALWAYS_ASSERT(entryOffset < LogsDB::IN_FLIGHT_APPEND_WINDOW); - 
ALWAYS_ASSERT(state.recoveryRequests[entryOffset][request.replicaId.u8] == request.msg.id); - auto& entry = state.recoveryEntries[entryOffset]; - if (response.value.els.size() != 0) { - // we found a record here, we don't care about other answers - entry.value = response.value.els; - _reqResp.cleanupRequests(state.recoveryRequests[entryOffset]); - } else { - state.recoveryRequests[entryOffset][request.replicaId.u8] = 0; - _reqResp.eraseRequest(request.msg.id); - } - _tryProgressToReplication(); - break; - } - case TernError::LEADER_PREEMPTED: - LOG_DEBUG(_env, "Got preempted during recovery by replica %s",fromReplicaId); - resetLeaderElection(); - break; - default: - LOG_ERROR(_env, "Unexpected result %s in LOG_RECOVERY_READ message, %s", result, response); - break; - } - } - - void proccessRecoveryWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryWriteResp& response) { - ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_REPLICATION, "In state %s Received LOG_RECOVERY_WRITE response %s", _state, response); - auto& state = *_electionState; - auto result = TernError(response.result); - switch (result) { - case TernError::NO_ERROR: - { - ALWAYS_ASSERT(state.lastReleased < request.msg.body.getLogRecoveryWrite().idx); - auto entryOffset = request.msg.body.getLogRecoveryWrite().idx.u64 - state.lastReleased.u64 - 1; - ALWAYS_ASSERT(entryOffset < LogsDB::IN_FLIGHT_APPEND_WINDOW); - ALWAYS_ASSERT(state.recoveryRequests[entryOffset][request.replicaId.u8] == request.msg.id); - state.recoveryRequests[entryOffset][request.replicaId.u8] = 0; - _reqResp.eraseRequest(request.msg.id); - _tryProgressToLeaderConfirm(); - break; - } - case TernError::LEADER_PREEMPTED: - resetLeaderElection(); - break; - default: - LOG_ERROR(_env, "Unexpected result %s in LOG_RECOVERY_READ message, %s", result, response); - break; - } - } - - void proccessNewLeaderRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderReq& request) { - if 
(unlikely(fromReplicaId != request.nomineeToken.replica())) { - LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.nomineeToken); - return; - } - auto& response = _reqResp.newResponse( fromReplicaId, requestId); - auto& newLeaderResponse = response.msg.body.setNewLeader(); - - if (request.nomineeToken.idx() <= _metadata.getLeaderToken().idx() || request.nomineeToken < _metadata.getNomineeToken()) { - newLeaderResponse.result = TernError::LEADER_PREEMPTED; - return; - } - - newLeaderResponse.result = TernError::NO_ERROR; - newLeaderResponse.lastReleased = _metadata.getLastReleased(); - _leaderLastActive = ternNow(); - - if (_metadata.getNomineeToken() == request.nomineeToken) { - return; - } - - resetLeaderElection(); - _metadata.setNomineeToken(request.nomineeToken); - } - - void proccessNewLeaderConfirmRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderConfirmReq& request) { - if (unlikely(fromReplicaId != request.nomineeToken.replica())) { - LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.nomineeToken); - return; - } - auto& response = _reqResp.newResponse(fromReplicaId, requestId); - auto& newLeaderConfirmResponse = response.msg.body.setNewLeaderConfirm(); - if (_metadata.getNomineeToken() == request.nomineeToken) { - _metadata.setLastReleased(request.releasedIdx); - } - - auto err = _metadata.updateLeaderToken(request.nomineeToken); - newLeaderConfirmResponse.result = err; - if (err == TernError::NO_ERROR) { - _leaderLastActive = ternNow(); - resetLeaderElection(); - } - } - - void proccessRecoveryReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryReadReq& request) { - if (unlikely(fromReplicaId != request.nomineeToken.replica())) { - LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. 
Token: %s", fromReplicaId, request.nomineeToken); - return; - } - auto& response = _reqResp.newResponse(fromReplicaId, requestId); - auto& recoveryReadResponse = response.msg.body.setLogRecoveryRead(); - if (request.nomineeToken != _metadata.getNomineeToken()) { - recoveryReadResponse.result = TernError::LEADER_PREEMPTED; - return; - } - _leaderLastActive = ternNow(); - LogsDBLogEntry entry; - auto err = _data.readLogEntry(request.idx, entry); - recoveryReadResponse.result = err; - if (err == TernError::NO_ERROR) { - recoveryReadResponse.value.els = entry.value; - } - } - - void proccessRecoveryWriteRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryWriteReq& request) { - if (unlikely(fromReplicaId != request.nomineeToken.replica())) { - LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.nomineeToken); - return; - } - auto& response = _reqResp.newResponse(fromReplicaId, requestId); - auto& recoveryWriteResponse = response.msg.body.setLogRecoveryWrite(); - if (request.nomineeToken != _metadata.getNomineeToken()) { - recoveryWriteResponse.result = TernError::LEADER_PREEMPTED; - return; - } - _leaderLastActive = ternNow(); - LogsDBLogEntry entry; - entry.idx = request.idx; - entry.value = request.value.els; - _data.writeLogEntry(entry); - recoveryWriteResponse.result = TernError::NO_ERROR; - } - - TernError writeLogEntries(LeaderToken token, LogIdx newlastReleased, std::vector& entries) { - auto err = _metadata.updateLeaderToken(token); - if (err != TernError::NO_ERROR) { - return err; - } - _clearElectionState(); - _data.writeLogEntries(entries); - if (_metadata.getLastReleased() < newlastReleased) { - _metadata.setLastReleased(newlastReleased); - } - return TernError::NO_ERROR; - } - - void resetLeaderElection() { - if (isLeader()) { - LOG_INFO(_env,"Preempted as leader. Reseting leader election. Becoming follower"); - } else { - LOG_INFO(_env,"Reseting leader election. 
Becoming follower of leader with token %s", _metadata.getLeaderToken()); - } - _state = LeadershipState::FOLLOWER; - _leaderLastActive = ternNow(); - _metadata.setNomineeToken(LeaderToken(0,0)); - _clearElectionState(); - } - -private: - - void _tryProgressToDigest() { - ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE); - LOG_DEBUG(_env, "trying to progress to digest"); - if (!ReqResp::isQuorum(_electionState->requestIds)) { - return; - } - _reqResp.cleanupRequests(_electionState->requestIds); - _state = LeadershipState::DIGESTING_ENTRIES; - LOG_INFO(_env,"Became nominee with token: %s", _metadata.getNomineeToken()); - - // We might have gotten a higher release point. We can safely update - _metadata.setLastReleased(_electionState->lastReleased); - - // Populate entries we have and don't ask for them - std::vector entries; - entries.reserve(LogsDB::IN_FLIGHT_APPEND_WINDOW); - auto it = _data.getIterator(); - it.seek(_electionState->lastReleased); - it.next(); - for(; it.valid(); ++it) { - entries.emplace_back(it.entry()); - } - ALWAYS_ASSERT(entries.size() <= LogsDB::IN_FLIGHT_APPEND_WINDOW); - for (auto& entry : entries) { - auto offset = entry.idx.u64 - _electionState->lastReleased.u64 - 1; - _electionState->recoveryEntries[offset] = entry; - } - - // Ask for all non populated entries - for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { - auto& entry = _electionState->recoveryEntries[i]; - if (!entry.value.empty()) { - continue; - } - entry.idx = _electionState->lastReleased + i + 1; - auto& requestIds = _electionState->recoveryRequests[i]; - auto& participatingReplicas = _electionState->requestIds; - for(ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == _replicaId) { - requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; - continue; - } - if (participatingReplicas[replicaId.u8] != ReqResp::CONFIRMED_REQ_ID) { - requestIds[replicaId.u8] = ReqResp::UNUSED_REQ_ID; - continue; - } - auto& 
request = _reqResp.newRequest(replicaId); - auto& recoveryRead = request.msg.body.setLogRecoveryRead(); - recoveryRead.idx = entry.idx; - recoveryRead.nomineeToken = _metadata.getNomineeToken(); - requestIds[replicaId.u8] = request.msg.id; - } - } - } - - void _tryProgressToReplication() { - ALWAYS_ASSERT(_state == LeadershipState::DIGESTING_ENTRIES); - bool canMakeProgress{false}; - for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { - if (_electionState->recoveryEntries[i].value.empty()) { - auto& requestIds = _electionState->recoveryRequests[i]; - if (ReqResp::isQuorum(requestIds)) { - canMakeProgress = true; - } - if (canMakeProgress) { - _reqResp.cleanupRequests(requestIds); - continue; - } - return; - } - } - // If we came here it means whole array contains records - // Send replication requests until first hole - _state = LeadershipState::CONFIRMING_REPLICATION; - std::vector entries; - entries.reserve(_electionState->recoveryEntries.size()); - for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { - auto& entry = _electionState->recoveryEntries[i]; - if (entry.value.empty()) { - break; - } - auto& requestIds = _electionState->recoveryRequests[i]; - auto& participatingReplicas = _electionState->requestIds; - for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == replicaId) { - requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; - continue; - } - if (participatingReplicas[replicaId.u8] != ReqResp::CONFIRMED_REQ_ID) { - requestIds[replicaId.u8] = ReqResp::UNUSED_REQ_ID; - continue; - } - entries.emplace_back(entry); - auto& request = _reqResp.newRequest(replicaId); - auto& recoveryWrite = request.msg.body.setLogRecoveryWrite(); - recoveryWrite.idx = entry.idx; - recoveryWrite.nomineeToken = _metadata.getNomineeToken(); - recoveryWrite.value.els = entry.value; - requestIds[replicaId.u8] = request.msg.id; - } - } - LOG_INFO(_env,"Digesting complete progressing to replication of %s 
entries with token: %s", entries.size(), _metadata.getNomineeToken()); - if (entries.empty()) { - _tryProgressToLeaderConfirm(); - } else { - _data.writeLogEntries(entries); - } - } - - void _tryProgressToLeaderConfirm() { - ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_REPLICATION); - LogIdx newLastReleased = _electionState->lastReleased; - for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { - if (_electionState->recoveryEntries[i].value.empty()) { - break; - } - auto& requestIds = _electionState->recoveryRequests[i]; - if (!ReqResp::isQuorum(requestIds)) { - // we just confirmed replication up to this point. - // It is safe to move last released for us even if we don't become leader - // while not necessary for correctness it somewhat helps making progress in multiple preemtion case - _metadata.setLastReleased(newLastReleased); - return; - } - newLastReleased = _electionState->recoveryEntries[i].idx; - _reqResp.cleanupRequests(requestIds); - } - // we just confirmed replication up to this point. - // It is safe to move last released for us even if we don't become leader - // if we do become leader we guarantee state up here was readable - _metadata.setLastReleased(newLastReleased); - _state = LeadershipState::CONFIRMING_LEADERSHIP; - LOG_INFO(_env,"Replication of extra records complete. 
Progressing to CONFIRMING_LEADERSHIP with token: %s, newLastReleased: %s", _metadata.getNomineeToken(), newLastReleased); - - auto& requestIds = _electionState->requestIds; - for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == _replicaId) { - requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; - continue; - } - if (requestIds[replicaId.u8] == ReqResp::UNUSED_REQ_ID) { - continue; - } - auto& request = _reqResp.newRequest(replicaId); - auto& recoveryConfirm = request.msg.body.setNewLeaderConfirm(); - recoveryConfirm.nomineeToken = _metadata.getNomineeToken(); - recoveryConfirm.releasedIdx = _metadata.getLastReleased(); - requestIds[replicaId.u8] = request.msg.id; - } - } - - void _tryBecomeLeader() { - if (!ReqResp::isQuorum(_electionState->requestIds)) { - return; - } - auto nomineeToken = _metadata.getNomineeToken(); - ALWAYS_ASSERT(nomineeToken.replica() == _replicaId); - LOG_INFO(_env,"Became leader with token %s", nomineeToken); - _state = LeadershipState::LEADER; - ALWAYS_ASSERT(_metadata.updateLeaderToken(nomineeToken) == TernError::NO_ERROR); - _clearElectionState(); - } - - void _clearElectionState() { - _leaderLastActive = ternNow(); - if (!_electionState) { - return; - } - _reqResp.cleanupRequests(_electionState->requestIds); - _clearRecoveryRequests(); - _electionState.reset(); - } - - void _clearRecoveryRequests() { - for(auto& requestIds : _electionState->recoveryRequests) { - _reqResp.cleanupRequests(requestIds); - } - } - - Env& _env; - LogsDBStats& _stats; - const bool _noReplication; - const bool _avoidBeingLeader; - const ReplicaId _replicaId; - LogMetadata& _metadata; - DataPartitions& _data; - ReqResp& _reqResp; - - LeadershipState _state; - std::unique_ptr _electionState; - TernTime _leaderLastActive; -}; - -class BatchWriter { -public: - BatchWriter(Env& env, ReqResp& reqResp, LeaderElection& leaderElection) : - _env(env), - _reqResp(reqResp), - _leaderElection(leaderElection), - 
_token(LeaderToken(0,0)), - _lastReleased(0) {} - - void proccessLogWriteRequest(LogsDBRequest& request) { - ALWAYS_ASSERT(request.msg.body.kind() == LogMessageKind::LOG_WRITE); - const auto& writeRequest = request.msg.body.getLogWrite(); - if (unlikely(request.replicaId != writeRequest.token.replica())) { - LOG_ERROR(_env, "Token from replica id %s does not have matching replica id. Token: %s", request.replicaId, writeRequest.token); - return; - } - if (unlikely(writeRequest.token < _token)) { - auto& resp = _reqResp.newResponse(request.replicaId, request.msg.id); - auto& writeResponse = resp.msg.body.setLogWrite(); - writeResponse.result = TernError::LEADER_PREEMPTED; - return; - } - if (unlikely(_token < writeRequest.token )) { - writeBatch(); - _token = writeRequest.token; - } - _requests.emplace_back(&request); - _entries.emplace_back(); - auto& entry = _entries.back(); - entry.idx = writeRequest.idx; - entry.value = writeRequest.value.els; - if (_lastReleased < writeRequest.lastReleased) { - _lastReleased = writeRequest.lastReleased; - } - } - - void proccessReleaseRequest(ReplicaId fromReplicaId, uint64_t requestId, const ReleaseReq& request) { - if (unlikely(fromReplicaId != request.token.replica())) { - LOG_ERROR(_env, "Token from replica id %s does not have matching replica id. 
Token: %s", fromReplicaId, request.token); - return; - } - if (unlikely(request.token < _token)) { - return; - } - if (unlikely(_token < request.token )) { - writeBatch(); - _token = request.token; - } - - if (_lastReleased < request.lastReleased) { - _lastReleased = request.lastReleased; - } - - } - - void writeBatch() { - if (_token == LeaderToken(0,0)) { - return; - } - auto response = _leaderElection.writeLogEntries(_token, _lastReleased, _entries); - for (auto req : _requests) { - auto& resp = _reqResp.newResponse(req->replicaId, req->msg.id); - auto& writeResponse = resp.msg.body.setLogWrite(); - writeResponse.result = response; - } - _requests.clear(); - _entries.clear(); - _lastReleased = 0; - _token = LeaderToken(0,0); - } - -private: - Env& _env; - ReqResp& _reqResp; - LeaderElection& _leaderElection; - - LeaderToken _token; - LogIdx _lastReleased; - std::vector _requests; - std::vector _entries; -}; - -class CatchupReader { -public: - CatchupReader(LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, DataPartitions& data, ReplicaId replicaId, LogIdx lastRead) : - _stats(stats), - _reqResp(reqResp), - _metadata(metadata), - _data(data), - _replicaId(replicaId), - _lastRead(lastRead), - _lastContinuousIdx(lastRead), - _lastMissingIdx(lastRead) {} - - LogIdx getLastContinuous() const { - return _lastContinuousIdx; - } - - void readEntries(std::vector& entries, size_t maxEntries) { - if (_lastRead == _lastContinuousIdx) { - update_atomic_stat_ema(_stats.entriesRead, (uint64_t)0); - return; - } - auto lastReleased = _metadata.getLastReleased(); - auto startIndex = _lastRead; - ++startIndex; - - auto it = _data.getIterator(); - for (it.seek(startIndex); it.valid(); it.next(), ++startIndex) { - if (_lastContinuousIdx < it.key() || entries.size() >= maxEntries) { - break; - } - ALWAYS_ASSERT(startIndex == it.key()); - entries.emplace_back(it.entry()); - _lastRead = startIndex; - } - update_atomic_stat_ema(_stats.entriesRead, entries.size()); - 
update_atomic_stat_ema(_stats.readerLag, lastReleased.u64 - _lastRead.u64); - } - - void init() { - _missingEntries.reserve(LogsDB::CATCHUP_WINDOW); - _requestIds.reserve(LogsDB::CATCHUP_WINDOW); - _findMissingEntries(); - } - - void maybeCatchUp() { - for (auto idx : _missingEntries) { - if (idx != 0) { - _populateStats(); - return; - } - } - _lastContinuousIdx = _lastMissingIdx; - _missingEntries.clear(); - _requestIds.clear(); - _findMissingEntries(); - _populateStats(); - } - - - void proccessLogReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogReadReq& request) { - auto& response = _reqResp.newResponse(fromReplicaId, requestId); - auto& readResponse = response.msg.body.setLogRead(); - if (_metadata.getLastReleased() < request.idx) { - readResponse.result = TernError::LOG_ENTRY_UNRELEASED; - return; - } - LogsDBLogEntry entry; - auto err =_data.readLogEntry(request.idx, entry); - readResponse.result = err; - if (err == TernError::NO_ERROR) { - readResponse.value.els = entry.value; - } - } - - void proccessLogReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogReadResp& response) { - if (response.result != TernError::NO_ERROR) { - return; - } - - auto idx = request.msg.body.getLogRead().idx; - - size_t i = 0; - for (; i < _missingEntries.size(); ++i) { - if (_missingEntries[i] == idx) { - _missingEntries[i] = 0; - break; - } - } - - if (i == _missingEntries.size()) { - return; - } - _reqResp.cleanupRequests(_requestIds[i]); - LogsDBLogEntry entry; - entry.idx = idx; - entry.value = response.value.els; - _data.writeLogEntry(entry); - } - - LogIdx lastRead() const { - return _lastRead; - } - -private: - - void _populateStats() { - update_atomic_stat_ema(_stats.followerLag, _metadata.getLastReleased().u64 - _lastContinuousIdx.u64); - update_atomic_stat_ema(_stats.catchupWindow, _missingEntries.size()); - } - - void _findMissingEntries() { - if (!_missingEntries.empty()) { - return; - } - auto lastReleased = 
_metadata.getLastReleased(); - if (unlikely(_metadata.getLastReleased() <= _lastRead)) { - return; - } - auto it = _data.getIterator(); - auto startIdx = _lastContinuousIdx; - it.seek(++startIdx); - while (startIdx <= lastReleased && _missingEntries.size() < LogsDB::CATCHUP_WINDOW) { - if(!it.valid() || startIdx < it.key() ) { - _missingEntries.emplace_back(startIdx); - } else { - ++it; - } - ++startIdx; - } - - if (_missingEntries.empty()) { - _lastContinuousIdx = _lastMissingIdx = lastReleased; - return; - } - - _lastContinuousIdx = _missingEntries.front().u64 - 1; - _lastMissingIdx = _missingEntries.back(); - - for(auto logIdx : _missingEntries) { - _requestIds.emplace_back(); - auto& requests = _requestIds.back(); - for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8 ) { - if (replicaId == _replicaId) { - requests[replicaId.u8] = 0; - continue; - } - auto& request = _reqResp.newRequest(replicaId); - auto& readRequest = request.msg.body.setLogRead(); - readRequest.idx = logIdx; - requests[replicaId.u8] = request.msg.id; - } - } - } - LogsDBStats& _stats; - ReqResp& _reqResp; - LogMetadata& _metadata; - DataPartitions& _data; - - const ReplicaId _replicaId; - LogIdx _lastRead; - LogIdx _lastContinuousIdx; - LogIdx _lastMissingIdx; - - std::vector _missingEntries; - std::vector _requestIds; -}; - -class Appender { - static constexpr size_t IN_FLIGHT_MASK = LogsDB::IN_FLIGHT_APPEND_WINDOW - 1; - static_assert((IN_FLIGHT_MASK & LogsDB::IN_FLIGHT_APPEND_WINDOW) == 0); -public: - Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool noReplication) : - _env(env), - _reqResp(reqResp), - _metadata(metadata), - _leaderElection(leaderElection), - _noReplication(noReplication), - _currentIsLeader(false), - _entriesStart(0), - _entriesEnd(0) { } - - void maybeMoveRelease() { - if (!_currentIsLeader && _leaderElection.isLeader()) { - _init(); - return; - } - if 
(!_leaderElection.isLeader() && _currentIsLeader) { - _cleanup(); - return; - } - - if (!_currentIsLeader) { - return; - } - - auto newRelease = _metadata.getLastReleased(); - std::vector entriesToWrite; - for (; _entriesStart < _entriesEnd; ++_entriesStart) { - auto offset = _entriesStart & IN_FLIGHT_MASK; - auto& requestIds = _requestIds[offset]; - if (_noReplication || ReqResp::isQuorum(requestIds)) { - ++newRelease; - entriesToWrite.emplace_back(std::move(_entries[offset])); - ALWAYS_ASSERT(newRelease == entriesToWrite.back().idx); - _reqResp.cleanupRequests(requestIds); - continue; - } - break; - } - if (entriesToWrite.empty()) { - return; - } - - auto err = _leaderElection.writeLogEntries(_metadata.getLeaderToken(), newRelease, entriesToWrite); - ALWAYS_ASSERT(err == TernError::NO_ERROR); - for (auto reqId : _releaseRequests) { - if (reqId == 0) { - continue; - } - auto request = _reqResp.getRequest(reqId); - ALWAYS_ASSERT(request->msg.body.kind() == LogMessageKind::RELEASE); - auto& releaseReq = request->msg.body.setRelease(); - releaseReq.token = _metadata.getLeaderToken(); - releaseReq.lastReleased = _metadata.getLastReleased(); - } - } - - TernError appendEntries(std::vector& entries) { - if (!_leaderElection.isLeader()) { - return TernError::LEADER_PREEMPTED; - } - auto availableSpace = LogsDB::IN_FLIGHT_APPEND_WINDOW - entriesInFlight(); - auto countToAppend = std::min(entries.size(), availableSpace); - for(size_t i = 0; i < countToAppend; ++i) { - entries[i].idx = _metadata.assignLogIdx(); - auto offset = (_entriesEnd + i) & IN_FLIGHT_MASK; - _entries[offset] = entries[i]; - auto& requestIds = _requestIds[offset]; - for(ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == _metadata.getReplicaId()) { - requestIds[replicaId.u8] = 0; - continue; - } - if (unlikely(_noReplication)) { - requestIds[replicaId.u8] = 0; - continue; - } - auto& req = _reqResp.newRequest(replicaId); - auto& writeReq = 
req.msg.body.setLogWrite(); - writeReq.token = _metadata.getLeaderToken(); - writeReq.lastReleased = _metadata.getLastReleased(); - writeReq.idx = _entries[offset].idx; - writeReq.value.els = _entries[offset].value; - requestIds[replicaId.u8] = req.msg.id; - } - } - for (size_t i = countToAppend; i < entries.size(); ++i) { - entries[i].idx = 0; - } - _entriesEnd += countToAppend; - if (unlikely(_noReplication)) { - maybeMoveRelease(); - } - return TernError::NO_ERROR; - } - - void proccessLogWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogWriteResp& response) { - if (!_leaderElection.isLeader()) { - return; - } - switch ((TernError)response.result) { - case TernError::NO_ERROR: - break; - case TernError::LEADER_PREEMPTED: - _leaderElection.resetLeaderElection(); - return; - default: - LOG_ERROR(_env, "Unexpected result from LOG_WRITE response %s", response.result); - return; - } - - auto logIdx = request.msg.body.getLogWrite().idx; - ALWAYS_ASSERT(_metadata.getLastReleased() < logIdx); - auto offset = _entriesStart + (logIdx.u64 - _metadata.getLastReleased().u64 - 1); - ALWAYS_ASSERT(offset < _entriesEnd); - offset &= IN_FLIGHT_MASK; - ALWAYS_ASSERT(_entries[offset].idx == logIdx); - auto& requestIds = _requestIds[offset]; - if (requestIds[fromReplicaId.u8] != request.msg.id) { - LOG_ERROR(_env, "Mismatch in expected requestId in LOG_WRITE response %s", response); - return; - } - requestIds[fromReplicaId.u8] = 0; - _reqResp.eraseRequest(request.msg.id); - } - - uint64_t entriesInFlight() const { - return _entriesEnd - _entriesStart; - } - -private: - - void _init() { - for(ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == _metadata.getReplicaId()) { - _releaseRequests[replicaId.u8] = 0; - continue; - } - auto& req = _reqResp.newRequest(replicaId); - auto& releaseReq = req.msg.body.setRelease(); - releaseReq.token = _metadata.getLeaderToken(); - releaseReq.lastReleased = 
_metadata.getLastReleased(); - _releaseRequests[replicaId.u8] = req.msg.id; - } - _currentIsLeader = true; - } - - void _cleanup() { - for (; _entriesStart < _entriesEnd; ++_entriesStart) { - auto offset = _entriesStart & IN_FLIGHT_MASK; - _entries[offset].value.clear(); - _reqResp.cleanupRequests(_requestIds[offset]); - } - _reqResp.cleanupRequests(_releaseRequests); - _currentIsLeader = false; - } - Env& _env; - ReqResp& _reqResp; - LogMetadata& _metadata; - LeaderElection& _leaderElection; - - const bool _noReplication; - bool _currentIsLeader; - uint64_t _entriesStart; - uint64_t _entriesEnd; - - std::array _entries; - std::array _requestIds; - ReqResp::QuorumTrackArray _releaseRequests; - - -}; - -class LogsDBImpl { -public: - LogsDBImpl( +LogsDB::LogsDB( Logger& logger, std::shared_ptr& xmon, SharedRocksDB& sharedDB, @@ -1592,295 +67,197 @@ class LogsDBImpl { LogIdx lastRead, bool noReplication, bool avoidBeingLeader) - : - _env(logger, xmon, "LogsDB"), - _db(sharedDB.db()), - _replicaId(replicaId), - _stats(), - _partitions(_env,sharedDB), - _metadata(_env,_stats, sharedDB, replicaId, _partitions), - _reqResp(_stats), - _leaderElection(_env, _stats, noReplication, avoidBeingLeader, replicaId, _metadata, _partitions, _reqResp), - _batchWriter(_env,_reqResp, _leaderElection), - _catchupReader(_stats, _reqResp, _metadata, _partitions, replicaId, lastRead), - _appender(_env, _stats, _reqResp, _metadata, _leaderElection, noReplication) - { - LOG_INFO(_env, "Initializing LogsDB"); - auto initialStart = _metadata.isInitialStart() && _partitions.isInitialStart(); - if (initialStart) { - LOG_INFO(_env, "Initial start of LogsDB"); - } - - auto initSuccess = _metadata.init(initialStart); - initSuccess = _partitions.init(initialStart) && initSuccess; - - ALWAYS_ASSERT(initSuccess, "Failed to init LogsDB, check if you need to run with \"initialStart\" flag!"); - ALWAYS_ASSERT(lastRead <= _metadata.getLastReleased()); - flush(true); - _catchupReader.init(); - - 
LOG_INFO(_env,"LogsDB opened, leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); - _infoLoggedTime = ternNow(); - _lastLoopFinished = ternNow(); - } - - ~LogsDBImpl() { - close(); - } - - void close() { - LOG_INFO(_env,"closing LogsDB, leaderToken(%s), lastReleased(%s), lastRead(%s)", _metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); - } - - LogIdx appendLogEntries(std::vector& entries) { - ALWAYS_ASSERT(_metadata.getLeaderToken().replica() == _replicaId); - if (unlikely(entries.size() == 0)) { - return 0; - } - - for (auto& entry : entries) { - entry.idx = _metadata.assignLogIdx(); - } - auto firstAssigned = entries.front().idx; - ALWAYS_ASSERT(_metadata.getLastReleased() < firstAssigned); - _partitions.writeLogEntries(entries); - return firstAssigned; - } - - void flush(bool sync) { - ROCKS_DB_CHECKED(_db->FlushWAL(sync)); - } - - void processIncomingMessages(std::vector& requests, std::vector& responses) { - auto processingStarted = ternNow(); - _maybeLogStatus(processingStarted); - for(auto& resp : responses) { - auto request = _reqResp.getRequest(resp.msg.id); - if (request == nullptr) { - // We often don't care about all responses and remove requests as soon as we can make progress - continue; - } - - // Mismatch in responses could be due to network issues we don't want to crash but we will ignore and retry - // Mismatch in internal state is asserted on. - if (unlikely(request->replicaId != resp.replicaId)) { - LOG_ERROR(_env, "Expected response from replica %s, got it from replica %s. Response: %s", request->replicaId, resp.msg.id, resp); - continue; - } - if (unlikely(request->msg.body.kind() != resp.msg.body.kind())) { - LOG_ERROR(_env, "Expected response of type %s, got type %s. 
Response: %s", request->msg.body.kind(), resp.msg.body.kind(), resp); - continue; - } - LOG_TRACE(_env, "processing %s", resp); - - switch(resp.msg.body.kind()) { - case LogMessageKind::RELEASE: - // We don't track release requests. This response is unexpected - case LogMessageKind::ERROR: - LOG_ERROR(_env, "Bad response %s", resp); - break; - case LogMessageKind::LOG_WRITE: - _appender.proccessLogWriteResponse(request->replicaId, *request, resp.msg.body.getLogWrite()); - break; - case LogMessageKind::LOG_READ: - _catchupReader.proccessLogReadResponse(request->replicaId, *request, resp.msg.body.getLogRead()); - break; - case LogMessageKind::NEW_LEADER: - _leaderElection.proccessNewLeaderResponse(request->replicaId, *request, resp.msg.body.getNewLeader()); - break; - case LogMessageKind::NEW_LEADER_CONFIRM: - _leaderElection.proccessNewLeaderConfirmResponse(request->replicaId, *request, resp.msg.body.getNewLeaderConfirm()); - break; - case LogMessageKind::LOG_RECOVERY_READ: - _leaderElection.proccessRecoveryReadResponse(request->replicaId, *request, resp.msg.body.getLogRecoveryRead()); - break; - case LogMessageKind::LOG_RECOVERY_WRITE: - _leaderElection.proccessRecoveryWriteResponse(request->replicaId, *request, resp.msg.body.getLogRecoveryWrite()); - break; - case LogMessageKind::EMPTY: - ALWAYS_ASSERT("LogMessageKind::EMPTY should not happen"); - break; - } - } - for(auto& req : requests) { - switch (req.msg.body.kind()) { - case LogMessageKind::ERROR: - LOG_ERROR(_env, "Bad request %s", req); - break; - case LogMessageKind::LOG_WRITE: - _batchWriter.proccessLogWriteRequest(req); - break; - case LogMessageKind::RELEASE: - _batchWriter.proccessReleaseRequest(req.replicaId, req.msg.id, req.msg.body.getRelease()); - break; - case LogMessageKind::LOG_READ: - _catchupReader.proccessLogReadRequest(req.replicaId, req.msg.id, req.msg.body.getLogRead()); - break; - case LogMessageKind::NEW_LEADER: - _leaderElection.proccessNewLeaderRequest(req.replicaId, req.msg.id, 
req.msg.body.getNewLeader()); - break; - case LogMessageKind::NEW_LEADER_CONFIRM: - _leaderElection.proccessNewLeaderConfirmRequest(req.replicaId, req.msg.id, req.msg.body.getNewLeaderConfirm()); - break; - case LogMessageKind::LOG_RECOVERY_READ: - _leaderElection.proccessRecoveryReadRequest(req.replicaId, req.msg.id, req.msg.body.getLogRecoveryRead()); - break; - case LogMessageKind::LOG_RECOVERY_WRITE: - _leaderElection.proccessRecoveryWriteRequest(req.replicaId, req.msg.id, req.msg.body.getLogRecoveryWrite()); - break; - case LogMessageKind::EMPTY: - ALWAYS_ASSERT("LogMessageKind::EMPTY should not happen"); - break; - } - } - _leaderElection.maybeStartLeaderElection(); - _batchWriter.writeBatch(); - _appender.maybeMoveRelease(); - _catchupReader.maybeCatchUp(); - _reqResp.resendTimedOutRequests(); - update_atomic_stat_ema(_stats.requestsReceived, requests.size()); - update_atomic_stat_ema(_stats.responsesReceived, responses.size()); - update_atomic_stat_ema(_stats.appendWindow, _appender.entriesInFlight()); - _stats.isLeader.store(_leaderElection.isLeader(), std::memory_order_relaxed); - responses.clear(); - requests.clear(); - update_atomic_stat_ema(_stats.idleTime, processingStarted - _lastLoopFinished); - _lastLoopFinished = ternNow(); - update_atomic_stat_ema(_stats.processingTime, _lastLoopFinished - processingStarted); - } - - void getOutgoingMessages(std::vector& requests, std::vector& responses) { - _reqResp.getResponsesToSend(responses); - _reqResp.getRequestsToSend(requests); - } - - bool isLeader() const { - return _leaderElection.isLeader(); - } - - TernError appendEntries(std::vector& entries) { - return _appender.appendEntries(entries); - } - - LogIdx getLastContinuous() const { - return _catchupReader.getLastContinuous(); - } - - void readEntries(std::vector& entries, size_t maxEntries) { - _catchupReader.readEntries(entries, maxEntries); - } - - void readIndexedEntries(const std::vector &indices, std::vector &entries) const { - 
_partitions.readIndexedEntries(indices, entries); - } - - Duration getNextTimeout() const { - return _reqResp.getNextTimeout(); - } - - LogIdx getLastReleased() const { - return _metadata.getLastReleased(); - } - - LogIdx getHeadIdx() const { - return _partitions.getLowestKey(); - } +: + _env(logger, xmon, "LogsDB"), + _db(sharedDB.db()), + _replicaId(replicaId), + _stats() +{ + LOG_INFO(_env, "Initializing LogsDB"); + + // Create components in order of dependencies + _partitions = std::make_unique(_env, sharedDB); + _metadata = std::make_unique(_env, _stats, sharedDB, replicaId, *_partitions); + _reqResp = std::make_unique(_stats); + _leaderElection = std::make_unique(_env, _stats, noReplication, avoidBeingLeader, replicaId, *_metadata, *_partitions, *_reqResp); + _batchWriter = std::make_unique(_env, *_reqResp, *_leaderElection); + _catchupReader = std::make_unique(_stats, *_reqResp, *_metadata, *_partitions, replicaId, lastRead); + _appender = std::make_unique(_env, _stats, *_reqResp, *_metadata, *_leaderElection, noReplication); - const LogsDBStats& getStats() const { - return _stats; + auto initialStart = _metadata->isInitialStart() && _partitions->isInitialStart(); + if (initialStart) { + LOG_INFO(_env, "Initial start of LogsDB"); } -private: + auto initSuccess = _metadata->init(initialStart); + initSuccess = _partitions->init(initialStart) && initSuccess; - void _maybeLogStatus(TernTime now) { - if (now - _infoLoggedTime > 1_mins) { - LOG_INFO(_env,"LogsDB status: leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); - _infoLoggedTime = now; - } - } - - Env _env; - rocksdb::DB* _db; - const ReplicaId _replicaId; - LogsDBStats _stats; - DataPartitions _partitions; - LogMetadata _metadata; - ReqResp _reqResp; - LeaderElection _leaderElection; - BatchWriter _batchWriter; - CatchupReader _catchupReader; - Appender _appender; - TernTime _infoLoggedTime; - TernTime _lastLoopFinished; 
-}; + ALWAYS_ASSERT(initSuccess, "Failed to init LogsDB, check if you need to run with \"initialStart\" flag!"); + ALWAYS_ASSERT(lastRead <= _metadata->getLastReleased()); + flush(true); + _catchupReader->init(); -LogsDB::LogsDB( - Logger& logger, - std::shared_ptr& xmon, - SharedRocksDB& sharedDB, - ReplicaId replicaId, - LogIdx lastRead, - bool noReplication, - bool avoidBeingLeader) -{ - _impl = new LogsDBImpl(logger, xmon, sharedDB, replicaId, lastRead, noReplication, avoidBeingLeader); + LOG_INFO(_env,"LogsDB opened, leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata->getLeaderToken(), _metadata->getLastReleased(), _catchupReader->lastRead()); + _infoLoggedTime = ternNow(); + _lastLoopFinished = ternNow(); } LogsDB::~LogsDB() { - delete _impl; - _impl = nullptr; + close(); } void LogsDB::close() { - _impl->close(); + LOG_INFO(_env,"closing LogsDB, leaderToken(%s), lastReleased(%s), lastRead(%s)", _metadata->getLeaderToken(), _metadata->getLastReleased(), _catchupReader->lastRead()); } void LogsDB::flush(bool sync) { - _impl->flush(sync); + ROCKS_DB_CHECKED(_db->FlushWAL(sync)); } void LogsDB::processIncomingMessages(std::vector& requests, std::vector& responses) { - _impl->processIncomingMessages(requests, responses); + auto processingStarted = ternNow(); + _maybeLogStatus(processingStarted); + for(auto& resp : responses) { + auto request = _reqResp->getRequest(resp.msg.id); + if (request == nullptr) { + // We often don't care about all responses and remove requests as soon as we can make progress + continue; + } + + // Mismatch in responses could be due to network issues we don't want to crash but we will ignore and retry + // Mismatch in internal state is asserted on. + if (unlikely(request->replicaId != resp.replicaId)) { + LOG_ERROR(_env, "Expected response from replica %s, got it from replica %s. 
Response: %s", request->replicaId, resp.msg.id, resp); + continue; + } + if (unlikely(request->msg.body.kind() != resp.msg.body.kind())) { + LOG_ERROR(_env, "Expected response of type %s, got type %s. Response: %s", request->msg.body.kind(), resp.msg.body.kind(), resp); + continue; + } + LOG_TRACE(_env, "processing %s", resp); + + switch(resp.msg.body.kind()) { + case LogMessageKind::RELEASE: + // We don't track release requests. This response is unexpected + case LogMessageKind::ERROR: + LOG_ERROR(_env, "Bad response %s", resp); + break; + case LogMessageKind::LOG_WRITE: + _appender->proccessLogWriteResponse(request->replicaId, *request, resp.msg.body.getLogWrite()); + break; + case LogMessageKind::LOG_READ: + _catchupReader->proccessLogReadResponse(request->replicaId, *request, resp.msg.body.getLogRead()); + break; + case LogMessageKind::NEW_LEADER: + _leaderElection->proccessNewLeaderResponse(request->replicaId, *request, resp.msg.body.getNewLeader()); + break; + case LogMessageKind::NEW_LEADER_CONFIRM: + _leaderElection->proccessNewLeaderConfirmResponse(request->replicaId, *request, resp.msg.body.getNewLeaderConfirm()); + break; + case LogMessageKind::LOG_RECOVERY_READ: + _leaderElection->proccessRecoveryReadResponse(request->replicaId, *request, resp.msg.body.getLogRecoveryRead()); + break; + case LogMessageKind::LOG_RECOVERY_WRITE: + _leaderElection->proccessRecoveryWriteResponse(request->replicaId, *request, resp.msg.body.getLogRecoveryWrite()); + break; + case LogMessageKind::EMPTY: + ALWAYS_ASSERT("LogMessageKind::EMPTY should not happen"); + break; + } + } + for(auto& req : requests) { + switch (req.msg.body.kind()) { + case LogMessageKind::ERROR: + LOG_ERROR(_env, "Bad request %s", req); + break; + case LogMessageKind::LOG_WRITE: + _batchWriter->proccessLogWriteRequest(req); + break; + case LogMessageKind::RELEASE: + _batchWriter->proccessReleaseRequest(req.replicaId, req.msg.id, req.msg.body.getRelease()); + break; + case LogMessageKind::LOG_READ: + 
_catchupReader->proccessLogReadRequest(req.replicaId, req.msg.id, req.msg.body.getLogRead()); + break; + case LogMessageKind::NEW_LEADER: + _leaderElection->proccessNewLeaderRequest(req.replicaId, req.msg.id, req.msg.body.getNewLeader()); + break; + case LogMessageKind::NEW_LEADER_CONFIRM: + _leaderElection->proccessNewLeaderConfirmRequest(req.replicaId, req.msg.id, req.msg.body.getNewLeaderConfirm()); + break; + case LogMessageKind::LOG_RECOVERY_READ: + _leaderElection->proccessRecoveryReadRequest(req.replicaId, req.msg.id, req.msg.body.getLogRecoveryRead()); + break; + case LogMessageKind::LOG_RECOVERY_WRITE: + _leaderElection->proccessRecoveryWriteRequest(req.replicaId, req.msg.id, req.msg.body.getLogRecoveryWrite()); + break; + case LogMessageKind::EMPTY: + ALWAYS_ASSERT("LogMessageKind::EMPTY should not happen"); + break; + } + } + _leaderElection->maybeStartLeaderElection(); + _batchWriter->writeBatch(); + _appender->maybeMoveRelease(); + _catchupReader->maybeCatchUp(); + _reqResp->resendTimedOutRequests(); + update_atomic_stat_ema(_stats.requestsReceived, requests.size()); + update_atomic_stat_ema(_stats.responsesReceived, responses.size()); + update_atomic_stat_ema(_stats.appendWindow, _appender->entriesInFlight()); + _stats.isLeader.store(_leaderElection->isLeader(), std::memory_order_relaxed); + responses.clear(); + requests.clear(); + update_atomic_stat_ema(_stats.idleTime, processingStarted - _lastLoopFinished); + _lastLoopFinished = ternNow(); + update_atomic_stat_ema(_stats.processingTime, _lastLoopFinished - processingStarted); } void LogsDB::getOutgoingMessages(std::vector& requests, std::vector& responses) { - _impl->getOutgoingMessages(requests, responses); + _reqResp->getResponsesToSend(responses); + _reqResp->getRequestsToSend(requests); } bool LogsDB::isLeader() const { - return _impl->isLeader(); + return _leaderElection->isLeader(); } TernError LogsDB::appendEntries(std::vector& entries) { - return _impl->appendEntries(entries); + return 
_appender->appendEntries(entries); } LogIdx LogsDB::getLastContinuous() const { - return _impl->getLastContinuous(); + return _catchupReader->getLastContinuous(); } void LogsDB::readEntries(std::vector& entries, size_t maxEntries) { - _impl->readEntries(entries, maxEntries); + _catchupReader->readEntries(entries, maxEntries); } void LogsDB::readIndexedEntries(const std::vector &indices, std::vector &entries) const { - _impl->readIndexedEntries(indices, entries); + _partitions->readIndexedEntries(indices, entries); } Duration LogsDB::getNextTimeout() const { - return _impl->getNextTimeout(); + return _reqResp->getNextTimeout(); } LogIdx LogsDB::getLastReleased() const { - return _impl->getLastReleased(); + return _metadata->getLastReleased(); } LogIdx LogsDB::getHeadIdx() const { - return _impl->getHeadIdx(); + return _partitions->getLowestKey(); } const LogsDBStats& LogsDB::getStats() const { - return _impl->getStats(); + return _stats; +} + +void LogsDB::_maybeLogStatus(TernTime now) { + if (now - _infoLoggedTime > 1_mins) { + LOG_INFO(_env,"LogsDB status: leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata->getLeaderToken(), _metadata->getLastReleased(), _catchupReader->lastRead()); + _infoLoggedTime = now; + } } void LogsDB::_getUnreleasedLogEntries(Env& env, SharedRocksDB& sharedDB, LogIdx& lastReleasedOut, std::vector& unreleasedLogEntriesOut) { diff --git a/cpp/core/LogsDB.hpp b/cpp/core/LogsDB.hpp index 58b34579..2cae7c9b 100644 --- a/cpp/core/LogsDB.hpp +++ b/cpp/core/LogsDB.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -15,6 +16,17 @@ #include "Protocol.hpp" #include "SharedRocksDB.hpp" #include "Time.hpp" +#include "logsdb/LogsDBCommon.hpp" +#include "logsdb/LogsDBTypes.hpp" + +// Forward declarations for logsdb components +class DataPartitions; +class LogMetadata; +class ReqResp; +class LeaderElection; +class BatchWriter; +class CatchupReader; +class Appender; // ** Releases ** // Released records are records which 
have been confirmed by the leader to have been at some point correctly replicated. @@ -43,31 +55,7 @@ // Since they were not part of the leader election they know their records after last released point have not been taken into // account and could have been overwriten. They at this point drop these records and catch up from lastReleased point. - -struct LogsDBLogEntry { - LogIdx idx; - std::vector value; - bool operator==(const LogsDBLogEntry& oth) const { - return idx == oth.idx && value == oth.value; - } -}; - -std::ostream& operator<<(std::ostream& out, const LogsDBLogEntry& entry); - -struct LogsDBRequest { - ReplicaId replicaId; - TernTime sentTime; - LogReqMsg msg; -}; - -std::ostream& operator<<(std::ostream& out, const LogsDBRequest& entry); - -struct LogsDBResponse { - ReplicaId replicaId; - LogRespMsg msg; -}; - -std::ostream& operator<<(std::ostream& out, const LogsDBResponse& entry); +// LogsDBLogEntry, LogsDBRequest, and LogsDBResponse are now defined in logsdb/LogsDBTypes.hpp struct LogsDBStats { std::atomic idleTime{0}; @@ -88,18 +76,17 @@ struct LogsDBStats { std::atomic isLeader{false}; }; -class LogsDBImpl; - class LogsDB { public: - static constexpr size_t REPLICA_COUNT = 5; - static constexpr Duration PARTITION_TIME_SPAN = 12_hours; - static constexpr Duration RESPONSE_TIMEOUT = 10_ms; - static constexpr Duration READ_TIMEOUT = 1_sec; - static constexpr Duration SEND_RELEASE_INTERVAL = 300_ms; - static constexpr Duration LEADER_INACTIVE_TIMEOUT = 1_sec; - static constexpr size_t IN_FLIGHT_APPEND_WINDOW = 1 << 8; - static constexpr size_t CATCHUP_WINDOW = 1 << 8 ; + // Constants - for backward compatibility, these reference LogsDBConsts + static constexpr size_t REPLICA_COUNT = LogsDBConsts::REPLICA_COUNT; + static constexpr Duration PARTITION_TIME_SPAN = LogsDBConsts::PARTITION_TIME_SPAN; + static constexpr Duration RESPONSE_TIMEOUT = LogsDBConsts::RESPONSE_TIMEOUT; + static constexpr Duration READ_TIMEOUT = LogsDBConsts::READ_TIMEOUT; + static 
constexpr Duration SEND_RELEASE_INTERVAL = LogsDBConsts::SEND_RELEASE_INTERVAL; + static constexpr Duration LEADER_INACTIVE_TIMEOUT = LogsDBConsts::LEADER_INACTIVE_TIMEOUT; + static constexpr size_t IN_FLIGHT_APPEND_WINDOW = LogsDBConsts::IN_FLIGHT_APPEND_WINDOW; + static constexpr size_t CATCHUP_WINDOW = LogsDBConsts::CATCHUP_WINDOW; static constexpr size_t MAX_UDP_ENTRY_SIZE = MAX_UDP_MTU - std::max(LogReqMsg::STATIC_SIZE, LogRespMsg::STATIC_SIZE); static constexpr size_t DEFAULT_UDP_ENTRY_SIZE = DEFAULT_UDP_MTU - std::max(LogReqMsg::STATIC_SIZE, LogRespMsg::STATIC_SIZE); @@ -150,5 +137,20 @@ class LogsDB { friend class LogsDBTools; static void _getUnreleasedLogEntries(Env& env, SharedRocksDB& sharedDB, LogIdx& lastReleasedOut, std::vector& unreleasedLogEntriesOut); static void _getLogEntries(Env& env, SharedRocksDB& sharedDB, LogIdx start, size_t count, std::vector& logEntriesOut); - LogsDBImpl* _impl; + + void _maybeLogStatus(TernTime now); + + Env _env; + rocksdb::DB* _db; + const ReplicaId _replicaId; + LogsDBStats _stats; + std::unique_ptr _partitions; + std::unique_ptr _metadata; + std::unique_ptr _reqResp; + std::unique_ptr _leaderElection; + std::unique_ptr _batchWriter; + std::unique_ptr _catchupReader; + std::unique_ptr _appender; + TernTime _infoLoggedTime; + TernTime _lastLoopFinished; }; diff --git a/cpp/core/logsdb/Appender.cpp b/cpp/core/logsdb/Appender.cpp new file mode 100644 index 00000000..ebadb0cb --- /dev/null +++ b/cpp/core/logsdb/Appender.cpp @@ -0,0 +1,165 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "Appender.hpp" + +#include + +#include "../Assert.hpp" +#include "../LogsDB.hpp" +#include "../Msgs.hpp" + +Appender::Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool noReplication) : + _env(env), + _reqResp(reqResp), + _metadata(metadata), + _leaderElection(leaderElection), + 
_noReplication(noReplication), + _currentIsLeader(false), + _entriesStart(0), + _entriesEnd(0) { } + +void Appender::maybeMoveRelease() { + if (!_currentIsLeader && _leaderElection.isLeader()) { + _init(); + return; + } + if (!_leaderElection.isLeader() && _currentIsLeader) { + _cleanup(); + return; + } + + if (!_currentIsLeader) { + return; + } + + auto newRelease = _metadata.getLastReleased(); + std::vector entriesToWrite; + for (; _entriesStart < _entriesEnd; ++_entriesStart) { + auto offset = _entriesStart & IN_FLIGHT_MASK; + auto& requestIds = _requestIds[offset]; + if (_noReplication || ReqResp::isQuorum(requestIds)) { + ++newRelease; + entriesToWrite.emplace_back(std::move(_entries[offset])); + ALWAYS_ASSERT(newRelease == entriesToWrite.back().idx); + _reqResp.cleanupRequests(requestIds); + continue; + } + break; + } + if (entriesToWrite.empty()) { + return; + } + + auto err = _leaderElection.writeLogEntries(_metadata.getLeaderToken(), newRelease, entriesToWrite); + ALWAYS_ASSERT(err == TernError::NO_ERROR); + for (auto reqId : _releaseRequests) { + if (reqId == 0) { + continue; + } + auto request = _reqResp.getRequest(reqId); + ALWAYS_ASSERT(request->msg.body.kind() == LogMessageKind::RELEASE); + auto& releaseReq = request->msg.body.setRelease(); + releaseReq.token = _metadata.getLeaderToken(); + releaseReq.lastReleased = _metadata.getLastReleased(); + } +} + +TernError Appender::appendEntries(std::vector& entries) { + if (!_leaderElection.isLeader()) { + return TernError::LEADER_PREEMPTED; + } + auto availableSpace = LogsDBConsts::IN_FLIGHT_APPEND_WINDOW - entriesInFlight(); + auto countToAppend = std::min(entries.size(), availableSpace); + for(size_t i = 0; i < countToAppend; ++i) { + entries[i].idx = _metadata.assignLogIdx(); + auto offset = (_entriesEnd + i) & IN_FLIGHT_MASK; + _entries[offset] = entries[i]; + auto& requestIds = _requestIds[offset]; + for(ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8) { + if 
(replicaId == _metadata.getReplicaId()) { + requestIds[replicaId.u8] = 0; + continue; + } + if (unlikely(_noReplication)) { + requestIds[replicaId.u8] = 0; + continue; + } + auto& req = _reqResp.newRequest(replicaId); + auto& writeReq = req.msg.body.setLogWrite(); + writeReq.token = _metadata.getLeaderToken(); + writeReq.lastReleased = _metadata.getLastReleased(); + writeReq.idx = _entries[offset].idx; + writeReq.value.els = _entries[offset].value; + requestIds[replicaId.u8] = req.msg.id; + } + } + for (size_t i = countToAppend; i < entries.size(); ++i) { + entries[i].idx = 0; + } + _entriesEnd += countToAppend; + if (unlikely(_noReplication)) { + maybeMoveRelease(); + } + return TernError::NO_ERROR; +} + +void Appender::proccessLogWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogWriteResp& response) { + if (!_leaderElection.isLeader()) { + return; + } + switch ((TernError)response.result) { + case TernError::NO_ERROR: + break; + case TernError::LEADER_PREEMPTED: + _leaderElection.resetLeaderElection(); + return; + default: + LOG_ERROR(_env, "Unexpected result from LOG_WRITE response %s", response.result); + return; + } + + auto logIdx = request.msg.body.getLogWrite().idx; + ALWAYS_ASSERT(_metadata.getLastReleased() < logIdx); + auto offset = _entriesStart + (logIdx.u64 - _metadata.getLastReleased().u64 - 1); + ALWAYS_ASSERT(offset < _entriesEnd); + offset &= IN_FLIGHT_MASK; + ALWAYS_ASSERT(_entries[offset].idx == logIdx); + auto& requestIds = _requestIds[offset]; + if (requestIds[fromReplicaId.u8] != request.msg.id) { + LOG_ERROR(_env, "Mismatch in expected requestId in LOG_WRITE response %s", response); + return; + } + requestIds[fromReplicaId.u8] = 0; + _reqResp.eraseRequest(request.msg.id); +} + +uint64_t Appender::entriesInFlight() const { + return _entriesEnd - _entriesStart; +} + +void Appender::_init() { + for(ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8) { + if (replicaId == 
_metadata.getReplicaId()) { + _releaseRequests[replicaId.u8] = 0; + continue; + } + auto& req = _reqResp.newRequest(replicaId); + auto& releaseReq = req.msg.body.setRelease(); + releaseReq.token = _metadata.getLeaderToken(); + releaseReq.lastReleased = _metadata.getLastReleased(); + _releaseRequests[replicaId.u8] = req.msg.id; + } + _currentIsLeader = true; +} + +void Appender::_cleanup() { + for (; _entriesStart < _entriesEnd; ++_entriesStart) { + auto offset = _entriesStart & IN_FLIGHT_MASK; + _entries[offset].value.clear(); + _reqResp.cleanupRequests(_requestIds[offset]); + } + _reqResp.cleanupRequests(_releaseRequests); + _currentIsLeader = false; +} diff --git a/cpp/core/logsdb/Appender.hpp b/cpp/core/logsdb/Appender.hpp new file mode 100644 index 00000000..96dbad68 --- /dev/null +++ b/cpp/core/logsdb/Appender.hpp @@ -0,0 +1,51 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include +#include + +#include "../Env.hpp" +#include "../Protocol.hpp" +#include "LeaderElection.hpp" +#include "LogMetadata.hpp" +#include "LogsDBCommon.hpp" +#include "LogsDBTypes.hpp" +#include "ReqResp.hpp" + +// Forward declarations +struct LogsDBRequest; +struct LogsDBStats; +struct LogWriteResp; + +class Appender { + static constexpr size_t IN_FLIGHT_MASK = LogsDBConsts::IN_FLIGHT_APPEND_WINDOW - 1; + static_assert((IN_FLIGHT_MASK & LogsDBConsts::IN_FLIGHT_APPEND_WINDOW) == 0); +public: + Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool noReplication); + + void maybeMoveRelease(); + TernError appendEntries(std::vector& entries); + void proccessLogWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogWriteResp& response); + uint64_t entriesInFlight() const; + +private: + void _init(); + void _cleanup(); + + Env& _env; + ReqResp& _reqResp; + LogMetadata& _metadata; + LeaderElection& _leaderElection; + + const 
bool _noReplication; + bool _currentIsLeader; + uint64_t _entriesStart; + uint64_t _entriesEnd; + + std::array _entries; + std::array _requestIds; + ReqResp::QuorumTrackArray _releaseRequests; +}; diff --git a/cpp/core/logsdb/BatchWriter.cpp b/cpp/core/logsdb/BatchWriter.cpp new file mode 100644 index 00000000..e9bd0fac --- /dev/null +++ b/cpp/core/logsdb/BatchWriter.cpp @@ -0,0 +1,78 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "BatchWriter.hpp" + +#include "../Assert.hpp" +#include "../LogsDB.hpp" +#include "../Msgs.hpp" + +BatchWriter::BatchWriter(Env& env, ReqResp& reqResp, LeaderElection& leaderElection) : + _env(env), + _reqResp(reqResp), + _leaderElection(leaderElection), + _token(LeaderToken(0,0)), + _lastReleased(0) {} + +void BatchWriter::proccessLogWriteRequest(LogsDBRequest& request) { + ALWAYS_ASSERT(request.msg.body.kind() == LogMessageKind::LOG_WRITE); + const auto& writeRequest = request.msg.body.getLogWrite(); + if (unlikely(request.replicaId != writeRequest.token.replica())) { + LOG_ERROR(_env, "Token from replica id %s does not have matching replica id. 
Token: %s", request.replicaId, writeRequest.token); + return; + } + if (unlikely(writeRequest.token < _token)) { + auto& resp = _reqResp.newResponse(request.replicaId, request.msg.id); + auto& writeResponse = resp.msg.body.setLogWrite(); + writeResponse.result = TernError::LEADER_PREEMPTED; + return; + } + if (unlikely(_token < writeRequest.token )) { + writeBatch(); + _token = writeRequest.token; + } + _requests.emplace_back(&request); + _entries.emplace_back(); + auto& entry = _entries.back(); + entry.idx = writeRequest.idx; + entry.value = writeRequest.value.els; + if (_lastReleased < writeRequest.lastReleased) { + _lastReleased = writeRequest.lastReleased; + } +} + +void BatchWriter::proccessReleaseRequest(ReplicaId fromReplicaId, uint64_t requestId, const ReleaseReq& request) { + if (unlikely(fromReplicaId != request.token.replica())) { + LOG_ERROR(_env, "Token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.token); + return; + } + if (unlikely(request.token < _token)) { + return; + } + if (unlikely(_token < request.token )) { + writeBatch(); + _token = request.token; + } + + if (_lastReleased < request.lastReleased) { + _lastReleased = request.lastReleased; + } + +} + +void BatchWriter::writeBatch() { + if (_token == LeaderToken(0,0)) { + return; + } + auto response = _leaderElection.writeLogEntries(_token, _lastReleased, _entries); + for (auto req : _requests) { + auto& resp = _reqResp.newResponse(req->replicaId, req->msg.id); + auto& writeResponse = resp.msg.body.setLogWrite(); + writeResponse.result = response; + } + _requests.clear(); + _entries.clear(); + _lastReleased = 0; + _token = LeaderToken(0,0); +} diff --git a/cpp/core/logsdb/BatchWriter.hpp b/cpp/core/logsdb/BatchWriter.hpp new file mode 100644 index 00000000..5dfd4ae5 --- /dev/null +++ b/cpp/core/logsdb/BatchWriter.hpp @@ -0,0 +1,36 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + 
+#pragma once + +#include + +#include "../Env.hpp" +#include "../Protocol.hpp" +#include "LeaderElection.hpp" +#include "LogsDBTypes.hpp" +#include "ReqResp.hpp" + +// Forward declarations +struct LogsDBRequest; +struct ReleaseReq; + +class BatchWriter { +public: + BatchWriter(Env& env, ReqResp& reqResp, LeaderElection& leaderElection); + + void proccessLogWriteRequest(LogsDBRequest& request); + void proccessReleaseRequest(ReplicaId fromReplicaId, uint64_t requestId, const ReleaseReq& request); + void writeBatch(); + +private: + Env& _env; + ReqResp& _reqResp; + LeaderElection& _leaderElection; + + LeaderToken _token; + LogIdx _lastReleased; + std::vector _requests; + std::vector _entries; +}; diff --git a/cpp/core/logsdb/CatchupReader.cpp b/cpp/core/logsdb/CatchupReader.cpp new file mode 100644 index 00000000..9b55e4e9 --- /dev/null +++ b/cpp/core/logsdb/CatchupReader.cpp @@ -0,0 +1,160 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "CatchupReader.hpp" + +#include "../Assert.hpp" +#include "../LogsDB.hpp" +#include "../Msgs.hpp" +#include "LogsDBCommon.hpp" + +CatchupReader::CatchupReader(LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, DataPartitions& data, ReplicaId replicaId, LogIdx lastRead) : + _stats(stats), + _reqResp(reqResp), + _metadata(metadata), + _data(data), + _replicaId(replicaId), + _lastRead(lastRead), + _lastContinuousIdx(lastRead), + _lastMissingIdx(lastRead) {} + +LogIdx CatchupReader::getLastContinuous() const { + return _lastContinuousIdx; +} + +void CatchupReader::readEntries(std::vector& entries, size_t maxEntries) { + if (_lastRead == _lastContinuousIdx) { + update_atomic_stat_ema(_stats.entriesRead, (uint64_t)0); + return; + } + auto lastReleased = _metadata.getLastReleased(); + auto startIndex = _lastRead; + ++startIndex; + + auto it = _data.getIterator(); + for (it.seek(startIndex); it.valid(); it.next(), ++startIndex) { + if 
(_lastContinuousIdx < it.key() || entries.size() >= maxEntries) { + break; + } + ALWAYS_ASSERT(startIndex == it.key()); + entries.emplace_back(it.entry()); + _lastRead = startIndex; + } + update_atomic_stat_ema(_stats.entriesRead, entries.size()); + update_atomic_stat_ema(_stats.readerLag, lastReleased.u64 - _lastRead.u64); +} + +void CatchupReader::init() { + _missingEntries.reserve(LogsDBConsts::CATCHUP_WINDOW); + _requestIds.reserve(LogsDBConsts::CATCHUP_WINDOW); + _findMissingEntries(); +} + +void CatchupReader::maybeCatchUp() { + for (auto idx : _missingEntries) { + if (idx != 0) { + _populateStats(); + return; + } + } + _lastContinuousIdx = _lastMissingIdx; + _missingEntries.clear(); + _requestIds.clear(); + _findMissingEntries(); + _populateStats(); +} + + +void CatchupReader::proccessLogReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogReadReq& request) { + auto& response = _reqResp.newResponse(fromReplicaId, requestId); + auto& readResponse = response.msg.body.setLogRead(); + if (_metadata.getLastReleased() < request.idx) { + readResponse.result = TernError::LOG_ENTRY_UNRELEASED; + return; + } + LogsDBLogEntry entry; + auto err =_data.readLogEntry(request.idx, entry); + readResponse.result = err; + if (err == TernError::NO_ERROR) { + readResponse.value.els = entry.value; + } +} + +void CatchupReader::proccessLogReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogReadResp& response) { + if (response.result != TernError::NO_ERROR) { + return; + } + + auto idx = request.msg.body.getLogRead().idx; + + size_t i = 0; + for (; i < _missingEntries.size(); ++i) { + if (_missingEntries[i] == idx) { + _missingEntries[i] = 0; + break; + } + } + + if (i == _missingEntries.size()) { + return; + } + _reqResp.cleanupRequests(_requestIds[i]); + LogsDBLogEntry entry; + entry.idx = idx; + entry.value = response.value.els; + _data.writeLogEntry(entry); +} + +LogIdx CatchupReader::lastRead() const { + return _lastRead; +} + +void 
CatchupReader::_populateStats() { + update_atomic_stat_ema(_stats.followerLag, _metadata.getLastReleased().u64 - _lastContinuousIdx.u64); + update_atomic_stat_ema(_stats.catchupWindow, _missingEntries.size()); +} + +void CatchupReader::_findMissingEntries() { + if (!_missingEntries.empty()) { + return; + } + auto lastReleased = _metadata.getLastReleased(); + if (unlikely(_metadata.getLastReleased() <= _lastRead)) { + return; + } + auto it = _data.getIterator(); + auto startIdx = _lastContinuousIdx; + it.seek(++startIdx); + while (startIdx <= lastReleased && _missingEntries.size() < LogsDBConsts::CATCHUP_WINDOW) { + if(!it.valid() || startIdx < it.key() ) { + _missingEntries.emplace_back(startIdx); + } else { + ++it; + } + ++startIdx; + } + + if (_missingEntries.empty()) { + _lastContinuousIdx = _lastMissingIdx = lastReleased; + return; + } + + _lastContinuousIdx = _missingEntries.front().u64 - 1; + _lastMissingIdx = _missingEntries.back(); + + for(auto logIdx : _missingEntries) { + _requestIds.emplace_back(); + auto& requests = _requestIds.back(); + for (ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8 ) { + if (replicaId == _replicaId) { + requests[replicaId.u8] = 0; + continue; + } + auto& request = _reqResp.newRequest(replicaId); + auto& readRequest = request.msg.body.setLogRead(); + readRequest.idx = logIdx; + requests[replicaId.u8] = request.msg.id; + } + } +} diff --git a/cpp/core/logsdb/CatchupReader.hpp b/cpp/core/logsdb/CatchupReader.hpp new file mode 100644 index 00000000..884abd1e --- /dev/null +++ b/cpp/core/logsdb/CatchupReader.hpp @@ -0,0 +1,50 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include + +#include "../Protocol.hpp" +#include "DataPartitions.hpp" +#include "LogMetadata.hpp" +#include "ReqResp.hpp" + +// Forward declarations +struct LogsDBLogEntry; +struct LogsDBRequest; +struct LogReadReq; +struct LogReadResp; 
+struct LogsDBStats; +class LogsDB; + +class CatchupReader { +public: + CatchupReader(LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, DataPartitions& data, ReplicaId replicaId, LogIdx lastRead); + + LogIdx getLastContinuous() const; + void readEntries(std::vector& entries, size_t maxEntries); + void init(); + void maybeCatchUp(); + void proccessLogReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogReadReq& request); + void proccessLogReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogReadResp& response); + LogIdx lastRead() const; + +private: + void _populateStats(); + void _findMissingEntries(); + + LogsDBStats& _stats; + ReqResp& _reqResp; + LogMetadata& _metadata; + DataPartitions& _data; + + const ReplicaId _replicaId; + LogIdx _lastRead; + LogIdx _lastContinuousIdx; + LogIdx _lastMissingIdx; + + std::vector _missingEntries; + std::vector _requestIds; +}; diff --git a/cpp/core/logsdb/DataPartitions.cpp b/cpp/core/logsdb/DataPartitions.cpp new file mode 100644 index 00000000..3fbff067 --- /dev/null +++ b/cpp/core/logsdb/DataPartitions.cpp @@ -0,0 +1,316 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "DataPartitions.hpp" + +#include +#include + +#include "../Assert.hpp" +#include "../Bincode.hpp" +#include "../LogsDB.hpp" +#include "../RocksDBUtils.hpp" +#include "LogsDBCommon.hpp" + +void LogPartition::reset(rocksdb::ColumnFamilyHandle* cf_, LogIdx minMaxKey, TernTime firstWriteTime_) { + cf = cf_; + minKey = maxKey = minMaxKey; + firstWriteTime = firstWriteTime_; +} + +DataPartitions::Iterator::Iterator(const DataPartitions& partitions) + : _partitions(partitions), _rotationCount(_partitions._rotationCount), _smaller(nullptr) { + _iterators = _partitions._getPartitionIterators(); +} + +void DataPartitions::Iterator::seek(LogIdx idx) { + if (unlikely(_rotationCount != _partitions._rotationCount)) { + _iterators = 
_partitions._getPartitionIterators(); + } + auto key = U64Key::Static(idx.u64); + for (auto& it : _iterators) { + it->Seek(key.toSlice()); + } + _updateSmaller(); +} + +bool DataPartitions::Iterator::valid() const { + return _smaller != nullptr; +} + +void DataPartitions::Iterator::next() { + if (_smaller != nullptr) { + _smaller->Next(); + } + _updateSmaller(); +} + +DataPartitions::Iterator& DataPartitions::Iterator::operator++() { + this->next(); + return *this; +} + +LogIdx DataPartitions::Iterator::key() const { + return LogIdx(ExternalValue::FromSlice(_smaller->key())().u64()); +} + +LogsDBLogEntry DataPartitions::Iterator::entry() const { + auto value = _smaller->value(); + return LogsDBLogEntry{key(), {(const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()}}; +} + +void DataPartitions::Iterator::dropEntry() { + ALWAYS_ASSERT(_rotationCount == _partitions._rotationCount); + auto cfIdx = _cfIndexForCurrentIterator(); + ROCKS_DB_CHECKED(_partitions._sharedDb.db()->Delete({}, _partitions._partitions[cfIdx].cf, _smaller->key())); +} + +void DataPartitions::Iterator::_updateSmaller() { + _smaller = nullptr; + for (auto& it : _iterators) { + if (!it->Valid()) { + continue; + } + if (_smaller == nullptr || (rocksdb::BytewiseComparator()->Compare(it->key(),_smaller->key()) < 0)) { + _smaller = it.get(); + } + } +} + +size_t DataPartitions::Iterator::_cfIndexForCurrentIterator() const { + for (size_t i = 0; i < _iterators.size(); ++i) { + if (_smaller == _iterators[i].get()) { + return i; + } + } + return -1; +} + +DataPartitions::DataPartitions(Env& env, SharedRocksDB& sharedDB) +: + _env(env), + _sharedDb(sharedDB), + _rotationCount(0), + _partitions({ + LogPartition{ + DATA_PARTITION_0_NAME, + LogsDBMetadataKey::PARTITION_0_FIRST_WRITE_TIME, + sharedDB.getCF(DATA_PARTITION_0_NAME), + 0, + 0, + 0 + }, + LogPartition{ + DATA_PARTITION_1_NAME, + LogsDBMetadataKey::PARTITION_1_FIRST_WRITE_TIME, + sharedDB.getCF(DATA_PARTITION_1_NAME), + 0, + 0, + 0 
+ } + }) +{} + +bool DataPartitions::isInitialStart() { + auto it1 = std::unique_ptr(_sharedDb.db()->NewIterator({},_partitions[0].cf)); + auto it2 = std::unique_ptr(_sharedDb.db()->NewIterator({},_partitions[1].cf)); + it1->SeekToFirst(); + it2->SeekToFirst(); + return !(it1->Valid() || it2->Valid()); +} + +bool DataPartitions::init(bool initialStart) { + bool initSuccess = true; + auto metadataCF = _sharedDb.getCF(METADATA_CF_NAME); + auto db = _sharedDb.db(); + std::string value; + for (auto& partition : _partitions) { + if (tryGet(db, metadataCF, logsDBMetadataKey(partition.firstWriteKey), value)) { + partition.firstWriteTime = ExternalValue::FromSlice(value)().u64(); + LOG_INFO(_env, "Loaded partition %s first write time %s", partition.name, partition.firstWriteTime); + } else if (initialStart) { + LOG_INFO(_env, "Partition %s first write time not found. Using %s", partition.name, partition.firstWriteTime); + _updatePartitionFirstWriteTime(partition, partition.firstWriteTime); + } else { + initSuccess = false; + LOG_ERROR(_env, "Partition %s first write time not found. Possible DB corruption!", partition.name); + } + } + { + auto partitionIterators = _getPartitionIterators(); + for (size_t i = 0; i < partitionIterators.size(); ++i) { + auto it = partitionIterators[i].get(); + auto& partition = _partitions[i]; + it->SeekToFirst(); + if (!it->Valid()) { + if (partition.firstWriteTime != 0) { + LOG_ERROR(_env, "No keys found in partition %s, but first write time is %s. DB Corruption!", partition.name, partition.firstWriteTime); + initSuccess = false; + } else { + LOG_INFO(_env, "Partition %s empty.", partition.name); + } + continue; + } + partition.minKey = ExternalValue::FromSlice(it->key())().u64(); + it->SeekToLast(); + // If at least one key exists seeking to last should never fail. 
+ ROCKS_DB_CHECKED(it->status()); + partition.maxKey = ExternalValue::FromSlice(it->key())().u64(); + } + } + return initSuccess; +} + +DataPartitions::Iterator DataPartitions::getIterator() const { + return Iterator(*this); +} + +TernError DataPartitions::readLogEntry(LogIdx logIdx, LogsDBLogEntry& entry) const { + auto& partition = _getPartitionForIdx(logIdx); + if (unlikely(logIdx < partition.minKey)) { + return TernError::LOG_ENTRY_TRIMMED; + } + + auto key = U64Key::Static(logIdx.u64); + rocksdb::PinnableSlice value; + auto status = _sharedDb.db()->Get({}, partition.cf, key.toSlice(), &value); + if (status.IsNotFound()) { + return TernError::LOG_ENTRY_MISSING; + } + ROCKS_DB_CHECKED(status); + entry.idx = logIdx; + entry.value.assign((const uint8_t*)value.data(), (const uint8_t*)value.data() + value.size()); + return TernError::NO_ERROR; +} + +void DataPartitions::readIndexedEntries(const std::vector& indices, std::vector& entries) const { + entries.clear(); + if (indices.empty()) { + return; + } + // TODO: This is not very efficient as we're doing a lookup for each index. 
+ entries.reserve(indices.size()); + for (auto idx : indices) { + LogsDBLogEntry& entry = entries.emplace_back(); + if (readLogEntry(idx, entry) != TernError::NO_ERROR) { + entry.idx = 0; + } + } +} + +void DataPartitions::writeLogEntries(const std::vector& entries) { + _maybeRotate(); + + rocksdb::WriteBatch batch; + std::vector> keys; + keys.reserve(entries.size()); + for (const auto& entry : entries) { + auto& partition = _getPartitionForIdx(entry.idx); + keys.emplace_back(U64Key::Static(entry.idx.u64)); + batch.Put(partition.cf, keys.back().toSlice(), rocksdb::Slice((const char*)entry.value.data(), entry.value.size())); + _partitionKeyInserted(partition, entry.idx); + } + ROCKS_DB_CHECKED(_sharedDb.db()->Write({}, &batch)); +} + +void DataPartitions::writeLogEntry(const LogsDBLogEntry& entry) { + _maybeRotate(); + + auto& partition = _getPartitionForIdx(entry.idx); + _sharedDb.db()->Put({}, partition.cf, U64Key::Static(entry.idx.u64).toSlice(), rocksdb::Slice((const char*)entry.value.data(), entry.value.size())); + _partitionKeyInserted(partition, entry.idx); + +} + +void DataPartitions::dropEntriesAfterIdx(LogIdx start) { + auto iterator = getIterator(); + size_t droppedEntriesCount = 0; + for (iterator.seek(start), iterator.next(); iterator.valid(); ++iterator) { + iterator.dropEntry(); + ++droppedEntriesCount; + } + LOG_INFO(_env,"Dropped %s entries after %s", droppedEntriesCount, start); +} + +LogIdx DataPartitions::getLowestKey() const { + return std::min(_partitions[0].firstWriteTime == 0 ? MAX_LOG_IDX : _partitions[0].minKey, _partitions[1].firstWriteTime == 0 ? 
MAX_LOG_IDX : _partitions[1].minKey); +} + +void DataPartitions::_updatePartitionFirstWriteTime(LogPartition& partition, TernTime time) { + ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _sharedDb.getCF(METADATA_CF_NAME), logsDBMetadataKey(partition.firstWriteKey), U64Value::Static(time.ns).toSlice())); + partition.firstWriteTime = time; +} + +std::vector> DataPartitions::_getPartitionIterators() const { + std::vector cfHandles; + cfHandles.reserve(_partitions.size()); + for (const auto& partition : _partitions) { + cfHandles.emplace_back(partition.cf); + } + std::vector> iterators; + iterators.reserve(_partitions.size()); + ROCKS_DB_CHECKED(_sharedDb.db()->NewIterators({}, cfHandles, (std::vector*)(&iterators))); + return iterators; +} + +void DataPartitions::_maybeRotate() { + auto& partition = _getPartitionForIdx(MAX_LOG_IDX); + if (likely(partition.firstWriteTime == 0 || (partition.firstWriteTime + LogsDBConsts::PARTITION_TIME_SPAN > ternNow()))) { + return; + } + // we only need to drop older partition and reset it's info. + // picking partition for writes/reads takes care of rest + auto& olderPartition = _partitions[0].minKey < _partitions[1].minKey ? _partitions[0] : _partitions[1]; + LOG_INFO(_env, "Rotating partions. Dropping partition %s, firstWriteTime: %s, minKey: %s, maxKey: %s", olderPartition.name, olderPartition.firstWriteTime, olderPartition.minKey, olderPartition.maxKey); + + _sharedDb.deleteCF(olderPartition.name); + olderPartition.reset(_sharedDb.createCF({olderPartition.name,{}}),0,0); + _updatePartitionFirstWriteTime(olderPartition, 0); + ++_rotationCount; +} + +LogPartition& DataPartitions::_getPartitionForIdx(LogIdx key) { + return const_cast(static_cast(this)->_getPartitionForIdx(key)); +} + +const LogPartition& DataPartitions::_getPartitionForIdx(LogIdx key) const { + // This is a bit of a mess of ifs but I (mcrnic) am unsure how to do it better at this point. + // Logic is roughly: + // 1. If both are empty we return partition 0. + // 2. 
If only 1 is empty then it's likely we just rotated and key will be larger than range of old partition so we return new one, + // if it fits in old partition (we are backfilling missed data) we returned the old one + // 3. Both contain data, likely the key is in newer partition (newerPartition.minKey) <= key + // Note that there is inefficiency in case of empty DB where first key will be written in partition 0 and second one will immediately go to partition 1 + // This is irrelevant from correctness of rotation/retention perspective and will be ignored. + if (unlikely(_partitions[0].firstWriteTime == 0 && _partitions[1].firstWriteTime == 0)) { + return _partitions[0]; + } + if (unlikely(_partitions[0].firstWriteTime == 0)) { + if (likely(_partitions[1].maxKey < key)) { + return _partitions[0]; + } + return _partitions[1]; + } + if (unlikely(_partitions[1].firstWriteTime == 0)) { + if (likely(_partitions[0].maxKey < key)) { + return _partitions[1]; + } + return _partitions[0]; + } + int newerPartitionIdx = _partitions[0].minKey < _partitions[1].minKey ? 
1 : 0; + if (likely(_partitions[newerPartitionIdx].minKey <= key)) { + return _partitions[newerPartitionIdx]; + } + + return _partitions[newerPartitionIdx ^ 1]; +} + +void DataPartitions::_partitionKeyInserted(LogPartition& partition, LogIdx idx) { + if (unlikely(partition.minKey == 0)) { + partition.minKey = idx; + _updatePartitionFirstWriteTime(partition, ternNow()); + } + partition.minKey = std::min(partition.minKey, idx); + partition.maxKey = std::max(partition.maxKey, idx); +} diff --git a/cpp/core/logsdb/DataPartitions.hpp b/cpp/core/logsdb/DataPartitions.hpp new file mode 100644 index 00000000..3aa191c7 --- /dev/null +++ b/cpp/core/logsdb/DataPartitions.hpp @@ -0,0 +1,82 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "../Env.hpp" +#include "../Protocol.hpp" +#include "../SharedRocksDB.hpp" +#include "../Time.hpp" +#include "LogsDBData.hpp" + +// Forward declarations +struct LogsDBLogEntry; + +struct LogPartition { + std::string name; + LogsDBMetadataKey firstWriteKey; + rocksdb::ColumnFamilyHandle* cf; + TernTime firstWriteTime{0}; + LogIdx minKey{0}; + LogIdx maxKey{0}; + + void reset(rocksdb::ColumnFamilyHandle* cf_, LogIdx minMaxKey, TernTime firstWriteTime_); +}; + +class DataPartitions { +public: + class Iterator { + public: + Iterator(const DataPartitions& partitions); + + void seek(LogIdx idx); + bool valid() const; + void next(); + Iterator& operator++(); + LogIdx key() const; + LogsDBLogEntry entry() const; + void dropEntry(); + + private: + void _updateSmaller(); + size_t _cfIndexForCurrentIterator() const; + + const DataPartitions& _partitions; + size_t _rotationCount; + rocksdb::Iterator* _smaller; + std::vector> _iterators; + }; + + DataPartitions(Env& env, SharedRocksDB& sharedDB); + + bool isInitialStart(); + bool init(bool initialStart); + Iterator getIterator() const; + TernError 
readLogEntry(LogIdx logIdx, LogsDBLogEntry& entry) const; + void readIndexedEntries(const std::vector& indices, std::vector& entries) const; + void writeLogEntries(const std::vector& entries); + void writeLogEntry(const LogsDBLogEntry& entry); + void dropEntriesAfterIdx(LogIdx start); + LogIdx getLowestKey() const; + +private: + void _updatePartitionFirstWriteTime(LogPartition& partition, TernTime time); + std::vector> _getPartitionIterators() const; + void _maybeRotate(); + LogPartition& _getPartitionForIdx(LogIdx key); + const LogPartition& _getPartitionForIdx(LogIdx key) const; + void _partitionKeyInserted(LogPartition& partition, LogIdx idx); + + Env& _env; + SharedRocksDB& _sharedDb; + size_t _rotationCount; + std::array _partitions; +}; diff --git a/cpp/core/logsdb/LeaderElection.cpp b/cpp/core/logsdb/LeaderElection.cpp new file mode 100644 index 00000000..2aff8694 --- /dev/null +++ b/cpp/core/logsdb/LeaderElection.cpp @@ -0,0 +1,480 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "LeaderElection.hpp" + +#include "../Assert.hpp" +#include "../LogsDB.hpp" +#include "../Msgs.hpp" +#include "../Time.hpp" +#include "LogsDBCommon.hpp" + +std::ostream& operator<<(std::ostream& out, LeadershipState state) { + switch (state) { + case LeadershipState::FOLLOWER: + out << "FOLLOWER"; + break; + case LeadershipState::BECOMING_NOMINEE: + out << "BECOMING_NOMINEE"; + break; + case LeadershipState::DIGESTING_ENTRIES: + out << "DIGESTING_ENTRIES"; + break; + case LeadershipState::CONFIRMING_REPLICATION: + out << "CONFIRMING_REPLICATION"; + break; + case LeadershipState::CONFIRMING_LEADERSHIP: + out << "CONFIRMING_LEADERSHIP"; + break; + case LeadershipState::LEADER: + out << "LEADER"; + break; + } + return out; +} + +LeaderElection::LeaderElection(Env& env, LogsDBStats& stats, bool noReplication, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& 
reqResp) :
+    _env(env),
+    _stats(stats),
+    _noReplication(noReplication),
+    _avoidBeingLeader(avoidBeingLeader),
+    _replicaId(replicaId),
+    _metadata(metadata),
+    _data(data),
+    _reqResp(reqResp),
+    _state(LeadershipState::FOLLOWER),
+    _leaderLastActive(_noReplication ? 0 :ternNow()) {}
+
+bool LeaderElection::isLeader() const {
+    return _state == LeadershipState::LEADER;
+}
+
+void LeaderElection::maybeStartLeaderElection() { // start a new election round if the leader has been inactive past LEADER_INACTIVE_TIMEOUT
+    if (unlikely(_avoidBeingLeader)) { // this replica is configured to never stand for election
+        return;
+    }
+    auto now = ternNow();
+    if (_state != LeadershipState::FOLLOWER ||
+        (_leaderLastActive + LogsDBConsts::LEADER_INACTIVE_TIMEOUT > now)) {
+        update_atomic_stat_ema(_stats.leaderLastActive, now - _leaderLastActive);
+        return;
+    }
+    auto nomineeToken = _metadata.generateNomineeToken();
+    LOG_INFO(_env,"Starting new leader election round with token %s", nomineeToken);
+    _metadata.setNomineeToken(nomineeToken);
+    _state = LeadershipState::BECOMING_NOMINEE;
+
+    _electionState.reset(new LeaderElectionState());
+    _electionState->lastReleased = _metadata.getLastReleased();
+    _leaderLastActive = now;
+
+    if (unlikely(_noReplication)) // FIX: guard was commented out (debug leftover), making the block below unconditional: every replica self-elected without any quorum and the request loop below was dead code
+    { // single-replica mode: quorum of one, skip straight to becoming leader
+        LOG_INFO(_env,"ForceLeader set, skipping to confirming leader phase");
+        _electionState->requestIds.fill(ReqResp::CONFIRMED_REQ_ID);
+        _tryBecomeLeader();
+        return;
+    }
+    auto& newLeaderRequestIds = _electionState->requestIds;
+    for (ReplicaId replicaId = 0; replicaId.u8 < newLeaderRequestIds.size(); ++replicaId.u8) {
+        if (replicaId == _replicaId) { // no request to ourselves; count self as confirmed
+            newLeaderRequestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID;
+            continue;
+        }
+        auto& request = _reqResp.newRequest(replicaId);
+        newLeaderRequestIds[replicaId.u8] = request.msg.id;
+
+        auto& newLeaderRequest = request.msg.body.setNewLeader();
+        newLeaderRequest.nomineeToken = nomineeToken;
+    }
+}
+
+void LeaderElection::proccessNewLeaderResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderResp& response) {
+    LOG_DEBUG(_env, "Received NEW_LEADER response %s from replicaId %s", response,
fromReplicaId); + ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE, "In state %s Received NEW_LEADER response %s", _state, response); + auto& state = *_electionState; + ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); + auto result = TernError(response.result); + switch (result) { + case TernError::NO_ERROR: + _electionState->requestIds[request.replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; + _electionState->lastReleased = std::max(_electionState->lastReleased, response.lastReleased); + _reqResp.eraseRequest(request.msg.id); + _tryProgressToDigest(); + break; + case TernError::LEADER_PREEMPTED: + resetLeaderElection(); + break; + default: + LOG_ERROR(_env, "Unexpected result %s in NEW_LEADER message, %s", result, response); + break; + } +} + +void LeaderElection::proccessNewLeaderConfirmResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderConfirmResp& response) { + ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_LEADERSHIP, "In state %s Received NEW_LEADER_CONFIRM response %s", _state, response); + ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); + + auto result = TernError(response.result); + switch (result) { + case TernError::NO_ERROR: + _electionState->requestIds[request.replicaId.u8] = 0; + _reqResp.eraseRequest(request.msg.id); + LOG_DEBUG(_env,"trying to become leader"); + _tryBecomeLeader(); + break; + case TernError::LEADER_PREEMPTED: + resetLeaderElection(); + break; + default: + LOG_ERROR(_env, "Unexpected result %s in NEW_LEADER_CONFIRM message, %s", result, response); + break; + } +} + +void LeaderElection::proccessRecoveryReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryReadResp& response) { + ALWAYS_ASSERT(_state == LeadershipState::DIGESTING_ENTRIES, "In state %s Received LOG_RECOVERY_READ response %s", _state, response); + auto& state = *_electionState; + auto result = TernError(response.result); + switch (result) { + case 
TernError::NO_ERROR: + case TernError::LOG_ENTRY_MISSING: + { + ALWAYS_ASSERT(state.lastReleased < request.msg.body.getLogRecoveryRead().idx); + auto entryOffset = request.msg.body.getLogRecoveryRead().idx.u64 - state.lastReleased.u64 - 1; + ALWAYS_ASSERT(entryOffset < LogsDBConsts::IN_FLIGHT_APPEND_WINDOW); + ALWAYS_ASSERT(state.recoveryRequests[entryOffset][request.replicaId.u8] == request.msg.id); + auto& entry = state.recoveryEntries[entryOffset]; + if (response.value.els.size() != 0) { + // we found a record here, we don't care about other answers + entry.value = response.value.els; + _reqResp.cleanupRequests(state.recoveryRequests[entryOffset]); + } else { + state.recoveryRequests[entryOffset][request.replicaId.u8] = 0; + _reqResp.eraseRequest(request.msg.id); + } + _tryProgressToReplication(); + break; + } + case TernError::LEADER_PREEMPTED: + LOG_DEBUG(_env, "Got preempted during recovery by replica %s",fromReplicaId); + resetLeaderElection(); + break; + default: + LOG_ERROR(_env, "Unexpected result %s in LOG_RECOVERY_READ message, %s", result, response); + break; + } +} + +void LeaderElection::proccessRecoveryWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryWriteResp& response) { + ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_REPLICATION, "In state %s Received LOG_RECOVERY_WRITE response %s", _state, response); + auto& state = *_electionState; + auto result = TernError(response.result); + switch (result) { + case TernError::NO_ERROR: + { + ALWAYS_ASSERT(state.lastReleased < request.msg.body.getLogRecoveryWrite().idx); + auto entryOffset = request.msg.body.getLogRecoveryWrite().idx.u64 - state.lastReleased.u64 - 1; + ALWAYS_ASSERT(entryOffset < LogsDBConsts::IN_FLIGHT_APPEND_WINDOW); + ALWAYS_ASSERT(state.recoveryRequests[entryOffset][request.replicaId.u8] == request.msg.id); + state.recoveryRequests[entryOffset][request.replicaId.u8] = 0; + _reqResp.eraseRequest(request.msg.id); + _tryProgressToLeaderConfirm(); + 
break; + } + case TernError::LEADER_PREEMPTED: + resetLeaderElection(); + break; + default: + LOG_ERROR(_env, "Unexpected result %s in LOG_RECOVERY_READ message, %s", result, response); + break; + } +} + +void LeaderElection::proccessNewLeaderRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderReq& request) { + if (unlikely(fromReplicaId != request.nomineeToken.replica())) { + LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.nomineeToken); + return; + } + auto& response = _reqResp.newResponse( fromReplicaId, requestId); + auto& newLeaderResponse = response.msg.body.setNewLeader(); + + if (request.nomineeToken.idx() <= _metadata.getLeaderToken().idx() || request.nomineeToken < _metadata.getNomineeToken()) { + newLeaderResponse.result = TernError::LEADER_PREEMPTED; + return; + } + + newLeaderResponse.result = TernError::NO_ERROR; + newLeaderResponse.lastReleased = _metadata.getLastReleased(); + _leaderLastActive = ternNow(); + + if (_metadata.getNomineeToken() == request.nomineeToken) { + return; + } + + resetLeaderElection(); + _metadata.setNomineeToken(request.nomineeToken); +} + +void LeaderElection::proccessNewLeaderConfirmRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderConfirmReq& request) { + if (unlikely(fromReplicaId != request.nomineeToken.replica())) { + LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. 
Token: %s", fromReplicaId, request.nomineeToken); + return; + } + auto& response = _reqResp.newResponse(fromReplicaId, requestId); + auto& newLeaderConfirmResponse = response.msg.body.setNewLeaderConfirm(); + if (_metadata.getNomineeToken() == request.nomineeToken) { + _metadata.setLastReleased(request.releasedIdx); + } + + auto err = _metadata.updateLeaderToken(request.nomineeToken); + newLeaderConfirmResponse.result = err; + if (err == TernError::NO_ERROR) { + _leaderLastActive = ternNow(); + resetLeaderElection(); + } +} + +void LeaderElection::proccessRecoveryReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryReadReq& request) { + if (unlikely(fromReplicaId != request.nomineeToken.replica())) { + LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. Token: %s", fromReplicaId, request.nomineeToken); + return; + } + auto& response = _reqResp.newResponse(fromReplicaId, requestId); + auto& recoveryReadResponse = response.msg.body.setLogRecoveryRead(); + if (request.nomineeToken != _metadata.getNomineeToken()) { + recoveryReadResponse.result = TernError::LEADER_PREEMPTED; + return; + } + _leaderLastActive = ternNow(); + LogsDBLogEntry entry; + auto err = _data.readLogEntry(request.idx, entry); + recoveryReadResponse.result = err; + if (err == TernError::NO_ERROR) { + recoveryReadResponse.value.els = entry.value; + } +} + +void LeaderElection::proccessRecoveryWriteRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryWriteReq& request) { + if (unlikely(fromReplicaId != request.nomineeToken.replica())) { + LOG_ERROR(_env, "Nominee token from replica id %s does not have matching replica id. 
Token: %s", fromReplicaId, request.nomineeToken); + return; + } + auto& response = _reqResp.newResponse(fromReplicaId, requestId); + auto& recoveryWriteResponse = response.msg.body.setLogRecoveryWrite(); + if (request.nomineeToken != _metadata.getNomineeToken()) { + recoveryWriteResponse.result = TernError::LEADER_PREEMPTED; + return; + } + _leaderLastActive = ternNow(); + LogsDBLogEntry entry; + entry.idx = request.idx; + entry.value = request.value.els; + _data.writeLogEntry(entry); + recoveryWriteResponse.result = TernError::NO_ERROR; +} + +TernError LeaderElection::writeLogEntries(LeaderToken token, LogIdx newlastReleased, std::vector& entries) { + auto err = _metadata.updateLeaderToken(token); + if (err != TernError::NO_ERROR) { + return err; + } + _clearElectionState(); + _data.writeLogEntries(entries); + if (_metadata.getLastReleased() < newlastReleased) { + _metadata.setLastReleased(newlastReleased); + } + return TernError::NO_ERROR; +} + +void LeaderElection::resetLeaderElection() { + if (isLeader()) { + LOG_INFO(_env,"Preempted as leader. Reseting leader election. Becoming follower"); + } else { + LOG_INFO(_env,"Reseting leader election. Becoming follower of leader with token %s", _metadata.getLeaderToken()); + } + _state = LeadershipState::FOLLOWER; + _leaderLastActive = ternNow(); + _metadata.setNomineeToken(LeaderToken(0,0)); + _clearElectionState(); +} + +void LeaderElection::_tryProgressToDigest() { + ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE); + LOG_DEBUG(_env, "trying to progress to digest"); + if (!ReqResp::isQuorum(_electionState->requestIds)) { + return; + } + _reqResp.cleanupRequests(_electionState->requestIds); + _state = LeadershipState::DIGESTING_ENTRIES; + LOG_INFO(_env,"Became nominee with token: %s", _metadata.getNomineeToken()); + + // We might have gotten a higher release point. 
We can safely update + _metadata.setLastReleased(_electionState->lastReleased); + + // Populate entries we have and don't ask for them + std::vector entries; + entries.reserve(LogsDBConsts::IN_FLIGHT_APPEND_WINDOW); + auto it = _data.getIterator(); + it.seek(_electionState->lastReleased); + it.next(); + for(; it.valid(); ++it) { + entries.emplace_back(it.entry()); + } + ALWAYS_ASSERT(entries.size() <= LogsDBConsts::IN_FLIGHT_APPEND_WINDOW); + for (auto& entry : entries) { + auto offset = entry.idx.u64 - _electionState->lastReleased.u64 - 1; + _electionState->recoveryEntries[offset] = entry; + } + + // Ask for all non populated entries + for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { + auto& entry = _electionState->recoveryEntries[i]; + if (!entry.value.empty()) { + continue; + } + entry.idx = _electionState->lastReleased + i + 1; + auto& requestIds = _electionState->recoveryRequests[i]; + auto& participatingReplicas = _electionState->requestIds; + for(ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8) { + if (replicaId == _replicaId) { + requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; + continue; + } + if (participatingReplicas[replicaId.u8] != ReqResp::CONFIRMED_REQ_ID) { + requestIds[replicaId.u8] = ReqResp::UNUSED_REQ_ID; + continue; + } + auto& request = _reqResp.newRequest(replicaId); + auto& recoveryRead = request.msg.body.setLogRecoveryRead(); + recoveryRead.idx = entry.idx; + recoveryRead.nomineeToken = _metadata.getNomineeToken(); + requestIds[replicaId.u8] = request.msg.id; + } + } +} + +void LeaderElection::_tryProgressToReplication() { + ALWAYS_ASSERT(_state == LeadershipState::DIGESTING_ENTRIES); + bool canMakeProgress{false}; + for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) { + if (_electionState->recoveryEntries[i].value.empty()) { + auto& requestIds = _electionState->recoveryRequests[i]; + if (ReqResp::isQuorum(requestIds)) { + canMakeProgress = true; + } + if 
(canMakeProgress) {
+                _reqResp.cleanupRequests(requestIds);
+                continue;
+            }
+            return;
+        }
+    }
+    // If we came here it means whole array contains records
+    // Send replication requests until first hole
+    _state = LeadershipState::CONFIRMING_REPLICATION;
+    std::vector<LogsDBLogEntry> entries;
+    entries.reserve(_electionState->recoveryEntries.size());
+    for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) {
+        auto& entry = _electionState->recoveryEntries[i];
+        if (entry.value.empty()) {
+            break;
+        }
+        auto& requestIds = _electionState->recoveryRequests[i];
+        auto& participatingReplicas = _electionState->requestIds;
+        for (ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8) {
+            if (replicaId == _replicaId) { // FIX: was `replicaId == replicaId` (always true), so every replica was marked confirmed and no recovery write was ever sent
+                requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID;
+                continue;
+            }
+            if (participatingReplicas[replicaId.u8] != ReqResp::CONFIRMED_REQ_ID) { // replica did not join this round; skip it
+                requestIds[replicaId.u8] = ReqResp::UNUSED_REQ_ID;
+                continue;
+            }
+            entries.emplace_back(entry); // NOTE(review): appended once per remote request, so `entries` may hold duplicates of the same idx — confirm writeLogEntries tolerates this
+            auto& request = _reqResp.newRequest(replicaId);
+            auto& recoveryWrite = request.msg.body.setLogRecoveryWrite();
+            recoveryWrite.idx = entry.idx;
+            recoveryWrite.nomineeToken = _metadata.getNomineeToken();
+            recoveryWrite.value.els = entry.value;
+            requestIds[replicaId.u8] = request.msg.id;
+        }
+    }
+    LOG_INFO(_env,"Digesting complete progressing to replication of %s entries with token: %s", entries.size(), _metadata.getNomineeToken());
+    if (entries.empty()) {
+        _tryProgressToLeaderConfirm();
+    } else {
+        _data.writeLogEntries(entries);
+    }
+}
+
+void LeaderElection::_tryProgressToLeaderConfirm() {
+    ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_REPLICATION);
+    LogIdx newLastReleased = _electionState->lastReleased;
+    for(size_t i = 0; i < _electionState->recoveryEntries.size(); ++i) {
+        if (_electionState->recoveryEntries[i].value.empty()) {
+            break;
+        }
+        auto& requestIds = _electionState->recoveryRequests[i];
+        if (!ReqResp::isQuorum(requestIds)) {
+            // we just confirmed replication up
to this point. + // It is safe to move last released for us even if we don't become leader + // while not necessary for correctness it somewhat helps making progress in multiple preemtion case + _metadata.setLastReleased(newLastReleased); + return; + } + newLastReleased = _electionState->recoveryEntries[i].idx; + _reqResp.cleanupRequests(requestIds); + } + // we just confirmed replication up to this point. + // It is safe to move last released for us even if we don't become leader + // if we do become leader we guarantee state up here was readable + _metadata.setLastReleased(newLastReleased); + _state = LeadershipState::CONFIRMING_LEADERSHIP; + LOG_INFO(_env,"Replication of extra records complete. Progressing to CONFIRMING_LEADERSHIP with token: %s, newLastReleased: %s", _metadata.getNomineeToken(), newLastReleased); + + auto& requestIds = _electionState->requestIds; + for (ReplicaId replicaId = 0; replicaId.u8 < LogsDBConsts::REPLICA_COUNT; ++replicaId.u8) { + if (replicaId == _replicaId) { + requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; + continue; + } + if (requestIds[replicaId.u8] == ReqResp::UNUSED_REQ_ID) { + continue; + } + auto& request = _reqResp.newRequest(replicaId); + auto& recoveryConfirm = request.msg.body.setNewLeaderConfirm(); + recoveryConfirm.nomineeToken = _metadata.getNomineeToken(); + recoveryConfirm.releasedIdx = _metadata.getLastReleased(); + requestIds[replicaId.u8] = request.msg.id; + } +} + +void LeaderElection::_tryBecomeLeader() { + if (!ReqResp::isQuorum(_electionState->requestIds)) { + return; + } + auto nomineeToken = _metadata.getNomineeToken(); + ALWAYS_ASSERT(nomineeToken.replica() == _replicaId); + LOG_INFO(_env,"Became leader with token %s", nomineeToken); + _state = LeadershipState::LEADER; + ALWAYS_ASSERT(_metadata.updateLeaderToken(nomineeToken) == TernError::NO_ERROR); + _clearElectionState(); +} + +void LeaderElection::_clearElectionState() { + _leaderLastActive = ternNow(); + if (!_electionState) { + return; + } + 
_reqResp.cleanupRequests(_electionState->requestIds); + _clearRecoveryRequests(); + _electionState.reset(); +} + +void LeaderElection::_clearRecoveryRequests() { + for(auto& requestIds : _electionState->recoveryRequests) { + _reqResp.cleanupRequests(requestIds); + } +} diff --git a/cpp/core/logsdb/LeaderElection.hpp b/cpp/core/logsdb/LeaderElection.hpp new file mode 100644 index 00000000..4d9c41e6 --- /dev/null +++ b/cpp/core/logsdb/LeaderElection.hpp @@ -0,0 +1,87 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include +#include +#include + +#include "../Env.hpp" +#include "../Protocol.hpp" +#include "DataPartitions.hpp" +#include "LogMetadata.hpp" +#include "LogsDBCommon.hpp" +#include "LogsDBTypes.hpp" +#include "ReqResp.hpp" + +// Forward declarations +struct LogsDBRequest; +struct LogsDBStats; +struct NewLeaderReq; +struct NewLeaderResp; +struct NewLeaderConfirmReq; +struct NewLeaderConfirmResp; +struct LogRecoveryReadReq; +struct LogRecoveryReadResp; +struct LogRecoveryWriteReq; +struct LogRecoveryWriteResp; +class LogsDB; + +enum class LeadershipState : uint8_t { + FOLLOWER, + BECOMING_NOMINEE, + DIGESTING_ENTRIES, + CONFIRMING_REPLICATION, + CONFIRMING_LEADERSHIP, + LEADER +}; + +std::ostream& operator<<(std::ostream& out, LeadershipState state); + +struct LeaderElectionState { + ReqResp::QuorumTrackArray requestIds; + LogIdx lastReleased; + std::array recoveryRequests; + std::array recoveryEntries; +}; + +class LeaderElection { +public: + LeaderElection(Env& env, LogsDBStats& stats, bool noReplication, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp); + + bool isLeader() const; + void maybeStartLeaderElection(); + void proccessNewLeaderResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderResp& response); + void proccessNewLeaderConfirmResponse(ReplicaId fromReplicaId, LogsDBRequest& 
request, const NewLeaderConfirmResp& response); + void proccessRecoveryReadResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryReadResp& response); + void proccessRecoveryWriteResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const LogRecoveryWriteResp& response); + void proccessNewLeaderRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderReq& request); + void proccessNewLeaderConfirmRequest(ReplicaId fromReplicaId, uint64_t requestId, const NewLeaderConfirmReq& request); + void proccessRecoveryReadRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryReadReq& request); + void proccessRecoveryWriteRequest(ReplicaId fromReplicaId, uint64_t requestId, const LogRecoveryWriteReq& request); + TernError writeLogEntries(LeaderToken token, LogIdx newlastReleased, std::vector& entries); + void resetLeaderElection(); + +private: + void _tryProgressToDigest(); + void _tryProgressToReplication(); + void _tryProgressToLeaderConfirm(); + void _tryBecomeLeader(); + void _clearElectionState(); + void _clearRecoveryRequests(); + + Env& _env; + LogsDBStats& _stats; + const bool _noReplication; + const bool _avoidBeingLeader; + const ReplicaId _replicaId; + LogMetadata& _metadata; + DataPartitions& _data; + ReqResp& _reqResp; + + LeadershipState _state; + std::unique_ptr _electionState; + TernTime _leaderLastActive; +}; diff --git a/cpp/core/logsdb/LogMetadata.cpp b/cpp/core/logsdb/LogMetadata.cpp new file mode 100644 index 00000000..ce4be37f --- /dev/null +++ b/cpp/core/logsdb/LogMetadata.cpp @@ -0,0 +1,137 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include "LogMetadata.hpp" + +#include "../Assert.hpp" +#include "../Bincode.hpp" +#include "../LogsDB.hpp" +#include "../RocksDBUtils.hpp" +#include "LogsDBCommon.hpp" + +LogMetadata::LogMetadata(Env& env, LogsDBStats& stats, SharedRocksDB& sharedDb, ReplicaId replicaId, DataPartitions& data) : + 
_env(env), + _stats(stats), + _sharedDb(sharedDb), + _cf(sharedDb.getCF(METADATA_CF_NAME)), + _replicaId(replicaId), + _data(data), + _nomineeToken(LeaderToken(0,0)) +{} + +bool LogMetadata::isInitialStart() { + auto it = std::unique_ptr(_sharedDb.db()->NewIterator({},_cf)); + it->SeekToFirst(); + return !it->Valid(); +} + +bool LogMetadata::init(bool initialStart) { + bool initSuccess = true; + std::string value; + if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), value)) { + _leaderToken.u64 = ExternalValue::FromSlice(value)().u64(); + LOG_INFO(_env, "Loaded leader token %s", _leaderToken); + } else if (initialStart) { + _leaderToken = LeaderToken(0,0); + LOG_INFO(_env, "Leader token not found. Using %s", _leaderToken); + ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), U64Value::Static(_leaderToken.u64).toSlice())); + } else { + initSuccess = false; + LOG_ERROR(_env, "Leader token not found! Possible DB corruption!"); + } + + if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), value)) { + _lastReleased = ExternalValue::FromSlice(value)().u64(); + LOG_INFO(_env, "Loaded last released %s", _lastReleased); + } else if (initialStart) { + LOG_INFO(_env, "Last released not found. Using %s", 0); + setLastReleased(0); + } else { + initSuccess = false; + LOG_ERROR(_env, "Last released not found! Possible DB corruption!"); + } + + if (tryGet(_sharedDb.db(),_cf, logsDBMetadataKey(LAST_RELEASED_TIME_KEY), value)) { + _lastReleasedTime = ExternalValue::FromSlice(value)().u64(); + LOG_INFO(_env, "Loaded last released time %s", _lastReleasedTime); + } else { + initSuccess = false; + LOG_ERROR(_env, "Last released time not found! 
Possible DB corruption!");
+    }
+    _stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed);
+    return initSuccess;
+}
+
+ReplicaId LogMetadata::getReplicaId() const {
+    return _replicaId;
+}
+
+LogIdx LogMetadata::assignLogIdx() { // hand out the next log index; only valid while we hold leadership (asserted)
+    ALWAYS_ASSERT(_leaderToken.replica() ==_replicaId);
+    return ++_lastAssigned;
+}
+
+LeaderToken LogMetadata::getLeaderToken() const {
+    return _leaderToken;
+}
+
+TernError LogMetadata::updateLeaderToken(LeaderToken token) { // accept and persist `token` as the new leader token, or reject it as stale
+    if (unlikely(token < _leaderToken || token < _nomineeToken)) { // stale: a newer leader or nominee already superseded it
+        return TernError::LEADER_PREEMPTED;
+    }
+    if (likely(token == _leaderToken)) { // already current: nothing to persist
+        return TernError::NO_ERROR;
+    }
+    _data.dropEntriesAfterIdx(_lastReleased); // entries past the release point belong to the old epoch and may diverge
+    ROCKS_DB_CHECKED(_sharedDb.db()->Put({}, _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), U64Value::Static(token.u64).toSlice()));
+    if (_leaderToken != token && token.replica() == _replicaId) {
+        // We just became leader, at this point last released should be the last known entry
+        _lastAssigned = _lastReleased;
+    }
+    _leaderToken = token;
+    _stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed);
+    _nomineeToken = LeaderToken(0,0); // clear any pending nomination for the finished election
+    return TernError::NO_ERROR;
+}
+
+LeaderToken LogMetadata::getNomineeToken() const {
+    return _nomineeToken;
+}
+
+void LogMetadata::setNomineeToken(LeaderToken token) {
+    if (++_leaderToken.idx() < _nomineeToken.idx()) { // NOTE(review): ++ on a by-value idx() increments a temporary (cf. generateNomineeToken, which copies first); presumably meant `_leaderToken.idx() + 1 < token.idx()` — confirm
+        LOG_INFO(_env, "Got a nominee token for epoch %s, last leader epoch is %s, we must have skipped leader election.", _nomineeToken.idx(), _leaderToken.idx()); // NOTE(review): logs the PREVIOUS nominee's epoch, not token.idx() — confirm intended
+        _data.dropEntriesAfterIdx(_lastReleased);
+    }
+    _nomineeToken = token;
+}
+
+LeaderToken LogMetadata::generateNomineeToken() const { // next-epoch token branded with our replica id; does not mutate state
+    auto lastEpoch = _leaderToken.idx();
+    return LeaderToken(_replicaId, ++lastEpoch);
+}
+
+LogIdx LogMetadata::getLastReleased() const {
+    return _lastReleased;
+}
+
+TernTime LogMetadata::getLastReleasedTime() const {
+    return _lastReleasedTime;
+}
+
+void LogMetadata::setLastReleased(LogIdx lastReleased) {
ALWAYS_ASSERT(_lastReleased <= lastReleased, "Moving release point backwards is not possible. It would cause data inconsistency"); + auto now = ternNow(); + rocksdb::WriteBatch batch; + batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), U64Value::Static(lastReleased.u64).toSlice()); + batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_TIME_KEY),U64Value::Static(now.ns).toSlice()); + ROCKS_DB_CHECKED(_sharedDb.db()->Write({}, &batch)); + update_atomic_stat_ema(_stats.entriesReleased, lastReleased.u64 - _lastReleased.u64); + _lastReleased = lastReleased; + _lastReleasedTime = now; +} + +bool LogMetadata::isPreempting(LeaderToken token) const { + return _leaderToken < token && _nomineeToken < token; +} diff --git a/cpp/core/logsdb/LogMetadata.hpp b/cpp/core/logsdb/LogMetadata.hpp new file mode 100644 index 00000000..78074281 --- /dev/null +++ b/cpp/core/logsdb/LogMetadata.hpp @@ -0,0 +1,47 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include "../Env.hpp" +#include "../Protocol.hpp" +#include "../SharedRocksDB.hpp" +#include "../Time.hpp" +#include "DataPartitions.hpp" + +// Forward declarations +struct LogsDBStats; + +class LogMetadata { +public: + LogMetadata(Env& env, LogsDBStats& stats, SharedRocksDB& sharedDb, ReplicaId replicaId, DataPartitions& data); + + bool isInitialStart(); + bool init(bool initialStart); + ReplicaId getReplicaId() const; + LogIdx assignLogIdx(); + LeaderToken getLeaderToken() const; + TernError updateLeaderToken(LeaderToken token); + LeaderToken getNomineeToken() const; + void setNomineeToken(LeaderToken token); + LeaderToken generateNomineeToken() const; + LogIdx getLastReleased() const; + TernTime getLastReleasedTime() const; + void setLastReleased(LogIdx lastReleased); + bool isPreempting(LeaderToken token) const; + +private: + Env& _env; + LogsDBStats& _stats; + SharedRocksDB& _sharedDb; + rocksdb::ColumnFamilyHandle* _cf; + const ReplicaId 
_replicaId; + DataPartitions& _data; + + LogIdx _lastAssigned; + LogIdx _lastReleased; + TernTime _lastReleasedTime; + LeaderToken _leaderToken; + LeaderToken _nomineeToken; +}; diff --git a/cpp/core/logsdb/LogsDBCommon.hpp b/cpp/core/logsdb/LogsDBCommon.hpp new file mode 100644 index 00000000..8305b211 --- /dev/null +++ b/cpp/core/logsdb/LogsDBCommon.hpp @@ -0,0 +1,52 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "../Bincode.hpp" +#include "../RocksDBUtils.hpp" +#include "../Time.hpp" + +// LogsDB constants - moved here to avoid circular dependencies +namespace LogsDBConsts { + static constexpr size_t REPLICA_COUNT = 5; + static constexpr Duration PARTITION_TIME_SPAN = 12_hours; + static constexpr Duration RESPONSE_TIMEOUT = 10_ms; + static constexpr Duration READ_TIMEOUT = 1_sec; + static constexpr Duration SEND_RELEASE_INTERVAL = 300_ms; + static constexpr Duration LEADER_INACTIVE_TIMEOUT = 1_sec; + static constexpr size_t IN_FLIGHT_APPEND_WINDOW = 1 << 8; + static constexpr size_t CATCHUP_WINDOW = 1 << 8; +} + +// Helper functions for LogsDB + +static inline bool tryGet(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key, std::string& value) { + auto status = db->Get({}, cf, key, &value); + if (status.IsNotFound()) { + return false; + } + ROCKS_DB_CHECKED(status); + return true; +} + +static inline void update_atomic_stat_ema(std::atomic& stat, double newValue) { + stat.store((stat.load(std::memory_order_relaxed)* 0.95 + newValue * 0.05), std::memory_order_relaxed); +} + +static inline void update_atomic_stat_ema(std::atomic& stat, Duration newValue) { + stat.store((Duration)((double)stat.load(std::memory_order_relaxed).ns * 0.95 + (double)newValue.ns * 0.05), std::memory_order_relaxed); +} + +// Column family names +static constexpr auto METADATA_CF_NAME = "logMetadata"; 
+static constexpr auto DATA_PARTITION_0_NAME = "logTimePartition0";
+static constexpr auto DATA_PARTITION_1_NAME = "logTimePartition1";
diff --git a/cpp/core/LogsDBData.hpp b/cpp/core/logsdb/LogsDBData.hpp
similarity index 100%
rename from cpp/core/LogsDBData.hpp
rename to cpp/core/logsdb/LogsDBData.hpp
diff --git a/cpp/core/logsdb/LogsDBTypes.hpp b/cpp/core/logsdb/LogsDBTypes.hpp
new file mode 100644
index 00000000..140582b8
--- /dev/null
+++ b/cpp/core/logsdb/LogsDBTypes.hpp
@@ -0,0 +1,39 @@
+// Copyright 2025 XTX Markets Technologies Limited
+//
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#pragma once
+
+#include <cstdint>
+#include <ostream>
+#include <vector>
+
+#include "../Protocol.hpp"
+#include "../Time.hpp"
+
+// Core LogsDB types that need to be fully defined for helper classes
+
+struct LogsDBLogEntry {
+    LogIdx idx;
+    std::vector<uint8_t> value;
+    bool operator==(const LogsDBLogEntry& oth) const {
+        return idx == oth.idx && value == oth.value;
+    }
+};
+
+std::ostream& operator<<(std::ostream& out, const LogsDBLogEntry& entry);
+
+struct LogsDBRequest {
+    ReplicaId replicaId;
+    TernTime sentTime;
+    LogReqMsg msg;
+};
+
+std::ostream& operator<<(std::ostream& out, const LogsDBRequest& entry);
+
+struct LogsDBResponse {
+    ReplicaId replicaId;
+    LogRespMsg msg;
+};
+
+std::ostream& operator<<(std::ostream& out, const LogsDBResponse& entry);
diff --git a/cpp/core/logsdb/ReqResp.cpp b/cpp/core/logsdb/ReqResp.cpp
new file mode 100644
index 00000000..c9cd4291
--- /dev/null
+++ b/cpp/core/logsdb/ReqResp.cpp
@@ -0,0 +1,106 @@
+// Copyright 2025 XTX Markets Technologies Limited
+//
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#include "ReqResp.hpp"
+
+#include "../LogsDB.hpp"
+#include "../Time.hpp"
+#include "LogsDBCommon.hpp"
+
+ReqResp::ReqResp(LogsDBStats& stats) : _stats(stats), _lastAssignedRequest(CONFIRMED_REQ_ID) {}
+
+LogsDBRequest& ReqResp::newRequest(ReplicaId targetReplicaId) {
+    auto& request = _requests[++_lastAssignedRequest];
+    request.replicaId =
targetReplicaId;
+    request.msg.id = _lastAssignedRequest;
+    return request;
+}
+
+LogsDBRequest* ReqResp::getRequest(uint64_t requestId) {
+    auto it = _requests.find(requestId);
+    if (it == _requests.end()) {
+        return nullptr;
+    }
+    return &it->second;
+}
+
+void ReqResp::eraseRequest(uint64_t requestId) {
+    _requests.erase(requestId);
+}
+
+void ReqResp::cleanupRequests(QuorumTrackArray& requestIds) {
+    for (auto& reqId : requestIds) {
+        if (reqId == CONFIRMED_REQ_ID || reqId == UNUSED_REQ_ID) {
+            continue;
+        }
+        eraseRequest(reqId);
+        reqId = ReqResp::UNUSED_REQ_ID;
+    }
+}
+
+void ReqResp::resendTimedOutRequests() {
+    auto now = ternNow();
+    auto defaultCutoffTime = now - LogsDBConsts::RESPONSE_TIMEOUT;
+    auto releaseCutoffTime = now - LogsDBConsts::SEND_RELEASE_INTERVAL;
+    auto readCutoffTime = now - LogsDBConsts::READ_TIMEOUT;
+    auto cutoffTime = now;
+    uint64_t timedOutCount{0};
+    for (auto& r : _requests) {
+        switch (r.second.msg.body.kind()) {
+        case LogMessageKind::RELEASE:
+            cutoffTime = releaseCutoffTime;
+            break;
+        case LogMessageKind::LOG_READ:
+            cutoffTime = readCutoffTime;
+            break;
+        default:
+            cutoffTime = defaultCutoffTime;
+        }
+        if (r.second.sentTime < cutoffTime) {
+            r.second.sentTime = now;
+            _requestsToSend.emplace_back(&r.second);
+            if (r.second.msg.body.kind() != LogMessageKind::RELEASE) {
+                ++timedOutCount;
+            }
+        }
+    }
+    update_atomic_stat_ema(_stats.requestsTimedOut, timedOutCount);
+}
+
+void ReqResp::getRequestsToSend(std::vector<LogsDBRequest*>& requests) {
+    requests.swap(_requestsToSend);
+    update_atomic_stat_ema(_stats.requestsSent, requests.size());
+    _requestsToSend.clear();
+}
+
+LogsDBResponse& ReqResp::newResponse(ReplicaId targetReplicaId, uint64_t requestId) {
+    _responses.emplace_back();
+    auto& response = _responses.back();
+    response.replicaId = targetReplicaId;
+    response.msg.id = requestId;
+    return response;
+}
+
+void ReqResp::getResponsesToSend(std::vector<LogsDBResponse>& responses) {
+    responses.swap(_responses);
+
    update_atomic_stat_ema(_stats.responsesSent, responses.size());
+    _responses.clear();
+}
+
+Duration ReqResp::getNextTimeout() const {
+    if (_requests.empty()) {
+        return LogsDBConsts::LEADER_INACTIVE_TIMEOUT;
+    }
+    return LogsDBConsts::RESPONSE_TIMEOUT;
+}
+
+bool ReqResp::isQuorum(const QuorumTrackArray& requestIds) {
+    size_t numResponses = 0;
+    for (auto reqId : requestIds) {
+        if (reqId == CONFIRMED_REQ_ID) {
+            ++numResponses;
+        }
+    }
+    return numResponses > requestIds.size() / 2;
+}
diff --git a/cpp/core/logsdb/ReqResp.hpp b/cpp/core/logsdb/ReqResp.hpp
new file mode 100644
index 00000000..f34317a7
--- /dev/null
+++ b/cpp/core/logsdb/ReqResp.hpp
@@ -0,0 +1,49 @@
+// Copyright 2025 XTX Markets Technologies Limited
+//
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+
+#pragma once
+
+#include <array>
+#include <cstdint>
+#include <limits>
+#include <unordered_map>
+#include <vector>
+
+#include "../Protocol.hpp"
+#include "LogsDBCommon.hpp"
+
+// Forward declarations
+struct LogsDBRequest;
+struct LogsDBResponse;
+struct LogsDBStats;
+
+class ReqResp {
+public:
+    static constexpr size_t UNUSED_REQ_ID = std::numeric_limits<size_t>::max();
+    static constexpr size_t CONFIRMED_REQ_ID = 0;
+
+    using QuorumTrackArray = std::array<uint64_t, LogsDBConsts::REPLICA_COUNT>;
+
+    ReqResp(LogsDBStats& stats);
+
+    LogsDBRequest& newRequest(ReplicaId targetReplicaId);
+    LogsDBRequest* getRequest(uint64_t requestId);
+    void eraseRequest(uint64_t requestId);
+    void cleanupRequests(QuorumTrackArray& requestIds);
+    void resendTimedOutRequests();
+    void getRequestsToSend(std::vector<LogsDBRequest*>& requests);
+    LogsDBResponse& newResponse(ReplicaId targetReplicaId, uint64_t requestId);
+    void getResponsesToSend(std::vector<LogsDBResponse>& responses);
+    Duration getNextTimeout() const;
+
+    static bool isQuorum(const QuorumTrackArray& requestIds);
+
+private:
+    LogsDBStats& _stats;
+    uint64_t _lastAssignedRequest;
+    std::unordered_map<uint64_t, LogsDBRequest> _requests;
+    std::vector<LogsDBRequest*> _requestsToSend;
+
+    std::vector<LogsDBResponse> _responses;
+};