From 40357a1450e94f481521c9b91e10a6a7a78dbc77 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 4 Mar 2024 11:01:52 +0800 Subject: [PATCH] [close #383] fix integration test case `flow_control` & `stop_downstream` (#392) * collect pprof heap Signed-off-by: Ping Yu * unlimit retry for pd connection Signed-off-by: Ping Yu * reduce record size Signed-off-by: Ping Yu * log level: info Signed-off-by: Ping Yu * reduce data size; add grafana panel Signed-off-by: Ping Yu * fix encoder size Signed-off-by: Ping Yu * fix Signed-off-by: Ping Yu * MQMessage pool Signed-off-by: Ping Yu * fix release Signed-off-by: Ping Yu * wip Signed-off-by: Ping Yu * fix flaky ut Signed-off-by: Ping Yu * logging Signed-off-by: Ping Yu * fix ut Signed-off-by: Ping Yu * adjust memory release parameter Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu --- cdc/cdc/sink/codec/interface.go | 58 +++++++++++++---- cdc/cdc/sink/codec/json.go | 33 ++++++++-- cdc/cdc/sink/codec/json_test.go | 19 +++++- cdc/cdc/sink/mq.go | 2 +- cdc/cdc/sink/mq_test.go | 24 +++++-- cdc/cdc/sink/producer/kafka/config.go | 2 +- cdc/cdc/sink/producer/kafka/config_test.go | 10 +-- cdc/cdc/sink/producer/kafka/kafka.go | 28 ++++++-- cdc/cdc/sink/producer/kafka/kafka_test.go | 31 +++++---- cdc/cmd/kafka-consumer/tikv.go | 3 +- .../docker-compose-kafka-integration.yml | 21 ++++++ cdc/metrics/grafana/tikv-cdc.json | 64 ++++++++++++++----- .../integration_tests/_utils/run_cdc_server | 2 +- .../_utils/start_tidb_cluster_impl | 11 +++- .../flow_control/config/workload | 4 +- .../integration_tests/flow_control/run.sh | 17 +++-- .../integration_tests/run_kafka_in_docker.sh | 2 +- 17 files changed, 249 insertions(+), 82 deletions(-) diff --git a/cdc/cdc/sink/codec/interface.go b/cdc/cdc/sink/codec/interface.go index 5b2faaa8..18908ebe 100644 --- a/cdc/cdc/sink/codec/interface.go +++ b/cdc/cdc/sink/codec/interface.go @@ -16,6 +16,7 @@ package codec import ( "context" "encoding/binary" + "sync" "time" "github.com/pingcap/log" @@ -63,6 +64,29 @@ type MQMessage struct { entriesCount int // entries in one MQ Message } +const ( + MemoryReleaseThreshold = 100 * 1024 // 100KiB + MemoryReleaseFactor = 100 +) + +func resetBuffer(buf []byte) []byte { + length := len(buf) + capSize := cap(buf) + if capSize > MemoryReleaseThreshold && length > 0 && length*MemoryReleaseFactor < capSize { + return nil + } + return buf[:0] +} + +func (m *MQMessage) Reset() { + m.Key = resetBuffer(m.Key) + m.Value = resetBuffer(m.Value) + m.Ts = 0 + m.Type = model.MqMessageTypeUnknown + m.Protocol = config.ProtocolDefault + m.entriesCount = 0 +} + // maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. // reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 // for TiKV-CDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer. @@ -99,31 +123,43 @@ func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) * return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved) } +var mqMsgPool = sync.Pool{ + New: func() any { + return new(MQMessage) + }, +} + // NewMQMessage should be used when creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. 
func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage { - ret := &MQMessage{ - Key: nil, - Value: nil, - Ts: ts, - Type: ty, - Protocol: proto, - entriesCount: 0, + ret := mqMsgPool.Get().(*MQMessage) + + // TODO: remove this check. + if len(ret.Key) > 0 || len(ret.Value) > 0 { + log.Panic("MQMessage is not reset", zap.String("key", string(ret.Key)), zap.String("value", string(ret.Value))) } + ret.Ts = ts + ret.Type = ty + ret.Protocol = proto + ret.entriesCount = 0 + if key != nil { - ret.Key = make([]byte, len(key)) - copy(ret.Key, key) + ret.Key = append(ret.Key, key...) } if value != nil { - ret.Value = make([]byte, len(value)) - copy(ret.Value, value) + ret.Value = append(ret.Value, value...) } return ret } +func ReleaseMQMessage(m *MQMessage) { + m.Reset() + mqMsgPool.Put(m) +} + // EventBatchDecoder is an abstraction for events decoder // this interface is only for testing now type EventBatchDecoder interface { diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go index 05bdb03a..071d606b 100644 --- a/cdc/cdc/sink/codec/json.go +++ b/cdc/cdc/sink/codec/json.go @@ -125,8 +125,10 @@ type JSONEventBatchEncoder struct { valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now supportMixedBuild bool // TODO decouple this out - messageBuf []*MQMessage - curBatchSize int + messageBuf []*MQMessage + curBatchSize int + totalBatchBytes int // Note: The size of last message is not included + // configs maxMessageBytes int maxBatchSize int @@ -226,6 +228,9 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder versionHead := make([]byte, 8) binary.BigEndian.PutUint64(versionHead, BatchVersion1) + if len(d.messageBuf) > 0 { + d.totalBatchBytes += d.messageBuf[len(d.messageBuf)-1].Length() + } d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeKv)) d.curBatchSize = 0 } @@ -249,6 +254,8 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder } // Build implements the EventBatchEncoder interface +// NOTE: when supportMixedBuild is enabled, must call Reset() after the returned `mqMessages` is used. +// It's not a good design. As supportMixedBuild is used in unit tests only, we don't fix it now. 
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { if d.supportMixedBuild { if d.valueBuf.Len() == 0 { @@ -260,7 +267,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { } ret := d.messageBuf - d.messageBuf = make([]*MQMessage, 0) + d.Reset() return ret } @@ -307,13 +314,27 @@ func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte { // Size implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Size() int { - return d.keyBuf.Len() + d.valueBuf.Len() + if d.supportMixedBuild { + return d.keyBuf.Len() + d.valueBuf.Len() + } + + lastMessageLength := 0 + if len(d.messageBuf) > 0 { + lastMessageLength = d.messageBuf[len(d.messageBuf)-1].Length() + } + return d.totalBatchBytes + lastMessageLength } // Reset implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) Reset() { - d.keyBuf.Reset() - d.valueBuf.Reset() + if d.supportMixedBuild { + d.keyBuf.Reset() + d.valueBuf.Reset() + } else { + d.messageBuf = make([]*MQMessage, 0) + d.curBatchSize = 0 + d.totalBatchBytes = 0 + } } // SetParams reads relevant parameters for Open Protocol diff --git a/cdc/cdc/sink/codec/json_test.go b/cdc/cdc/sink/codec/json_test.go index 7b06d03d..aaca75a0 100644 --- a/cdc/cdc/sink/codec/json_test.go +++ b/cdc/cdc/sink/codec/json_test.go @@ -197,6 +197,7 @@ func (s *batchSuite) TestSetParams(c *check.C) { func (s *batchSuite) TestMaxMessageBytes(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder() + c.Check(encoder.Size(), check.Equals, 0) // the size of `testEvent` is 75 testEvent := &model.RawKVEntry{ @@ -207,14 +208,24 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { ExpiredTs: 200, } eventSize := 75 + // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44. + overhead := 36 + 8 - // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it. - a := strconv.Itoa(eventSize + 44) + // just can hold a single message. 
+ a := strconv.Itoa(eventSize + overhead) err := encoder.SetParams(map[string]string{"max-message-bytes": a}) c.Check(err, check.IsNil) r, err := encoder.AppendChangedEvent(testEvent) c.Check(err, check.IsNil) c.Check(r, check.Equals, EncoderNoOperation) + totalSize := eventSize + overhead + c.Check(encoder.Size(), check.Equals, totalSize) + + r, err = encoder.AppendChangedEvent(testEvent) + c.Check(err, check.IsNil) + c.Check(r, check.Equals, EncoderNoOperation) + totalSize += eventSize + overhead + c.Check(encoder.Size(), check.Equals, totalSize) a = strconv.Itoa(eventSize + 43) err = encoder.SetParams(map[string]string{"max-message-bytes": a}) @@ -222,8 +233,10 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { r, err = encoder.AppendChangedEvent(testEvent) c.Check(err, check.NotNil) c.Check(r, check.Equals, EncoderNoOperation) + c.Check(encoder.Size(), check.Equals, totalSize) // make sure each batch's `Length` not greater than `max-message-bytes` + // 256: each message can hold 2 events (75 * 2 + 36 + 8 = 194) err = encoder.SetParams(map[string]string{"max-message-bytes": "256"}) c.Check(err, check.IsNil) @@ -232,6 +245,8 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { c.Check(r, check.Equals, EncoderNoOperation) c.Check(err, check.IsNil) } + totalSize += (eventSize*2 + overhead) * 5000 + c.Check(encoder.Size(), check.Equals, totalSize) messages := encoder.Build() for _, msg := range messages { diff --git a/cdc/cdc/sink/mq.go b/cdc/cdc/sink/mq.go index a305a830..761df1c2 100644 --- a/cdc/cdc/sink/mq.go +++ b/cdc/cdc/sink/mq.go @@ -250,11 +250,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { }) for _, msg := range messages { + thisBatchSize += msg.GetEntriesCount() err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition) if err != nil { return 0, err } - thisBatchSize += msg.GetEntriesCount() } if op == codec.EncoderNeedSyncWrite { diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index e337e5c6..39699790 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -32,7 +32,7 @@ import ( type mqSinkSuite struct{} -var _ = check.Suite(&mqSinkSuite{}) +var _ = check.SerialSuites(&mqSinkSuite{}) func (s mqSinkSuite) TestKafkaSink(c *check.C) { defer testleak.AfterTest(c)() @@ -144,6 +144,16 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { opts := map[string]string{} errCh := make(chan error, 1) + newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl + kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + cfg.Producer.Flush.MaxMessages = 1 + return cfg, err + } + defer func() { + kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak + }() kafkap.NewAdminClientImpl = kafka.NewMockAdminClient defer func() { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient @@ -152,8 +162,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh) c.Assert(err, check.IsNil) - // mock kafka broker processes 1 row changed event - leader.Returns(prodSuccess) + // mock kafka broker processes 3 row changed events + for i := 0; i < 3; i++ { + leader.Returns(prodSuccess) + } + keyspanID1 := model.KeySpanID(1) kv1 := &model.RawKVEntry{ OpType: model.OpTypePut, @@ -182,12 +195,13 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { StartTs: 110, CRTs: 130, } - err = sink.EmitChangedEvents(ctx, kv3) 
c.Assert(err, check.IsNil) + // TODO: fix EmitCheckpointTs // mock kafka broker processes 1 row resolvedTs event - leader.Returns(prodSuccess) + // leader.Returns(prodSuccess) + checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, kv1.CRTs) diff --git a/cdc/cdc/sink/producer/kafka/config.go b/cdc/cdc/sink/producer/kafka/config.go index e33395df..3293f476 100644 --- a/cdc/cdc/sink/producer/kafka/config.go +++ b/cdc/cdc/sink/producer/kafka/config.go @@ -31,7 +31,7 @@ import ( ) func init() { - sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB + sarama.MaxRequestSize = 100 * 1024 * 1024 // 100MB } // Config stores user specified Kafka producer configuration diff --git a/cdc/cdc/sink/producer/kafka/config_test.go b/cdc/cdc/sink/producer/kafka/config_test.go index c83c2ea9..967cce5a 100644 --- a/cdc/cdc/sink/producer/kafka/config_test.go +++ b/cdc/cdc/sink/producer/kafka/config_test.go @@ -33,13 +33,13 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { ctx := context.Background() config := NewConfig() config.Version = "invalid" - _, err := newSaramaConfigImpl(ctx, config) + _, err := NewSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") ctx = util.SetOwnerInCtx(ctx) config.Version = "2.6.0" config.ClientID = "^invalid$" - _, err = newSaramaConfigImpl(ctx, config) + _, err = NewSaramaConfigImpl(ctx, config) c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue) config.ClientID = "test-kafka-client" @@ -56,7 +56,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { } for _, cc := range compressionCases { config.Compression = cc.algorithm - cfg, err := newSaramaConfigImpl(ctx, config) + cfg, err := NewSaramaConfigImpl(ctx, config) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.Compression, check.Equals, cc.expected) } @@ -64,7 +64,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { config.Credential = &security.Credential{ CAPath: "/invalid/ca/path", } - _, err = newSaramaConfigImpl(ctx, config) + _, err = NewSaramaConfigImpl(ctx, config) c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory") saslConfig := NewConfig() @@ -76,7 +76,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) { SaslMechanism: sarama.SASLTypeSCRAMSHA256, } - cfg, err := newSaramaConfigImpl(ctx, saslConfig) + cfg, err := NewSaramaConfigImpl(ctx, saslConfig) c.Assert(err, check.IsNil) c.Assert(cfg, check.NotNil) c.Assert(cfg.Net.SASL.User, check.Equals, "user") diff --git a/cdc/cdc/sink/producer/kafka/kafka.go b/cdc/cdc/sink/producer/kafka/kafka.go index d1a937b8..447879ee 100644 --- a/cdc/cdc/sink/producer/kafka/kafka.go +++ b/cdc/cdc/sink/producer/kafka/kafka.go @@ -72,6 +72,11 @@ type kafkaSaramaProducer struct { type kafkaProducerClosingFlag = int32 +type kafkaMetadata struct { + message *codec.MQMessage + offset uint64 +} + func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error { k.clientLock.RLock() defer k.clientLock.RUnlock() @@ -88,7 +93,12 @@ func (k *kafkaSaramaProducer) AsyncSendMessage(ctx context.Context, message *cod Value: sarama.ByteEncoder(message.Value), Partition: partition, } - msg.Metadata = atomic.AddUint64(&k.partitionOffset[partition].sent, 1) + metadata := &kafkaMetadata{ + message: message, + offset: atomic.AddUint64(&k.partitionOffset[partition].sent, 1), + } + log.Debug("kafka producer sending message", zap.Int32("partition", partition), 
zap.Uint64("offset", metadata.offset)) + msg.Metadata = metadata failpoint.Inject("KafkaSinkAsyncSendError", func() { // simulate sending message to input channel successfully but flushing @@ -242,8 +252,16 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { if msg == nil || msg.Metadata == nil { continue } - flushedOffset := msg.Metadata.(uint64) - atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) + metadata := msg.Metadata.(*kafkaMetadata) + codec.ReleaseMQMessage(metadata.message) + flushedOffset := metadata.offset + + prevOffset := atomic.SwapUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) + if flushedOffset <= prevOffset { + log.Panic("kafka producer flushed offset goes backward", zap.Int32("partition", msg.Partition), zap.Uint64("flushed", flushedOffset), zap.Uint64("prev", prevOffset)) + } + log.Debug("kafka producer flushed message", zap.Int32("partition", msg.Partition), zap.Uint64("offset", flushedOffset), zap.Uint64("prev", prevOffset)) + k.flushedNotifier.Notify() case err := <-k.asyncClient.Errors(): // We should not wrap a nil pointer if the pointer is of a subtype of `error` @@ -258,14 +276,14 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } var ( - newSaramaConfigImpl = newSaramaConfig + NewSaramaConfigImpl = newSaramaConfig NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient ) // NewKafkaSaramaProducer creates a kafka sarama producer func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) - cfg, err := newSaramaConfigImpl(ctx, config) + cfg, err := NewSaramaConfigImpl(ctx, config) if err != nil { return nil, err } diff --git a/cdc/cdc/sink/producer/kafka/kafka_test.go b/cdc/cdc/sink/producer/kafka/kafka_test.go index 98bc6c39..ec5c7d69 100644 --- a/cdc/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/cdc/sink/producer/kafka/kafka_test.go @@ -31,7 +31,7 @@ import ( type kafkaSuite struct{} -var _ = check.Suite(&kafkaSuite{}) +var _ = check.SerialSuites(&kafkaSuite{}) func Test(t *testing.T) { check.TestingT(t) } @@ -95,13 +95,16 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { config.AutoCreate = false config.BrokerEndpoints = strings.Split(leader.Addr(), ",") - newSaramaConfigImplBak := newSaramaConfigImpl - newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + newSaramaConfigImplBak := NewSaramaConfigImpl + NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 return cfg, err } + defer func() { + NewSaramaConfigImpl = newSaramaConfigImplBak + }() NewAdminClientImpl = kafka.NewMockAdminClient defer func() { NewAdminClientImpl = kafka.NewSaramaAdminClient @@ -201,7 +204,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // When topic exists and max message bytes is set correctly. 
config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() - cfg, err := newSaramaConfigImpl(context.Background(), config) + cfg, err := NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts := make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -212,7 +215,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // use the smaller one. defaultMaxMessageBytes := adminClient.GetDefaultMaxMessageBytes() config.MaxMessageBytes = defaultMaxMessageBytes + 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -221,7 +224,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) @@ -231,7 +234,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // When topic does not exist and auto-create is not enabled. config.AutoCreate = false - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg, opts) @@ -245,7 +248,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // It is less than the value of broker. config.AutoCreate = true config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg, opts) @@ -257,7 +260,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // It is larger than the value of broker. config.MaxMessageBytes = defaultMaxMessageBytes + 1 config.AutoCreate = true - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg, opts) @@ -269,7 +272,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // the check of parameter succeeds. // It is less than the value of broker. config.MaxMessageBytes = defaultMaxMessageBytes - 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) detail := &sarama.TopicDetail{ NumPartitions: 3, @@ -288,7 +291,7 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { // the check of parameter fails. // It is larger than the value of broker. 
config.MaxMessageBytes = defaultMaxMessageBytes + 1 - cfg, err = newSaramaConfigImpl(context.Background(), config) + cfg, err = NewSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) opts = make(map[string]string) err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) @@ -343,8 +346,8 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { NewAdminClientImpl = kafka.NewSaramaAdminClient }() - newSaramaConfigImplBak := newSaramaConfigImpl - newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { + newSaramaConfigImplBak := NewSaramaConfigImpl + NewSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) { cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 @@ -353,7 +356,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { return cfg, err } defer func() { - newSaramaConfigImpl = newSaramaConfigImplBak + NewSaramaConfigImpl = newSaramaConfigImplBak }() errCh := make(chan error, 1) diff --git a/cdc/cmd/kafka-consumer/tikv.go b/cdc/cmd/kafka-consumer/tikv.go index e808ee3e..8cba5f46 100644 --- a/cdc/cmd/kafka-consumer/tikv.go +++ b/cdc/cmd/kafka-consumer/tikv.go @@ -15,6 +15,7 @@ package main import ( "context" + "math" "net/url" "time" @@ -29,7 +30,7 @@ import ( ) const ( - defaultPDErrorRetry int = 10 + defaultPDErrorRetry int = math.MaxInt ) var _ sink.Sink = (*tikvSimpleSink)(nil) diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml index 8d47a2a8..50ab8592 100644 --- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml +++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml @@ -5,6 +5,13 @@ services: image: wurstmeister/zookeeper ports: - "2181:2181" + deploy: + resources: + limits: + cpus: '2' + memory: 4G + reservations: + memory: 4G kafka: image: wurstmeister/kafka:2.12-2.4.1 @@ -19,6 +26,13 @@ services: KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092" ZK: "zk" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + deploy: + resources: + limits: + cpus: '2' + memory: 4G + reservations: + memory: 4G depends_on: - "zookeeper" @@ -38,3 +52,10 @@ services: network_mode: "service:kafka" volumes: - /tmp/tikv_cdc_test/:/tmp/tikv_cdc_test + deploy: + resources: + limits: + cpus: '4' + memory: 16G + reservations: + memory: 12G diff --git a/cdc/metrics/grafana/tikv-cdc.json b/cdc/metrics/grafana/tikv-cdc.json index 6ff82cc0..ec885bd6 100644 --- a/cdc/metrics/grafana/tikv-cdc.json +++ b/cdc/metrics/grafana/tikv-cdc.json @@ -10371,7 +10371,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "Percentiles of TiKV sink write duration by each server.\n", + "description": "Percentiles of $sink_type sink write duration by each server.\n", "fieldConfig": { "defaults": { "links": [] @@ -10419,7 +10419,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.95, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.95, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", 
"hide": false, "interval": "", @@ -10429,7 +10429,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", "hide": false, "interval": "", @@ -10439,7 +10439,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le,instance, changefeed))", + "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_exec_duration_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le,instance, changefeed))", "format": "time_series", "hide": false, "interval": "", @@ -10452,7 +10452,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write duration percentile", + "title": "$sink_type sink write duration percentile", "tooltip": { "shared": true, "sort": 0, @@ -10543,7 +10543,7 @@ "targets": [ { "exemplar": true, - "expr": "histogram_quantile(0.90, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.90, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -10552,7 +10552,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.99, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -10561,7 +10561,7 @@ }, { "exemplar": true, - "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (le, instance, changefeed))", + "expr": "histogram_quantile(0.999, sum(rate(tikv_cdc_sink_txn_batch_size_bucket{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (le, instance, changefeed))", "format": "time_series", "hide": true, "interval": "", @@ -10574,7 +10574,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write batch size percentile", + "title": "$sink_type sink write batch size percentile", "tooltip": { "shared": true, "sort": 0, @@ -10617,7 +10617,7 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The number of changed entires that are written to downstream TiKV", + "description": "The number of changed entires that are written to downstream sink", "fieldConfig": { "defaults": { 
"links": [] @@ -10664,7 +10664,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (instance)", + "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (instance)", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -10673,7 +10673,7 @@ }, { "exemplar": true, - "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (changefeed)", + "expr": "sum (rate(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (changefeed)", "format": "time_series", "hide": false, "interval": "", @@ -10686,7 +10686,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink write rows count/s", + "title": "$sink_type sink write rows count/s", "tooltip": { "shared": true, "sort": 0, @@ -10773,7 +10773,7 @@ "steppedLine": false, "targets": [ { - "expr": "sum(delta(tikv_cdc_sink_execution_error{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"TiKV\"}[1m])) by (capture)", + "expr": "sum(delta(tikv_cdc_sink_execution_error{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\", type=\"$sink_type\"}[1m])) by (capture)", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{capture}}", @@ -10784,7 +10784,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "TiKV sink error count/m", + "title": "$sink_type sink error count/m", "tooltip": { "shared": true, "sort": 0, @@ -10822,7 +10822,8 @@ } } ], - "title": "TiKV Sink", + "repeat": "sink_type", + "title": "$sink_type Sink", "type": "row" } ], @@ -10957,6 +10958,37 @@ "type": "query", "useTags": false }, + { + "allValue": ".*", + "current": { + "selected": false, + "text": "All", + "value": "$__all" + }, + "datasource": "${DS_TEST-CLUSTER}", + "definition": "label_values(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\"}, type)", + "description": null, + "error": null, + "hide": 0, + "includeAll": true, + "label": "Sink", + "multi": false, + "name": "sink_type", + "options": [], + "query": { + "query": "label_values(tikv_cdc_sink_txn_batch_size_sum{tidb_cluster=\"$tidb_cluster\"}, type)", + "refId": "${DS_TEST-CLUSTER}-sink_type-Variable-Query" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "tagValuesQuery": "", + "tags": [], + "tagsQuery": "", + "type": "query", + "useTags": false + }, { "allValue": "9999999999", "current": { diff --git a/cdc/tests/integration_tests/_utils/run_cdc_server b/cdc/tests/integration_tests/_utils/run_cdc_server index 1938eab8..7520fdf4 100755 --- a/cdc/tests/integration_tests/_utils/run_cdc_server +++ b/cdc/tests/integration_tests/_utils/run_cdc_server @@ -21,7 +21,7 @@ addr= addr_url="127.0.0.1:8600" pd_addr= pwd=$pwd -log_level=debug +log_level=info restart= failpoint=$GO_FAILPOINTS config_path= diff --git a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl index e2f5f201..d8024e1d 100755 --- a/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl +++ b/cdc/tests/integration_tests/_utils/start_tidb_cluster_impl @@ -13,6 +13,7 @@ 
multiple_upstream_pd= random_file_name= verify_tidb=false tikv_count=3 +log_level=info # Random generate the sockets config. # Make sure we dont use the same sock. @@ -46,12 +47,16 @@ while [[ ${1-} ]]; do shift ;; --verify-tidb) - vierfy_tidb=true + verify_tidb=true ;; --tikv-count) tikv_count=${2} shift ;; + --log-level) + log_level=${2} + shift + ;; *) echo "Unknown parameter: ${1}" >&2 exit 1 @@ -204,7 +209,7 @@ for idx in $(seq 1 "$tikv_count"); do -A ${!host}:${!port} \ --status-addr ${!host}:${!status_port} \ --log-file "$OUT_DIR/tikv$idx.log" \ - --log-level debug \ + --log-level "$log_level" \ -C "$OUT_DIR/tikv-config.toml" \ -s "$OUT_DIR/tikv$idx" & done @@ -215,7 +220,7 @@ tikv-server \ -A ${DOWN_TIKV_HOST}:${DOWN_TIKV_PORT} \ --status-addr ${DOWN_TIKV_HOST}:${DOWN_TIKV_STATUS_PORT} \ --log-file "$OUT_DIR/tikv_down.log" \ - --log-level debug \ + --log-level "$log_level" \ -C "$OUT_DIR/tikv-config.toml" \ -s "$OUT_DIR/tikv_down" & diff --git a/cdc/tests/integration_tests/flow_control/config/workload b/cdc/tests/integration_tests/flow_control/config/workload index faa23c17..009c71a2 100644 --- a/cdc/tests/integration_tests/flow_control/config/workload +++ b/cdc/tests/integration_tests/flow_control/config/workload @@ -22,7 +22,7 @@ # Default data size: 1 KB records (10 fields, 100 bytes each, plus key) # Request distribution: zipfian -recordcount=1000000 +recordcount=2000000 workload=core readallfields=true @@ -34,3 +34,5 @@ insertproportion=0 requestdistribution=uniform +fieldcount=1 +fieldlength=250 diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh index a434e021..9391c680 100644 --- a/cdc/tests/integration_tests/flow_control/run.sh +++ b/cdc/tests/integration_tests/flow_control/run.sh @@ -19,7 +19,7 @@ function run() { cd $WORK_DIR start_ts=$(get_start_ts $UP_PD) - go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 100 # About 1G + go-ycsb load tikv -P $CUR/config/workload -p tikv.pd="$UP_PD" -p tikv.type="raw" -p tikv.apiversion=V2 --threads 200 # About 500MiB cat - >"$WORK_DIR/tikv-cdc-config.toml" <$WORK_DIR/heap-dump.log + exit 1 fi # As "per-changefeed-memory-quota" is low the syncing will cost more time. diff --git a/cdc/tests/integration_tests/run_kafka_in_docker.sh b/cdc/tests/integration_tests/run_kafka_in_docker.sh index e22b0acf..28193d8e 100755 --- a/cdc/tests/integration_tests/run_kafka_in_docker.sh +++ b/cdc/tests/integration_tests/run_kafka_in_docker.sh @@ -20,4 +20,4 @@ done COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 \ CASE="$CASE" \ - docker-compose -f ./deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml up --build + docker-compose --compatibility -f ./deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml up --build
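The interface.go hunks above only drop a message buffer's backing array when it is both larger than MemoryReleaseThreshold (100 KiB) and more than MemoryReleaseFactor (100) times the last payload, so steady-state messages keep their capacity while one oversized message cannot pin memory indefinitely. Below is a minimal standalone sketch of that reuse heuristic; it re-declares the two constants locally instead of importing the codec package.

package main

import "fmt"

const (
	memoryReleaseThreshold = 100 * 1024 // 100 KiB, mirrors MemoryReleaseThreshold
	memoryReleaseFactor    = 100        // mirrors MemoryReleaseFactor
)

// resetBuffer mirrors the heuristic in the patch: keep the capacity for reuse
// unless the buffer is both large and mostly empty relative to its last use.
func resetBuffer(buf []byte) []byte {
	length, capSize := len(buf), cap(buf)
	if capSize > memoryReleaseThreshold && length > 0 && length*memoryReleaseFactor < capSize {
		return nil // release the oversized backing array to the GC
	}
	return buf[:0] // keep the capacity, drop the contents
}

func main() {
	small := make([]byte, 512, 64*1024)       // 64 KiB cap: below threshold, kept
	large := make([]byte, 512, 1024*1024)     // 1 MiB cap, 512 B used: released
	full := make([]byte, 900*1024, 1024*1024) // 1 MiB cap, mostly used: kept

	fmt.Println(cap(resetBuffer(small)), cap(resetBuffer(large)), cap(resetBuffer(full)))
	// prints: 65536 0 1048576
}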
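The interface.go and kafka.go changes combine into a get/release lifecycle: NewMQMessage takes a message from a sync.Pool and appends into its recycled Key/Value buffers, the producer keeps the message pointer in the per-message kafkaMetadata, and ReleaseMQMessage resets and returns it to the pool only after Sarama reports the message on Successes(). The following self-contained miniature shows that ownership flow; the message type, pool, and ack channel here are stand-ins, not the actual codec or producer types.

package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for codec.MQMessage: pooled, with reusable buffers.
type message struct {
	Key, Value []byte
}

var pool = sync.Pool{New: func() any { return new(message) }}

// newMessage mirrors NewMQMessage: reuse a pooled object and append into
// its existing buffers instead of allocating fresh slices per message.
func newMessage(key, value []byte) *message {
	m := pool.Get().(*message)
	m.Key = append(m.Key, key...)
	m.Value = append(m.Value, value...)
	return m
}

// release mirrors ReleaseMQMessage: reset, then hand the object back.
// The caller must not touch m afterwards.
func release(m *message) {
	m.Key = m.Key[:0]
	m.Value = m.Value[:0]
	pool.Put(m)
}

func main() {
	acks := make(chan *message, 8) // stand-in for sarama's Successes() channel

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // "producer ack" side: the only place a message is released
		defer wg.Done()
		for m := range acks {
			fmt.Printf("flushed %s=%s\n", m.Key, m.Value)
			release(m)
		}
	}()

	for i := 0; i < 3; i++ {
		m := newMessage([]byte("k"), []byte(fmt.Sprintf("v%d", i)))
		acks <- m // ownership moves to the ack handler
	}
	close(acks)
	wg.Wait()
}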
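The json.go change keeps a running totalBatchBytes that excludes the message currently being filled, and Size() adds that last message's Length() back in. That is what the updated json_test.go numbers check: with the 75-byte test event and a 36 + 8 = 44-byte per-message overhead, one event yields 119 bytes, a second event under max-message-bytes=119 opens a new message for a total of 238, and max-message-bytes=256 packs two events per message (75*2 + 44 = 194). The toy sizer below illustrates only that bookkeeping; its packing rule is simplified and is not the encoder's exact condition.

package main

import "fmt"

const (
	eventSize = 75     // size of the test event in json_test.go
	overhead  = 36 + 8 // maximumRecordOverhead + 8-byte version header
)

// batchSizer mirrors the bookkeeping added to JSONEventBatchEncoder:
// totalBatchBytes covers every sealed message; the still-open message
// is added back in size().
type batchSizer struct {
	totalBatchBytes int
	openMessage     int // Length() of the message still being filled
	maxMessageBytes int
}

func (b *batchSizer) appendEvent() {
	if b.openMessage == 0 || b.openMessage+eventSize > b.maxMessageBytes {
		// seal the current message and start a new one
		b.totalBatchBytes += b.openMessage
		b.openMessage = overhead
	}
	b.openMessage += eventSize
}

func (b *batchSizer) size() int { return b.totalBatchBytes + b.openMessage }

func main() {
	s := &batchSizer{maxMessageBytes: 256}
	s.appendEvent()
	fmt.Println(s.size()) // 119 = 75 + 44
	s.appendEvent()
	fmt.Println(s.size()) // 194 = 75*2 + 44: both events fit in one message
	s.appendEvent()
	fmt.Println(s.size()) // 313 = 194 + 119: third event opens a new message
}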
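In kafka.go, each sarama.ProducerMessage now carries a *kafkaMetadata holding both the pooled MQMessage (so it can be released on ack) and the per-partition sent-counter value; the Successes() handler swaps that value into flushed and panics if it ever moves backward, which is what lets flush logic wait until flushed catches up with sent. A stripped-down sketch of the per-partition counters follows; it keeps only the atomics and omits the notifier and sarama plumbing.

package main

import (
	"fmt"
	"sync/atomic"
)

// partitionOffset mirrors the per-partition counters in kafkaSaramaProducer:
// sent is bumped when a message is handed to the async producer,
// flushed is advanced when sarama acks it on the Successes() channel.
type partitionOffset struct {
	sent    uint64
	flushed uint64
}

// send returns the offset that will later be recorded as flushed; the real
// producer stores it (plus the message pointer) in the message's Metadata.
func (p *partitionOffset) send() uint64 {
	return atomic.AddUint64(&p.sent, 1)
}

// ack records a flushed offset and checks that it only moves forward, as the
// patch does with atomic.SwapUint64 plus a panic on regression.
func (p *partitionOffset) ack(offset uint64) {
	prev := atomic.SwapUint64(&p.flushed, offset)
	if offset <= prev {
		panic(fmt.Sprintf("flushed offset goes backward: %d <= %d", offset, prev))
	}
}

// allFlushed is the condition a Flush-style caller waits on.
func (p *partitionOffset) allFlushed() bool {
	return atomic.LoadUint64(&p.flushed) >= atomic.LoadUint64(&p.sent)
}

func main() {
	var p partitionOffset
	o1, o2 := p.send(), p.send()
	fmt.Println(p.allFlushed()) // false: 2 sent, 0 flushed
	p.ack(o1)
	p.ack(o2)
	fmt.Println(p.allFlushed()) // true
}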