diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index aba803b1e..3871ba319 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -24,6 +24,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" @@ -36,7 +37,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/canal" "github.com/pingcap/tiflow/pkg/sink/codec/open" "github.com/pingcap/tiflow/pkg/sink/codec/simple" - "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index 0e3e9f342..ee9ff77da 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -32,6 +32,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory" @@ -46,7 +47,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/canal" "github.com/pingcap/tiflow/pkg/sink/codec/common" tpulsar "github.com/pingcap/tiflow/pkg/sink/pulsar" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/spf13/cobra" diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 7713acc46..335b66681 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -47,7 +48,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/canal" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/csv" - "github.com/pingcap/tiflow/pkg/spanz" putil "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 66001a0cc..65bc0b666 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -28,7 +28,7 @@ import ( "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/sink/util" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "go.uber.org/zap" ) diff --git a/downstreamadapter/dispatcher/dispatcher_test.go b/downstreamadapter/dispatcher/dispatcher_test.go index 0d9d94d8f..d3c658d2f 100644 --- a/downstreamadapter/dispatcher/dispatcher_test.go +++ b/downstreamadapter/dispatcher/dispatcher_test.go @@ -25,7 +25,7 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/node" sinkutil "github.com/pingcap/ticdc/pkg/sink/util" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/heartbeatpb/table_span.go b/heartbeatpb/table_span.go index dd04bc621..016e38f9d 100644 --- a/heartbeatpb/table_span.go +++ b/heartbeatpb/table_span.go @@ -16,7 +16,7 @@ package heartbeatpb import ( "bytes" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" ) // DDLSpanSchemaID is the special schema id for DDL diff --git a/logservice/coordinator/coordinator.go b/logservice/coordinator/coordinator.go index 0d8519ff4..df7f64ac1 100644 --- a/logservice/coordinator/coordinator.go +++ b/logservice/coordinator/coordinator.go @@ -26,9 +26,9 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/tiflow/pkg/chann" - "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) diff --git a/logservice/coordinator/coordinator_test.go b/logservice/coordinator/coordinator_test.go index 2f50e5aac..a6b24e5fa 100644 --- a/logservice/coordinator/coordinator_test.go +++ b/logservice/coordinator/coordinator_test.go @@ -19,7 +19,7 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logservicepb" "github.com/pingcap/ticdc/pkg/node" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/stretchr/testify/assert" ) diff --git a/logservice/logpuller/region_event_handler_test.go b/logservice/logpuller/region_event_handler_test.go index 53e9ad262..a81815488 100644 --- a/logservice/logpuller/region_event_handler_test.go +++ b/logservice/logpuller/region_event_handler_test.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" ) diff --git a/logservice/logpuller/regionlock/region_range_lock_test.go b/logservice/logpuller/regionlock/region_range_lock_test.go index 14723bb2e..c8cd330ae 100644 --- a/logservice/logpuller/regionlock/region_range_lock_test.go +++ b/logservice/logpuller/regionlock/region_range_lock_test.go @@ -22,7 +22,7 @@ import ( "time" "github.com/pingcap/ticdc/heartbeatpb" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index cfbe12958..0c04781d9 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -30,10 +30,10 @@ import ( cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index f54aecfc2..8aa4b572a 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -34,10 +34,10 @@ import ( "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/scheduler" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils" "github.com/pingcap/ticdc/utils/threadpool" - "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) diff --git a/maintainer/maintainer_controller_test.go b/maintainer/maintainer_controller_test.go index 7d95f60eb..202c4acb0 100644 --- a/maintainer/maintainer_controller_test.go +++ b/maintainer/maintainer_controller_test.go @@ -32,10 +32,10 @@ import ( "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/pkg/scheduler" pkgOpearator "github.com/pingcap/ticdc/pkg/scheduler/operator" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index f93679be1..024f274b5 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -34,11 +34,11 @@ import ( "github.com/pingcap/ticdc/pkg/messaging/proto" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/tiflow/cdc/model" config2 "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/orchestrator" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) diff --git a/maintainer/range_checker/table_span_range_checker.go b/maintainer/range_checker/table_span_range_checker.go index c10253853..5e22744d1 100644 --- a/maintainer/range_checker/table_span_range_checker.go +++ b/maintainer/range_checker/table_span_range_checker.go @@ -20,7 +20,7 @@ import ( "sync/atomic" "github.com/google/btree" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" ) // TableSpanRangeChecker is used to check if all ranges cover the start and end byte slices. diff --git a/maintainer/range_checker/table_span_range_checker_test.go b/maintainer/range_checker/table_span_range_checker_test.go index f405cb88a..ab9335a9a 100644 --- a/maintainer/range_checker/table_span_range_checker_test.go +++ b/maintainer/range_checker/table_span_range_checker_test.go @@ -17,7 +17,7 @@ import ( "bytes" "testing" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/maintainer/replica/replication_db_test.go b/maintainer/replica/replication_db_test.go index adea20909..4577017b9 100644 --- a/maintainer/replica/replication_db_test.go +++ b/maintainer/replica/replication_db_test.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" replica_mock "github.com/pingcap/ticdc/maintainer/replica/mock" "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/stretchr/testify/require" ) diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index f15b023b5..10ef39677 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -27,8 +27,8 @@ import ( "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/retry" "github.com/pingcap/ticdc/pkg/scheduler/replica" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" "go.uber.org/zap" diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go index b5583ef4e..9db9f1a4b 100644 --- a/maintainer/scheduler.go +++ b/maintainer/scheduler.go @@ -25,9 +25,9 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/scheduler" pkgReplica "github.com/pingcap/ticdc/pkg/scheduler/replica" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils" - "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) diff --git a/maintainer/split/region_count_splitter_test.go b/maintainer/split/region_count_splitter_test.go index ee6ad96be..ad1bbe277 100644 --- a/maintainer/split/region_count_splitter_test.go +++ b/maintainer/split/region_count_splitter_test.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" ) diff --git a/pkg/common/event/mounter.go b/pkg/common/event/mounter.go index 7625b5bab..f6e08ea63 100644 --- a/pkg/common/event/mounter.go +++ b/pkg/common/event/mounter.go @@ -22,12 +22,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/rowcodec" - "github.com/pingcap/tiflow/pkg/spanz" ) // DDLTableInfo contains the tableInfo about tidb_ddl_job and tidb_ddl_history diff --git a/pkg/pdutil/api_client.go b/pkg/pdutil/api_client.go index f68ed8ba6..c718ae516 100644 --- a/pkg/pdutil/api_client.go +++ b/pkg/pdutil/api_client.go @@ -29,11 +29,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/retry" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/processor/tablepb" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/spanz" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) diff --git a/pkg/pdutil/api_client_test.go b/pkg/pdutil/api_client_test.go index 2331d9b4e..45e37c3e3 100644 --- a/pkg/pdutil/api_client_test.go +++ b/pkg/pdutil/api_client_test.go @@ -22,12 +22,12 @@ import ( "strconv" "testing" + "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tiflow/cdc/processor/tablepb" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/httputil" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) diff --git a/pkg/spanz/btree_map.go b/pkg/spanz/btree_map.go new file mode 100644 index 000000000..510b43bf0 --- /dev/null +++ b/pkg/spanz/btree_map.go @@ -0,0 +1,181 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "bytes" + "time" + + "github.com/google/btree" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "go.uber.org/zap" +) + +// spanItem is an btree item that wraps a span (key) and an item (value). +type spanItem[T any] struct { + tablepb.Span + Value T +} + +// lessSpanItem compares two Spans, defines the order between spans. +func lessSpanItem[T any](a, b spanItem[T]) bool { + return a.Less(&b.Span) +} + +// BtreeMap is a specialized btree map that map a Span to a value. +type BtreeMap[T any] struct { + tree *btree.BTreeG[spanItem[T]] + + cache *struct { + coveredSpans, holes []tablepb.Span + lastGC time.Time + } +} + +// NewBtreeMap returns a new BtreeMap. +func NewBtreeMap[T any]() *BtreeMap[T] { + const defaultDegree = 16 + return NewBtreeMapWithDegree[T](defaultDegree) +} + +// NewBtreeMapWithDegree returns a new BtreeMap with the given degree. +func NewBtreeMapWithDegree[T any](degree int) *BtreeMap[T] { + return &BtreeMap[T]{ + tree: btree.NewG(degree, lessSpanItem[T]), + } +} + +// Len returns the number of items currently in the tree. +func (m *BtreeMap[T]) Len() int { + return m.tree.Len() +} + +// Has returns true if the given key is in the tree. +func (m *BtreeMap[T]) Has(span tablepb.Span) bool { + return m.tree.Has(spanItem[T]{Span: span}) +} + +// Get looks for the key item in the tree, returning it. +// It returns (zeroValue, false) if unable to find that item. +func (m *BtreeMap[T]) Get(span tablepb.Span) (T, bool) { + item, ok := m.tree.Get(spanItem[T]{Span: span}) + return item.Value, ok +} + +// GetV looks for the key item in the tree, returning it. +// It returns zeroValue if unable to find that item. +func (m *BtreeMap[T]) GetV(span tablepb.Span) T { + item, _ := m.tree.Get(spanItem[T]{Span: span}) + return item.Value +} + +// Delete removes an item equal to the passed in item from the tree, returning +// it. If no such item exists, returns (zeroValue, false). +func (m *BtreeMap[T]) Delete(span tablepb.Span) (T, bool) { + item, ok := m.tree.Delete(spanItem[T]{Span: span}) + return item.Value, ok +} + +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned, +// and the second return value is true. Otherwise, (zeroValue, false) +// +// nil cannot be added to the tree (will panic). +func (m *BtreeMap[T]) ReplaceOrInsert(span tablepb.Span, value T) (T, bool) { + old, ok := m.tree.ReplaceOrInsert(spanItem[T]{Span: span, Value: value}) + return old.Value, ok +} + +// ItemIterator allows callers of Ascend to iterate in-order over portions of +// the tree. Similar to btree.ItemIterator. +// Note: The span must not be mutated. +type ItemIterator[T any] func(span tablepb.Span, value T) bool + +// Ascend calls the iterator for every value in the tree within the range +// [first, last], until iterator returns false. +func (m *BtreeMap[T]) Ascend(iterator ItemIterator[T]) { + m.tree.Ascend(func(item spanItem[T]) bool { + return iterator(item.Span, item.Value) + }) +} + +// AscendRange calls the iterator for every value in the tree within the range +// [start, end), until iterator returns false. +func (m *BtreeMap[T]) AscendRange(start, end tablepb.Span, iterator ItemIterator[T]) { + m.tree.AscendRange(spanItem[T]{Span: start}, spanItem[T]{Span: end}, + func(item spanItem[T]) bool { + return iterator(item.Span, item.Value) + }) +} + +// FindHoles returns an array of Span that are not covered in the range +// [start, end). +// Note: +// * Table ID is not set in returned holes. +// * Returned slice is read only and will be changed on next FindHoles. +func (m *BtreeMap[T]) FindHoles(start, end tablepb.Span) ([]tablepb.Span, []tablepb.Span) { + if bytes.Compare(start.StartKey, end.StartKey) >= 0 { + log.Panic("start must be larger than end", + zap.String("start", start.String()), + zap.String("end", end.String())) + } + if m.cache == nil || time.Since(m.cache.lastGC) > time.Minute { + m.cache = &struct { + coveredSpans []tablepb.Span + holes []tablepb.Span + lastGC time.Time + }{lastGC: time.Now()} + } + m.cache.coveredSpans = m.cache.coveredSpans[:0] + m.cache.holes = m.cache.holes[:0] + + lastSpan := tablepb.Span{ + StartKey: start.StartKey, + EndKey: start.StartKey, + } + m.AscendRange(start, end, func(current tablepb.Span, _ T) bool { + ord := bytes.Compare(lastSpan.EndKey, current.StartKey) + if ord < 0 { + // Find a hole. + m.cache.holes = append(m.cache.holes, tablepb.Span{ + StartKey: lastSpan.EndKey, + EndKey: current.StartKey, + }) + } else if ord > 0 { + log.Panic("map is out of order", + zap.String("lastSpan", lastSpan.String()), + zap.String("current", current.String())) + } + + lastSpan = current + m.cache.coveredSpans = append(m.cache.coveredSpans, current) + return true + }) + if len(m.cache.coveredSpans) == 0 { + // No such span in the map. + m.cache.holes = append(m.cache.holes, tablepb.Span{ + StartKey: start.StartKey, EndKey: end.StartKey, + }) + return m.cache.coveredSpans, m.cache.holes + } + // Check if there is a hole in the end. + if !bytes.Equal(lastSpan.EndKey, end.StartKey) { + m.cache.holes = append(m.cache.holes, tablepb.Span{ + StartKey: lastSpan.EndKey, + EndKey: end.StartKey, + }) + } + return m.cache.coveredSpans, m.cache.holes +} diff --git a/pkg/spanz/btree_map_test.go b/pkg/spanz/btree_map_test.go new file mode 100644 index 000000000..dc5d4aeb8 --- /dev/null +++ b/pkg/spanz/btree_map_test.go @@ -0,0 +1,219 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/stretchr/testify/require" +) + +func TestSpanMap(t *testing.T) { + t.Parallel() + + m := NewBtreeMap[int]() + + // Insert then get. + m.ReplaceOrInsert(tablepb.Span{TableID: 1}, 1) + v, ok := m.Get(tablepb.Span{TableID: 1}) + require.Equal(t, v, 1) + require.True(t, ok) + require.Equal(t, 1, m.Len()) + require.True(t, m.Has(tablepb.Span{TableID: 1})) + + // Insert then get again. + m.ReplaceOrInsert(tablepb.Span{TableID: 1, StartKey: []byte{1}}, 2) + require.Equal(t, 2, m.Len()) + v, ok = m.Get(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 2) + require.True(t, ok) + + // Overwrite then get. + old, ok := m.ReplaceOrInsert( + tablepb.Span{TableID: 1, StartKey: []byte{1}, EndKey: []byte{1}}, 3) + require.Equal(t, old, 2) + require.True(t, ok) + require.Equal(t, 2, m.Len()) + require.True(t, m.Has(tablepb.Span{TableID: 1, StartKey: []byte{1}})) + v, ok = m.Get(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 3) + require.True(t, ok) + + // get value + v = m.GetV(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 3) + + // Delete than get value + v, ok = m.Delete(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 3) + require.True(t, ok) + require.Equal(t, 1, m.Len()) + require.False(t, m.Has(tablepb.Span{TableID: 1, StartKey: []byte{1}})) + v = m.GetV(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 0) + + // Pointer value + mp := NewBtreeMap[*int]() + vp := &v + mp.ReplaceOrInsert(tablepb.Span{TableID: 1}, vp) + vp1, ok := mp.Get(tablepb.Span{TableID: 1}) + require.Equal(t, vp, vp1) + require.True(t, ok) + require.Equal(t, 1, m.Len()) +} + +func TestMapAscend(t *testing.T) { + t.Parallel() + + m := NewBtreeMap[int]() + for i := 0; i < 4; i++ { + m.ReplaceOrInsert(tablepb.Span{TableID: int64(i)}, i) + } + + j := 0 + m.Ascend(func(span tablepb.Span, value int) bool { + require.Equal(t, tablepb.Span{TableID: int64(j)}, span) + j++ + return true + }) + require.Equal(t, 4, j) + + j = 0 + m.AscendRange(tablepb.Span{TableID: 1}, tablepb.Span{TableID: 2}, + func(span tablepb.Span, value int) bool { + require.Equal(t, tablepb.Span{TableID: 1}, span) + j++ + return true + }) + require.Equal(t, 1, j) +} + +func TestMapFindHole(t *testing.T) { + t.Parallel() + + cases := []struct { + spans []tablepb.Span + rang [2]tablepb.Span + expectedFound []tablepb.Span + expectedHole []tablepb.Span + }{ + { // 0. all found. + spans: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, + {StartKey: []byte("t1_2"), EndKey: []byte("t2_0")}, + }, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedFound: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, + {StartKey: []byte("t1_2"), EndKey: []byte("t2_0")}, + }, + }, + { // 1. on hole in the middle. + spans: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedFound: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + expectedHole: []tablepb.Span{ + {StartKey: []byte("t1_1"), EndKey: []byte("t1_3")}, + }, + }, + { // 2. two holes in the middle. + spans: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedFound: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + {StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + expectedHole: []tablepb.Span{ + {StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, + {StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, + }, + }, + { // 3. all missing. + spans: []tablepb.Span{}, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedHole: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t2_0")}, + }, + }, + { // 4. start not found + spans: []tablepb.Span{ + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedFound: []tablepb.Span{ + {StartKey: []byte("t1_4"), EndKey: []byte("t2_0")}, + }, + expectedHole: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_4")}, + }, + }, + { // 5. end not found + spans: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + }, + rang: [2]tablepb.Span{ + {StartKey: []byte("t1_0")}, + {StartKey: []byte("t2_0")}, + }, + expectedFound: []tablepb.Span{ + {StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, + }, + expectedHole: []tablepb.Span{ + {StartKey: []byte("t1_1"), EndKey: []byte("t2_0")}, + }, + }, + } + + for i, cs := range cases { + _, _ = i, cs + m := NewBtreeMap[struct{}]() + for _, span := range cs.spans { + m.ReplaceOrInsert(span, struct{}{}) + } + found, holes := m.FindHoles(cs.rang[0], cs.rang[1]) + require.Equalf(t, cs.expectedFound, found, "case %d, %#v", i, cs) + require.Equalf(t, cs.expectedHole, holes, "case %d, %#v", i, cs) + } +} diff --git a/pkg/spanz/convert.go b/pkg/spanz/convert.go new file mode 100644 index 000000000..442f6ad73 --- /dev/null +++ b/pkg/spanz/convert.go @@ -0,0 +1,126 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "fmt" + "reflect" + "sort" + "unsafe" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "go.uber.org/zap" +) + +// HexKey returns a hex string generated from the key. +func HexKey(key []byte) string { + // TODO(qupeng): improve the function. + str := "" + for _, c := range key { + str += fmt.Sprintf("%02X", c) + } + return str +} + +// ArrayToSpan converts an array of TableID to an array of Span. +func ArrayToSpan(in []tablepb.TableID) []tablepb.Span { + out := make([]tablepb.Span, 0, len(in)) + for _, tableID := range in { + out = append(out, tablepb.Span{TableID: tableID}) + } + return out +} + +// TableIDToComparableSpan converts a TableID to a Span whose +// StartKey and EndKey are encoded in Comparable format. +func TableIDToComparableSpan(tableID tablepb.TableID) tablepb.Span { + startKey, endKey := GetTableRange(tableID) + return tablepb.Span{ + TableID: tableID, + StartKey: ToComparableKey(startKey), + EndKey: ToComparableKey(endKey), + } +} + +// TableIDToComparableRange returns a range of a table, +// start and end are encoded in Comparable format. +func TableIDToComparableRange(tableID tablepb.TableID) (start, end tablepb.Span) { + tableSpan := TableIDToComparableSpan(tableID) + start = tableSpan + start.EndKey = nil + end = tableSpan + end.StartKey = tableSpan.EndKey + end.EndKey = nil + return +} + +type sortableSpans []tablepb.Span + +func (a sortableSpans) Len() int { return len(a) } +func (a sortableSpans) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a sortableSpans) Less(i, j int) bool { return a[i].Less(&a[j]) } + +// Sort sorts a slice of Span. +func Sort(spans []tablepb.Span) { + sort.Sort(sortableSpans(spans)) +} + +// hashableSpan is a hashable span, which can be used as a map key. +type hashableSpan struct { + TableID tablepb.TableID + StartKey string + EndKey string +} + +// toHashableSpan converts a Span to a hashable span. +func toHashableSpan(span tablepb.Span) hashableSpan { + return hashableSpan{ + TableID: span.TableID, + StartKey: unsafeBytesToString(span.StartKey), + EndKey: unsafeBytesToString(span.EndKey), + } +} + +// toSpan converts to Span. +func (h hashableSpan) toSpan() tablepb.Span { + return tablepb.Span{ + TableID: h.TableID, + StartKey: unsafeStringToBytes(h.StartKey), + EndKey: unsafeStringToBytes(h.EndKey), + } +} + +// unsafeStringToBytes converts string to byte without memory allocation. +// The []byte must not be mutated. +// See: https://cs.opensource.google/go/go/+/refs/tags/go1.19.4:src/strings/builder.go;l=48 +func unsafeBytesToString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} + +// unsafeStringToBytes converts string to byte without memory allocation. +// The returned []byte must not be mutated. +// See: https://groups.google.com/g/golang-nuts/c/Zsfk-VMd_fU/m/O1ru4fO-BgAJ +func unsafeStringToBytes(s string) []byte { + if len(s) == 0 { + return []byte{} + } + const maxCap = 0x7fff0000 + if len(s) > maxCap { + log.Panic("string is too large", zap.Int("len", len(s))) + } + return (*[maxCap]byte)(unsafe.Pointer( + (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), + )[:len(s):len(s)] +} diff --git a/pkg/spanz/convert_test.go b/pkg/spanz/convert_test.go new file mode 100644 index 000000000..f3e7c8db8 --- /dev/null +++ b/pkg/spanz/convert_test.go @@ -0,0 +1,73 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/stretchr/testify/require" +) + +func TestHashableSpan(t *testing.T) { + t.Parallel() + + // Make sure it can be a map key. + m := make(map[hashableSpan]int) + m[hashableSpan{}] = 1 + require.Equal(t, 1, m[hashableSpan{}]) + + span := toHashableSpan(TableIDToComparableSpan(1)) + require.EqualValues(t, TableIDToComparableSpan(1), span.toSpan()) +} + +func TestHashableSpanHeapAlloc(t *testing.T) { + span := tablepb.Span{TableID: 1} + for i := 0; i < 10; i++ { + span.StartKey = append(span.StartKey, byte(i)) + span.EndKey = append(span.EndKey, byte(i)) + } + + n := 0 + results := testing.Benchmark(func(b *testing.B) { + n = b.N + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + hspan := toHashableSpan(span) + span = hspan.toSpan() + } + }) + require.EqualValues(t, 0, results.MemAllocs/uint64(n)) +} + +func TestUnsafeStringByte(t *testing.T) { + b := []byte("unsafe bytes") + s := "unsafe bytes" + + us := unsafeBytesToString(b) + require.EqualValues(t, s, us) + require.EqualValues(t, len(b), len(us)) + + ub := unsafeStringToBytes(s) + require.EqualValues(t, b, ub) + require.EqualValues(t, len(s), len(ub)) + require.EqualValues(t, len(s), cap(ub)) +} + +func TestHexKey(t *testing.T) { + span := TableIDToComparableSpan(8616) + require.Equal(t, "7480000000000021FFA85F720000000000FA", HexKey(span.StartKey)) + require.Equal(t, "7480000000000021FFA85F730000000000FA", HexKey(span.EndKey)) +} diff --git a/pkg/spanz/hash_map.go b/pkg/spanz/hash_map.go new file mode 100644 index 000000000..69e6f977a --- /dev/null +++ b/pkg/spanz/hash_map.go @@ -0,0 +1,87 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "encoding/binary" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/util/seahash" +) + +// HashMap is a specialized hash map that map a Span to a value. +type HashMap[T any] struct { + hashMap map[hashableSpan]T +} + +// NewHashMap returns a new HashMap. +func NewHashMap[T any]() *HashMap[T] { + return &HashMap[T]{ + hashMap: make(map[hashableSpan]T), + } +} + +// Len returns the number of items currently in the map. +func (m *HashMap[T]) Len() int { + return len(m.hashMap) +} + +// Has returns true if the given key is in the map. +func (m *HashMap[T]) Has(span tablepb.Span) bool { + _, ok := m.hashMap[toHashableSpan(span)] + return ok +} + +// Get looks for the key item in the map, returning it. +// It returns (zeroValue, false) if unable to find that item. +func (m *HashMap[T]) Get(span tablepb.Span) (T, bool) { + item, ok := m.hashMap[toHashableSpan(span)] + return item, ok +} + +// GetV looks for the key item in the map, returning it. +// It returns zeroValue if unable to find that item. +func (m *HashMap[T]) GetV(span tablepb.Span) T { + item := m.hashMap[toHashableSpan(span)] + return item +} + +// Delete removes an item whose key equals to the span. +func (m *HashMap[T]) Delete(span tablepb.Span) { + delete(m.hashMap, toHashableSpan(span)) +} + +// ReplaceOrInsert adds the given item to the map. +func (m *HashMap[T]) ReplaceOrInsert(span tablepb.Span, value T) { + m.hashMap[toHashableSpan(span)] = value +} + +// Range calls the iterator for every value in the map until iterator returns +// false. +func (m *HashMap[T]) Range(iterator ItemIterator[T]) { + for k, v := range m.hashMap { + ok := iterator(k.toSpan(), v) + if !ok { + break + } + } +} + +// HashTableSpan hashes the given span to a slot offset. +func HashTableSpan(span tablepb.Span, slots int) int { + b := make([]byte, 8+len(span.StartKey)) + binary.LittleEndian.PutUint64(b[0:8], uint64(span.TableID)) + copy(b[8:], span.StartKey) + return int(seahash.Sum64(b) % uint64(slots)) +} diff --git a/pkg/spanz/hash_map_test.go b/pkg/spanz/hash_map_test.go new file mode 100644 index 000000000..ea9d8b382 --- /dev/null +++ b/pkg/spanz/hash_map_test.go @@ -0,0 +1,98 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/stretchr/testify/require" +) + +func TestHashMap(t *testing.T) { + t.Parallel() + + m := NewHashMap[int]() + + // Insert then get. + m.ReplaceOrInsert(tablepb.Span{TableID: 1}, 1) + v, ok := m.Get(tablepb.Span{TableID: 1}) + require.Equal(t, v, 1) + require.True(t, ok) + require.Equal(t, 1, m.Len()) + require.True(t, m.Has(tablepb.Span{TableID: 1})) + + // Insert then get again. + m.ReplaceOrInsert(tablepb.Span{TableID: 1, StartKey: []byte{1}}, 2) + require.Equal(t, 2, m.Len()) + v, ok = m.Get(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 2) + require.True(t, ok) + + // Overwrite then get. + m.ReplaceOrInsert( + tablepb.Span{TableID: 1, StartKey: []byte{1}}, 3) + require.Equal(t, 2, m.Len()) + require.True(t, m.Has(tablepb.Span{TableID: 1, StartKey: []byte{1}})) + v, ok = m.Get(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 3) + require.True(t, ok) + + // get value + v = m.GetV(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 3) + + // Delete than get value + m.Delete(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, 1, m.Len()) + require.False(t, m.Has(tablepb.Span{TableID: 1, StartKey: []byte{1}})) + v = m.GetV(tablepb.Span{TableID: 1, StartKey: []byte{1}}) + require.Equal(t, v, 0) + + // Pointer value + mp := NewHashMap[*int]() + vp := &v + mp.ReplaceOrInsert(tablepb.Span{TableID: 1}, vp) + vp1, ok := mp.Get(tablepb.Span{TableID: 1}) + require.Equal(t, vp, vp1) + require.True(t, ok) + require.Equal(t, 1, m.Len()) +} + +func TestHashMapIter(t *testing.T) { + t.Parallel() + + m := NewHashMap[int]() + for i := 0; i < 4; i++ { + m.ReplaceOrInsert(tablepb.Span{TableID: int64(i)}, i) + } + + j := 0 + m.Range(func(span tablepb.Span, value int) bool { + _, ok := m.Get(span) + require.True(t, ok) + j++ + return true + }) + require.Equal(t, 4, j) + + j = 0 + m.Range(func(span tablepb.Span, value int) bool { + ok := m.Has(span) + require.True(t, ok) + j++ + return false + }) + require.Equal(t, 1, j) +} diff --git a/pkg/spanz/map_bench_test.go b/pkg/spanz/map_bench_test.go new file mode 100644 index 000000000..96335245e --- /dev/null +++ b/pkg/spanz/map_bench_test.go @@ -0,0 +1,116 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" +) + +func BenchmarkMap(b *testing.B) { + bm := NewBtreeMap[int]() + hm := NewHashMap[int]() + sm := SyncMap{} + + spans := [100]tablepb.Span{} + for i := 0; i < len(spans); i++ { + spans[i] = TableIDToComparableSpan(int64(i)) + } + for i, span := range spans { + bm.ReplaceOrInsert(span, i) + hm.ReplaceOrInsert(span, i) + sm.Store(span, i) + } + + bench := func(name string, benchBm, benchHm, benchSm func(i int)) { + b.Run(name, func(b *testing.B) { + if benchBm != nil { + b.ResetTimer() + b.Run("BtreeMap", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchBm(i) + } + }) + b.StopTimer() + } + if benchHm != nil { + b.ResetTimer() + b.Run("HashMap", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchHm(i) + } + }) + b.StopTimer() + } + if benchSm != nil { + b.ResetTimer() + b.Run("SyncMap", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchSm(i) + } + }) + b.StopTimer() + } + }) + } + + bench("Get", func(i int) { + bm.Get(spans[i%100]) + }, func(i int) { + hm.Get(spans[i%100]) + }, func(i int) { + sm.Load(spans[i%100]) + }) + + bench("Store", func(i int) { + bm.ReplaceOrInsert(spans[i%100], i) + }, func(i int) { + hm.ReplaceOrInsert(spans[i%100], i) + }, func(i int) { + sm.Store(spans[i%100], i) + }) + + bench("Delete+Store", func(i int) { + bm.Delete(spans[i%100]) + bm.ReplaceOrInsert(spans[i%100], i) + }, func(i int) { + hm.Delete(spans[i%100]) + hm.ReplaceOrInsert(spans[i%100], i) + }, func(i int) { + sm.Delete(spans[i%100]) + sm.Store(spans[i%100], i) + }) + + bench("Range", func(i int) { + bm.Ascend(func(span tablepb.Span, value int) bool { + return span.TableID > -1 + }) + }, func(i int) { + hm.Range(func(span tablepb.Span, value int) bool { + return span.TableID > -1 + }) + }, func(i int) { + sm.Range(func(span tablepb.Span, value any) bool { + return span.TableID > -1 + }) + }) + + start, end := spans[0], TableIDToComparableSpan(int64(len(spans))) + bench("AscendRange", func(i int) { + bm.AscendRange(start, end, func(span tablepb.Span, value int) bool { + return span.TableID > -1 + }) + }, nil, nil) +} diff --git a/pkg/spanz/set.go b/pkg/spanz/set.go new file mode 100644 index 000000000..b7a69bd37 --- /dev/null +++ b/pkg/spanz/set.go @@ -0,0 +1,58 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import "github.com/pingcap/tiflow/cdc/processor/tablepb" + +// Set maintains a set of Span. +type Set struct { + memo *BtreeMap[struct{}] +} + +// NewSet creates a Set. +func NewSet() *Set { + return &Set{ + memo: NewBtreeMap[struct{}](), + } +} + +// Add adds a span to Set. +func (s *Set) Add(span tablepb.Span) { + s.memo.ReplaceOrInsert(span, struct{}{}) +} + +// Remove removes a span from a Set. +func (s *Set) Remove(span tablepb.Span) { + s.memo.Delete(span) +} + +// Keys returns a collection of Span. +func (s *Set) Keys() []tablepb.Span { + result := make([]tablepb.Span, 0, s.memo.Len()) + s.memo.Ascend(func(span tablepb.Span, value struct{}) bool { + result = append(result, span) + return true + }) + return result +} + +// Contain checks whether a Span is in Set. +func (s *Set) Contain(span tablepb.Span) bool { + return s.memo.Has(span) +} + +// Size returns the size of Set. +func (s *Set) Size() int { + return s.memo.Len() +} diff --git a/pkg/spanz/set_test.go b/pkg/spanz/set_test.go new file mode 100644 index 000000000..cd1bbbaa8 --- /dev/null +++ b/pkg/spanz/set_test.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + s := NewSet() + s.Add(tablepb.Span{TableID: 1}) + s.Add(tablepb.Span{TableID: 1}) + s.Add(tablepb.Span{TableID: 2}) + s.Add(tablepb.Span{TableID: 3}) + + require.Equal(t, 3, s.Size()) + s.Remove(tablepb.Span{TableID: 3}) + require.Equal(t, 2, s.Size()) + + require.True(t, s.Contain(tablepb.Span{TableID: 2})) + require.False(t, s.Contain(tablepb.Span{TableID: 5})) + + require.Equal(t, []tablepb.Span{{TableID: 1}, {TableID: 2}}, s.Keys()) +} diff --git a/pkg/spanz/span.go b/pkg/spanz/span.go new file mode 100644 index 000000000..bc89ceac3 --- /dev/null +++ b/pkg/spanz/span.go @@ -0,0 +1,165 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "bytes" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +const ( + // JobTableID is the id of `tidb_ddl_job`. + JobTableID = ddl.JobTableID + // JobHistoryID is the id of `tidb_ddl_history` + JobHistoryID = ddl.HistoryTableID +) + +// UpperBoundKey represents the maximum value. +var UpperBoundKey = []byte{255, 255, 255, 255, 255} + +// HackSpan will set End as UpperBoundKey if End is Nil. +func HackSpan(span tablepb.Span) tablepb.Span { + if span.StartKey == nil { + span.StartKey = []byte{} + } + + if span.EndKey == nil { + span.EndKey = UpperBoundKey + } + return span +} + +// GetTableRange returns the span to watch for the specified table +// Note that returned keys are not in memcomparable format. +func GetTableRange(tableID int64) (startKey, endKey []byte) { + tablePrefix := tablecodec.GenTablePrefix(tableID) + sep := byte('_') + recordMarker := byte('r') + + var start, end kv.Key + // ignore index keys. + start = append(tablePrefix, sep, recordMarker) + end = append(tablePrefix, sep, recordMarker+1) + return start, end +} + +// KeyInSpan check if k in the span range. +func KeyInSpan(k tablepb.Key, span tablepb.Span) bool { + if StartCompare(k, span.StartKey) >= 0 && + EndCompare(k, span.EndKey) < 0 { + return true + } + + return false +} + +// StartCompare compares two start keys. +// The result will be 0 if lhs==rhs, -1 if lhs < rhs, and +1 if lhs > rhs +func StartCompare(lhs []byte, rhs []byte) int { + if len(lhs) == 0 && len(rhs) == 0 { + return 0 + } + + // Nil means Negative infinity. + // It's difference with EndCompare. + if len(lhs) == 0 { + return -1 + } + + if len(rhs) == 0 { + return 1 + } + + return bytes.Compare(lhs, rhs) +} + +// EndCompare compares two end keys. +// The result will be 0 if lhs==rhs, -1 if lhs < rhs, and +1 if lhs > rhs +func EndCompare(lhs []byte, rhs []byte) int { + if len(lhs) == 0 && len(rhs) == 0 { + return 0 + } + + // Nil means Positive infinity. + // It's difference with StartCompare. + if len(lhs) == 0 { + return 1 + } + + if len(rhs) == 0 { + return -1 + } + + return bytes.Compare(lhs, rhs) +} + +// Intersect return to intersect part of lhs and rhs span. +// Return error if there's no intersect part +func Intersect(lhs tablepb.Span, rhs tablepb.Span) (span tablepb.Span, err error) { + if len(lhs.StartKey) != 0 && EndCompare(lhs.StartKey, rhs.EndKey) >= 0 || + len(rhs.StartKey) != 0 && EndCompare(rhs.StartKey, lhs.EndKey) >= 0 { + return tablepb.Span{}, errors.ErrIntersectNoOverlap.GenWithStackByArgs(lhs, rhs) + } + + start := lhs.StartKey + + if StartCompare(rhs.StartKey, start) > 0 { + start = rhs.StartKey + } + + end := lhs.EndKey + + if EndCompare(rhs.EndKey, end) < 0 { + end = rhs.EndKey + } + + return tablepb.Span{StartKey: start, EndKey: end}, nil +} + +// IsSubSpan returns true if the sub span is parents spans +func IsSubSpan(sub tablepb.Span, parents ...tablepb.Span) bool { + if bytes.Compare(sub.StartKey, sub.EndKey) >= 0 { + log.Panic("the sub span is invalid", zap.Reflect("subSpan", sub)) + } + for _, parent := range parents { + if StartCompare(parent.StartKey, sub.StartKey) <= 0 && + EndCompare(sub.EndKey, parent.EndKey) <= 0 { + return true + } + } + return false +} + +// ToSpan returns a span, keys are encoded in memcomparable format. +// See: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format +func ToSpan(startKey, endKey []byte) tablepb.Span { + return tablepb.Span{ + StartKey: ToComparableKey(startKey), + EndKey: ToComparableKey(endKey), + } +} + +// ToComparableKey returns a memcomparable key. +// See: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format +func ToComparableKey(key []byte) tablepb.Key { + return codec.EncodeBytes(nil, key) +} diff --git a/pkg/spanz/span_test.go b/pkg/spanz/span_test.go new file mode 100644 index 000000000..b2c8ca067 --- /dev/null +++ b/pkg/spanz/span_test.go @@ -0,0 +1,133 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "bytes" + "testing" + + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/stretchr/testify/require" +) + +func TestStartCompare(t *testing.T) { + t.Parallel() + + tests := []struct { + lhs []byte + rhs []byte + res int + }{ + {nil, nil, 0}, + {nil, []byte{}, 0}, + {[]byte{}, nil, 0}, + {[]byte{}, []byte{}, 0}, + {[]byte{1}, []byte{2}, -1}, + {[]byte{2}, []byte{1}, 1}, + {[]byte{3}, []byte{3}, 0}, + } + + for _, test := range tests { + require.Equal(t, test.res, StartCompare(test.lhs, test.rhs)) + } +} + +func TestEndCompare(t *testing.T) { + t.Parallel() + + tests := []struct { + lhs []byte + rhs []byte + res int + }{ + {nil, nil, 0}, + {nil, []byte{}, 0}, + {[]byte{}, nil, 0}, + {[]byte{}, []byte{}, 0}, + {[]byte{1}, []byte{2}, -1}, + {[]byte{2}, []byte{1}, 1}, + {[]byte{3}, []byte{3}, 0}, + } + + for _, test := range tests { + require.Equal(t, test.res, EndCompare(test.lhs, test.rhs)) + } +} + +func TestIntersect(t *testing.T) { + t.Parallel() + + tests := []struct { + lhs tablepb.Span + rhs tablepb.Span + // Set nil for non-intersect + res *tablepb.Span + }{ + { + lhs: tablepb.Span{StartKey: nil, EndKey: []byte{1}}, + rhs: tablepb.Span{StartKey: []byte{1}, EndKey: nil}, + res: nil, + }, + { + lhs: tablepb.Span{StartKey: nil, EndKey: nil}, + rhs: tablepb.Span{StartKey: nil, EndKey: nil}, + res: &tablepb.Span{StartKey: nil, EndKey: nil}, + }, + { + lhs: tablepb.Span{StartKey: nil, EndKey: nil}, + rhs: tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + res: &tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + }, + { + lhs: tablepb.Span{StartKey: []byte{0}, EndKey: []byte{3}}, + rhs: tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + res: &tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + }, + { + lhs: tablepb.Span{StartKey: []byte{0}, EndKey: []byte{2}}, + rhs: tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + res: &tablepb.Span{StartKey: []byte{1}, EndKey: []byte{2}}, + }, + } + + for _, test := range tests { + t.Logf("running.., %v", test) + res, err := Intersect(test.lhs, test.rhs) + if test.res == nil { + require.NotNil(t, err) + } else { + require.Equal(t, *test.res, res) + } + + // Swap lhs and rhs, should get the same result + res2, err2 := Intersect(test.rhs, test.lhs) + if test.res == nil { + require.NotNil(t, err2) + } else { + require.Equal(t, *test.res, res2) + } + } +} + +func TestGetTableRange(t *testing.T) { + t.Parallel() + + startKey, endKey := GetTableRange(123) + require.Equal(t, -1, bytes.Compare(startKey, endKey)) + prefix := []byte(tablecodec.GenTableRecordPrefix(123)) + require.GreaterOrEqual(t, 0, bytes.Compare(startKey, prefix)) + prefix[len(prefix)-1]++ + require.LessOrEqual(t, 0, bytes.Compare(endKey, prefix)) +} diff --git a/pkg/spanz/sync_map.go b/pkg/spanz/sync_map.go new file mode 100644 index 000000000..9b6f0e077 --- /dev/null +++ b/pkg/spanz/sync_map.go @@ -0,0 +1,64 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package spanz + +import ( + "sync" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" +) + +// SyncMap is thread-safe map, its key is tablepb.Span. +type SyncMap struct { + m sync.Map +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *SyncMap) Load(key tablepb.Span) (value any, ok bool) { + return m.m.Load(toHashableSpan(key)) +} + +// Store sets the value for a key. +func (m *SyncMap) Store(key tablepb.Span, value any) { + m.m.Store(toHashableSpan(key), value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *SyncMap) LoadOrStore(key tablepb.Span, value any) (actual any, loaded bool) { + return m.m.LoadOrStore(toHashableSpan(key), value) +} + +// Delete deletes the value for a key. +func (m *SyncMap) Delete(key tablepb.Span) { + m.m.Delete(toHashableSpan(key)) +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *SyncMap) LoadAndDelete(key tablepb.Span) (value any, loaded bool) { + return m.m.LoadAndDelete(toHashableSpan(key)) +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +func (m *SyncMap) Range(f func(span tablepb.Span, value any) bool) { + m.m.Range(func(key, value any) bool { + span := key.(hashableSpan).toSpan() + return f(span, value) + }) +}