diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 28b53176..df315d38 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, state_(NotStarted), backoff_(backoff), epoch_(0), - timer_(executor_->createDeadlineTimer()) {} + timer_(executor_->createDeadlineTimer()), + reconnectionPending_(false) {} HandlerBase::~HandlerBase() { timer_->cancel(); } @@ -69,6 +70,13 @@ void HandlerBase::grabCnx() { LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); return; } + + bool expectedState = false; + if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { + LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); + return; + } + LOG_INFO(getName() << "Getting connection from pool"); ClientImplPtr client = client_.lock(); Future future = client->getConnection(*topic_); @@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr con LOG_DEBUG("HandlerBase Weak reference is not valid anymore"); return; } + + handler->reconnectionPending_ = false; + if (result == ResultOk) { ClientConnectionPtr conn = connection.lock(); if (conn) { @@ -140,6 +151,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { const auto state = handler->state_.load(); + if (state == Pending || state == Ready) { TimeDuration delay = handler->backoff_.next(); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index e6a2fc6d..cf6e115e 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -128,6 +128,7 @@ class HandlerBase { DeadlineTimerPtr timer_; mutable std::mutex connectionMutex_; + std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; friend class ClientConnection; friend class PulsarFriend;