diff --git a/Makefile b/Makefile index 852738fdd..44017de7c 100644 --- a/Makefile +++ b/Makefile @@ -82,6 +82,7 @@ ifeq ("${IS_ALPINE}", "1") CONSUMER_BUILD_FLAG = -tags musl endif GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=$(CGO) $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG) +CONSUMER_GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(CONSUMER_BUILD_FLAG) -trimpath $(GOVENDORFLAG) PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|ticdc/tests|integration|testing_utils|pb|pbmock|ticdc/bin' PACKAGES := $$($(PACKAGE_LIST)) @@ -109,7 +110,19 @@ generate_mock: tools/bin/mockgen scripts/generate-mock.sh cdc: - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc + +kafka_consumer: + $(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer + +storage_consumer: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go + +pulsar_consumer: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_pulsar_consumer ./cmd/pulsar-consumer/main.go + +filter_helper: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_filter_helper ./cmd/filter-helper/main.go fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci @echo "run gci (format imports)" @@ -126,9 +139,9 @@ integration_test_build: check_failpoint_ctl $(FAILPOINT_ENABLE) $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \ -coverpkg=github.com/pingcap/ticdc/... \ - -o bin/cdc.test github.com/pingcap/ticdc/cmd \ + -o bin/cdc.test github.com/pingcap/ticdc/cmd/cdc \ || { $(FAILPOINT_DISABLE); echo "Failed to build cdc.test"; exit 1; } - $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/main.go \ + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go \ || { $(FAILPOINT_DISABLE); exit 1; } $(FAILPOINT_DISABLE) diff --git a/cmd/cli/cli.go b/cmd/cdc/cli/cli.go similarity index 97% rename from cmd/cli/cli.go rename to cmd/cdc/cli/cli.go index b1932d490..cc27c0167 100644 --- a/cmd/cli/cli.go +++ b/cmd/cdc/cli/cli.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/logutil" "github.com/spf13/cobra" diff --git a/cmd/cli/cli_capture.go b/cmd/cdc/cli/cli_capture.go similarity index 95% rename from cmd/cli/cli_capture.go rename to cmd/cdc/cli/cli_capture.go index 26a446ad6..3263dd84c 100644 --- a/cmd/cli/cli_capture.go +++ b/cmd/cdc/cli/cli_capture.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/spf13/cobra" ) diff --git a/cmd/cli/cli_capture_list.go b/cmd/cdc/cli/cli_capture_list.go similarity index 97% rename from cmd/cli/cli_capture_list.go rename to cmd/cdc/cli/cli_capture_list.go index ec2d83bc0..947fc58aa 100644 --- a/cmd/cli/cli_capture_list.go +++ b/cmd/cdc/cli/cli_capture_list.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed.go b/cmd/cdc/cli/cli_changefeed.go similarity index 96% rename from cmd/cli/cli_changefeed.go rename to cmd/cdc/cli/cli_changefeed.go index fcbc4eaed..14aecfa72 100644 --- a/cmd/cli/cli_changefeed.go +++ b/cmd/cdc/cli/cli_changefeed.go @@ -14,7 +14,7 @@ package cli import ( - 
"github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/spf13/cobra" ) diff --git a/cmd/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go similarity index 99% rename from cmd/cli/cli_changefeed_create.go rename to cmd/cdc/cli/cli_changefeed_create.go index dfb74ef93..c7ab8ab06 100644 --- a/cmd/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" v2 "github.com/pingcap/ticdc/api/v2" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filter" diff --git a/cmd/cli/cli_changefeed_create_test.go b/cmd/cdc/cli/cli_changefeed_create_test.go similarity index 100% rename from cmd/cli/cli_changefeed_create_test.go rename to cmd/cdc/cli/cli_changefeed_create_test.go diff --git a/cmd/cli/cli_changefeed_helper.go b/cmd/cdc/cli/cli_changefeed_helper.go similarity index 100% rename from cmd/cli/cli_changefeed_helper.go rename to cmd/cdc/cli/cli_changefeed_helper.go diff --git a/cmd/cli/cli_changefeed_helper_test.go b/cmd/cdc/cli/cli_changefeed_helper_test.go similarity index 100% rename from cmd/cli/cli_changefeed_helper_test.go rename to cmd/cdc/cli/cli_changefeed_helper_test.go diff --git a/cmd/cli/cli_changefeed_list.go b/cmd/cdc/cli/cli_changefeed_list.go similarity index 98% rename from cmd/cli/cli_changefeed_list.go rename to cmd/cdc/cli/cli_changefeed_list.go index 3b4188c5a..0ce8c8088 100644 --- a/cmd/cli/cli_changefeed_list.go +++ b/cmd/cdc/cli/cli_changefeed_list.go @@ -16,7 +16,7 @@ package cli import ( "time" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" v2 "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/cdc/api/owner" "github.com/pingcap/tiflow/cdc/model" diff --git a/cmd/cli/cli_changefeed_list_test.go b/cmd/cdc/cli/cli_changefeed_list_test.go similarity index 100% rename from cmd/cli/cli_changefeed_list_test.go rename to cmd/cdc/cli/cli_changefeed_list_test.go diff --git a/cmd/cli/cli_changefeed_move_table.go b/cmd/cdc/cli/cli_changefeed_move_table.go similarity index 98% rename from cmd/cli/cli_changefeed_move_table.go rename to cmd/cdc/cli/cli_changefeed_move_table.go index eb086f954..45c486c80 100644 --- a/cmd/cli/cli_changefeed_move_table.go +++ b/cmd/cdc/cli/cli_changefeed_move_table.go @@ -16,7 +16,7 @@ package cli import ( "context" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" diff --git a/cmd/cli/cli_changefeed_pause.go b/cmd/cdc/cli/cli_changefeed_pause.go similarity index 98% rename from cmd/cli/cli_changefeed_pause.go rename to cmd/cdc/cli/cli_changefeed_pause.go index 3c3a1c3b0..60d779db3 100644 --- a/cmd/cli/cli_changefeed_pause.go +++ b/cmd/cdc/cli/cli_changefeed_pause.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_pause_test.go b/cmd/cdc/cli/cli_changefeed_pause_test.go similarity index 100% rename from cmd/cli/cli_changefeed_pause_test.go rename to cmd/cdc/cli/cli_changefeed_pause_test.go diff --git 
a/cmd/cli/cli_changefeed_query.go b/cmd/cdc/cli/cli_changefeed_query.go similarity index 99% rename from cmd/cli/cli_changefeed_query.go rename to cmd/cdc/cli/cli_changefeed_query.go index 4ce74d296..273c8dc4b 100644 --- a/cmd/cli/cli_changefeed_query.go +++ b/cmd/cdc/cli/cli_changefeed_query.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/errors" v2 "github.com/pingcap/ticdc/api/v2" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_query_test.go b/cmd/cdc/cli/cli_changefeed_query_test.go similarity index 100% rename from cmd/cli/cli_changefeed_query_test.go rename to cmd/cdc/cli/cli_changefeed_query_test.go diff --git a/cmd/cli/cli_changefeed_remove.go b/cmd/cdc/cli/cli_changefeed_remove.go similarity index 98% rename from cmd/cli/cli_changefeed_remove.go rename to cmd/cdc/cli/cli_changefeed_remove.go index b928803a7..00589e50b 100644 --- a/cmd/cli/cli_changefeed_remove.go +++ b/cmd/cdc/cli/cli_changefeed_remove.go @@ -16,7 +16,7 @@ package cli import ( "strings" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_remove_test.go b/cmd/cdc/cli/cli_changefeed_remove_test.go similarity index 100% rename from cmd/cli/cli_changefeed_remove_test.go rename to cmd/cdc/cli/cli_changefeed_remove_test.go diff --git a/cmd/cli/cli_changefeed_resume.go b/cmd/cdc/cli/cli_changefeed_resume.go similarity index 99% rename from cmd/cli/cli_changefeed_resume.go rename to cmd/cdc/cli/cli_changefeed_resume.go index 384524e8b..37a2180b7 100644 --- a/cmd/cli/cli_changefeed_resume.go +++ b/cmd/cdc/cli/cli_changefeed_resume.go @@ -19,7 +19,7 @@ import ( "strings" v2 "github.com/pingcap/ticdc/api/v2" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_resume_test.go b/cmd/cdc/cli/cli_changefeed_resume_test.go similarity index 100% rename from cmd/cli/cli_changefeed_resume_test.go rename to cmd/cdc/cli/cli_changefeed_resume_test.go diff --git a/cmd/cli/cli_changefeed_statistics.go b/cmd/cdc/cli/cli_changefeed_statistics.go similarity index 98% rename from cmd/cli/cli_changefeed_statistics.go rename to cmd/cdc/cli/cli_changefeed_statistics.go index 74ad327f2..e10e2f3bf 100644 --- a/cmd/cli/cli_changefeed_statistics.go +++ b/cmd/cdc/cli/cli_changefeed_statistics.go @@ -19,7 +19,7 @@ import ( "time" v2 "github.com/pingcap/ticdc/api/v2" - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go similarity index 99% rename from cmd/cli/cli_changefeed_update.go rename to cmd/cdc/cli/cli_changefeed_update.go index f355f5623..b9f81475f 100644 --- a/cmd/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -19,7 +19,7 @@ import ( "github.com/pingcap/log" v2 "github.com/pingcap/ticdc/api/v2" - "github.com/pingcap/ticdc/cmd/factory" + 
"github.com/pingcap/ticdc/cmd/cdc/factory" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" diff --git a/cmd/cli/cli_changefeed_update_test.go b/cmd/cdc/cli/cli_changefeed_update_test.go similarity index 100% rename from cmd/cli/cli_changefeed_update_test.go rename to cmd/cdc/cli/cli_changefeed_update_test.go diff --git a/cmd/cli/cli_tso.go b/cmd/cdc/cli/cli_tso.go similarity index 94% rename from cmd/cli/cli_tso.go rename to cmd/cdc/cli/cli_tso.go index 372837b25..015a1b118 100644 --- a/cmd/cli/cli_tso.go +++ b/cmd/cdc/cli/cli_tso.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/spf13/cobra" ) diff --git a/cmd/cli/cli_tso_query.go b/cmd/cdc/cli/cli_tso_query.go similarity index 97% rename from cmd/cli/cli_tso_query.go rename to cmd/cdc/cli/cli_tso_query.go index 282129224..555dea7c1 100644 --- a/cmd/cli/cli_tso_query.go +++ b/cmd/cdc/cli/cli_tso_query.go @@ -14,7 +14,7 @@ package cli import ( - "github.com/pingcap/ticdc/cmd/factory" + "github.com/pingcap/ticdc/cmd/cdc/factory" "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" diff --git a/cmd/factory/factory.go b/cmd/cdc/factory/factory.go similarity index 100% rename from cmd/factory/factory.go rename to cmd/cdc/factory/factory.go diff --git a/cmd/factory/factory_impl.go b/cmd/cdc/factory/factory_impl.go similarity index 100% rename from cmd/factory/factory_impl.go rename to cmd/cdc/factory/factory_impl.go diff --git a/cmd/main.go b/cmd/cdc/main.go similarity index 97% rename from cmd/main.go rename to cmd/cdc/main.go index 9d6871d78..c61874ff6 100644 --- a/cmd/main.go +++ b/cmd/cdc/main.go @@ -19,9 +19,9 @@ import ( "strings" "github.com/pingcap/log" - "github.com/pingcap/ticdc/cmd/cli" - "github.com/pingcap/ticdc/cmd/server" - "github.com/pingcap/ticdc/cmd/version" + "github.com/pingcap/ticdc/cmd/cdc/cli" + "github.com/pingcap/ticdc/cmd/cdc/server" + "github.com/pingcap/ticdc/cmd/cdc/version" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/tidb/pkg/util/collate" tiflowCmd "github.com/pingcap/tiflow/pkg/cmd" diff --git a/cmd/main_test.go b/cmd/cdc/main_test.go similarity index 100% rename from cmd/main_test.go rename to cmd/cdc/main_test.go diff --git a/cmd/server/context.go b/cmd/cdc/server/context.go similarity index 100% rename from cmd/server/context.go rename to cmd/cdc/server/context.go diff --git a/cmd/server/server.go b/cmd/cdc/server/server.go similarity index 100% rename from cmd/server/server.go rename to cmd/cdc/server/server.go diff --git a/cmd/version/version.go b/cmd/cdc/version/version.go similarity index 100% rename from cmd/version/version.go rename to cmd/cdc/version/version.go diff --git a/cmd/filter-helper/main.go b/cmd/filter-helper/main.go new file mode 100644 index 000000000..97a0e671c --- /dev/null +++ b/cmd/filter-helper/main.go @@ -0,0 +1,113 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "fmt" + "strings" + + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/spf13/cobra" +) + +var ( + table string + ddl string + cfgPath string +) + +func main() { + rootCmd := &cobra.Command{ + Use: "TiCDC filter helper, use to check whether your filter config works as expected", + Short: "A tool to check table and ddl query against filter rules", + Run: runFilter, + } + rootCmd.Flags().StringVarP(&cfgPath, "config", "c", "", "changefeed config file path") + rootCmd.Flags().StringVarP(&table, "table", "t", "", "table name, format: [schema].[table] ") + rootCmd.Flags().StringVarP(&ddl, "ddl", "d", "", "ddl query") + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + } +} + +func runFilter(cmd *cobra.Command, args []string) { + // fmt.Printf("Filter Rules: %v\n", filterRules) + // fmt.Printf("Schema Name: %s\n", schemaName) + // fmt.Printf("Table Name: %s\n", tableName) + cfg := &config.ReplicaConfig{} + err := util.StrictDecodeFile(cfgPath, "cdc filter helper", cfg) + if err != nil { + fmt.Printf("decode config file error: %v\n", err) + return + } + ft, err := filter.NewFilter(cfg, "") + if err != nil { + fmt.Printf("filter create error: %v\n", err) + return + } + tableAndSchema := strings.Split(table, ".") + if len(tableAndSchema) != 2 { + fmt.Printf("the input format is invalid, only support {schema}.{table}: %s\n", table) + return + } + + target := "table" + if ddl != "" { + target = "ddl" + } + + switch target { + case "table": + matched := !ft.ShouldIgnoreTable(tableAndSchema[0], tableAndSchema[1]) + if matched { + fmt.Printf("Table: %s, Matched filter rule\n", table) + return + } + fmt.Printf("Table: %s, Not matched filter rule\n", table) + case "ddl": + ddlType := timodel.ActionCreateTable + discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1]) + if discard { + fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl) + return + } + ignored, err := ft.ShouldIgnoreDDLEvent(&model.DDLEvent{ + StartTs: uint64(0), + Query: ddl, + Type: ddlType, + TableInfo: &model.TableInfo{ + TableName: model.TableName{ + Schema: tableAndSchema[0], + Table: tableAndSchema[1], + }, + }, + }) + if err != nil { + fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err) + return + } + if ignored { + fmt.Printf("DDL: %s, should be ignored by event filter rule\n", ddl) + return + } + fmt.Printf("DDL: %s, should not be discard by event filter rule\n", ddl) + default: + fmt.Printf("unknown target: %s", target) + + } +} diff --git a/cmd/kafka-consumer/consumer.go b/cmd/kafka-consumer/consumer.go new file mode 100644 index 000000000..5693a4b93 --- /dev/null +++ b/cmd/kafka-consumer/consumer.go @@ -0,0 +1,155 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
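+
+// This file wires the consumer to Kafka: it discovers how many partitions the
+// topic has, subscribes with the configured consumer group, and hands every
+// message to the writer, committing the offset only after the writer reports
+// that the message has been flushed downstream.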
+ +package main + +import ( + "context" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func getPartitionNum(o *option) (int32, error) { + configMap := &kafka.ConfigMap{ + "bootstrap.servers": strings.Join(o.address, ","), + } + if len(o.ca) != 0 { + _ = configMap.SetKey("security.protocol", "SSL") + _ = configMap.SetKey("ssl.ca.location", o.ca) + _ = configMap.SetKey("ssl.key.location", o.key) + _ = configMap.SetKey("ssl.certificate.location", o.cert) + } + admin, err := kafka.NewAdminClient(configMap) + if err != nil { + return 0, errors.Trace(err) + } + defer admin.Close() + + timeout := 3000 + for i := 0; i <= o.retryTime; i++ { + resp, err := admin.GetMetadata(&o.topic, false, timeout) + if err != nil { + if err.(kafka.Error).Code() == kafka.ErrTransport { + log.Info("retry get partition number", zap.Int("retryTime", i), zap.Int("timeout", timeout)) + timeout += 100 + continue + } + return 0, errors.Trace(err) + } + if topicDetail, ok := resp.Topics[o.topic]; ok { + numPartitions := int32(len(topicDetail.Partitions)) + log.Info("get partition number of topic", + zap.String("topic", o.topic), + zap.Int32("partitionNum", numPartitions)) + return numPartitions, nil + } + log.Info("retry get partition number", zap.String("topic", o.topic)) + time.Sleep(1 * time.Second) + } + return 0, errors.Errorf("get partition number(%s) timeout", o.topic) +} + +type consumer struct { + client *kafka.Consumer + writer *writer +} + +// newConsumer will create a consumer client. +func newConsumer(ctx context.Context, o *option) *consumer { + partitionNum, err := getPartitionNum(o) + if err != nil { + log.Panic("cannot get the partition number", zap.String("topic", o.topic), zap.Error(err)) + } + if o.partitionNum == 0 { + o.partitionNum = partitionNum + } + topics := strings.Split(o.topic, ",") + if len(topics) == 0 { + log.Panic("no topic provided for the consumer") + } + configMap := &kafka.ConfigMap{ + "bootstrap.servers": strings.Join(o.address, ","), + "group.id": o.groupID, + // Start reading from the first message of each assigned + // partition if there are no previously committed offsets + // for this group. + "auto.offset.reset": "earliest", + // Whether we store offsets automatically. + "enable.auto.offset.store": false, + "enable.auto.commit": false, + } + if len(o.ca) != 0 { + _ = configMap.SetKey("security.protocol", "SSL") + _ = configMap.SetKey("ssl.ca.location", o.ca) + _ = configMap.SetKey("ssl.key.location", o.key) + _ = configMap.SetKey("ssl.certificate.location", o.cert) + } + if level, err := zapcore.ParseLevel(o.logLevel); err == nil && level.String() == "debug" { + configMap.SetKey("debug", "all") + } + client, err := kafka.NewConsumer(configMap) + if err != nil { + log.Panic("create kafka consumer failed", zap.Error(err)) + } + err = client.SubscribeTopics(topics, nil) + if err != nil { + log.Panic("subscribe topics failed", zap.Error(err)) + } + return &consumer{ + writer: newWriter(ctx, o), + client: client, + } +} + +// Consume will read message from Kafka. 
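+// Offsets are stored and committed manually (enable.auto.offset.store and
+// enable.auto.commit are both disabled), and a message is committed only after
+// WriteMessage reports that it has been flushed to the downstream sink, so a
+// restart re-reads uncommitted messages instead of losing them.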
+func (c *consumer) Consume(ctx context.Context) { + defer func() { + if err := c.client.Close(); err != nil { + log.Panic("close kafka consumer failed", zap.Error(err)) + } + }() + for { + select { + case <-ctx.Done(): + log.Info("consumer exist: context cancelled") + return + default: + } + msg, err := c.client.ReadMessage(-1) + if err != nil { + log.Error("read message failed, just continue to retry", zap.Error(err)) + continue + } + needCommit := c.writer.WriteMessage(ctx, msg) + if !needCommit { + continue + } + + topicPartition, err := c.client.CommitMessage(msg) + if err != nil { + log.Error("commit message failed, just continue", + zap.String("topic", *msg.TopicPartition.Topic), zap.Int32("partition", msg.TopicPartition.Partition), + zap.Any("offset", msg.TopicPartition.Offset), zap.Error(err)) + continue + } + log.Debug("commit message success", + zap.String("topic", topicPartition[0].String()), zap.Int32("partition", topicPartition[0].Partition), + zap.Any("offset", topicPartition[0].Offset)) + } +} diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go new file mode 100644 index 000000000..c5e4b908d --- /dev/null +++ b/cmd/kafka-consumer/event_group.go @@ -0,0 +1,76 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "sort" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +// EventsGroup could store change event message. +type eventsGroup struct { + partition int32 + tableID int64 + + events []*model.RowChangedEvent + highWatermark uint64 +} + +// NewEventsGroup will create new event group. +func NewEventsGroup(partition int32, tableID int64) *eventsGroup { + return &eventsGroup{ + partition: partition, + tableID: tableID, + events: make([]*model.RowChangedEvent, 0, 1024), + } +} + +// Append will append an event to event groups. +func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) { + g.events = append(g.events, row) + if row.CommitTs > g.highWatermark { + g.highWatermark = row.CommitTs + } + log.Info("DML event received", + zap.Int32("partition", g.partition), + zap.Any("offset", offset), + zap.Uint64("commitTs", row.CommitTs), + zap.Uint64("highWatermark", g.highWatermark), + zap.Int64("tableID", row.GetTableID()), + zap.String("schema", row.TableInfo.GetSchemaName()), + zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns)) +} + +// Resolve will get events where CommitTs is less than resolveTs. 
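+// Events are appended in commit-ts order per table, so a binary search finds
+// the first event with CommitTs greater than the watermark; the prefix before
+// it is returned and removed from the group.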
+func (g *eventsGroup) Resolve(resolve uint64) []*model.RowChangedEvent { + i := sort.Search(len(g.events), func(i int) bool { + return g.events[i].CommitTs > resolve + }) + + result := g.events[:i] + g.events = g.events[i:] + if len(result) != 0 && len(g.events) != 0 { + log.Warn("not all events resolved", + zap.Int32("partition", g.partition), zap.Int64("tableID", g.tableID), + zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)), + zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs)) + } + + return result +} diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go new file mode 100644 index 000000000..ac008e967 --- /dev/null +++ b/cmd/kafka-consumer/main.go @@ -0,0 +1,111 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + _ "net/http/pprof" + "net/url" + "os" + "os/signal" + "strings" + "sync" + "syscall" + + "github.com/google/uuid" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/logutil" + "github.com/pingcap/tiflow/pkg/version" + "go.uber.org/zap" +) + +func main() { + var ( + upstreamURIStr string + configFile string + ) + groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String()) + consumerOption := newOption() + flag.StringVar(&configFile, "config", "", "config file for changefeed") + flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri") + flag.StringVar(&consumerOption.downstreamURI, "downstream-uri", "", "downstream sink uri") + flag.StringVar(&consumerOption.schemaRegistryURI, "schema-registry-uri", "", "schema registry uri") + flag.StringVar(&consumerOption.upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN") + flag.StringVar(&consumerOption.groupID, "consumer-group-id", groupID, "consumer group id") + flag.StringVar(&consumerOption.logPath, "log-file", "cdc_kafka_consumer.log", "log file path") + flag.StringVar(&consumerOption.logLevel, "log-level", "info", "log file path") + flag.StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of Kafka consumer") + flag.StringVar(&consumerOption.ca, "ca", "", "CA certificate path for Kafka SSL connection") + flag.StringVar(&consumerOption.cert, "cert", "", "Certificate path for Kafka SSL connection") + flag.StringVar(&consumerOption.key, "key", "", "Private key path for Kafka SSL connection") + flag.BoolVar(&consumerOption.enableProfiling, "enable-profiling", false, "enable pprof profiling") + flag.Parse() + + err := logutil.InitLogger(&logutil.Config{ + Level: consumerOption.logLevel, + File: consumerOption.logPath, + }) + if err != nil { + log.Panic("init logger failed", zap.Error(err)) + } + version.LogVersionInfo("kafka consumer") + + upstreamURI, err := url.Parse(upstreamURIStr) + if err != nil { + log.Panic("invalid upstream-uri", zap.Error(err)) + } + scheme := strings.ToLower(upstreamURI.Scheme) + if scheme != "kafka" { + log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", + zap.String("upstreamURI", upstreamURIStr)) + } + + err = 
consumerOption.Adjust(upstreamURI, configFile) + if err != nil { + log.Panic("adjust consumer option failed", zap.Error(err)) + } + + ctx, cancel := context.WithCancel(context.Background()) + consumer := newConsumer(ctx, consumerOption) + var wg sync.WaitGroup + if consumerOption.enableProfiling { + log.Info("profiling is enabled") + wg.Add(1) + go func() { + defer wg.Done() + if err = http.ListenAndServe("127.0.0.1:6060", nil); err != nil { + log.Panic("cannot start the pprof", zap.Error(err)) + } + }() + } + wg.Add(1) + go func() { + defer wg.Done() + consumer.Consume(ctx) + }() + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-ctx.Done(): + log.Info("terminating: context cancelled") + case <-sigterm: + log.Info("terminating: via signal") + } + cancel() + wg.Wait() +} diff --git a/cmd/kafka-consumer/option.go b/cmd/kafka-consumer/option.go new file mode 100644 index 000000000..e23cc42ae --- /dev/null +++ b/cmd/kafka-consumer/option.go @@ -0,0 +1,176 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "math" + "net/url" + "strconv" + "strings" + "time" + + cerror "github.com/pingcap/errors" + "github.com/pingcap/log" + cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/tiflow/pkg/config" + cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +var ( + defaultVersion = "2.4.0" + defaultRetryTime = 30 + defaultTimeout = time.Second * 10 +) + +type option struct { + address []string + version string + topic string + partitionNum int32 + groupID string + + maxMessageBytes int + maxBatchSize int + + protocol config.Protocol + + codecConfig *common.Config + // the replicaConfig of the changefeed which produce data to the kafka topic + replicaConfig *config.ReplicaConfig + + logPath string + logLevel string + timezone string + ca, cert, key string + + downstreamURI string + + // avro schema registry uri should be set if the encoding protocol is avro + schemaRegistryURI string + + // upstreamTiDBDSN is the dsn of the upstream TiDB cluster + upstreamTiDBDSN string + + enableProfiling bool + + // connect kafka retry times, default 30 + retryTime int + // connect kafka timeout, default 10s + timeout time.Duration +} + +func newOption() *option { + return &option{ + version: defaultVersion, + maxMessageBytes: math.MaxInt64, + maxBatchSize: math.MaxInt64, + retryTime: defaultRetryTime, + timeout: defaultTimeout, + } +} + +// Adjust the consumer option by the upstream uri passed in parameters. 
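+// The query parameters version, partition-num, max-message-bytes,
+// max-batch-size and protocol are honored, and protocol is mandatory. The
+// optional changefeed config file is decoded into the replica config that the
+// decoder and the event router use.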
+func (o *option) Adjust(upstreamURI *url.URL, configFile string) error { + s := upstreamURI.Query().Get("version") + if s != "" { + o.version = s + } + o.topic = strings.TrimFunc(upstreamURI.Path, func(r rune) bool { + return r == '/' + }) + o.address = strings.Split(upstreamURI.Host, ",") + + s = upstreamURI.Query().Get("partition-num") + if s != "" { + c, err := strconv.ParseInt(s, 10, 32) + if err != nil { + log.Panic("invalid partition-num of upstream-uri") + } + o.partitionNum = int32(c) + } + + s = upstreamURI.Query().Get("max-message-bytes") + if s != "" { + c, err := strconv.Atoi(s) + if err != nil { + log.Panic("invalid max-message-bytes of upstream-uri") + } + o.maxMessageBytes = c + } + + s = upstreamURI.Query().Get("max-batch-size") + if s != "" { + c, err := strconv.Atoi(s) + if err != nil { + log.Panic("invalid max-batch-size of upstream-uri") + } + o.maxBatchSize = c + } + + s = upstreamURI.Query().Get("protocol") + if s == "" { + log.Panic("cannot found the protocol from the sink url") + } + protocol, err := config.ParseSinkProtocolFromString(s) + if err != nil { + log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s)) + } + o.protocol = protocol + + replicaConfig := config.GetDefaultReplicaConfig() + // the TiDB source ID should never be set to 0 + replicaConfig.Sink.TiDBSourceID = 1 + replicaConfig.Sink.Protocol = util.AddressOf(protocol.String()) + if configFile != "" { + err = cmdUtil.StrictDecodeFile(configFile, "kafka consumer", replicaConfig) + if err != nil { + return cerror.Trace(err) + } + if _, err = filter.VerifyTableRules(replicaConfig.Filter); err != nil { + return cerror.Trace(err) + } + } + o.replicaConfig = replicaConfig + + o.codecConfig = common.NewConfig(protocol) + if err = o.codecConfig.Apply(upstreamURI, o.replicaConfig); err != nil { + return cerror.Trace(err) + } + tz, err := util.GetTimezone(o.timezone) + if err != nil { + return cerrors.Trace(err) + } + o.codecConfig.TimeZone = tz + + if protocol == config.ProtocolAvro { + o.codecConfig.AvroEnableWatermark = true + } + + log.Info("consumer option adjusted", + zap.String("configFile", configFile), + zap.String("address", strings.Join(o.address, ",")), + zap.String("version", o.version), + zap.String("topic", o.topic), + zap.Int32("partitionNum", o.partitionNum), + zap.String("groupID", o.groupID), + zap.Int("maxMessageBytes", o.maxMessageBytes), + zap.Int("maxBatchSize", o.maxBatchSize), + zap.String("upstreamURI", upstreamURI.String()), + zap.String("downstreamURI", o.downstreamURI)) + return nil +} diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go new file mode 100644 index 000000000..a40c85500 --- /dev/null +++ b/cmd/kafka-consumer/writer.go @@ -0,0 +1,545 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
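+
+// The writer decodes messages per partition, groups row events by table,
+// tracks a watermark per partition, and replays DDL and DML events into the
+// downstream sink once the minimum watermark across all partitions has passed
+// their commit-ts.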
+ +package main + +import ( + "context" + "database/sql" + "errors" + "math" + "sync" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" + eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/tablesink" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/avro" + "github.com/pingcap/tiflow/pkg/sink/codec/canal" + "github.com/pingcap/tiflow/pkg/sink/codec/open" + "github.com/pingcap/tiflow/pkg/sink/codec/simple" + "github.com/pingcap/tiflow/pkg/spanz" + "go.uber.org/zap" +) + +// NewDecoder will create a new event decoder +func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (codec.RowEventDecoder, error) { + var ( + decoder codec.RowEventDecoder + err error + ) + switch option.protocol { + case config.ProtocolOpen, config.ProtocolDefault: + decoder, err = open.NewBatchDecoder(ctx, option.codecConfig, upstreamTiDB) + case config.ProtocolCanalJSON: + decoder, err = canal.NewBatchDecoder(ctx, option.codecConfig, upstreamTiDB) + case config.ProtocolAvro: + schemaM, err := avro.NewConfluentSchemaManager(ctx, option.schemaRegistryURI, nil) + if err != nil { + return decoder, cerror.Trace(err) + } + decoder = avro.NewDecoder(option.codecConfig, schemaM, option.topic, upstreamTiDB) + case config.ProtocolSimple: + decoder, err = simple.NewDecoder(ctx, option.codecConfig, upstreamTiDB) + default: + log.Panic("Protocol not supported", zap.Any("Protocol", option.protocol)) + } + if err != nil { + return nil, cerror.Trace(err) + } + return decoder, err +} + +type partitionProgress struct { + partition int32 + watermark uint64 + watermarkOffset kafka.Offset + + tableSinkMap map[model.TableID]tablesink.TableSink + eventGroups map[model.TableID]*eventsGroup + decoder codec.RowEventDecoder +} + +func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress { + return &partitionProgress{ + partition: partition, + eventGroups: make(map[model.TableID]*eventsGroup), + tableSinkMap: make(map[model.TableID]tablesink.TableSink), + decoder: decoder, + } +} + +func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset) { + watermark := p.loadWatermark() + if newWatermark >= watermark { + p.watermark = newWatermark + p.watermarkOffset = offset + log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset), + zap.Uint64("watermark", newWatermark)) + return + } + if offset > p.watermarkOffset { + log.Panic("partition resolved ts fallback", + zap.Int32("partition", p.partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) + } + log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", + zap.Int32("partition", p.partition), + zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) +} + +func (p *partitionProgress) loadWatermark() uint64 { + return p.watermark +} + +type writer struct { + option *option + + ddlList []*model.DDLEvent + 
ddlWithMaxCommitTs *model.DDLEvent + ddlSink ddlsink.Sink + + // sinkFactory is used to create table sink for each table. + sinkFactory *eventsinkfactory.SinkFactory + progresses []*partitionProgress + + eventRouter *dispatcher.EventRouter +} + +func newWriter(ctx context.Context, o *option) *writer { + w := &writer{ + option: o, + progresses: make([]*partitionProgress, o.partitionNum), + } + var ( + db *sql.DB + err error + ) + if o.upstreamTiDBDSN != "" { + db, err = openDB(ctx, o.upstreamTiDBDSN) + if err != nil { + log.Panic("cannot open the upstream TiDB, handle key only enabled", + zap.String("dsn", o.upstreamTiDBDSN)) + } + } + for i := 0; i < int(o.partitionNum); i++ { + decoder, err := NewDecoder(ctx, o, db) + if err != nil { + log.Panic("cannot create the decoder", zap.Error(err)) + } + w.progresses[i] = newPartitionProgress(int32(i), decoder) + } + + eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, "kafka") + if err != nil { + log.Panic("initialize the event router failed", + zap.Any("protocol", o.protocol), zap.Any("topic", o.topic), + zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules), zap.Error(err)) + } + w.eventRouter = eventRouter + log.Info("event router created", zap.Any("protocol", o.protocol), + zap.Any("topic", o.topic), zap.Any("dispatcherRules", o.replicaConfig.Sink.DispatchRules)) + + config.GetGlobalServerConfig().TZ = o.timezone + errChan := make(chan error, 1) + changefeed := model.DefaultChangeFeedID("kafka-consumer") + f, err := eventsinkfactory.New(ctx, changefeed, o.downstreamURI, o.replicaConfig, errChan, nil) + if err != nil { + log.Panic("cannot create the event sink factory", zap.Error(err)) + } + w.sinkFactory = f + + go func() { + err := <-errChan + if !errors.Is(cerror.Cause(err), context.Canceled) { + log.Error("error on running consumer", zap.Error(err)) + } else { + log.Info("consumer exited") + } + }() + + ddlSink, err := ddlsinkfactory.New(ctx, changefeed, o.downstreamURI, o.replicaConfig) + if err != nil { + log.Panic("cannot create the ddl sink factory", zap.Error(err)) + } + w.ddlSink = ddlSink + return w +} + +// append DDL wait to be handled, only consider the constraint among DDLs. +// for DDL a / b received in the order, a.CommitTs < b.CommitTs should be true. +func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) { + // DDL CommitTs fallback, just crash it to indicate the bug. + if w.ddlWithMaxCommitTs != nil && ddl.CommitTs < w.ddlWithMaxCommitTs.CommitTs { + log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs", + zap.Uint64("commitTs", ddl.CommitTs), + zap.Uint64("maxCommitTs", w.ddlWithMaxCommitTs.CommitTs), + zap.String("DDL", ddl.Query)) + return + } + + // A rename tables DDL job contains multiple DDL events with same CommitTs. + // So to tell if a DDL is redundant or not, we must check the equivalence of + // the current DDL and the DDL with max CommitTs. 
+ if ddl == w.ddlWithMaxCommitTs { + log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs", + zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) + return + } + + w.ddlList = append(w.ddlList, ddl) + w.ddlWithMaxCommitTs = ddl + log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) +} + +func (w *writer) getFrontDDL() *model.DDLEvent { + if len(w.ddlList) > 0 { + return w.ddlList[0] + } + return nil +} + +func (w *writer) popDDL() { + if len(w.ddlList) > 0 { + w.ddlList = w.ddlList[1:] + } +} + +func (w *writer) getMinWatermark() uint64 { + result := uint64(math.MaxUint64) + for _, p := range w.progresses { + watermark := p.loadWatermark() + if watermark < result { + result = watermark + } + } + return result +} + +// partition progress could be executed at the same time +func (w *writer) forEachPartition(fn func(p *partitionProgress)) { + var wg sync.WaitGroup + for _, p := range w.progresses { + wg.Add(1) + go func(p *partitionProgress) { + defer wg.Done() + fn(p) + }(p) + } + wg.Wait() +} + +// Write will synchronously write data downstream +func (w *writer) Write(ctx context.Context, messageType model.MessageType) bool { + watermark := w.getMinWatermark() + var todoDDL *model.DDLEvent + for { + todoDDL = w.getFrontDDL() + // watermark is the min value for all partitions, + // the DDL only executed by the first partition, other partitions may be slow + // so that the watermark can be smaller than the DDL's commitTs, + // which means some DML events may not be consumed yet, so cannot execute the DDL right now. + if todoDDL == nil || todoDDL.CommitTs > watermark { + break + } + // flush DMLs + w.forEachPartition(func(sink *partitionProgress) { + syncFlushRowChangedEvents(ctx, sink, todoDDL.CommitTs) + }) + // DDL can be executed, do it first. + if err := w.ddlSink.WriteDDLEvent(ctx, todoDDL); err != nil { + log.Panic("write DDL event failed", zap.Error(err), + zap.String("DDL", todoDDL.Query), zap.Uint64("commitTs", todoDDL.CommitTs)) + } + w.popDDL() + } + + if messageType == model.MessageTypeResolved { + w.forEachPartition(func(sink *partitionProgress) { + syncFlushRowChangedEvents(ctx, sink, watermark) + }) + } + + // The DDL events will only execute in partition0 + if messageType == model.MessageTypeDDL && todoDDL != nil { + log.Info("DDL event will be flushed in the future", + zap.Uint64("watermark", watermark), + zap.Uint64("CommitTs", todoDDL.CommitTs), + zap.String("Query", todoDDL.Query)) + return false + } + return true +} + +// WriteMessage is to decode kafka message to event. +func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool { + var ( + key = message.Key + value = message.Value + partition = message.TopicPartition.Partition + offset = message.TopicPartition.Offset + ) + + progress := w.progresses[partition] + if err := progress.decoder.AddKeyValue(key, value); err != nil { + log.Panic("add key value to the decoder failed", + zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) + } + var ( + counter int + needFlush bool + messageType model.MessageType + ) + for { + ty, hasNext, err := progress.decoder.HasNext() + if err != nil { + log.Panic("decode message key failed", + zap.Int32("partition", partition), zap.Any("offset", offset), zap.Error(err)) + } + if !hasNext { + break + } + counter++ + // If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning. 
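+ // Hence only panic when the oversized message carries more than one event (counter > 1).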
+ if len(key)+len(value) > w.option.maxMessageBytes && counter > 1 { + log.Panic("kafka max-messages-bytes exceeded", + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.Int("max-message-bytes", w.option.maxMessageBytes), + zap.Int("receivedBytes", len(key)+len(value))) + } + messageType = ty + switch messageType { + case model.MessageTypeDDL: + // for some protocol, DDL would be dispatched to all partitions, + // Consider that DDL a, b, c received from partition-0, the latest DDL is c, + // if we receive `a` from partition-1, which would be seemed as DDL regression, + // then cause the consumer panic, but it was a duplicate one. + // so we only handle DDL received from partition-0 should be enough. + // but all DDL event messages should be consumed. + ddl, err := progress.decoder.NextDDLEvent() + if err != nil { + log.Panic("decode message value failed", + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.ByteString("value", value), zap.Error(err)) + } + + if dec, ok := progress.decoder.(*simple.Decoder); ok { + cachedEvents := dec.GetCachedEvents() + for _, row := range cachedEvents { + w.checkPartition(row, partition, message.TopicPartition.Offset) + log.Info("simple protocol cached event resolved, append to the group", + zap.Int64("tableID", row.GetTableID()), zap.Uint64("commitTs", row.CommitTs), + zap.Int32("partition", partition), zap.Any("offset", offset)) + w.appendRow2Group(row, progress, offset) + } + } + + // the Query maybe empty if using simple protocol, it's comes from `bootstrap` event, no need to handle it. + if ddl.Query == "" { + continue + } + + if partition == 0 { + w.appendDDL(ddl, offset) + } + needFlush = true + case model.MessageTypeRow: + row, err := progress.decoder.NextRowChangedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.ByteString("value", value), + zap.Error(err)) + } + // when using simple protocol, the row may be nil, since it's table info not received yet, + // it's cached in the decoder, so just continue here. 
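+ // Such cached rows are replayed through GetCachedEvents once the table info (bootstrap) arrives; see the DDL branch above.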
+ if w.option.protocol == config.ProtocolSimple && row == nil { + continue + } + w.checkPartition(row, partition, message.TopicPartition.Offset) + w.appendRow2Group(row, progress, offset) + case model.MessageTypeResolved: + newWatermark, err := progress.decoder.NextResolvedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.ByteString("value", value), zap.Error(err)) + } + + progress.updateWatermark(newWatermark, offset) + w.resolveRowChangedEvents(progress, newWatermark) + needFlush = true + default: + log.Panic("unknown message type", zap.Any("messageType", messageType), + zap.Int32("partition", partition), zap.Any("offset", offset)) + } + } + + if counter > w.option.maxBatchSize { + log.Panic("Open Protocol max-batch-size exceeded", + zap.Int("max-batch-size", w.option.maxBatchSize), zap.Int("actual-batch-size", counter), + zap.Int32("partition", partition), zap.Any("offset", offset)) + } + + if !needFlush { + return false + } + // flush when received DDL event or resolvedTs + return w.Write(ctx, messageType) +} + +func (w *writer) resolveRowChangedEvents(progress *partitionProgress, newWatermark uint64) { + for tableID, group := range progress.eventGroups { + events := group.Resolve(newWatermark) + if len(events) == 0 { + continue + } + tableSink, ok := progress.tableSinkMap[tableID] + if !ok { + tableSink = w.sinkFactory.CreateTableSinkForConsumer( + model.DefaultChangeFeedID("kafka-consumer"), + spanz.TableIDToComparableSpan(tableID), + events[0].CommitTs, + ) + progress.tableSinkMap[tableID] = tableSink + } + tableSink.AppendRowChangedEvents(events...) + } +} + +func (w *writer) checkPartition(row *model.RowChangedEvent, partition int32, offset kafka.Offset) { + target, _, err := w.eventRouter.GetPartitionForRowChange(row, w.option.partitionNum) + if err != nil { + log.Panic("cannot calculate partition for the row changed event", + zap.Int32("partition", partition), zap.Any("offset", offset), + zap.Int32("partitionNum", w.option.partitionNum), zap.Int64("tableID", row.GetTableID()), + zap.Error(err), zap.Any("event", row)) + } + if partition != target { + log.Panic("RowChangedEvent dispatched to wrong partition", + zap.Int32("partition", partition), zap.Int32("expected", target), + zap.Int32("partitionNum", w.option.partitionNum), zap.Any("offset", offset), + zap.Int64("tableID", row.GetTableID()), zap.Any("row", row), + ) + } +} + +func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partitionProgress, offset kafka.Offset) { + // if the kafka cluster is normal, this should not hit. + // else if the cluster is abnormal, the consumer may consume old message, then cause the watermark fallback. 
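+ // Rows older than the partition watermark are always dropped; rows older than
+ // their table group's high watermark are dropped only for protocols whose table
+ // IDs are stable (simple, open, canal-json) and are appended otherwise.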
+ watermark := progress.loadWatermark() + partition := progress.partition + + tableID := row.GetTableID() + group := progress.eventGroups[tableID] + if group == nil { + group = NewEventsGroup(partition, tableID) + progress.eventGroups[tableID] = group + } + if row.CommitTs < watermark { + log.Warn("RowChanged Event fallback row, since les than the partition watermark, ignore it", + zap.Int64("tableID", tableID), zap.Int32("partition", partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), + zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), + zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition)) + return + } + if row.CommitTs >= group.highWatermark { + group.Append(row, offset) + return + } + switch w.option.protocol { + case config.ProtocolSimple, config.ProtocolOpen, config.ProtocolCanalJSON: + // simple protocol set the table id for all row message, it can be known which table the row message belongs to, + // also consider the table partition. + // open protocol set the partition table id if the table is partitioned. + // for normal table, the table id is generated by the fake table id generator by using schema and table name. + // so one event group for one normal table or one table partition, replayed messages can be ignored. + log.Warn("RowChangedEvent fallback row, since less than the group high watermark, ignore it", + zap.Int64("tableID", tableID), zap.Int32("partition", partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), + zap.Uint64("highWatermark", group.highWatermark), + zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), + zap.String("protocol", w.option.protocol.String()), zap.Bool("IsPartition", row.TableInfo.TableName.IsPartition)) + return + default: + } + log.Warn("RowChangedEvent fallback row, since less than the group high watermark, do not ignore it", + zap.Int64("tableID", tableID), zap.Int32("partition", partition), + zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", offset), + zap.Uint64("highWatermark", group.highWatermark), + zap.Any("partitionWatermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", row.TableInfo.GetSchemaName()), zap.String("table", row.TableInfo.GetTableName()), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns), + zap.String("protocol", w.option.protocol.String())) + group.Append(row, offset) +} + +func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, watermark uint64) { + resolvedTs := model.NewResolvedTs(watermark) + for { + select { + case <-ctx.Done(): + log.Warn("sync flush row changed event canceled", zap.Error(ctx.Err())) + return + default: + } + flushedResolvedTs := true + for _, tableSink := range progress.tableSinkMap { + if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil { + log.Panic("Failed to update resolved ts", zap.Error(err)) + } + if tableSink.GetCheckpointTs().Less(resolvedTs) { + flushedResolvedTs = false + } + } + if flushedResolvedTs { + return + } + } +} + +func 
openDB(ctx context.Context, dsn string) (*sql.DB, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + log.Error("open db failed", zap.Error(err)) + return nil, cerror.Trace(err) + } + + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(10) + db.SetConnMaxLifetime(10 * time.Minute) + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err = db.PingContext(ctx); err != nil { + log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err)) + return nil, cerror.Trace(err) + } + log.Info("open db success", zap.String("dsn", dsn)) + return db, nil +} diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go new file mode 100644 index 000000000..1159b0440 --- /dev/null +++ b/cmd/pulsar-consumer/main.go @@ -0,0 +1,715 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "math" + "net/url" + "os" + "os/signal" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsar/auth" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" + eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" + "github.com/pingcap/tiflow/cdc/sink/tablesink" + sutil "github.com/pingcap/tiflow/cdc/sink/util" + cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/logutil" + "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/canal" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + tpulsar "github.com/pingcap/tiflow/pkg/sink/pulsar" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +// ConsumerOption represents the options of the pulsar consumer +type ConsumerOption struct { + address []string + topic string + + protocol config.Protocol + enableTiDBExtension bool + + // the replicaConfig of the changefeed which produce data to the kafka topic + replicaConfig *config.ReplicaConfig + + logPath string + logLevel string + timezone string + ca, cert, key string + + oauth2PrivateKey string + oauth2IssuerURL string + oauth2ClientID string + oauth2Scope string + oauth2Audience string + + mtlsAuthTLSCertificatePath string + mtlsAuthTLSPrivateKeyPath string + + downstreamURI string + partitionNum int +} + +func newConsumerOption() *ConsumerOption { + return &ConsumerOption{ + protocol: config.ProtocolCanalJSON, + // the default value of partitionNum is 1 + partitionNum: 1, + } +} + +// Adjust the consumer option by the upstream uri passed in parameters. 
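+// Only the protocol and enable-tidb-extension query parameters are read from
+// the URI; the topic comes from the URI path, the broker address from the URI
+// host, and any remaining settings from the optional changefeed config file.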
+func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) { + o.topic = strings.TrimFunc(upstreamURI.Path, func(r rune) bool { + return r == '/' + }) + o.address = strings.Split(upstreamURI.Host, ",") + + replicaConfig := config.GetDefaultReplicaConfig() + if configFile != "" { + err := cmdUtil.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig) + if err != nil { + log.Panic("decode config file failed", zap.Error(err)) + } + } + o.replicaConfig = replicaConfig + + s := upstreamURI.Query().Get("protocol") + if s != "" { + protocol, err := config.ParseSinkProtocolFromString(s) + if err != nil { + log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s)) + } + o.protocol = protocol + } + if !sutil.IsPulsarSupportedProtocols(o.protocol) { + log.Panic("unsupported protocol, pulsar sink currently only support these protocols: [canal-json]", + zap.String("protocol", s)) + } + + s = upstreamURI.Query().Get("enable-tidb-extension") + if s != "" { + enableTiDBExtension, err := strconv.ParseBool(s) + if err != nil { + log.Panic("invalid enable-tidb-extension of upstream-uri") + } + o.enableTiDBExtension = enableTiDBExtension + } + + log.Info("consumer option adjusted", + zap.String("configFile", configFile), + zap.String("address", strings.Join(o.address, ",")), + zap.String("topic", o.topic), + zap.Any("protocol", o.protocol), + zap.Bool("enableTiDBExtension", o.enableTiDBExtension)) +} + +var ( + upstreamURIStr string + configFile string + consumerOption = newConsumerOption() +) + +func main() { + cmd := &cobra.Command{ + Use: "pulsar consumer", + Run: run, + } + // Flags for the root command + cmd.Flags().StringVar(&configFile, "config", "", "config file for changefeed") + cmd.Flags().StringVar(&upstreamURIStr, "upstream-uri", "", "pulsar uri") + cmd.Flags().StringVar(&consumerOption.downstreamURI, "downstream-uri", "", "downstream sink uri") + cmd.Flags().StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of pulsar consumer") + cmd.Flags().StringVar(&consumerOption.ca, "ca", "", "CA certificate path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.cert, "cert", "", "Certificate path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.key, "key", "", "Private key path for pulsar SSL connection") + cmd.Flags().StringVar(&consumerOption.logPath, "log-file", "cdc_pulsar_consumer.log", "log file path") + cmd.Flags().StringVar(&consumerOption.logLevel, "log-level", "info", "log file path") + cmd.Flags().StringVar(&consumerOption.oauth2PrivateKey, "oauth2-private-key", "", "oauth2 private key path") + cmd.Flags().StringVar(&consumerOption.oauth2IssuerURL, "oauth2-issuer-url", "", "oauth2 issuer url") + cmd.Flags().StringVar(&consumerOption.oauth2ClientID, "oauth2-client-id", "", "oauth2 client id") + cmd.Flags().StringVar(&consumerOption.oauth2Audience, "oauth2-scope", "", "oauth2 scope") + cmd.Flags().StringVar(&consumerOption.oauth2Audience, "oauth2-audience", "", "oauth2 audience") + cmd.Flags().StringVar(&consumerOption.mtlsAuthTLSCertificatePath, "auth-tls-certificate-path", "", "mtls certificate path") + cmd.Flags().StringVar(&consumerOption.mtlsAuthTLSPrivateKeyPath, "auth-tls-private-key-path", "", "mtls private key path") + if err := cmd.Execute(); err != nil { + fmt.Println(err) + } +} + +func run(_ *cobra.Command, _ []string) { + err := logutil.InitLogger(&logutil.Config{ + Level: consumerOption.logLevel, + File: consumerOption.logPath, + }) + if err != nil { + log.Error("init logger failed", 
zap.Error(err)) + return + } + + version.LogVersionInfo("pulsar consumer") + + upstreamURI, err := url.Parse(upstreamURIStr) + if err != nil { + log.Panic("invalid upstream-uri", zap.Error(err)) + } + scheme := strings.ToLower(upstreamURI.Scheme) + if !sink.IsPulsarScheme(scheme) { + log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be pulsar schema", + zap.String("schema", scheme), + zap.String("upstreamURI", upstreamURIStr)) + } + + consumerOption.Adjust(upstreamURI, configFile) + + ctx, cancel := context.WithCancel(context.Background()) + consumer, err := NewConsumer(ctx, consumerOption) + if err != nil { + log.Panic("Error creating pulsar consumer", zap.Error(err)) + } + + pulsarConsumer, client := NewPulsarConsumer(consumerOption) + defer client.Close() + defer pulsarConsumer.Close() + msgChan := pulsarConsumer.Chan() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + log.Info("terminating: context cancelled") + return + case consumerMsg := <-msgChan: + log.Debug(fmt.Sprintf("Received message msgId: %#v -- content: '%s'\n", + consumerMsg.ID(), + string(consumerMsg.Payload()))) + err = consumer.HandleMsg(consumerMsg.Message) + if err != nil { + log.Panic("Error consuming message", zap.Error(err)) + } + err = pulsarConsumer.AckID(consumerMsg.Message.ID()) + if err != nil { + log.Panic("Error ack message", zap.Error(err)) + } + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err = consumer.Run(ctx); err != nil { + if err != context.Canceled { + log.Panic("Error running consumer", zap.Error(err)) + } + } + }() + + log.Info("TiCDC consumer up and running!...") + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-ctx.Done(): + log.Info("terminating: context cancelled") + case <-sigterm: + log.Info("terminating: via signal") + } + cancel() + wg.Wait() +} + +// NewPulsarConsumer creates a pulsar consumer +func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client) { + var pulsarURL string + if len(option.ca) != 0 { + pulsarURL = "pulsar+ssl" + "://" + option.address[0] + } else { + pulsarURL = "pulsar" + "://" + option.address[0] + } + topicName := option.topic + subscriptionName := "pulsar-test-subscription" + + clientOption := pulsar.ClientOptions{ + URL: pulsarURL, + Logger: tpulsar.NewPulsarLogger(log.L()), + } + if len(option.ca) != 0 { + clientOption.TLSTrustCertsFilePath = option.ca + clientOption.TLSCertificateFile = option.cert + clientOption.TLSKeyFilePath = option.key + } + + var authentication pulsar.Authentication + if len(option.oauth2PrivateKey) != 0 { + authentication = pulsar.NewAuthenticationOAuth2(map[string]string{ + auth.ConfigParamIssuerURL: option.oauth2IssuerURL, + auth.ConfigParamAudience: option.oauth2Audience, + auth.ConfigParamKeyFile: option.oauth2PrivateKey, + auth.ConfigParamClientID: option.oauth2ClientID, + auth.ConfigParamScope: option.oauth2Scope, + auth.ConfigParamType: auth.ConfigParamTypeClientCredentials, + }) + log.Info("oauth2 authentication is enabled", zap.String("issuer url", option.oauth2IssuerURL)) + clientOption.Authentication = authentication + } + if len(option.mtlsAuthTLSCertificatePath) != 0 { + authentication = pulsar.NewAuthenticationTLS(option.mtlsAuthTLSCertificatePath, option.mtlsAuthTLSPrivateKeyPath) + log.Info("mtls authentication is enabled", + zap.String("cert", option.mtlsAuthTLSCertificatePath), + zap.String("key", option.mtlsAuthTLSPrivateKeyPath), + ) 
+ clientOption.Authentication = authentication + } + + client, err := pulsar.NewClient(clientOption) + if err != nil { + log.Fatal("can't create pulsar client", zap.Error(err)) + } + + consumerConfig := pulsar.ConsumerOptions{ + Topic: topicName, + SubscriptionName: subscriptionName, + Type: pulsar.Exclusive, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + } + + consumer, err := client.Subscribe(consumerConfig) + if err != nil { + log.Fatal("can't create pulsar consumer", zap.Error(err)) + } + return consumer, client +} + +// partitionSinks maintained for each partition, it may sync data for multiple tables. +type partitionSinks struct { + decoder codec.RowEventDecoder + + tablesCommitTsMap sync.Map + tableSinksMap sync.Map + // resolvedTs record the maximum timestamp of the received event + resolvedTs uint64 +} + +// Consumer represents a local pulsar consumer +type Consumer struct { + eventGroups map[int64]*eventsGroup + ddlList []*model.DDLEvent + ddlListMu sync.Mutex + lastReceivedDDL *model.DDLEvent + ddlSink ddlsink.Sink + + // sinkFactory is used to create table sink for each table. + sinkFactory *eventsinkfactory.SinkFactory + sinks []*partitionSinks + sinksMu sync.Mutex + + // initialize to 0 by default + globalResolvedTs uint64 + + tz *time.Location + + codecConfig *common.Config + + option *ConsumerOption +} + +// NewConsumer creates a new cdc pulsar consumer +// the consumer is responsible for consuming the data from the pulsar topic +// and write the data to the downstream. +func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) { + c := new(Consumer) + c.option = o + + tz, err := util.GetTimezone(o.timezone) + if err != nil { + return nil, errors.Annotate(err, "can not load timezone") + } + config.GetGlobalServerConfig().TZ = o.timezone + c.tz = tz + + c.codecConfig = common.NewConfig(o.protocol) + c.codecConfig.EnableTiDBExtension = o.enableTiDBExtension + decoder, err := canal.NewBatchDecoder(ctx, c.codecConfig, nil) + if err != nil { + return nil, errors.Trace(err) + } + c.sinks = make([]*partitionSinks, o.partitionNum) + for i := 0; i < o.partitionNum; i++ { + c.sinks[i] = &partitionSinks{ + decoder: decoder, + } + } + + ctx, cancel := context.WithCancel(ctx) + errChan := make(chan error, 1) + changefeedID := model.DefaultChangeFeedID("pulsar-consumer") + f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil) + if err != nil { + cancel() + return nil, errors.Trace(err) + } + c.sinkFactory = f + + go func() { + err := <-errChan + if errors.Cause(err) != context.Canceled { + log.Error("error on running consumer", zap.Error(err)) + } else { + log.Info("consumer exited") + } + cancel() + }() + + ddlSink, err := ddlsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig) + if err != nil { + cancel() + return nil, errors.Trace(err) + } + c.ddlSink = ddlSink + c.eventGroups = make(map[int64]*eventsGroup) + return c, nil +} + +type eventsGroup struct { + events []*model.RowChangedEvent +} + +func newEventsGroup() *eventsGroup { + return &eventsGroup{ + events: make([]*model.RowChangedEvent, 0), + } +} + +func (g *eventsGroup) Append(e *model.RowChangedEvent) { + g.events = append(g.events, e) +} + +func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent { + sort.Slice(g.events, func(i, j int) bool { + return g.events[i].CommitTs < g.events[j].CommitTs + }) + + i := sort.Search(len(g.events), func(i int) bool { + return g.events[i].CommitTs > resolveTs + }) + result 
:= g.events[:i] + g.events = g.events[i:] + + return result +} + +// HandleMsg handles the message received from the pulsar consumer +func (c *Consumer) HandleMsg(msg pulsar.Message) error { + c.sinksMu.Lock() + sink := c.sinks[0] + c.sinksMu.Unlock() + + decoder := sink.decoder + if err := decoder.AddKeyValue([]byte(msg.Key()), msg.Payload()); err != nil { + log.Error("add key value to the decoder failed", zap.Error(err)) + return errors.Trace(err) + } + + counter := 0 + for { + tp, hasNext, err := decoder.HasNext() + if err != nil { + log.Panic("decode message key failed", zap.Error(err)) + } + if !hasNext { + break + } + + counter++ + switch tp { + case model.MessageTypeDDL: + // for some protocol, DDL would be dispatched to all partitions, + // Consider that DDL a, b, c received from partition-0, the latest DDL is c, + // if we receive `a` from partition-1, which would be seemed as DDL regression, + // then cause the consumer panic, but it was a duplicate one. + // so we only handle DDL received from partition-0 should be enough. + // but all DDL event messages should be consumed. + ddl, err := decoder.NextDDLEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + c.appendDDL(ddl) + case model.MessageTypeRow: + row, err := decoder.NextRowChangedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) + partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs) + if row.CommitTs <= globalResolvedTs || row.CommitTs <= partitionResolvedTs { + log.Warn("RowChangedEvent fallback row, ignore it", + zap.Uint64("commitTs", row.CommitTs), + zap.Uint64("globalResolvedTs", globalResolvedTs), + zap.Uint64("partitionResolvedTs", partitionResolvedTs), + zap.Int32("partition", msg.ID().PartitionIdx()), + zap.Any("row", row)) + // todo: mark the offset after the DDL is fully synced to the downstream mysql. 
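As an aside, the slice handling in eventsGroup.Resolve above boils down to: keep the buffered events sorted by commit ts, locate the first event past the resolved ts with sort.Search, and split the slice at that point. A minimal, self-contained sketch of that split, using bare uint64 commit timestamps instead of RowChangedEvent (splitByResolvedTs is a made-up name):

    package main

    import (
        "fmt"
        "sort"
    )

    // splitByResolvedTs mirrors the eventsGroup.Resolve technique: sort by commit ts,
    // flush the prefix with commitTs <= resolvedTs, keep the rest buffered.
    func splitByResolvedTs(commitTs []uint64, resolvedTs uint64) (flush, remain []uint64) {
        sort.Slice(commitTs, func(i, j int) bool { return commitTs[i] < commitTs[j] })
        i := sort.Search(len(commitTs), func(i int) bool { return commitTs[i] > resolvedTs })
        return commitTs[:i], commitTs[i:]
    }

    func main() {
        flush, remain := splitByResolvedTs([]uint64{105, 101, 110, 103}, 105)
        fmt.Println(flush, remain) // [101 103 105] [110]
    }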
+ continue + } + tableID := row.GetTableID() + group, ok := c.eventGroups[tableID] + if !ok { + group = newEventsGroup() + c.eventGroups[tableID] = group + } + group.Append(row) + log.Info("DML event received", + zap.Int64("tableID", row.GetTableID()), + zap.String("schema", row.TableInfo.GetSchemaName()), + zap.String("table", row.TableInfo.GetTableName()), + zap.Uint64("commitTs", row.CommitTs), + zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns)) + case model.MessageTypeResolved: + ts, err := decoder.NextResolvedEvent() + if err != nil { + log.Panic("decode message value failed", + zap.ByteString("value", msg.Payload()), + zap.Error(err)) + } + + globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs) + partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs) + if ts < globalResolvedTs || ts < partitionResolvedTs { + log.Warn("partition resolved ts fallback, skip it", + zap.Uint64("ts", ts), + zap.Uint64("partitionResolvedTs", partitionResolvedTs), + zap.Uint64("globalResolvedTs", globalResolvedTs), + zap.Int32("partition", msg.ID().PartitionIdx())) + continue + } + + for tableID, group := range c.eventGroups { + events := group.Resolve(ts) + if len(events) == 0 { + continue + } + if _, ok := sink.tableSinksMap.Load(tableID); !ok { + log.Info("create table sink for consumer", zap.Any("tableID", tableID)) + tableSink := c.sinkFactory.CreateTableSinkForConsumer( + model.DefaultChangeFeedID("pulsar-consumer"), + spanz.TableIDToComparableSpan(tableID), + events[0].CommitTs) + + log.Info("table sink created", zap.Any("tableID", tableID), + zap.Any("tableSink", tableSink.GetCheckpointTs())) + + sink.tableSinksMap.Store(tableID, tableSink) + } + s, _ := sink.tableSinksMap.Load(tableID) + s.(tablesink.TableSink).AppendRowChangedEvents(events...) + commitTs := events[len(events)-1].CommitTs + lastCommitTs, ok := sink.tablesCommitTsMap.Load(tableID) + if !ok || lastCommitTs.(uint64) < commitTs { + sink.tablesCommitTsMap.Store(tableID, commitTs) + } + } + atomic.StoreUint64(&sink.resolvedTs, ts) + log.Info("resolved ts updated", zap.Uint64("resolvedTs", ts)) + } + + } + return nil +} + +// append DDL wait to be handled, only consider the constraint among DDLs. +// for DDL a / b received in the order, a.CommitTs < b.CommitTs should be true. +func (c *Consumer) appendDDL(ddl *model.DDLEvent) { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + // DDL CommitTs fallback, just crash it to indicate the bug. + if c.lastReceivedDDL != nil && ddl.CommitTs < c.lastReceivedDDL.CommitTs { + log.Panic("DDL CommitTs < lastReceivedDDL.CommitTs", + zap.Uint64("commitTs", ddl.CommitTs), + zap.Uint64("lastReceivedDDLCommitTs", c.lastReceivedDDL.CommitTs), + zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) + } + + // A rename tables DDL job contains multiple DDL events with same CommitTs. + // So to tell if a DDL is redundant or not, we must check the equivalence of + // the current DDL and the DDL with max CommitTs. 
+ if ddl == c.lastReceivedDDL { + log.Info("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs", + zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) + return + } + + c.ddlList = append(c.ddlList, ddl) + log.Info("DDL event received", zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) + c.lastReceivedDDL = ddl +} + +func (c *Consumer) getFrontDDL() *model.DDLEvent { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + if len(c.ddlList) > 0 { + return c.ddlList[0] + } + return nil +} + +func (c *Consumer) popDDL() *model.DDLEvent { + c.ddlListMu.Lock() + defer c.ddlListMu.Unlock() + if len(c.ddlList) > 0 { + ddl := c.ddlList[0] + c.ddlList = c.ddlList[1:] + return ddl + } + return nil +} + +func (c *Consumer) forEachSink(fn func(sink *partitionSinks) error) error { + c.sinksMu.Lock() + defer c.sinksMu.Unlock() + for _, sink := range c.sinks { + if err := fn(sink); err != nil { + return errors.Trace(err) + } + } + return nil +} + +// getMinResolvedTs returns the minimum resolvedTs of all the partitionSinks +func (c *Consumer) getMinResolvedTs() uint64 { + result := uint64(math.MaxUint64) + _ = c.forEachSink(func(sink *partitionSinks) error { + a := atomic.LoadUint64(&sink.resolvedTs) + if a < result { + result = a + } + return nil + }) + return result +} + +// Run the Consumer +func (c *Consumer) Run(ctx context.Context) error { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + minResolvedTs := c.getMinResolvedTs() + + // 2. check if there is a DDL event that can be executed + // if there is, execute it and update the minResolvedTs + nextDDL := c.getFrontDDL() + if nextDDL != nil { + log.Info("get nextDDL", zap.Any("DDL", nextDDL)) + } + if nextDDL != nil && minResolvedTs >= nextDDL.CommitTs { + // flush DMLs that commitTs <= todoDDL.CommitTs + if err := c.forEachSink(func(sink *partitionSinks) error { + return flushRowChangedEvents(ctx, sink, nextDDL.CommitTs) + }); err != nil { + return errors.Trace(err) + } + log.Info("begin to execute DDL", zap.Any("DDL", nextDDL)) + // all DMLs with commitTs <= todoDDL.CommitTs have been flushed to downstream, + // so we can execute the DDL now. + if err := c.ddlSink.WriteDDLEvent(ctx, nextDDL); err != nil { + return errors.Trace(err) + } + ddl := c.popDDL() + log.Info("DDL executed", zap.Any("DDL", ddl)) + minResolvedTs = ddl.CommitTs + } + + // 3. Update global resolved ts + if c.globalResolvedTs > minResolvedTs { + log.Panic("global ResolvedTs fallback", + zap.Uint64("globalResolvedTs", c.globalResolvedTs), + zap.Uint64("minPartitionResolvedTs", minResolvedTs)) + } + + if c.globalResolvedTs < minResolvedTs { + c.globalResolvedTs = minResolvedTs + } + + // 4. flush all the DMLs that commitTs <= globalResolvedTs + if err := c.forEachSink(func(sink *partitionSinks) error { + return flushRowChangedEvents(ctx, sink, c.globalResolvedTs) + }); err != nil { + return errors.Trace(err) + } + } + } +} + +// flushRowChangedEvents flushes all the DMLs that commitTs <= resolvedTs +// Note: This function is synchronous, it will block until all the DMLs are flushed. 
+func flushRowChangedEvents(ctx context.Context, sink *partitionSinks, resolvedTs uint64) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + flushedResolvedTs := true + sink.tablesCommitTsMap.Range(func(key, value interface{}) bool { + tableID := key.(int64) + resolvedTs := model.NewResolvedTs(resolvedTs) + tableSink, ok := sink.tableSinksMap.Load(tableID) + if !ok { + log.Panic("Table sink not found", zap.Int64("tableID", tableID)) + } + if err := tableSink.(tablesink.TableSink).UpdateResolvedTs(resolvedTs); err != nil { + log.Error("Failed to update resolved ts", zap.Error(err)) + return false + } + if !tableSink.(tablesink.TableSink).GetCheckpointTs().EqualOrGreater(resolvedTs) { + flushedResolvedTs = false + } + return true + }) + if flushedResolvedTs { + return nil + } + } +} diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go new file mode 100644 index 000000000..7713acc46 --- /dev/null +++ b/cmd/storage-consumer/main.go @@ -0,0 +1,681 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "net/http" + "net/url" + "os" + "os/signal" + "sort" + "strings" + "sync" + "syscall" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink" + ddlfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" + dmlfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" + "github.com/pingcap/tiflow/cdc/sink/tablesink" + sinkutil "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/logutil" + "github.com/pingcap/tiflow/pkg/quotes" + psink "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/cloudstorage" + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/canal" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/sink/codec/csv" + "github.com/pingcap/tiflow/pkg/spanz" + putil "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "go.uber.org/zap" +) + +var ( + upstreamURIStr string + upstreamURI *url.URL + downstreamURIStr string + configFile string + logFile string + logLevel string + flushInterval time.Duration + fileIndexWidth int + enableProfiling bool + timezone string +) + +const ( + defaultChangefeedName = "storage-consumer" + defaultFlushWaitDuration = 200 * time.Millisecond + fakePartitionNumForSchemaFile = -1 +) + +func init() { + version.LogVersionInfo("storage consumer") + flag.StringVar(&upstreamURIStr, "upstream-uri", "", "storage uri") + flag.StringVar(&downstreamURIStr, "downstream-uri", "", "downstream sink uri") + flag.StringVar(&configFile, "config", "", "changefeed configuration file") + flag.StringVar(&logFile, "log-file", "", "log file path") + flag.StringVar(&logLevel, "log-level", "info", "log level") + 
flag.DurationVar(&flushInterval, "flush-interval", 10*time.Second, "flush interval") + flag.IntVar(&fileIndexWidth, "file-index-width", + config.DefaultFileIndexWidth, "file index width") + flag.BoolVar(&enableProfiling, "enable-profiling", false, "whether to enable profiling") + flag.StringVar(&timezone, "tz", "System", "Specify time zone of storage consumer") + flag.Parse() + + err := logutil.InitLogger(&logutil.Config{ + Level: logLevel, + File: logFile, + }) + if err != nil { + log.Error("init logger failed", zap.Error(err)) + os.Exit(1) + } + + uri, err := url.Parse(upstreamURIStr) + if err != nil { + log.Error("invalid upstream-uri", zap.Error(err)) + os.Exit(1) + } + upstreamURI = uri + scheme := strings.ToLower(upstreamURI.Scheme) + if !psink.IsStorageScheme(scheme) { + log.Error("invalid storage scheme, the scheme of upstream-uri must be file/s3/azblob/gcs") + os.Exit(1) + } +} + +// fileIndexRange defines a range of files. eg. CDC000002.csv ~ CDC000005.csv +type fileIndexRange struct { + start uint64 + end uint64 +} + +type consumer struct { + sinkFactory *dmlfactory.SinkFactory + ddlSink ddlsink.Sink + replicationCfg *config.ReplicaConfig + codecCfg *common.Config + externalStorage storage.ExternalStorage + fileExtension string + // tableDMLIdxMap maintains a map of <dmlPathKey, max file index> + tableDMLIdxMap map[cloudstorage.DmlPathKey]uint64 + // tableTsMap maintains a map of <tableID, max commit ts> + tableTsMap map[model.TableID]model.ResolvedTs + // tableDefMap maintains a map of <`schema`.`table`, tableDef slice sorted by TableVersion> + tableDefMap map[string]map[uint64]*cloudstorage.TableDefinition + // tableSinkMap maintains a map of <tableID, tableSink> + tableSinkMap map[model.TableID]tablesink.TableSink + tableIDGenerator *fakeTableIDGenerator + errCh chan error +} + +func newConsumer(ctx context.Context) (*consumer, error) { + _, err := putil.GetTimezone(timezone) + if err != nil { + return nil, errors.Annotate(err, "can not load timezone") + } + serverCfg := config.GetGlobalServerConfig().Clone() + serverCfg.TZ = timezone + config.StoreGlobalServerConfig(serverCfg) + replicaConfig := config.GetDefaultReplicaConfig() + if len(configFile) > 0 { + err := util.StrictDecodeFile(configFile, "storage consumer", replicaConfig) + if err != nil { + log.Error("failed to decode config file", zap.Error(err)) + return nil, err + } + } + + err = replicaConfig.ValidateAndAdjust(upstreamURI) + if err != nil { + log.Error("failed to validate replica config", zap.Error(err)) + return nil, err + } + + switch putil.GetOrZero(replicaConfig.Sink.Protocol) { + case config.ProtocolCsv.String(): + case config.ProtocolCanalJSON.String(): + default: + return nil, fmt.Errorf( + "data encoded in protocol %s is not supported yet", + putil.GetOrZero(replicaConfig.Sink.Protocol), + ) + } + + protocol, err := config.ParseSinkProtocolFromString(putil.GetOrZero(replicaConfig.Sink.Protocol)) + if err != nil { + return nil, err + } + + codecConfig := common.NewConfig(protocol) + err = codecConfig.Apply(upstreamURI, replicaConfig) + if err != nil { + return nil, err + } + + extension := sinkutil.GetFileExtension(protocol) + + storage, err := putil.GetExternalStorageFromURI(ctx, upstreamURIStr) + if err != nil { + log.Error("failed to create external storage", zap.Error(err)) + return nil, err + } + + errCh := make(chan error, 1) + stdCtx := ctx + sinkFactory, err := dmlfactory.New( + stdCtx, + model.DefaultChangeFeedID(defaultChangefeedName), + downstreamURIStr, + replicaConfig, + errCh, + nil, + ) + if err != nil { + log.Error("failed to create event sink factory",
zap.Error(err)) + return nil, err + } + + ddlSink, err := ddlfactory.New(ctx, model.DefaultChangeFeedID(defaultChangefeedName), + downstreamURIStr, replicaConfig) + if err != nil { + log.Error("failed to create ddl sink", zap.Error(err)) + return nil, err + } + + return &consumer{ + sinkFactory: sinkFactory, + ddlSink: ddlSink, + replicationCfg: replicaConfig, + codecCfg: codecConfig, + externalStorage: storage, + fileExtension: extension, + errCh: errCh, + tableDMLIdxMap: make(map[cloudstorage.DmlPathKey]uint64), + tableTsMap: make(map[model.TableID]model.ResolvedTs), + tableDefMap: make(map[string]map[uint64]*cloudstorage.TableDefinition), + tableSinkMap: make(map[model.TableID]tablesink.TableSink), + tableIDGenerator: &fakeTableIDGenerator{ + tableIDs: make(map[string]int64), + }, + }, nil +} + +// map1 - map2 +func diffDMLMaps( + map1, map2 map[cloudstorage.DmlPathKey]uint64, +) map[cloudstorage.DmlPathKey]fileIndexRange { + resMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) + for k, v := range map1 { + if _, ok := map2[k]; !ok { + resMap[k] = fileIndexRange{ + start: 1, + end: v, + } + } else if v > map2[k] { + resMap[k] = fileIndexRange{ + start: map2[k] + 1, + end: v, + } + } + } + + return resMap +} + +// getNewFiles returns newly created dml files in specific ranges +func (c *consumer) getNewFiles( + ctx context.Context, +) (map[cloudstorage.DmlPathKey]fileIndexRange, error) { + tableDMLMap := make(map[cloudstorage.DmlPathKey]fileIndexRange) + opt := &storage.WalkOption{SubDir: ""} + + origDMLIdxMap := make(map[cloudstorage.DmlPathKey]uint64, len(c.tableDMLIdxMap)) + for k, v := range c.tableDMLIdxMap { + origDMLIdxMap[k] = v + } + + err := c.externalStorage.WalkDir(ctx, opt, func(path string, size int64) error { + if cloudstorage.IsSchemaFile(path) { + err := c.parseSchemaFilePath(ctx, path) + if err != nil { + log.Error("failed to parse schema file path", zap.Error(err)) + // skip handling this file + return nil + } + } else if strings.HasSuffix(path, c.fileExtension) { + err := c.parseDMLFilePath(ctx, path) + if err != nil { + log.Error("failed to parse dml file path", zap.Error(err)) + // skip handling this file + return nil + } + } else { + log.Debug("ignore handling file", zap.String("path", path)) + } + return nil + }) + if err != nil { + return tableDMLMap, err + } + + tableDMLMap = diffDMLMaps(c.tableDMLIdxMap, origDMLIdxMap) + return tableDMLMap, err +} + +// emitDMLEvents decodes RowChangedEvents from file content and emit them. +func (c *consumer) emitDMLEvents( + ctx context.Context, tableID int64, + tableDetail cloudstorage.TableDefinition, + pathKey cloudstorage.DmlPathKey, + content []byte, +) error { + var ( + decoder codec.RowEventDecoder + err error + ) + + tableInfo, err := tableDetail.ToTableInfo() + if err != nil { + return errors.Trace(err) + } + + switch c.codecCfg.Protocol { + case config.ProtocolCsv: + decoder, err = csv.NewBatchDecoder(ctx, c.codecCfg, tableInfo, content) + if err != nil { + return errors.Trace(err) + } + case config.ProtocolCanalJSON: + // Always enable tidb extension for canal-json protocol + // because we need to get the commit ts from the extension field. 
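As an aside, diffDMLMaps above implements the "map1 - map2" subtraction that getNewFiles depends on: comparing the current per-path max file index against the previous snapshot yields the ranges of files written since the last scan. A standalone sketch of the same idea, with plain string keys standing in for cloudstorage.DmlPathKey (newFileRanges is a made-up name):

    package main

    import "fmt"

    type fileIndexRange struct{ start, end uint64 }

    // newFileRanges mirrors diffDMLMaps: for every key in cur, report the file
    // indexes that appeared since prev was captured.
    func newFileRanges(cur, prev map[string]uint64) map[string]fileIndexRange {
        res := make(map[string]fileIndexRange)
        for k, v := range cur {
            if old, ok := prev[k]; !ok {
                res[k] = fileIndexRange{start: 1, end: v}
            } else if v > old {
                res[k] = fileIndexRange{start: old + 1, end: v}
            }
        }
        return res
    }

    func main() {
        prev := map[string]uint64{"test.t1": 2}
        cur := map[string]uint64{"test.t1": 5, "test.t2": 1}
        fmt.Println(newFileRanges(cur, prev)) // map[test.t1:{3 5} test.t2:{1 1}]
    }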
+ c.codecCfg.EnableTiDBExtension = true + decoder, err = canal.NewBatchDecoder(ctx, c.codecCfg, nil) + if err != nil { + return errors.Trace(err) + } + err = decoder.AddKeyValue(nil, content) + if err != nil { + return errors.Trace(err) + } + } + + cnt := 0 + filteredCnt := 0 + for { + tp, hasNext, err := decoder.HasNext() + if err != nil { + log.Error("failed to decode message", zap.Error(err)) + return err + } + if !hasNext { + break + } + cnt++ + + if tp == model.MessageTypeRow { + row, err := decoder.NextRowChangedEvent() + if err != nil { + log.Error("failed to get next row changed event", zap.Error(err)) + return errors.Trace(err) + } + + if _, ok := c.tableSinkMap[tableID]; !ok { + c.tableSinkMap[tableID] = c.sinkFactory.CreateTableSinkForConsumer( + model.DefaultChangeFeedID(defaultChangefeedName), + spanz.TableIDToComparableSpan(tableID), + row.CommitTs) + } + + _, ok := c.tableTsMap[tableID] + if !ok || row.CommitTs > c.tableTsMap[tableID].Ts { + c.tableTsMap[tableID] = model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: row.CommitTs, + BatchID: 1, + } + } else if row.CommitTs == c.tableTsMap[tableID].Ts { + c.tableTsMap[tableID] = c.tableTsMap[tableID].AdvanceBatch() + } else { + log.Warn("row changed event commit ts fallback, ignore", + zap.Uint64("commitTs", row.CommitTs), + zap.Any("tableMaxCommitTs", c.tableTsMap[tableID]), + zap.Any("row", row), + ) + continue + } + row.PhysicalTableID = tableID + c.tableSinkMap[tableID].AppendRowChangedEvents(row) + filteredCnt++ + } + } + log.Info("decode success", zap.String("schema", pathKey.Schema), + zap.String("table", pathKey.Table), + zap.Uint64("version", pathKey.TableVersion), + zap.Int("decodeRowsCnt", cnt), + zap.Int("filteredRowsCnt", filteredCnt)) + + return err +} + +func (c *consumer) waitTableFlushComplete(ctx context.Context, tableID model.TableID) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-c.errCh: + return err + default: + } + + resolvedTs := c.tableTsMap[tableID] + err := c.tableSinkMap[tableID].UpdateResolvedTs(resolvedTs) + if err != nil { + return errors.Trace(err) + } + checkpoint := c.tableSinkMap[tableID].GetCheckpointTs() + if checkpoint.Equal(resolvedTs) { + c.tableTsMap[tableID] = resolvedTs.AdvanceBatch() + return nil + } + time.Sleep(defaultFlushWaitDuration) + } +} + +func (c *consumer) syncExecDMLEvents( + ctx context.Context, + tableDef cloudstorage.TableDefinition, + key cloudstorage.DmlPathKey, + fileIdx uint64, +) error { + filePath := key.GenerateDMLFilePath(fileIdx, c.fileExtension, fileIndexWidth) + log.Debug("read from dml file path", zap.String("path", filePath)) + content, err := c.externalStorage.ReadFile(ctx, filePath) + if err != nil { + return errors.Trace(err) + } + tableID := c.tableIDGenerator.generateFakeTableID( + key.Schema, key.Table, key.PartitionNum) + err = c.emitDMLEvents(ctx, tableID, tableDef, key, content) + if err != nil { + return errors.Trace(err) + } + + resolvedTs := c.tableTsMap[tableID] + err = c.tableSinkMap[tableID].UpdateResolvedTs(resolvedTs) + if err != nil { + return errors.Trace(err) + } + err = c.waitTableFlushComplete(ctx, tableID) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +func (c *consumer) parseDMLFilePath(_ context.Context, path string) error { + var dmlkey cloudstorage.DmlPathKey + fileIdx, err := dmlkey.ParseDMLFilePath( + putil.GetOrZero(c.replicationCfg.Sink.DateSeparator), + path, + ) + if err != nil { + return errors.Trace(err) + } + + if _, ok := c.tableDMLIdxMap[dmlkey]; !ok || fileIdx >= c.tableDMLIdxMap[dmlkey] { +
c.tableDMLIdxMap[dmlkey] = fileIdx + } + return nil +} + +func (c *consumer) parseSchemaFilePath(ctx context.Context, path string) error { + var schemaKey cloudstorage.SchemaPathKey + checksumInFile, err := schemaKey.ParseSchemaFilePath(path) + if err != nil { + return errors.Trace(err) + } + key := schemaKey.GetKey() + if tableDefs, ok := c.tableDefMap[key]; ok { + if _, ok := tableDefs[schemaKey.TableVersion]; ok { + // Skip if tableDef already exists. + return nil + } + } else { + c.tableDefMap[key] = make(map[uint64]*cloudstorage.TableDefinition) + } + + // Read tableDef from schema file and check checksum. + var tableDef cloudstorage.TableDefinition + schemaContent, err := c.externalStorage.ReadFile(ctx, path) + if err != nil { + return errors.Trace(err) + } + err = json.Unmarshal(schemaContent, &tableDef) + if err != nil { + return errors.Trace(err) + } + checksumInMem, err := tableDef.Sum32(nil) + if err != nil { + return errors.Trace(err) + } + if checksumInMem != checksumInFile || schemaKey.TableVersion != tableDef.TableVersion { + log.Panic("checksum mismatch", + zap.Uint32("checksumInMem", checksumInMem), + zap.Uint32("checksumInFile", checksumInFile), + zap.Uint64("tableversionInMem", schemaKey.TableVersion), + zap.Uint64("tableversionInFile", tableDef.TableVersion), + zap.String("path", path)) + } + + // Update tableDefMap. + c.tableDefMap[key][tableDef.TableVersion] = &tableDef + + // Fake a dml key for schema.json file, which is useful for putting DDL + // in front of the DML files when sorting. + // e.g, for the partitioned table: + // + // test/test1/439972354120482843/schema.json (partitionNum = -1) + // test/test1/439972354120482843/55/2023-03-09/CDC000001.csv (partitionNum = 55) + // test/test1/439972354120482843/66/2023-03-09/CDC000001.csv (partitionNum = 66) + // + // and for the non-partitioned table: + // test/test2/439972354120482843/schema.json (partitionNum = -1) + // test/test2/439972354120482843/2023-03-09/CDC000001.csv (partitionNum = 0) + // test/test2/439972354120482843/2023-03-09/CDC000002.csv (partitionNum = 0) + // + // the DDL event recorded in schema.json should be executed first, then the DML events + // in csv files can be executed. + dmlkey := cloudstorage.DmlPathKey{ + SchemaPathKey: schemaKey, + PartitionNum: fakePartitionNumForSchemaFile, + Date: "", + } + if _, ok := c.tableDMLIdxMap[dmlkey]; !ok { + c.tableDMLIdxMap[dmlkey] = 0 + } else { + // duplicate table schema file found, this should not happen. 
+ log.Panic("duplicate schema file found", + zap.String("path", path), zap.Any("tableDef", tableDef), + zap.Any("schemaKey", schemaKey), zap.Any("dmlkey", dmlkey)) + } + return nil +} + +func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage.TableDefinition { + var tableDef *cloudstorage.TableDefinition + if tableDefs, ok := c.tableDefMap[key.GetKey()]; ok { + tableDef = tableDefs[key.TableVersion] + } + if tableDef == nil { + log.Panic("tableDef not found", zap.Any("key", key), zap.Any("tableDefMap", c.tableDefMap)) + } + return *tableDef +} + +func (c *consumer) handleNewFiles( + ctx context.Context, + dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange, +) error { + keys := make([]cloudstorage.DmlPathKey, 0, len(dmlFileMap)) + for k := range dmlFileMap { + keys = append(keys, k) + } + if len(keys) == 0 { + log.Info("no new dml files found since last round") + return nil + } + sort.Slice(keys, func(i, j int) bool { + if keys[i].TableVersion != keys[j].TableVersion { + return keys[i].TableVersion < keys[j].TableVersion + } + if keys[i].PartitionNum != keys[j].PartitionNum { + return keys[i].PartitionNum < keys[j].PartitionNum + } + if keys[i].Date != keys[j].Date { + return keys[i].Date < keys[j].Date + } + if keys[i].Schema != keys[j].Schema { + return keys[i].Schema < keys[j].Schema + } + return keys[i].Table < keys[j].Table + }) + + for _, key := range keys { + tableDef := c.mustGetTableDef(key.SchemaPathKey) + // if the key is a fake dml path key which is mainly used for + // sorting schema.json file before the dml files, then execute the ddl query. + if key.PartitionNum == fakePartitionNumForSchemaFile && + len(key.Date) == 0 && len(tableDef.Query) > 0 { + ddlEvent, err := tableDef.ToDDLEvent() + if err != nil { + return err + } + if err := c.ddlSink.WriteDDLEvent(ctx, ddlEvent); err != nil { + return errors.Trace(err) + } + // TODO: need to cleanup tableDefMap in the future. 
+ log.Info("execute ddl event successfully", zap.String("query", tableDef.Query)) + continue + } + + fileRange := dmlFileMap[key] + for i := fileRange.start; i <= fileRange.end; i++ { + if err := c.syncExecDMLEvents(ctx, tableDef, key, i); err != nil { + return err + } + } + } + + return nil +} + +func (c *consumer) run(ctx context.Context) error { + ticker := time.NewTicker(flushInterval) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-c.errCh: + return err + case <-ticker.C: + } + + dmlFileMap, err := c.getNewFiles(ctx) + if err != nil { + return errors.Trace(err) + } + + err = c.handleNewFiles(ctx, dmlFileMap) + if err != nil { + return errors.Trace(err) + } + } +} + +// copied from kafka-consumer +type fakeTableIDGenerator struct { + tableIDs map[string]int64 + currentTableID int64 + mu sync.Mutex +} + +func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 { + g.mu.Lock() + defer g.mu.Unlock() + key := quotes.QuoteSchema(schema, table) + if partition != 0 { + key = fmt.Sprintf("%s.`%d`", key, partition) + } + if tableID, ok := g.tableIDs[key]; ok { + return tableID + } + g.currentTableID++ + g.tableIDs[key] = g.currentTableID + return g.currentTableID +} + +func main() { + var consumer *consumer + var err error + + if enableProfiling { + go func() { + server := &http.Server{ + Addr: "127.0.0.1:6060", + ReadHeaderTimeout: 5 * time.Second, + } + + if err := server.ListenAndServe(); err != nil { + log.Fatal("http pprof", zap.Error(err)) + } + }() + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + deferFunc := func() int { + stop() + if consumer != nil { + consumer.sinkFactory.Close() + } + if err != nil && err != context.Canceled { + return 1 + } + return 0 + } + + consumer, err = newConsumer(ctx) + if err != nil { + log.Error("failed to create storage consumer", zap.Error(err)) + goto EXIT + } + + if err = consumer.run(ctx); err != nil { + log.Error("error occurred while running consumer", zap.Error(err)) + } + +EXIT: + os.Exit(deferFunc()) +} diff --git a/go.mod b/go.mod index 37632316b..684b00744 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/bradleyjkemp/grpc-tools v0.2.5 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cockroachdb/pebble v1.1.1 + github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/coreos/go-semver v0.3.1 github.com/dustin/go-humanize v1.0.1 github.com/fatih/color v1.17.0 diff --git a/go.sum b/go.sum index b2f10dfd1..dfb0db4b0 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0 h1:H+U3Gk9zY56G3u872L82bk4 github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.0/go.mod h1:mgrmMSgaLp9hmax62XQTd0N4aAqSE5E0DulSpVYK7vc= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 
h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= @@ -74,6 +76,10 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= +github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= +github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I= +github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= @@ -202,8 +208,12 @@ github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts= +github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8= github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA= github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA= +github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs= +github.com/containerd/containerd v1.6.8/go.mod h1:By6p5KqPK0/7/CgO/A6t/Gz+CUYUu2zf1hUaaymVXB0= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64/go.mod h1:F86k/6c7aDUdwSUevnLpHS/3Q9hzYCE99jGk2xsHnt0= github.com/coocood/freecache v1.2.1 h1:/v1CqMq45NFH9mp/Pt142reundeBM0dVUD3osQBeu/U= @@ -244,6 +254,12 @@ github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WA github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= +github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68= +github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE= +github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod 
h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= @@ -647,6 +663,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2 github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a h1:N9zuLhTvBSRt0gWSiJswwQ2HqDmtX/ZCDJURnKUt1Ik= github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= +github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -676,12 +694,20 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs= +github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -702,6 +728,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q= github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= +github.com/opencontainers/go-digest v1.0.0 
h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec= +github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w= +github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0= @@ -911,6 +943,8 @@ github.com/swaggo/gin-swagger v1.2.0/go.mod h1:qlH2+W7zXGZkczuL+r2nEBR2JTT+/lX05 github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+tv8Y= github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg= github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk= +github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8= +github.com/testcontainers/testcontainers-go v0.14.0/go.mod h1:hSRGJ1G8Q5Bw2gXgPulJOLlEBaYJHeBSOkQM5JLG+JQ= github.com/thanhpk/randstr v1.0.6 h1:psAOktJFD4vV9NEVb3qkhRSMvYh4ORRaj1+w/hn4B+o= github.com/thanhpk/randstr v1.0.6/go.mod h1:M/H2P1eNLZzlDwAzpkkkUvoyNNMbzRGhESZuEQk3r0U= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=