diff --git a/api/v1/api.go b/api/v1/api.go index 5bbef58db..a41c23d65 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -24,8 +24,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/pingcap/ticdc/api/middleware" - pv2 "github.com/pingcap/ticdc/api/v2" - v2 "github.com/pingcap/ticdc/api/v2" + "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/server" @@ -41,12 +40,12 @@ func setV1Header(c *gin.Context) { // OpenAPIV1 provides CDC v1 APIs type OpenAPIV1 struct { server server.Server - v2 pv2.OpenAPIV2 + v2 v2.OpenAPIV2 } // NewOpenAPIV1 creates a new OpenAPIV1. func NewOpenAPIV1(c server.Server) OpenAPIV1 { - return OpenAPIV1{c, pv2.NewOpenAPIV2(c)} + return OpenAPIV1{c, v2.NewOpenAPIV2(c)} } // RegisterOpenAPIV1Routes registers routes for OpenAPIV1 diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index cdc701cdd..b447ce8f9 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/api/middleware" "github.com/pingcap/ticdc/downstreamadapter/sink" - apperror "github.com/pingcap/ticdc/pkg/apperror" + "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" diff --git a/api/v2/model.go b/api/v2/model.go index fae90a3d1..9ca094d08 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -20,11 +20,11 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/cdc/model" bf "github.com/pingcap/tiflow/pkg/binlog-filter" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/util" ) // EmptyResponse return empty {} to http client diff --git a/cmd/cdc/cli/cli.go b/cmd/cdc/cli/cli.go index fdae7fc84..688ab14e8 100644 --- a/cmd/cdc/cli/cli.go +++ b/cmd/cdc/cli/cli.go @@ -14,9 +14,13 @@ package cli import ( + "context" + "os" + "github.com/pingcap/ticdc/cmd/cdc/factory" - "github.com/pingcap/tiflow/pkg/cmd/util" - "github.com/pingcap/tiflow/pkg/logutil" + "github.com/pingcap/ticdc/cmd/util" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/logger" "github.com/spf13/cobra" ) @@ -35,7 +39,13 @@ func NewCmdCli() *cobra.Command { cf.AddFlags(cmds) cmds.PersistentPreRun = func(cmd *cobra.Command, args []string) { // Here we will initialize the logging configuration and set the current default context. - cancel := util.InitCmd(cmd, &logutil.Config{Level: cf.GetLogLevel()}) + err := logger.InitLogger(&logger.Config{Level: cf.GetLogLevel()}) + if err != nil { + cmd.Printf("init logger error %v\n", errors.Trace(err)) + os.Exit(1) + } + _, cancel := context.WithCancel(context.Background()) + defer cancel() util.LogHTTPProxies() // A notify that complete immediately, it skips the second signal essentially. doneNotify := func() <-chan struct{} { diff --git a/cmd/cdc/cli/cli_capture_list.go b/cmd/cdc/cli/cli_capture_list.go index 947fc58aa..4328cfbf7 100644 --- a/cmd/cdc/cli/cli_capture_list.go +++ b/cmd/cdc/cli/cli_capture_list.go @@ -14,10 +14,11 @@ package cli import ( + "context" + "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -43,8 +44,7 @@ func (o *listCaptureOptions) complete(f factory.Factory) error { // run runs the `cli capture list` command. func (o *listCaptureOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() - + ctx := context.Background() raw, err := o.apiv2Client.Captures().List(ctx) if err != nil { return err diff --git a/cmd/cdc/cli/cli_changefeed_create.go b/cmd/cdc/cli/cli_changefeed_create.go index c7ab8ab06..14289923e 100644 --- a/cmd/cdc/cli/cli_changefeed_create.go +++ b/cmd/cdc/cli/cli_changefeed_create.go @@ -24,13 +24,12 @@ import ( "github.com/pingcap/log" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filter" + putil "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/cdc/model" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" - putil "github.com/pingcap/tiflow/pkg/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -354,8 +353,7 @@ func newCmdCreateChangefeed(f factory.Factory) *cobra.Command { Short: "Create a new replication task (changefeed)", Args: cobra.NoArgs, Run: func(cmd *cobra.Command, args []string) { - ctx := cmdcontext.GetDefaultContext() - + ctx := context.Background() util.CheckErr(o.complete(f)) util.CheckErr(o.validate(cmd)) util.CheckErr(o.run(ctx, cmd)) diff --git a/cmd/cdc/cli/cli_changefeed_list.go b/cmd/cdc/cli/cli_changefeed_list.go index 0ce8c8088..8d7e2f855 100644 --- a/cmd/cdc/cli/cli_changefeed_list.go +++ b/cmd/cdc/cli/cli_changefeed_list.go @@ -14,14 +14,14 @@ package cli import ( + "context" "time" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" v2 "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/tiflow/cdc/api/owner" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -66,7 +66,7 @@ func (o *listChangefeedOptions) complete(f factory.Factory) error { // run the `cli changefeed list` command. func (o *listChangefeedOptions) run(cmd *cobra.Command) error { - ctx := context.GetDefaultContext() + ctx := context.Background() raw, err := o.apiClient.Changefeeds().List(ctx, o.namespace, "all") if err != nil { diff --git a/cmd/cdc/cli/cli_changefeed_move_table.go b/cmd/cdc/cli/cli_changefeed_move_table.go index 45c486c80..766ce59c3 100644 --- a/cmd/cdc/cli/cli_changefeed_move_table.go +++ b/cmd/cdc/cli/cli_changefeed_move_table.go @@ -17,8 +17,8 @@ import ( "context" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) diff --git a/cmd/cdc/cli/cli_changefeed_pause.go b/cmd/cdc/cli/cli_changefeed_pause.go index 60d779db3..9b1c09265 100644 --- a/cmd/cdc/cli/cli_changefeed_pause.go +++ b/cmd/cdc/cli/cli_changefeed_pause.go @@ -14,10 +14,11 @@ package cli import ( + "context" + "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -55,7 +56,7 @@ func (o *pauseChangefeedOptions) complete(f factory.Factory) error { // run the `cli changefeed pause` command. func (o *pauseChangefeedOptions) run() error { - ctx := context.GetDefaultContext() + ctx := context.Background() return o.apiClient.Changefeeds().Pause(ctx, o.namespace, o.changefeedID) } diff --git a/cmd/cdc/cli/cli_changefeed_query.go b/cmd/cdc/cli/cli_changefeed_query.go index 1c619f5f4..d8bca884b 100644 --- a/cmd/cdc/cli/cli_changefeed_query.go +++ b/cmd/cdc/cli/cli_changefeed_query.go @@ -19,10 +19,10 @@ import ( "github.com/pingcap/errors" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) diff --git a/cmd/cdc/cli/cli_changefeed_remove.go b/cmd/cdc/cli/cli_changefeed_remove.go index 2a3f8ca7b..4a9417385 100644 --- a/cmd/cdc/cli/cli_changefeed_remove.go +++ b/cmd/cdc/cli/cli_changefeed_remove.go @@ -14,13 +14,13 @@ package cli import ( + "context" "strings" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -56,7 +56,7 @@ func (o *removeChangefeedOptions) complete(f factory.Factory) error { // run the `cli changefeed remove` command. func (o *removeChangefeedOptions) run(cmd *cobra.Command) error { - ctx := context.GetDefaultContext() + ctx := context.Background() changefeedDetail, err := o.apiClient.Changefeeds().Get(ctx, o.namespace, o.changefeedID) if err != nil { diff --git a/cmd/cdc/cli/cli_changefeed_resume.go b/cmd/cdc/cli/cli_changefeed_resume.go index ca675f791..02e5e7bce 100644 --- a/cmd/cdc/cli/cli_changefeed_resume.go +++ b/cmd/cdc/cli/cli_changefeed_resume.go @@ -20,10 +20,9 @@ import ( v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" cerror "github.com/pingcap/ticdc/pkg/errors" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" ) @@ -195,7 +194,7 @@ func (o *resumeChangefeedOptions) validateParams(ctx context.Context) error { // run the `cli changefeed resume` command. func (o *resumeChangefeedOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() + ctx := context.Background() if err := o.validateParams(ctx); err != nil { return err diff --git a/cmd/cdc/cli/cli_changefeed_statistics.go b/cmd/cdc/cli/cli_changefeed_statistics.go index e10e2f3bf..3b3d0374d 100644 --- a/cmd/cdc/cli/cli_changefeed_statistics.go +++ b/cmd/cdc/cli/cli_changefeed_statistics.go @@ -20,9 +20,8 @@ import ( v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" ) @@ -96,7 +95,7 @@ func (o *statisticsChangefeedOptions) runCliWithAPIClient(ctx context.Context, c // run the `cli changefeed statistics` command. func (o *statisticsChangefeedOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() + ctx := context.Background() tick := time.NewTicker(time.Duration(o.interval) * time.Second) var lastTime time.Time diff --git a/cmd/cdc/cli/cli_changefeed_update.go b/cmd/cdc/cli/cli_changefeed_update.go index b9f81475f..69172f2c1 100644 --- a/cmd/cdc/cli/cli_changefeed_update.go +++ b/cmd/cdc/cli/cli_changefeed_update.go @@ -14,16 +14,16 @@ package cli import ( + "context" "encoding/json" "strings" "github.com/pingcap/log" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" - putil "github.com/pingcap/tiflow/pkg/util" + putil "github.com/pingcap/ticdc/pkg/util" "github.com/r3labs/diff" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -91,7 +91,7 @@ func (o *updateChangefeedOptions) complete(f factory.Factory) error { // run the `cli changefeed update` command. func (o *updateChangefeedOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() + ctx := context.Background() old, err := o.apiV2Client.Changefeeds().Get(ctx, o.namespace, o.changefeedID) if err != nil { diff --git a/cmd/cdc/cli/cli_changefeed_update_test.go b/cmd/cdc/cli/cli_changefeed_update_test.go index 1aedc4066..197b1bff7 100644 --- a/cmd/cdc/cli/cli_changefeed_update_test.go +++ b/cmd/cdc/cli/cli_changefeed_update_test.go @@ -22,9 +22,9 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/errors" "github.com/pingcap/log" + putil "github.com/pingcap/ticdc/pkg/util" v2 "github.com/pingcap/tiflow/cdc/api/v2" "github.com/pingcap/tiflow/pkg/config" - putil "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) diff --git a/cmd/cdc/cli/cli_tso_query.go b/cmd/cdc/cli/cli_tso_query.go index 555dea7c1..73baac55f 100644 --- a/cmd/cdc/cli/cli_tso_query.go +++ b/cmd/cdc/cli/cli_tso_query.go @@ -14,9 +14,10 @@ package cli import ( + "context" + "github.com/pingcap/ticdc/cmd/cdc/factory" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/ticdc/cmd/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -45,7 +46,7 @@ func (o *queryTsoOptions) complete(f factory.Factory) error { // run runs the `cli tso query` command. func (o *queryTsoOptions) run(cmd *cobra.Command) error { - ctx := context.GetDefaultContext() + ctx := context.Background() ts, logic, err := o.pdClient.GetTS(ctx) if err != nil { diff --git a/cmd/cdc/cli/cli_unsafe_delete_service_gc_safepoint.go b/cmd/cdc/cli/cli_unsafe_delete_service_gc_safepoint.go index 233d60434..b1dd7a5a8 100644 --- a/cmd/cdc/cli/cli_unsafe_delete_service_gc_safepoint.go +++ b/cmd/cdc/cli/cli_unsafe_delete_service_gc_safepoint.go @@ -14,14 +14,14 @@ package cli import ( + "context" "strings" "github.com/pingcap/errors" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -74,7 +74,7 @@ func (o *unsafeDeleteServiceGcSafepointOptions) complete(f factory.Factory) erro // run runs the `cli unsafe delete-service-gc-safepoint` command. func (o *unsafeDeleteServiceGcSafepointOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() + ctx := context.Background() err := o.apiClient.Unsafe().DeleteServiceGcSafePoint(ctx, o.getUpstreamConfig()) if err == nil { diff --git a/cmd/cdc/cli/cli_unsafe_reset.go b/cmd/cdc/cli/cli_unsafe_reset.go index dca7b7e37..667b7dd49 100644 --- a/cmd/cdc/cli/cli_unsafe_reset.go +++ b/cmd/cdc/cli/cli_unsafe_reset.go @@ -14,12 +14,13 @@ package cli import ( + "context" + "github.com/pingcap/errors" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/txnutil/gc" - cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" pd "github.com/tikv/pd/client" ) @@ -66,7 +67,7 @@ func (o *unsafeResetOptions) addFlags(cmd *cobra.Command) { // run runs the `cli unsafe reset` command. func (o *unsafeResetOptions) run(cmd *cobra.Command) error { - ctx := cmdcontext.GetDefaultContext() + ctx := context.Background() defer o.pdClient.Close() defer o.etcdClient.Close() diff --git a/cmd/cdc/cli/cli_unsafe_resolve_lock.go b/cmd/cdc/cli/cli_unsafe_resolve_lock.go index 23ee2b26d..32cb19804 100644 --- a/cmd/cdc/cli/cli_unsafe_resolve_lock.go +++ b/cmd/cdc/cli/cli_unsafe_resolve_lock.go @@ -14,14 +14,14 @@ package cli import ( + "context" "strings" "time" v2 "github.com/pingcap/ticdc/api/v2" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" ) @@ -47,7 +47,7 @@ func newUnsafeResolveLockOptions() *unsafeResolveLockOptions { // complete adapts from the command line args to the data and client required. func (o *unsafeResolveLockOptions) complete(f factory.Factory) error { - ctx := context.GetDefaultContext() + ctx := context.Background() apiClient, err := f.APIV2Client() if err != nil { return err @@ -78,7 +78,7 @@ func (o *unsafeResolveLockOptions) complete(f factory.Factory) error { // run runs the `cli unsafe show-metadata` command. func (o *unsafeResolveLockOptions) run() error { - ctx := context.GetDefaultContext() + ctx := context.Background() var pdAddrs []string if o.upstreamPDAddrs != "" { pdAddrs = strings.Split(o.upstreamPDAddrs, ",") diff --git a/cmd/cdc/cli/cli_unsafe_show_metadata.go b/cmd/cdc/cli/cli_unsafe_show_metadata.go index 57ef6f524..5ef3c4e9b 100644 --- a/cmd/cdc/cli/cli_unsafe_show_metadata.go +++ b/cmd/cdc/cli/cli_unsafe_show_metadata.go @@ -14,11 +14,12 @@ package cli import ( + "context" + "github.com/pingcap/errors" "github.com/pingcap/ticdc/cmd/cdc/factory" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" - "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" ) @@ -45,7 +46,7 @@ func (o *unsafeShowMetadataOptions) complete(f factory.Factory) error { // run runs the `cli unsafe show-metadata` command. func (o *unsafeShowMetadataOptions) run(cmd *cobra.Command) error { - ctx := context.GetDefaultContext() + ctx := context.Background() kvs, err := o.apiClient.Unsafe().Metadata(ctx) if err != nil { diff --git a/cmd/cdc/factory/factory.go b/cmd/cdc/factory/factory.go index 6b4c7e5fc..c28db81e0 100644 --- a/cmd/cdc/factory/factory.go +++ b/cmd/cdc/factory/factory.go @@ -22,10 +22,10 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/security" "github.com/spf13/cobra" pd "github.com/tikv/pd/client" diff --git a/cmd/cdc/factory/factory_impl.go b/cmd/cdc/factory/factory_impl.go index f82c46393..49a49ba7b 100644 --- a/cmd/cdc/factory/factory_impl.go +++ b/cmd/cdc/factory/factory_impl.go @@ -14,15 +14,15 @@ package factory import ( + "context" "strings" "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" apiv2client "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" - cmdconetxt "github.com/pingcap/tiflow/pkg/cmd/context" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/version" pd "github.com/tikv/pd/client" etcdlogutil "go.etcd.io/etcd/client/pkg/v3/logutil" @@ -55,7 +55,7 @@ func NewFactory(clientGetter ClientGetter) Factory { // EtcdClient creates new cdc etcd client. func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClientImpl, error) { - ctx := cmdconetxt.GetDefaultContext() + ctx := context.Background() tlsConfig, err := f.ToTLSConfig() if err != nil { return nil, err @@ -115,7 +115,7 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClientImpl, error) { // PdClient creates new pd client. func (f *factoryImpl) PdClient() (pd.Client, error) { - ctx := cmdconetxt.GetDefaultContext() + ctx := context.Background() credential := f.GetCredential() grpcTLSOption, err := f.ToGRPCDialOption() @@ -224,7 +224,7 @@ func (f *factoryImpl) findServerAddr() (string, error) { } defer etcdClient.Close() - ctx := cmdconetxt.GetDefaultContext() + ctx := context.Background() err = etcdClient.CheckMultipleCDCClusterExist(ctx) if err != nil { if errors.ErrMultipleCDCClustersExist.Equal(err) { @@ -252,7 +252,7 @@ func (f *factoryImpl) findServerAddr() (string, error) { } func checkCDCVersion(client apiv2client.APIV2Interface) error { - serverStatus, err := client.Status().Get(cmdconetxt.GetDefaultContext()) + serverStatus, err := client.Status().Get(context.Background()) if err != nil { return errors.Trace(err) } diff --git a/cmd/cdc/main.go b/cmd/cdc/main.go index 66fd59b34..83632b10a 100644 --- a/cmd/cdc/main.go +++ b/cmd/cdc/main.go @@ -1,4 +1,4 @@ -// Copyright 2024 PingCAP, Inc. +// Copyright 2025 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,10 +22,10 @@ import ( "github.com/pingcap/ticdc/cmd/cdc/cli" "github.com/pingcap/ticdc/cmd/cdc/server" "github.com/pingcap/ticdc/cmd/cdc/version" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/tidb/pkg/util/collate" tiflowCmd "github.com/pingcap/tiflow/pkg/cmd" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/spf13/cobra" "go.uber.org/zap" ) diff --git a/cmd/cdc/server/server.go b/cmd/cdc/server/server.go index 8ef993b51..32f4cdebd 100644 --- a/cmd/cdc/server/server.go +++ b/cmd/cdc/server/server.go @@ -21,12 +21,12 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/logger" "github.com/pingcap/ticdc/server" "github.com/pingcap/ticdc/version" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/security" cdcversion "github.com/pingcap/tiflow/pkg/version" "github.com/spf13/cobra" @@ -89,13 +89,12 @@ func (o *options) run(cmd *cobra.Command) error { } err := logger.InitLogger(loggerConfig) if err != nil { - cmd.Printf("init logger error %v\n", errors.ErrorStack(err)) + cmd.Printf("init logger error %v\n", errors.Trace(err)) os.Exit(1) } log.Info("init log", zap.String("file", loggerConfig.File), zap.String("level", loggerConfig.Level)) ctx, cancel := context.WithCancel(context.Background()) - setDefaultContext(ctx) defer cancel() cdcversion.ReleaseVersion = version.ReleaseVersion @@ -123,7 +122,7 @@ func (o *options) run(cmd *cobra.Command) error { } // complete adapts from the command line args and config file to the data required. -func (o *options) complete(cmd *cobra.Command) error { +func (o *options) complete(command *cobra.Command) error { cfg := config.GetDefaultServerConfig() if len(o.serverConfigFilePath) > 0 { // strict decode config file, but ignore debug item @@ -133,7 +132,7 @@ func (o *options) complete(cmd *cobra.Command) error { } o.serverConfig.Security = o.getCredential() - cmd.Flags().Visit(func(flag *pflag.Flag) { + command.Flags().Visit(func(flag *pflag.Flag) { switch flag.Name { case "addr": cfg.Addr = o.serverConfig.Addr @@ -171,7 +170,7 @@ func (o *options) complete(cmd *cobra.Command) error { } if cfg.DataDir == "" { - cmd.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " + + command.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " + "Please use `cdc server --data-dir` to start the cdc server if possible.\n")) } diff --git a/cmd/filter-helper/main.go b/cmd/filter-helper/main.go index 97a0e671c..1b96858e9 100644 --- a/cmd/filter-helper/main.go +++ b/cmd/filter-helper/main.go @@ -17,9 +17,9 @@ import ( "fmt" "strings" + "github.com/pingcap/ticdc/cmd/util" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/filter" "github.com/spf13/cobra" diff --git a/cmd/kafka-consumer/option.go b/cmd/kafka-consumer/option.go index 671442e20..7f4209e8f 100644 --- a/cmd/kafka-consumer/option.go +++ b/cmd/kafka-consumer/option.go @@ -21,12 +21,12 @@ import ( "time" "github.com/pingcap/log" + cmdUtil "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/errors" - cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go index ee9ff77da..a55bcf7aa 100644 --- a/cmd/pulsar-consumer/main.go +++ b/cmd/pulsar-consumer/main.go @@ -32,6 +32,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/auth" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -39,7 +40,6 @@ import ( eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/cdc/sink/tablesink" sutil "github.com/pingcap/tiflow/cdc/sink/util" - cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/sink" @@ -47,7 +47,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/canal" "github.com/pingcap/tiflow/pkg/sink/codec/common" tpulsar "github.com/pingcap/tiflow/pkg/sink/pulsar" - "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "github.com/spf13/cobra" "go.uber.org/zap" @@ -99,7 +98,7 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) { replicaConfig := config.GetDefaultReplicaConfig() if configFile != "" { - err := cmdUtil.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig) + err := util.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig) if err != nil { log.Panic("decode config file failed", zap.Error(err)) } diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index 335b66681..4a8f8c787 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -30,7 +30,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/spanz" + putil "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -38,7 +40,6 @@ import ( dmlfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory" "github.com/pingcap/tiflow/cdc/sink/tablesink" sinkutil "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/logutil" "github.com/pingcap/tiflow/pkg/quotes" @@ -48,7 +49,6 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/canal" "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/csv" - putil "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" ) @@ -143,7 +143,7 @@ func newConsumer(ctx context.Context) (*consumer, error) { config.StoreGlobalServerConfig(serverCfg) replicaConfig := config.GetDefaultReplicaConfig() if len(configFile) > 0 { - err := util.StrictDecodeFile(configFile, "storage consumer", replicaConfig) + err = util.StrictDecodeFile(configFile, "storage consumer", replicaConfig) if err != nil { log.Error("failed to decode config file", zap.Error(err)) return nil, err diff --git a/cmd/util/util.go b/cmd/util/util.go new file mode 100644 index 000000000..233966055 --- /dev/null +++ b/cmd/util/util.go @@ -0,0 +1,183 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "encoding/json" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/BurntSushi/toml" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/spf13/cobra" + "go.uber.org/zap" + "golang.org/x/net/http/httpproxy" +) + +// StrictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped +// into the Config struct, issue an error and stop the server from starting. +func StrictDecodeFile(path, component string, cfg interface{}, ignoreCheckItems ...string) error { + metaData, err := toml.DecodeFile(path, cfg) + if err != nil { + return errors.Trace(err) + } + + // check if item is a ignoreCheckItem + hasIgnoreItem := func(item []string) bool { + for _, ignoreCheckItem := range ignoreCheckItems { + if item[0] == ignoreCheckItem { + return true + } + } + return false + } + + if undecoded := metaData.Undecoded(); len(undecoded) > 0 { + var b strings.Builder + hasUnknownConfigSize := 0 + for _, item := range undecoded { + if hasIgnoreItem(item) { + continue + } + + if hasUnknownConfigSize > 0 { + b.WriteString(", ") + } + b.WriteString(item.String()) + hasUnknownConfigSize++ + } + if hasUnknownConfigSize > 0 { + err = errors.Errorf("component %s's config file %s contained unknown configuration options: %s", + component, path, b.String()) + } + } + return errors.Trace(err) +} + +// LogHTTPProxies logs HTTP proxy relative environment variables. +func LogHTTPProxies() { + fields := findProxyFields() + if len(fields) > 0 { + log.Info("using proxy config", fields...) + } +} + +func findProxyFields() []zap.Field { + proxyCfg := httpproxy.FromEnvironment() + fields := make([]zap.Field, 0, 3) + if proxyCfg.HTTPProxy != "" { + fields = append(fields, zap.String("http_proxy", proxyCfg.HTTPProxy)) + } + if proxyCfg.HTTPSProxy != "" { + fields = append(fields, zap.String("https_proxy", proxyCfg.HTTPSProxy)) + } + if proxyCfg.NoProxy != "" { + fields = append(fields, zap.String("no_proxy", proxyCfg.NoProxy)) + } + return fields +} + +// shutdownNotify is a callback to notify caller that TiCDC is about to shutdown. +// It returns a done channel which receive an empty struct when shutdown is complete. +// It must be non-blocking. +type shutdownNotify func() <-chan struct{} + +// InitSignalHandling initializes signal handling. +// It must be called after InitCmd. +func InitSignalHandling(shutdown shutdownNotify, cancel context.CancelFunc) { + // systemd and k8s send signals twice. The first is for graceful shutdown, + // and the second is for force shutdown. + // We use 2 for channel length to ease testing. + signalChanLen := 2 + sc := make(chan os.Signal, signalChanLen) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + go func() { + sig := <-sc + log.Info("got signal, prepare to shutdown", zap.Stringer("signal", sig)) + done := shutdown() + select { + case <-done: + log.Info("shutdown complete") + case sig = <-sc: + log.Info("got signal, force shutdown", zap.Stringer("signal", sig)) + } + cancel() + }() +} + +// CheckErr is used to cmd err. +func CheckErr(err error) { + if errors.IsCliUnprintableError(err) { + err = nil + } + if err != nil { + if strings.Contains(err.Error(), string(errors.ErrCredentialNotFound.RFCCode())) { + msg := ", please use the following command to create a new one:\n" + + "1. specify the credential in the command line with `cdc cli --user --password `.\n" + + "2. specify the credential in the environment variables with `export TICDC_USER= TICDC_PASSWORD=`.\n" + + "3. `cdc cli configure-credentials` to initialize the default credential config.\n" + err = errors.New(err.Error() + msg) + } + } + cobra.CheckErr(err) +} + +// JSONPrint will output the data in JSON format. +func JSONPrint(cmd *cobra.Command, v interface{}) error { + data, err := json.MarshalIndent(v, "", " ") + if err != nil { + return err + } + cmd.Printf("%s\n", data) + return nil +} + +// Endpoint schemes. +const ( + HTTP = "http" + HTTPS = "https" +) + +// VerifyPdEndpoint verifies whether the pd endpoint is a valid http or https URL. +// The certificate is required when using https. +func VerifyPdEndpoint(pdEndpoint string, useTLS bool) error { + u, err := url.Parse(pdEndpoint) + if err != nil { + return errors.Annotate(err, "parse PD endpoint") + } + if (u.Scheme != HTTP && u.Scheme != HTTPS) || u.Host == "" { + return errors.New("PD endpoint should be a valid http or https URL") + } + + if useTLS { + if u.Scheme == HTTP { + return errors.New("PD endpoint scheme should be https") + } + } else { + if u.Scheme == HTTPS { + return errors.New("PD endpoint scheme is https, please provide certificate") + } + } + return nil +} diff --git a/downstreamadapter/sink/cloudstorage.go b/downstreamadapter/sink/cloudstorage.go index f4429eecf..879a9270d 100644 --- a/downstreamadapter/sink/cloudstorage.go +++ b/downstreamadapter/sink/cloudstorage.go @@ -31,8 +31,8 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/sink/cloudstorage" "github.com/pingcap/ticdc/pkg/sink/util" + putil "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/br/pkg/storage" - putil "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index ea8a20544..453088631 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -36,8 +36,8 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/chann" - "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" "golang.org/x/sync/errgroup" diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 077c6de56..e0b0974e6 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/pkg/security" - "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/version" "go.uber.org/zap" "golang.org/x/sync/errgroup" diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 63c414cc5..bca13377d 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -24,12 +24,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/config/outdated" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/sink" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/errors/error.go b/pkg/errors/error.go index 62092b1d8..becd8f8c4 100644 --- a/pkg/errors/error.go +++ b/pkg/errors/error.go @@ -473,7 +473,10 @@ var ( "cdc met unexpected error: %s", errors.RFCCodeText("CDC:ErrUnexpected"), ) - + ErrLoadTimezone = errors.Normalize( + "load timezone", + errors.RFCCodeText("CDC:ErrLoadTimezone"), + ) // credential related errors ErrCredentialNotFound = errors.Normalize( "credential not found: %s", diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 2d3505e52..3ac29250a 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -114,3 +114,18 @@ func ShouldFailChangefeed(err error) bool { } return false } + +var cliUnprintableError = []*errors.Error{ErrCliAborted} + +// IsCliUnprintableError returns true if the error should not be printed in cli. +func IsCliUnprintableError(err error) bool { + if err == nil { + return false + } + for _, e := range cliUnprintableError { + if strings.Contains(err.Error(), string(e.RFCCode())) { + return true + } + } + return false +} diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 4c41bca7a..f5f2c4942 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" - "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/ticdc/pkg/util" "go.uber.org/atomic" "go.uber.org/zap" ) diff --git a/pkg/sink/cloudstorage/config.go b/pkg/sink/cloudstorage/config.go index f0c1ecd02..13006af21 100644 --- a/pkg/sink/cloudstorage/config.go +++ b/pkg/sink/cloudstorage/config.go @@ -25,9 +25,9 @@ import ( "github.com/imdario/mergo" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util" cerror "github.com/pingcap/tiflow/pkg/errors" psink "github.com/pingcap/tiflow/pkg/sink" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/sink/cloudstorage/path_test.go b/pkg/sink/cloudstorage/path_test.go index bb0609e28..b40190c2c 100644 --- a/pkg/sink/cloudstorage/path_test.go +++ b/pkg/sink/cloudstorage/path_test.go @@ -27,12 +27,12 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/ticdc/pkg/util" timodel "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/types" "github.com/pingcap/tiflow/engine/pkg/clock" - "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 4b34bbe17..72935b7d2 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/sink/codec/common/verify_checksum.go b/pkg/sink/codec/common/verify_checksum.go index 7b66bd372..e928693c5 100644 --- a/pkg/sink/codec/common/verify_checksum.go +++ b/pkg/sink/codec/common/verify_checksum.go @@ -26,10 +26,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/util" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go index 27b62ce1f..e83d0f42b 100644 --- a/pkg/sink/codec/csv/csv_message_test.go +++ b/pkg/sink/codec/csv/csv_message_test.go @@ -18,6 +18,7 @@ import ( "strings" "testing" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" @@ -25,7 +26,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index afd619172..d09905a56 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -26,10 +26,10 @@ import ( "github.com/pingcap/ticdc/pkg/common" cerror "github.com/pingcap/ticdc/pkg/errors" ticommon "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/hack" - "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index fe500a5b5..bf6f210a1 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -25,8 +25,8 @@ import ( "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 0dc57e13a..7d8cd58a7 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -25,10 +25,10 @@ import ( "github.com/pingcap/ticdc/pkg/common" cerror "github.com/pingcap/ticdc/pkg/errors" ticommon "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 689be0d41..3c74a6f5e 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -30,11 +30,11 @@ import ( "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tiflow/pkg/security" "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/pkg/sink/mysql/config_test.go b/pkg/sink/mysql/config_test.go index 669d53696..2148dc25b 100644 --- a/pkg/sink/mysql/config_test.go +++ b/pkg/sink/mysql/config_test.go @@ -24,7 +24,7 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/ticdc/pkg/util" "github.com/stretchr/testify/require" ) diff --git a/pkg/sink/util/helper.go b/pkg/sink/util/helper.go index cffcb1756..c9d329d43 100644 --- a/pkg/sink/util/helper.go +++ b/pkg/sink/util/helper.go @@ -25,8 +25,8 @@ import ( ticonfig "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/sink/codec/common" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/pkg/sink" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/cmd/cdc/server/context.go b/pkg/util/pointer.go similarity index 59% rename from cmd/cdc/server/context.go rename to pkg/util/pointer.go index 05593c71c..61295fcfb 100644 --- a/cmd/cdc/server/context.go +++ b/pkg/util/pointer.go @@ -1,4 +1,4 @@ -// Copyright 2024 PingCAP, Inc. +// Copyright 2025 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,17 +11,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package util -import "context" - -var defaultContext context.Context - -func setDefaultContext(ctx context.Context) { - defaultContext = ctx +// GetOrZero returns the value pointed to by p, or a zero value of +// its type if p is nil. +func GetOrZero[T any](p *T) T { + var val T + if p == nil { + return val + } + return *p } -// GetDefaultContext returns the default context for command line usage. -func GetDefaultContext() context.Context { - return defaultContext -} +// AddressOf return the address of the given input variable. +func AddressOf[T any](v T) *T { return &v } diff --git a/pkg/util/tz.go b/pkg/util/tz.go new file mode 100644 index 000000000..d298e9c53 --- /dev/null +++ b/pkg/util/tz.go @@ -0,0 +1,112 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "path/filepath" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/timeutil" + "go.uber.org/zap" +) + +// GetTimezone returns the timezone specified by the name +func GetTimezone(name string) (tz *time.Location, err error) { + switch strings.ToLower(name) { + case "", "system", "local": + tz, err = GetLocalTimezone() + err = errors.WrapError(errors.ErrLoadTimezone, err) + if err == nil { + log.Info("Use the timezone of the TiCDC server machine", + zap.String("timezoneName", name), + zap.String("timezone", tz.String())) + } + default: + tz, err = time.LoadLocation(name) + err = errors.WrapError(errors.ErrLoadTimezone, err) + if err == nil { + log.Info("Load the timezone specified by the user", + zap.String("timezoneName", name), + zap.String("timezone", tz.String())) + } + } + return +} + +// GetTimezoneFromZonefile read the timezone from file +func GetTimezoneFromZonefile(zonefile string) (tz *time.Location, err error) { + // the linked path of `/etc/localtime` sample: + // MacOS: /var/db/timezone/zoneinfo/Asia/Shanghai + // Linux: /usr/share/zoneinfo/Asia/Shanghai + region := filepath.Base(filepath.Dir(zonefile)) + zone := filepath.Base(zonefile) + var tzName string + if region == "zoneinfo" { + tzName = zone + } else { + tzName = filepath.Join(region, zone) + } + tz, err = time.LoadLocation(tzName) + err = errors.WrapError(errors.ErrLoadTimezone, err) + return +} + +// GetLocalTimezone returns the timezone in local system +func GetLocalTimezone() (*time.Location, error) { + if time.Local.String() != "Local" { + return time.Local, nil + } + str := timeutil.InferSystemTZ() + return GetTimezoneFromZonefile(str) +} + +// GetTimeZoneName returns the timezone name in a time.Location. +func GetTimeZoneName(tz *time.Location) string { + if tz == nil { + return "" + } + return tz.String() +} + +// ConvertTimezone converts the timestamp to the specified timezone +func ConvertTimezone(timestamp string, location string) (string, error) { + t, err := types.ParseTimestamp(types.StrictContext, timestamp) + if err != nil { + return "", err + } + + var tz *time.Location + switch strings.ToLower(location) { + case "", "system", "local": + tz, err = GetLocalTimezone() + default: + tz, err = time.LoadLocation(location) + } + if err != nil { + log.Info("cannot load timezone location", + zap.String("location", location), zap.Error(err)) + return "", err + } + + err = t.ConvertTimeZone(tz, time.UTC) + if err != nil { + return "", err + } + + return t.String(), nil +} diff --git a/tests/integration_tests/api_v2/cases.go b/tests/integration_tests/api_v2/cases.go index 48c9aa1e1..af7ddccb2 100644 --- a/tests/integration_tests/api_v2/cases.go +++ b/tests/integration_tests/api_v2/cases.go @@ -21,8 +21,8 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) diff --git a/tests/utils/checksum_checker/main.go b/tests/utils/checksum_checker/main.go index ba49290e3..2838682f0 100644 --- a/tests/utils/checksum_checker/main.go +++ b/tests/utils/checksum_checker/main.go @@ -22,9 +22,9 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cmd/util" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector" - cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) @@ -76,7 +76,7 @@ func main() { replicaConfig := config.GetDefaultReplicaConfig() if o.configFile != "" { - err = cmdUtil.StrictDecodeFile(o.configFile, "checksum checker", replicaConfig) + err = util.StrictDecodeFile(o.configFile, "checksum checker", replicaConfig) if err != nil { log.Panic("cannot decode config file", zap.Error(err)) }