Skip to content

Commit

Permalink
1. add process TcpSocket callbacks for ListenThread::Create
Browse files Browse the repository at this point in the history
2. add options for AsyncConnect.
  • Loading branch information
IronsDu committed Mar 14, 2019
1 parent db9e683 commit d37fc78
Show file tree
Hide file tree
Showing 16 changed files with 490 additions and 287 deletions.
2 changes: 1 addition & 1 deletion examples/BenchWebsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ int main(int argc, char **argv)
};

service->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(1024*1024));
}

Expand Down
17 changes: 8 additions & 9 deletions examples/BroadCastServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,21 @@ int main(int argc, char** argv)

service = TcpService::Create();
auto mainLoop = std::make_shared<EventLoop>();
auto listenThrean = ListenThread::Create();

listenThrean->startListen(false, "0.0.0.0", port, [mainLoop, listenThrean](TcpSocket::Ptr socket) {
auto listenThread = ListenThread::Create(false, "0.0.0.0", port, [mainLoop](TcpSocket::Ptr socket) {
socket->setNodelay();
socket->setSendSize(32 * 1024);
socket->setRecvSize(32 * 1024);

auto enterCallback = [mainLoop](const TcpConnection::Ptr& session) {
mainLoop->runAsyncFunctor([session]() {
addClientID(session);
});
});

session->setDisConnectCallback([mainLoop](const TcpConnection::Ptr& session) {
mainLoop->runAsyncFunctor([session]() {
removeClientID(session);
});
});
});

session->setDataCallback([mainLoop](const char* buffer, size_t len) {
const char* parseStr = buffer;
Expand All @@ -115,7 +113,7 @@ int main(int argc, char** argv)
auto packet = TcpConnection::makePacket(parseStr, packet_len);
mainLoop->runAsyncFunctor([packet]() {
broadCastPacket(packet);
});
});

totalProcLen += packet_len;
parseStr += packet_len;
Expand All @@ -132,13 +130,14 @@ int main(int argc, char** argv)
}

return totalProcLen;
});
});
};
service->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(1024 * 1024));
});
});

listenThread->startListen();
service->startWorkerThread(2);

auto now = std::chrono::steady_clock::now();
Expand Down
18 changes: 13 additions & 5 deletions examples/PingPongClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,32 @@ int main(int argc, char **argv)
{
try
{
connector->asyncConnect(argv[1], atoi(argv[2]), std::chrono::seconds(10), [server, tmp](TcpSocket::Ptr socket) {
auto enterCallback = [server, tmp](TcpSocket::Ptr socket) {
std::cout << "connect success" << std::endl;
socket->setNodelay();

auto enterCallback = [tmp](const TcpConnection::Ptr& session) {
session->setDataCallback([session](const char* buffer, size_t len) {
session->send(buffer, len);
return len;
});
});
session->send(tmp.c_str(), tmp.size());
};

server->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(1024 * 1024));
}, []() {
};

auto failedCallback = []() {
std::cout << "connect failed" << std::endl;
});
};

connector->asyncConnect({
AsyncConnector::ConnectOptions::WithAddr(argv[1], atoi(argv[2])),
AsyncConnector::ConnectOptions::WithTimeout(std::chrono::seconds(10)),
AsyncConnector::ConnectOptions::WithCompletedCallback(enterCallback),
AsyncConnector::ConnectOptions::WithFailedCallback(failedCallback)});
}
catch (std::runtime_error& e)
{
Expand Down
14 changes: 7 additions & 7 deletions examples/PingPongServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ int main(int argc, char **argv)
}

auto server = TcpService::Create();
auto listenThread = ListenThread::Create();

listenThread->startListen(false, "0.0.0.0", atoi(argv[1]), [=](TcpSocket::Ptr socket){
auto listenThread = ListenThread::Create(false, "0.0.0.0", atoi(argv[1]), [=](TcpSocket::Ptr socket) {
socket->setNodelay();

auto enterCallback = [](const TcpConnection::Ptr& session) {
Expand All @@ -37,17 +35,19 @@ int main(int argc, char **argv)
TotalRecvSize += len;
total_packet_num++;
return len;
});
});

session->setDisConnectCallback([](const TcpConnection::Ptr& session) {
total_client_num--;
});
});
};

server->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(1024 * 1024));
});
});

listenThread->startListen();

server->startWorkerThread(atoi(argv[2]));

Expand Down
125 changes: 79 additions & 46 deletions examples/TestHttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,80 +8,113 @@
#include <brynet/net/http/HttpService.h>
#include <brynet/net/http/HttpFormat.h>
#include <brynet/net/http/WebSocketFormat.h>
#include <brynet/net/Connector.h>

using namespace brynet;
using namespace brynet::net;
using namespace brynet::net::http;

int main(int argc, char **argv)
void reqHttp(TcpService::Ptr service,
AsyncConnector::Ptr connector,
std::string requestStr,
std::vector<AsyncConnector::ConnectOptions::ConnectOptionFunc> options,
std::vector<TcpService::AddSocketOption::AddSocketOptionFunc> sessionOptions,
std::function<void(const HTTPParser& httpParser, const HttpSession::Ptr& session)> userCallback)
{
std::string body = "<html>hello world </html>";
auto enterCallback = [=](TcpSocket::Ptr socket) mutable {
socket->setNodelay();

auto enterCallback = [=](const TcpConnection::Ptr& session) {
HttpService::setup(session, [=](const HttpSession::Ptr& httpSession) {
httpSession->send(requestStr.c_str(), requestStr.size());
httpSession->setHttpCallback([=](const HTTPParser& httpParser, const HttpSession::Ptr& session) {
userCallback(httpParser, session);
});
});
};

sessionOptions.push_back(brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback));
service->addTcpConnection(std::move(socket), sessionOptions);
};

options.push_back(AsyncConnector::ConnectOptions::WithCompletedCallback(enterCallback));
connector->asyncConnect(options);
};

int main(int argc, char **argv)
{
auto service = TcpService::Create();
service->startWorkerThread(2);

auto listenThread = ListenThread::Create();
listenThread->startListen(false, "0.0.0.0", 8080, [service, body](TcpSocket::Ptr socket) {
auto connector = brynet::net::AsyncConnector::Create();
connector->startWorkerThread();


auto listenThread = ListenThread::Create(false, "0.0.0.0", 8080, [service](TcpSocket::Ptr socket) {
std::string body = "<html>hello world </html>";
auto enterCallback = [body](const TcpConnection::Ptr& session) {
HttpService::setup(session, [body](const HttpSession::Ptr& httpSession) {
httpSession->setHttpCallback([body](const HTTPParser& httpParser,
const HttpSession::Ptr& session) {
HttpResponse response;
response.setBody(body);
std::string result = response.getResult();
session->send(result.c_str(), result.size(), [session]() {
session->postShutdown();
HttpResponse response;
response.setBody(body);
std::string result = response.getResult();
session->send(result.c_str(), result.size(), [session]() {
session->postShutdown();
});
});
});

httpSession->setWSCallback([](const HttpSession::Ptr& httpSession,
WebSocketFormat::WebSocketFrameType opcode,
const std::string& payload) {
// ping pong
auto frame = std::make_shared<std::string>();
WebSocketFormat::wsFrameBuild(payload.c_str(),
payload.size(),
*frame,
WebSocketFormat::WebSocketFrameType::TEXT_FRAME,
true,
true);
httpSession->send(frame);
// ping pong
auto frame = std::make_shared<std::string>();
WebSocketFormat::wsFrameBuild(payload.c_str(),
payload.size(),
*frame,
WebSocketFormat::WebSocketFrameType::TEXT_FRAME,
true,
true);
httpSession->send(frame);
});
});
});
};
service->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(10));
});
});
listenThread->startListen();

HttpRequest request;
request.setMethod(HttpRequest::HTTP_METHOD::HTTP_METHOD_GET);
request.setUrl("/ISteamUserAuth/AuthenticateUserTicket/v1/");
request.addHeadValue("Host", "api.steampowered.com");

HttpQueryParameter p;
p.add("key", "DCD9C36F1F54A96F707DFBE833600167");
p.add("appid", "929390");
p.add("ticket", "140000006FC57764C95D45085373F10401001001359F745C1800000001000000020000009DACD3DE1202A8C0431E100003000000B200000032000000040000005373F104010010016E2E0E009DACD3DE1202A8C000000000AAA16F5C2A518B5C0100FC96040000000000061129B849B0397DD62E0B1B0373451EC08E1BAB70FC18E21094FC5F4674EDD50226ABB33D71C601B8E65542FB9A9F48BFF87AC30904D272FAD5F15CD2D5428D44827BA58A45886119D6244D672A0C1909C5D7BD9096D96EB8BAC30E006BE6D405E5B25659CF3D343C9627078C5FD4CE0120D80DDB2FA09E76111143F132CA0B");
request.setQuery(p.getResult());

sock fd = brynet::net::base::Connect(false, "191.236.16.125", 80);
if (fd != INVALID_SOCKET)
std::string requestStr = request.getResult();

std::atomic<int> couner{0};

for (size_t i = 0; i < 10; i++)
{
auto socket = TcpSocket::Create(fd, false);
auto enterCallback = [](const TcpConnection::Ptr& session) {
HttpService::setup(session, [](const HttpSession::Ptr& httpSession) {
HttpRequest request;
request.setMethod(HttpRequest::HTTP_METHOD::HTTP_METHOD_GET);
request.setUrl("/httpgallery/chunked/chunkedimage.aspx");
request.addHeadValue("Host", "www.httpwatch.com");

std::string requestStr = request.getResult();
httpSession->send(requestStr.c_str(), requestStr.size());
httpSession->setHttpCallback([](const HTTPParser& httpParser, const HttpSession::Ptr& session) {
//http response handle
std::cout << httpParser.getBody() << std::endl;
std::cout << "len:" << httpParser.getBody().size() << std::endl;
std::flush(std::cout);
});
reqHttp(service,
connector,
requestStr,
{
AsyncConnector::ConnectOptions::WithAddr("23.73.140.64", 80),
AsyncConnector::ConnectOptions::WithTimeout(std::chrono::seconds(10)),
},
{
},
[&](const HTTPParser& httpParser, const HttpSession::Ptr& session) {
std::cout << ++couner << std::endl;
});
};

service->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(10));
}

std::cin.get();
return 0;
}
56 changes: 28 additions & 28 deletions examples/TestPromiseReceive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ int main(int argc, char **argv)
}

auto server = TcpService::Create();
auto listenThread = ListenThread::Create();

listenThread->startListen(false, "0.0.0.0", atoi(argv[1]), [=](TcpSocket::Ptr socket){
auto listenThread = ListenThread::Create(false, "0.0.0.0", atoi(argv[1]), [=](TcpSocket::Ptr socket) {
socket->setNodelay();
auto enterCallback = [](const TcpConnection::Ptr& session) {
auto promiseReceive = setupPromiseReceive(session);
Expand All @@ -34,38 +32,40 @@ int main(int argc, char **argv)
auto headline = std::string(buffer, len);
std::cout << headline << std::endl;
return false;
})->receiveUntil("\r\n", [promiseReceive, contentLength](const char* buffer, size_t len) {
auto headerValue = std::string(buffer, len);
std::cout << headerValue << std::endl;
if (len > 2)
{
const static std::string ContentLenghtFlag = "Content-Length: ";
auto pos = headerValue.find(ContentLenghtFlag);
if (pos != std::string::npos)
})->receiveUntil("\r\n", [promiseReceive, contentLength](const char* buffer, size_t len) {
auto headerValue = std::string(buffer, len);
std::cout << headerValue << std::endl;
if (len > 2)
{
auto lenStr = headerValue.substr(pos+ ContentLenghtFlag.size(), headerValue.size());
*contentLength = std::stoi(lenStr);
const static std::string ContentLenghtFlag = "Content-Length: ";
auto pos = headerValue.find(ContentLenghtFlag);
if (pos != std::string::npos)
{
auto lenStr = headerValue.substr(pos + ContentLenghtFlag.size(), headerValue.size());
*contentLength = std::stoi(lenStr);
}
return true;
}
return true;
}
return false;
})->receive(contentLength, [session](const char* buffer, size_t len) {
HttpResponse response;
response.setStatus(HttpResponse::HTTP_RESPONSE_STATUS::OK);
response.setContentType("text/html; charset=utf-8");
response.setBody("<html>hello world </html>");
return false;
})->receive(contentLength, [session](const char* buffer, size_t len) {
HttpResponse response;
response.setStatus(HttpResponse::HTTP_RESPONSE_STATUS::OK);
response.setContentType("text/html; charset=utf-8");
response.setBody("<html>hello world </html>");

auto result = response.getResult();
session->send(result.c_str(), result.size());
session->postShutdown();
auto result = response.getResult();
session->send(result.c_str(), result.size());
session->postShutdown();

return false;
});
return false;
});
};
server->addTcpConnection(std::move(socket),
brynet::net::TcpService::AddSocketOption::WithEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::AddEnterCallback(enterCallback),
brynet::net::TcpService::AddSocketOption::WithMaxRecvBufferSize(10));
});
});

listenThread->startListen();

server->startWorkerThread(atoi(argv[2]));

Expand Down
Loading

0 comments on commit d37fc78

Please sign in to comment.