Skip to content

Commit

Permalink
[close #348] support filter for key & value (#349)
Browse files Browse the repository at this point in the history
* support filter for key & value

Signed-off-by: Ping Yu <yuping@pingcap.com>

* add tests

Signed-off-by: Ping Yu <yuping@pingcap.com>

* polish ut

Signed-off-by: Ping Yu <yuping@pingcap.com>

* rename KvFilter

Signed-off-by: Ping Yu <yuping@pingcap.com>

* add it

Signed-off-by: Ping Yu <yuping@pingcap.com>

* polish

Signed-off-by: Ping Yu <yuping@pingcap.com>

* fix ut

Signed-off-by: Ping Yu <yuping@pingcap.com>

* fix metrics

Signed-off-by: Ping Yu <yuping@pingcap.com>

* fix CI error

Signed-off-by: Ping Yu <yuping@pingcap.com>

* decode user key for filter

Signed-off-by: Ping Yu <yuping@pingcap.com>

---------

Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu authored Aug 25, 2023
1 parent c340278 commit 9b4dc22
Show file tree
Hide file tree
Showing 24 changed files with 535 additions and 24 deletions.
4 changes: 3 additions & 1 deletion cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ ifeq (${CDC_ENABLE_VENDOR}, 1)
GOVENDORFLAG := -mod=vendor
endif

BUILD_FLAG := -buildvcs=false

GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILD_DEBUG := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -gcflags "all=-N -l" $(GOVENDORFLAG)
GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
Expand All @@ -43,7 +45,7 @@ FAILPOINT_DISABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev

RELEASE_VERSION ?=
ifeq ($(RELEASE_VERSION),)
RELEASE_VERSION := v1.1-master
RELEASE_VERSION := v1.2-master
release_version_regex := ^cdc-v[0-9]\..*$$
release_branch_regex := "^cdc-[0-9]\.[0-9].*$$|^HEAD$$|^.*/*tags/cdc-v[0-9]\.[0-9]\..*$$"
ifneq ($(shell git rev-parse --abbrev-ref HEAD | egrep $(release_branch_regex)),)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
c.Assert(strings.Contains(err.Error(), "remote error: tls: bad certificate"), check.IsTrue)
c.Assert(strings.Contains(err.Error(), "remote error: tls: "), check.IsTrue)

// test that a cli request sent with a cert succeeds
err = retry.Do(ctx, func() error {
Expand Down
4 changes: 3 additions & 1 deletion cdc/cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,11 @@ func (s *eventFeedSession) receiveFromStream(
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

eventFilter := util.EventFilterFromCtx(ctx)

// always create a new region worker, because `receiveFromStream` is guaranteed
// to be called exactly once by the outer code logic
worker := newRegionWorker(s, addr)
worker := newRegionWorker(s, addr, eventFilter)

defer worker.evictAllRegions()

Expand Down
8 changes: 8 additions & 0 deletions cdc/cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ var (
Name: "pull_event_count",
Help: "event count received by this puller",
}, []string{"type", "capture", "changefeed"})
filterOutEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Subsystem: "kvclient",
Name: "filter_out_event_count",
Help: "event count filtered out by event filter",
}, []string{"type", "capture", "changefeed"})
sendEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Expand Down Expand Up @@ -111,6 +118,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(eventFeedGauge)
registry.MustRegister(pullEventCounter)
registry.MustRegister(sendEventCounter)
registry.MustRegister(filterOutEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
Expand Down
32 changes: 26 additions & 6 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,20 @@ func (rsm *regionStateManager) delState(regionID uint64) {

type regionWorkerMetrics struct {
// kv events related metrics
metricReceivedEventSize prometheus.Observer
metricDroppedEventSize prometheus.Observer
metricReceivedEventSize prometheus.Observer
metricDroppedEventSize prometheus.Observer

metricPullEventInitializedCounter prometheus.Counter
metricPullEventPrewriteCounter prometheus.Counter
metricPullEventCommitCounter prometheus.Counter
metricPullEventCommittedCounter prometheus.Counter
metricPullEventRollbackCounter prometheus.Counter
metricSendEventResolvedCounter prometheus.Counter
metricSendEventCommitCounter prometheus.Counter
metricSendEventCommittedCounter prometheus.Counter

metricSendEventResolvedCounter prometheus.Counter
metricSendEventCommitCounter prometheus.Counter
metricSendEventCommittedCounter prometheus.Counter

metricFilterOutEventCommittedCounter prometheus.Counter

// TODO: add region runtime related metrics
}
Expand Down Expand Up @@ -156,9 +160,11 @@ type regionWorker struct {

enableOldValue bool
storeAddr string

eventFilter *util.KvFilter
}

func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
func newRegionWorker(s *eventFeedSession, addr string, eventFilter *util.KvFilter) *regionWorker {
cfg := config.GetGlobalServerConfig().KVClient
worker := &regionWorker{
session: s,
Expand All @@ -171,6 +177,7 @@ func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
enableOldValue: s.enableOldValue,
storeAddr: addr,
concurrent: cfg.WorkerConcurrent,
eventFilter: eventFilter,
}
return worker
}
Expand All @@ -190,6 +197,7 @@ func (w *regionWorker) initMetrics(ctx context.Context) {
metrics.metricSendEventResolvedCounter = sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID)
metrics.metricSendEventCommitCounter = sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID)
metrics.metricSendEventCommittedCounter = sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID)
metrics.metricFilterOutEventCommittedCounter = filterOutEventCounter.WithLabelValues("committed", captureAddr, changefeedID)

w.metrics = metrics
}
Expand Down Expand Up @@ -655,6 +663,18 @@ func (w *regionWorker) handleEventEntry(
}
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()

if w.eventFilter != nil {
matched, err := w.eventFilter.EventMatch(entry)
// EventMatch returns an error when it fails to decode the key.
// Pass such an entry on to be handled by the following steps.
if err == nil && !matched {
w.metrics.metricFilterOutEventCommittedCounter.Inc()
log.Debug("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", hex.EncodeToString(entry.Key)))
continue
}
}

revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/processor/pipeline/keyspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewKeySpanPipeline(ctx cdcContext.Context,

sinkNode := newSinkNode(keyspanID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo))
p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo, replConfig.Filter))
p.AppendNode(ctx, "sorter", sorterNode)
p.AppendNode(ctx, "sink", sinkNode)

Expand Down
9 changes: 8 additions & 1 deletion cdc/cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,25 @@ type pullerNode struct {
keyspanName string
keyspan regionspan.Span
replicaInfo *model.KeySpanReplicaInfo
eventFilter *util.KvFilter
cancel context.CancelFunc
wg *errgroup.Group
}

func newPullerNode(
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo,
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo, filterConfig *util.KvFilterConfig,
) pipeline.Node {
keyspan := regionspan.Span{Start: replicaInfo.Start, End: replicaInfo.End}
var filter *util.KvFilter
if filterConfig != nil {
filter = util.CreateFilter(filterConfig)
}
return &pullerNode{
keyspanID: keyspanID,
keyspanName: keyspan.Name(),
keyspan: keyspan,
replicaInfo: replicaInfo,
eventFilter: filter,
}
}

Expand All @@ -63,6 +69,7 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr
ctxC = util.PutKeySpanInfoInCtx(ctxC, n.keyspanID, n.keyspanName)
ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
ctxC = util.PutEventFilterInCtx(ctxC, n.eventFilter)

plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.keyspans(), true)
Expand Down
7 changes: 7 additions & 0 deletions cdc/metrics/grafana/tikv-cdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4837,6 +4837,13 @@
"intervalFactor": 1,
"legendFormat": "{{instance}}-{{type}}",
"refId": "A"
},
{
"expr": "sum(rate(tikv_cdc_kvclient_filter_out_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (instance, type)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "filter-out-{{instance}}-{{type}}",
"refId": "B"
}
],
"thresholds": [],
Expand Down
11 changes: 10 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
check-gc-safe-point = true`
check-gc-safe-point = true
[filter]
key-prefix = "key\\x00"
key-pattern = "key\\x00pattern"
value-pattern = "value\\ffpattern"
`
err := os.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

Expand All @@ -49,6 +54,10 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
cfg := config.GetDefaultReplicaConfig()
err = o.strictDecodeConfig("cdc", cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.CheckGCSafePoint, check.IsTrue)
c.Assert(cfg.Filter.KeyPrefix, check.Equals, `key\x00`)
c.Assert(cfg.Filter.KeyPattern, check.Equals, `key\x00pattern`)
c.Assert(cfg.Filter.ValuePattern, check.Equals, `value\ffpattern`)

path = filepath.Join(dir, "config1.toml")
content = `
Expand Down
6 changes: 5 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strings"

"github.com/tikv/migration/cdc/pkg/config"
"github.com/tikv/migration/cdc/pkg/etcd"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -158,8 +159,11 @@ func (o *updateChangefeedOptions) applyChanges(oldInfo *model.ChangeFeedInfo, cm
case "sink-uri":
newInfo.SinkURI = o.commonChangefeedOptions.sinkURI
case "config":
if newInfo.Config == nil {
newInfo.Config = &config.ReplicaConfig{}
}
cfg := newInfo.Config
if err = o.commonChangefeedOptions.strictDecodeConfig("TiCDC changefeed", cfg); err != nil {
if err = o.commonChangefeedOptions.strictDecodeConfig("TiKV-CDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "opts":
Expand Down
22 changes: 21 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cli

import (
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -43,6 +44,26 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(newInfo.SinkURI, check.Equals, "mysql://root@downstream-tidb:4000")

// Test update config file
oldInfo = &model.ChangeFeedInfo{}
dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
[filter]
key-prefix = "key\\x00"
key-pattern = "key\\x00pattern"
value-pattern = "value\\ffpattern"
`
err = os.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)
c.Assert(cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", path)}), check.IsNil)
newInfo, err = o.applyChanges(oldInfo, cmd)
c.Assert(err, check.IsNil)
filterCnf := newInfo.Config.Filter
c.Assert(filterCnf.KeyPrefix, check.Equals, `key\x00`)
c.Assert(filterCnf.KeyPattern, check.Equals, `key\x00pattern`)
c.Assert(filterCnf.ValuePattern, check.Equals, `value\ffpattern`)

// Test for cli command flags that should be ignored.
oldInfo = &model.ChangeFeedInfo{SortDir: "."}
c.Assert(cmd.ParseFlags([]string{"--interact"}), check.IsNil)
Expand All @@ -63,7 +84,6 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) {
c.Assert(newInfo.EndKey, check.Equals, "")
c.Assert(newInfo.Format, check.Equals, "hex")

dir := c.MkDir()
filename := filepath.Join(dir, "log.txt")
reset, err := initTestLogger(filename)
defer reset()
Expand Down
10 changes: 10 additions & 0 deletions cdc/pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`

Expand All @@ -151,6 +156,11 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`
)
16 changes: 12 additions & 4 deletions cdc/pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/migration/cdc/pkg/config/outdated"
"github.com/tikv/migration/cdc/pkg/util"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -39,10 +40,11 @@ var defaultReplicaConfig = &ReplicaConfig{
type ReplicaConfig replicaConfig

type replicaConfig struct {
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
Filter *util.KvFilterConfig `toml:"filter" json:"filter"`
}

// Marshal returns the JSON-marshaled form of a ReplicaConfig
Expand Down Expand Up @@ -113,6 +115,12 @@ func (c *ReplicaConfig) Validate() error {
return err
}
}
if c.Filter != nil {
err := c.Filter.Validate()
if err != nil {
return err
}
}
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions cdc/pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/migration/cdc/pkg/util"
)

func mustIdentJSON(t *testing.T, j string) string {
Expand All @@ -38,6 +39,11 @@ func TestReplicaConfigMarshal(t *testing.T) {
Columns: []string{"a", "b"},
},
}
conf.Filter = &util.KvFilterConfig{
KeyPrefix: `prefix`,
KeyPattern: `key\x00pattern`,
ValuePattern: `value\ffpattern`,
}
b, err := conf.Marshal()
require.Nil(t, err)
require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b))
Expand Down
13 changes: 13 additions & 0 deletions cdc/pkg/util/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ctxKeyIsOwner = ctxKey("isOwner")
ctxKeyTimezone = ctxKey("timezone")
ctxKeyKVStorage = ctxKey("kvStorage")
ctxEventFilter = ctxKey("eventFilter")
)

// CaptureAddrFromCtx returns a capture ID stored in the specified context.
Expand Down Expand Up @@ -121,6 +122,18 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont
return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID)
}

// EventFilterFromCtx returns the event filter stored in the specified context.
// It returns nil when the context holds no *KvFilter under the event filter key.
func EventFilterFromCtx(ctx context.Context) *KvFilter {
	if kvFilter, found := ctx.Value(ctxEventFilter).(*KvFilter); found {
		return kvFilter
	}
	return nil
}

// PutEventFilterInCtx returns a new child context with the specified event filter stored.
func PutEventFilterInCtx(ctx context.Context, filter *KvFilter) context.Context {
	child := context.WithValue(ctx, ctxEventFilter, filter)
	return child
}

// ZapFieldCapture returns a zap field containing capture address
// TODO: log redact for capture address
func ZapFieldCapture(ctx context.Context) zap.Field {
Expand Down
Loading

0 comments on commit 9b4dc22

Please sign in to comment.