From 1b604bee22e8b12a93ebc38b6ee1cf694fe08911 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:30:37 +0800 Subject: [PATCH] make check fix format (#822) * Add make clean * make fmt * fix line length and log style * remove error doc --- Makefile | 13 +++--- downstreamadapter/dispatcher/dispatcher.go | 45 ++++++++++++------- .../dispatcher/dispatcher_test.go | 4 +- downstreamadapter/dispatcher/helper.go | 2 +- .../event_dispatcher_manager.go | 10 +++-- .../dispatcher_orchestrator.go | 36 +++++++++------ .../downstream_performance_test.go | 2 +- downstreamadapter/sink/kafka_sink.go | 4 +- errors.toml | 8 ---- maintainer/barrier_event.go | 4 +- maintainer/maintainer.go | 4 +- maintainer/replica/replication_span.go | 30 ++++++------- maintainer/scheduler.go | 2 +- .../run.sh | 4 +- tests/integration_tests/cli_with_auth/run.sh | 2 +- .../integration_tests/fail_over_ddl_E/run.sh | 2 +- tests/integration_tests/synced_status/run.sh | 18 ++++---- .../synced_status_with_redo/run.sh | 18 ++++---- tools/check/check-errdoc.sh | 21 --------- tools/check/tools.go | 1 - tools/workload/main.go | 10 ++--- 21 files changed, 119 insertions(+), 121 deletions(-) delete mode 100644 errors.toml delete mode 100755 tools/check/check-errdoc.sh diff --git a/Makefile b/Makefile index 4b9f546af..ff5d5d80e 100644 --- a/Makefile +++ b/Makefile @@ -165,10 +165,6 @@ tidy: @echo "go mod tidy" ./tools/check/check-tidy.sh -errdoc: tools/bin/errdoc-gen - @echo "generator errors.toml" - ./tools/check/check-errdoc.sh - check-copyright: @echo "check-copyright" @./scripts/check-copyright.sh @@ -192,7 +188,14 @@ check-makefiles: format-makefiles format-makefiles: $(MAKE_FILES) $(SED_IN_PLACE) -e 's/^\(\t*\) /\1\t/g' -e 's/^\(\t*\) /\1/' -- $? -check: check-copyright fmt tidy errdoc check-diff-line-width check-ticdc-dashboard check-makefiles +check: check-copyright fmt tidy check-diff-line-width check-ticdc-dashboard check-makefiles @git --no-pager diff --exit-code || (echo "Please add changed files!" && false) +clean: + go clean -i ./... + rm -rf *.out + rm -rf bin + rm -rf tools/bin + rm -rf tools/include + workload: tools/bin/workload \ No newline at end of file diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index 37e9dd7b4..d19540edc 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -184,13 +184,14 @@ func NewDispatcher( return dispatcher } -func (d *Dispatcher) InitalizeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaInfo) error { +func (d *Dispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaInfo) error { // Only the table trigger event dispatcher need to create a tableSchemaStore // Because we only need to calculate the tableNames or TableIds in the sink // when the event dispatcher manager have table trigger event dispatcher if !d.tableSpan.Equal(heartbeatpb.DDLSpan) { - log.Error("InitalizeTableSchemaStore should only be received by table trigger event dispatcher", zap.Any("dispatcher", d.id)) - return apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.GenWithStackByArgs("InitalizeTableSchemaStore should only be received by table trigger event dispatcher") + log.Error("InitializeTableSchemaStore should only be received by table trigger event dispatcher", zap.Any("dispatcher", d.id)) + return apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed. + GenWithStackByArgs("InitializeTableSchemaStore should only be received by table trigger event dispatcher") } if d.tableSchemaStore != nil { @@ -209,7 +210,11 @@ func (d *Dispatcher) InitalizeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaI // 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream. // 2. If the action is a pass, we just need to pass the event func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) { - log.Debug("dispatcher handle dispatcher status", zap.Any("dispatcherStatus", dispatcherStatus), zap.Stringer("dispatcher", d.id), zap.Any("action", dispatcherStatus.GetAction()), zap.Any("ack", dispatcherStatus.GetAck())) + log.Debug("dispatcher handle dispatcher status", + zap.Any("dispatcherStatus", dispatcherStatus), + zap.Stringer("dispatcher", d.id), + zap.Any("action", dispatcherStatus.GetAction()), + zap.Any("ack", dispatcherStatus.GetAck())) // deal with the ack info ack := dispatcherStatus.GetAck() if ack != nil { @@ -238,8 +243,8 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat case d.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("ChangefeedID", d.changefeedID.String()), - zap.Any("DispatcherID", d.id.String()), + zap.Any("changefeedID", d.changefeedID.String()), + zap.Any("dispatcherID", d.id.String()), zap.Error(err)) } return @@ -278,7 +283,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba block = false // Dispatcher is ready, handle the events for _, dispatcherEvent := range dispatcherEvents { - log.Debug("dispatcher receive all event", zap.Any("event", dispatcherEvent.Event), zap.Stringer("dispatcher", d.id)) + log.Debug("dispatcher receive all event", + zap.Stringer("dispatcher", d.id), zap.Any("event", dispatcherEvent.Event)) failpoint.Inject("HandleEventsSlowly", func() { lag := time.Duration(rand.Intn(5000)) * time.Millisecond log.Warn("handle events slowly", zap.Duration("lag", lag)) @@ -298,7 +304,7 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba } if event.GetType() != commonEvent.TypeResolvedEvent { - log.Debug("dispatcher receive event", zap.Any("event", event), zap.Stringer("dispatcher", d.id)) + log.Debug("dispatcher receive event", zap.Stringer("dispatcher", d.id), zap.Any("event", event)) } switch event.GetType() { @@ -354,9 +360,13 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba }) d.dealWithBlockEvent(event) case commonEvent.TypeHandshakeEvent: - log.Warn("Receive handshake event unexpectedly", zap.Any("event", event), zap.Stringer("dispatcher", d.id)) + log.Warn("Receive handshake event unexpectedly", + zap.Stringer("dispatcher", d.id), zap.Any("event", event)) default: - log.Panic("Unexpected event type", zap.Any("event Type", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs())) + log.Panic("Unexpected event type", + zap.Any("eventType", event.GetType()), + zap.Stringer("dispatcher", d.id), + zap.Uint64("commitTs", event.GetCommitTs())) } } return block @@ -419,7 +429,7 @@ func (d *Dispatcher) shouldBlock(event commonEvent.BlockEvent) bool { case commonEvent.TypeSyncPointEvent: return true default: - log.Error("invalid event type", zap.Any("event Type", event.GetType())) + log.Error("invalid event type", zap.Any("eventType", event.GetType())) } return false } @@ -435,8 +445,8 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { case d.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("ChangefeedID", d.changefeedID.String()), - zap.Any("DispatcherID", d.id.String()), + zap.Any("changefeedID", d.changefeedID.String()), + zap.Any("dispatcherID", d.id.String()), zap.Error(err)) } return @@ -517,7 +527,11 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) { for _, schemaIDChange := range event.GetUpdatedSchemas() { if schemaIDChange.TableID == d.tableSpan.TableID { if schemaIDChange.OldSchemaID != d.schemaID { - log.Error("Wrong Schema ID", zap.Any("dispatcherID", d.id), zap.Any("except schemaID", schemaIDChange.OldSchemaID), zap.Any("actual schemaID", d.schemaID), zap.Any("tableSpan", d.tableSpan.String())) + log.Error("Wrong Schema ID", + zap.Any("dispatcherID", d.id), + zap.Any("exceptSchemaID", schemaIDChange.OldSchemaID), + zap.Any("actualSchemaID", d.schemaID), + zap.Any("tableSpan", d.tableSpan.String())) return } else { d.schemaID = schemaIDChange.NewSchemaID @@ -594,7 +608,8 @@ func (d *Dispatcher) GetSyncPointInterval() time.Duration { } func (d *Dispatcher) Remove() { - log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String())) + log.Info("table event dispatcher component status changed to stopping", + zap.String("table", d.tableSpan.String())) d.isRemoving.Store(true) dispatcherStatusDynamicStream := GetDispatcherStatusDynamicStream() diff --git a/downstreamadapter/dispatcher/dispatcher_test.go b/downstreamadapter/dispatcher/dispatcher_test.go index 261b5b184..12078ac11 100644 --- a/downstreamadapter/dispatcher/dispatcher_test.go +++ b/downstreamadapter/dispatcher/dispatcher_test.go @@ -494,7 +494,7 @@ func TestTableTriggerEventDispatcherInMysql(t *testing.T) { tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan) require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore) - err := tableTriggerEventDispatcher.InitalizeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) + err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) require.NoError(t, err) helper := commonEvent.NewEventTestHelper(t) @@ -573,7 +573,7 @@ func TestTableTriggerEventDispatcherInKafka(t *testing.T) { tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan) require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore) - err := tableTriggerEventDispatcher.InitalizeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) + err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{}) require.NoError(t, err) helper := commonEvent.NewEventTestHelper(t) diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 6a352ef17..e4ed31dc8 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -213,7 +213,7 @@ func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher *Dispat } func (t *ResendTask) Execute() time.Time { - log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcher id", t.dispatcher.id)) + log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcherID", t.dispatcher.id)) t.dispatcher.blockStatusesChan <- t.message return time.Now().Add(200 * time.Millisecond) } diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 6c380ceb1..983d26830 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -226,7 +226,7 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) { if dispatcher.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType { err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveCheckpointTsMessage(e.changefeedID) if err != nil { - log.Error("remove checkpointTs message failed", zap.Error(err), zap.Any("changefeedID", e.changefeedID)) + log.Error("remove checkpointTs message failed", zap.Any("changefeedID", e.changefeedID), zap.Error(err)) } } dispatcher.Remove() @@ -313,7 +313,7 @@ func (e *EventDispatcherManager) InitalizeTableTriggerEventDispatcher(schemaInfo if e.tableTriggerEventDispatcher == nil { return nil } - err := e.tableTriggerEventDispatcher.InitalizeTableSchemaStore(schemaInfo) + err := e.tableTriggerEventDispatcher.InitializeTableSchemaStore(schemaInfo) if err != nil { return errors.Trace(err) } @@ -367,7 +367,8 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er if err != nil { return errors.Trace(err) } - log.Info("calculate real startTs for dispatchers", zap.Any("receive startTs", startTsList), zap.Any("real startTs", newStartTsList)) + log.Info("calculate real startTs for dispatchers", + zap.Any("receiveStartTs", startTsList), zap.Any("realStartTs", newStartTsList)) } else { newStartTsList = startTsList } @@ -677,7 +678,8 @@ func (e *EventDispatcherManager) cleanDispatcher(id common.DispatcherID, schemaI } else { e.metricEventDispatcherCount.Dec() } - log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager", zap.Any("dispatcher id", id)) + log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager", + zap.Any("dispatcherID", id)) } func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap { diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index 70a965e18..e51fbc826 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -69,23 +69,25 @@ func (m *DispatcherOrchestrator) handlePostBootstrap(from node.ID, req *heartbea cfId := common.NewChangefeedIDFromPB(req.ChangefeedID) manager, exists := m.dispatcherManagers[cfId] if !exists || manager.GetTableTriggerEventDispatcher() == nil { - log.Error("Receive post bootstrap request but there is no table trigger event dispatcher", zap.Any("ChangefeedID", cfId.Name())) + log.Error("Receive post bootstrap request but there is no table trigger event dispatcher", + zap.Any("changefeedID", cfId.Name())) return nil } if manager.GetTableTriggerEventDispatcher().GetId() != common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId) { log.Error("Receive post bootstrap request but the table trigger event dispatcher id is not match", - zap.Any("ChangefeedID", cfId.Name()), - zap.String("expected table trigger event dispatcher id", manager.GetTableTriggerEventDispatcher().GetId().String()), - zap.String("actual table trigger event dispatcher id", common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId).String())) + zap.Any("changefeedID", cfId.Name()), + zap.String("expectedDispatcherID", manager.GetTableTriggerEventDispatcher().GetId().String()), + zap.String("actualDispatcherID", common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId).String())) - error := apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.GenWithStackByArgs("Receive post bootstrap request but the table trigger event dispatcher id is not match") + err := apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed. + GenWithStackByArgs("Receive post bootstrap request but the table trigger event dispatcher id is not match") response := &heartbeatpb.MaintainerPostBootstrapResponse{ ChangefeedID: req.ChangefeedID, Err: &heartbeatpb.RunningError{ Time: time.Now().String(), Node: from.String(), - Code: string(apperror.ErrorCode(error)), - Message: error.Error(), + Code: string(apperror.ErrorCode(err)), + Message: err.Error(), }, } return m.sendResponse(from, messaging.MaintainerManagerTopic, response) @@ -94,7 +96,8 @@ func (m *DispatcherOrchestrator) handlePostBootstrap(from node.ID, req *heartbea // init table schema store err := manager.InitalizeTableTriggerEventDispatcher(req.Schemas) if err != nil { - log.Error("failed to initalize table trigger event dispatcher", zap.Error(err), zap.Any("ChangefeedID", cfId.Name())) + log.Error("failed to initialize table trigger event dispatcher", + zap.Any("changefeedID", cfId.Name()), zap.Error(err)) response := &heartbeatpb.MaintainerPostBootstrapResponse{ ChangefeedID: req.ChangefeedID, @@ -122,14 +125,16 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h var startTs uint64 if !exists { cfConfig := &config.ChangefeedConfig{} - if err := json.Unmarshal(req.Config, cfConfig); err != nil { - log.Panic("failed to unmarshal changefeed config", zap.String("changefeed id", cfId.Name()), zap.Error(err)) + if err = json.Unmarshal(req.Config, cfConfig); err != nil { + log.Panic("failed to unmarshal changefeed config", + zap.String("changefeedID", cfId.Name()), zap.Error(err)) return err } manager, startTs, err = dispatchermanager.NewEventDispatcherManager(cfId, cfConfig, req.TableTriggerEventDispatcherId, req.StartTs, from) // Fast return the error to maintainer. if err != nil { - log.Error("failed to create new dispatcher manager", zap.Error(err), zap.Any("ChangefeedID", cfId.Name())) + log.Error("failed to create new dispatcher manager", + zap.Any("changefeedID", cfId.Name()), zap.Error(err)) response := &heartbeatpb.MaintainerBootstrapResponse{ ChangefeedID: req.ChangefeedID, @@ -151,7 +156,8 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h if manager.GetTableTriggerEventDispatcher() == nil && req.TableTriggerEventDispatcherId != nil { startTs, err = manager.NewTableTriggerEventDispatcher(req.TableTriggerEventDispatcherId, req.StartTs) if err != nil { - log.Error("failed to create new table trigger event dispatcher", zap.Error(err), zap.Any("ChangefeedID", cfId.Name())) + log.Error("failed to create new table trigger event dispatcher", + zap.Any("changefeedID", cfId.Name()), zap.Error(err)) response := &heartbeatpb.MaintainerBootstrapResponse{ ChangefeedID: req.ChangefeedID, @@ -169,7 +175,8 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h if manager.GetMaintainerID() != from { manager.SetMaintainerID(from) - log.Info("maintainer changed", zap.String("changefeed", cfId.Name()), zap.String("maintainer", from.String())) + log.Info("maintainer changed", + zap.String("changefeed", cfId.Name()), zap.String("maintainer", from.String())) } response := createBootstrapResponse(req.ChangefeedID, manager, startTs) @@ -193,7 +200,8 @@ func (m *DispatcherOrchestrator) handleRemoveDispatcherManager(from node.ID, req } } - log.Info("try close dispatcher manager", zap.String("changefeed", cfId.Name()), zap.Bool("success", response.Success)) + log.Info("try close dispatcher manager", + zap.String("changefeed", cfId.Name()), zap.Bool("success", response.Success)) return m.sendResponse(from, messaging.MaintainerTopic, response) } diff --git a/downstreamadapter/downstream_performance_test.go b/downstreamadapter/downstream_performance_test.go index eef052542..638021502 100644 --- a/downstreamadapter/downstream_performance_test.go +++ b/downstreamadapter/downstream_performance_test.go @@ -151,7 +151,7 @@ func TestDownstream(t *testing.T) { } wg.Wait() - log.Warn("test begin", zap.Any("create dispatcher cost time", time.Since(start))) + log.Warn("test begin", zap.Any("duration", time.Since(start))) // 插入数据, 先固定 data 格式 go pushDataIntoDispatchers(dispatcherIDSet, helper) diff --git a/downstreamadapter/sink/kafka_sink.go b/downstreamadapter/sink/kafka_sink.go index 3a5dd38ca..47303e1d0 100644 --- a/downstreamadapter/sink/kafka_sink.go +++ b/downstreamadapter/sink/kafka_sink.go @@ -134,7 +134,7 @@ func (s *KafkaSink) run(ctx context.Context) { case s.errCh <- err: default: log.Error("error channel is full, discard error", - zap.Any("ChangefeedID", s.changefeedID.String()), + zap.Any("changefeedID", s.changefeedID.String()), zap.Error(err)) } } @@ -174,7 +174,7 @@ func (s *KafkaSink) WriteBlockEvent(event commonEvent.BlockEvent) error { log.Error("KafkaSink doesn't support this type of block event", zap.String("namespace", s.changefeedID.Namespace()), zap.String("changefeed", s.changefeedID.Name()), - zap.Any("event type", event.GetType())) + zap.Any("eventType", event.GetType())) } return nil } diff --git a/errors.toml b/errors.toml deleted file mode 100644 index 34e8d7626..000000000 --- a/errors.toml +++ /dev/null @@ -1,8 +0,0 @@ -# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen -# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. - -["CDC:ErrChangefeedRetryable"] -error = ''' -changefeed is in retryable state -''' - diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 2956af853..41d665e86 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -111,8 +111,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller, } log.Info("new block event is created", zap.String("changefeedID", cfID.Name()), - zap.Uint64("block-ts", event.commitTs), - zap.Bool("sync-point", event.isSyncPoint), + zap.Uint64("blockTs", event.commitTs), + zap.Bool("syncPoint", event.isSyncPoint), zap.Any("detail", status)) return event } diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 5b715954e..c1cfc6dc2 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -193,7 +193,7 @@ func NewMaintainer(cfID common.ChangeFeedID, m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), zap.Uint64("checkpointTs", checkpointTs), - zap.String("ddl dispatcher", tableTriggerEventDispatcherID.String())) + zap.String("ddlDispatcherID", tableTriggerEventDispatcherID.String())) metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc() // Should update metrics immediately when maintainer is created // FIXME: Use a correct context @@ -736,7 +736,7 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn { if id == m.selfNode.ID { log.Info("create table event trigger dispatcher", zap.String("changefeed", m.id.String()), zap.String("server", id.String()), - zap.String("dispatcher id", m.ddlSpan.ID.String())) + zap.String("dispatcherID", m.ddlSpan.ID.String())) msg.TableTriggerEventDispatcherId = m.ddlSpan.ID.ToPB() } log.Info("send maintainer bootstrap message", diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index f86de9400..0843b7f26 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -73,11 +73,11 @@ func NewReplicaSet(cfID common.ChangeFeedID, CheckpointTs: checkpointTs, }) log.Info("new replica set created", - zap.String("changefeed id", cfID.Name()), + zap.String("changefeedID", cfID.Name()), zap.String("id", id.String()), - zap.Int64("schema id", SchemaID), - zap.Int64("table id", span.TableID), - zap.String("group id", replica.GetGroupName(r.groupID)), + zap.Int64("schemaID", SchemaID), + zap.Int64("tableID", span.TableID), + zap.String("groupID", replica.GetGroupName(r.groupID)), zap.String("start", hex.EncodeToString(span.StartKey)), zap.String("end", hex.EncodeToString(span.EndKey))) return r @@ -105,14 +105,14 @@ func NewWorkingReplicaSet( r.initGroupID() r.initStatus(status) log.Info("new working replica set created", - zap.String("changefeed id", cfID.Name()), + zap.String("changefeedID", cfID.Name()), zap.String("id", id.String()), - zap.String("node id", nodeID.String()), - zap.Uint64("checkpoint ts", status.CheckpointTs), - zap.String("component status", status.ComponentStatus.String()), - zap.Int64("schema id", SchemaID), - zap.Int64("table id", span.TableID), - zap.String("group id", replica.GetGroupName(r.groupID)), + zap.String("nodeID", nodeID.String()), + zap.Uint64("checkpointTs", status.CheckpointTs), + zap.String("componentStatus", status.ComponentStatus.String()), + zap.Int64("schemaID", SchemaID), + zap.Int64("tableID", span.TableID), + zap.String("groupID", replica.GetGroupName(r.groupID)), zap.String("start", hex.EncodeToString(span.StartKey)), zap.String("end", hex.EncodeToString(span.EndKey))) return r @@ -121,9 +121,9 @@ func NewWorkingReplicaSet( func (r *SpanReplication) initStatus(status *heartbeatpb.TableSpanStatus) { if status == nil || status.CheckpointTs == 0 { log.Panic("add replica with invalid checkpoint ts", - zap.String("changefeed id", r.ChangefeedID.Name()), + zap.String("changefeedID", r.ChangefeedID.Name()), zap.String("id", r.ID.String()), - zap.Uint64("checkpoint ts", status.CheckpointTs), + zap.Uint64("checkpointTs", status.CheckpointTs), ) } r.status.Store(status) @@ -135,8 +135,8 @@ func (r *SpanReplication) initGroupID() { // check if the table is split totalSpan := spanz.TableIDToComparableSpan(span.TableID) if !spanz.IsSubSpan(span, totalSpan) { - log.Warn("invalid span range", zap.String("changefeed id", r.ChangefeedID.Name()), - zap.String("id", r.ID.String()), zap.Int64("table id", span.TableID), + log.Warn("invalid span range", zap.String("changefeedID", r.ChangefeedID.Name()), + zap.String("id", r.ID.String()), zap.Int64("tableID", span.TableID), zap.String("totalSpan", totalSpan.String()), zap.String("start", hex.EncodeToString(span.StartKey)), zap.String("end", hex.EncodeToString(span.EndKey))) diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go index 5f132f9ed..b5583ef4e 100644 --- a/maintainer/scheduler.go +++ b/maintainer/scheduler.go @@ -141,7 +141,7 @@ func (s *splitScheduler) doCheck(ret pkgReplica.GroupCheckResult, start time.Tim log.Info("split span", zap.String("changefeed", s.changefeedID.Name()), zap.String("span", totalSpan.String()), - zap.Int("span szie", len(spans))) + zap.Int("spanSize", len(spans))) s.opController.AddMergeSplitOperator(ret.Replications, spans) } } diff --git a/tests/integration_tests/capture_suicide_while_balance_table/run.sh b/tests/integration_tests/capture_suicide_while_balance_table/run.sh index 3507340af..c9f8323c1 100644 --- a/tests/integration_tests/capture_suicide_while_balance_table/run.sh +++ b/tests/integration_tests/capture_suicide_while_balance_table/run.sh @@ -51,8 +51,8 @@ function run() { check_table_exists "capture_suicide_while_balance_table.t$i" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} done - capture1_id=$(cdc cli capture list | grep -v "Command to ticdc"| jq -r '.[]|select(.address=="127.0.0.1:8300")|.id') - capture2_id=$(cdc cli capture list | grep -v "Command to ticdc"| jq -r '.[]|select(.address=="127.0.0.1:8301")|.id') + capture1_id=$(cdc cli capture list | grep -v "Command to ticdc" | jq -r '.[]|select(.address=="127.0.0.1:8300")|.id') + capture2_id=$(cdc cli capture list | grep -v "Command to ticdc" | jq -r '.[]|select(.address=="127.0.0.1:8301")|.id') target_capture=$capture1_id one_table_id=$(cdc cli processor query -c $changefeed_id -p $capture2_id | grep -v "Command to ticdc" | jq -r '.status.tables|keys[0]') diff --git a/tests/integration_tests/cli_with_auth/run.sh b/tests/integration_tests/cli_with_auth/run.sh index ac7bb3e7b..b914b49a4 100644 --- a/tests/integration_tests/cli_with_auth/run.sh +++ b/tests/integration_tests/cli_with_auth/run.sh @@ -15,7 +15,7 @@ export TICDC_PASSWORD=ticdc_secret function check_changefeed_count() { pd_addr=$1 expected=$2 - feed_count=$(cdc cli changefeed list --pd=$pd_addr | grep -v "Command to ticdc"| jq '.|length') + feed_count=$(cdc cli changefeed list --pd=$pd_addr | grep -v "Command to ticdc" | jq '.|length') if [[ "$feed_count" != "$expected" ]]; then echo "[$(date)] <<<<< unexpect changefeed count! expect ${expected} got ${feed_count} >>>>>" exit 1 diff --git a/tests/integration_tests/fail_over_ddl_E/run.sh b/tests/integration_tests/fail_over_ddl_E/run.sh index 5f404939d..d51c422ba 100644 --- a/tests/integration_tests/fail_over_ddl_E/run.sh +++ b/tests/integration_tests/fail_over_ddl_E/run.sh @@ -88,7 +88,7 @@ function failOverCaseE-1() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix "1-1" --addr "127.0.0.1:8301" ans=$(run_cdc_cli capture list) - node2ID=`echo $ans | sed 's/ PASS.*//' | grep -v "Command to ticdc" | jq -r '.[] | select(.address == "127.0.0.1:8301") | .id'` + node2ID=$(echo $ans | sed 's/ PASS.*//' | grep -v "Command to ticdc" | jq -r '.[] | select(.address == "127.0.0.1:8301") | .id') # move table 1 to node 2 move_table_with_retry "127.0.0.1:8301" 106 "test" 10 diff --git a/tests/integration_tests/synced_status/run.sh b/tests/integration_tests/synced_status/run.sh index 400f1dd35..a7329d92d 100644 --- a/tests/integration_tests/synced_status/run.sh +++ b/tests/integration_tests/synced_status/run.sh @@ -63,7 +63,7 @@ function run_normal_case_and_unavailable_pd() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" # case 1: test in available cluster - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts') @@ -98,7 +98,7 @@ function run_normal_case_and_unavailable_pd() { check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} sleep 5 # wait data insert - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -111,7 +111,7 @@ function run_normal_case_and_unavailable_pd() { fi sleep 130 # wait enough time for pass synced-check-interval - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != true ]; then echo "synced status isn't correct" @@ -124,7 +124,7 @@ function run_normal_case_and_unavailable_pd() { sleep 20 - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") error_code=$(echo $synced_status | jq -r '.error_code') cleanup_process $CDC_BINARY stop_tidb_cluster @@ -153,7 +153,7 @@ function run_case_with_unavailable_tikv() { kill_tikv # test the case when pdNow - lastSyncedTs < threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -169,7 +169,7 @@ function run_case_with_unavailable_tikv() { sleep 130 # wait enough time for pass synced-check-interval # test the case when pdNow - lastSyncedTs > threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -215,7 +215,7 @@ function run_case_with_unavailable_tidb() { kill_tidb # test the case when pdNow - lastSyncedTs < threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -231,7 +231,7 @@ function run_case_with_unavailable_tidb() { sleep 130 # wait enough time for pass synced-check-interval # test the case when pdNow - lastSyncedTs > threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != true ]; then echo "synced status isn't correct" @@ -268,7 +268,7 @@ function run_case_with_failpoint() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" sleep 20 # wait enough time for pass checkpoint-check-interval - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" diff --git a/tests/integration_tests/synced_status_with_redo/run.sh b/tests/integration_tests/synced_status_with_redo/run.sh index 64b2308cd..77ff25f3b 100644 --- a/tests/integration_tests/synced_status_with_redo/run.sh +++ b/tests/integration_tests/synced_status_with_redo/run.sh @@ -67,7 +67,7 @@ function run_normal_case_and_unavailable_pd() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" # case 1: test in available cluster - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') sink_checkpoint_ts=$(echo $synced_status | jq -r '.sink_checkpoint_ts') @@ -102,7 +102,7 @@ function run_normal_case_and_unavailable_pd() { check_table_exists "test.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} sleep 5 # wait data insert - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -115,7 +115,7 @@ function run_normal_case_and_unavailable_pd() { fi sleep 130 # wait enough time for pass synced-check-interval - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != true ]; then echo "synced status isn't correct" @@ -128,7 +128,7 @@ function run_normal_case_and_unavailable_pd() { sleep 20 - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") error_code=$(echo $synced_status | jq -r '.error_code') cleanup_process $CDC_BINARY stop_tidb_cluster @@ -157,7 +157,7 @@ function run_case_with_unavailable_tikv() { kill_tikv # test the case when pdNow - lastSyncedTs < threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -173,7 +173,7 @@ function run_case_with_unavailable_tikv() { sleep 130 # wait enough time for pass synced-check-interval # test the case when pdNow - lastSyncedTs > threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -219,7 +219,7 @@ function run_case_with_unavailable_tidb() { kill_tidb # test the case when pdNow - lastSyncedTs < threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" @@ -235,7 +235,7 @@ function run_case_with_unavailable_tidb() { sleep 130 # wait enough time for pass synced-check-interval # test the case when pdNow - lastSyncedTs > threshold - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != true ]; then echo "synced status isn't correct" @@ -272,7 +272,7 @@ function run_case_with_failpoint() { run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="test-1" --config="$CUR/$config_path" sleep 20 # wait enough time for pass checkpoint-check-interval - synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc" ) + synced_status=$(curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test-1/synced | grep -v "Command to ticdc") status=$(echo $synced_status | jq '.synced') if [ $status != false ]; then echo "synced status isn't correct" diff --git a/tools/check/check-errdoc.sh b/tools/check/check-errdoc.sh deleted file mode 100755 index 9d9cd2b68..000000000 --- a/tools/check/check-errdoc.sh +++ /dev/null @@ -1,21 +0,0 @@ -#!/bin/bash -# Copyright 2024 PingCAP, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# See the License for the specific language governing permissions and -# limitations under the License. - -set -euo pipefail - -cd -P . - -cp errors.toml /tmp/errors.toml.before -./tools/bin/errdoc-gen --source . --module github.com/pingcap/ticdc --output errors.toml --ignore eventpb,heartbeatpb -diff -q errors.toml /tmp/errors.toml.before diff --git a/tools/check/tools.go b/tools/check/tools.go index 13d2c2720..385700baf 100644 --- a/tools/check/tools.go +++ b/tools/check/tools.go @@ -32,7 +32,6 @@ import ( _ "github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway" _ "github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2" _ "github.com/mattn/goveralls" - _ "github.com/pingcap/errors/errdoc-gen" _ "github.com/pingcap/failpoint/failpoint-ctl" _ "github.com/swaggo/swag/cmd/swag" _ "github.com/tinylib/msgp" diff --git a/tools/workload/main.go b/tools/workload/main.go index f21bcbce2..e2bdbe90f 100644 --- a/tools/workload/main.go +++ b/tools/workload/main.go @@ -251,10 +251,10 @@ func handlePrepareAction(dbs []*sql.DB, insertConcurrency int, workload schema.W func handleWorkloadExecution(dbs []*sql.DB, insertConcurrency, updateConcurrency int, workload schema.Workload, wg *sync.WaitGroup) { log.Info("start running workload", - zap.String("workload_type", workloadType), - zap.Float64("large-ratio", largeRowRatio), - zap.Int("total_thread", thread), - zap.Int("batch-size", batchSize), + zap.String("workloadType", workloadType), + zap.Float64("largeRatio", largeRowRatio), + zap.Int("totalThread", thread), + zap.Int("batchSize", batchSize), zap.String("action", action), ) @@ -286,7 +286,7 @@ func executeUpdateWorkers(dbs []*sql.DB, updateConcurrency int, workload schema. if updateConcurrency == 0 { log.Info("skip update workload", zap.String("action", action), - zap.Int("total_thread", thread), + zap.Int("totalThread", thread), zap.Float64("percentageForUpdate", percentageForUpdate)) return }