Skip to content

Commit

Permalink
Add local test case for web socket read/write. Add header getter for …
Browse files Browse the repository at this point in the history
…network connetions so we can check for back off etc.
  • Loading branch information
linuscu committed Oct 27, 2024
1 parent 4023f74 commit 3a3b150
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ set(deps
concurrency
)

bs_generate_package(network "tier2" "${deps}" "libcurl;mongoose")
bs_generate_package(network "tier2" "${deps}" "libcurl;mongoose;jsonxx")
4 changes: 3 additions & 1 deletion packages/network/include/network/NetworkConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace l::network {
bool IsWebSocket();
bool HasExpired();

bool WSWrite(char* buffer, size_t size);
int32_t WSWrite(char* buffer, size_t size);
int32_t WSRead(char* buffer, size_t size);
void WSClose();

Expand All @@ -89,6 +89,8 @@ namespace l::network {
void NotifyAppendHeader(const char* contents, size_t size);
void NotifyAppendResponse(const char* contents, size_t size);

std::string_view GetHeader(const std::string& key);

uint32_t GetResponseSize();

virtual void SetResponseSize(int32_t expectedResponseSize) = 0;
Expand Down
2 changes: 1 addition & 1 deletion packages/network/include/network/NetworkManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace l::network {
std::function<void(bool, std::string_view)> cb = nullptr);

void WSClose(std::string_view queryName = "");
bool WSWrite(std::string_view queryName, char* buffer, size_t size);
int32_t WSWrite(std::string_view queryName, char* buffer, size_t size);
int32_t WSRead(std::string_view queryName, char* buffer, size_t size);

protected:
Expand Down
28 changes: 23 additions & 5 deletions packages/network/source/common/NetworkConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,27 +220,41 @@ namespace l::network {
return m;
}

bool ConnectionBase::WSWrite(char* buffer, size_t size) {
if (HasExpired() || mCurl == nullptr) {
return false;
int32_t ConnectionBase::WSWrite(char* buffer, size_t size) {
if (HasExpired()) {
LOG(LogError) << "Failed wss write, connection expired";
return -2;
}
if (mCurl == nullptr) {
LOG(LogError) << "Failed wss write, no curl instance";
return -3;
}
size_t sentBytes = 0;
auto res = curl_ws_send(mCurl, buffer, size, &sentBytes, 0, CURLWS_TEXT);
ASSERT(sentBytes == size);
return res == CURLE_OK;
if (res != CURLE_OK) {
LOG(LogError) << "Failed wss write, error: " << res;
}
return res == CURLE_OK ? static_cast<int32_t>(sentBytes) : -1;
}

int32_t ConnectionBase::WSRead(char* buffer, size_t size) {
if (HasExpired()) {
LOG(LogError) << "Failed wss read, connection expired";
return -2;
}
if (mCurl == nullptr) {
LOG(LogError) << "Failed wss read, no curl instance";
return -3;
}
const struct curl_ws_frame* meta;
size_t readBytes = 0;
auto res = curl_ws_recv(mCurl, buffer, size, &readBytes, &meta);
return res == CURLE_OK ? static_cast<int32_t>(readBytes) : -1;
if (res == CURLE_AGAIN || res == CURLE_OK){
return static_cast<int32_t>(readBytes);
}
//LOG(LogError) << "Failed wss read, error: " << res;
return -1;
}

void ConnectionBase::WSClose() {
Expand Down Expand Up @@ -291,5 +305,9 @@ namespace l::network {
return mResponseSize;
}

std::string_view ConnectionBase::GetHeader(const std::string& key) {
return mHeaderMap.at(key);
}


}
2 changes: 1 addition & 1 deletion packages/network/source/common/NetworkInterfaceWS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ namespace l::network {
if (NetworkStatus(interfaceName)) {
auto networkManager = mNetworkManager.lock();
if (networkManager) {
result = networkManager->WSWrite(queryName, buffer, size);
result = networkManager->WSWrite(queryName, buffer, size) == 0;
}
}
}
Expand Down
26 changes: 11 additions & 15 deletions packages/network/source/common/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ namespace l::network {
for (auto& it : mConnections) {
if (it->IsHandle(e)) {
foundHandle = true;
it->NotifyCompleteRequest(success);
if (!it->IsWebSocket()) {
it->NotifyCompleteRequest(success);
}
}
}
ASSERT(foundHandle);
Expand Down Expand Up @@ -206,43 +208,37 @@ namespace l::network {
}
}

bool NetworkManager::WSWrite(std::string_view queryName, char* buffer, size_t size) {
int32_t NetworkManager::WSWrite(std::string_view queryName, char* buffer, size_t size) {
std::unique_lock lock(mConnectionsMutex);
auto it = std::find_if(mConnections.begin(), mConnections.end(), [&](std::unique_ptr<ConnectionBase>& request) {
if (queryName != request->GetRequestName()) {
return false;
}
if (!request->HasExpired()) {
if (queryName == request->GetRequestName()) {
return true;
}
return false;
});

if (it == mConnections.end()) {
return false;
LOG(LogError) << "Failed wss write, query not found";
return -4;
}
auto request = it->get();
lock.unlock();

request->WSWrite(buffer, size);

return true;
return request->WSWrite(buffer, size);
}

int32_t NetworkManager::WSRead(std::string_view queryName, char* buffer, size_t size) {
std::unique_lock lock(mConnectionsMutex);
auto it = std::find_if(mConnections.begin(), mConnections.end(), [&](std::unique_ptr<ConnectionBase>& request) {
if (queryName != request->GetRequestName()) {
return false;
}
if (!request->HasExpired()) {
if (queryName == request->GetRequestName()) {
return true;
}
return false;
});

if (it == mConnections.end()) {
return 0;
LOG(LogError) << "Failed wss read, query not found";
return -4;
}
auto request = it->get();
lock.unlock();
Expand Down
2 changes: 2 additions & 0 deletions packages/network/tests/common/NetworkInterfaceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ using namespace l;

TEST(NetworkInterface, Setup) {

return 0;

std::stringstream configData;
if (!l::filesystem::read("tests/telegrambottoken.txt", configData)) {
return 0;
Expand Down
86 changes: 82 additions & 4 deletions packages/network/tests/common/NetworkWebSocketTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "network/NetworkInterfaceWS.h"
#include "filesystem/File.h"
#include "jsonxx/jsonxx.h"

#include <array>

Expand All @@ -22,9 +23,6 @@ TEST(NetworkWebSocket, Setup) {
TEST_TRUE_NO_RET(success, "");
TEST_TRUE_NO_RET(ws.IsWebSocket(), "");
failed = !success;
LOG(LogInfo) << "Query arguments: '" << queryArguments << "'";
LOG(LogInfo) << ws.GetResponse().str();

return success ? l::concurrency::RunnableResult::SUCCESS : l::concurrency::RunnableResult::FAILURE;
};

Expand All @@ -36,8 +34,8 @@ TEST(NetworkWebSocket, Setup) {
int32_t read = 0;
int32_t readCount = 20;
do {
read = networkInterfaceWS->Read("Websocket", "wsstest", &buffer[0], 1024);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
read = networkInterfaceWS->Read("Websocket", "wsstest", &buffer[0], 1024);
} while (read <= 0 && readCount-- >= 0);

TEST_TRUE(read > 0, "");
Expand All @@ -54,3 +52,83 @@ TEST(NetworkWebSocket, Setup) {

return 0;
}

TEST(NetworkWebSocket, Binance) {

return 0;

auto networkManager = l::network::CreateNetworkManager(1, false);
auto networkInterfaceWS = l::network::CreateNetworkInterfaceWS(networkManager);

bool failed = false;
auto websocketHandler = [&](
bool success,
std::string_view queryArguments,
l::network::WebSocket& ws) {
failed = !success;
LOG(LogInfo) << "Success: " << success;

return success ? l::concurrency::RunnableResult::SUCCESS : l::concurrency::RunnableResult::FAILURE;
};

networkInterfaceWS->CreateInterface("Binance", "wss", "testnet.binance.vision");
//networkInterfaceWS->CreateInterface("Binance", "wss", "stream.binance.com", 443);
networkInterfaceWS->CreateWebSocketTemplate<std::stringstream>("Binance", "binance", "ws", websocketHandler);
networkInterfaceWS->Connect("Binance", "binance");

char buffer[1024];
int32_t write = 0;

std::string subscribeStream;
subscribeStream += "{\n";
subscribeStream += "\"method\": \"SUBSCRIBE\",\n";
subscribeStream += "\"params\" : [";
//subscribeStream += "\"btcusdt@kline_1m\",";
//subscribeStream += "\"dogeusdt@kline_1m\",";
//subscribeStream += "\"solusdt@kline_1m\"";
subscribeStream += "\"solusdt@kline_1m\"";
subscribeStream += "],\n";
subscribeStream += "\"id\" : 1\n";
subscribeStream += "}\n";

std::string listSubscriptions;
listSubscriptions += "{\n";
listSubscriptions += "\"method\": \"LIST_SUBSCRIPTIONS\",\n";
listSubscriptions += "\"id\" : 2\n";
listSubscriptions += "}\n";

int32_t read = 0;
int32_t readCount = 3;
do {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
read = networkInterfaceWS->Read("Binance", "binance", &buffer[0], 1024);
if (read > 0) {
LOG(LogInfo) << std::string_view(buffer, read);
}
} while (readCount-- >= 0);

networkInterfaceWS->Write("Binance", "binance", subscribeStream.data(), subscribeStream.size());


readCount = 60;
do {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
read = networkInterfaceWS->Read("Binance", "binance", &buffer[0], 1024);
if (read > 0) {
LOG(LogInfo) << std::string_view(buffer, read);
}
} while (readCount-- >= 0);


std::this_thread::sleep_for(std::chrono::milliseconds(1000));

networkInterfaceWS->Disconnect("binance");

networkInterfaceWS->Shutdown();
networkManager->ClearJobs();
networkManager->Shutdown();

TEST_FALSE(failed, "");

return 0;
}

0 comments on commit 3a3b150

Please sign in to comment.