Skip to content

Commit 4d414ca

Browse files
committed
fix tps
1 parent c3565d9 commit 4d414ca

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

consumer/option.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ type consumerOptions struct {
4545
// Concurrently max span offset.it has no effect on sequential consumption
4646
ConsumeConcurrentlyMaxSpan int
4747

48-
// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
48+
// Flow control threshold on queue level, each message queue will cache at most 1024 messages by default,
4949
// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
50-
PullThresholdForQueue int64
50+
PullThresholdForQueue atomic.Int64
5151

52-
// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
52+
// Limit the cached message size on queue level, each message queue will cache at most 512 MiB messages by default,
5353
// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
5454
//
5555
// The size of a message only measured by message body, so it's not accurate
56-
PullThresholdSizeForQueue int
56+
PullThresholdSizeForQueue atomic.Int32
5757

5858
// Flow control threshold on topic level, default value is -1(Unlimited)
5959
//
@@ -198,13 +198,13 @@ func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option {
198198

199199
func WithPullThresholdForQueue(pullThresholdForQueue int64) Option {
200200
return func(options *consumerOptions) {
201-
options.PullThresholdForQueue = pullThresholdForQueue
201+
options.PullThresholdForQueue.Store(pullThresholdForQueue)
202202
}
203203
}
204204

205205
func WithPullThresholdSizeForQueue(pullThresholdSizeForQueue int) Option {
206206
return func(options *consumerOptions) {
207-
options.PullThresholdSizeForQueue = pullThresholdSizeForQueue
207+
options.PullThresholdSizeForQueue.Store(int32(pullThresholdSizeForQueue))
208208
}
209209
}
210210

consumer/push_consumer.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -518,23 +518,23 @@ func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*pr
518518
if newVal == 0 {
519519
newVal = 1
520520
}
521-
rlog.Info("The PullThresholdForTopic is changed", map[string]interface{}{
521+
rlog.Info("The PullThresholdForQueue is changed", map[string]interface{}{
522522
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdForTopic,
523523
rlog.LogKeyValueChangedTo: newVal,
524524
})
525-
pc.option.PullThresholdForTopic = newVal
525+
pc.option.PullThresholdForQueue.Store(int64(newVal))
526526
}
527527

528528
if pc.option.PullThresholdSizeForTopic != -1 {
529529
newVal := pc.option.PullThresholdSizeForTopic / count
530530
if newVal == 0 {
531531
newVal = 1
532532
}
533-
rlog.Info("The PullThresholdSizeForTopic is changed", map[string]interface{}{
534-
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForTopic,
533+
rlog.Info("The PullThresholdSizeForQueue is changed", map[string]interface{}{
534+
rlog.LogKeyValueChangedFrom: pc.option.PullThresholdSizeForQueue.Load(),
535535
rlog.LogKeyValueChangedTo: newVal,
536536
})
537-
pc.option.PullThresholdSizeForTopic = newVal
537+
pc.option.PullThresholdSizeForQueue.Store(int32(newVal))
538538
}
539539
}
540540
pc.client.SendHeartbeatToAllBrokerWithLock()
@@ -564,9 +564,9 @@ func (pc *pushConsumer) validate() error {
564564
}
565565
}
566566

567-
if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
568-
if pc.option.PullThresholdForQueue == 0 {
569-
pc.option.PullThresholdForQueue = 1024
567+
if pc.option.PullThresholdForQueue.Load() < 1 || pc.option.PullThresholdForQueue.Load() > 65535 {
568+
if pc.option.PullThresholdForQueue.Load() == 0 {
569+
pc.option.PullThresholdForQueue.Store(1024)
570570
} else {
571571
return errors.New("option.PullThresholdForQueue out of range [1, 65535]")
572572
}
@@ -580,9 +580,9 @@ func (pc *pushConsumer) validate() error {
580580
}
581581
}
582582

583-
if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
584-
if pc.option.PullThresholdSizeForQueue == 0 {
585-
pc.option.PullThresholdSizeForQueue = 512
583+
if pc.option.PullThresholdSizeForQueue.Load() < 1 || pc.option.PullThresholdSizeForQueue.Load() > 1024 {
584+
if pc.option.PullThresholdSizeForQueue.Load() == 0 {
585+
pc.option.PullThresholdSizeForQueue.Store(512)
586586
} else {
587587
return errors.New("option.PullThresholdSizeForQueue out of range [1, 1024]")
588588
}
@@ -693,10 +693,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
693693
}
694694

695695
cachedMessageSizeInMiB := int(pq.cachedMsgSize.Load() / Mb)
696-
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue {
696+
if pq.cachedMsgCount.Load() > pc.option.PullThresholdForQueue.Load() {
697697
if pc.queueFlowControlTimes%1000 == 0 {
698698
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
699-
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
699+
"PullThresholdForQueue": pc.option.PullThresholdForQueue.Load(),
700700
"minOffset": pq.Min(),
701701
"maxOffset": pq.Max(),
702702
"count": pq.cachedMsgCount,
@@ -710,10 +710,10 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
710710
goto NEXT
711711
}
712712

713-
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
713+
if cachedMessageSizeInMiB > int(pc.option.PullThresholdSizeForQueue.Load()) {
714714
if pc.queueFlowControlTimes%1000 == 0 {
715715
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
716-
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
716+
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue.Load(),
717717
"minOffset": pq.Min(),
718718
"maxOffset": pq.Max(),
719719
"count": pq.cachedMsgCount,

internal/utils/compression.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func UnCompress(data []byte) []byte {
7878
if err != nil {
7979
return data
8080
}
81+
defer r.Close()
8182
retData, err := ioutil.ReadAll(r)
8283
if err != nil {
8384
return data

0 commit comments

Comments
 (0)