From 0eb416de42c648b746f1c0da84df62ebbbd02108 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 9 Jan 2025 11:21:58 +0800 Subject: [PATCH] puller: close kvclient correctly when stopping a processor (#11957) (#11982) close pingcap/tiflow#11954 --- cdc/processor/processor.go | 6 +++++- cdc/processor/sourcemanager/manager.go | 6 ++++-- cdc/puller/ddl_puller.go | 19 ++++++++++++++++--- cdc/puller/multiplexing_puller.go | 10 ++++++++++ 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index df26eaeb7bb..602f660c4ee 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -1084,4 +1084,8 @@ func (d *ddlHandler) Run(ctx context.Context, _ ...chan<- error) error { func (d *ddlHandler) WaitForReady(_ context.Context) {} -func (d *ddlHandler) Close() {} +func (d *ddlHandler) Close() { + if d.puller != nil { + d.puller.Close() + } +} diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index a47dde6554d..84f7a2e2479 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -258,8 +258,10 @@ func (m *SourceManager) Close() { zap.String("changefeed", m.changefeedID.ID)) start := time.Now() - - log.Info("All pullers have been closed", + if m.puller != nil { + m.puller.Close() + } + log.Info("SourceManager puller have been closed", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Duration("cost", time.Since(start))) diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 79470325833..b5a982b3a41 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -164,7 +164,11 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context, _ ...chan<- error) error { func (p *ddlJobPullerImpl) WaitForReady(_ context.Context) {} // Close implements util.Runnable. -func (p *ddlJobPullerImpl) Close() {} +func (p *ddlJobPullerImpl) Close() { + if p.mp != nil { + p.mp.Close() + } +} // Output implements DDLJobPuller, it returns the output channel of DDL job. func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry { @@ -736,6 +740,12 @@ func (h *ddlPullerImpl) Run(ctx context.Context) error { zap.String("changefeed", h.changefeedID.ID), zap.Uint64("resolvedTS", atomic.LoadUint64(&h.resolvedTS))) + defer func() { + log.Info("DDL puller stopped", + zap.String("namespace", h.changefeedID.Namespace), + zap.String("changefeed", h.changefeedID.ID)) + }() + return g.Wait() } @@ -753,10 +763,13 @@ func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) { // Close the ddl puller, release all resources. func (h *ddlPullerImpl) Close() { - log.Info("close the ddl puller", + h.cancel() + if h.ddlJobPuller != nil { + h.ddlJobPuller.Close() + } + log.Info("DDL puller closed", zap.String("namespace", h.changefeedID.Namespace), zap.String("changefeed", h.changefeedID.ID)) - h.cancel() } func (h *ddlPullerImpl) ResolvedTs() uint64 { diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index a7821a04b95..c6ee56d074a 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -361,6 +361,16 @@ func (p *MultiplexingPuller) run(ctx context.Context, includeClient bool) error return eg.Wait() } +// Close closes the puller. +func (p *MultiplexingPuller) Close() { + if p.client != nil { + p.client.Close() + } + log.Info("MultiplexingPuller is closed", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID)) +} + // runEventHandler consumes events from inputCh: // 1. If the event is a kv event, consume by calling progress.consume.f. // 2. If the event is a resolved event, send it to the resolvedEventsCache of the corresponding progress.