Skip to content

Commit

Permalink
working server
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon committed Oct 23, 2024
1 parent bdb9383 commit a42e23e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
1 change: 0 additions & 1 deletion include/tgbm/net/connectiion_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ struct pool_t {
}
void* mem = resource()->allocate(sizeof(node_t), alignof(node_t));
on_scope_failure(free_mem) {
LOG_DEBUG("{} CALLED!!!", mem);
--borrowed_count;
resource()->deallocate(mem, sizeof(node_t), alignof(node_t));
};
Expand Down
5 changes: 4 additions & 1 deletion include/tgbm/net/http2_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ struct http2_server : http_server {
[[nodiscard]] bool stop_requested() const noexcept {
return _stop_requested.load(std::memory_order_acquire);
}

// not blocking, starts listening on other threads
// returns false if already started
bool start();
// start + blocking
void run();
void stop();
};
Expand Down
2 changes: 1 addition & 1 deletion src/net/http2/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void parse_http2_request_headers(hpack::decoder& d, std::span<const hpack::byte_
if (path_parsed)
throw protocol_error{};
path_parsed = true;
req.path = header.name.str();
req.path = header.value.str();
} else if (header.name == ":method") {
if (method_parsed)
throw protocol_error{};
Expand Down
44 changes: 33 additions & 11 deletions src/net/http2_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct frames_buffer {
// TODO rm? std::vector<on_sended_t> todo_when_sended;
std::coroutine_handle<> writer = nullptr; // not nullptr if writer waits next frames

explicit frames_buffer(client_session& ses) noexcept : session(&ses) {
}

// for writer
KELCORO_CO_AWAIT_REQUIRED auto next_work();

Expand Down Expand Up @@ -91,6 +94,8 @@ struct client_session {
hpack::decoder decoder;
hpack::encoder encoder;
frames_buffer framebuf;
// for supporting SETTINGS table reduction while connection in progress
uint32_t table_size_decrease = 0;
bool shutted_down = false;

size_t refcount = 0; // must be accessed from one thread (reader, )
Expand All @@ -111,7 +116,8 @@ struct client_session {
client_settings(client_opts),
decoder(client_opts.header_table_size),
server_settings(server_opts),
encoder(server_opts.header_table_size) {
encoder(server_opts.header_table_size),
framebuf(*this) {
assert(server);
server->opened_sessions.fetch_add(1, std::memory_order_acq_rel);
start_writer();
Expand Down Expand Up @@ -264,10 +270,22 @@ void frames_buffer::push_data(http2::stream_id_t streamid, std::span<const byte_
}
}

static bytes_t encode_response_headers(hpack::encoder& encoder, const http_response& rsp) {
static bytes_t encode_response_headers(client_session& ses, const http_response& rsp) {
hpack::encoder& encoder = ses.encoder;
bytes_t hdrs;
auto out = std::back_inserter(hdrs);
using enum hpack::static_table_t::values;
if (ses.table_size_decrease) {
uint32_t dec = ses.table_size_decrease;
ses.table_size_decrease = 0;
LOG("[HTTP2]: reducing table size after receiving SETTINGS, old size: {}, decrease: {}",
encoder.dyntab.max_size(), dec);

if (encoder.dyntab.max_size() - dec)
throw protocol_error{};
encoder.encode_dynamic_table_size_update(encoder.dyntab.max_size() - dec, out);
}

switch (rsp.status) {
#define TGBM_ENCODE_STATUS_CASE(NMB) \
case NMB: \
Expand All @@ -294,7 +312,7 @@ static bytes_t encode_response_headers(hpack::encoder& encoder, const http_respo
}

void frames_buffer::push_response(http2::stream_id_t streamid, http_response&& rsp) {
push_headers(streamid, encode_response_headers(session->encoder, rsp), !rsp.body.empty());
push_headers(streamid, encode_response_headers(*session, rsp), !rsp.body.empty());
if (!rsp.body.empty())
push_data(streamid, rsp.body);
commit();
Expand Down Expand Up @@ -389,9 +407,8 @@ void client_session::receive_data(http2::frame_header h, std::span<byte_t> data)
fmt::format("incorrect stream ordering, {} stream DATA sended before HEADERS", h.stream_id));
}
it->second.body.data.insert(it->second.body.data.end(), data.begin(), data.end());
if (h.flags & http2::flags::END_STREAM) {
if (h.flags & http2::flags::END_STREAM)
start_handle_request(this, it->first, std::move(it->second));
}
}
}

Expand Down Expand Up @@ -448,7 +465,8 @@ dd::job reader_for(client_session_ptr session) try {
constexpr uint32_t h2fhl = frame_header_len;

auto validate_frame_header = [](frame_header h) -> bool {
return h.length <= frame_len_max && h.stream_id <= max_stream_id && (h.stream_id % 2) == 1;
return h.length <= frame_len_max && h.stream_id <= max_stream_id &&
(h.stream_id == 0 || (h.stream_id % 2) == 1);
};

for (;;) {
Expand Down Expand Up @@ -500,7 +518,7 @@ dd::job reader_for(client_session_ptr session) try {
client_settings_visitor vtor{session->client_settings};
settings_frame::parse(framehdr, buf, vtor);
if (vtor.header_table_size_decrease)
throw unimplemented("reducing HPACK dynamic table when connection in progress");
session->table_size_decrease += vtor.header_table_size_decrease;
break;
}
case PUSH_PROMISE:
Expand Down Expand Up @@ -693,9 +711,9 @@ void http2_server::listen(asio::ip::tcp::endpoint ep) {
start_accept(ep); // start accept
}

void http2_server::run() {
bool http2_server::start() {
if (work_guard)
return;
return false;
if (io_ctx.stopped()) {
// drop old sessions before starting new ones
io_ctx.restart();
Expand All @@ -705,8 +723,12 @@ void http2_server::run() {
work_guard = std::shared_ptr<work_guard_t>(new auto(asio::make_work_guard(io_ctx)));
for (dd::worker& w : tp.workers_range())
dd::schedule_to(w, [&ctx = this->io_ctx, g = work_guard] { ctx.run(); });
// TODO исполняттся только на тредпуле? Хотя нахер
io_ctx.run();
return true;
}

void http2_server::run() {
if (start())
io_ctx.run();
}

void http2_server::stop() {
Expand Down

0 comments on commit a42e23e

Please sign in to comment.