
[BUG] The capacity of dataChan is too big #1067

Open · Gleiphir2769 opened this issue Jul 21, 2023 · 2 comments

@Gleiphir2769 (Contributor) commented Jul 21, 2023

Hi community. I noticed that the capacity of dataChan is too big. We create dataChan with capacity MaxPendingMessages, but the actual number of pending messages in the queue can exceed that limit by up to another MaxPendingMessages.

This is because we use publishSemaphore, rather than the capacity of dataChan, to limit pending messages. The actual limit is 2x MaxPendingMessages.

dataChan: make(chan *sendRequest, maxPendingMessages),
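
Here is a minimal, runnable sketch of how the two limits add up (this is not the client code itself; n, dataChan, semaphore and inMemory are names made up for illustration): a buffered channel with capacity n, plus a semaphore with n permits that is only acquired after dequeueing, lets up to 2n requests sit in memory at once.

package main

import "fmt"

func main() {
	// stand-in for MaxPendingMessages; the real value comes from the producer options
	const n = 4

	dataChan := make(chan int, n)       // same capacity as MaxPendingMessages
	semaphore := make(chan struct{}, n) // models publishSemaphore: n permits

	inMemory := 0

	// callers fill dataChan up to its capacity
	for i := 0; i < n; i++ {
		dataChan <- i
	}

	// the event loop drains dataChan and acquires a permit per request;
	// the permits (and the payloads) are held until the broker acks
	for i := 0; i < n; i++ {
		<-dataChan
		semaphore <- struct{}{}
		inMemory++
	}

	// with dataChan empty again, callers can enqueue n more requests
	// before anything blocks, even though no acks have arrived
	for i := 0; i < n; i++ {
		dataChan <- i
		inMemory++
	}

	fmt.Println("requests held in memory:", inMemory) // prints 2*n
}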

@tisonkun (Member)

> This is because we use publishSemaphore, rather than the capacity of dataChan, to limit pending messages. The actual limit is 2x MaxPendingMessages.

What do you mean? Both publishSemaphore and dataChan have n = MaxPendingMessages permits to use.

dataChan:         make(chan *sendRequest, maxPendingMessages),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),

@Gleiphir2769 (Contributor, Author)

Because we acquire publishSemaphore only after a request has been consumed from dataChan, in some cases we may need to keep twice MaxPendingMessages messages in memory: in the worst case, MaxPendingMessages requests already hold semaphore permits while another MaxPendingMessages requests sit buffered in dataChan. And that breaks the producer memory limit.

	p.updateMetaData(sr)
	p.dataChan <- sr // the request is enqueued here; no semaphore permit has been acquired yet
}

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data := <-p.dataChan: // dequeuing frees a dataChan slot for new callers
			p.internalSend(data)
		case i := <-p.cmdChan:

	// canAddToQueue acquires publishSemaphore, but only after the request
	// has already left dataChan in runEventsLoop above
	if !p.canAddToQueue(sr) {
		return
	}
	// try to reserve memory for uncompressedPayload
	if !p.canReserveMem(sr, sr.uncompressedSize) {
		return
	}

But the concept here is a bit vague. MaxPendingMessages actually limits the messages that have been sent to the broker but have not yet received an ack. It really has nothing to do with the producer memory limit, but because of the execution order it affects the accuracy of the memory limit. So I think it's better to reduce the size of dataChan.
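
If we go that way, the change could look roughly like the lines below. The buffer size 10 is only a placeholder to show the shape of the change, not a value proposed in this issue: publishSemaphore stays the real MaxPendingMessages gate, and dataChan becomes a small hand-off buffer, so the worst case stays close to MaxPendingMessages instead of 2x.

dataChan:         make(chan *sendRequest, 10), // small hand-off buffer; 10 is a placeholder value
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),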

What do you think?
