Skip to content

Commit

Permalink
Rename ignoreRequestBody to receiveDataLen
Browse files Browse the repository at this point in the history
Rename pre-processing method where many received chunks
can be received before final stream processing.

Include inner general unique server sequence, to release
upper layers from this task.

Issue: #5
  • Loading branch information
testillano committed Jul 10, 2022
1 parent 17cd64f commit 2b71f1c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
13 changes: 9 additions & 4 deletions include/ert/http2comm/Http2Server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class Http2Server
ert::metrics::histogram_t *messages_size_bytes_rx_histogram_{};
ert::metrics::histogram_t *messages_size_bytes_tx_histogram_{};

std::atomic<std::uint64_t> reception_id_{};

protected:

nghttp2::asio_http2::server::http2 server_;
Expand Down Expand Up @@ -200,16 +202,18 @@ class Http2Server
* a dummy server could mock valid static responses regardless the content received).
*
* @param req nghttp2-asio request structure.
* This could be used to store data received inside a server internal indexed map.
*
* @return Return the boolean about ignoring request body copy. Default implementation returns 'false'.
* @return Boolean about internal copy of request body received. Default implementation returns 'true'.
*/
virtual bool ignoreRequestBody(const nghttp2::asio_http2::server::request& req) {
return false;
virtual bool receiveDataLen(const nghttp2::asio_http2::server::request& req) {
return true;
}

/**
* Virtual reception callback. Implementation is mandatory.
*
* @param receptionId server reception identifier (monotonically increased value).
* @param req nghttp2-asio request structure.
* @param requestBody request body received.
* @param receptionTimestampUs microseconds timestamp of reception.
Expand All @@ -218,7 +222,8 @@ class Http2Server
* @param responseBody response body to be filled by reference.
* @param responseDelayMs response delay in milliseconds to be filled by reference.
*/
virtual void receive(const nghttp2::asio_http2::server::request& req,
virtual void receive(const std::uint64_t &receptionId,
const nghttp2::asio_http2::server::request& req,
std::shared_ptr<std::stringstream> requestBody,
const std::chrono::microseconds &receptionTimestampUs,
unsigned int& statusCode,
Expand Down
8 changes: 8 additions & 0 deletions include/ert/http2comm/Stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class Stream : public std::enable_shared_from_this<Stream>
std::string response_body_{};
boost::asio::deadline_timer *timer_{};

// Server sequence for this stream:
std::uint64_t reception_id_{};

// For metrics:
std::chrono::microseconds reception_timestamp_us_{}; // timestamp in microsecods

Expand All @@ -107,6 +110,11 @@ class Stream : public std::enable_shared_from_this<Stream>
return req_;
}

// set server sequence
void setReceptionId(const std::uint64_t &id) {
reception_id_ = id;
}

// Process reception
void process();

Expand Down
16 changes: 10 additions & 6 deletions src/Http2Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ SOFTWARE.
#include <string>
#include <thread>
#include <sstream>
#include <iterator>
#include <iostream>
#include <boost/exception/diagnostic_information.hpp>

Expand All @@ -57,7 +58,7 @@ namespace ert
namespace http2comm
{

Http2Server::Http2Server(const std::string& name, size_t workerThreads, boost::asio::io_service *timersIoService): name_(name), timers_io_service_(timersIoService) {
Http2Server::Http2Server(const std::string& name, size_t workerThreads, boost::asio::io_service *timersIoService): name_(name), timers_io_service_(timersIoService), reception_id_(0) {

queue_dispatcher_ = (workerThreads > 1) ? new QueueDispatcher(name + "_queueDispatcher", workerThreads) : nullptr;
}
Expand Down Expand Up @@ -188,17 +189,20 @@ nghttp2::asio_http2::server::request_cb Http2Server::handler()
auto stream = std::make_shared<Stream>(req, res, requestBody, this);
req.on_data([requestBody, stream, this](const uint8_t* data, std::size_t len)
{
if (len > 0)
if (len > 0) // https://stackoverflow.com/a/72925875/2576671
{
if (!ignoreRequestBody(stream->getReq())) {
if (receiveDataLen(stream->getReq())) {
// https://github.com/testillano/h2agent/issues/6 is caused when this is enabled, on high load and broke client connections:
// (mutexes does not solves the problem, and does not matter is shared_ptr requestBody is replaced by static type like
// stringstream; it seems that data is not correctly protected on lower layers)
std::copy(data, data + len, std::ostream_iterator<uint8_t>(*requestBody));
// (mutexes does not solves the problem neither std::move of data, and does not matter is shared_ptr requestBody is replaced
// by static type like stringstream; it seems that data is not correctly protected on lower layers, probably tatsuhiro-t nghttp2)
std::copy(data, data + len, std::ostream_iterator<std::uint8_t>(*requestBody));
}
}
else
{
reception_id_++;
stream->setReceptionId(reception_id_.load());

if (queue_dispatcher_) {
queue_dispatcher_->dispatch(stream);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void Stream::process()
}
else
{
server_->receive(req_, request_body_, reception_timestamp_us_, status_code_, response_headers_, response_body_, responseDelayMs);
server_->receive(reception_id_, req_, request_body_, reception_timestamp_us_, status_code_, response_headers_, response_body_, responseDelayMs);
}
}
else
Expand Down

0 comments on commit 2b71f1c

Please sign in to comment.