Skip to content
This repository was archived by the owner on Aug 27, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7dc02dd
chore: update response stats for debugging
chetan-zilliqa Oct 17, 2023
24721aa
Add logs when txn forwarding happens
chetan-zilliqa Oct 18, 2023
f2f181a
Fix for mismatch in txn pool data
chetan-zilliqa Oct 18, 2023
2c2d0bd
Some helpful addons
bzawisto Oct 19, 2023
90a05ae
Fixing cosig retrieval
bzawisto Oct 23, 2023
dbecdc4
Fixes
bzawisto Oct 23, 2023
d9460cc
Remove unnecessary logging
bzawisto Oct 23, 2023
c43cf75
Adding more logging
bzawisto Oct 24, 2023
fd05731
More logs
bzawisto Oct 24, 2023
25421ad
Followup investigation
bzawisto Oct 25, 2023
322f5f3
Another one
bzawisto Oct 25, 2023
1fbec0b
Hard revert
bzawisto Oct 26, 2023
964ad6c
Close socket
bzawisto Oct 26, 2023
abc33f5
Merge remote-tracking branch 'origin/debug/zil-5422'
Steve-White-UK Oct 26, 2023
23d832d
Merge remote-tracking branch 'origin/master'
Steve-White-UK Oct 27, 2023
bcca743
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
c33e048
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
c2f0424
Merge branch 'master' into ZIL-5450-Improvements-to-network-and-upgra…
Steve-White-UK Oct 27, 2023
bf2401f
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
230317d
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
60dac20
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
438dcbe
Save the changes that have been tested and verified
Steve-White-UK Oct 27, 2023
0cd3bc6
updates and experiment
Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
523 changes: 0 additions & 523 deletions constants.xml.native.tests

This file was deleted.

68 changes: 58 additions & 10 deletions src/libNetwork/P2PServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read.hpp>
#include <boost/container/small_vector.hpp>

#include "Blacklist.h"
#include "libUtils/Logger.h"
Expand Down Expand Up @@ -202,9 +203,10 @@ std::shared_ptr<P2PServer> P2PServer::CreateAndStart(AsioContext& asio,
throw std::runtime_error(error_msg);
}

P2PServerConnection::P2PServerConnection(
std::weak_ptr<P2PServerImpl> owner, uint64_t this_id, Peer&& remote_peer,
TcpSocket socket, size_t max_message_size)
P2PServerConnection::P2PServerConnection(std::weak_ptr<P2PServerImpl> owner,
uint64_t this_id, Peer&& remote_peer,
TcpSocket socket,
size_t max_message_size)
: m_owner(std::move(owner)),
m_id(this_id),
m_remotePeer(std::move(remote_peer)),
Expand All @@ -229,6 +231,9 @@ void P2PServerConnection::ReadNextMessage() {
if (!ec) {
assert(n == HDR_LEN);
}
if (ec) {
// LOG_GENERAL(WARNING, "Got error code: " << ec.message());
}
if (ec != OPERATION_ABORTED) {
self->OnHeaderRead(ec);
}
Expand All @@ -237,10 +242,9 @@ void P2PServerConnection::ReadNextMessage() {

void P2PServerConnection::OnHeaderRead(const ErrorCode& ec) {
if (ec) {
if (ec != END_OF_FILE) {
LOG_GENERAL(INFO,
"Peer " << m_remotePeer << " read error: " << ec.message());
}
LOG_GENERAL(INFO,
"Peer " << m_remotePeer << " read error: " << ec.message());
CloseSocket();
OnConnectionClosed();
return;
}
Expand All @@ -254,7 +258,9 @@ void P2PServerConnection::OnHeaderRead(const ErrorCode& ec) {
<< " Adding sending node "
<< m_remotePeer.GetPrintableIPAddress()
<< " as strictly blacklisted");
Blacklist::GetInstance().Add({m_remotePeer.GetIpAddress(),m_remotePeer.GetListenPortHost(),m_remotePeer.GetNodeIndentifier()});
Blacklist::GetInstance().Add({m_remotePeer.GetIpAddress(),
m_remotePeer.GetListenPortHost(),
m_remotePeer.GetNodeIndentifier()});

CloseSocket();
OnConnectionClosed();
Expand All @@ -269,6 +275,9 @@ void P2PServerConnection::OnHeaderRead(const ErrorCode& ec) {
if (!ec) {
assert(n == self->m_readBuffer.size() - HDR_LEN);
}
if (ec) {
// LOG_GENERAL(WARNING, "Got error code: " << ec.message());
}
if (ec != OPERATION_ABORTED) {
self->OnBodyRead(ec);
}
Expand All @@ -278,17 +287,19 @@ void P2PServerConnection::OnHeaderRead(const ErrorCode& ec) {
void P2PServerConnection::OnBodyRead(const ErrorCode& ec) {
if (ec) {
LOG_GENERAL(INFO, "Read error: " << ec.message());
CloseSocket();
OnConnectionClosed();
return;
}

ReadMessageResult result;
auto state = TryReadMessage(m_readBuffer.data(), m_readBuffer.size(), result);

if (state != ReadState::SUCCESS) {
LOG_GENERAL(WARNING, "Message deserialize error: blacklisting "
<< m_remotePeer.GetPrintableIPAddress());
Blacklist::GetInstance().Add({m_remotePeer.GetIpAddress(),m_remotePeer.GetListenPortHost(),m_remotePeer.GetNodeIndentifier()});
Blacklist::GetInstance().Add({m_remotePeer.GetIpAddress(),
m_remotePeer.GetListenPortHost(),
m_remotePeer.GetNodeIndentifier()});

CloseSocket();
OnConnectionClosed();
Expand All @@ -298,6 +309,7 @@ void P2PServerConnection::OnBodyRead(const ErrorCode& ec) {
auto owner = m_owner.lock();
if (!owner || !owner->OnMessage(m_id, m_remotePeer, result)) {
CloseSocket();
OnConnectionClosed();
return;
}

Expand All @@ -311,8 +323,44 @@ void P2PServerConnection::Close() {

void P2PServerConnection::CloseSocket() {
ErrorCode ec;
// both close and shutdown should be none blocking calls certainly on current linux
// shutdown marks the socket as blocked for both read and write
// shutdown tells the OS to begin the graceful closedown of the TCP connection.
// close() is a blocking call that waits for the OS to complete the closedown.
// close also frees the OS resources from the program so should be called even if
// an error condition is encountered
m_socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
if (ec) {
m_socket.close(ec);
if (ec) {
LOG_GENERAL(INFO, "Informational, not an issue - Error closing socket: " << ec.message());
}
return;
}
size_t unread = m_socket.available(ec);
if (ec) {
m_socket.close(ec);
return;
}
// On Linux, the close() function for sockets does not necessarily wait
// for the operating system to complete the operation before returning.
// It typically marks the socket as closed and releases any resources
// associated with it immediately. However, this does not guarantee that all
// pending data has been sent or received. It is important to handle any
// necessary error checking and ensure all data transmission is complete
// before calling close() on a socket.
if (unread > 0) {
do {
boost::container::small_vector<uint8_t, 4096> buf;
buf.resize(unread);
m_socket.read_some(boost::asio::mutable_buffer(buf.data(), unread), ec);
LOG_GENERAL(INFO, "Draining remaining IO before close" << m_remotePeer.GetPrintableIPAddress());
} while (!ec && (unread = m_socket.available(ec)) > 0);
}
m_socket.close(ec);
if (ec) {
LOG_GENERAL(INFO, "Informational, not an issue - Error closing socket: " << ec.message());
}
}

void P2PServerConnection::OnConnectionClosed() {
Expand Down
22 changes: 19 additions & 3 deletions src/libNetwork/SendJobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ class GracefulCloseImpl
LOG_GENERAL(DEBUG,
"Expected EOF, got ec=" << ec.message() << " n=" << n);
}
ErrorCode ignored;
self->m_socket.close(ignored);
});
}
};
Expand All @@ -226,10 +228,12 @@ void CloseGracefully(Socket&& socket) {
}
socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
if (ec) {
socket.close(ec);
return;
}
size_t unread = socket.available(ec);
if (ec) {
socket.close(ec);
return;
}
if (unread > 0) {
Expand All @@ -239,6 +243,8 @@ void CloseGracefully(Socket&& socket) {
}
if (!ec) {
std::make_shared<GracefulCloseImpl>(std::move(socket))->Close();
} else {
socket.close(ec);
}
}

Expand Down Expand Up @@ -378,13 +384,21 @@ class PeerSendQueue : public std::enable_shared_from_this<PeerSendQueue> {
boost::asio::async_write(
m_socket, boost::asio::const_buffer(msg.data.get(), msg.size),
[self = shared_from_this()](const ErrorCode& ec, size_t) {
if (ec) {
// LOG_GENERAL(WARNING, "Got error code: " << ec.message());
}
if (ec != OPERATION_ABORTED) {
self->OnWritten(ec);
}
});
}

void OnWritten(const ErrorCode& ec) {
if (!ec && !m_closed) {
// LOG_GENERAL(INFO, "Successfully sent message to: "
// << m_peer.GetPrintableIPAddress()
// << " with size: " << m_queue.front().msg.size);
}
if (m_closed) {
return;
}
Expand Down Expand Up @@ -415,11 +429,12 @@ class PeerSendQueue : public std::enable_shared_from_this<PeerSendQueue> {
return;
}

WaitTimer(m_timer, Milliseconds{RECONNECT_INTERVAL_IN_MS}, [this]() { Reconnect(); });
WaitTimer(m_timer, Milliseconds{RECONNECT_INTERVAL_IN_MS},
[this]() { Reconnect(); });
}

void Reconnect() {
LOG_GENERAL(DEBUG, "Peer " << m_peer << " reconnects");
LOG_GENERAL(INFO, "Peer " << m_peer << " reconnects");
CloseGracefully(std::move(m_socket));
m_socket = Socket(m_asioContext);
Connect();
Expand Down Expand Up @@ -493,7 +508,8 @@ class SendJobsImpl : public SendJobs,
return;
}

LOG_GENERAL(DEBUG, "Enqueueing message, size=" << message.size);
LOG_GENERAL(DEBUG, "Enqueueing message, size=" << message.size
<< " peer = " << peer);

// this fn enqueues the lambda to be executed on WorkerThread with
// sequential guarantees for messages from every calling thread
Expand Down
6 changes: 6 additions & 0 deletions src/libScilla/UnixDomainSocketServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ void UnixDomainSocketServer::WorkerThread() {
boost::system::error_code ec;
m_acceptor->accept(socket, ec);
if (ec && m_started) {
socket.close(ec);
throw std::runtime_error(ec.message());
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -115,6 +117,7 @@ void UnixDomainSocketServer::WorkerThread() {
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -130,6 +133,7 @@ void UnixDomainSocketServer::WorkerThread() {
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -147,13 +151,15 @@ void UnixDomainSocketServer::WorkerThread() {
if (ec) {
LOG_GENERAL(WARNING,
"Write to " << m_path << " failed: " << ec.message());
socket.close(ec);
continue;
}

socket.shutdown(
boost::asio::local::stream_protocol::socket::shutdown_both, ec);
if (ec) {
LOG_GENERAL(WARNING, "Shutdown failed: " << ec.message());
socket.close(ec);
continue;
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/libServer/APIServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class APIServerImpl::Connection
if (!owner) {
// server is closed
m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);
m_stream.socket().close(ec);
OnClosed();
return;
}

Expand All @@ -101,6 +103,8 @@ class APIServerImpl::Connection
LOG_GENERAL(DEBUG, "Read error: " << ec.message());
m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);
}
m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);
m_stream.socket().close(ec);
OnClosed();
return;
}
Expand Down Expand Up @@ -205,6 +209,8 @@ class APIServerImpl::Connection
if (sz == 0 && m_owner.expired()) {
beast::error_code ec;
m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);
m_stream.socket().close(ec);
OnClosed();
return;
}

Expand Down Expand Up @@ -346,7 +352,7 @@ bool APIServerImpl::DoListen() {

#define CHECK_EC() \
if (ec) { \
LOG_GENERAL(FATAL, "Cannot start API server: " << ec.message()); \
LOG_GENERAL(FATAL, "Cannot start API server: " << ec.message() << " port " << m_options.port); \
return false; \
}

Expand Down
10 changes: 8 additions & 2 deletions src/libServer/LocalAPIServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bool LocalAPIServer::StartListening() {

#define CHECK_EC() \
if (ec) { \
LOG_GENERAL(WARNING, "Cannot start API server: " << ec.message()); \
LOG_GENERAL(WARNING, "Cannot start API server: " << ec.message() << " port =" << m_port); \
return false; \
}

Expand Down Expand Up @@ -117,10 +117,12 @@ void LocalAPIServer::WorkerThread() {
boost::system::error_code ec;
m_acceptor->accept(socket, ec);
if (ec && m_started) {
socket.close(ec);
throw std::runtime_error(ec.message());
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -131,10 +133,12 @@ void LocalAPIServer::WorkerThread() {
DEFAULT_DELIMITER_CHAR, ec);
if (ec || n <= 1) {
LOG_GENERAL(WARNING, "Read (" << m_ip << ") failed: " << ec.message());
socket.close(ec);
continue;
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -150,6 +154,7 @@ void LocalAPIServer::WorkerThread() {
}

if (!m_started) {
socket.close(ec);
break;
}

Expand All @@ -166,15 +171,16 @@ void LocalAPIServer::WorkerThread() {
boost::asio::write(socket, boost::asio::buffer(response), ec);
if (ec) {
LOG_GENERAL(WARNING, "Write (" << m_ip << ") failed: " << ec.message());
socket.close(ec);
continue;
}

socket.shutdown(
boost::asio::local::stream_protocol::socket::shutdown_both, ec);
if (ec) {
LOG_GENERAL(WARNING, "Shutdown failed: " << ec.message());
continue;
}
socket.close(ec);
}
} catch (const std::exception& e) {
LOG_GENERAL(WARNING, "Listening to " << m_ip << " failed: " << e.what());
Expand Down
8 changes: 6 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ add_subdirectory (Server)
add_subdirectory (EvmFiltersAPI)
add_subdirectory (EthRpcMethods)
add_subdirectory (Utils)
#add_subdirectory (Zilliqa)
add_subdirectory (Zilliqa)
add_subdirectory (native)
#add_subdirectory (RemoteStorageDB) Works only if you have a local mongo server running

file(COPY ${CMAKE_SOURCE_DIR}/constants_local.xml DESTINATION ${CMAKE_BINARY_DIR})
#FIXME: constants.xml is not used for local_run, thus shouldn't be copied
# presently, it's a workaround to silence the error thrown by tests_zilliqa_local.py
file(COPY ${CMAKE_SOURCE_DIR}/constants.xml DESTINATION ${CMAKE_BINARY_DIR})
file(COPY Node DESTINATION ${CMAKE_CURRENT_BINARY_DIR})
file(COPY ${CMAKE_SOURCE_DIR}/tests/Zilliqa/test_zilliqa_local.py DESTINATION ${CMAKE_SOURCE_DIR}/build/tests/zilliqa)
file(COPY ${CMAKE_SOURCE_DIR}/tests/Zilliqa/test_zilliqa_lookup.py DESTINATION ${CMAKE_SOURCE_DIR}/build/tests/zilliqa)
file(COPY ${CMAKE_SOURCE_DIR}/tests/Zilliqa/test_zilliqa_seedpub.py DESTINATION ${CMAKE_SOURCE_DIR}/build/tests/zilliqa)
file(COPY ${CMAKE_SOURCE_DIR}/tests/Zilliqa/test_zilliqa_late.py DESTINATION ${CMAKE_SOURCE_DIR}/build/tests/zilliqa)
Loading