Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon committed Oct 23, 2024
1 parent f7ae711 commit bdb9383
Show file tree
Hide file tree
Showing 12 changed files with 975 additions and 15 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ set(TGBM_SRC_LIST
src/Bot.cpp
src/EventHandler.cpp
src/TgTypeParser.cpp
src/net/http2/protocol.cpp
src/net/http2_server.cpp
src/net/tcp_connection.cpp
src/net/http11_client.cpp
src/net/http_client.cpp
Expand Down Expand Up @@ -179,5 +181,3 @@ set_target_properties(tgbm_main PROPERTIES
CMAKE_CXX_STANDARD_REQUIRED ON
CXX_STANDARD 20)

target_compile_definitions(tgbmlib PUBLIC TGBM_SSL_KEYS_FILE="E:/sslkeys.txt")
target_compile_definitions(tgbmlib PUBLIC TGBM_SSL_ADDITIONAL_CERTIFICATE_PATH="E:/ssl_cert_from_curl.pem")
4 changes: 2 additions & 2 deletions include/tgbm/net/http2/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ struct push_promise_frame {
// if ACK not setted, requires ping back
struct ping_frame {
frame_header header;
std::byte data[8] = {};
byte_t data[8] = {};

[[nodiscard]] constexpr uint64_t get_data() noexcept {
return std::bit_cast<uint64_t>(data);
Expand All @@ -434,7 +434,7 @@ struct ping_frame {
[[nodiscard]] static ping_frame parse(frame_header h, std::span<const byte_t> bytes) {
assert(h.type == frame_e::PING && h.length == bytes.size());
ping_frame f(h);
if (h.length != 8)
if (h.stream_id != 0 || h.length != 8)
throw protocol_error{};
memcpy(f.data, bytes.data(), 8);
return f;
Expand Down
2 changes: 1 addition & 1 deletion include/tgbm/net/http2_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ struct http2_client_options {
// sends ping when there are no requests(for keeping alive). disabled by default
duration_t ping_interval = duration_t::max();
// duration_t::max() disables timeouts
duration_t timeout_check_interval = std::chrono::milliseconds(2); // TODO DEBUG std::chrono::seconds(1);
duration_t timeout_check_interval = std::chrono::seconds(1);
};

struct http2_client : http_client {
Expand Down
113 changes: 113 additions & 0 deletions include/tgbm/net/http2_server.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>

#include <boost/asio/ip/tcp.hpp>

#include <filesystem>

#include <kelcoro/job.hpp>
#include <kelcoro/task.hpp>
#include <kelcoro/thread_pool.hpp>
#include <anyany/anyany.hpp>

#include "boost/smart_ptr/intrusive_ptr.hpp"
#include "tgbm/net/http_base.hpp"
#include "tgbm/net/tcp_connection.hpp"
#include "tgbm/tools/deadline.hpp"
#include "tgbm/tools/memory.hpp"

namespace tgbm {

namespace asio = boost::asio;

struct http_server {
virtual dd::task<http_response> handle_request(http_request) = 0;
virtual ~http_server() = default;
};

struct http2_server_options {
// required to set
std::filesystem::path ssl_cert_path;
std::filesystem::path private_key_path;
uint32_t max_send_frame_size = 8 * 1024; // 8 KB
uint32_t max_receive_frame_size = uint32_t(-1);
uint32_t hpack_dyntab_size = 4096;
uint32_t initial_stream_window_size = -1;
uint32_t max_concurrent_stream_per_connection = -1;
// TODO use
duration_t idle_timeout; // when drop client if it not send anything
// TODO ? somehow handle overloading, мб через таймауты на отправку?..
// хотя самое логичное это как то детектить перегрузку и обрывать новые стримы если перегруз
// (settings и поставить макс конкурент стрим 0)
};

struct http2_server;

using http2_server_ptr = boost::intrusive_ptr<http2_server>;

struct ssl_context;
using ssl_context_ptr = boost::intrusive_ptr<ssl_context>;

struct http2_server : http_server {
private:
asio::io_context io_ctx;
http2_server_options options;
tcp_connection_options tcp_options;
ssl_context_ptr sslctx = nullptr;
std::atomic<size_t> refcount = 0;
std::atomic_bool _stop_requested = false;
// accepts on all threads, but each connection works only on one worker
// by value, because work endlessly on io_ctx
dd::thread_pool tp;
using work_guard_t = decltype(asio::make_work_guard(io_ctx));
std::shared_ptr<work_guard_t> work_guard = nullptr;
friend struct client_session;
std::atomic_size_t opened_sessions = 0;

/*
acceptы on all threads, then creates session
which works on one thread
*/
dd::job start_accept(asio::ip::tcp::endpoint);

friend void intrusive_ptr_add_ref(http2_server* server) noexcept {
server->refcount.fetch_add(1, std::memory_order_acq_rel);
}
friend void intrusive_ptr_release(http2_server* server) noexcept {
if (server->refcount.fetch_sub(1, std::memory_order_acq_rel) == 1)
delete server;
}

protected:
// protected, must not be created on stack
~http2_server() {
stop();
}

public:
explicit http2_server(http2_server_options, tcp_connection_options = {},
size_t listen_thread_count = std::thread::hardware_concurrency());

http2_server(http2_server&&) = delete;
void operator=(http2_server&&) = delete;

const http2_server_options& get_options() const noexcept {
return options;
}
// adds addr to listen addrs
void listen(asio::ip::tcp::endpoint);
void listen(asio::ip::address addr, asio::ip::port_type port = 443) {
return listen({addr, port});
}

[[nodiscard]] bool stop_requested() const noexcept {
return _stop_requested.load(std::memory_order_acquire);
}

void run();
void stop();
};

} // namespace tgbm
1 change: 1 addition & 0 deletions include/tgbm/net/ssl_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ ssl_context_ptr make_ssl_context_for_http2();

ssl_context_ptr make_ssl_context_for_http11();

// returns null on error
ssl_context_ptr make_ssl_context_for_server(std::filesystem::path certificate,
std::filesystem::path server_private_key);

Expand Down
3 changes: 2 additions & 1 deletion include/tgbm/net/tcp_connection.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once
// TODO wrap asio/detail/config.hpp (using include_next)
#include "boost/smart_ptr/intrusive_ptr.hpp"
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
// windows is really bad
Expand All @@ -10,6 +10,7 @@
#undef min
#undef max

#include "tgbm/net/errors.hpp"
#include "tgbm/tools/macro.hpp"
#include "tgbm/net/ssl_context.hpp"
#include "tgbm/logger.h"
Expand Down
5 changes: 4 additions & 1 deletion src/Bot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ dd::task<void> Bot::get_and_handle_updates(std::chrono::seconds update_wait_time
on_scope_exit {
_client->stop();
};
repeat_try:
try {
co_foreach(Update::Ptr update,
long_poll(get_api(), 100, update_wait_timeout, nullptr, /*confirm_before_handle=*/true)) {
_eventHandler.handleUpdate(update);
}
} catch (std::exception& e) {
LOG_ERR("getUpdates ended with exception, http client will be stopped, what: {}", e.what());
LOG_ERR("Bot getUpdates ended with exception, its ignored, err: {}", e.what());
goto repeat_try;
// LOG_ERR("getUpdates ended with exception, http client will be stopped, what: {}", e.what());
} catch (...) {
LOG_ERR("getUpdates ended with unknown exception, http client will be stopped");
}
Expand Down
56 changes: 56 additions & 0 deletions src/net/http2/protocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "tgbm/net/http2/hpack.hpp"
#include "tgbm/net/http_base.hpp"
#include "tgbm/net/http2/protocol.hpp"

namespace tgbm::http2 {

void parse_http2_request_headers(hpack::decoder& d, std::span<const hpack::byte_t> bytes, http_request& req) {
const auto* in = bytes.data();
const auto* e = in + bytes.size();
hpack::header_view header;

// parse required pseudoheaders

bool scheme_parsed = false;
bool path_parsed = false;
bool method_parsed = false;
bool authority_parsed = false;
while (in != e) {
d.decode_header(in, e, header);
if (!header) // skip dynamic size updates
continue;
if (header.name == ":path") {
if (path_parsed)
throw protocol_error{};
path_parsed = true;
req.path = header.name.str();
} else if (header.name == ":method") {
if (method_parsed)
throw protocol_error{};
method_parsed = true;
enum_from_string(header.value.str(), req.method);
} else if (header.name == ":scheme") {
if (scheme_parsed)
throw protocol_error{};
scheme_parsed = true;
enum_from_string(header.value.str(), req.scheme);
} else if (header.name == ":authority") {
if (authority_parsed)
throw protocol_error{};
authority_parsed = true;
req.authority = header.value.str();
} else {
break;
}
}
if (header)
req.headers.push_back(http_header_t(std::string(header.name.str()), std::string(header.value.str())));
while (in != e) {
d.decode_header(in, e, header);
if (!header)
continue;
req.headers.push_back(http_header_t(std::string(header.name.str()), std::string(header.value.str())));
}
}

} // namespace tgbm::http2
Loading

0 comments on commit bdb9383

Please sign in to comment.