fix: producer close was blocked #1249

Merged
merged 5 commits into from
Jul 24, 2024

Conversation

nodece
Member

@nodece nodece commented Jul 12, 2024

Closes #1190

Motivation

If the user calls producer.Close while the producer's event loop keeps reconnecting to the broker, the producer cannot be closed, because the event loop is busy executing the reconnect request.

Modifications

  • Add a context to control the reconnect and send logic, and stop execution when the context is done.
  • Upgrade Go from 1.20 to 1.21, because testcontainers requires Go 1.21.

Verifying this change

Added a test.

@nodece nodece force-pushed the fix-producer-close-blocked branch from a3cec27 to 34a0678 Compare July 12, 2024 18:23
@nodece nodece marked this pull request as draft July 13, 2024 17:44
@nodece nodece force-pushed the fix-producer-close-blocked branch from 89b9e84 to 8900751 Compare July 14, 2024 14:41
@nodece nodece marked this pull request as ready for review July 14, 2024 14:47
@gunli
Contributor

gunli commented Jul 19, 2024

Hi @nodece ,

I don't think calling p.internalClose(cp) is a good way to fix this bug, because it breaks the thread/goroutine model of the SDK (the CSP model: DO NOT COMMUNICATE BY SHARING MEMORY; INSTEAD, SHARE MEMORY BY COMMUNICATING). The purpose of the cmd channel is to isolate different goroutines, but now we just bypass it.

The root cause of this bug is that the event loop is stuck in an infinite loop in reconnectToBroker(). I don't think running an infinite loop inside a case of select{case...} is a good design. We can run the infinite loop in a separate goroutine, like this:

case connectionClosed := <-p.connectClosedCh:
	p.log.Info("runEventsLoop will reconnect in producer")
	// Start a new goroutine for reconnecting to broker
	go p.reconnectToBroker(connectionClosed)

When the producer is closed, the reconnect will quit at:

if p.getProducerState() != producerReady {
	// Producer is already closing
	p.log.Info("producer state not ready, exit reconnect")
	return
}

@nodece
Member Author

nodece commented Jul 19, 2024

Reconnect and Close are different things.

The reconnect request should block the send and flush requests. We could also use go p.reconnectToBroker(connectionClosed), but that doesn't make sense: the connection is closed, so the send and flush requests cannot be sent anyway.

The close request shouldn't be added to the event loop; otherwise, it will be blocked while the producer is reconnecting.

@gunli
Contributor

gunli commented Jul 22, 2024

OK. Since "The reconnect request should block the send and flush requests" and "The close request shouldn't be added to the event loop", we need to make sure that runEventsLoop has completely stopped after we call internalClose() (to be precise, after calling p.casProducerState(producerReady, producerClosing)). Otherwise, once the state is set to producerClosing, the reconnecting case of the event loop will return and the other cases will start to run. These cases may race with internalClose(), which can lead to a panic or other bugs. It may not happen now, but it is hard to say that it won't happen in the future when we extend the code of the event loop cases or the logic of internalClose().
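
For readers unfamiliar with the compare-and-swap transition mentioned above, here is a minimal sketch of the idea. Only the names producerReady and producerClosing come from this thread; the `producer` struct, the `casState` helper, and `beginClose` are hypothetical and are not the client's actual implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type producerState int32

const (
	producerReady producerState = iota
	producerClosing
	producerClosed
)

// producer is a hypothetical stand-in; its zero value is producerReady.
type producer struct {
	state atomic.Int32
}

// casState moves the state from `from` to `to` only if it is still `from`.
func (p *producer) casState(from, to producerState) bool {
	return p.state.CompareAndSwap(int32(from), int32(to))
}

// beginClose reports whether this caller won the ready -> closing transition.
// Only the winner should go on to release resources.
func (p *producer) beginClose() bool {
	return p.casState(producerReady, producerClosing)
}

func main() {
	p := &producer{}
	fmt.Println(p.beginClose()) // true: first caller wins
	fmt.Println(p.beginClose()) // false: already closing
}
```

The swap makes the transition itself atomic, but it does not stop other goroutines from continuing to run afterwards, which is the concern raised in this comment.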

@nodece
Member Author

nodece commented Jul 22, 2024

@gunli Good catch. We should improve the event loop: once the producer is closed, the event loop should be stopped.

@RobertIndie
Member

we need to make sure that the runEventsLoop completely stopped after we call internalClose()

@gunli +1

@nodece
Member Author

nodece commented Jul 22, 2024

@gunli @RobertIndie The runEventsLoop can be stopped, when the producer is closed.

Event:

@gunli
Contributor

gunli commented Jul 23, 2024

@gunli @RobertIndie The runEventsLoop can be stopped, when the producer is closed.

Event:

But the behavior is unpredictable, because they run in different goroutines. After doClose() updates the state to closing, reconnecting() will return immediately, but at that point dataChan and cmdChan are not closed yet, and neither is the batchFlushTicker stopped. The event loop will continue to run, so it can still run into internalSend() and internalFlushCurrentBatch().
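
One way to rule out the interleaving described above is to have Close wait until the event loop goroutine has actually returned. The following is a minimal sketch under that assumption; the struct fields, `newProducer`, `Send`, and the `done` channel are hypothetical and do not reflect how the PR finally implemented it.

```go
package main

import (
	"context"
	"fmt"
)

// producer is a hypothetical stand-in for the real partition producer.
type producer struct {
	ctx    context.Context
	cancel context.CancelFunc
	dataCh chan string
	done   chan struct{} // closed once runEventsLoop has returned
	sent   []string      // touched only by the event loop goroutine
}

func newProducer() *producer {
	ctx, cancel := context.WithCancel(context.Background())
	p := &producer{ctx: ctx, cancel: cancel, dataCh: make(chan string), done: make(chan struct{})}
	go p.runEventsLoop()
	return p
}

func (p *producer) runEventsLoop() {
	defer close(p.done)
	for {
		select {
		case <-p.ctx.Done():
			return // no further send handling after cancellation is observed
		case msg := <-p.dataCh:
			p.sent = append(p.sent, msg)
		}
	}
}

// Send hands msg to the event loop, or fails once the producer is closing.
func (p *producer) Send(msg string) error {
	select {
	case <-p.ctx.Done():
		return p.ctx.Err()
	case p.dataCh <- msg:
		return nil
	}
}

// Close cancels the context and blocks until the event loop has exited,
// so any cleanup after this point cannot race with the loop's handlers.
func (p *producer) Close() {
	p.cancel()
	<-p.done
}

func main() {
	p := newProducer()
	_ = p.Send("hello")
	p.Close()
	fmt.Println(len(p.sent), p.Send("late") != nil) // prints "1 true"
}
```

Waiting on `done` is what gives Close a happens-before edge over the loop's last handler; checking the state alone, as noted above, does not.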

@nodece nodece force-pushed the fix-producer-close-blocked branch from a6f8656 to 1619213 Compare July 23, 2024 13:08
@nodece
Member Author

nodece commented Jul 23, 2024

Updated.

Member

@crossoverJie crossoverJie left a comment

LGTM.

@nodece nodece requested a review from gunli July 24, 2024 09:03
Contributor

@gunli gunli left a comment

LGTM

@gunli
Contributor

gunli commented Jul 24, 2024

This PR may be related to issue #1190

@nodece nodece merged commit 28293a6 into apache:master Jul 24, 2024
7 checks passed
RobertIndie pushed a commit that referenced this pull request Jul 31, 2024
* fix: producer close was blocked

* Fix lint

* Use ctx to control logic

* Reduce timeout

* Improve style

(cherry picked from commit 28293a6)
Development

Successfully merging this pull request may close these issues.

[BUG] The process will block if close client or producer when pulsar server is down.
4 participants