Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early close file descriptors #94

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-20.04, ubuntu-22.04]
compiler: [[gcc, g++], [clang, clang++]]
os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04]
compiler: [{cc: gcc, cxx: g++}, {cc: clang, cxx: clang++}]
timeout-minutes: 30

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y libgoogle-perftools-dev libunwind-dev
- name: Build
env:
CC: ${{ matrix.compiler.cc }}
CXX: ${{ matrix.compiler.cxx }}
run: |
$CC --version
$CXX --version
make -j $(nproc) CC=$CC CXX=$CXX OPTFLAGS="-O2 -fPIE"
./yrmcdsd &
sleep 1
env:
CC: ${{ matrix.compiler[0] }}
CXX: ${{ matrix.compiler[1] }}
- name: Run tests
run: |
make YRMCDS_SERVER=localhost tests
7 changes: 5 additions & 2 deletions cybozu/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void reactor::add_resource(std::unique_ptr<resource> res, int events) {
if( res->m_reactor != nullptr )
throw std::logic_error("<reactor::add_resource> already added!");
res->m_reactor = this;
const int fd = res->fileno();
const int fd = res->m_fd;
struct epoll_event ev;
ev.events = events | EPOLLET;
ev.data.fd = fd;
Expand All @@ -47,7 +47,7 @@ void reactor::add_resource(std::unique_ptr<resource> res, int events) {
}

void reactor::modify_events(const resource& res, int events) {
const int fd = res.fileno();
const int fd = res.m_fd;
struct epoll_event ev;
ev.events = events | EPOLLET;
ev.data.fd = fd;
Expand Down Expand Up @@ -78,12 +78,15 @@ void reactor::remove_resource(int fd) {
dump_stack();
throw std::logic_error("bug in remove_resource");
}
auto res = it->second.get();
m_garbage.emplace_back( std::move(it->second) );
m_resources.erase(it);
if( epoll_ctl(m_fd, EPOLL_CTL_DEL, fd, NULL) == -1 )
throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)");
m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd),
m_readables.end());

res->close();
}

void reactor::poll() {
Expand Down
25 changes: 19 additions & 6 deletions cybozu/reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ class resource {

// Close the file descriptor.
virtual ~resource() {
::close(m_fd);
close();
}

// Return the UNIX file descriptor for this resource.
int fileno() const { return m_fd; }
// This returns -1 after the resource is invalidated.
int fileno() const {
lock_guard g(m_lock);
if( ! m_valid ) return -1;
return m_fd;
}

// `true` if this resource is still valid.
bool valid() const {
Expand Down Expand Up @@ -118,13 +123,14 @@ class resource {
}

protected:
const int m_fd;
reactor* m_reactor = nullptr;

friend class reactor;

private:
const int m_fd;
bool m_valid = true;
bool m_closed = false;
mutable spinlock m_lock;
typedef std::unique_lock<spinlock> lock_guard;

Expand All @@ -133,6 +139,13 @@ class resource {
g.unlock();
on_invalidate();
}

void close() {
lock_guard g(m_lock);
if( m_closed ) return;
::close(m_fd);
m_closed = true;
}
};


Expand Down Expand Up @@ -191,7 +204,7 @@ class reactor {
// Only <resource::on_readable> may call this when it stops reading
// from a resource before it encounters `EAGAIN` or `EWOULDBLOCK`.
void add_readable(const resource& res) {
m_readables.push_back(res.fileno());
m_readables.push_back(res.m_fd);
}

// Add a removal request for a resource.
Expand All @@ -202,7 +215,7 @@ class reactor {
// calling this.
void request_removal(const resource& res) {
lock_guard g(m_lock);
m_drop_req.push_back(res.fileno());
m_drop_req.push_back(res.m_fd);
}

bool has_garbage() const noexcept {
Expand Down Expand Up @@ -238,7 +251,7 @@ class reactor {
// Remove a registered resource.
// This is only for the reactor thread.
void remove_resource(const resource& res) {
remove_resource(res.fileno());
remove_resource(res.m_fd);
}

private:
Expand Down
7 changes: 5 additions & 2 deletions cybozu/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class signal_reader: public resource {
signal_reader(const sigset_t *mask, callback_t callback):
resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ),
m_callback(callback) {
if( m_fd == -1 )
if( fileno() == -1 )
throw_unix_error(errno, "signalfd");
}
// Constructor.
Expand All @@ -54,8 +54,11 @@ class signal_reader: public resource {

virtual bool on_readable() override final {
while( true ) {
int fd = fileno();
if( fd == -1 ) return true;

struct signalfd_siginfo si;
ssize_t n = read(m_fd, &si, sizeof(si));
ssize_t n = read(fd, &si, sizeof(si));
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno ==EWOULDBLOCK ) return true;
Expand Down
24 changes: 18 additions & 6 deletions cybozu/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ void tcp_socket::free_buffers() {
}

bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {
int fd = fileno();
if( fd == -1 ) return false;

while( ! can_send(len) ) {
on_buffer_full();
m_cond_write.wait(g);
Expand All @@ -168,7 +171,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {

if( m_pending.empty() ) {
while( len > 0 ) {
ssize_t n = ::send(m_fd, p, len, 0);
ssize_t n = ::send(fd, p, len, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
if( errno == EINTR ) continue;
Expand Down Expand Up @@ -226,6 +229,9 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {
}

bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {
int fd = fileno();
if( fd == -1 ) return false;

std::size_t total = 0;
for( int i = 0; i < iovcnt; ++i ) {
total += iov[i].len;
Expand All @@ -250,7 +256,7 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {

if( m_pending.empty() ) {
while( ind < v_size ) {
ssize_t n = ::writev(m_fd, &(v[ind]), v_size - ind);
ssize_t n = ::writev(fd, &(v[ind]), v_size - ind);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
if( errno == EINTR ) continue;
Expand Down Expand Up @@ -329,8 +335,11 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {
bool tcp_socket::write_pending_data() {
lock_guard g(m_lock);

int fd = fileno();
if( fd == -1 ) return true;

while( ! m_tmpbuf.empty() ) {
ssize_t n = ::send(m_fd, m_tmpbuf.data(), m_tmpbuf.size(), 0);
ssize_t n = ::send(fd, m_tmpbuf.data(), m_tmpbuf.size(), 0);
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno == EWOULDBLOCK ) return true;
Expand All @@ -353,7 +362,7 @@ bool tcp_socket::write_pending_data() {
std::tie(p, len, sent) = t;

while( len != sent ) {
ssize_t n = ::send(m_fd, p+sent, len-sent, 0);
ssize_t n = ::send(fd, p+sent, len-sent, 0);
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
Expand Down Expand Up @@ -453,17 +462,20 @@ setup_server_socket(const char* bind_addr, std::uint16_t port, bool freebind) {
}

bool tcp_server_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

while( true ) {
union {
struct sockaddr sa;
struct sockaddr_storage ss;
} addr;
socklen_t addrlen = sizeof(addr);
#ifdef _GNU_SOURCE
int s = ::accept4(m_fd, &(addr.sa), &addrlen,
int s = ::accept4(fd, &(addr.sa), &addrlen,
SOCK_NONBLOCK|SOCK_CLOEXEC);
#else
int s = ::accept(m_fd, &(addr.sa), &addrlen);
int s = ::accept(fd, &(addr.sa), &addrlen);
if( s != -1 ) {
int fl = fcntl(s, F_GETFL, 0);
if( fl == -1 ) fl = 0;
Expand Down
7 changes: 5 additions & 2 deletions cybozu/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class tcp_socket: public resource {
}

virtual void on_invalidate() override {
::shutdown(m_fd, SHUT_RDWR);
::shutdown(fileno(), SHUT_RDWR);
free_buffers();
m_cond_write.notify_all();
}
Expand Down Expand Up @@ -223,10 +223,13 @@ class tcp_socket: public resource {
return m_pending.empty() && m_tmpbuf.empty();
}
void _flush() {
int fd = fileno();
if( fd == -1 ) return;

// with TCP_CORK, setting TCP_NODELAY effectively flushes
// the kernel send buffer.
int v = 1;
if( setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 )
if( setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 )
throw_unix_error(errno, "setsockopt(TCP_NODELAY)");
}
void free_buffers();
Expand Down
5 changes: 4 additions & 1 deletion src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ counter_socket::counter_socket(int fd,
g_stats.total_connections.fetch_add(1);

m_recvjob = [this](cybozu::dynbuf& buf) {
int fd = fileno();
if( fd == -1 ) return;

// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
Expand All @@ -32,7 +35,7 @@ counter_socket::counter_socket(int fd,

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down
21 changes: 15 additions & 6 deletions src/memcache/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ memcache_socket::memcache_socket(int fd,
g_stats.total_connections.fetch_add(1, relaxed);

m_recvjob = [this](cybozu::dynbuf& buf) {
int fd = fileno();
if( fd == -1 ) return;

// set lock context for objects.
g_context = m_fd;
g_context = fd;

// load pending data
if( ! m_pending.empty() ) {
Expand All @@ -54,7 +57,7 @@ memcache_socket::memcache_socket(int fd,

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down Expand Up @@ -956,9 +959,12 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) {
}

bool repl_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

// recv and drop.
while( true ) {
ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand All @@ -967,7 +973,7 @@ bool repl_socket::on_readable() {
if( errno == ECONNRESET ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -979,7 +985,7 @@ bool repl_socket::on_readable() {
if( n == 0 ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -1003,6 +1009,9 @@ bool repl_socket::on_writable() {
}

bool repl_client_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

// This function is executed in the same thread as the function that sends
// heartbeats. If this function takes a very long time, no heartbeats will
// be sent, and this process will be judged dead by the master. To prevent
Expand All @@ -1012,7 +1021,7 @@ bool repl_client_socket::on_readable() {
size_t n_iter = 0;
while( true ) {
char* p = m_recvbuf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down