diff --git a/packages/network/CMakeLists.txt b/packages/network/CMakeLists.txt index 5a70cb4..eb3fdf5 100644 --- a/packages/network/CMakeLists.txt +++ b/packages/network/CMakeLists.txt @@ -10,4 +10,4 @@ set(deps concurrency ) -bs_generate_package(network "tier2" "${deps}" "libcurl;mongoose") +bs_generate_package(network "tier2" "${deps}" "libcurl;mongoose;jsonxx") diff --git a/packages/network/include/network/NetworkConnection.h b/packages/network/include/network/NetworkConnection.h index b7c7d41..cc4fffd 100644 --- a/packages/network/include/network/NetworkConnection.h +++ b/packages/network/include/network/NetworkConnection.h @@ -78,7 +78,7 @@ namespace l::network { bool IsWebSocket(); bool HasExpired(); - bool WSWrite(char* buffer, size_t size); + int32_t WSWrite(char* buffer, size_t size); int32_t WSRead(char* buffer, size_t size); void WSClose(); @@ -89,6 +89,8 @@ namespace l::network { void NotifyAppendHeader(const char* contents, size_t size); void NotifyAppendResponse(const char* contents, size_t size); + std::string_view GetHeader(const std::string& key); + uint32_t GetResponseSize(); virtual void SetResponseSize(int32_t expectedResponseSize) = 0; diff --git a/packages/network/include/network/NetworkManager.h b/packages/network/include/network/NetworkManager.h index b1ad43c..f1bb919 100644 --- a/packages/network/include/network/NetworkManager.h +++ b/packages/network/include/network/NetworkManager.h @@ -48,7 +48,7 @@ namespace l::network { std::function cb = nullptr); void WSClose(std::string_view queryName = ""); - bool WSWrite(std::string_view queryName, char* buffer, size_t size); + int32_t WSWrite(std::string_view queryName, char* buffer, size_t size); int32_t WSRead(std::string_view queryName, char* buffer, size_t size); protected: diff --git a/packages/network/source/common/NetworkConnection.cpp b/packages/network/source/common/NetworkConnection.cpp index 95f4af2..4a624db 100644 --- a/packages/network/source/common/NetworkConnection.cpp +++ b/packages/network/source/common/NetworkConnection.cpp @@ -220,27 +220,41 @@ namespace l::network { return m; } - bool ConnectionBase::WSWrite(char* buffer, size_t size) { - if (HasExpired() || mCurl == nullptr) { - return false; + int32_t ConnectionBase::WSWrite(char* buffer, size_t size) { + if (HasExpired()) { + LOG(LogError) << "Failed wss write, connection expired"; + return -2; + } + if (mCurl == nullptr) { + LOG(LogError) << "Failed wss write, no curl instance"; + return -3; } size_t sentBytes = 0; auto res = curl_ws_send(mCurl, buffer, size, &sentBytes, 0, CURLWS_TEXT); ASSERT(sentBytes == size); - return res == CURLE_OK; + if (res != CURLE_OK) { + LOG(LogError) << "Failed wss write, error: " << res; + } + return res == CURLE_OK ? static_cast(sentBytes) : -1; } int32_t ConnectionBase::WSRead(char* buffer, size_t size) { if (HasExpired()) { + LOG(LogError) << "Failed wss read, connection expired"; return -2; } if (mCurl == nullptr) { + LOG(LogError) << "Failed wss read, no curl instance"; return -3; } const struct curl_ws_frame* meta; size_t readBytes = 0; auto res = curl_ws_recv(mCurl, buffer, size, &readBytes, &meta); - return res == CURLE_OK ? static_cast(readBytes) : -1; + if (res == CURLE_AGAIN || res == CURLE_OK){ + return static_cast(readBytes); + } + //LOG(LogError) << "Failed wss read, error: " << res; + return -1; } void ConnectionBase::WSClose() { @@ -291,5 +305,9 @@ namespace l::network { return mResponseSize; } + std::string_view ConnectionBase::GetHeader(const std::string& key) { + return mHeaderMap.at(key); + } + } diff --git a/packages/network/source/common/NetworkInterfaceWS.cpp b/packages/network/source/common/NetworkInterfaceWS.cpp index 33fcc7b..2493642 100644 --- a/packages/network/source/common/NetworkInterfaceWS.cpp +++ b/packages/network/source/common/NetworkInterfaceWS.cpp @@ -79,7 +79,7 @@ namespace l::network { if (NetworkStatus(interfaceName)) { auto networkManager = mNetworkManager.lock(); if (networkManager) { - result = networkManager->WSWrite(queryName, buffer, size); + result = networkManager->WSWrite(queryName, buffer, size) == 0; } } } diff --git a/packages/network/source/common/NetworkManager.cpp b/packages/network/source/common/NetworkManager.cpp index f260409..3428dea 100644 --- a/packages/network/source/common/NetworkManager.cpp +++ b/packages/network/source/common/NetworkManager.cpp @@ -40,7 +40,9 @@ namespace l::network { for (auto& it : mConnections) { if (it->IsHandle(e)) { foundHandle = true; - it->NotifyCompleteRequest(success); + if (!it->IsWebSocket()) { + it->NotifyCompleteRequest(success); + } } } ASSERT(foundHandle); @@ -206,43 +208,37 @@ namespace l::network { } } - bool NetworkManager::WSWrite(std::string_view queryName, char* buffer, size_t size) { + int32_t NetworkManager::WSWrite(std::string_view queryName, char* buffer, size_t size) { std::unique_lock lock(mConnectionsMutex); auto it = std::find_if(mConnections.begin(), mConnections.end(), [&](std::unique_ptr& request) { - if (queryName != request->GetRequestName()) { - return false; - } - if (!request->HasExpired()) { + if (queryName == request->GetRequestName()) { return true; } return false; }); if (it == mConnections.end()) { - return false; + LOG(LogError) << "Failed wss write, query not found"; + return -4; } auto request = it->get(); lock.unlock(); - request->WSWrite(buffer, size); - - return true; + return request->WSWrite(buffer, size); } int32_t NetworkManager::WSRead(std::string_view queryName, char* buffer, size_t size) { std::unique_lock lock(mConnectionsMutex); auto it = std::find_if(mConnections.begin(), mConnections.end(), [&](std::unique_ptr& request) { - if (queryName != request->GetRequestName()) { - return false; - } - if (!request->HasExpired()) { + if (queryName == request->GetRequestName()) { return true; } return false; }); if (it == mConnections.end()) { - return 0; + LOG(LogError) << "Failed wss read, query not found"; + return -4; } auto request = it->get(); lock.unlock(); diff --git a/packages/network/tests/common/NetworkInterfaceTest.cpp b/packages/network/tests/common/NetworkInterfaceTest.cpp index 422d0e5..d3d65d2 100644 --- a/packages/network/tests/common/NetworkInterfaceTest.cpp +++ b/packages/network/tests/common/NetworkInterfaceTest.cpp @@ -12,6 +12,8 @@ using namespace l; TEST(NetworkInterface, Setup) { + return 0; + std::stringstream configData; if (!l::filesystem::read("tests/telegrambottoken.txt", configData)) { return 0; diff --git a/packages/network/tests/common/NetworkWebSocketTest.cpp b/packages/network/tests/common/NetworkWebSocketTest.cpp index 4897d2d..ca5182f 100644 --- a/packages/network/tests/common/NetworkWebSocketTest.cpp +++ b/packages/network/tests/common/NetworkWebSocketTest.cpp @@ -4,6 +4,7 @@ #include "network/NetworkInterfaceWS.h" #include "filesystem/File.h" +#include "jsonxx/jsonxx.h" #include @@ -22,9 +23,6 @@ TEST(NetworkWebSocket, Setup) { TEST_TRUE_NO_RET(success, ""); TEST_TRUE_NO_RET(ws.IsWebSocket(), ""); failed = !success; - LOG(LogInfo) << "Query arguments: '" << queryArguments << "'"; - LOG(LogInfo) << ws.GetResponse().str(); - return success ? l::concurrency::RunnableResult::SUCCESS : l::concurrency::RunnableResult::FAILURE; }; @@ -36,8 +34,8 @@ TEST(NetworkWebSocket, Setup) { int32_t read = 0; int32_t readCount = 20; do { - read = networkInterfaceWS->Read("Websocket", "wsstest", &buffer[0], 1024); std::this_thread::sleep_for(std::chrono::milliseconds(50)); + read = networkInterfaceWS->Read("Websocket", "wsstest", &buffer[0], 1024); } while (read <= 0 && readCount-- >= 0); TEST_TRUE(read > 0, ""); @@ -54,3 +52,83 @@ TEST(NetworkWebSocket, Setup) { return 0; } + +TEST(NetworkWebSocket, Binance) { + + return 0; + + auto networkManager = l::network::CreateNetworkManager(1, false); + auto networkInterfaceWS = l::network::CreateNetworkInterfaceWS(networkManager); + + bool failed = false; + auto websocketHandler = [&]( + bool success, + std::string_view queryArguments, + l::network::WebSocket& ws) { + failed = !success; + LOG(LogInfo) << "Success: " << success; + + return success ? l::concurrency::RunnableResult::SUCCESS : l::concurrency::RunnableResult::FAILURE; + }; + + networkInterfaceWS->CreateInterface("Binance", "wss", "testnet.binance.vision"); + //networkInterfaceWS->CreateInterface("Binance", "wss", "stream.binance.com", 443); + networkInterfaceWS->CreateWebSocketTemplate("Binance", "binance", "ws", websocketHandler); + networkInterfaceWS->Connect("Binance", "binance"); + + char buffer[1024]; + int32_t write = 0; + + std::string subscribeStream; + subscribeStream += "{\n"; + subscribeStream += "\"method\": \"SUBSCRIBE\",\n"; + subscribeStream += "\"params\" : ["; + //subscribeStream += "\"btcusdt@kline_1m\","; + //subscribeStream += "\"dogeusdt@kline_1m\","; + //subscribeStream += "\"solusdt@kline_1m\""; + subscribeStream += "\"solusdt@kline_1m\""; + subscribeStream += "],\n"; + subscribeStream += "\"id\" : 1\n"; + subscribeStream += "}\n"; + + std::string listSubscriptions; + listSubscriptions += "{\n"; + listSubscriptions += "\"method\": \"LIST_SUBSCRIPTIONS\",\n"; + listSubscriptions += "\"id\" : 2\n"; + listSubscriptions += "}\n"; + + int32_t read = 0; + int32_t readCount = 3; + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + read = networkInterfaceWS->Read("Binance", "binance", &buffer[0], 1024); + if (read > 0) { + LOG(LogInfo) << std::string_view(buffer, read); + } + } while (readCount-- >= 0); + + networkInterfaceWS->Write("Binance", "binance", subscribeStream.data(), subscribeStream.size()); + + + readCount = 60; + do { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + read = networkInterfaceWS->Read("Binance", "binance", &buffer[0], 1024); + if (read > 0) { + LOG(LogInfo) << std::string_view(buffer, read); + } + } while (readCount-- >= 0); + + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + networkInterfaceWS->Disconnect("binance"); + + networkInterfaceWS->Shutdown(); + networkManager->ClearJobs(); + networkManager->Shutdown(); + + TEST_FALSE(failed, ""); + + return 0; +}