diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 734631a..b1aa7fd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,26 +8,26 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-20.04, ubuntu-22.04] - compiler: [[gcc, g++], [clang, clang++]] + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + compiler: [{cc: gcc, cxx: g++}, {cc: clang, cxx: clang++}] timeout-minutes: 30 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install dependencies run: | sudo apt-get update sudo apt-get install -y libgoogle-perftools-dev libunwind-dev - name: Build + env: + CC: ${{ matrix.compiler.cc }} + CXX: ${{ matrix.compiler.cxx }} run: | $CC --version $CXX --version make -j $(nproc) CC=$CC CXX=$CXX OPTFLAGS="-O2 -fPIE" ./yrmcdsd & sleep 1 - env: - CC: ${{ matrix.compiler[0] }} - CXX: ${{ matrix.compiler[1] }} - name: Run tests run: | make YRMCDS_SERVER=localhost tests diff --git a/cybozu/reactor.cpp b/cybozu/reactor.cpp index 5c7a07b..c22d769 100644 --- a/cybozu/reactor.cpp +++ b/cybozu/reactor.cpp @@ -37,7 +37,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { if( res->m_reactor != nullptr ) throw std::logic_error(" already added!"); res->m_reactor = this; - const int fd = res->fileno(); + const int fd = res->m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -47,7 +47,7 @@ void reactor::add_resource(std::unique_ptr res, int events) { } void reactor::modify_events(const resource& res, int events) { - const int fd = res.fileno(); + const int fd = res.m_fd; struct epoll_event ev; ev.events = events | EPOLLET; ev.data.fd = fd; @@ -78,12 +78,15 @@ void reactor::remove_resource(int fd) { dump_stack(); throw std::logic_error("bug in remove_resource"); } + auto res = it->second.get(); m_garbage.emplace_back( std::move(it->second) ); m_resources.erase(it); if( epoll_ctl(m_fd, EPOLL_CTL_DEL, fd, NULL) == -1 ) throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)"); m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd), m_readables.end()); + + res->close(); } void reactor::poll() { diff --git a/cybozu/reactor.hpp b/cybozu/reactor.hpp index f5376bb..b1b5736 100644 --- a/cybozu/reactor.hpp +++ b/cybozu/reactor.hpp @@ -37,11 +37,16 @@ class resource { // Close the file descriptor. virtual ~resource() { - ::close(m_fd); + close(); } // Return the UNIX file descriptor for this resource. - int fileno() const { return m_fd; } + // This returns -1 after the resource is invalidated. + int fileno() const { + lock_guard g(m_lock); + if( ! m_valid ) return -1; + return m_fd; + } // `true` if this resource is still valid. bool valid() const { @@ -118,13 +123,14 @@ class resource { } protected: - const int m_fd; reactor* m_reactor = nullptr; friend class reactor; private: + const int m_fd; bool m_valid = true; + bool m_closed = false; mutable spinlock m_lock; typedef std::unique_lock lock_guard; @@ -133,6 +139,13 @@ class resource { g.unlock(); on_invalidate(); } + + void close() { + lock_guard g(m_lock); + if( m_closed ) return; + ::close(m_fd); + m_closed = true; + } }; @@ -191,7 +204,7 @@ class reactor { // Only may call this when it stops reading // from a resource before it encounters `EAGAIN` or `EWOULDBLOCK`. void add_readable(const resource& res) { - m_readables.push_back(res.fileno()); + m_readables.push_back(res.m_fd); } // Add a removal request for a resource. @@ -202,7 +215,7 @@ class reactor { // calling this. void request_removal(const resource& res) { lock_guard g(m_lock); - m_drop_req.push_back(res.fileno()); + m_drop_req.push_back(res.m_fd); } bool has_garbage() const noexcept { @@ -238,7 +251,7 @@ class reactor { // Remove a registered resource. // This is only for the reactor thread. void remove_resource(const resource& res) { - remove_resource(res.fileno()); + remove_resource(res.m_fd); } private: diff --git a/cybozu/signal.hpp b/cybozu/signal.hpp index eacf964..0ee8efe 100644 --- a/cybozu/signal.hpp +++ b/cybozu/signal.hpp @@ -31,7 +31,7 @@ class signal_reader: public resource { signal_reader(const sigset_t *mask, callback_t callback): resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ), m_callback(callback) { - if( m_fd == -1 ) + if( fileno() == -1 ) throw_unix_error(errno, "signalfd"); } // Constructor. @@ -54,8 +54,11 @@ class signal_reader: public resource { virtual bool on_readable() override final { while( true ) { + int fd = fileno(); + if( fd == -1 ) return true; + struct signalfd_siginfo si; - ssize_t n = read(m_fd, &si, sizeof(si)); + ssize_t n = read(fd, &si, sizeof(si)); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno ==EWOULDBLOCK ) return true; diff --git a/cybozu/tcp.cpp b/cybozu/tcp.cpp index e32d72e..b9d4a77 100644 --- a/cybozu/tcp.cpp +++ b/cybozu/tcp.cpp @@ -160,6 +160,9 @@ void tcp_socket::free_buffers() { } bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { + int fd = fileno(); + if( fd == -1 ) return false; + while( ! can_send(len) ) { on_buffer_full(); m_cond_write.wait(g); @@ -168,7 +171,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { if( m_pending.empty() ) { while( len > 0 ) { - ssize_t n = ::send(m_fd, p, len, 0); + ssize_t n = ::send(fd, p, len, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -226,6 +229,9 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) { } bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { + int fd = fileno(); + if( fd == -1 ) return false; + std::size_t total = 0; for( int i = 0; i < iovcnt; ++i ) { total += iov[i].len; @@ -250,7 +256,7 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { if( m_pending.empty() ) { while( ind < v_size ) { - ssize_t n = ::writev(m_fd, &(v[ind]), v_size - ind); + ssize_t n = ::writev(fd, &(v[ind]), v_size - ind); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; if( errno == EINTR ) continue; @@ -329,8 +335,11 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) { bool tcp_socket::write_pending_data() { lock_guard g(m_lock); + int fd = fileno(); + if( fd == -1 ) return true; + while( ! m_tmpbuf.empty() ) { - ssize_t n = ::send(m_fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); + ssize_t n = ::send(fd, m_tmpbuf.data(), m_tmpbuf.size(), 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) return true; @@ -353,7 +362,7 @@ bool tcp_socket::write_pending_data() { std::tie(p, len, sent) = t; while( len != sent ) { - ssize_t n = ::send(m_fd, p+sent, len-sent, 0); + ssize_t n = ::send(fd, p+sent, len-sent, 0); if( n == -1 ) { if( errno == EINTR ) continue; if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -453,6 +462,9 @@ setup_server_socket(const char* bind_addr, std::uint16_t port, bool freebind) { } bool tcp_server_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + while( true ) { union { struct sockaddr sa; @@ -460,10 +472,10 @@ bool tcp_server_socket::on_readable() { } addr; socklen_t addrlen = sizeof(addr); #ifdef _GNU_SOURCE - int s = ::accept4(m_fd, &(addr.sa), &addrlen, + int s = ::accept4(fd, &(addr.sa), &addrlen, SOCK_NONBLOCK|SOCK_CLOEXEC); #else - int s = ::accept(m_fd, &(addr.sa), &addrlen); + int s = ::accept(fd, &(addr.sa), &addrlen); if( s != -1 ) { int fl = fcntl(s, F_GETFL, 0); if( fl == -1 ) fl = 0; diff --git a/cybozu/tcp.hpp b/cybozu/tcp.hpp index d54ddca..5826378 100644 --- a/cybozu/tcp.hpp +++ b/cybozu/tcp.hpp @@ -185,7 +185,7 @@ class tcp_socket: public resource { } virtual void on_invalidate() override { - ::shutdown(m_fd, SHUT_RDWR); + ::shutdown(fileno(), SHUT_RDWR); free_buffers(); m_cond_write.notify_all(); } @@ -223,10 +223,13 @@ class tcp_socket: public resource { return m_pending.empty() && m_tmpbuf.empty(); } void _flush() { + int fd = fileno(); + if( fd == -1 ) return; + // with TCP_CORK, setting TCP_NODELAY effectively flushes // the kernel send buffer. int v = 1; - if( setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) + if( setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 ) throw_unix_error(errno, "setsockopt(TCP_NODELAY)"); } void free_buffers(); diff --git a/src/counter/sockets.cpp b/src/counter/sockets.cpp index e5cb8a1..e2583a9 100644 --- a/src/counter/sockets.cpp +++ b/src/counter/sockets.cpp @@ -24,6 +24,9 @@ counter_socket::counter_socket(int fd, g_stats.total_connections.fetch_add(1); m_recvjob = [this](cybozu::dynbuf& buf) { + int fd = fileno(); + if( fd == -1 ) return; + // load pending data if( ! m_pending.empty() ) { buf.append(m_pending.data(), m_pending.size()); @@ -32,7 +35,7 @@ counter_socket::counter_socket(int fd, while( true ) { char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; diff --git a/src/memcache/sockets.cpp b/src/memcache/sockets.cpp index 142ea0b..38f7e4d 100644 --- a/src/memcache/sockets.cpp +++ b/src/memcache/sockets.cpp @@ -43,8 +43,11 @@ memcache_socket::memcache_socket(int fd, g_stats.total_connections.fetch_add(1, relaxed); m_recvjob = [this](cybozu::dynbuf& buf) { + int fd = fileno(); + if( fd == -1 ) return; + // set lock context for objects. - g_context = m_fd; + g_context = fd; // load pending data if( ! m_pending.empty() ) { @@ -54,7 +57,7 @@ memcache_socket::memcache_socket(int fd, while( true ) { char* p = buf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -956,9 +959,12 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) { } bool repl_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + // recv and drop. while( true ) { - ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break; @@ -967,7 +973,7 @@ bool repl_socket::on_readable() { if( errno == ECONNRESET ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -979,7 +985,7 @@ bool repl_socket::on_readable() { if( n == 0 ) { std::string addr = "unknown address"; try { - addr = cybozu::get_peer_ip_address(m_fd).str(); + addr = cybozu::get_peer_ip_address(fd).str(); } catch (...) { // ignore errors } @@ -1003,6 +1009,9 @@ bool repl_socket::on_writable() { } bool repl_client_socket::on_readable() { + int fd = fileno(); + if( fd == -1 ) return true; + // This function is executed in the same thread as the function that sends // heartbeats. If this function takes a very long time, no heartbeats will // be sent, and this process will be judged dead by the master. To prevent @@ -1012,7 +1021,7 @@ bool repl_client_socket::on_readable() { size_t n_iter = 0; while( true ) { char* p = m_recvbuf.prepare(MAX_RECVSIZE); - ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0); + ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0); if( n == -1 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) break;