Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fix-flow-control' into kafka-con…
Browse files Browse the repository at this point in the history
…sumer-batch

Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Mar 2, 2024
2 parents 3d406b3 + a6f8b1d commit 84578d7
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 38 deletions.
18 changes: 16 additions & 2 deletions cdc/cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,23 @@ type MQMessage struct {
entriesCount int // entries in one MQ Message
}

const (
MemoryReleaseThreshold = 1024
MemoryReleaseFactor = 10
)

func resetBuffer(buf []byte) []byte {
length := len(buf)
capSize := cap(buf)
if capSize > MemoryReleaseThreshold && length > 0 && length*MemoryReleaseFactor < capSize {
return nil
}
return buf[:0]
}

func (m *MQMessage) Reset() {
m.Key = m.Key[:0]
m.Value = m.Value[:0]
m.Key = resetBuffer(m.Key)
m.Value = resetBuffer(m.Value)
m.Ts = 0
m.Type = model.MqMessageTypeUnknown
m.Protocol = config.ProtocolDefault
Expand Down
7 changes: 1 addition & 6 deletions cdc/cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,6 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
flushToProducer := func(op codec.EncoderResult) error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
defer func() {
for _, msg := range messages {
codec.ReleaseMQMessage(msg)
}
}()
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
Expand All @@ -255,11 +250,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
})

for _, msg := range messages {
thisBatchSize += msg.GetEntriesCount()
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
thisBatchSize += msg.GetEntriesCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down
22 changes: 16 additions & 6 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

type mqSinkSuite struct{}

var _ = check.Suite(&mqSinkSuite{})
var _ = check.SerialSuites(&mqSinkSuite{})

func (s mqSinkSuite) TestKafkaSink(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down Expand Up @@ -144,6 +144,16 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand All @@ -152,8 +162,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
// mock kafka broker processes 3 row changed event
for i := 0; i < 3; i++ {
leader.Returns(prodSuccess)
}

keyspanID1 := model.KeySpanID(1)
kv1 := &model.RawKVEntry{
OpType: model.OpTypePut,
Expand Down Expand Up @@ -182,12 +195,9 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
StartTs: 110,
CRTs: 130,
}

err = sink.EmitChangedEvents(ctx, kv3)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row resolvedTs event
leader.Returns(prodSuccess)
checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs1, check.Equals, kv1.CRTs)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
sarama.MaxRequestSize = 100 * 1024 * 1024 // 100MB
}

// Config stores user specified Kafka producer configuration
Expand Down
10 changes: 5 additions & 5 deletions cdc/cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
ctx := context.Background()
config := NewConfig()
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
_, err := NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue)

config.ClientID = "test-kafka-client"
Expand All @@ -56,15 +56,15 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}
for _, cc := range compressionCases {
config.Compression = cc.algorithm
cfg, err := newSaramaConfigImpl(ctx, config)
cfg, err := NewSaramaConfigImpl(ctx, config)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.Compression, check.Equals, cc.expected)
}

config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
}
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")

saslConfig := NewConfig()
Expand All @@ -76,7 +76,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := newSaramaConfigImpl(ctx, saslConfig)
cfg, err := NewSaramaConfigImpl(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
Expand Down
19 changes: 15 additions & 4 deletions cdc/cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type kafkaSaramaProducer struct {

type kafkaProducerClosingFlag = int32

type kafkaMetadata struct {
message *codec.MQMessage
offset uint64
}

func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error {
k.clientLock.RLock()
defer k.clientLock.RUnlock()
Expand All @@ -88,7 +93,11 @@ func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *cod
Value: sarama.ByteEncoder(message.Value),
Partition: partition,
}
msg.Metadata = atomic.AddUint64(&k.partitionOffset[partition].sent, 1)
metadata := &kafkaMetadata{
message: message,
offset: atomic.AddUint64(&k.partitionOffset[partition].sent, 1),
}
msg.Metadata = metadata

failpoint.Inject("KafkaSinkAsyncSendError", func() {
// simulate sending message to input channel successfully but flushing
Expand Down Expand Up @@ -242,7 +251,9 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
if msg == nil || msg.Metadata == nil {
continue
}
flushedOffset := msg.Metadata.(uint64)
metadata := msg.Metadata.(*kafkaMetadata)
codec.ReleaseMQMessage(metadata.message)
flushedOffset := metadata.offset
atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset)
k.flushedNotifier.Notify()
case err := <-k.asyncClient.Errors():
Expand All @@ -258,14 +269,14 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}

var (
newSaramaConfigImpl = newSaramaConfig
NewSaramaConfigImpl = newSaramaConfig
NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient
)

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
cfg, err := NewSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}
Expand Down
31 changes: 17 additions & 14 deletions cdc/cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

type kafkaSuite struct{}

var _ = check.Suite(&kafkaSuite{})
var _ = check.SerialSuites(&kafkaSuite{})

func Test(t *testing.T) { check.TestingT(t) }

Expand Down Expand Up @@ -95,13 +95,16 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) {
config.AutoCreate = false
config.BrokerEndpoints = strings.Split(leader.Addr(), ",")

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
newSaramaConfigImplBak := NewSaramaConfigImpl
NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
return cfg, err
}
defer func() {
NewSaramaConfigImpl = newSaramaConfigImplBak
}()
NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand Down Expand Up @@ -201,7 +204,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {

// When topic exists and max message bytes is set correctly.
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes()
cfg, err := newSaramaConfigImpl(context.Background(), config)
cfg, err := NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts := make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
Expand All @@ -212,7 +215,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
// use the smaller one.
defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes()
config.MaxMessageBytes = defaultMaxMessageBytes + 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
Expand All @@ -221,7 +224,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes))

config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts)
Expand All @@ -231,7 +234,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {

// When topic does not exist and auto-create is not enabled.
config.AutoCreate = false
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg, opts)
Expand All @@ -245,7 +248,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
// It is less than the value of broker.
config.AutoCreate = true
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg, opts)
Expand All @@ -257,7 +260,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
// It is larger than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes + 1
config.AutoCreate = true
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg, opts)
Expand All @@ -269,7 +272,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
// the check of parameter succeeds.
// It is less than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes - 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
detail := &sarama.TopicDetail{
NumPartitions: 3,
Expand All @@ -288,7 +291,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) {
// the check of parameter fails.
// It is larger than the value of broker.
config.MaxMessageBytes = defaultMaxMessageBytes + 1
cfg, err = newSaramaConfigImpl(context.Background(), config)
cfg, err = NewSaramaConfigImpl(context.Background(), config)
c.Assert(err, check.IsNil)
opts = make(map[string]string)
err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts)
Expand Down Expand Up @@ -343,8 +346,8 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
NewAdminClientImpl = kafka.NewSaramaAdminClient
}()

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
newSaramaConfigImplBak := NewSaramaConfigImpl
NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
Expand All @@ -353,7 +356,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
return cfg, err
}
defer func() {
newSaramaConfigImpl = newSaramaConfigImplBak
NewSaramaConfigImpl = newSaramaConfigImplBak
}()

errCh := make(chan error, 1)
Expand Down

0 comments on commit 84578d7

Please sign in to comment.