add more consumer related code #1003

Open
3AceShowHand wants to merge 38 commits into master from kafka-consumer

Commits (38, all by 3AceShowHand)
- c0a4428 remove useless method call (Feb 14, 2025)
- 73df66a add canal-json decoder (Feb 17, 2025)
- 48bfc93 try to add open protocol decoder (Feb 17, 2025)
- b8becad try to fix canal-json (Feb 17, 2025)
- 0a6c2b9 move java.go to canal java (Feb 17, 2025)
- ea1baac add canal-json txn decoder (Feb 17, 2025)
- 997f5cd Review canal-json encoder (Feb 17, 2025)
- 67e1da9 rename file name (Feb 17, 2025)
- 19a8f48 refactor the import (Feb 17, 2025)
- 994a64c refactor the open (Feb 17, 2025)
- 6aa6632 remove unused method (Feb 17, 2025)
- 5a5f4f8 fix open decoder (Feb 17, 2025)
- 4f7747a remove method (Feb 17, 2025)
- af5a13d add 2 method (Feb 17, 2025)
- fc350bb fix consumer main. (Feb 17, 2025)
- c7f9eee fix more in the consumer (Feb 17, 2025)
- 7dcd261 fix more in the consumer (Feb 17, 2025)
- 1f2793d Merge branch 'master' into kafka-consumer (Feb 18, 2025)
- 85bc54a remove useless import (Feb 18, 2025)
- 413bda1 fix incorrect import on the message type (Feb 18, 2025)
- f0f439c disable some functionality temporarily (Feb 18, 2025)
- c2dfc7d fix consumer (Feb 18, 2025)
- bc61483 get rid of the tiflow mysql sink (Feb 18, 2025)
- ec1a96a fix consumer option (Feb 18, 2025)
- b3348a6 fix consumer option (Feb 18, 2025)
- f5af712 adjust consumer (Feb 18, 2025)
- 376dcaa flush (Feb 18, 2025)
- 759168d adjust decoder ddl event signature (Feb 18, 2025)
- 9157353 adjust consumer handle ddl (Feb 18, 2025)
- ef882f7 adjust use dml interface (Feb 18, 2025)
- 2fe113d adjust decoder new the ddl event (Feb 18, 2025)
- c43626e try to fix the table info (Feb 18, 2025)
- d955184 remove schema id (Feb 18, 2025)
- 3fc434d try to set the table info (Feb 18, 2025)
- cd2dc6f try to set the table info (Feb 18, 2025)
- 191a8e8 try to new the table info by tidb table info (Feb 18, 2025)
- 7ca29e8 Merge branch 'master' into kafka-consumer (Feb 19, 2025)
- 30bb9cf first version new the table info (Feb 19, 2025)

cmd/cdc/server/server.go (11 changes: 5 additions & 6 deletions)

@@ -19,11 +19,10 @@ import (
 	"strings"

 	"github.com/fatih/color"
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/cmd/util"
 	"github.com/pingcap/ticdc/pkg/config"
-	cerror "github.com/pingcap/ticdc/pkg/errors"
+	"github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/logger"
 	"github.com/pingcap/ticdc/server"
 	"github.com/pingcap/ticdc/version"
@@ -99,7 +98,7 @@ func (o *options) run(cmd *cobra.Command) error {

 	cdcversion.ReleaseVersion = version.ReleaseVersion
 	version.LogVersionInfo("Change Data Capture (CDC)")
-	log.Info("The TiCDC release version is", zap.String("ReleaseVersion", cdcversion.ReleaseVersion))
+	log.Info("The TiCDC release version", zap.String("ReleaseVersion", cdcversion.ReleaseVersion))

 	util.LogHTTPProxies()
 	svr, err := server.New(o.serverConfig, o.pdEndpoints)
@@ -111,7 +110,7 @@ func (o *options) run(cmd *cobra.Command) error {
 		zap.Strings("pd", o.pdEndpoints), zap.Stringer("config", o.serverConfig))

 	err = svr.Run(ctx)
-	if err != nil && errors.Cause(err) != context.Canceled {
+	if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
 		log.Warn("cdc server exits with error", zap.Error(err))
 	} else {
 		log.Info("cdc server exits normally")
@@ -182,13 +181,13 @@ func (o *options) complete(command *cobra.Command) error {
 // validate checks that the provided attach options are specified.
 func (o *options) validate() error {
 	if len(o.pdEndpoints) == 0 {
-		return cerror.ErrInvalidServerOption.GenWithStack("empty PD address")
+		return errors.ErrInvalidServerOption.GenWithStack("empty PD address")
 	}
 	for _, ep := range o.pdEndpoints {
 		// NOTICE: The configuration used here is the one that has been completed,
 		// as it may be configured by the configuration file.
 		if err := util.VerifyPdEndpoint(ep, o.serverConfig.Security.IsTLSEnabled()); err != nil {
-			return cerror.WrapError(cerror.ErrInvalidServerOption, err)
+			return errors.WrapError(errors.ErrInvalidServerOption, err)
 		}
 	}
 	return nil
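
Note: the error check above moves from comparing `errors.Cause(err)` directly against `context.Canceled` to `errors.Is`, which also matches a sentinel that has been wrapped along the way. A minimal standard-library sketch of the difference (the wrapping message is illustrative, not from the PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// A cancellation error wrapped with %w, as callers often return it.
	err := fmt.Errorf("run server: %w", context.Canceled)

	// Direct comparison misses the wrapped sentinel.
	fmt.Println(err == context.Canceled) // false

	// errors.Is walks the Unwrap chain and matches it.
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```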

cmd/kafka-consumer/consumer.go (29 changes: 21 additions & 8 deletions)

@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/errors"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"golang.org/x/sync/errgroup"
 )

 func getPartitionNum(o *option) (int32, error) {
@@ -42,7 +43,7 @@ func getPartitionNum(o *option) (int32, error) {
 	defer admin.Close()

 	timeout := 3000
-	for i := 0; i <= o.retryTime; i++ {
+	for i := 0; i <= 30; i++ {
 		resp, err := admin.GetMetadata(&o.topic, false, timeout)
 		if err != nil {
 			if err.(kafka.Error).Code() == kafka.ErrTransport {
@@ -100,8 +101,10 @@ func newConsumer(ctx context.Context, o *option) *consumer {
 		_ = configMap.SetKey("ssl.key.location", o.key)
 		_ = configMap.SetKey("ssl.certificate.location", o.cert)
 	}
-	if level, err := zapcore.ParseLevel(o.logLevel); err == nil && level.String() == "debug" {
-		configMap.SetKey("debug", "all")
+	if level, err := zapcore.ParseLevel(logLevel); err == nil && level == zapcore.DebugLevel {
+		if err = configMap.SetKey("debug", "all"); err != nil {
+			log.Error("set kafka debug log failed", zap.Error(err))
+		}
 	}
 	client, err := kafka.NewConsumer(configMap)
 	if err != nil {
@@ -117,18 +120,17 @@
 	}
 }

-// Consume will read message from Kafka.
-func (c *consumer) Consume(ctx context.Context) {
+func (c *consumer) readMessage(ctx context.Context) error {
 	defer func() {
 		if err := c.client.Close(); err != nil {
-			log.Panic("close kafka consumer failed", zap.Error(err))
+			log.Warn("close kafka consumer failed", zap.Error(err))
 		}
 	}()
 	for {
 		select {
 		case <-ctx.Done():
-			log.Info("consumer exist: context cancelled")
-			return
+			return errors.Trace(ctx.Err())
 		default:
 		}
 		msg, err := c.client.ReadMessage(-1)
@@ -140,7 +142,6 @@ func (c *consumer) Consume(ctx context.Context) {
 		if !needCommit {
 			continue
 		}
-
 		topicPartition, err := c.client.CommitMessage(msg)
 		if err != nil {
 			log.Error("commit message failed, just continue",
@@ -153,3 +154,15 @@
 			zap.Any("offset", topicPartition[0].Offset))
 	}
 }
+
+// Consume will read message from Kafka.
+func (c *consumer) Consume(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+	g.Go(func() error {
+		return c.writer.run(ctx)
+	})
+	g.Go(func() error {
+		return c.readMessage(ctx)
+	})
+	return g.Wait()
+}
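
Note: the new `Consume` runs the writer and the reader under one errgroup: the shared context is cancelled as soon as either goroutine returns a non-nil error, and `g.Wait()` reports that first error. A standalone sketch of the pattern, with placeholder workers instead of the PR's `writer` and `readMessage`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	// Placeholder reader: fails, which cancels the shared ctx.
	g.Go(func() error {
		time.Sleep(100 * time.Millisecond)
		return errors.New("read failed")
	})

	// Placeholder writer: exits once ctx is cancelled.
	g.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})

	// Wait returns the first non-nil error: "read failed".
	fmt.Println(g.Wait())
}
```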

cmd/kafka-consumer/event_group.go (14 changes: 7 additions & 7 deletions)

@@ -18,7 +18,7 @@ import (

 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/pingcap/log"
-	"github.com/pingcap/tiflow/cdc/model"
+	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
 	"go.uber.org/zap"
 )

@@ -27,7 +27,7 @@ type eventsGroup struct {
 	partition int32
 	tableID   int64

-	events        []*model.RowChangedEvent
+	events        []*commonEvent.DMLEvent
 	highWatermark uint64
 }

@@ -36,12 +36,12 @@ func NewEventsGroup(partition int32, tableID int64) *eventsGroup {
 	return &eventsGroup{
 		partition: partition,
 		tableID:   tableID,
-		events:    make([]*model.RowChangedEvent, 0, 1024),
+		events:    make([]*commonEvent.DMLEvent, 0, 1024),
 	}
 }

 // Append will append an event to event groups.
-func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
+func (g *eventsGroup) Append(row *commonEvent.DMLEvent, offset kafka.Offset) {
 	g.events = append(g.events, row)
 	if row.CommitTs > g.highWatermark {
 		g.highWatermark = row.CommitTs
@@ -53,12 +53,12 @@ func (g *eventsGroup) Append(row *model.RowChangedEvent, offset kafka.Offset) {
 		zap.Uint64("highWatermark", g.highWatermark),
 		zap.Int64("tableID", row.GetTableID()),
 		zap.String("schema", row.TableInfo.GetSchemaName()),
-		zap.String("table", row.TableInfo.GetTableName()),
-		zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
+		zap.String("table", row.TableInfo.GetTableName()))
+	// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
 	}

 // Resolve will get events where CommitTs is less than resolveTs.
-func (g *eventsGroup) Resolve(resolve uint64) []*model.RowChangedEvent {
+func (g *eventsGroup) Resolve(resolve uint64) []*commonEvent.DMLEvent {
 	i := sort.Search(len(g.events), func(i int) bool {
 		return g.events[i].CommitTs > resolve
 	})
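
Note: `Resolve` relies on `g.events` being appended in `CommitTs` order. `sort.Search` binary-searches for the first event with `CommitTs > resolve`, so the prefix before that index is exactly the set of events at or below the resolved ts. A self-contained sketch of that split, with a simplified event type standing in for `commonEvent.DMLEvent`:

```go
package main

import (
	"fmt"
	"sort"
)

type event struct{ CommitTs uint64 }

// resolve splits events (sorted by CommitTs ascending) at resolvedTs:
// ready holds every event with CommitTs <= resolvedTs, pending the rest.
func resolve(events []event, resolvedTs uint64) (ready, pending []event) {
	i := sort.Search(len(events), func(i int) bool {
		return events[i].CommitTs > resolvedTs
	})
	return events[:i], events[i:]
}

func main() {
	events := []event{{CommitTs: 5}, {CommitTs: 7}, {CommitTs: 9}}
	ready, pending := resolve(events, 7)
	fmt.Println(len(ready), len(pending)) // 2 1
}
```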

cmd/kafka-consumer/main.go (97 changes: 48 additions & 49 deletions)

@@ -19,93 +19,92 @@ import (
 	"fmt"
 	"net/http"
 	_ "net/http/pprof"
-	"net/url"
 	"os"
 	"os/signal"
-	"strings"
-	"sync"
 	"syscall"

 	"github.com/google/uuid"
 	"github.com/pingcap/log"
-	"github.com/pingcap/tiflow/pkg/logutil"
-	"github.com/pingcap/tiflow/pkg/version"
+	"github.com/pingcap/ticdc/pkg/logger"
+	"github.com/pingcap/ticdc/version"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )

+var (
+	logPath  string
+	logLevel string
+)
+
 func main() {
 	var (
-		upstreamURIStr string
-		configFile     string
+		upstreamURIStr  string
+		configFile      string
+		enableProfiling bool
 	)
 	groupID := fmt.Sprintf("ticdc_kafka_consumer_%s", uuid.New().String())
 	consumerOption := newOption()

 	flag.StringVar(&configFile, "config", "", "config file for changefeed")
 	flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri")
+	flag.StringVar(&logPath, "log-file", "cdc_kafka_consumer.log", "log file path")
+	flag.StringVar(&logLevel, "log-level", "info", "log file path")
+	flag.BoolVar(&enableProfiling, "enable-profiling", false, "enable pprof profiling")
+
 	flag.StringVar(&consumerOption.downstreamURI, "downstream-uri", "", "downstream sink uri")
 	flag.StringVar(&consumerOption.schemaRegistryURI, "schema-registry-uri", "", "schema registry uri")
 	flag.StringVar(&consumerOption.upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN")
 	flag.StringVar(&consumerOption.groupID, "consumer-group-id", groupID, "consumer group id")
-	flag.StringVar(&consumerOption.logPath, "log-file", "cdc_kafka_consumer.log", "log file path")
-	flag.StringVar(&consumerOption.logLevel, "log-level", "info", "log file path")
 	flag.StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of Kafka consumer")
 	flag.StringVar(&consumerOption.ca, "ca", "", "CA certificate path for Kafka SSL connection")
 	flag.StringVar(&consumerOption.cert, "cert", "", "Certificate path for Kafka SSL connection")
 	flag.StringVar(&consumerOption.key, "key", "", "Private key path for Kafka SSL connection")
-	flag.BoolVar(&consumerOption.enableProfiling, "enable-profiling", false, "enable pprof profiling")
 	flag.Parse()

-	err := logutil.InitLogger(&logutil.Config{
-		Level: consumerOption.logLevel,
-		File:  consumerOption.logPath,
+	err := logger.InitLogger(&logger.Config{
+		Level: logLevel,
+		File:  logPath,
 	})
 	if err != nil {
 		log.Panic("init logger failed", zap.Error(err))
 	}
 	version.LogVersionInfo("kafka consumer")

-	upstreamURI, err := url.Parse(upstreamURIStr)
-	if err != nil {
-		log.Panic("invalid upstream-uri", zap.Error(err))
-	}
-	scheme := strings.ToLower(upstreamURI.Scheme)
-	if scheme != "kafka" {
-		log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`",
-			zap.String("upstreamURI", upstreamURIStr))
-	}
-
-	err = consumerOption.Adjust(upstreamURI, configFile)
+	err = consumerOption.Adjust(upstreamURIStr, configFile)
 	if err != nil {
 		log.Panic("adjust consumer option failed", zap.Error(err))
 	}

 	ctx, cancel := context.WithCancel(context.Background())
-	consumer := newConsumer(ctx, consumerOption)
-	var wg sync.WaitGroup
-	if consumerOption.enableProfiling {
-		log.Info("profiling is enabled")
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			if err = http.ListenAndServe("127.0.0.1:6060", nil); err != nil {
-				log.Panic("cannot start the pprof", zap.Error(err))
-			}
-		}()
+	g, ctx := errgroup.WithContext(ctx)
+
+	if enableProfiling {
+		g.Go(func() error {
+			return http.ListenAndServe(":6060", nil)
+		})
 	}
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		consumer.Consume(ctx)
-	}()
-
-	sigterm := make(chan os.Signal, 1)
-	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
-	select {
-	case <-ctx.Done():
-		log.Info("terminating: context cancelled")
-	case <-sigterm:
-		log.Info("terminating: via signal")
+	consumer := newConsumer(ctx, consumerOption)
+	g.Go(func() error {
+		return consumer.Consume(ctx)
+	})
+
+	g.Go(func() error {
+		sigterm := make(chan os.Signal, 1)
+		signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
+		select {
+		case <-ctx.Done():
+			log.Info("terminating: context cancelled")
+		case <-sigterm:
+			log.Info("terminating: via signal")
+		}
+		cancel()
+		return nil
+	})
+	err = g.Wait()
+	if err != nil {
+		log.Error("kafka consumer exited with error", zap.Error(err))
+	} else {
+		log.Info("kafka consumer exited")
 	}
-	cancel()
-	wg.Wait()
 }
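
Note: the rewritten `main` dedicates an errgroup goroutine to signal handling and funnels shutdown through `cancel()`. Since Go 1.16 the standard library also offers `signal.NotifyContext`, which collapses the channel-plus-select pattern into a derived context; a sketch of that alternative (not what the PR uses):

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
)

func main() {
	// ctx is cancelled automatically on SIGINT or SIGTERM;
	// stop releases the signal registration, like signal.Stop.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Block until a signal arrives (or a parent cancellation propagates).
	<-ctx.Done()
	fmt.Println("terminating:", ctx.Err())
}
```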