Skip to content

Commit

Permalink
coordinator: improve the log format and code struct (#978)
Browse files Browse the repository at this point in the history
close #963
  • Loading branch information
asddongmen authored Feb 11, 2025
1 parent 8b3929e commit b7a4b1c
Show file tree
Hide file tree
Showing 18 changed files with 486 additions and 249 deletions.
7 changes: 4 additions & 3 deletions coordinator/changefeed/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,16 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
zap.String("namespace", m.id.Namespace()),
zap.String("changefeed", m.id.Name()),
zap.Stringer("changefeed", m.id),
zap.Uint64("checkpointTs", m.checkpointTs),
zap.Time("nextRetryTime", m.nextRetryTime.Load()),
)
return true, lastError
}
// if any error is occurred , we should set the changefeed state to warning and stop the changefeed
log.Warn("changefeed meets an error, will be stopped",
zap.String("namespace", m.id.Name()),
zap.Stringer("changefeed", m.id),
zap.Uint64("checkpointTs", m.checkpointTs),
zap.Time("nextRetryTime", m.nextRetryTime.Load()),
zap.Any("error", errs))
// patch the last error to changefeed info
Expand Down
2 changes: 1 addition & 1 deletion coordinator/changefeed/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *Changefeed) UpdateStatus(newStatus *heartbeatpb.MaintainerStatus) (bool
c.status.Store(newStatus)
if old.BootstrapDone != newStatus.BootstrapDone {
log.Info("Received changefeed status with bootstrapDone",
zap.String("changefeed", c.ID.String()),
zap.Stringer("changefeed", c.ID),
zap.Bool("bootstrapDone", newStatus.BootstrapDone))
return true, model.StateNormal, nil
}
Expand Down
5 changes: 4 additions & 1 deletion coordinator/changefeed/changefeed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ func (db *ChangefeedDB) Resume(id common.ChangeFeedID, resetBackoff bool, overwr
delete(db.stopped, id)
cf.isNew = overwriteCheckpointTs
db.AddAbsentWithoutLock(cf)
log.Info("resume changefeed", zap.String("changefeed", id.String()), zap.Any("overwriteCheckpointTs", overwriteCheckpointTs))
log.Info("resume changefeed",
zap.Stringer("changefeed", id),
zap.Uint64("checkpointTs", cf.GetStatus().CheckpointTs),
zap.Bool("overwriteCheckpointTs", overwriteCheckpointTs))
}
}

Expand Down
3 changes: 3 additions & 0 deletions coordinator/changefeed/changefeed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func TestResume(t *testing.T) {
cf := &Changefeed{ID: common.NewChangeFeedIDWithName("test")}
db.AddStoppedChangefeed(cf)
cf.backoff = NewBackoff(cf.ID, 0, 0)
cf.status = atomic.NewPointer(&heartbeatpb.MaintainerStatus{
CheckpointTs: 100,
})

db.Resume(cf.ID, true, false)

Expand Down
Loading

0 comments on commit b7a4b1c

Please sign in to comment.