Skip to content

Commit

Permalink
make check fix format (#822)
Browse files Browse the repository at this point in the history
* Add make clean

* make fmt

* fix line length and log style

* remove error doc
  • Loading branch information
3AceShowHand authored Jan 8, 2025
1 parent 564ded1 commit 1b604be
Show file tree
Hide file tree
Showing 21 changed files with 119 additions and 121 deletions.
13 changes: 8 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,6 @@ tidy:
@echo "go mod tidy"
./tools/check/check-tidy.sh

errdoc: tools/bin/errdoc-gen
@echo "generator errors.toml"
./tools/check/check-errdoc.sh

check-copyright:
@echo "check-copyright"
@./scripts/check-copyright.sh
Expand All @@ -192,7 +188,14 @@ check-makefiles: format-makefiles
format-makefiles: $(MAKE_FILES)
$(SED_IN_PLACE) -e 's/^\(\t*\) /\1\t/g' -e 's/^\(\t*\) /\1/' -- $?

check: check-copyright fmt tidy errdoc check-diff-line-width check-ticdc-dashboard check-makefiles
check: check-copyright fmt tidy check-diff-line-width check-ticdc-dashboard check-makefiles
@git --no-pager diff --exit-code || (echo "Please add changed files!" && false)

clean:
go clean -i ./...
rm -rf *.out
rm -rf bin
rm -rf tools/bin
rm -rf tools/include

workload: tools/bin/workload
45 changes: 30 additions & 15 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,14 @@ func NewDispatcher(
return dispatcher
}

func (d *Dispatcher) InitalizeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaInfo) error {
func (d *Dispatcher) InitializeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaInfo) error {
// Only the table trigger event dispatcher need to create a tableSchemaStore
// Because we only need to calculate the tableNames or TableIds in the sink
// when the event dispatcher manager have table trigger event dispatcher
if !d.tableSpan.Equal(heartbeatpb.DDLSpan) {
log.Error("InitalizeTableSchemaStore should only be received by table trigger event dispatcher", zap.Any("dispatcher", d.id))
return apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.GenWithStackByArgs("InitalizeTableSchemaStore should only be received by table trigger event dispatcher")
log.Error("InitializeTableSchemaStore should only be received by table trigger event dispatcher", zap.Any("dispatcher", d.id))
return apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.
GenWithStackByArgs("InitializeTableSchemaStore should only be received by table trigger event dispatcher")
}

if d.tableSchemaStore != nil {
Expand All @@ -209,7 +210,11 @@ func (d *Dispatcher) InitalizeTableSchemaStore(schemaInfo []*heartbeatpb.SchemaI
// 1. If the action is a write, we need to add the ddl event to the sink for writing to downstream.
// 2. If the action is a pass, we just need to pass the event
func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.DispatcherStatus) {
log.Debug("dispatcher handle dispatcher status", zap.Any("dispatcherStatus", dispatcherStatus), zap.Stringer("dispatcher", d.id), zap.Any("action", dispatcherStatus.GetAction()), zap.Any("ack", dispatcherStatus.GetAck()))
log.Debug("dispatcher handle dispatcher status",
zap.Any("dispatcherStatus", dispatcherStatus),
zap.Stringer("dispatcher", d.id),
zap.Any("action", dispatcherStatus.GetAction()),
zap.Any("ack", dispatcherStatus.GetAck()))
// deal with the ack info
ack := dispatcherStatus.GetAck()
if ack != nil {
Expand Down Expand Up @@ -238,8 +243,8 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
case d.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Any("ChangefeedID", d.changefeedID.String()),
zap.Any("DispatcherID", d.id.String()),
zap.Any("changefeedID", d.changefeedID.String()),
zap.Any("dispatcherID", d.id.String()),
zap.Error(err))
}
return
Expand Down Expand Up @@ -278,7 +283,8 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
block = false
// Dispatcher is ready, handle the events
for _, dispatcherEvent := range dispatcherEvents {
log.Debug("dispatcher receive all event", zap.Any("event", dispatcherEvent.Event), zap.Stringer("dispatcher", d.id))
log.Debug("dispatcher receive all event",
zap.Stringer("dispatcher", d.id), zap.Any("event", dispatcherEvent.Event))
failpoint.Inject("HandleEventsSlowly", func() {
lag := time.Duration(rand.Intn(5000)) * time.Millisecond
log.Warn("handle events slowly", zap.Duration("lag", lag))
Expand All @@ -298,7 +304,7 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
}

if event.GetType() != commonEvent.TypeResolvedEvent {
log.Debug("dispatcher receive event", zap.Any("event", event), zap.Stringer("dispatcher", d.id))
log.Debug("dispatcher receive event", zap.Stringer("dispatcher", d.id), zap.Any("event", event))
}

switch event.GetType() {
Expand Down Expand Up @@ -354,9 +360,13 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
})
d.dealWithBlockEvent(event)
case commonEvent.TypeHandshakeEvent:
log.Warn("Receive handshake event unexpectedly", zap.Any("event", event), zap.Stringer("dispatcher", d.id))
log.Warn("Receive handshake event unexpectedly",
zap.Stringer("dispatcher", d.id), zap.Any("event", event))
default:
log.Panic("Unexpected event type", zap.Any("event Type", event.GetType()), zap.Stringer("dispatcher", d.id), zap.Uint64("commitTs", event.GetCommitTs()))
log.Panic("Unexpected event type",
zap.Any("eventType", event.GetType()),
zap.Stringer("dispatcher", d.id),
zap.Uint64("commitTs", event.GetCommitTs()))
}
}
return block
Expand Down Expand Up @@ -419,7 +429,7 @@ func (d *Dispatcher) shouldBlock(event commonEvent.BlockEvent) bool {
case commonEvent.TypeSyncPointEvent:
return true
default:
log.Error("invalid event type", zap.Any("event Type", event.GetType()))
log.Error("invalid event type", zap.Any("eventType", event.GetType()))
}
return false
}
Expand All @@ -435,8 +445,8 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
case d.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Any("ChangefeedID", d.changefeedID.String()),
zap.Any("DispatcherID", d.id.String()),
zap.Any("changefeedID", d.changefeedID.String()),
zap.Any("dispatcherID", d.id.String()),
zap.Error(err))
}
return
Expand Down Expand Up @@ -517,7 +527,11 @@ func (d *Dispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
for _, schemaIDChange := range event.GetUpdatedSchemas() {
if schemaIDChange.TableID == d.tableSpan.TableID {
if schemaIDChange.OldSchemaID != d.schemaID {
log.Error("Wrong Schema ID", zap.Any("dispatcherID", d.id), zap.Any("except schemaID", schemaIDChange.OldSchemaID), zap.Any("actual schemaID", d.schemaID), zap.Any("tableSpan", d.tableSpan.String()))
log.Error("Wrong Schema ID",
zap.Any("dispatcherID", d.id),
zap.Any("exceptSchemaID", schemaIDChange.OldSchemaID),
zap.Any("actualSchemaID", d.schemaID),
zap.Any("tableSpan", d.tableSpan.String()))
return
} else {
d.schemaID = schemaIDChange.NewSchemaID
Expand Down Expand Up @@ -594,7 +608,8 @@ func (d *Dispatcher) GetSyncPointInterval() time.Duration {
}

func (d *Dispatcher) Remove() {
log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String()))
log.Info("table event dispatcher component status changed to stopping",
zap.String("table", d.tableSpan.String()))
d.isRemoving.Store(true)

dispatcherStatusDynamicStream := GetDispatcherStatusDynamicStream()
Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func TestTableTriggerEventDispatcherInMysql(t *testing.T) {
tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan)
require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore)

err := tableTriggerEventDispatcher.InitalizeTableSchemaStore([]*heartbeatpb.SchemaInfo{})
err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{})
require.NoError(t, err)

helper := commonEvent.NewEventTestHelper(t)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestTableTriggerEventDispatcherInKafka(t *testing.T) {
tableTriggerEventDispatcher := newDispatcherForTest(sink, ddlTableSpan)
require.Nil(t, tableTriggerEventDispatcher.tableSchemaStore)

err := tableTriggerEventDispatcher.InitalizeTableSchemaStore([]*heartbeatpb.SchemaInfo{})
err := tableTriggerEventDispatcher.InitializeTableSchemaStore([]*heartbeatpb.SchemaInfo{})
require.NoError(t, err)

helper := commonEvent.NewEventTestHelper(t)
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher *Dispat
}

func (t *ResendTask) Execute() time.Time {
log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcher id", t.dispatcher.id))
log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcherID", t.dispatcher.id))
t.dispatcher.blockStatusesChan <- t.message
return time.Now().Add(200 * time.Millisecond)
}
Expand Down
10 changes: 6 additions & 4 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
if dispatcher.IsTableTriggerEventDispatcher() && e.sink.SinkType() != common.MysqlSinkType {
err := appcontext.GetService[*HeartBeatCollector](appcontext.HeartbeatCollector).RemoveCheckpointTsMessage(e.changefeedID)
if err != nil {
log.Error("remove checkpointTs message failed", zap.Error(err), zap.Any("changefeedID", e.changefeedID))
log.Error("remove checkpointTs message failed", zap.Any("changefeedID", e.changefeedID), zap.Error(err))
}
}
dispatcher.Remove()
Expand Down Expand Up @@ -313,7 +313,7 @@ func (e *EventDispatcherManager) InitalizeTableTriggerEventDispatcher(schemaInfo
if e.tableTriggerEventDispatcher == nil {
return nil
}
err := e.tableTriggerEventDispatcher.InitalizeTableSchemaStore(schemaInfo)
err := e.tableTriggerEventDispatcher.InitializeTableSchemaStore(schemaInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -367,7 +367,8 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er
if err != nil {
return errors.Trace(err)
}
log.Info("calculate real startTs for dispatchers", zap.Any("receive startTs", startTsList), zap.Any("real startTs", newStartTsList))
log.Info("calculate real startTs for dispatchers",
zap.Any("receiveStartTs", startTsList), zap.Any("realStartTs", newStartTsList))
} else {
newStartTsList = startTsList
}
Expand Down Expand Up @@ -677,7 +678,8 @@ func (e *EventDispatcherManager) cleanDispatcher(id common.DispatcherID, schemaI
} else {
e.metricEventDispatcherCount.Dec()
}
log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager", zap.Any("dispatcher id", id))
log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager",
zap.Any("dispatcherID", id))
}

func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap {
Expand Down
36 changes: 22 additions & 14 deletions downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,25 @@ func (m *DispatcherOrchestrator) handlePostBootstrap(from node.ID, req *heartbea
cfId := common.NewChangefeedIDFromPB(req.ChangefeedID)
manager, exists := m.dispatcherManagers[cfId]
if !exists || manager.GetTableTriggerEventDispatcher() == nil {
log.Error("Receive post bootstrap request but there is no table trigger event dispatcher", zap.Any("ChangefeedID", cfId.Name()))
log.Error("Receive post bootstrap request but there is no table trigger event dispatcher",
zap.Any("changefeedID", cfId.Name()))
return nil
}
if manager.GetTableTriggerEventDispatcher().GetId() != common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId) {
log.Error("Receive post bootstrap request but the table trigger event dispatcher id is not match",
zap.Any("ChangefeedID", cfId.Name()),
zap.String("expected table trigger event dispatcher id", manager.GetTableTriggerEventDispatcher().GetId().String()),
zap.String("actual table trigger event dispatcher id", common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId).String()))
zap.Any("changefeedID", cfId.Name()),
zap.String("expectedDispatcherID", manager.GetTableTriggerEventDispatcher().GetId().String()),
zap.String("actualDispatcherID", common.NewDispatcherIDFromPB(req.TableTriggerEventDispatcherId).String()))

error := apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.GenWithStackByArgs("Receive post bootstrap request but the table trigger event dispatcher id is not match")
err := apperror.ErrChangefeedInitTableTriggerEventDispatcherFailed.
GenWithStackByArgs("Receive post bootstrap request but the table trigger event dispatcher id is not match")
response := &heartbeatpb.MaintainerPostBootstrapResponse{
ChangefeedID: req.ChangefeedID,
Err: &heartbeatpb.RunningError{
Time: time.Now().String(),
Node: from.String(),
Code: string(apperror.ErrorCode(error)),
Message: error.Error(),
Code: string(apperror.ErrorCode(err)),
Message: err.Error(),
},
}
return m.sendResponse(from, messaging.MaintainerManagerTopic, response)
Expand All @@ -94,7 +96,8 @@ func (m *DispatcherOrchestrator) handlePostBootstrap(from node.ID, req *heartbea
// init table schema store
err := manager.InitalizeTableTriggerEventDispatcher(req.Schemas)
if err != nil {
log.Error("failed to initalize table trigger event dispatcher", zap.Error(err), zap.Any("ChangefeedID", cfId.Name()))
log.Error("failed to initialize table trigger event dispatcher",
zap.Any("changefeedID", cfId.Name()), zap.Error(err))

response := &heartbeatpb.MaintainerPostBootstrapResponse{
ChangefeedID: req.ChangefeedID,
Expand Down Expand Up @@ -122,14 +125,16 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h
var startTs uint64
if !exists {
cfConfig := &config.ChangefeedConfig{}
if err := json.Unmarshal(req.Config, cfConfig); err != nil {
log.Panic("failed to unmarshal changefeed config", zap.String("changefeed id", cfId.Name()), zap.Error(err))
if err = json.Unmarshal(req.Config, cfConfig); err != nil {
log.Panic("failed to unmarshal changefeed config",
zap.String("changefeedID", cfId.Name()), zap.Error(err))
return err
}
manager, startTs, err = dispatchermanager.NewEventDispatcherManager(cfId, cfConfig, req.TableTriggerEventDispatcherId, req.StartTs, from)
// Fast return the error to maintainer.
if err != nil {
log.Error("failed to create new dispatcher manager", zap.Error(err), zap.Any("ChangefeedID", cfId.Name()))
log.Error("failed to create new dispatcher manager",
zap.Any("changefeedID", cfId.Name()), zap.Error(err))

response := &heartbeatpb.MaintainerBootstrapResponse{
ChangefeedID: req.ChangefeedID,
Expand All @@ -151,7 +156,8 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h
if manager.GetTableTriggerEventDispatcher() == nil && req.TableTriggerEventDispatcherId != nil {
startTs, err = manager.NewTableTriggerEventDispatcher(req.TableTriggerEventDispatcherId, req.StartTs)
if err != nil {
log.Error("failed to create new table trigger event dispatcher", zap.Error(err), zap.Any("ChangefeedID", cfId.Name()))
log.Error("failed to create new table trigger event dispatcher",
zap.Any("changefeedID", cfId.Name()), zap.Error(err))

response := &heartbeatpb.MaintainerBootstrapResponse{
ChangefeedID: req.ChangefeedID,
Expand All @@ -169,7 +175,8 @@ func (m *DispatcherOrchestrator) handleAddDispatcherManager(from node.ID, req *h

if manager.GetMaintainerID() != from {
manager.SetMaintainerID(from)
log.Info("maintainer changed", zap.String("changefeed", cfId.Name()), zap.String("maintainer", from.String()))
log.Info("maintainer changed",
zap.String("changefeed", cfId.Name()), zap.String("maintainer", from.String()))
}

response := createBootstrapResponse(req.ChangefeedID, manager, startTs)
Expand All @@ -193,7 +200,8 @@ func (m *DispatcherOrchestrator) handleRemoveDispatcherManager(from node.ID, req
}
}

log.Info("try close dispatcher manager", zap.String("changefeed", cfId.Name()), zap.Bool("success", response.Success))
log.Info("try close dispatcher manager",
zap.String("changefeed", cfId.Name()), zap.Bool("success", response.Success))
return m.sendResponse(from, messaging.MaintainerTopic, response)
}

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/downstream_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestDownstream(t *testing.T) {
}

wg.Wait()
log.Warn("test begin", zap.Any("create dispatcher cost time", time.Since(start)))
log.Warn("test begin", zap.Any("duration", time.Since(start)))

// 插入数据, 先固定 data 格式
go pushDataIntoDispatchers(dispatcherIDSet, helper)
Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *KafkaSink) run(ctx context.Context) {
case s.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Any("ChangefeedID", s.changefeedID.String()),
zap.Any("changefeedID", s.changefeedID.String()),
zap.Error(err))
}
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *KafkaSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
log.Error("KafkaSink doesn't support this type of block event",
zap.String("namespace", s.changefeedID.Namespace()),
zap.String("changefeed", s.changefeedID.Name()),
zap.Any("event type", event.GetType()))
zap.Any("eventType", event.GetType()))
}
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions errors.toml

This file was deleted.

4 changes: 2 additions & 2 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, controller *Controller,
}
log.Info("new block event is created",
zap.String("changefeedID", cfID.Name()),
zap.Uint64("block-ts", event.commitTs),
zap.Bool("sync-point", event.isSyncPoint),
zap.Uint64("blockTs", event.commitTs),
zap.Bool("syncPoint", event.isSyncPoint),
zap.Any("detail", status))
return event
}
Expand Down
4 changes: 2 additions & 2 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn())
log.Info("changefeed maintainer is created", zap.String("id", cfID.String()),
zap.Uint64("checkpointTs", checkpointTs),
zap.String("ddl dispatcher", tableTriggerEventDispatcherID.String()))
zap.String("ddlDispatcherID", tableTriggerEventDispatcherID.String()))
metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc()
// Should update metrics immediately when maintainer is created
// FIXME: Use a correct context
Expand Down Expand Up @@ -736,7 +736,7 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn {
if id == m.selfNode.ID {
log.Info("create table event trigger dispatcher", zap.String("changefeed", m.id.String()),
zap.String("server", id.String()),
zap.String("dispatcher id", m.ddlSpan.ID.String()))
zap.String("dispatcherID", m.ddlSpan.ID.String()))
msg.TableTriggerEventDispatcherId = m.ddlSpan.ID.ToPB()
}
log.Info("send maintainer bootstrap message",
Expand Down
Loading

0 comments on commit 1b604be

Please sign in to comment.