From 2e2f90b182aa783152c049df9fbc9ff472ff3afc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 Sep 2023 01:05:40 -0700 Subject: [PATCH] Avoid double attempt at reconnecting (#310) ### Motivation There is a sequence of conditions that can trigger a client to schedule multiple reconnections to the broker. This is due to fact that we're not checking whether such an attempt is already in progress. example: 1. Receive `CloseConsumer` command from broker 1a. Schedule for reconnection in 100ms 2. Connection is closed (eg: broker shutdown has initiated) 2a. Schedule for reconnection in 200ms (since the backoff was already incremented) Result is that we're going to call `grabCnx()` twice ### Modifications Use atomic flag to ignore the 2nd attempt, just waiting for the 1st attempt to finish. --- lib/HandlerBase.cc | 14 +++++++++++++- lib/HandlerBase.h | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 28b53176..df315d38 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, state_(NotStarted), backoff_(backoff), epoch_(0), - timer_(executor_->createDeadlineTimer()) {} + timer_(executor_->createDeadlineTimer()), + reconnectionPending_(false) {} HandlerBase::~HandlerBase() { timer_->cancel(); } @@ -69,6 +70,13 @@ void HandlerBase::grabCnx() { LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); return; } + + bool expectedState = false; + if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { + LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); + return; + } + LOG_INFO(getName() << "Getting connection from pool"); ClientImplPtr client = client_.lock(); Future future = client->getConnection(*topic_); @@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr con LOG_DEBUG("HandlerBase Weak reference is not valid anymore"); return; } + + handler->reconnectionPending_ = false; + if (result == ResultOk) { ClientConnectionPtr conn = connection.lock(); if (conn) { @@ -140,6 +151,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { const auto state = handler->state_.load(); + if (state == Pending || state == Ready) { TimeDuration delay = handler->backoff_.next(); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index e6a2fc6d..cf6e115e 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -128,6 +128,7 @@ class HandlerBase { DeadlineTimerPtr timer_; mutable std::mutex connectionMutex_; + std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; friend class ClientConnection; friend class PulsarFriend;