Skip to content

Commit

Permalink
Avoid double attempt at reconnecting (#310)
Browse files Browse the repository at this point in the history
### Motivation

There is a sequence of conditions that can trigger a client to schedule multiple reconnections to the broker. This is due to fact that we're not checking whether such an attempt is already in progress.

example: 
 1. Receive `CloseConsumer` command from broker
    1a.  Schedule for reconnection in 100ms
 2. Connection is closed (eg: broker shutdown has initiated)
   2a. Schedule for reconnection in 200ms (since the backoff was already incremented)

Result is that we're going to call `grabCnx()` twice

### Modifications

Use atomic flag to ignore the 2nd attempt, just waiting for the 1st attempt to finish.
  • Loading branch information
merlimat authored Sep 7, 2023
1 parent 6ebea5c commit 2e2f90b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
14 changes: 13 additions & 1 deletion lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}
timer_(executor_->createDeadlineTimer()),
reconnectionPending_(false) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }

Expand Down Expand Up @@ -69,6 +70,13 @@ void HandlerBase::grabCnx() {
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
return;
}

bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
return;
}

LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
Future<Result, ClientConnectionWeakPtr> future = client->getConnection(*topic_);
Expand All @@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr con
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
return;
}

handler->reconnectionPending_ = false;

if (result == ResultOk) {
ClientConnectionPtr conn = connection.lock();
if (conn) {
Expand Down Expand Up @@ -140,6 +151,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con

void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
const auto state = handler->state_.load();

if (state == Pending || state == Ready) {
TimeDuration delay = handler->backoff_.next();

Expand Down
1 change: 1 addition & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class HandlerBase {
DeadlineTimerPtr timer_;

mutable std::mutex connectionMutex_;
std::atomic<bool> reconnectionPending_;
ClientConnectionWeakPtr connection_;
friend class ClientConnection;
friend class PulsarFriend;
Expand Down

0 comments on commit 2e2f90b

Please sign in to comment.