Commit

adjust consumer
3AceShowHand committed Feb 18, 2025
1 parent b3348a6 commit f5af712
Showing 3 changed files with 21 additions and 33 deletions.
21 changes: 16 additions & 5 deletions cmd/kafka-consumer/consumer.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/ticdc/pkg/errors"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
+	"golang.org/x/sync/errgroup"
 )
 
 func getPartitionNum(o *option) (int32, error) {
@@ -119,18 +120,17 @@ func newConsumer(ctx context.Context, o *option) *consumer {
 	}
 }
 
-// Consume will read message from Kafka.
-func (c *consumer) Consume(ctx context.Context) {
+func (c *consumer) readMessage(ctx context.Context) error {
 	defer func() {
 		if err := c.client.Close(); err != nil {
-			log.Panic("close kafka consumer failed", zap.Error(err))
+			log.Warn("close kafka consumer failed", zap.Error(err))
 		}
 	}()
 	for {
 		select {
 		case <-ctx.Done():
-			log.Info("consumer exist: context cancelled")
-			return
+			return errors.Trace(ctx.Err())
 		default:
 		}
 		msg, err := c.client.ReadMessage(-1)
@@ -142,7 +142,6 @@ func (c *consumer) Consume(ctx context.Context) {
 		if !needCommit {
 			continue
 		}
-
 		topicPartition, err := c.client.CommitMessage(msg)
 		if err != nil {
 			log.Error("commit message failed, just continue",
@@ -155,3 +154,15 @@ func (c *consumer) Consume(ctx context.Context) {
 			zap.Any("offset", topicPartition[0].Offset))
 	}
 }
+
+// Consume will read message from Kafka.
+func (c *consumer) Consume(ctx context.Context) error {
+	g, ctx := errgroup.WithContext(ctx)
+	g.Go(func() error {
+		return c.writer.run(ctx)
+	})
+	g.Go(func() error {
+		return c.readMessage(ctx)
+	})
+	return g.Wait()
+}
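
Note: the new Consume ties the writer and the reader together with errgroup.WithContext, so the two goroutines share one derived context; the first failure cancels the other side, and g.Wait returns that first error. A minimal standalone sketch of this coordination pattern (illustrative names, not the ticdc code):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	// Stand-in for writer.run: blocks until the shared context is cancelled.
	g.Go(func() error {
		<-ctx.Done()
		return ctx.Err()
	})
	// Stand-in for readMessage: fails, which cancels ctx for the writer.
	g.Go(func() error {
		return errors.New("read message failed")
	})
	// Wait returns the first non-nil error; here: "read message failed".
	fmt.Println(g.Wait())
}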
3 changes: 1 addition & 2 deletions cmd/kafka-consumer/main.go
@@ -86,8 +86,7 @@ func main() {
 
 	consumer := newConsumer(ctx, consumerOption)
 	g.Go(func() error {
-		consumer.Consume(ctx)
-		return nil
+		return consumer.Consume(ctx)
 	})
 
 	g.Go(func() error {
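
Note: this change matters because the closure runs inside an errgroup. The old version called consumer.Consume and returned nil, so a consumer failure never reached g.Wait and never cancelled sibling goroutines. A contrast sketch of the two behaviors (assumed simplification; consume stands in for consumer.Consume):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func consume(ctx context.Context) error {
	return errors.New("consume failed")
}

func main() {
	// Old behavior: the error is dropped, so Wait reports success.
	g, ctx := errgroup.WithContext(context.Background())
	g.Go(func() error {
		_ = consume(ctx) // was: consumer.Consume(ctx); return nil
		return nil
	})
	fmt.Println("swallowed:", g.Wait()) // swallowed: <nil>

	// New behavior: the error propagates and surfaces in Wait.
	g2, ctx2 := errgroup.WithContext(context.Background())
	g2.Go(func() error {
		return consume(ctx2) // return consumer.Consume(ctx)
	})
	fmt.Println("propagated:", g2.Wait()) // propagated: consume failed
}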
30 changes: 4 additions & 26 deletions cmd/kafka-consumer/writer.go
@@ -134,35 +134,13 @@ func newWriter(ctx context.Context, o *option) *writer {
 		log.Panic("cannot create the mysql sink", zap.Error(err))
 	}
 	w.mysqlSink = mysqlSink
-
-	// config.GetGlobalServerConfig().TZ = o.timezone
-	// errChan := make(chan error, 1)
-
-	//// todo: use local mysql sink, instead of the tiflow sink.
-	//tiflowReplicaConfig := tiflowConfig.GetDefaultReplicaConfig()
-	//f, err := eventsinkfactory.New(ctx, changefeed, o.downstreamURI, tiflowReplicaConfig, errChan, nil)
-	//if err != nil {
-	//	log.Panic("cannot create the event sink factory", zap.Error(err))
-	//}
-	//w.sinkFactory = f
-
-	//go func() {
-	//	err := <-errChan
-	//	if !errors.Is(errors.Cause(err), context.Canceled) {
-	//		log.Error("error on running consumer", zap.Error(err))
-	//	} else {
-	//		log.Info("consumer exited")
-	//	}
-	//}()
-
-	//ddlSink, err := ddlsinkfactory.New(ctx, changefeed, o.downstreamURI, tiflowReplicaConfig)
-	//if err != nil {
-	//	log.Panic("cannot create the ddl sink factory", zap.Error(err))
-	//}
-	//w.ddlSink = ddlSink
 	return w
 }
 
+func (w *writer) run(ctx context.Context) error {
+	return w.mysqlSink.Run(ctx)
+}
+
 // append DDL wait to be handled, only consider the constraint among DDLs.
 // for DDL a / b received in the order, a.CommitTs < b.CommitTs should be true.
 func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) {
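
Note: the appendDDL comment above encodes an ordering invariant: DDL events arrive in order, so each event's CommitTs must exceed the previous one's. A hedged sketch of that invariant with hypothetical types (the real writer tracks offsets and more state):

package main

import "fmt"

type ddlEvent struct {
	Query    string
	CommitTs uint64
}

type ddlQueue struct {
	events []ddlEvent
}

// append rejects an event whose CommitTs does not advance past the tail,
// mirroring the "a.CommitTs < b.CommitTs" constraint in the diff's comment.
func (q *ddlQueue) append(e ddlEvent) error {
	if n := len(q.events); n > 0 && e.CommitTs <= q.events[n-1].CommitTs {
		return fmt.Errorf("out-of-order DDL: commitTs %d <= tail %d",
			e.CommitTs, q.events[n-1].CommitTs)
	}
	q.events = append(q.events, e)
	return nil
}

func main() {
	var q ddlQueue
	fmt.Println(q.append(ddlEvent{"create table t1 (a int)", 100})) // <nil>
	fmt.Println(q.append(ddlEvent{"create table t2 (a int)", 90}))  // out-of-order error
}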
