Skip to content

Commit

Permalink
cmd: add consumer and filter package (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 authored Jan 10, 2025
1 parent 66ad4cc commit e701ccb
Show file tree
Hide file tree
Showing 42 changed files with 2,641 additions and 21 deletions.
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ ifeq ("${IS_ALPINE}", "1")
CONSUMER_BUILD_FLAG = -tags musl
endif
GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=$(CGO) $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
CONSUMER_GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(CONSUMER_BUILD_FLAG) -trimpath $(GOVENDORFLAG)

PACKAGE_LIST := go list ./... | grep -vE 'vendor|proto|ticdc/tests|integration|testing_utils|pb|pbmock|ticdc/bin'
PACKAGES := $$($(PACKAGE_LIST))
Expand Down Expand Up @@ -109,7 +110,19 @@ generate_mock: tools/bin/mockgen
scripts/generate-mock.sh

cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc

kafka_consumer:
$(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer

storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go

pulsar_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_pulsar_consumer ./cmd/pulsar-consumer/main.go

filter_helper:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_filter_helper ./cmd/filter-helper/main.go

fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci
@echo "run gci (format imports)"
Expand All @@ -126,9 +139,9 @@ integration_test_build: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc/cmd \
-o bin/cdc.test github.com/pingcap/ticdc/cmd/cdc \
|| { $(FAILPOINT_DISABLE); echo "Failed to build cdc.test"; exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/main.go \
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc/main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli.go → cmd/cdc/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/spf13/cobra"
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli_capture.go → cmd/cdc/cli/cli_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
"github.com/spf13/cobra"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
"github.com/spf13/cobra"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package cli
import (
"time"

"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
v2 "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/tiflow/cdc/api/owner"
"github.com/pingcap/tiflow/cdc/model"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package cli
import (
"context"

"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/spf13/cobra"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

"github.com/pingcap/errors"
v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package cli
import (
"strings"

"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
"github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"strings"

v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"

v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"github.com/pingcap/log"
v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
apiv2client "github.com/pingcap/ticdc/pkg/api/v2"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion cmd/cli/cli_tso.go → cmd/cdc/cli/cli_tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
"github.com/spf13/cobra"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli_tso_query.go → cmd/cdc/cli/cli_tso_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package cli

import (
"github.com/pingcap/ticdc/cmd/factory"
"github.com/pingcap/ticdc/cmd/cdc/factory"
"github.com/pingcap/tiflow/pkg/cmd/context"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/spf13/cobra"
Expand Down
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions cmd/main.go → cmd/cdc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"strings"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/cmd/cli"
"github.com/pingcap/ticdc/cmd/server"
"github.com/pingcap/ticdc/cmd/version"
"github.com/pingcap/ticdc/cmd/cdc/cli"
"github.com/pingcap/ticdc/cmd/cdc/server"
"github.com/pingcap/ticdc/cmd/cdc/version"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/tidb/pkg/util/collate"
tiflowCmd "github.com/pingcap/tiflow/pkg/cmd"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
113 changes: 113 additions & 0 deletions cmd/filter-helper/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"strings"

timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/spf13/cobra"
)

var (
table string
ddl string
cfgPath string
)

func main() {
rootCmd := &cobra.Command{
Use: "TiCDC filter helper, use to check whether your filter config works as expected",
Short: "A tool to check table and ddl query against filter rules",
Run: runFilter,
}
rootCmd.Flags().StringVarP(&cfgPath, "config", "c", "", "changefeed config file path")
rootCmd.Flags().StringVarP(&table, "table", "t", "", "table name, format: [schema].[table] ")
rootCmd.Flags().StringVarP(&ddl, "ddl", "d", "", "ddl query")
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
}
}

func runFilter(cmd *cobra.Command, args []string) {
// fmt.Printf("Filter Rules: %v\n", filterRules)
// fmt.Printf("Schema Name: %s\n", schemaName)
// fmt.Printf("Table Name: %s\n", tableName)
cfg := &config.ReplicaConfig{}
err := util.StrictDecodeFile(cfgPath, "cdc filter helper", cfg)
if err != nil {
fmt.Printf("decode config file error: %v\n", err)
return
}
ft, err := filter.NewFilter(cfg, "")
if err != nil {
fmt.Printf("filter create error: %v\n", err)
return
}
tableAndSchema := strings.Split(table, ".")
if len(tableAndSchema) != 2 {
fmt.Printf("the input format is invalid, only support {schema}.{table}: %s\n", table)
return
}

target := "table"
if ddl != "" {
target = "ddl"
}

switch target {
case "table":
matched := !ft.ShouldIgnoreTable(tableAndSchema[0], tableAndSchema[1])
if matched {
fmt.Printf("Table: %s, Matched filter rule\n", table)
return
}
fmt.Printf("Table: %s, Not matched filter rule\n", table)
case "ddl":
ddlType := timodel.ActionCreateTable
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
return
}
ignored, err := ft.ShouldIgnoreDDLEvent(&model.DDLEvent{
StartTs: uint64(0),
Query: ddl,
Type: ddlType,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: tableAndSchema[0],
Table: tableAndSchema[1],
},
},
})
if err != nil {
fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err)
return
}
if ignored {
fmt.Printf("DDL: %s, should be ignored by event filter rule\n", ddl)
return
}
fmt.Printf("DDL: %s, should not be discard by event filter rule\n", ddl)
default:
fmt.Printf("unknown target: %s", target)

}
}
Loading

0 comments on commit e701ccb

Please sign in to comment.