From 11b9bbd166ab986670a428143f33d01928fc47ea Mon Sep 17 00:00:00 2001 From: ymmt Date: Fri, 2 Aug 2024 09:12:51 +0000 Subject: [PATCH 1/2] Update the GitHub Actions workflow --- .github/workflows/tests.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 734631a..b1aa7fd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,26 +8,26 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-20.04, ubuntu-22.04] - compiler: [[gcc, g++], [clang, clang++]] + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + compiler: [{cc: gcc, cxx: g++}, {cc: clang, cxx: clang++}] timeout-minutes: 30 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install dependencies run: | sudo apt-get update sudo apt-get install -y libgoogle-perftools-dev libunwind-dev - name: Build + env: + CC: ${{ matrix.compiler.cc }} + CXX: ${{ matrix.compiler.cxx }} run: | $CC --version $CXX --version make -j $(nproc) CC=$CC CXX=$CXX OPTFLAGS="-O2 -fPIE" ./yrmcdsd & sleep 1 - env: - CC: ${{ matrix.compiler[0] }} - CXX: ${{ matrix.compiler[1] }} - name: Run tests run: | make YRMCDS_SERVER=localhost tests From 720fdd37661437c3862d7b4c4ea2b260672e2a81 Mon Sep 17 00:00:00 2001 From: ymmt Date: Mon, 5 Aug 2024 06:05:56 +0000 Subject: [PATCH 2/2] Early close file descriptors Resolves #93. `cybozu::resource` now keeps the file descriptor private and adds a new private method `close()` to close the file descriptor from the friend class `cybozu::reactor`. Subclasses can get the file descriptor via `int fileno()` method that returns -1 after closed. `cybozu::reactor` calls `cybozu::resource::close` when it removes the resource from the active set of resources and puts it to the pending destruction list. By this, the file descriptor of the closed resource is closed earlier. Other classes are updated to reference `fileno()`. --- cybozu/reactor.cpp | 7 +++++-- cybozu/reactor.hpp | 25 +++++++++++++++++++------ cybozu/signal.hpp | 7 +++++-- cybozu/tcp.cpp | 24 ++++++++++++++++++------ cybozu/tcp.hpp | 7 +++++-- src/counter/sockets.cpp | 5 ++++- src/memcache/sockets.cpp | 21 +++++++++++++++------ 7 files changed, 71 insertions(+), 25 deletions(-) diff --git a/cybozu/reactor.cpp b/cybozu/reactor.cpp index 5c7a07b..c22d769 100644 --- a/cybozu/reactor.cpp +++ b/cybozu/reactor.cpp @@ -37,7 +37,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { if( res->m_reactor != nullptr ) throw std::logic_error(" already added!"); res->m_reactor = this; - const int fd = res->fileno(); + const int fd = res->m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -47,7 +47,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { } void reactor::modify_events(const resource& res, int events) { - const int fd = res.fileno(); + const int fd = res.m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -78,12 +78,15 @@ void reactor::remove_resource(int fd) { dump_stack(); throw std::logic_error("bug in remove_resource"); } + auto res = it->second.get(); m_garbage.emplace_back( std::move(it->second) ); m_resources.erase(it); if( epoll_ctl(m_fd, EPOLL_CTL_DEL, fd, NULL) == -1 ) throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)"); m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd), m_readables.end()); + + res->close(); } void reactor::poll() { diff --git a/cybozu/reactor.hpp b/cybozu/reactor.hpp index f5376bb..b1b5736 100644 --- a/cybozu/reactor.hpp +++ b/cybozu/reactor.hpp @@ -37,11 +37,16 @@ class resource { // Close the file descriptor. virtual ~resource() { - ::close(m_fd); + close(); } // Return the UNIX file descriptor for this resource. - int fileno() const { return m_fd; } + // This returns -1 after the resource is invalidated. + int fileno() const { + lock_guard g(m_lock); + if( ! m_valid ) return -1; + return m_fd; + } // `true` if this resource is still valid. bool valid() const { @@ -118,13 +123,14 @@ class resource { } protected: - const int m_fd; reactor* m_reactor = nullptr; friend class reactor; private: + const int m_fd; bool m_valid = true; + bool m_closed = false; mutable spinlock m_lock; typedef std::unique_lock lock_guard; @@ -133,6 +139,13 @@ class resource { g.unlock(); on_invalidate(); } + + void close() { + lock_guard g(m_lock); + if( m_closed ) return; + ::close(m_fd); + m_closed = true; + } }; @@ -191,7 +204,7 @@ class reactor { // Only may call this when it stops reading // from a resource before it encounters `EAGAIN` or `EWOULDBLOCK`. void add_readable(const resource& res) { - m_readables.push_back(res.fileno()); + m_readables.push_back(res.m_fd); } // Add a removal request for a resource. @@ -202,7 +215,7 @@ class reactor { // calling this. void request_removal(const resource& res) { lock_guard g(m_lock); - m_drop_req.push_back(res.fileno()); + m_drop_req.push_back(res.m_fd); } bool has_garbage() const noexcept { @@ -238,7 +251,7 @@ class reactor { // Remove a registered resource. // This is only for the reactor thread. void remove_resource(const resource& res) { - remove_resource(res.fileno()); + remove_resource(res.m_fd); } private: diff --git a/cybozu/signal.hpp b/cybozu/signal.hpp index eacf964..0ee8efe 100644 --- a/cybozu/signal.hpp +++ b/cybozu/signal.hpp @@ -31,7 +31,7 @@ class signal_reader: public resource { signal_reader(const sigset_t *mask, callback_t callback): resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ), m_callback(callback) { - if( m_fd == -1 ) + if( fileno() == -1 ) throw_unix_error(errno, "signalfd"); } // Constructor. @@ -54,8 +54,11 @@ class signal_reader: public resource { virtual bool on_readable() override final { while( true ) { + int fd = fileno(); + if( fd == -1 ) return true; + struct signalfd_siginfo si; - ssize_t n = read(m_fd, &si, sizeof(si)); + ssize_t n = read(fd, &si, sizeof(si)); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno ==EWOULDBLOCK ) return true; diff --git a/cybozu/tcp.cpp b/cybozu/tcp.cpp index e32d72e..b9d4a77 100644 --- a/cybozu/tcp.cpp +++ b/cybozu/tcp.cpp @@ -160,6 +160,9 @@ void tcp_socket::free_buffers() { } bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { + int fd = fileno(); + if( fd == -1 ) return false; + while( ! can_send(len) ) { on_buffer_full(); m_cond_write.wait(g); @@ -168,7 +171,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { if( m_pending.empty() ) { while( len > 0 ) { - ssize_t n = ::send(m_fd, p, len, 0); + ssize_t n = ::send(fd, p, len, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -226,6 +229,9 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { } bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { + int fd = fileno(); + if( fd == -1 ) return false; + std::size_t total = 0; for( int i = 0; i < iovcnt; ++i ) { total += iov[i].len; @@ -250,7 +256,7 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { if( m_pending.empty() ) { while( ind < v_size ) { - ssize_t n = ::writev(m_fd, &(v[ind]), v_size - ind); + ssize_t n = ::writev(fd, &(v[ind]), v_size - ind); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -329,8 +335,11 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { bool tcp_socket::write_pending_data() { lock_guard g(m_lock); + int fd = fileno(); + if( fd == -1 ) return true; + while( ! m_tmpbuf.empty() ) { - ssize_t n = ::send(m_fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); + ssize_t n = ::send(fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) return true; @@ -353,7 +362,7 @@ bool tcp_socket::write_pending_data() { std::tie(p, len, sent) = t; while( len != sent ) { - ssize_t n = ::send(m_fd, p+sent, len-sent, 0); + ssize_t n = ::send(fd, p+sent, len-sent, 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -453,6 +462,9 @@ setup_server_socket(const char* bind_addr, std::uint16_t port, bool freebind) { } bool tcp_server_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + while( true ) { union { struct sockaddr sa; @@ -460,10 +472,10 @@ bool tcp_server_socket::on_readable() { } addr; socklen_t addrlen = sizeof(addr); #ifdef _GNU_SOURCE - int s = ::accept4(m_fd, &(addr.sa), &addrlen, + int s = ::accept4(fd, &(addr.sa), &addrlen, SOCK_NONBLOCK|SOCK_CLOEXEC); #else - int s = ::accept(m_fd, &(addr.sa), &addrlen); + int s = ::accept(fd, &(addr.sa), &addrlen); if( s != -1 ) { int fl = fcntl(s, F_GETFL, 0); if( fl == -1 ) fl = 0; diff --git a/cybozu/tcp.hpp b/cybozu/tcp.hpp index d54ddca..5826378 100644 --- a/cybozu/tcp.hpp +++ b/cybozu/tcp.hpp @@ -185,7 +185,7 @@ class tcp_socket: public resource { } virtual void on_invalidate() override { - ::shutdown(m_fd, SHUT_RDWR); + ::shutdown(fileno(), SHUT_RDWR); free_buffers(); m_cond_write.notify_all(); } @@ -223,10 +223,13 @@ class tcp_socket: public resource { return m_pending.empty() && m_tmpbuf.empty(); } void _flush() { + int fd = fileno(); + if( fd == -1 ) return; + // with TCP_CORK, setting TCP_NODELAY effectively flushes // the kernel send buffer. int v = 1; - if( setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) + if( setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) throw_unix_error(errno, "setsockopt(TCP_NODELAY)"); } void free_buffers(); diff --git a/src/counter/sockets.cpp b/src/counter/sockets.cpp index e5cb8a1..e2583a9 100644 --- a/src/counter/sockets.cpp +++ b/src/counter/sockets.cpp @@ -24,6 +24,9 @@ counter_socket::counter_socket(int fd, g_stats.total_connections.fetch_add(1); m_recvjob = [this](cybozu::dynbuf& buf) { + int fd = fileno(); + if( fd == -1 ) return; + // load pending data if( ! m_pending.empty() ) { buf.append(m_pending.data(), m_pending.size()); @@ -32,7 +35,7 @@ counter_socket::counter_socket(int fd, while( true ) { char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; diff --git a/src/memcache/sockets.cpp b/src/memcache/sockets.cpp index 142ea0b..38f7e4d 100644 --- a/src/memcache/sockets.cpp +++ b/src/memcache/sockets.cpp @@ -43,8 +43,11 @@ memcache_socket::memcache_socket(int fd, g_stats.total_connections.fetch_add(1, relaxed); m_recvjob = [this](cybozu::dynbuf& buf) { + int fd = fileno(); + if( fd == -1 ) return; + // set lock context for objects. - g_context = m_fd; + g_context = fd; // load pending data if( ! m_pending.empty() ) { @@ -54,7 +57,7 @@ memcache_socket::memcache_socket(int fd, while( true ) { char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -956,9 +959,12 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) { } bool repl_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + // recv and drop. while( true ) { - ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -967,7 +973,7 @@ bool repl_socket::on_readable() { if( errno == ECONNRESET ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -979,7 +985,7 @@ bool repl_socket::on_readable() { if( n == 0 ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -1003,6 +1009,9 @@ bool repl_socket::on_writable() { } bool repl_client_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + // This function is executed in the same thread as the function that sends // heartbeats. If this function takes a very long time, no heartbeats will // be sent, and this process will be judged dead by the master. To prevent @@ -1012,7 +1021,7 @@ bool repl_client_socket::on_readable() { size_t n_iter = 0; while( true ) { char* p = m_recvbuf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break;