Skip to content

Commit

Permalink
*: fix cdc compile failed (#954)
Browse files Browse the repository at this point in the history
ref #953
  • Loading branch information
wk989898 authored Jan 24, 2025
1 parent d9aed28 commit 7a53695
Show file tree
Hide file tree
Showing 34 changed files with 2,634 additions and 961 deletions.
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,19 @@ GITHASH := $(shell git rev-parse HEAD)
GITBRANCH := $(shell git rev-parse --abbrev-ref HEAD)
GOVERSION := $(shell go version)

# Since TiDB add a new dependency on github.com/cloudfoundry/gosigar,
# We need to add CGO_ENABLED=1 to make it work when build TiCDC in Darwin OS.
# These logic is to check if the OS is Darwin, if so, add CGO_ENABLED=1.
# ref: https://github.com/cloudfoundry/gosigar/issues/58#issuecomment-1150925711
# ref: https://github.com/pingcap/tidb/pull/39526#issuecomment-1407952955
OS := "$(shell go env GOOS)"
SED_IN_PLACE ?= $(shell which sed)
IS_ALPINE := $(shell if [ -f /etc/os-release ]; then grep -qi Alpine /etc/os-release && echo 1; else echo 0; fi)
ifeq (${OS}, "linux")
CGO := 0
SED_IN_PLACE += -i
else ifeq (${OS}, "darwin")
CGO := 1
SED_IN_PLACE += -i ''
endif

Expand All @@ -53,6 +60,7 @@ GOEXPERIMENT=
ifeq ("${ENABLE_FIPS}", "1")
BUILD_FLAG = -tags boringcrypto
GOEXPERIMENT = GOEXPERIMENT=boringcrypto
CGO = 1
endif

RELEASE_VERSION =
Expand Down Expand Up @@ -81,8 +89,8 @@ CONSUMER_BUILD_FLAG=
ifeq ("${IS_ALPINE}", "1")
CONSUMER_BUILD_FLAG = -tags musl
endif

GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=$(CGO) $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
CONSUMER_GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(CONSUMER_BUILD_FLAG) -trimpath $(GOVENDORFLAG)

PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|ticdc/tests|integration|testing_utils|pb|pbmock|ticdc/bin'
PACKAGES := $$($(PACKAGE_LIST))
Expand Down Expand Up @@ -124,7 +132,7 @@ cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc

kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer
$(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer

storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,16 @@ import (
func TestCreateTopic(t *testing.T) {
t.Parallel()

options := kafka.NewOptions()
changefeed := common.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)

adminClient, err := factory.AdminClient()
require.NoError(t, err)
adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()
cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}
ctx := context.Background()

changefeedID := common.NewChangefeedID4Test("test", "test")
ctx := context.Background()
manager := newKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
Expand Down Expand Up @@ -85,23 +80,17 @@ func TestCreateTopic(t *testing.T) {
func TestCreateTopicWithDelay(t *testing.T) {
t.Parallel()

options := kafka.NewOptions()
changefeed := common.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
require.NoError(t, err)

adminClient, err := factory.AdminClient()
require.NoError(t, err)
adminClient := kafka.NewClusterAdminClientMockImpl()
defer adminClient.Close()
cfg := &kafka.AutoCreateTopicConfig{
AutoCreate: true,
PartitionNum: 2,
ReplicationFactor: 1,
}

ctx := context.Background()
topic := "new_topic"
changefeedID := common.NewChangefeedID4Test("test", "test")
ctx := context.Background()
manager := newKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, topic)
Expand Down
11 changes: 5 additions & 6 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newKafkaSink(
}()

statistics := metrics.NewStatistics(changefeedID, "KafkaSink")
asyncProducer, err := kafkaComponent.Factory.AsyncProducer()
asyncProducer, err := kafkaComponent.Factory.AsyncProducer(ctx)
if err != nil {
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Expand Down Expand Up @@ -121,8 +121,8 @@ func newKafkaSink(
adminClient: kafkaComponent.AdminClient,
topicManager: kafkaComponent.TopicManager,
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(),
ctx: ctx,
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
}
return sink, nil
}
Expand Down Expand Up @@ -203,11 +203,10 @@ func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProduc
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down Expand Up @@ -257,7 +256,7 @@ func newKafkaSinkForTest() (*KafkaSink, producer.DMLProducer, producer.DDLProduc
adminClient: kafkaComponent.AdminClient,
topicManager: kafkaComponent.TopicManager,
statistics: statistics,
metricsCollector: kafkaComponent.Factory.MetricsCollector(),
metricsCollector: kafkaComponent.Factory.MetricsCollector(kafkaComponent.AdminClient),
}
go sink.Run(ctx)
return sink, dmlMockProducer, ddlMockProducer, nil
Expand Down
12 changes: 7 additions & 5 deletions downstreamadapter/worker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/ticdc/pkg/sink/codec"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
"github.com/pingcap/ticdc/pkg/sink/kafka"
v2 "github.com/pingcap/ticdc/pkg/sink/kafka/v2"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tiflow/pkg/sink"
Expand Down Expand Up @@ -59,7 +60,7 @@ func getKafkaSinkComponentWithFactory(ctx context.Context,
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaInvalidConfig, err)
}

kafkaComponent.Factory, err = factoryCreator(options, changefeedID)
kafkaComponent.Factory, err = factoryCreator(ctx, options, changefeedID)
if err != nil {
return kafkaComponent, protocol, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Expand Down Expand Up @@ -93,9 +94,7 @@ func getKafkaSinkComponentWithFactory(ctx context.Context,
options.DeriveTopicConfig(),
kafkaComponent.AdminClient,
)
if err != nil {
return kafkaComponent, protocol, errors.Trace(err)
}

scheme := sink.GetScheme(sinkURI)
kafkaComponent.EventRouter, err = eventrouter.NewEventRouter(sinkConfig, protocol, topic, scheme)
if err != nil {
Expand Down Expand Up @@ -130,7 +129,10 @@ func GetKafkaSinkComponent(
sinkURI *url.URL,
sinkConfig *config.SinkConfig,
) (KafkaComponent, config.Protocol, error) {
factoryCreator := kafka.NewFactory
factoryCreator := kafka.NewSaramaFactory
if utils.GetOrZero(sinkConfig.EnableKafkaSinkV2) {
factoryCreator = v2.NewFactory
}
return getKafkaSinkComponentWithFactory(ctx, changefeedID, sinkURI, sinkConfig, factoryCreator)
}

Expand Down
5 changes: 2 additions & 3 deletions downstreamadapter/worker/kafka_ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ func kafkaDDLWorkerForTest(t *testing.T) *KafkaDDLWorker {
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down
5 changes: 2 additions & 3 deletions downstreamadapter/worker/kafka_dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ func kafkaDMLWorkerForTest(t *testing.T) *KafkaDMLWorker {
ctx := context.Background()
changefeedID := common.NewChangefeedID4Test("test", "test")
openProtocol := "open-protocol"
sinkConfig := config.GetDefaultReplicaConfig().Clone().Sink
sinkConfig.Protocol = &openProtocol
sinkConfig := &config.SinkConfig{Protocol: &openProtocol}
uriTemplate := "kafka://%s/%s?kafka-version=0.9.0.0&max-batch-size=1" +
"&max-message-bytes=1048576&partition-num=1" +
"&kafka-client-id=unit-test&auto-create-topic=true&compression=gzip&protocol=open-protocol"
"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip&protocol=open-protocol"
uri := fmt.Sprintf(uriTemplate, "127.0.0.1:9092", kafka.DefaultMockTopicName)

sinkURI, err := url.Parse(uri)
Expand Down
23 changes: 13 additions & 10 deletions downstreamadapter/worker/producer/kafka_ddl_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ package producer

import (
"context"
"fmt"
"testing"
"time"

confluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/IBM/sarama"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
Expand All @@ -34,16 +33,17 @@ func TestDDLSyncBroadcastMessage(t *testing.T) {
options.MaxMessages = 1

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)

for i := 0; i < kafka.DefaultMockPartitionNum; i++ {
syncProducer.(*kafka.MockSyncProducer).Producer.ExpectSendMessageAndSucceed()
syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndSucceed()
}
err = p.SyncBroadcastMessage(ctx, kafka.DefaultMockTopicName,
kafka.DefaultMockPartitionNum, &common.Message{})
Expand All @@ -61,15 +61,16 @@ func TestDDLSyncSendMessage(t *testing.T) {
options := getOptions()

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)

syncProducer.(*kafka.MockSyncProducer).Producer.ExpectSendMessageAndSucceed()
syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndSucceed()
err = p.SyncSendMessage(ctx, kafka.DefaultMockTopicName, 0, &common.Message{})
require.NoError(t, err)

Expand All @@ -90,18 +91,19 @@ func TestDDLProducerSendMsgFailed(t *testing.T) {

// This will make the first send failed.
changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)

p := NewKafkaDDLProducer(ctx, changefeed, syncProducer)
defer p.Close()

syncProducer.(*kafka.MockSaramaSyncProducer).Producer.ExpectSendMessageAndFail(sarama.ErrMessageTooLarge)
err = p.SyncSendMessage(ctx, kafka.DefaultMockTopicName, 0, &common.Message{})
fmt.Println(err)
require.ErrorIs(t, err, confluentKafka.NewError(confluentKafka.ErrMsgSizeTooLarge, "", false))
require.ErrorIs(t, err, sarama.ErrMessageTooLarge)
}

func TestDDLProducerDoubleClose(t *testing.T) {
Expand All @@ -110,8 +112,9 @@ func TestDDLProducerDoubleClose(t *testing.T) {
options := getOptions()

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

syncProducer, err := factory.SyncProducer()
require.NoError(t, err)
Expand Down
31 changes: 18 additions & 13 deletions downstreamadapter/worker/producer/kafka_dml_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
"time"

confluentKafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/IBM/sarama"
commonType "github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/sink/codec/common"
Expand All @@ -45,16 +45,16 @@ func TestProducerAck(t *testing.T) {

errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
config := kafka.NewConfig(options)
val, err := config.Get("queue.buffering.max.messages", -1)
require.NoError(t, err)
require.Equal(t, 1, val)
config, err := kafka.NewSaramaConfig(ctx, options)
require.Nil(t, err)
require.Equal(t, 1, config.Producer.Flush.MaxMessages)

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

asyncProducer, err := factory.AsyncProducer()
asyncProducer, err := factory.AsyncProducer(ctx)
require.NoError(t, err)

producer := NewKafkaDMLProducer(changefeed, asyncProducer)
Expand All @@ -64,7 +64,7 @@ func TestProducerAck(t *testing.T) {

messageCount := 20
for i := 0; i < messageCount; i++ {
asyncProducer.(*kafka.MockAsyncProducer).Producer.ExpectInputAndSucceed()
asyncProducer.(*kafka.MockSaramaAsyncProducer).AsyncProducer.ExpectInputAndSucceed()
}

count := atomic.NewInt64(0)
Expand Down Expand Up @@ -113,14 +113,17 @@ func TestProducerSendMsgFailed(t *testing.T) {
errCh := make(chan error, 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_, err := kafka.NewSaramaConfig(ctx, options)
require.NoError(t, err)
options.MaxMessages = 1
options.MaxMessageBytes = 1

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

asyncProducer, err := factory.AsyncProducer()
asyncProducer, err := factory.AsyncProducer(ctx)
require.NoError(t, err)

producer := NewKafkaDMLProducer(changefeed, asyncProducer)
Expand All @@ -143,6 +146,7 @@ func TestProducerSendMsgFailed(t *testing.T) {
go func(t *testing.T) {
defer wg.Done()

asyncProducer.(*kafka.MockSaramaAsyncProducer).AsyncProducer.ExpectInputAndFail(sarama.ErrMessageTooLarge)
err = producer.AsyncSendMessage(ctx, kafka.DefaultMockTopicName, int32(0), &common.Message{
Key: []byte("test-key-1"),
Value: []byte("test-value"),
Expand All @@ -162,7 +166,7 @@ func TestProducerSendMsgFailed(t *testing.T) {
case <-ctx.Done():
t.Errorf("TestProducerSendMessageFailed timed out")
case err := <-errCh:
require.ErrorIs(t, err, confluentKafka.NewError(confluentKafka.ErrMsgSizeTooLarge, "", false))
require.ErrorIs(t, err, sarama.ErrMessageTooLarge)
}
}()

Expand All @@ -176,10 +180,11 @@ func TestProducerDoubleClose(t *testing.T) {
defer cancel()

changefeed := commonType.NewChangefeedID4Test("test", "test")
factory, err := kafka.NewMockFactory(options, changefeed)
factory, err := kafka.NewMockFactory(ctx, options, changefeed)
require.NoError(t, err)
factory.(*kafka.MockFactory).ErrorReporter = t

asyncProducer, err := factory.AsyncProducer()
asyncProducer, err := factory.AsyncProducer(ctx)
require.NoError(t, err)

producer := NewKafkaDMLProducer(changefeed, asyncProducer)
Expand Down
Loading

0 comments on commit 7a53695

Please sign in to comment.