diff --git a/cpp/core/MsgsGen.cpp b/cpp/core/MsgsGen.cpp index 3da27aea..fa4b611d 100644 --- a/cpp/core/MsgsGen.cpp +++ b/cpp/core/MsgsGen.cpp @@ -663,6 +663,30 @@ std::ostream& operator<<(std::ostream& out, LogMessageKind kind) { return out; } +std::ostream& operator<<(std::ostream& out, SyncMessageKind kind) { + switch (kind) { + case SyncMessageKind::ERROR: + out << "ERROR"; + break; + case SyncMessageKind::SYNC_START: + out << "SYNC_START"; + break; + case SyncMessageKind::SYNC_CHUNK: + out << "SYNC_CHUNK"; + break; + case SyncMessageKind::SYNC_COMPLETE: + out << "SYNC_COMPLETE"; + break; + case SyncMessageKind::EMPTY: + out << "EMPTY"; + break; + default: + out << "SyncMessageKind(" << ((int)kind) << ")"; + break; + } + return out; +} + void FailureDomain::pack(BincodeBuf& buf) const { buf.packFixedBytes<16>(name); } @@ -1723,6 +1747,28 @@ std::ostream& operator<<(std::ostream& out, const LocationInfo& x) { return out; } +void ColumnFamilyInfo::pack(BincodeBuf& buf) const { + buf.packScalar(index); + buf.packBytes(name); +} +void ColumnFamilyInfo::unpack(BincodeBuf& buf) { + index = buf.unpackScalar(); + buf.unpackBytes(name); +} +void ColumnFamilyInfo::clear() { + index = uint32_t(0); + name.clear(); +} +bool ColumnFamilyInfo::operator==(const ColumnFamilyInfo& rhs) const { + if ((uint32_t)this->index != (uint32_t)rhs.index) { return false; }; + if (name != rhs.name) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const ColumnFamilyInfo& x) { + out << "ColumnFamilyInfo(" << "Index=" << x.index << ", " << "Name=" << GoLangQuotedStringFmt(x.name.data(), x.name.size()) << ")"; + return out; +} + void LookupReq::pack(BincodeBuf& buf) const { dirId.pack(buf); buf.packBytes(name); @@ -5771,6 +5817,150 @@ std::ostream& operator<<(std::ostream& out, const LogRecoveryWriteResp& x) { return out; } +void SyncStartReq::pack(BincodeBuf& buf) const { + buf.packScalar(lastAppliedLogEntry); +} +void SyncStartReq::unpack(BincodeBuf& buf) { + lastAppliedLogEntry = buf.unpackScalar(); +} +void SyncStartReq::clear() { + lastAppliedLogEntry = uint64_t(0); +} +bool SyncStartReq::operator==(const SyncStartReq& rhs) const { + if ((uint64_t)this->lastAppliedLogEntry != (uint64_t)rhs.lastAppliedLogEntry) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncStartReq& x) { + out << "SyncStartReq(" << "LastAppliedLogEntry=" << x.lastAppliedLogEntry << ")"; + return out; +} + +void SyncStartResp::pack(BincodeBuf& buf) const { + buf.packScalar(snapshotLogEntry); + buf.packList(columnFamilies); + buf.packScalar(totalSizeEstimate); +} +void SyncStartResp::unpack(BincodeBuf& buf) { + snapshotLogEntry = buf.unpackScalar(); + buf.unpackList(columnFamilies); + totalSizeEstimate = buf.unpackScalar(); +} +void SyncStartResp::clear() { + snapshotLogEntry = uint64_t(0); + columnFamilies.clear(); + totalSizeEstimate = uint64_t(0); +} +bool SyncStartResp::operator==(const SyncStartResp& rhs) const { + if ((uint64_t)this->snapshotLogEntry != (uint64_t)rhs.snapshotLogEntry) { return false; }; + if (columnFamilies != rhs.columnFamilies) { return false; }; + if ((uint64_t)this->totalSizeEstimate != (uint64_t)rhs.totalSizeEstimate) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncStartResp& x) { + out << "SyncStartResp(" << "SnapshotLogEntry=" << x.snapshotLogEntry << ", " << "ColumnFamilies=" << x.columnFamilies << ", " << "TotalSizeEstimate=" << x.totalSizeEstimate << ")"; + return out; +} + +void SyncChunkReq::pack(BincodeBuf& buf) const { + buf.packScalar(cfIndex); + buf.packBytes(cfName); + buf.packBytes(keyStart); + buf.packScalar(maxSize); +} +void SyncChunkReq::unpack(BincodeBuf& buf) { + cfIndex = buf.unpackScalar(); + buf.unpackBytes(cfName); + buf.unpackBytes(keyStart); + maxSize = buf.unpackScalar(); +} +void SyncChunkReq::clear() { + cfIndex = uint32_t(0); + cfName.clear(); + keyStart.clear(); + maxSize = uint32_t(0); +} +bool SyncChunkReq::operator==(const SyncChunkReq& rhs) const { + if ((uint32_t)this->cfIndex != (uint32_t)rhs.cfIndex) { return false; }; + if (cfName != rhs.cfName) { return false; }; + if (keyStart != rhs.keyStart) { return false; }; + if ((uint32_t)this->maxSize != (uint32_t)rhs.maxSize) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncChunkReq& x) { + out << "SyncChunkReq(" << "CfIndex=" << x.cfIndex << ", " << "CfName=" << GoLangQuotedStringFmt(x.cfName.data(), x.cfName.size()) << ", " << "KeyStart=" << x.keyStart << ", " << "MaxSize=" << x.maxSize << ")"; + return out; +} + +void SyncChunkResp::pack(BincodeBuf& buf) const { + buf.packScalar(cfIndex); + buf.packBytes(cfName); + buf.packList(keys); + buf.packList(values); + buf.packBytes(nextKeyStart); + buf.packScalar(isLastCF); +} +void SyncChunkResp::unpack(BincodeBuf& buf) { + cfIndex = buf.unpackScalar(); + buf.unpackBytes(cfName); + buf.unpackList(keys); + buf.unpackList(values); + buf.unpackBytes(nextKeyStart); + isLastCF = buf.unpackScalar(); +} +void SyncChunkResp::clear() { + cfIndex = uint32_t(0); + cfName.clear(); + keys.clear(); + values.clear(); + nextKeyStart.clear(); + isLastCF = bool(0); +} +bool SyncChunkResp::operator==(const SyncChunkResp& rhs) const { + if ((uint32_t)this->cfIndex != (uint32_t)rhs.cfIndex) { return false; }; + if (cfName != rhs.cfName) { return false; }; + if (keys != rhs.keys) { return false; }; + if (values != rhs.values) { return false; }; + if (nextKeyStart != rhs.nextKeyStart) { return false; }; + if ((bool)this->isLastCF != (bool)rhs.isLastCF) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncChunkResp& x) { + out << "SyncChunkResp(" << "CfIndex=" << x.cfIndex << ", " << "CfName=" << GoLangQuotedStringFmt(x.cfName.data(), x.cfName.size()) << ", " << "Keys=" << x.keys << ", " << "Values=" << x.values << ", " << "NextKeyStart=" << x.nextKeyStart << ", " << "IsLastCF=" << x.isLastCF << ")"; + return out; +} + +void SyncCompleteReq::pack(BincodeBuf& buf) const { + buf.packScalar(success); +} +void SyncCompleteReq::unpack(BincodeBuf& buf) { + success = buf.unpackScalar(); +} +void SyncCompleteReq::clear() { + success = bool(0); +} +bool SyncCompleteReq::operator==(const SyncCompleteReq& rhs) const { + if ((bool)this->success != (bool)rhs.success) { return false; }; + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncCompleteReq& x) { + out << "SyncCompleteReq(" << "Success=" << x.success << ")"; + return out; +} + +void SyncCompleteResp::pack(BincodeBuf& buf) const { +} +void SyncCompleteResp::unpack(BincodeBuf& buf) { +} +void SyncCompleteResp::clear() { +} +bool SyncCompleteResp::operator==(const SyncCompleteResp& rhs) const { + return true; +} +std::ostream& operator<<(std::ostream& out, const SyncCompleteResp& x) { + out << "SyncCompleteResp(" << ")"; + return out; +} + const LookupReq& ShardReqContainer::getLookup() const { ALWAYS_ASSERT(_kind == ShardMessageKind::LOOKUP, "%s != %s", _kind, ShardMessageKind::LOOKUP); return std::get<0>(_data); @@ -10939,6 +11129,323 @@ std::ostream& operator<<(std::ostream& out, const LogRespContainer& x) { return out; } +const SyncStartReq& SyncReqContainer::getSyncStart() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_START, "%s != %s", _kind, SyncMessageKind::SYNC_START); + return std::get<0>(_data); +} +SyncStartReq& SyncReqContainer::setSyncStart() { + _kind = SyncMessageKind::SYNC_START; + auto& x = _data.emplace<0>(); + return x; +} +const SyncChunkReq& SyncReqContainer::getSyncChunk() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_CHUNK, "%s != %s", _kind, SyncMessageKind::SYNC_CHUNK); + return std::get<1>(_data); +} +SyncChunkReq& SyncReqContainer::setSyncChunk() { + _kind = SyncMessageKind::SYNC_CHUNK; + auto& x = _data.emplace<1>(); + return x; +} +const SyncCompleteReq& SyncReqContainer::getSyncComplete() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_COMPLETE, "%s != %s", _kind, SyncMessageKind::SYNC_COMPLETE); + return std::get<2>(_data); +} +SyncCompleteReq& SyncReqContainer::setSyncComplete() { + _kind = SyncMessageKind::SYNC_COMPLETE; + auto& x = _data.emplace<2>(); + return x; +} +SyncReqContainer::SyncReqContainer() { + clear(); +} + +SyncReqContainer::SyncReqContainer(const SyncReqContainer& other) { + *this = other; +} + +SyncReqContainer::SyncReqContainer(SyncReqContainer&& other) { + _data = std::move(other._data); + _kind = other._kind; + other._kind = SyncMessageKind::EMPTY; +} + +void SyncReqContainer::operator=(const SyncReqContainer& other) { + if (other.kind() == SyncMessageKind::EMPTY) { clear(); return; } + switch (other.kind()) { + case SyncMessageKind::SYNC_START: + setSyncStart() = other.getSyncStart(); + break; + case SyncMessageKind::SYNC_CHUNK: + setSyncChunk() = other.getSyncChunk(); + break; + case SyncMessageKind::SYNC_COMPLETE: + setSyncComplete() = other.getSyncComplete(); + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", other.kind()); + } +} + +void SyncReqContainer::operator=(SyncReqContainer&& other) { + _data = std::move(other._data); + _kind = other._kind; + other._kind = SyncMessageKind::EMPTY; +} + +size_t SyncReqContainer::packedSize() const { + switch (_kind) { + case SyncMessageKind::SYNC_START: + return sizeof(SyncMessageKind) + std::get<0>(_data).packedSize(); + case SyncMessageKind::SYNC_CHUNK: + return sizeof(SyncMessageKind) + std::get<1>(_data).packedSize(); + case SyncMessageKind::SYNC_COMPLETE: + return sizeof(SyncMessageKind) + std::get<2>(_data).packedSize(); + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +void SyncReqContainer::pack(BincodeBuf& buf) const { + buf.packScalar(_kind); + switch (_kind) { + case SyncMessageKind::SYNC_START: + std::get<0>(_data).pack(buf); + break; + case SyncMessageKind::SYNC_CHUNK: + std::get<1>(_data).pack(buf); + break; + case SyncMessageKind::SYNC_COMPLETE: + std::get<2>(_data).pack(buf); + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +void SyncReqContainer::unpack(BincodeBuf& buf) { + _kind = buf.unpackScalar(); + switch (_kind) { + case SyncMessageKind::SYNC_START: + _data.emplace<0>().unpack(buf); + break; + case SyncMessageKind::SYNC_CHUNK: + _data.emplace<1>().unpack(buf); + break; + case SyncMessageKind::SYNC_COMPLETE: + _data.emplace<2>().unpack(buf); + break; + default: + throw BINCODE_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +bool SyncReqContainer::operator==(const SyncReqContainer& other) const { + if (_kind != other.kind()) { return false; } + if (_kind == SyncMessageKind::EMPTY) { return true; } + switch (_kind) { + case SyncMessageKind::SYNC_START: + return getSyncStart() == other.getSyncStart(); + case SyncMessageKind::SYNC_CHUNK: + return getSyncChunk() == other.getSyncChunk(); + case SyncMessageKind::SYNC_COMPLETE: + return getSyncComplete() == other.getSyncComplete(); + default: + throw BINCODE_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +std::ostream& operator<<(std::ostream& out, const SyncReqContainer& x) { + switch (x.kind()) { + case SyncMessageKind::SYNC_START: + out << x.getSyncStart(); + break; + case SyncMessageKind::SYNC_CHUNK: + out << x.getSyncChunk(); + break; + case SyncMessageKind::SYNC_COMPLETE: + out << x.getSyncComplete(); + break; + case SyncMessageKind::EMPTY: + out << "EMPTY"; + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", x.kind()); + } + return out; +} + +const TernError& SyncRespContainer::getError() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::ERROR, "%s != %s", _kind, SyncMessageKind::ERROR); + return std::get<0>(_data); +} +TernError& SyncRespContainer::setError() { + _kind = SyncMessageKind::ERROR; + auto& x = _data.emplace<0>(); + return x; +} +const SyncStartResp& SyncRespContainer::getSyncStart() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_START, "%s != %s", _kind, SyncMessageKind::SYNC_START); + return std::get<1>(_data); +} +SyncStartResp& SyncRespContainer::setSyncStart() { + _kind = SyncMessageKind::SYNC_START; + auto& x = _data.emplace<1>(); + return x; +} +const SyncChunkResp& SyncRespContainer::getSyncChunk() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_CHUNK, "%s != %s", _kind, SyncMessageKind::SYNC_CHUNK); + return std::get<2>(_data); +} +SyncChunkResp& SyncRespContainer::setSyncChunk() { + _kind = SyncMessageKind::SYNC_CHUNK; + auto& x = _data.emplace<2>(); + return x; +} +const SyncCompleteResp& SyncRespContainer::getSyncComplete() const { + ALWAYS_ASSERT(_kind == SyncMessageKind::SYNC_COMPLETE, "%s != %s", _kind, SyncMessageKind::SYNC_COMPLETE); + return std::get<3>(_data); +} +SyncCompleteResp& SyncRespContainer::setSyncComplete() { + _kind = SyncMessageKind::SYNC_COMPLETE; + auto& x = _data.emplace<3>(); + return x; +} +SyncRespContainer::SyncRespContainer() { + clear(); +} + +SyncRespContainer::SyncRespContainer(const SyncRespContainer& other) { + *this = other; +} + +SyncRespContainer::SyncRespContainer(SyncRespContainer&& other) { + _data = std::move(other._data); + _kind = other._kind; + other._kind = SyncMessageKind::EMPTY; +} + +void SyncRespContainer::operator=(const SyncRespContainer& other) { + if (other.kind() == SyncMessageKind::EMPTY) { clear(); return; } + switch (other.kind()) { + case SyncMessageKind::ERROR: + setError() = other.getError(); + break; + case SyncMessageKind::SYNC_START: + setSyncStart() = other.getSyncStart(); + break; + case SyncMessageKind::SYNC_CHUNK: + setSyncChunk() = other.getSyncChunk(); + break; + case SyncMessageKind::SYNC_COMPLETE: + setSyncComplete() = other.getSyncComplete(); + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", other.kind()); + } +} + +void SyncRespContainer::operator=(SyncRespContainer&& other) { + _data = std::move(other._data); + _kind = other._kind; + other._kind = SyncMessageKind::EMPTY; +} + +size_t SyncRespContainer::packedSize() const { + switch (_kind) { + case SyncMessageKind::ERROR: + return sizeof(SyncMessageKind) + sizeof(TernError); + case SyncMessageKind::SYNC_START: + return sizeof(SyncMessageKind) + std::get<1>(_data).packedSize(); + case SyncMessageKind::SYNC_CHUNK: + return sizeof(SyncMessageKind) + std::get<2>(_data).packedSize(); + case SyncMessageKind::SYNC_COMPLETE: + return sizeof(SyncMessageKind) + std::get<3>(_data).packedSize(); + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +void SyncRespContainer::pack(BincodeBuf& buf) const { + buf.packScalar(_kind); + switch (_kind) { + case SyncMessageKind::ERROR: + buf.packScalar(std::get<0>(_data)); + break; + case SyncMessageKind::SYNC_START: + std::get<1>(_data).pack(buf); + break; + case SyncMessageKind::SYNC_CHUNK: + std::get<2>(_data).pack(buf); + break; + case SyncMessageKind::SYNC_COMPLETE: + std::get<3>(_data).pack(buf); + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +void SyncRespContainer::unpack(BincodeBuf& buf) { + _kind = buf.unpackScalar(); + switch (_kind) { + case SyncMessageKind::ERROR: + _data.emplace<0>(buf.unpackScalar()); + break; + case SyncMessageKind::SYNC_START: + _data.emplace<1>().unpack(buf); + break; + case SyncMessageKind::SYNC_CHUNK: + _data.emplace<2>().unpack(buf); + break; + case SyncMessageKind::SYNC_COMPLETE: + _data.emplace<3>().unpack(buf); + break; + default: + throw BINCODE_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +bool SyncRespContainer::operator==(const SyncRespContainer& other) const { + if (_kind != other.kind()) { return false; } + if (_kind == SyncMessageKind::EMPTY) { return true; } + switch (_kind) { + case SyncMessageKind::ERROR: + return getError() == other.getError(); + case SyncMessageKind::SYNC_START: + return getSyncStart() == other.getSyncStart(); + case SyncMessageKind::SYNC_CHUNK: + return getSyncChunk() == other.getSyncChunk(); + case SyncMessageKind::SYNC_COMPLETE: + return getSyncComplete() == other.getSyncComplete(); + default: + throw BINCODE_EXCEPTION("bad SyncMessageKind kind %s", _kind); + } +} + +std::ostream& operator<<(std::ostream& out, const SyncRespContainer& x) { + switch (x.kind()) { + case SyncMessageKind::ERROR: + out << x.getError(); + break; + case SyncMessageKind::SYNC_START: + out << x.getSyncStart(); + break; + case SyncMessageKind::SYNC_CHUNK: + out << x.getSyncChunk(); + break; + case SyncMessageKind::SYNC_COMPLETE: + out << x.getSyncComplete(); + break; + case SyncMessageKind::EMPTY: + out << "EMPTY"; + break; + default: + throw TERN_EXCEPTION("bad SyncMessageKind kind %s", x.kind()); + } + return out; +} + std::ostream& operator<<(std::ostream& out, ShardLogEntryKind err) { switch (err) { case ShardLogEntryKind::CONSTRUCT_FILE: diff --git a/cpp/core/MsgsGen.hpp b/cpp/core/MsgsGen.hpp index 6bb1e227..662fc3a6 100644 --- a/cpp/core/MsgsGen.hpp +++ b/cpp/core/MsgsGen.hpp @@ -461,6 +461,24 @@ constexpr int maxLogMessageKind = 7; std::ostream& operator<<(std::ostream& out, LogMessageKind kind); +enum class SyncMessageKind : uint8_t { + ERROR = 0, + SYNC_START = 1, + SYNC_CHUNK = 2, + SYNC_COMPLETE = 3, + EMPTY = 255, +}; + +const std::vector allSyncMessageKind { + SyncMessageKind::SYNC_START, + SyncMessageKind::SYNC_CHUNK, + SyncMessageKind::SYNC_COMPLETE, +}; + +constexpr int maxSyncMessageKind = 3; + +std::ostream& operator<<(std::ostream& out, SyncMessageKind kind); + struct FailureDomain { BincodeFixedBytes<16> name; @@ -1535,6 +1553,27 @@ struct LocationInfo { std::ostream& operator<<(std::ostream& out, const LocationInfo& x); +struct ColumnFamilyInfo { + uint32_t index; + BincodeBytes name; + + static constexpr uint16_t STATIC_SIZE = 4 + BincodeBytes::STATIC_SIZE; // index + name + + ColumnFamilyInfo() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 4; // index + _size += name.packedSize(); // name + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const ColumnFamilyInfo&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const ColumnFamilyInfo& x); + struct LookupReq { InodeId dirId; BincodeBytes name; @@ -5479,6 +5518,138 @@ struct LogRecoveryWriteResp { std::ostream& operator<<(std::ostream& out, const LogRecoveryWriteResp& x); +struct SyncStartReq { + uint64_t lastAppliedLogEntry; + + static constexpr uint16_t STATIC_SIZE = 8; // lastAppliedLogEntry + + SyncStartReq() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 8; // lastAppliedLogEntry + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncStartReq&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncStartReq& x); + +struct SyncStartResp { + uint64_t snapshotLogEntry; + BincodeList columnFamilies; + uint64_t totalSizeEstimate; + + static constexpr uint16_t STATIC_SIZE = 8 + BincodeList::STATIC_SIZE + 8; // snapshotLogEntry + columnFamilies + totalSizeEstimate + + SyncStartResp() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 8; // snapshotLogEntry + _size += columnFamilies.packedSize(); // columnFamilies + _size += 8; // totalSizeEstimate + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncStartResp&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncStartResp& x); + +struct SyncChunkReq { + uint32_t cfIndex; + BincodeBytes cfName; + BincodeBytes keyStart; + uint32_t maxSize; + + static constexpr uint16_t STATIC_SIZE = 4 + BincodeBytes::STATIC_SIZE + BincodeBytes::STATIC_SIZE + 4; // cfIndex + cfName + keyStart + maxSize + + SyncChunkReq() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 4; // cfIndex + _size += cfName.packedSize(); // cfName + _size += keyStart.packedSize(); // keyStart + _size += 4; // maxSize + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncChunkReq&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncChunkReq& x); + +struct SyncChunkResp { + uint32_t cfIndex; + BincodeBytes cfName; + BincodeList keys; + BincodeList values; + BincodeBytes nextKeyStart; + bool isLastCF; + + static constexpr uint16_t STATIC_SIZE = 4 + BincodeBytes::STATIC_SIZE + BincodeList::STATIC_SIZE + BincodeList::STATIC_SIZE + BincodeBytes::STATIC_SIZE + 1; // cfIndex + cfName + keys + values + nextKeyStart + isLastCF + + SyncChunkResp() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 4; // cfIndex + _size += cfName.packedSize(); // cfName + _size += keys.packedSize(); // keys + _size += values.packedSize(); // values + _size += nextKeyStart.packedSize(); // nextKeyStart + _size += 1; // isLastCF + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncChunkResp&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncChunkResp& x); + +struct SyncCompleteReq { + bool success; + + static constexpr uint16_t STATIC_SIZE = 1; // success + + SyncCompleteReq() { clear(); } + size_t packedSize() const { + size_t _size = 0; + _size += 1; // success + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncCompleteReq&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncCompleteReq& x); + +struct SyncCompleteResp { + + static constexpr uint16_t STATIC_SIZE = 0; // + + SyncCompleteResp() { clear(); } + size_t packedSize() const { + size_t _size = 0; + return _size; + } + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + void clear(); + bool operator==(const SyncCompleteResp&rhs) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncCompleteResp& x); + struct ShardReqContainer { private: static constexpr std::array _staticSizes = {LookupReq::STATIC_SIZE, StatFileReq::STATIC_SIZE, StatDirectoryReq::STATIC_SIZE, ReadDirReq::STATIC_SIZE, ConstructFileReq::STATIC_SIZE, AddSpanInitiateReq::STATIC_SIZE, AddSpanCertifyReq::STATIC_SIZE, LinkFileReq::STATIC_SIZE, SoftUnlinkFileReq::STATIC_SIZE, LocalFileSpansReq::STATIC_SIZE, SameDirectoryRenameReq::STATIC_SIZE, AddInlineSpanReq::STATIC_SIZE, SetTimeReq::STATIC_SIZE, FullReadDirReq::STATIC_SIZE, MoveSpanReq::STATIC_SIZE, RemoveNonOwnedEdgeReq::STATIC_SIZE, SameShardHardFileUnlinkReq::STATIC_SIZE, StatTransientFileReq::STATIC_SIZE, ShardSnapshotReq::STATIC_SIZE, FileSpansReq::STATIC_SIZE, AddSpanLocationReq::STATIC_SIZE, ScrapTransientFileReq::STATIC_SIZE, SetDirectoryInfoReq::STATIC_SIZE, GetLinkEntriesReq::STATIC_SIZE, WaitStateAppliedReq::STATIC_SIZE, VisitDirectoriesReq::STATIC_SIZE, VisitFilesReq::STATIC_SIZE, VisitTransientFilesReq::STATIC_SIZE, RemoveSpanInitiateReq::STATIC_SIZE, RemoveSpanCertifyReq::STATIC_SIZE, SwapBlocksReq::STATIC_SIZE, BlockServiceFilesReq::STATIC_SIZE, RemoveInodeReq::STATIC_SIZE, AddSpanInitiateWithReferenceReq::STATIC_SIZE, RemoveZeroBlockServiceFilesReq::STATIC_SIZE, SwapSpansReq::STATIC_SIZE, SameDirectoryRenameSnapshotReq::STATIC_SIZE, AddSpanAtLocationInitiateReq::STATIC_SIZE, CreateDirectoryInodeReq::STATIC_SIZE, SetDirectoryOwnerReq::STATIC_SIZE, RemoveDirectoryOwnerReq::STATIC_SIZE, CreateLockedCurrentEdgeReq::STATIC_SIZE, LockCurrentEdgeReq::STATIC_SIZE, UnlockCurrentEdgeReq::STATIC_SIZE, RemoveOwnedSnapshotFileEdgeReq::STATIC_SIZE, MakeFileTransientReq::STATIC_SIZE}; @@ -6055,6 +6226,72 @@ struct LogRespContainer { std::ostream& operator<<(std::ostream& out, const LogRespContainer& x); +struct SyncReqContainer { +private: + static constexpr std::array _staticSizes = {SyncStartReq::STATIC_SIZE, SyncChunkReq::STATIC_SIZE, SyncCompleteReq::STATIC_SIZE}; + SyncMessageKind _kind = SyncMessageKind::EMPTY; + std::variant _data; +public: + SyncReqContainer(); + SyncReqContainer(const SyncReqContainer& other); + SyncReqContainer(SyncReqContainer&& other); + void operator=(const SyncReqContainer& other); + void operator=(SyncReqContainer&& other); + + SyncMessageKind kind() const { return _kind; } + + const SyncStartReq& getSyncStart() const; + SyncStartReq& setSyncStart(); + const SyncChunkReq& getSyncChunk() const; + SyncChunkReq& setSyncChunk(); + const SyncCompleteReq& getSyncComplete() const; + SyncCompleteReq& setSyncComplete(); + + void clear() { _kind = SyncMessageKind::EMPTY; }; + + static constexpr size_t STATIC_SIZE = sizeof(SyncMessageKind) + *std::max_element(_staticSizes.begin(), _staticSizes.end()); + size_t packedSize() const; + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + bool operator==(const SyncReqContainer& other) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncReqContainer& x); + +struct SyncRespContainer { +private: + static constexpr std::array _staticSizes = {sizeof(TernError), SyncStartResp::STATIC_SIZE, SyncChunkResp::STATIC_SIZE, SyncCompleteResp::STATIC_SIZE}; + SyncMessageKind _kind = SyncMessageKind::EMPTY; + std::variant _data; +public: + SyncRespContainer(); + SyncRespContainer(const SyncRespContainer& other); + SyncRespContainer(SyncRespContainer&& other); + void operator=(const SyncRespContainer& other); + void operator=(SyncRespContainer&& other); + + SyncMessageKind kind() const { return _kind; } + + const TernError& getError() const; + TernError& setError(); + const SyncStartResp& getSyncStart() const; + SyncStartResp& setSyncStart(); + const SyncChunkResp& getSyncChunk() const; + SyncChunkResp& setSyncChunk(); + const SyncCompleteResp& getSyncComplete() const; + SyncCompleteResp& setSyncComplete(); + + void clear() { _kind = SyncMessageKind::EMPTY; }; + + static constexpr size_t STATIC_SIZE = sizeof(SyncMessageKind) + *std::max_element(_staticSizes.begin(), _staticSizes.end()); + size_t packedSize() const; + void pack(BincodeBuf& buf) const; + void unpack(BincodeBuf& buf); + bool operator==(const SyncRespContainer& other) const; +}; + +std::ostream& operator<<(std::ostream& out, const SyncRespContainer& x); + enum class ShardLogEntryKind : uint16_t { CONSTRUCT_FILE = 1, LINK_FILE = 2, diff --git a/cpp/core/Protocol.hpp b/cpp/core/Protocol.hpp index 95696ac9..def0784d 100644 --- a/cpp/core/Protocol.hpp +++ b/cpp/core/Protocol.hpp @@ -58,6 +58,14 @@ constexpr uint32_t LOG_REQ_PROTOCOL_VERSION = 0x474f4c; // '1474f4c' constexpr uint32_t LOG_RESP_PROTOCOL_VERSION = 0x1474f4c; +// >>> format(struct.unpack('>> format(struct.unpack('; using ProxyShardReqMsg = SignedProtocolMessage; using ProxyShardRespMsg = SignedProtocolMessage; +using SyncReqMsg = ProtocolMessage; +using SyncRespMsg = ProtocolMessage; diff --git a/cpp/shard/CMakeLists.txt b/cpp/shard/CMakeLists.txt index ed84f2fe..cc5ea73c 100644 --- a/cpp/shard/CMakeLists.txt +++ b/cpp/shard/CMakeLists.txt @@ -4,7 +4,7 @@ include_directories(${ternfs_SOURCE_DIR}/core ${ternfs_SOURCE_DIR}/crc32c) -add_library(shard Shard.cpp Shard.hpp ShardDB.cpp ShardDB.hpp ShardDBData.cpp ShardDBData.hpp BlockServicesCacheDB.hpp BlockServicesCacheDB.cpp) +add_library(shard Shard.cpp Shard.hpp ShardDB.cpp ShardDB.hpp ShardDBData.cpp ShardDBData.hpp BlockServicesCacheDB.hpp BlockServicesCacheDB.cpp ShardSyncServer.cpp ShardSyncServer.hpp) target_link_libraries(shard PRIVATE core) add_executable(ternshard ternshard.cpp) diff --git a/cpp/shard/ShardSyncServer.cpp b/cpp/shard/ShardSyncServer.cpp new file mode 100644 index 00000000..8ba70cac --- /dev/null +++ b/cpp/shard/ShardSyncServer.cpp @@ -0,0 +1,311 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +#include "ShardSyncServer.hpp" + +#include +#include +#include +#include +#include + +#include "Assert.hpp" +#include "Bincode.hpp" +#include "Loop.hpp" + +ShardSyncServer::~ShardSyncServer() { + for (auto& [fd, client] : _clients) { + close(fd); + } + if (_listenFd != -1) { + close(_listenFd); + } + if (_epollFd != -1) { + close(_epollFd); + } +} + +bool ShardSyncServer::init() { + _epollFd = epoll_create1(0); + if (_epollFd == -1) { + LOG_ERROR(_env, "Failed to create epoll instance: %s", strerror(errno)); + return false; + } + + if (_options.bindAddress.ip.data[0] == 0) { + LOG_INFO(_env, "Sync server not configured (no bind address)"); + return true; + } + + _listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (_listenFd == -1) { + LOG_ERROR(_env, "Failed to create sync server socket: %s", strerror(errno)); + return false; + } + + int opt = 1; + setsockopt(_listenFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + sockaddr_in sockAddr{}; + _options.bindAddress.toSockAddrIn(sockAddr); + + if (bind(_listenFd, (sockaddr*)&sockAddr, sizeof(sockAddr)) == -1) { + LOG_ERROR(_env, "Failed to bind sync server socket: %s", strerror(errno)); + return false; + } + + if (listen(_listenFd, SOMAXCONN) == -1) { + LOG_ERROR(_env, "Failed to listen on sync server socket: %s", strerror(errno)); + return false; + } + + epoll_event event{}; + event.events = EPOLLIN; + event.data.fd = _listenFd; + if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, _listenFd, &event) == -1) { + LOG_ERROR(_env, "Failed to register sync server listen socket for epoll: %s", strerror(errno)); + return false; + } + + LOG_INFO(_env, "Sync server initialized on %s", _options.bindAddress); + return true; +} + +bool ShardSyncServer::receiveMessages(Duration timeout) { + ALWAYS_ASSERT(_receivedRequests.empty()); + + if (_epollFd == -1) { + return false; + } + + int numEvents = Loop::epollWait(_epollFd, &_events[0], _events.size(), timeout); + LOG_TRACE(_env, "Sync server epoll returned %s events", numEvents); + + if (numEvents == -1) { + if (errno != EINTR) { + LOG_ERROR(_env, "Sync server epoll_wait error: %s", strerror(errno)); + } + return false; + } + + for (int i = 0; i < numEvents; ++i) { + if (_events[i].data.fd == _listenFd) { + _acceptConnection(); + } else if (_events[i].events & (EPOLLHUP | EPOLLRDHUP | EPOLLERR)) { + _removeClient(_events[i].data.fd); + } else if (_events[i].events & EPOLLIN) { + _readClient(_events[i].data.fd); + } else if (_events[i].events & EPOLLOUT) { + _writeClient(_events[i].data.fd); + } + } + + return true; +} + +void ShardSyncServer::sendSyncResponses(std::vector& responses) { + for (auto& response : responses) { + auto inFlightIt = _inFlightRequests.find(response.requestId); + if (inFlightIt == _inFlightRequests.end()) { + LOG_TRACE(_env, "Dropping sync response for requestId %s as request was dropped", response.requestId); + continue; + } + int fd = inFlightIt->second; + _inFlightRequests.erase(inFlightIt); + if (response.resp.kind() == SyncMessageKind::EMPTY) { + LOG_TRACE(_env, "Dropping sync connection with fd %s due to empty response", fd); + _removeClient(fd); + continue; + } + _sendResponse(fd, response.resp); + } +} + +void ShardSyncServer::_acceptConnection() { + sockaddr_in clientAddr{}; + socklen_t clientAddrLen = sizeof(clientAddr); + + int clientFd = accept4(_listenFd, (sockaddr*)&clientAddr, &clientAddrLen, SOCK_NONBLOCK); + + if (clientFd == -1) { + LOG_ERROR(_env, "Failed to accept sync connection: %s", strerror(errno)); + return; + } + + if (_clients.size() >= _options.maxConnections) { + LOG_DEBUG(_env, "Dropping sync connection as we reached connection limit"); + close(clientFd); + return; + } + + auto client_it = _clients.emplace(clientFd, Client{clientFd, {}, {}, ternNow(), 0, 0}).first; + client_it->second.readBuffer.resize(MESSAGE_HEADER_SIZE); + + epoll_event event{}; + event.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP; + event.data.fd = clientFd; + + if (epoll_ctl(_epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) { + LOG_ERROR(_env, "Failed to add sync client to epoll: %s", strerror(errno)); + _removeClient(clientFd); + return; + } + + LOG_TRACE(_env, "Accepted sync connection on fd %s", clientFd); +} + +void ShardSyncServer::_readClient(int fd) { + auto it = _clients.find(fd); + ALWAYS_ASSERT(it != _clients.end()); + + Client& client = it->second; + client.lastActive = ternNow(); + size_t bytesToRead = client.readBuffer.size() - client.messageBytesProcessed; + ssize_t bytesRead; + + while (bytesToRead > 0 && + (bytesRead = read(fd, &client.readBuffer[client.messageBytesProcessed], bytesToRead)) > 0) { + + LOG_TRACE(_env, "Received %s bytes from sync client", bytesRead); + bytesToRead -= bytesRead; + client.messageBytesProcessed += bytesRead; + + if (bytesToRead > 0) { + continue; + } + + if (client.messageBytesProcessed == MESSAGE_HEADER_SIZE) { + BincodeBuf buf{&client.readBuffer[0], MESSAGE_HEADER_SIZE}; + uint32_t protocol = buf.unpackScalar(); + if (protocol != SYNC_REQ_PROTOCOL_VERSION) { + LOG_ERROR(_env, "Invalid sync protocol version: %s", protocol); + _removeClient(fd); + return; + } + uint32_t len = buf.unpackScalar(); + buf.ensureFinished(); + LOG_TRACE(_env, "Received sync message of length %s", len); + bytesToRead = len; + client.readBuffer.resize(len + MESSAGE_HEADER_SIZE); + } else { + LOG_TRACE(_env, "Unpacking sync ReadBuffer size %s", client.readBuffer.size()); + BincodeBuf buf{&client.readBuffer[MESSAGE_HEADER_SIZE], client.readBuffer.size() - MESSAGE_HEADER_SIZE}; + auto& req = _receivedRequests.emplace_back(); + + try { + req.req.unpack(buf); + buf.ensureFinished(); + LOG_TRACE(_env, "Received sync request on fd %s, kind %s", fd, req.req.kind()); + + // Remove read event from epoll after receiving complete request + epoll_event event{}; + event.events = EPOLLHUP | EPOLLERR | EPOLLRDHUP; + event.data.fd = fd; + if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &event) == -1) { + LOG_ERROR(_env, "Failed to modify sync client epoll event: %s", strerror(errno)); + _receivedRequests.pop_back(); + _removeClient(fd); + return; + } + } catch (const BincodeException& err) { + LOG_ERROR(_env, "Could not parse SyncReq: %s", err.what()); + _receivedRequests.pop_back(); + _removeClient(fd); + return; + } + + req.requestId = ++_lastRequestId; + client.readBuffer.clear(); + client.messageBytesProcessed = 0; + client.inFlightRequestId = req.requestId; + _inFlightRequests.emplace(req.requestId, fd); + } + } + + if (bytesRead == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + LOG_DEBUG(_env, "Error reading from sync client: %s", strerror(errno)); + _removeClient(fd); + } + + if (bytesRead == 0) { + _removeClient(fd); + } +} + +void ShardSyncServer::_removeClient(int fd) { + auto it = _clients.find(fd); + ALWAYS_ASSERT(it != _clients.end()); + + epoll_ctl(_epollFd, EPOLL_CTL_DEL, fd, nullptr); + close(fd); + if (it->second.inFlightRequestId != 0) { + _inFlightRequests.erase(it->second.inFlightRequestId); + } + _clients.erase(it); + LOG_TRACE(_env, "Removed sync client %s", fd); +} + +void ShardSyncServer::_sendResponse(int fd, SyncRespContainer& resp) { + LOG_TRACE(_env, "Sending sync response to client %s, kind %s", fd, resp.kind()); + auto it = _clients.find(fd); + ALWAYS_ASSERT(it != _clients.end()); + auto& client = it->second; + ALWAYS_ASSERT(client.writeBuffer.empty()); + ALWAYS_ASSERT(client.readBuffer.empty()); + ALWAYS_ASSERT(client.messageBytesProcessed == 0); + + uint32_t len = resp.packedSize(); + client.writeBuffer.resize(len + MESSAGE_HEADER_SIZE); + BincodeBuf buf(client.writeBuffer); + buf.packScalar(SYNC_RESP_PROTOCOL_VERSION); + buf.packScalar(len); + resp.pack(buf); + buf.ensureFinished(); + client.inFlightRequestId = 0; + _writeClient(fd, true); +} + +void ShardSyncServer::_writeClient(int fd, bool registerEpoll) { + auto it = _clients.find(fd); + ALWAYS_ASSERT(it != _clients.end()); + auto& client = it->second; + client.lastActive = ternNow(); + ssize_t bytesToWrite = client.writeBuffer.size() - client.messageBytesProcessed; + ssize_t bytesWritten = 0; + LOG_TRACE(_env, "Writing to sync client %s, %s bytes left", fd, bytesToWrite); + + while (bytesToWrite > 0 && + (bytesWritten = write(fd, &client.writeBuffer[client.messageBytesProcessed], bytesToWrite)) > 0) { + LOG_TRACE(_env, "Sent %s bytes to sync client", bytesWritten); + client.messageBytesProcessed += bytesWritten; + bytesToWrite -= bytesWritten; + } + + LOG_TRACE(_env, "Finished writing to sync client %s, %s bytes left", fd, bytesToWrite); + + if (bytesToWrite > 0 && registerEpoll) { + struct epoll_event ev; + ev.events = EPOLLOUT | EPOLLHUP | EPOLLERR | EPOLLRDHUP; + ev.data.fd = fd; + if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev) == -1) { + LOG_ERROR(_env, "Failed to modify epoll for sync client %s", fd); + _removeClient(fd); + return; + } + } + + if (bytesToWrite == 0) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLRDHUP; + ev.data.fd = fd; + client.messageBytesProcessed = 0; + client.readBuffer.resize(MESSAGE_HEADER_SIZE); + client.writeBuffer.clear(); + if (epoll_ctl(_epollFd, EPOLL_CTL_MOD, fd, &ev) == -1) { + LOG_ERROR(_env, "Failed to modify epoll for sync client %s", fd); + _removeClient(fd); + return; + } + } +} diff --git a/cpp/shard/ShardSyncServer.hpp b/cpp/shard/ShardSyncServer.hpp new file mode 100644 index 00000000..d7efa6f4 --- /dev/null +++ b/cpp/shard/ShardSyncServer.hpp @@ -0,0 +1,88 @@ +// Copyright 2025 XTX Markets Technologies Limited +// +// SPDX-License-Identifier: GPL-2.0-or-later + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "Env.hpp" +#include "MsgsGen.hpp" +#include "Protocol.hpp" +#include "Time.hpp" + +struct SyncRequest { + uint64_t requestId; + SyncReqContainer req; +}; + +struct SyncResponse { + uint64_t requestId; + SyncRespContainer resp; +}; + +struct ShardSyncServerOptions { + IpPort bindAddress; + uint32_t maxConnections = 16; +}; + +class ShardSyncServer { +public: + ShardSyncServer(const ShardSyncServerOptions& options, Env& env) : + _options(options), + _env(env), + _lastRequestId(0) + {} + + virtual ~ShardSyncServer(); + + bool init(); + + inline const IpPort& boundAddress() const { return _options.bindAddress; } + + // Returns true if poll returned events, false on error/timeout + bool receiveMessages(Duration timeout); + + inline std::vector& receivedSyncRequests() { return _receivedRequests; } + + void sendSyncResponses(std::vector& responses); + +private: + static constexpr size_t MESSAGE_HEADER_SIZE = 8; + static constexpr size_t MESSAGE_HEADER_LENGTH_OFFSET = 4; + static constexpr int MAX_EVENTS = 64; + + const ShardSyncServerOptions _options; + Env& _env; + + int _listenFd = -1; + int _epollFd = -1; + std::array _events; + + struct Client { + int fd; + std::string readBuffer; + std::string writeBuffer; + TernTime lastActive; + size_t messageBytesProcessed; + uint64_t inFlightRequestId; + }; + + std::unordered_map _clients; + + uint64_t _lastRequestId; + std::unordered_map _inFlightRequests; // request to fd mapping + std::vector _receivedRequests; + + void _acceptConnection(); + void _removeClient(int fd); + void _readClient(int fd); + void _writeClient(int fd, bool registerEpoll = false); + + void _sendResponse(int fd, SyncRespContainer& resp); +}; diff --git a/go/bincodegen/bincodegen.go b/go/bincodegen/bincodegen.go index 0586ec6c..677721be 100644 --- a/go/bincodegen/bincodegen.go +++ b/go/bincodegen/bincodegen.go @@ -305,7 +305,7 @@ func generateGoErrorCodes(out io.Writer, errors []string) { //go:embed msgs_bincode.go.header var goHeader string -func generateGo(errors []string, shardReqResps []reqRespType, cdcReqResps []reqRespType, registryReqResps []reqRespType, blocksReqResps []reqRespType, logReqResps []reqRespType, extras []reflect.Type) []byte { +func generateGo(errors []string, shardReqResps []reqRespType, cdcReqResps []reqRespType, registryReqResps []reqRespType, blocksReqResps []reqRespType, logReqResps []reqRespType, syncReqResps []reqRespType, extras []reflect.Type) []byte { out := new(bytes.Buffer) out.Write([]byte(goHeader)) @@ -317,6 +317,7 @@ func generateGo(errors []string, shardReqResps []reqRespType, cdcReqResps []reqR generateGoMsgKind(out, "RegistryMessageKind", "RegistryRequest", "RegistryResponse", "MkRegistryMessage", registryReqResps) generateGoMsgKind(out, "BlocksMessageKind", "BlocksRequest", "BlocksResponse", "MkBlocksMessage", blocksReqResps) generateGoMsgKind(out, "LogMessageKind", "LogRequest", "LogResponse", "MkLogMessage", logReqResps) + generateGoMsgKind(out, "SyncMessageKind", "SyncRequest", "SyncResponse", "MkSyncMessage", syncReqResps) for _, reqResp := range shardReqResps { generateGoReqResp(out, reqResp, "ShardMessageKind", "ShardRequestKind", "ShardResponseKind") @@ -336,6 +337,9 @@ func generateGo(errors []string, shardReqResps []reqRespType, cdcReqResps []reqR for _, reqResp := range logReqResps { generateGoReqResp(out, reqResp, "LogMessageKind", "LogRequestKind", "LogResponseKind") } + for _, reqResp := range syncReqResps { + generateGoReqResp(out, reqResp, "SyncMessageKind", "SyncRequestKind", "SyncResponseKind") + } return out.Bytes() } @@ -1345,7 +1349,7 @@ var fetchedSpanCpp string //go:embed FetchedFullSpan.hpp var fetchedFullSpanCpp string -func generateCpp(errors []string, shardReqResps []reqRespType, cdcReqResps []reqRespType, registryReqResps []reqRespType, blocksReqResps []reqRespType, logReqResps []reqRespType, extras []reflect.Type) ([]byte, []byte) { +func generateCpp(errors []string, shardReqResps []reqRespType, cdcReqResps []reqRespType, registryReqResps []reqRespType, blocksReqResps []reqRespType, logReqResps []reqRespType, syncReqResps []reqRespType, extras []reflect.Type) ([]byte, []byte) { hppOut := new(bytes.Buffer) cppOut := new(bytes.Buffer) @@ -1378,6 +1382,7 @@ func generateCpp(errors []string, shardReqResps []reqRespType, cdcReqResps []req generateCppKind(hppOut, cppOut, "Registry", registryReqResps) generateCppKind(hppOut, cppOut, "Blocks", blocksReqResps) generateCppKind(hppOut, cppOut, "Log", logReqResps) + generateCppKind(hppOut, cppOut, "Sync", syncReqResps) for _, typ := range extras { generateCppSingle(hppOut, cppOut, typ) @@ -1410,11 +1415,16 @@ func generateCpp(errors []string, shardReqResps []reqRespType, cdcReqResps []req generateCppSingle(hppOut, cppOut, reqResp.req) generateCppSingle(hppOut, cppOut, reqResp.resp) } + for _, reqResp := range syncReqResps { + generateCppSingle(hppOut, cppOut, reqResp.req) + generateCppSingle(hppOut, cppOut, reqResp.resp) + } generateCppReqResp(hppOut, cppOut, "Shard", shardReqResps) generateCppReqResp(hppOut, cppOut, "CDC", cdcReqResps) generateCppReqResp(hppOut, cppOut, "Registry", registryReqResps) generateCppReqResp(hppOut, cppOut, "Log", logReqResps) + generateCppReqResp(hppOut, cppOut, "Sync", syncReqResps) generateCppLogEntries( hppOut, @@ -2102,6 +2112,24 @@ func main() { }, } + syncReqResps := []reqRespType{ + { + 0x01, + reflect.TypeOf(msgs.SyncStartReq{}), + reflect.TypeOf(msgs.SyncStartResp{}), + }, + { + 0x02, + reflect.TypeOf(msgs.SyncChunkReq{}), + reflect.TypeOf(msgs.SyncChunkResp{}), + }, + { + 0x03, + reflect.TypeOf(msgs.SyncCompleteReq{}), + reflect.TypeOf(msgs.SyncCompleteResp{}), + }, + } + kernelExtras := []reflect.Type{ reflect.TypeOf(msgs.DirectoryInfoEntry{}), reflect.TypeOf(msgs.DirectoryInfo{}), @@ -2145,6 +2173,7 @@ func main() { reflect.TypeOf(msgs.FullBlockServiceInfo{}), reflect.TypeOf(msgs.CdcInfo{}), reflect.TypeOf(msgs.LocationInfo{}), + reflect.TypeOf(msgs.ColumnFamilyInfo{}), }...)...) goExtras := append(extras, []reflect.Type{ @@ -2157,7 +2186,7 @@ func main() { reflect.TypeOf(msgs.AddrsInfo{})}, kernelExtras...) - goCode := generateGo(errors, shardReqResps, cdcReqResps, registryReqResps, blocksReqResps, logReqResps, goExtras) + goCode := generateGo(errors, shardReqResps, cdcReqResps, registryReqResps, blocksReqResps, logReqResps, syncReqResps, goExtras) goOutFileName := fmt.Sprintf("%s/msgs_bincode.go", cwd) writeIfChanged(goOutFileName, goCode) @@ -2165,7 +2194,7 @@ func main() { writeIfChanged(fmt.Sprintf("%s/../../kmod/bincodegen.h", cwd), kmodHBytes) writeIfChanged(fmt.Sprintf("%s/../../kmod/bincodegen.c", cwd), kmodCBytes) - hppBytes, cppBytes := generateCpp(errors, shardReqResps, cdcReqResps, registryReqResps, blocksReqResps, logReqResps, extras) + hppBytes, cppBytes := generateCpp(errors, shardReqResps, cdcReqResps, registryReqResps, blocksReqResps, logReqResps, syncReqResps, extras) writeIfChanged(fmt.Sprintf("%s/../../cpp/core/MsgsGen.hpp", cwd), hppBytes) writeIfChanged(fmt.Sprintf("%s/../../cpp/core/MsgsGen.cpp", cwd), cppBytes) } diff --git a/go/msgs/msgs.go b/go/msgs/msgs.go index b5909861..c0062fc8 100644 --- a/go/msgs/msgs.go +++ b/go/msgs/msgs.go @@ -92,6 +92,14 @@ const LOG_REQ_PROTOCOL_VERSION uint32 = 0x474f4c // '1474f4c' const LOG_RESP_PROTOCOL_VERSION uint32 = 0x1474f4c +// >>> format(struct.unpack('>> format(struct.unpack('