Skip to content

Commit

Permalink
split-table: make split-table more useful (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Feb 15, 2025
1 parent d1ec9a8 commit 7af8164
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 7 deletions.
6 changes: 6 additions & 0 deletions downstreamadapter/dispatcher/table_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,11 @@ func (p *TableProgress) GetEventSizePerSecond() float32 {
p.cumulateEventSize = 0
p.lastQueryTime = time.Now()

if eventSizePerSecond == 0 {
// The event size is sent to the maintainer only once per second.
// So if no data was written, we return a tiny value instead of 0 to distinguish it from a status that carries no eventSize.
return 0.1
}

return eventSizePerSecond
}
7 changes: 5 additions & 2 deletions maintainer/replica/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
clearTimeout = 300 // seconds
)

var MinSpanNumberCoefficient = 2
var MinSpanNumberCoefficient = 32 // This number is twice the default worker count of the MySQL sink. It helps evenly split and dispatch high-throughput tables.

type CheckResult struct {
OpType OpType
Expand Down Expand Up @@ -127,7 +127,9 @@ func (s *hotSpanChecker) UpdateStatus(span *SpanReplication) {
return
}

if status.EventSizePerSecond < s.writeThreshold {
log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold))

if status.EventSizePerSecond != 0 && status.EventSizePerSecond < s.writeThreshold {
if hotSpan, ok := s.hotTasks[span.ID]; ok {
hotSpan.score--
if hotSpan.score == 0 {
Expand All @@ -154,6 +156,7 @@ func (s *hotSpanChecker) Check(batchSize int) replica.GroupCheckResult {
cache := make([]CheckResult, 0)

for _, hotSpan := range s.hotTasks {
log.Debug("hot span", zap.String("changefeed", s.changefeedID.Name()), zap.String("span", hotSpan.ID.String()), zap.Int("score", hotSpan.score), zap.Int("scoreThreshold", s.scoreThreshold))
if time.Since(hotSpan.lastUpdateTime) > clearTimeout*time.Second {
// should not happen
log.Panic("remove hot span since it is outdated",
Expand Down
2 changes: 1 addition & 1 deletion maintainer/split/region_count_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newEvenlySplitStepper(pages int, totalRegion int) evenlySplitStepper {
extraRegionPerSpan: extraRegionPerSpan,
remain: remain,
}
log.Info("evenly split stepper", zap.Any("evenlySplitStepper", res))
log.Info("evenly split stepper", zap.Any("regionPerSpan", regionPerSpan), zap.Any("spanCount", pages), zap.Any("extraRegionPerSpan", extraRegionPerSpan))
return res
}

Expand Down
16 changes: 14 additions & 2 deletions maintainer/split/region_count_splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import (
)

func TestRegionCountSplitSpan(t *testing.T) {
t.Parallel()
// t.Parallel()

oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
baseSpanNumberCoefficient = 3
defer func() {
baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
}()

cache := NewMockRegionCache(nil)
cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1)
Expand Down Expand Up @@ -134,7 +140,13 @@ func TestRegionCountSplitSpan(t *testing.T) {
}

func TestRegionCountEvenlySplitSpan(t *testing.T) {
t.Parallel()
// t.Parallel()

oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
baseSpanNumberCoefficient = 3
defer func() {
baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
}()

cache := NewMockRegionCache(nil)
totalRegion := 1000
Expand Down
5 changes: 5 additions & 0 deletions maintainer/split/write_bytes_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (m *writeSplitter) split(
zap.String("namespace", m.changefeedID.Namespace()),
zap.String("changefeed", m.changefeedID.Name()),
zap.String("span", span.String()),
zap.Int("captureNum", captureNum),
zap.Int("expectedSpanNum", expectedSpanNum),
zap.Int("DefaultMaxSpanNumber", DefaultMaxSpanNumber),
zap.Int("regionsLen", len(regions)),
zap.Error(err))
return []*heartbeatpb.TableSpan{span}
}
Expand Down Expand Up @@ -139,6 +143,7 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1(
// 1. If the total write is less than writeKeyThreshold
// don't need to split the regions
if totalWrite < uint64(m.writeKeyThreshold) {
log.Info("total write less than writeKeyThreshold, skip split", zap.Any("totalWrite", totalWrite), zap.Any("writeKeyThreshold", m.writeKeyThreshold))
return &splitRegionsInfo{
RegionCounts: []int{len(regions)},
Weights: []uint64{totalWriteNormalized},
Expand Down
14 changes: 12 additions & 2 deletions maintainer/split/write_bytes_splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) {
}

func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
t.Parallel()
// t.Parallel()
oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
baseSpanNumberCoefficient = 3
defer func() {
baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
}()
re := require.New(t)
cfID := common.NewChangeFeedIDWithName("test")
splitter := newWriteSplitter(cfID, nil, 0)
Expand All @@ -195,7 +200,12 @@ func TestSplitRegionsByWrittenKeysCold(t *testing.T) {
}

func TestNotSplitRegionsByWrittenKeysCold(t *testing.T) {
t.Parallel()
// t.Parallel()
oldBaseSpanNumberCoefficient := baseSpanNumberCoefficient
baseSpanNumberCoefficient = 3
defer func() {
baseSpanNumberCoefficient = oldBaseSpanNumberCoefficient
}()
re := require.New(t)
cfID := common.NewChangeFeedIDWithName("test")
splitter := newWriteSplitter(cfID, nil, 1)
Expand Down

0 comments on commit 7af8164

Please sign in to comment.