From 684e5d68e9a28e05ef9b87167e668070e23cca47 Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Tue, 13 Aug 2024 18:02:34 +0800 Subject: [PATCH] fix: kill connections correctly in Net WorkThread (#2862) * kill conns correct in WorkThread --- src/net/src/worker_thread.cc | 48 ++++++++++++++++++--- src/net/src/worker_thread.h | 5 +++ tests/integration/replication_test.go | 15 ++++++- tests/integration/rsync_dynamic_reconfig.go | 8 ++-- tests/integration/start_master_and_slave.sh | 21 ++++++--- 5 files changed, 77 insertions(+), 20 deletions(-) diff --git a/src/net/src/worker_thread.cc b/src/net/src/worker_thread.cc index 43671fdb7e..1304fc296c 100644 --- a/src/net/src/worker_thread.cc +++ b/src/net/src/worker_thread.cc @@ -270,17 +270,55 @@ void WorkerThread::DoCronTask() { ++iter; } } + /* + * How do we kill a conn correctly: + * stage 1: stop accepting new requests (also give up writing back the in-flight request's response) + * 1.1 remove the fd from epoll and erase it from conns_ to ensure no more requests will be submitted to the threadpool + * 1.2 add the to-close-conn to wait_to_close_conns_ + * stage 2: ensure there's no other shared_ptr of this conn in pika + * 2.1 in an async task executed by the ThreadPool, a shared_ptr of the conn is held and may cause a pipe event to tell the epoll + * to write back the response; we must ensure this notification is done before we really close the fd (Linux will reuse the fd to accept new conns) + * 2.2 we must clear all other shared_ptrs of this to-close-conn, like the map of blpop/brpop and the map of watchkeys + * 2.3 for those to-close-conns whose ref count dropped to 1, we add them to ready_to_close_conns_ + * stage 3: after an epoll cycle (let it handle the already-invalid writeback notifications), we can safely close the fds of ready_to_close_conns_ + */ + + for (auto& conn : ready_to_close_conns_) { + close(conn->fd()); + server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port()); + } + 
ready_to_close_conns_.clear(); + + for (auto conn = wait_to_close_conns_.begin(); conn != wait_to_close_conns_.end();) { + if (conn->use_count() == 1) { + ready_to_close_conns_.push_back(*conn); + conn = wait_to_close_conns_.erase(conn); + } else { + ++conn; + } + } + for (const auto& conn : to_close) { net_multiplexer_->NetDelEvent(conn->fd(), 0); - CloseFd(conn); + ClearConnsRefAndOtherInfo(conn); + wait_to_close_conns_.push_back(conn); } for (const auto& conn : to_timeout) { net_multiplexer_->NetDelEvent(conn->fd(), 0); - CloseFd(conn); + ClearConnsRefAndOtherInfo(conn); + wait_to_close_conns_.push_back(conn); server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port()); } } +void WorkerThread::ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn) { + if (auto dispatcher = dynamic_cast<DispatchThread*>(server_thread_); dispatcher != nullptr ) { + //check if this conn disconnected from being blocked by blpop/brpop + dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<RedisConn>(conn)); + dispatcher->RemoveWatchKeys(conn); + } +} + bool WorkerThread::TryKillConn(const std::string& ip_port) { bool find = false; if (ip_port != kKillAllConnsTask) { @@ -301,12 +339,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) { } void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) { + ClearConnsRefAndOtherInfo(conn); close(conn->fd()); - if (auto dispatcher = dynamic_cast<DispatchThread*>(server_thread_); dispatcher != nullptr ) { - //check if this conn disconnected from being blocked by blpop/brpop - dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<RedisConn>(conn)); - dispatcher->RemoveWatchKeys(conn); - } server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port()); } diff --git a/src/net/src/worker_thread.h b/src/net/src/worker_thread.h index 43af5a096c..47bab0091a 100644 --- a/src/net/src/worker_thread.h +++ b/src/net/src/worker_thread.h @@ -48,10 +48,15 @@ class WorkerThread : public Thread { NetMultiplexer* net_multiplexer() { return net_multiplexer_.get(); } bool 
TryKillConn(const std::string& ip_port); + void ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn); + ServerThread* GetServerThread() { return server_thread_; } mutable pstd::RWMutex rwlock_; /* For external statistics */ std::map<int, std::shared_ptr<NetConn>> conns_; + std::vector<std::shared_ptr<NetConn>> wait_to_close_conns_; + std::vector<std::shared_ptr<NetConn>> ready_to_close_conns_; + void* private_data_ = nullptr; diff --git a/tests/integration/replication_test.go b/tests/integration/replication_test.go index 6434ae6960..cf72731d46 100644 --- a/tests/integration/replication_test.go +++ b/tests/integration/replication_test.go @@ -634,14 +634,25 @@ var _ = Describe("should replication ", func() { for i := 1; i <= 5; i++ { go func() { - clientMaster.BLPop(ctx, 0, lists...) + client := redis.NewClient(PikaOption(MASTERADDR)) + defer client.Close() + + client.BLPop(ctx, 0, lists...) }() go func() { - clientMaster.BRPop(ctx, 0, lists...) + client := redis.NewClient(PikaOption(MASTERADDR)) + defer client.Close() + + client.BRPop(ctx, 0, lists...) }() } execute(&ctx, clientMaster, 5, issuePushPopFrequency) + + time.Sleep(3 * time.Second); + //reconnect to avoid timeout-kill + clientSlave := redis.NewClient(PikaOption(SLAVEADDR)) + // Fail("Stopping the test due to some condition"); for i := int64(0); i < clientMaster.LLen(ctx, "blist0").Val(); i++ { Expect(clientMaster.LIndex(ctx, "blist0", i)).To(Equal(clientSlave.LIndex(ctx, "blist0", i))) } diff --git a/tests/integration/rsync_dynamic_reconfig.go b/tests/integration/rsync_dynamic_reconfig.go index 0dedf1ad56..67d8590e42 100644 --- a/tests/integration/rsync_dynamic_reconfig.go +++ b/tests/integration/rsync_dynamic_reconfig.go @@ -35,10 +35,10 @@ func RefillMaster(masterAddr string, dataVolumeMB int64, ctx context.Context) { cli.Set(ctx, rKey, rValue, 0) } } - keySize := 1024 - valueSize := 1024 + keySize := 64 + valueSize := 64 dataVolumeBytes := dataVolumeMB << 20 - threadNum := 10 + threadNum := 5 reqNumForEachThead := dataVolumeBytes / int64((keySize + valueSize)) / int64(threadNum) 
//fmt.Printf("reqNumForEach:%d\n", reqNumForEachThead) startTime := time.Now() @@ -136,7 +136,7 @@ var _ = Describe("Rsync Reconfig Test", func() { slave1.FlushDB(ctx) master1.FlushDB(ctx) time.Sleep(3 * time.Second) - RefillMaster(MASTERADDR, 64, ctx) + RefillMaster(MASTERADDR, 2, ctx) key1 := "45vs45f4s5d6" value1 := "afd54g5s4f545" //set key before sync happened, slave is supposed to fetch it when sync done diff --git a/tests/integration/start_master_and_slave.sh b/tests/integration/start_master_and_slave.sh index 65ceaea6ef..f211cd101d 100755 --- a/tests/integration/start_master_and_slave.sh +++ b/tests/integration/start_master_and_slave.sh @@ -14,7 +14,8 @@ mkdir slave_data # Example Change the location for storing data on primary and secondary nodes in the configuration file sed -i.bak \ -e 's|databases : 1|databases : 2|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_single.conf sed -i.bak \ -e 's|databases : 1|databases : 2|' \ @@ -24,7 +25,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_master.conf sed -i.bak \ -e 's|databases : 1|databases : 2|' \ @@ -34,7 +36,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_slave.conf sed -i.bak \ -e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \ @@ -46,7 
+49,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_rename.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_rename.conf sed -i.bak \ -e 's|requirepass :|requirepass : requirepass|' \ @@ -59,7 +63,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_acl_both_password.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf sed -i.bak \ -e 's|requirepass :|requirepass : requirepass|' \ @@ -71,7 +76,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_acl_only_admin_password.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf sed -i.bak \ -e 's|requirepass :|requirepass : requirepass|' \ -e 's|masterauth :|masterauth : requirepass|' \ @@ -83,7 +89,8 @@ sed -i.bak \ -e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \ -e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \ -e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \ - -e 's|#daemonize : yes|daemonize : yes|' ./pika_has_other_acl_user.conf + -e 's|#daemonize : yes|daemonize : yes|' \ + -e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf # Start three nodes