Skip to content

Commit

Permalink
#813 eth_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kladkogex committed Nov 30, 2023
1 parent 79d6b7f commit 91f69db
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 65 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/clang-format-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ jobs:
./libjson-rpc-cpp ./spdlog ./sgxwallet ./scripts ./run_sgx_test ./libzmq ./thirdparty
./cppzmq'
extensions: 'h,hpp,hxx,cpp,cxx,cc,ipp'
clangFormatVersion: 10
clangFormatVersion: 11
inplace: True
74 changes: 38 additions & 36 deletions catchup/server/CatchupServerAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
#include "CatchupServerAgent.h"

CatchupServerAgent::CatchupServerAgent( Schain& _schain, const ptr< TCPServerSocket >& _s )
: AbstractServerAgent( "CatchupServer", _schain, _s ) {
: AbstractServerAgent( "CatchupServer", _schain, _s ) {
CHECK_ARGUMENT( _s );
catchupWorkerThreadPool = make_shared< CatchupWorkerThreadPool >( num_threads( 2 ), this );
catchupWorkerThreadPool->startService();
Expand All @@ -77,7 +77,7 @@ CatchupServerAgent::~CatchupServerAgent() {}


void CatchupServerAgent::processNextAvailableConnection(
const ptr< ServerConnection >& _connection ) {
const ptr< ServerConnection >& _connection ) {
MONITOR( __CLASS_NAME__, __FUNCTION__ );

CHECK_ARGUMENT( _connection );
Expand All @@ -97,7 +97,7 @@ void CatchupServerAgent::processNextAvailableConnection(

try {
jsonRequest = sChain->getIo()->readJsonHeader(
_connection->getDescriptor(), "Read catchup request", 10, _connection->getIP() );
_connection->getDescriptor(), "Read catchup request", 10, _connection->getIP() );
} catch ( ExitRequestedException& ) {
throw;
} catch ( ... ) {
Expand All @@ -116,14 +116,14 @@ void CatchupServerAgent::processNextAvailableConnection(
responseHeader = make_shared< BlockFinalizeResponseHeader >();
} else {
BOOST_THROW_EXCEPTION(
InvalidMessageFormatException( "Unknown request type:" + type, __CLASS_NAME__ ) );
InvalidMessageFormatException( "Unknown request type:" + type, __CLASS_NAME__ ) );
}

ptr< vector< uint8_t > > serializedBinary = nullptr;

try {
serializedBinary =
this->createResponseHeaderAndBinary( _connection, jsonRequest, responseHeader );
this->createResponseHeaderAndBinary( _connection, jsonRequest, responseHeader );
} catch ( ExitRequestedException& ) {
throw;
} catch ( ... ) {
Expand All @@ -136,7 +136,7 @@ void CatchupServerAgent::processNextAvailableConnection(
} catch ( ... ) {
}
throw_with_nested( CouldNotSendMessageException(
"Could not create catchup response header", __CLASS_NAME__ ) );
"Could not create catchup response header", __CLASS_NAME__ ) );
}


Expand All @@ -146,7 +146,7 @@ void CatchupServerAgent::processNextAvailableConnection(
throw;
} catch ( ... ) {
throw_with_nested(
CouldNotSendMessageException( "Could not send response", __CLASS_NAME__ ) );
CouldNotSendMessageException( "Could not send response", __CLASS_NAME__ ) );
}


Expand All @@ -164,7 +164,7 @@ void CatchupServerAgent::processNextAvailableConnection(
throw;
} catch ( ... ) {
throw_with_nested(
CouldNotSendMessageException( "Could not send serialized binary", __CLASS_NAME__ ) );
CouldNotSendMessageException( "Could not send serialized binary", __CLASS_NAME__ ) );
}

LOG( debug, "Server step 3: response completed: blocks sent" );
Expand All @@ -174,8 +174,8 @@ void CatchupServerAgent::processNextAvailableConnection(


ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary(
const ptr< ServerConnection >&, nlohmann::json _jsonRequest,
const ptr< Header >& _responseHeader ) {
const ptr< ServerConnection >&, nlohmann::json _jsonRequest,
const ptr< Header >& _responseHeader ) {
CHECK_ARGUMENT( _responseHeader );

try {
Expand All @@ -186,9 +186,9 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary(

if ( ( uint64_t ) sChain->getSchainID() != schainID ) {
_responseHeader->setStatusSubStatus(
CONNECTION_ERROR, CONNECTION_ERROR_UNKNOWN_SCHAIN_ID );
CONNECTION_ERROR, CONNECTION_ERROR_UNKNOWN_SCHAIN_ID );
BOOST_THROW_EXCEPTION( InvalidSchainException(
"Incorrect schain " + to_string( schainID ), __CLASS_NAME__ ) );
"Incorrect schain " + to_string( schainID ), __CLASS_NAME__ ) );
};


Expand All @@ -198,22 +198,22 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary(

if ( type.compare( Header::BLOCK_CATCHUP_REQ ) == 0 ) {
serializedBinary = createBlockCatchupResponse( _jsonRequest,
dynamic_pointer_cast< CatchupResponseHeader >( _responseHeader ), blockID );
dynamic_pointer_cast< CatchupResponseHeader >( _responseHeader ), blockID );

} else if ( type.compare( Header::BLOCK_FINALIZE_REQ ) == 0 ) {
ptr< NodeInfo > nmi = sChain->getNode()->getNodeInfoById( nodeID );

if ( nmi == nullptr ) {
_responseHeader->setStatusSubStatus(
CONNECTION_ERROR, CONNECTION_ERROR_DONT_KNOW_THIS_NODE );
CONNECTION_ERROR, CONNECTION_ERROR_DONT_KNOW_THIS_NODE );
BOOST_THROW_EXCEPTION( InvalidNodeIDException(
"Could not find node info for NODE_ID:" + to_string( ( uint64_t ) nodeID ),
__CLASS_NAME__ ) );
"Could not find node info for NODE_ID:" + to_string( ( uint64_t ) nodeID ),
__CLASS_NAME__ ) );
}


serializedBinary = createBlockFinalizeResponse( _jsonRequest,
dynamic_pointer_cast< BlockFinalizeResponseHeader >( _responseHeader ), blockID );
dynamic_pointer_cast< BlockFinalizeResponseHeader >( _responseHeader ), blockID );
}


Expand All @@ -227,8 +227,8 @@ ptr< vector< uint8_t > > CatchupServerAgent::createResponseHeaderAndBinary(


ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse(
nlohmann::json /*_jsonRequest */, const ptr< CatchupResponseHeader >& _responseHeader,
block_id _blockID ) {
nlohmann::json /*_jsonRequest */, const ptr< CatchupResponseHeader >& _responseHeader,
block_id _blockID ) {
CHECK_ARGUMENT( _responseHeader );

MONITOR( __CLASS_NAME__, __FUNCTION__ );
Expand All @@ -246,34 +246,36 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse(

auto blockSizes = make_shared< list< uint64_t > >();

auto committedBlockID = sChain->getLastCommittedBlockID();
auto lastCommittedBlockID = sChain->getLastCommittedBlockID();
auto lastCommittedBlockTimestampS = sChain->getLastCommittedBlockTimeStamp().getS();

if ( _blockID >= committedBlockID ) {
LOG( debug, "Catchups: blockID >= committedBlockID" );
if ( _blockID >= lastCommittedBlockID ) {
LOG( debug, "Catchups: blockID >= lastCommittedBlockID" );
_responseHeader->setStatusSubStatus( CONNECTION_DISCONNECT, CONNECTION_OK );
_responseHeader->setComplete();
return nullptr;
}


auto serializedBlocks =
getSchain()->getNode()->getBlockDB()->getSerializedBlocksFromLevelDB(
( uint64_t ) _blockID + 1, committedBlockID, blockSizes );
getSchain()->getNode()->getBlockDB()->getSerializedBlocksFromLevelDB(
( uint64_t ) _blockID + 1, lastCommittedBlockID, blockSizes );

CHECK_STATE( blockSizes->size() > 0 );


if ( serializedBlocks == nullptr ) {
_responseHeader->setStatusSubStatus(
CONNECTION_DISCONNECT, CONNECTION_CATCHUP_DONT_HAVE_THIS_BLOCK );
CONNECTION_DISCONNECT, CONNECTION_CATCHUP_DONT_HAVE_THIS_BLOCK );
_responseHeader->setComplete();
return nullptr;
}


_responseHeader->setStatusSubStatus( CONNECTION_PROCEED, CONNECTION_OK );

_responseHeader->setBlockSizes( blockSizes );
_responseHeader->setBlockSizesAndLatestBlockInfo(
blockSizes, lastCommittedBlockID, lastCommittedBlockTimestampS );

auto responseTimeMs = Time::getCurrentTimeMs() - responseStartTimeMs;

Expand All @@ -289,8 +291,8 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockCatchupResponse(


ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
nlohmann::json _jsonRequest, const ptr< BlockFinalizeResponseHeader >& _responseHeader,
block_id _blockID ) {
nlohmann::json _jsonRequest, const ptr< BlockFinalizeResponseHeader >& _responseHeader,
block_id _blockID ) {
CHECK_ARGUMENT( _responseHeader );

MONITOR( __CLASS_NAME__, __FUNCTION__ );
Expand All @@ -301,7 +303,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
if ( fragmentIndex < 1 || ( uint64_t ) fragmentIndex > getSchain()->getNodeCount() - 1 ) {
LOG( debug, "Incorrect fragment index:" << to_string( fragmentIndex ) );
_responseHeader->setStatusSubStatus(
CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_FRAGMENT_INDEX );
CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_FRAGMENT_INDEX );
_responseHeader->setComplete();
return nullptr;
}
Expand All @@ -313,7 +315,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
if ( proposerIndex < 1 || ( uint64_t ) fragmentIndex > getSchain()->getNodeCount() ) {
LOG( debug, "Incorrect proposer index:" << to_string( proposerIndex ) );
_responseHeader->setStatusSubStatus(
CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_PROPOSER_INDEX );
CONNECTION_DISCONNECT, CONNECTION_ERROR_INVALID_PROPOSER_INDEX );
_responseHeader->setComplete();
return nullptr;
}
Expand All @@ -322,7 +324,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
// We could have either a proposal or a committed block. Try proposal first.

auto proposal = getSchain()->getNode()->getBlockProposalDB()->getBlockProposal(
_blockID, proposerIndex );
_blockID, proposerIndex );
string daSig;


Expand All @@ -340,28 +342,28 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
// since at this time we already not which index has been committed
if ( committedBlock->getProposerIndex() != ( uint64_t ) proposerIndex ) {
_responseHeader->setStatusSubStatus( CONNECTION_DISCONNECT,
CONNECTION_FINALIZER_CLIENT_ASKING_FOR_INCORRECT_PROPOSER_INDEX );
CONNECTION_FINALIZER_CLIENT_ASKING_FOR_INCORRECT_PROPOSER_INDEX );
_responseHeader->setComplete();
return nullptr;
}
proposal = committedBlock;
daSig = committedBlock->getDaSig();
} else {
_responseHeader->setStatusSubStatus(
CONNECTION_DISCONNECT, CONNECTION_FINALIZE_DONT_HAVE_PROPOSAL );
CONNECTION_DISCONNECT, CONNECTION_FINALIZE_DONT_HAVE_PROPOSAL );
_responseHeader->setComplete();
return nullptr;
}
} else {
daSig = getNode()->getDaProofDB()->getDASig(
proposal->getBlockID(), proposal->getProposerIndex() );
proposal->getBlockID(), proposal->getProposerIndex() );
}

CHECK_STATE( !daSig.empty() );


auto fragment =
proposal->getFragment( ( uint64_t ) getSchain()->getNodeCount() - 1, fragmentIndex );
proposal->getFragment( ( uint64_t ) getSchain()->getNodeCount() - 1, fragmentIndex );


CHECK_STATE( fragment );
Expand All @@ -373,7 +375,7 @@ ptr< vector< uint8_t > > CatchupServerAgent::createBlockFinalizeResponse(
CHECK_STATE( serializedFragment );

_responseHeader->setFragmentParams( serializedFragment->size(),
proposal->serializeProposal()->size(), proposal->getHash().toHex(), daSig );
proposal->serializeProposal()->size(), proposal->getHash().toHex(), daSig );

return serializedFragment;
} catch ( ExitRequestedException& e ) {
Expand Down
33 changes: 15 additions & 18 deletions headers/CatchupResponseHeader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,28 @@

using namespace std;

CatchupResponseHeader::CatchupResponseHeader() : Header( Header::BLOCK_CATCHUP_RSP ) {}
CatchupResponseHeader::CatchupResponseHeader() : Header(Header::BLOCK_CATCHUP_RSP) {}

void CatchupResponseHeader::setBlockSizes( const ptr< list< uint64_t > >& _blockSizes ) {
CHECK_ARGUMENT( _blockSizes );

blockCount = _blockSizes->size();
void CatchupResponseHeader::setBlockSizesAndLatestBlockInfo(
const ptr<list<uint64_t> > &_blockSizes, block_id _lastCommittedBlockId,
uint64_t _lastCommittedBlockTimestampS) {
CHECK_ARGUMENT(_blockSizes);
CHECK_STATE(!complete)

blockSizes = _blockSizes;

lastCommittedBlockId = (uint64_t) _lastCommittedBlockId;
lastCommittedBlockTimestampS = _lastCommittedBlockTimestampS;
complete = true;
}

void CatchupResponseHeader::addFields( nlohmann::json& _j ) {
Header::addFields( _j );
void CatchupResponseHeader::addFields(nlohmann::json &_j) {
Header::addFields(_j);

_j["count"] = blockCount;

if ( blockSizes != nullptr )
_j["sizes"] = *blockSizes;
}
_j["count"] = blockSizes ? blockSizes->size() : 0;
_j["lastBid"] = lastCommittedBlockId;
_j["lastTs"] = lastCommittedBlockTimestampS;

uint64_t CatchupResponseHeader::getBlockCount() const {
return blockCount;
}

void CatchupResponseHeader::setBlockCount( uint64_t _blockCount ) {
CatchupResponseHeader::blockCount = _blockCount;
if (blockSizes != nullptr)
_j["sizes"] = *blockSizes;
}
21 changes: 11 additions & 10 deletions headers/CatchupResponseHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ class Schain;
class Transaction;
class CatchupResponseHeader : public Header {
public:
[[nodiscard]] uint64_t getBlockCount() const;

void setBlockCount( uint64_t blockCount );

private:
uint64_t blockCount = 0;

ptr< list< uint64_t > > blockSizes = nullptr;

public:
CatchupResponseHeader();

explicit CatchupResponseHeader( const ptr< list< uint64_t > > _blockSizes );

void setBlockSizes( const ptr< list< uint64_t > >& _blockSizes );
void setBlockSizesAndLatestBlockInfo( const ptr< list< uint64_t > >& _blockSizes,
block_id _lastCommittedBlockId, uint64_t _lastCommittedBlockTimestampS );

void addFields( nlohmann::basic_json<>& j_ ) override;

private:

ptr< list< uint64_t > > blockSizes = nullptr;

// latest block known to this consensus instance and its
uint64_t lastCommittedBlockId = 0;
uint64_t lastCommittedBlockTimestampS = 0;

};

0 comments on commit 91f69db

Please sign in to comment.