Skip to content

Commit

Permalink
test: fix integration test capture_suicide_while_balance_table (#952)
Browse files Browse the repository at this point in the history
ref #442
  • Loading branch information
asddongmen authored Jan 24, 2025
1 parent f66770c commit d9aed28
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 94 deletions.
18 changes: 16 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,31 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=savepoint
# The 14th case in this group
- name: Test server config compatibility
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=server_config_compatibility
# The 15th case in this group
- name: Test split region
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=split_region
# The 16th case in this group
- name: Test changefeed resume with checkpoint ts
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_resume_with_checkpoint_ts
- name: Test capture suicide while balance table
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=capture_suicide_while_balance_table
- name: Test kv client stream reconnect
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=kv_client_stream_reconnect
- name: Upload test logs
if: always()
Expand Down
24 changes: 24 additions & 0 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -173,6 +174,17 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
return s.receiveAndDispatchChangeEvents(conn)
})
g.Go(func() error { return s.processRegionSendTask(gctx, conn) })

failpoint.Inject("InjectForceReconnect", func() {
timer := time.After(10 * time.Second)
g.Go(func() error {
<-timer
err := errors.New("inject force reconnect")
log.Info("inject force reconnect", zap.Error(err))
return err
})
})

_ = g.Wait()
return isCanceled()
}
Expand Down Expand Up @@ -427,3 +439,15 @@ func (s *regionRequestWorker) clearPendingRegions() []regionInfo {
}
return regions
}

func (s *regionRequestWorker) getAllRegionStates() regionFeedStates {
s.requestedRegions.RLock()
defer s.requestedRegions.RUnlock()
states := make(regionFeedStates)
for _, statesMap := range s.requestedRegions.subscriptions {
for regionID, state := range statesMap {
states[regionID] = state
}
}
return states
}
2 changes: 1 addition & 1 deletion logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down
2 changes: 1 addition & 1 deletion maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (c *Controller) addNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table
}

func (c *Controller) loadTables(startTs uint64) ([]commonEvent.Table, error) {
// todo: do we need to set timezone here?
// Use a empty timezone because table filter does not need it.
f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.CaseSensitive, c.cfConfig.ForceReplicate)
if err != nil {
return nil, errors.Cause(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sink/mysql/mysql_writer_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/pkg/parser/mysql"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"go.uber.org/zap"
Expand Down Expand Up @@ -170,6 +171,8 @@ func (w *MysqlWriter) execDMLWithMaxRetries(dmls *preparedDMLs) error {
failpoint.Return(err)
})

failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(w.ctx, time.Hour) })

failpoint.Inject("MySQLDuplicateEntryError", func() {
log.Warn("inject MySQLDuplicateEntryError")
err := cerror.WrapError(cerror.ErrMySQLDuplicateEntry, &dmysql.MySQLError{
Expand Down
58 changes: 58 additions & 0 deletions pkg/util/atomic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

type numbers interface {
int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr | float32 | float64
}

type genericAtomic[T numbers] interface {
Load() T
Store(T)
CompareAndSwap(old, new T) bool
}

// CompareAndIncrease updates the target if the new value is larger than or equal to the old value.
// It returns false if the new value is smaller than the old value.
func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool {
for {
old := target.Load()
if new < old {
return false
}
if new == old || target.CompareAndSwap(old, new) {
return true
}
}
}

// CompareAndMonotonicIncrease updates the target if the new value is larger than the old value.
// It returns false if the new value is smaller than or equal to the old value.
func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool {
for {
old := target.Load()
if new <= old {
return false
}
if target.CompareAndSwap(old, new) {
return true
}
}
}

// MustCompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It do nothing
// if the new value is smaller than or equal to the old value.
func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) {
_ = CompareAndMonotonicIncrease(target, new)
}
106 changes: 106 additions & 0 deletions pkg/util/atomic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"math/rand"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
)

func TestMustCompareAndIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)

ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}

doIncrease := func() {
for {
select {
case <-ctx.Done():
return
default:
delta := rand.Int63n(100)
v := target.Load() + delta
MustCompareAndMonotonicIncrease(&target, v)
require.GreaterOrEqual(t, target.Load(), v)
}
}
}

// Test target increase.
wg.Add(2)
go func() {
defer wg.Done()
doIncrease()
}()
go func() {
defer wg.Done()
doIncrease()
}()

wg.Add(1)
// Test target never decrease.
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
v := target.Load() - 1
MustCompareAndMonotonicIncrease(&target, v)
require.Greater(t, target.Load(), v)
}
}
}()

cancel()
wg.Wait()
}

func TestCompareAndIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)
require.True(t, CompareAndIncrease(&target, 10))
require.Equal(t, int64(10), target.Load())

require.True(t, CompareAndIncrease(&target, 20))
require.Equal(t, int64(20), target.Load())
require.False(t, CompareAndIncrease(&target, 19))
require.Equal(t, int64(20), target.Load())
}

func TestCompareAndMonotonicIncrease(t *testing.T) {
t.Parallel()

var target atomic.Int64
target.Store(10)
require.False(t, CompareAndMonotonicIncrease(&target, 10))
require.Equal(t, int64(10), target.Load())

require.True(t, CompareAndMonotonicIncrease(&target, 11))
require.Equal(t, int64(11), target.Load())
require.False(t, CompareAndMonotonicIncrease(&target, 10))
require.Equal(t, int64(11), target.Load())
}
16 changes: 16 additions & 0 deletions pkg/util/net_util.go → pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package util

import (
"context"
"net"
"strconv"
"time"

"github.com/pingcap/errors"
)
Expand All @@ -33,3 +35,17 @@ func ParseHostAndPortFromAddress(address string) (string, uint, error) {
}
return host, uint(portNumeric), nil
}

// Hang will block the goroutine for a given duration, or return when `ctx` is done.
func Hang(ctx context.Context, dur time.Duration) error {
timer := time.NewTimer(dur)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return ctx.Err()
case <-timer.C:
return nil
}
}
Loading

0 comments on commit d9aed28

Please sign in to comment.