Skip to content

Commit

Permalink
avoid oom when there is lag and binlog size is big (pingcap#823) (pin…
Browse files Browse the repository at this point in the history
…gcap#834)

* Add size config
  • Loading branch information
july2993 authored and IANTHEREAL committed Nov 26, 2019
1 parent 90b5d6d commit 1fdfd69
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 68 deletions.
6 changes: 4 additions & 2 deletions arbiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ type UpConfig struct {
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`

InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"`
Topic string `toml:"topic" json:"topic"`
InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"`
Topic string `toml:"topic" json:"topic"`
MessageBufferSize int `toml:"message-buffer-size" json:"message-buffer-size"`
SaramaBufferSize int `toml:"sarama-buffer-size" json:"sarama-buffer-size"`
}

// DownConfig is configuration of downstream
Expand Down
8 changes: 5 additions & 3 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ func NewServer(cfg *Config) (srv *Server, err error) {

// set reader to read binlog from kafka
readerCfg := &reader.Config{
KafkaAddr: strings.Split(up.KafkaAddrs, ","),
CommitTS: srv.finishTS,
Topic: up.Topic,
KafkaAddr: strings.Split(up.KafkaAddrs, ","),
CommitTS: srv.finishTS,
Topic: up.Topic,
SaramaBufferSize: up.SaramaBufferSize,
MessageBufferSize: up.MessageBufferSize,
}

log.Info("use kafka binlog reader", zap.Reflect("cfg", readerCfg))
Expand Down
8 changes: 7 additions & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"
"time"

"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/util"
Expand Down Expand Up @@ -154,7 +155,12 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
// TODO: add metric here
continue
}
readBinlogSizeHistogram.WithLabelValues(p.nodeID).Observe(float64(len(resp.Entity.Payload)))

payloadSize := len(resp.Entity.Payload)
readBinlogSizeHistogram.WithLabelValues(p.nodeID).Observe(float64(payloadSize))
if len(resp.Entity.Payload) >= 10*1024*1024 {
log.Info("receive big size binlog", zap.String("size", humanize.Bytes(uint64(payloadSize))))
}

binlog := new(pb.Binlog)
err = binlog.Unmarshal(resp.Entity.Payload)
Expand Down
36 changes: 31 additions & 5 deletions drainer/sync/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
)

var maxWaitTimeToSendMSG = time.Second * 30
var stallWriteSize = 90 * 1024 * 1024

var _ Syncer = &KafkaSyncer{}

Expand All @@ -38,8 +39,11 @@ type KafkaSyncer struct {
producer sarama.AsyncProducer
topic string

toBeAckCommitTSMu sync.Mutex
toBeAckCommitTS map[int64]struct{}
toBeAckCommitTSMu sync.Mutex
toBeAckCommitTS map[int64]int
toBeAckTotalSize int
resumeProduce chan struct{}
resumeProduceCloseOnce sync.Once

lastSuccessTime time.Time

Expand All @@ -63,7 +67,7 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka
executor := &KafkaSyncer{
addr: strings.Split(cfg.KafkaAddrs, ","),
topic: topic,
toBeAckCommitTS: make(map[int64]struct{}),
toBeAckCommitTS: make(map[int64]int),
shutdown: make(chan struct{}),
baseSyncer: newBaseSyncer(tableInfoGetter),
}
Expand Down Expand Up @@ -140,20 +144,35 @@ func (p *KafkaSyncer) saveBinlog(binlog *obinlog.Binlog, item *Item) error {
msg := &sarama.ProducerMessage{Topic: p.topic, Key: nil, Value: sarama.ByteEncoder(data), Partition: 0}
msg.Metadata = item

waitResume := false

p.toBeAckCommitTSMu.Lock()
if len(p.toBeAckCommitTS) == 0 {
p.lastSuccessTime = time.Now()
}
p.toBeAckCommitTS[binlog.CommitTs] = struct{}{}
p.toBeAckCommitTS[binlog.CommitTs] = len(data)
p.toBeAckTotalSize += len(data)
if p.toBeAckTotalSize >= stallWriteSize && len(p.toBeAckCommitTS) > 1 {
p.resumeProduce = make(chan struct{})
p.resumeProduceCloseOnce = sync.Once{}
waitResume = true
}
p.toBeAckCommitTSMu.Unlock()

if waitResume {
select {
case <-p.resumeProduce:
case <-p.errCh:
return errors.Trace(p.err)
}
}

select {
case p.producer.Input() <- msg:
return nil
case <-p.errCh:
return errors.Trace(p.err)
}

}

func (p *KafkaSyncer) run() {
Expand All @@ -171,6 +190,13 @@ func (p *KafkaSyncer) run() {

p.toBeAckCommitTSMu.Lock()
p.lastSuccessTime = time.Now()
size := p.toBeAckCommitTS[commitTs]
p.toBeAckTotalSize -= size
if p.toBeAckTotalSize < stallWriteSize && p.resumeProduce != nil {
p.resumeProduceCloseOnce.Do(func() {
close(p.resumeProduce)
})
}
delete(p.toBeAckCommitTS, commitTs)
p.toBeAckCommitTSMu.Unlock()

Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/pingcap/tidb-binlog
require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.3.0
github.com/Shopify/sarama v1.23.1
github.com/Shopify/sarama v1.24.1
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.1
Expand All @@ -12,15 +12,14 @@ require (
github.com/google/gofuzz v1.0.0
github.com/gorilla/mux v1.6.2
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20191119124645-4c0c1027f412
github.com/pingcap/tidb v1.1.0-beta.0.20191119111543-77faf6466821
github.com/pingcap/tidb-tools v3.0.6-0.20191122032654-ea50b93da000+incompatible
github.com/pingcap/tidb-tools v3.0.6-0.20191125061035-b087739b71f1+incompatible
github.com/pingcap/tipb v0.0.0-20191120020146-6161b015e21e
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
Expand All @@ -39,7 +38,6 @@ require (
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c
google.golang.org/grpc v1.23.1
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
)

go 1.13
Loading

0 comments on commit 1fdfd69

Please sign in to comment.