From 24e35e7c611e37a2d42d02aadc393e9687c8bcdb Mon Sep 17 00:00:00 2001 From: Lanzheng Liu Date: Tue, 21 May 2024 17:49:15 +0800 Subject: [PATCH] add fini_hook for photon and for http client (#484) Signed-off-by: liulanzheng --- CMakeLists.txt | 3 +- net/http/client.cpp | 30 +++++++++----- net/http/test/client_function_test.cpp | 56 ++++++++++++++++++++++++++ photon.cpp | 14 +++++++ photon.h | 7 +++- 5 files changed, 97 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bc934fe2..f5e88f4a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -348,9 +348,8 @@ if (PHOTON_BUILD_TESTING) # add_compile_options(-Wno-error) + include_directories(include ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS}) add_library(ci-tools STATIC test/ci-tools.cpp) - - include_directories(photon_static ${GFLAGS_INCLUDE_DIRS} ${GOOGLETEST_INCLUDE_DIRS}) link_libraries(${GFLAGS_LIBRARIES} ${GOOGLETEST_LIBRARIES} ci-tools) add_subdirectory(examples) diff --git a/net/http/client.cpp b/net/http/client.cpp index eadb6c27..e808b512 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -24,6 +24,7 @@ limitations under the License. #include #include #include +#include namespace photon { namespace net { @@ -51,9 +52,17 @@ class PooledDialer { tcpsock.reset(new_tcp_socket_pool(tcp_cli, -1, true)); tlssock.reset(new_tcp_socket_pool(tls_cli, -1, true)); udssock.reset(new_uds_client()); + photon::fini_hook({this, &PooledDialer::at_photon_fini}); } ~PooledDialer() { + } + + void at_photon_fini() { + resolver.reset(); + udssock.reset(); + tlssock.reset(); + tcpsock.reset(); if (tls_ctx_ownership) delete tls_ctx; } @@ -127,14 +136,19 @@ enum RoundtripStatus { class ClientImpl : public Client { public: - PooledDialer m_dialer; CommonHeaders<> m_common_headers; + TLSContext *m_tls_ctx; ICookieJar *m_cookie_jar; ClientImpl(ICookieJar *cookie_jar, TLSContext *tls_ctx) : - m_dialer(tls_ctx), + m_tls_ctx(tls_ctx), m_cookie_jar(cookie_jar) { } + PooledDialer& get_dialer() { + thread_local PooledDialer dialer(m_tls_ctx); + return dialer; + } + using SocketStream_ptr = std::unique_ptr; int redirect(Operation* op) { if (op->resp.body_size() > 0) { @@ -165,22 +179,18 @@ class ClientImpl : public Client { return ROUNDTRIP_REDIRECT; } - int concurreny = 0; int do_roundtrip(Operation* op, Timeout tmo) { - concurreny++; - LOG_DEBUG(VALUE(concurreny)); - DEFER(concurreny--); op->status_code = -1; if (tmo.timeout() == 0) LOG_ERROR_RETURN(ETIMEDOUT, ROUNDTRIP_FAILED, "connection timedout"); auto &req = op->req; ISocketStream* s; if (m_proxy && !m_proxy_url.empty()) - s = m_dialer.dial(m_proxy_url, tmo.timeout()); + s = get_dialer().dial(m_proxy_url, tmo.timeout()); else if (!op->uds_path.empty()) - s = m_dialer.dial(op->uds_path, tmo.timeout()); + s = get_dialer().dial(op->uds_path, tmo.timeout()); else - s = m_dialer.dial(req, tmo.timeout()); + s = get_dialer().dial(req, tmo.timeout()); if (!s) { if (errno == ECONNREFUSED || errno == ENOENT) { LOG_ERROR_RETURN(0, ROUNDTRIP_FAST_RETRY, "connection refused") @@ -288,7 +298,7 @@ class ClientImpl : public Client { } ISocketStream* native_connect(std::string_view host, uint16_t port, bool secure, uint64_t timeout) override { - return m_dialer.dial(host, port, secure, timeout); + return get_dialer().dial(host, port, secure, timeout); } CommonHeaders<>* common_headers() override { diff --git a/net/http/test/client_function_test.cpp b/net/http/test/client_function_test.cpp index 851e079b..7d76264c 100644 --- a/net/http/test/client_function_test.cpp +++ b/net/http/test/client_function_test.cpp @@ -530,6 +530,62 @@ TEST(http_client, partial_body) { EXPECT_EQ(true, buf == "http_clien"); } + +TEST(http_client, vcpu) { + system("mkdir -p /tmp/ease_ut/http_test/"); + system("echo \"this is a http_client request body text for socket stream\" > /tmp/ease_ut/http_test/ease-httpclient-gettestfile"); + auto tcpserver = new_tcp_socket_server(); + tcpserver->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1); + tcpserver->bind_v4localhost(); + tcpserver->listen(); + DEFER(delete tcpserver); + auto server = new_http_server(); + DEFER(delete server); + auto fs = photon::fs::new_localfs_adaptor("/tmp/ease_ut/http_test/"); + DEFER(delete fs); + auto fs_handler = new_fs_handler(fs); + DEFER(delete fs_handler); + server->add_handler(fs_handler); + tcpserver->set_handler(server->get_connection_handler()); + tcpserver->start_loop(); + auto target = to_url(tcpserver, "/ease-httpclient-gettestfile"); + auto client = new_http_client(); + DEFER(delete client); + + int vcpu_num = 16; + photon::semaphore sem(0); + std::thread th[vcpu_num]; + for (int i = 0; i < vcpu_num; i++) { + th[i] = std::thread([&] { + photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_NONE); + DEFER({ + photon::fini(); + sem.signal(1); + }); + + for (int round = 0; round < 10; round++) { + auto op = client->new_operation(Verb::GET, target); + DEFER(client->destroy_operation(op)); + op->req.headers.content_length(0); + int ret = client->call(op); + GTEST_ASSERT_EQ(0, ret); + + char resp_body_buf[1024]; + EXPECT_EQ(sizeof(socket_buf), op->resp.resource_size()); + ret = op->resp.read(resp_body_buf, sizeof(socket_buf)); + EXPECT_EQ(sizeof(socket_buf), ret); + resp_body_buf[sizeof(socket_buf) - 1] = '\0'; + LOG_DEBUG(resp_body_buf); + EXPECT_EQ(0, strcmp(resp_body_buf, socket_buf)); + } + }); + } + + sem.wait(vcpu_num); + for (int i = 0; i < vcpu_num; i++) + th[i].join(); +} + TEST(DISABLED_http_client, ipv6) { // make sure runing in a ipv6-ready environment auto client = new_http_client(); DEFER(delete client); diff --git a/photon.cpp b/photon.cpp index b20379c8..4cf6ac2a 100644 --- a/photon.cpp +++ b/photon.cpp @@ -29,6 +29,8 @@ limitations under the License. #include "net/curl.h" #include "net/socket.h" #include "fs/exportfs.h" +#include "common/callback.h" +#include namespace photon { @@ -94,7 +96,19 @@ int init(uint64_t event_engine, uint64_t io_engine, const PhotonOptions& options return 0; } +static std::vector>& get_hook_vector() { + thread_local std::vector> hooks; + return hooks; +} + +void fini_hook(Delegate handler) { + get_hook_vector().emplace_back(handler); +} + int fini() { + for (auto h : get_hook_vector()) { + h.fire(); + } #ifdef __linux__ FINI_IO(LIBAIO, libaio_wrapper) FINI_IO(SOCKET_EDGE_TRIGGER, et_poller) diff --git a/photon.h b/photon.h index 8c2d75fb..1d903ab3 100644 --- a/photon.h +++ b/photon.h @@ -17,6 +17,7 @@ limitations under the License. #pragma once #include +#include namespace photon { @@ -68,5 +69,9 @@ int init(uint64_t event_engine = INIT_EVENT_DEFAULT, */ int fini(); +/** + * @brief add callbacks on fini() + */ +void fini_hook(Delegate handler); -} \ No newline at end of file +}