Skip to content

Commit

Permalink
flush
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Feb 18, 2025
1 parent f5af712 commit 376dcaa
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (w *writer) forEachPartition(fn func(p *partitionProgress)) {
}

// Write will synchronously write data downstream
func (w *writer) Write(ctx context.Context, messageType common.MessageType) bool {
func (w *writer) flush(ctx context.Context, messageType common.MessageType) bool {
watermark := w.getMinWatermark()
var todoDDL *model.DDLEvent
for {
Expand Down Expand Up @@ -361,7 +361,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
return false
}
// flush when received DDL event or resolvedTs
return w.Write(ctx, messageType)
return w.flush(ctx, messageType)
}

func (w *writer) resolveRowChangedEvents(progress *partitionProgress, newWatermark uint64) {
Expand Down

0 comments on commit 376dcaa

Please sign in to comment.