Skip to content

Commit

Permalink
mysql-sink: support conflict detector for mysql sink (#1015)
Browse files Browse the repository at this point in the history
close #1013
  • Loading branch information
hongyunyan authored Feb 19, 2025
1 parent 0184c69 commit 17ed71b
Show file tree
Hide file tree
Showing 15 changed files with 914 additions and 23 deletions.
17 changes: 14 additions & 3 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,18 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
wakeCallback()
}
})
d.AddDMLEventToSink(dml)
err := d.AddDMLEventToSink(dml)
if err != nil {
select {
case d.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Stringer("changefeedID", d.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Error(err))
}
return
}
case commonEvent.TypeDDLEvent:
if len(dispatcherEvents) != 1 {
log.Panic("ddl event should only be singly handled",
Expand Down Expand Up @@ -410,9 +421,9 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
return block
}

func (d *Dispatcher) AddDMLEventToSink(event *commonEvent.DMLEvent) {
func (d *Dispatcher) AddDMLEventToSink(event *commonEvent.DMLEvent) error {
d.tableProgress.Add(event)
d.sink.AddDMLEvent(event)
return d.sink.AddDMLEvent(event)
}

func (d *Dispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) error {
Expand Down
3 changes: 2 additions & 1 deletion downstreamadapter/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type mockSink struct {
sinkType common.SinkType
}

func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) {
func (s *mockSink) AddDMLEvent(event *commonEvent.DMLEvent) error {
s.dmls = append(s.dmls, event)
return nil
}

func (s *mockSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
Expand Down
3 changes: 2 additions & 1 deletion downstreamadapter/sink/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ func (s *BlackHoleSink) SinkType() common.SinkType {
func (s *BlackHoleSink) SetTableSchemaStore(tableSchemaStore *util.TableSchemaStore) {
}

func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) {
func (s *BlackHoleSink) AddDMLEvent(event *commonEvent.DMLEvent) error {
// NOTE: don't change the log, integration test `lossy_ddl` depends on it.
// ref: https://github.com/pingcap/ticdc/blob/da834db76e0662ff15ef12645d1f37bfa6506d83/tests/integration_tests/lossy_ddl/run.sh#L23
log.Debug("BlackHoleSink: WriteEvents", zap.Any("dml", event))
for _, callback := range event.PostTxnFlushed {
callback()
}
return nil
}

func (s *BlackHoleSink) PassBlockEvent(event commonEvent.BlockEvent) {
Expand Down
3 changes: 2 additions & 1 deletion downstreamadapter/sink/cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func (s *CloudStorageSink) IsNormal() bool {
return atomic.LoadUint32(&s.isNormal) == 1
}

func (s *CloudStorageSink) AddDMLEvent(event *commonEvent.DMLEvent) {
func (s *CloudStorageSink) AddDMLEvent(event *commonEvent.DMLEvent) error {
s.dmlWorker.AddDMLEvent(event)
return nil
}

func (s *CloudStorageSink) PassBlockEvent(event commonEvent.BlockEvent) {
Expand Down
138 changes: 138 additions & 0 deletions downstreamadapter/sink/conflictdetector/conflict_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package conflictdetector

import (
"sync"

"github.com/pingcap/log"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/utils/chann"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// ConflictDetector implements a logic that dispatches transaction
// to different worker cache channels in a way that transactions
// modifying the same keys are never executed concurrently and
// have their original orders preserved. Transactions in different
// channels can be executed concurrently.
type ConflictDetector struct {
// resolvedTxnCaches are used to cache resolved transactions.
// NewConflictDetector creates opt.Count of them; a cache is addressed
// by its index in this slice (the cacheID).
resolvedTxnCaches []txnCache

// slots are used to find all unfinished transactions
// conflicting with an incoming transaction.
slots *Slots
// numSlots is the slot count passed to NewConflictDetector.
numSlots uint64

// nextCacheID is used to dispatch transactions round-robin.
nextCacheID atomic.Int64

// closeCh is closed by Close; the background goroutine returns once it
// observes the close.
closeCh chan struct{}

// notifiedNodes queues the callbacks handed over through a node's
// OnNotified hook; the background goroutine receives and runs them.
notifiedNodes *chann.DrainableChann[func()]
// wg tracks the background goroutine started by NewConflictDetector.
wg sync.WaitGroup
}

// NewConflictDetector builds a ConflictDetector with numSlots conflict slots
// and opt.Count resolved-transaction caches (each created from opt), then
// starts the background goroutine that runs queued notification callbacks.
// Call Close to signal that goroutine to stop.
func NewConflictDetector(numSlots uint64, opt TxnCacheOption) *ConflictDetector {
	detector := &ConflictDetector{
		resolvedTxnCaches: make([]txnCache, opt.Count),
		slots:             NewSlots(numSlots),
		numSlots:          numSlots,
		closeCh:           make(chan struct{}),
		notifiedNodes:     chann.NewAutoDrainChann[func()](),
	}

	// Fill every cache slot allocated above.
	for idx := range detector.resolvedTxnCaches {
		detector.resolvedTxnCaches[idx] = newTxnCache(opt)
	}

	// The goroutine is registered with wg before it is launched so that its
	// lifetime can be observed through the WaitGroup.
	detector.wg.Add(1)
	go func() {
		defer detector.wg.Done()
		detector.runBackgroundTasks()
	}()

	return detector
}

// runBackgroundTasks is the body of the detector's background goroutine.
// It runs every non-nil callback received from notifiedNodes until closeCh
// is closed, and closes and drains notifiedNodes when it returns.
func (d *ConflictDetector) runBackgroundTasks() {
	defer d.notifiedNodes.CloseAndDrain()

	for {
		select {
		case callback := <-d.notifiedNodes.Out():
			// A nil callback is skipped rather than invoked.
			if callback == nil {
				continue
			}
			callback()
		case <-d.closeCh:
			return
		}
	}
}

// Add pushes a transaction to the ConflictDetector.
//
// The event's conflict-key hashes are computed with ConflictKeys, a node is
// allocated for them, the node's hooks are wired to this detector, and the
// node is finally added to the slots. An error from ConflictKeys is returned
// unchanged and nothing is registered in that case.
//
// NOTE: if multiple threads access this concurrently,
// ConflictKeys must be sorted by the slot index.
func (d *ConflictDetector) Add(event *commonEvent.DMLEvent) error {
	keyHashes, err := ConflictKeys(event)
	if err != nil {
		return err
	}

	txnNode := d.slots.AllocNode(keyHashes)

	// Register a post-flush function on the event that removes the node
	// from the slots.
	event.AddPostFlushFunc(func() { d.slots.Remove(txnNode) })

	// Try sending this txn to related cache as soon as all dependencies
	// are resolved.
	txnNode.TrySendToTxnCache = func(cacheID int64) bool {
		return d.sendToCache(event, cacheID)
	}
	// Cache IDs are handed out round-robin over the resolved caches.
	txnNode.RandCacheID = func() int64 {
		return d.nextCacheID.Add(1) % int64(len(d.resolvedTxnCaches))
	}
	// Notifications are queued for the background goroutine instead of
	// being run by the notifier.
	txnNode.OnNotified = func(callback func()) {
		d.notifiedNodes.In() <- callback
	}

	d.slots.Add(txnNode)
	return nil
}

// Close closes the ConflictDetector.
//
// It closes closeCh to tell the background goroutine to stop and then waits
// on wg until that goroutine has returned, so the goroutine (including its
// deferred close-and-drain of notifiedNodes) has finished by the time Close
// returns.
//
// Close must be called at most once: closing closeCh a second time panics.
// It must not be called from a notification callback, because callbacks run
// on the very goroutine Close waits for.
func (d *ConflictDetector) Close() {
	close(d.closeCh)
	// wg is incremented for the background goroutine in NewConflictDetector;
	// waiting here is what gives that bookkeeping an effect and prevents the
	// goroutine from outliving Close.
	d.wg.Wait()
}

// sendToCache hands event to the resolved-transaction cache selected by id
// and returns whatever that cache's add reports. A negative id is reported
// through log.Panic.
func (d *ConflictDetector) sendToCache(event *commonEvent.DMLEvent, id int64) bool {
	if id < 0 {
		log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
	}

	target := d.resolvedTxnCaches[id]
	return target.add(event)
}

// GetOutChByCacheID returns the receive-only output channel of the cache
// identified by id. A negative id is reported through log.Panic.
//
// Note txns in single cache should be executed sequentially.
func (d *ConflictDetector) GetOutChByCacheID(id int64) <-chan *commonEvent.DMLEvent {
	if id < 0 {
		log.Panic("must assign with a valid cacheID", zap.Int64("cacheID", id))
	}

	caches := d.resolvedTxnCaches
	return caches[id].out()
}
154 changes: 154 additions & 0 deletions downstreamadapter/sink/conflictdetector/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package conflictdetector

import (
"encoding/binary"
"hash/fnv"
"strings"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

// ConflictKeys returns the de-duplicated 32-bit FNV-1a hashes (widened to
// uint64) of every conflict key produced by the rows of event.
//
// An event with no rows yields (nil, nil). The order of the returned hashes
// is unspecified because they are collected from a map. If key generation
// fails for any row, the traced error is returned together with a nil slice.
func ConflictKeys(event *commonEvent.DMLEvent) ([]uint64, error) {
	if event.Len() == 0 {
		return nil, nil
	}

	// Pair the GetNextRow iteration below with FinishGetRow on every exit
	// path, including the early error return.
	// NOTE(review): FinishGetRow is assumed to be safe to call after a
	// partial iteration — confirm against DMLEvent.
	defer event.FinishGetRow()

	hashRes := make(map[uint64]struct{}, event.Len())
	hasher := fnv.New32a()

	for {
		row, ok := event.GetNextRow()
		if !ok {
			break
		}
		rowKeys, err := genRowKeys(row, event.TableInfo, event.DispatcherID)
		if err != nil {
			return nil, errors.Trace(err)
		}
		for _, key := range rowKeys {
			if n, err := hasher.Write(key); n != len(key) || err != nil {
				log.Panic("transaction key hash fail")
			}
			hashRes[uint64(hasher.Sum32())] = struct{}{}
			// Reuse the hasher for the next key.
			hasher.Reset()
		}
	}

	keys := make([]uint64, 0, len(hashRes))
	for key := range hashRes {
		keys = append(keys, key)
	}
	return keys, nil
}

// genRowKeys builds the raw conflict keys of a single row change.
//
// For the new image (row.Row) and then the old image (row.PreRow), every
// index reported by tableInfo.GetIndexColumnsOffset contributes one key
// unless genKeyList yields an empty key for it. When no key at all is
// produced, a single 8-byte key derived from dispatcherID.GetLow() is
// returned instead. Errors from genKeyList are traced and returned.
func genRowKeys(row commonEvent.RowChange, tableInfo *common.TableInfo, dispatcherID common.DispatcherID) ([][]byte, error) {
	var keys [][]byte

	for _, image := range []*chunk.Row{&row.Row, &row.PreRow} {
		if image.IsEmpty() {
			continue
		}
		for indexPos, colOffsets := range tableInfo.GetIndexColumnsOffset() {
			key, err := genKeyList(image, indexPos, colOffsets, dispatcherID, tableInfo)
			if err != nil {
				return nil, errors.Trace(err)
			}
			if len(key) > 0 {
				keys = append(keys, key)
			}
		}
	}

	if len(keys) > 0 {
		return keys, nil
	}

	// use dispatcherID as key if no key generated (no PK/UK),
	// no concurrence for rows in the same dispatcher.
	log.Debug("Use dispatcherID as the key", zap.Any("dispatcherID", dispatcherID))
	fallback := make([]byte, 8)
	binary.BigEndian.PutUint64(fallback, uint64(dispatcherID.GetLow()))
	return [][]byte{fallback}, nil
}

// genKeyList encodes the values of one index of row into a conflict key.
//
// colIdx lists the column offsets of the index and iIdx is the index's
// position. Each column value is rendered as a string (lower-cased when
// columnNeeds2LowerCase says so) and followed by a zero byte; the key ends
// with iIdx and dispatcherID.GetLow(), each as 8 big-endian bytes.
//
// It returns (nil, nil) when the index cannot be used: a column is missing,
// is a generated column, has a nil value, or the index has no columns.
// An error from common.FormatColVal is returned as is.
func genKeyList(
	row *chunk.Row, iIdx int, colIdx []int, dispatcherID common.DispatcherID, tableInfo *common.TableInfo,
) ([]byte, error) {
	columns := tableInfo.GetColumns()

	var encoded []byte
	for _, offset := range colIdx {
		col := columns[offset]
		// If the index contain generated column, we can't use this key to
		// detect conflict with other DML.
		if col == nil || tableInfo.GetColumnFlags()[col.ID].IsGeneratedColumn() {
			return nil, nil
		}

		colValue, err := common.FormatColVal(row, col, offset)
		if err != nil {
			return nil, err
		}
		// if a column value is null, we can ignore this index
		if colValue == nil {
			return nil, nil
		}

		text := model.ColumnValueString(colValue)
		if columnNeeds2LowerCase(col.GetType(), col.GetCollate()) {
			text = strings.ToLower(text)
		}

		// The trailing zero byte separates consecutive column values.
		encoded = append(encoded, text...)
		encoded = append(encoded, 0)
	}

	if len(encoded) == 0 {
		return nil, nil
	}

	// Suffix layout: bytes 0-7 index position, bytes 8-15 dispatcher ID low part.
	var suffix [16]byte
	binary.BigEndian.PutUint64(suffix[:8], uint64(iIdx))
	binary.BigEndian.PutUint64(suffix[8:], uint64(dispatcherID.GetLow()))
	return append(encoded, suffix[:]...), nil
}

// columnNeeds2LowerCase reports whether values of a column with the given
// MySQL type and collation are lower-cased before being put into a conflict
// key: the type must be one of the string/blob types listed below and the
// collation must satisfy collationNeeds2LowerCase.
func columnNeeds2LowerCase(mysqlType byte, collation string) bool {
	switch mysqlType {
	case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString,
		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeBlob, mysql.TypeLongBlob:
		// Fall out of the switch; the collation decides.
	default:
		return false
	}
	return collationNeeds2LowerCase(collation)
}

// collationNeeds2LowerCase reports whether collation ends with the "_ci"
// suffix. The comparison is case-sensitive.
func collationNeeds2LowerCase(collation string) bool {
	const ciSuffix = "_ci"
	return strings.HasSuffix(collation, ciSuffix)
}
Loading

0 comments on commit 17ed71b

Please sign in to comment.