diff --git a/downstreamadapter/dispatcher/table_progress.go b/downstreamadapter/dispatcher/table_progress.go
index 6c132c57f..9f9972bd2 100644
--- a/downstreamadapter/dispatcher/table_progress.go
+++ b/downstreamadapter/dispatcher/table_progress.go
@@ -132,5 +132,11 @@ func (p *TableProgress) GetEventSizePerSecond() float32 {
 	p.cumulateEventSize = 0
 	p.lastQueryTime = time.Now()
 
+	if eventSizePerSecond == 0 {
+		// The event size is only sent to the maintainer once per second.
+		// So if no data was written, we return a tiny value instead of 0 to distinguish an idle table from a status that has no eventSize at all.
+		return 0.1
+	}
+
 	return eventSizePerSecond
 }
diff --git a/maintainer/replica/checker.go b/maintainer/replica/checker.go
index d1c48a676..76c67e67a 100644
--- a/maintainer/replica/checker.go
+++ b/maintainer/replica/checker.go
@@ -47,7 +47,7 @@ const (
 	clearTimeout = 300 // seconds
 )
 
-var MinSpanNumberCoefficient = 2
+var MinSpanNumberCoefficient = 32 // This number is twice the default worker count of the MySQL sink; it helps evenly split and dispatch high-throughput tables.
 
 type CheckResult struct {
 	OpType OpType
@@ -127,7 +127,9 @@ func (s *hotSpanChecker) UpdateStatus(span *SpanReplication) {
 		return
 	}
 
-	if status.EventSizePerSecond < s.writeThreshold {
+	log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold))
+
+	if status.EventSizePerSecond != 0 && status.EventSizePerSecond < s.writeThreshold {
 		if hotSpan, ok := s.hotTasks[span.ID]; ok {
 			hotSpan.score--
 			if hotSpan.score == 0 {
@@ -154,6 +156,7 @@ func (s *hotSpanChecker) Check(batchSize int) replica.GroupCheckResult {
 	cache := make([]CheckResult, 0)
 	for _, hotSpan := range s.hotTasks {
+		log.Debug("hot span", zap.String("changefeed", s.changefeedID.Name()), zap.String("span", hotSpan.ID.String()), zap.Int("score", hotSpan.score), zap.Int("scoreThreshold", s.scoreThreshold))
 		if time.Since(hotSpan.lastUpdateTime) > clearTimeout*time.Second {
 			// should not happen
 			log.Panic("remove hot span since it is outdated",
diff --git a/maintainer/split/region_count_splitter.go b/maintainer/split/region_count_splitter.go
index f18de5a83..6326a853c 100644
--- a/maintainer/split/region_count_splitter.go
+++ b/maintainer/split/region_count_splitter.go
@@ -154,7 +154,7 @@ func newEvenlySplitStepper(pages int, totalRegion int) evenlySplitStepper {
 		extraRegionPerSpan: extraRegionPerSpan,
 		remain:             remain,
 	}
-	log.Info("evenly split stepper", zap.Any("evenlySplitStepper", res))
+	log.Info("evenly split stepper", zap.Any("regionPerSpan", regionPerSpan), zap.Any("spanCount", pages), zap.Any("extraRegionPerSpan", extraRegionPerSpan))
 	return res
 }
diff --git a/maintainer/split/region_count_splitter_test.go b/maintainer/split/region_count_splitter_test.go
index ad1bbe277..8102464b9 100644
--- a/maintainer/split/region_count_splitter_test.go
+++ b/maintainer/split/region_count_splitter_test.go
@@ -29,7 +29,13 @@ import (
 )
 
 func TestRegionCountSplitSpan(t *testing.T) {
-	t.Parallel()
+	// t.Parallel()
+
+	oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
+	baseSpanNumberCoefficient = 3
+	defer func() {
+		baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
+	}()
 
 	cache := NewMockRegionCache(nil)
 	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1)
@@ -134,7 +140,13 @@
 }
 
 func TestRegionCountEvenlySplitSpan(t *testing.T) {
-	t.Parallel()
+	// t.Parallel()
+
+	oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
+	baseSpanNumberCoefficient = 3
+	defer func() {
+		baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
+	}()
 
 	cache := NewMockRegionCache(nil)
 	totalRegion := 1000
diff --git a/maintainer/split/write_bytes_splitter.go b/maintainer/split/write_bytes_splitter.go
index 11671fe0f..4f8027add 100644
--- a/maintainer/split/write_bytes_splitter.go
+++ b/maintainer/split/write_bytes_splitter.go
@@ -83,6 +83,10 @@ func (m *writeSplitter) split(
 			zap.String("namespace", m.changefeedID.Namespace()),
 			zap.String("changefeed", m.changefeedID.Name()),
 			zap.String("span", span.String()),
+			zap.Int("captureNum", captureNum),
+			zap.Int("expectedSpanNum", expectedSpanNum),
+			zap.Int("DefaultMaxSpanNumber", DefaultMaxSpanNumber),
+			zap.Int("regionsLen", len(regions)),
 			zap.Error(err))
 		return []*heartbeatpb.TableSpan{span}
 	}
@@ -139,6 +143,7 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1(
 	// 1. If the total write is less than writeKeyThreshold
 	// don't need to split the regions
 	if totalWrite < uint64(m.writeKeyThreshold) {
+		log.Info("total write less than writeKeyThreshold, skip split", zap.Any("totalWrite", totalWrite), zap.Any("writeKeyThreshold", m.writeKeyThreshold))
 		return &splitRegionsInfo{
 			RegionCounts: []int{len(regions)},
 			Weights:      []uint64{totalWriteNormalized},
diff --git a/maintainer/split/write_bytes_splitter_test.go b/maintainer/split/write_bytes_splitter_test.go
index b089edb1a..f205629be 100644
--- a/maintainer/split/write_bytes_splitter_test.go
+++ b/maintainer/split/write_bytes_splitter_test.go
@@ -169,7 +169,12 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) {
 }
 
 func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
-	t.Parallel()
+	// t.Parallel()
+	oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
+	baseSpanNumberCoefficient = 3
+	defer func() {
+		baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
+	}()
 	re := require.New(t)
 	cfID := common.NewChangeFeedIDWithName("test")
 	splitter := newWriteSplitter(cfID, nil, 0)
@@ -195,7 +200,12 @@ func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
 }
 
 func TestNotSplitRegionsByWrittenKeysCold(t *testing.T) {
-	t.Parallel()
+	// t.Parallel()
+	oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
+	baseSpanNumberCoefficient = 3
+	defer func() {
+		baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
+	}()
 	re := require.New(t)
 	cfID := common.NewChangeFeedIDWithName("test")
 	splitter := newWriteSplitter(cfID, nil, 1)
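
Reviewer note: the 0.1 placeholder returned by GetEventSizePerSecond and the `!= 0` guard added in hotSpanChecker.UpdateStatus work together. A reported value of 0 now means "the dispatcher has not reported an event size yet" and leaves the hot score untouched, while any non-zero report below writeThreshold (including the 0.1 idle placeholder) cools the span down. The standalone Go sketch below only illustrates that interpretation; classifyEventSize and the threshold value are hypothetical and are not part of this patch.

package main

import "fmt"

// classifyEventSize mirrors the intent of the patched check: an
// eventSizePerSecond of exactly 0 means no status has been reported yet,
// while the 0.1 placeholder for an idle table is a real report that is
// simply below the write threshold.
func classifyEventSize(eventSizePerSecond, writeThreshold float32) string {
	switch {
	case eventSizePerSecond == 0:
		return "no eventSize status yet; keep the current hot score"
	case eventSizePerSecond < writeThreshold:
		return "below threshold (e.g. the 0.1 idle placeholder); decrease the hot score"
	default:
		return "at or above threshold; increase the hot score"
	}
}

func main() {
	const writeThreshold = 1024 * 1024 // hypothetical threshold in bytes per second
	fmt.Println(classifyEventSize(0, writeThreshold))
	fmt.Println(classifyEventSize(0.1, writeThreshold))
	fmt.Println(classifyEventSize(8*1024*1024, writeThreshold))
}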
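
The test changes all follow the same save-override-restore pattern: t.Parallel() is commented out and the package-level baseSpanNumberCoefficient is saved, overridden, and restored with defer, because parallel tests would race on that shared variable. Below is a generic sketch of the same pattern using t.Cleanup; the package name, variable, and helper are hypothetical and are not the actual split package code.

package example

import "testing"

// coefficient stands in for a package-level tunable such as
// baseSpanNumberCoefficient in the split package.
var coefficient = 32

// setCoefficient overrides the package-level value for one test and restores
// it when the test finishes. Tests that call it must not use t.Parallel(),
// because every test in the package shares the same variable.
func setCoefficient(t *testing.T, v int) {
	t.Helper()
	old := coefficient
	coefficient = v
	t.Cleanup(func() { coefficient = old })
}

func TestWithOverriddenCoefficient(t *testing.T) {
	setCoefficient(t, 3)
	if coefficient != 3 {
		t.Fatalf("expected coefficient to be 3, got %d", coefficient)
	}
}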