From 7ac5f85cc515c220e3de730d7460becdeb487ea2 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Fri, 26 Jul 2024 19:20:23 +0800
Subject: [PATCH] *: Add local docker (#142)

* add local dockerfile

* fix

* disable thread pool

* fix scheduler

* fix
---
 coordinator/coordinator.go                  |  4 +-
 dockerfile-local                            |  7 ++
 maintainer/maintainer.go                    | 26 +++++-
 maintainer/maintainer_manager.go            |  4 +-
 scheduler/scheduler.go                      | 24 ++++--
 scheduler/scheduler_balance.go              |  6 +-
 scheduler/scheduler_basic.go                | 92 ++++++++++++---------
 scheduler/state_machine_test.go             |  2 +-
 utils/threadpool/task_scheduler_instance.go | 24 +++---
 9 files changed, 128 insertions(+), 61 deletions(-)
 create mode 100644 dockerfile-local

diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 19cdbde95..a3a0236f0 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -67,7 +67,7 @@ func NewCoordinator(capture *model.CaptureInfo, version int64) server.Coordinato
 	c.supervisor = scheduler.NewSupervisor(
 		scheduler.ChangefeedID(model.DefaultChangeFeedID("coordinator")),
 		c.newChangefeed, c.newBootstrapMessage,
-		scheduler.NewBasicScheduler(1000),
+		scheduler.NewBasicScheduler(),
 		scheduler.NewBalanceScheduler(time.Minute, 1000),
 	)
 
@@ -205,6 +205,8 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
 			allChangefeeds.ReplaceOrInsert(scheduler.ChangefeedID(id), cf)
 		}
 	}
+	c.supervisor.MarkNeedAddInferior()
+	c.supervisor.MarkNeedRemoveInferior()
 	tasks := c.supervisor.Schedule(
 		allChangefeeds,
 		c.supervisor.GetAllCaptures(),
diff --git a/dockerfile-local b/dockerfile-local
new file mode 100644
index 000000000..bd76576cd
--- /dev/null
+++ b/dockerfile-local
@@ -0,0 +1,7 @@
+# make cdc && docker build -f ./dockerfile-local --platform linux/amd64 -t <image-tag> .
+FROM alpine:3.15
+RUN apk add --no-cache tzdata bash curl socat
+EXPOSE 8301
+EXPOSE 8300
+COPY ./bin/cdc /cdc
+CMD [ "/cdc" ]
\ No newline at end of file
diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go
index e4180507f..9b202e7d4 100644
--- a/maintainer/maintainer.go
+++ b/maintainer/maintainer.go
@@ -140,13 +140,30 @@ func NewMaintainer(cfID model.ChangeFeedID,
 	m.supervisor = scheduler.NewSupervisor(scheduler.ChangefeedID(cfID),
 		m.getReplicaSet, m.getNewBootstrapFn(),
-		scheduler.NewBasicScheduler(1000),
+		scheduler.NewBasicScheduler(),
 		scheduler.NewBalanceScheduler(time.Minute, 1000),
 	)
 	log.Info("create maintainer", zap.String("id", cfID.String()))
 	return m
 }
 
+func (m *Maintainer) Run() {
+	// Note: the thread pool may currently discard some running tasks; fix this later.
+	// threadpool.GetTaskSchedulerInstance().MaintainerTaskScheduler.
+ // Submit(m, threadpool.CPUTask, time.Now()) + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + newStatus, _ := m.Execute() + if newStatus == threadpool.Done { + log.Warn("maintainer is done", zap.String("id", m.id.String())) + return + } + } + }() +} + func (m *Maintainer) cleanupMetrics() { metrics.ChangefeedCheckpointTsGauge.DeleteLabelValues(m.id.Namespace, m.id.ID) metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(m.id.Namespace, m.id.ID) @@ -158,6 +175,10 @@ func (m *Maintainer) cleanupMetrics() { func (m *Maintainer) Execute() (taskStatus threadpool.TaskStatus, tick time.Time) { log.Info("maintainer execute", zap.String("id", m.id.String())) + defer func() { + log.Info("maintainer execute done", zap.String("id", m.id.String()), + zap.Int("status", int(taskStatus)), zap.Time("tickTime", tick)) + }() m.updateMetrics() if m.removed.Load() { // removed, cancel the task @@ -347,6 +368,7 @@ func (m *Maintainer) initChangefeed() error { replicaSet := NewReplicaSet(m.id, tableSpan, m.checkpointTs.Load()).(*ReplicaSet) m.tableSpans.ReplaceOrInsert(tableSpan, replicaSet) } + m.supervisor.MarkNeedAddInferior() return err } @@ -483,6 +505,8 @@ func (m *Maintainer) closeChangefeed() { m.state = heartbeatpb.ComponentState_Stopping m.statusChanged.Store(true) m.tableSpans = utils.NewBtreeMap[scheduler.InferiorID, scheduler.Inferior]() + m.supervisor.MarkNeedAddInferior() + m.supervisor.MarkNeedRemoveInferior() } } diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index f77309933..f0e56a3b7 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -22,7 +22,6 @@ import ( "github.com/flowbehappy/tigate/heartbeatpb" appcontext "github.com/flowbehappy/tigate/pkg/common/context" "github.com/flowbehappy/tigate/pkg/messaging" - "github.com/flowbehappy/tigate/utils/threadpool" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" @@ -188,8 +187,7 @@ func (m *Manager) onDispatchMaintainerRequest( cf = NewMaintainer(cfID, req.IsSecondary, cfConfig, req.CheckpointTs, m.pdEndpoints) m.maintainers.Store(cfID, cf) - threadpool.GetTaskSchedulerInstance().MaintainerTaskScheduler.Submit(cf.(*Maintainer), - threadpool.CPUTask, time.Now()) + cf.(*Maintainer).Run() } cf.(*Maintainer).isSecondary.Store(req.IsSecondary) } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7b357daf0..2cb779d4f 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -29,6 +29,7 @@ type Scheduler interface { allInferiors utils.Map[InferiorID, Inferior], aliveCaptures map[model.CaptureID]*CaptureStatus, stateMachines utils.Map[InferiorID, *StateMachine], + batchSize int, ) []*ScheduleTask } @@ -38,10 +39,13 @@ func (s *Supervisor) Schedule( aliveCaptures map[model.CaptureID]*CaptureStatus, stateMachines utils.Map[InferiorID, *StateMachine], ) []*ScheduleTask { - if allInferiors.Len() == stateMachines.Len() && time.Since(s.lastScheduleTime) < 60*time.Second { - return nil + if time.Since(s.lastScheduleTime) > 120*time.Second { + s.MarkNeedAddInferior() + s.MarkNeedRemoveInferior() } - if s.RunningTasks.Len() >= s.maxTaskConcurrency { + + batchSize := s.maxTaskConcurrency - s.RunningTasks.Len() + if batchSize <= 0 { log.Warn("Skip scheduling since there are too many running task", zap.String("id", s.ID.String()), zap.Int("totalInferiors", allInferiors.Len()), @@ -51,16 +55,26 @@ func (s *Supervisor) Schedule( ) return nil } - s.lastScheduleTime = time.Now() 
 	for _, sched := range s.schedulers {
-		tasks := sched.Schedule(allInferiors, aliveCaptures, stateMachines)
+		tasks := sched.Schedule(allInferiors, aliveCaptures, stateMachines, batchSize)
 		if len(tasks) != 0 {
+			s.lastScheduleTime = time.Now()
 			return tasks
 		}
 	}
 	return nil
 }
 
+func (s *Supervisor) MarkNeedAddInferior() {
+	basicScheduler := s.schedulers[0].(*BasicScheduler)
+	basicScheduler.needAddInferior = true
+}
+
+func (s *Supervisor) MarkNeedRemoveInferior() {
+	basicScheduler := s.schedulers[0].(*BasicScheduler)
+	basicScheduler.needRemoveInferior = true
+}
+
 func (s *Supervisor) Name() string {
 	return "combine-scheduler"
 }
diff --git a/scheduler/scheduler_balance.go b/scheduler/scheduler_balance.go
index 7db77c2ef..d60ad43c3 100644
--- a/scheduler/scheduler_balance.go
+++ b/scheduler/scheduler_balance.go
@@ -59,7 +59,11 @@ func (b *balanceScheduler) Schedule(
 	allInferiors utils.Map[InferiorID, Inferior],
 	aliveCaptures map[model.CaptureID]*CaptureStatus,
 	stateMachines utils.Map[InferiorID, *StateMachine],
+	batchSize int,
 ) []*ScheduleTask {
+	if b.maxTaskConcurrency < batchSize {
+		batchSize = b.maxTaskConcurrency
+	}
 	if !b.forceBalance {
 		now := time.Now()
 		if now.Sub(b.lastRebalanceTime) < b.checkBalanceInterval {
@@ -70,7 +74,7 @@ func (b *balanceScheduler) Schedule(
 	}
 
 	tasks := buildBalanceMoveTables(
-		b.random, aliveCaptures, stateMachines, b.maxTaskConcurrency)
+		b.random, aliveCaptures, stateMachines, batchSize)
 	b.forceBalance = len(tasks) != 0
 	return tasks
 }
diff --git a/scheduler/scheduler_basic.go b/scheduler/scheduler_basic.go
index b192575b0..51fc2b78c 100644
--- a/scheduler/scheduler_basic.go
+++ b/scheduler/scheduler_basic.go
@@ -21,11 +21,15 @@ import (
 )
 
 type BasicScheduler struct {
-	batchSize int
+	needAddInferior  bool
+	addInferiorCache []InferiorID
+
+	needRemoveInferior  bool
+	removeInferiorCache []InferiorID
 }
 
-func NewBasicScheduler(batchSize int) *BasicScheduler {
-	return &BasicScheduler{batchSize: batchSize}
+func NewBasicScheduler() *BasicScheduler {
+	return &BasicScheduler{}
 }
 
 func (b *BasicScheduler) Name() string {
@@ -36,37 +40,40 @@ func (b *BasicScheduler) Schedule(
 	allInferiors utils.Map[InferiorID, Inferior],
 	aliveCaptures map[model.CaptureID]*CaptureStatus,
 	stateMachines utils.Map[InferiorID, *StateMachine],
+	batchSize int,
 ) []*ScheduleTask {
 	tasks := make([]*ScheduleTask, 0)
-	lenEqual := allInferiors.Len() == stateMachines.Len()
-	allFind := true
-	newInferiors := make([]InferiorID, 0)
-	allInferiors.Ascend(func(inf InferiorID, value Inferior) bool {
-		if len(newInferiors) >= b.batchSize {
-			return false
-		}
-		st := value.GetStateMachine()
-		if st == nil {
-			newInferiors = append(newInferiors, inf)
-			// The inferior ID is not in the state machine means the two sets are
-			// not identical.
-			allFind = false
+	// Build add inferior tasks.
+	if b.needAddInferior {
+		b.addInferiorCache = make([]InferiorID, 0, batchSize)
+		allInferiors.Ascend(func(inf InferiorID, value Inferior) bool {
+			st := value.GetStateMachine()
+			if st == nil || st.State == SchedulerStatusAbsent {
+				// add case 1: schedule a new inferior
+				// add case 2: reschedule an absent inferior. Currently, we only reschedule every 2 minutes.
+				// TODO: store absent inferiors in a separate map to trigger rescheduling quickly.
+				b.addInferiorCache = append(b.addInferiorCache, inf)
+			}
 			return true
+		})
+		b.needAddInferior = false
+	}
+	if len(b.addInferiorCache) > 0 {
+		batch := batchSize
+		if batch > len(b.addInferiorCache) {
+			batch = len(b.addInferiorCache)
 		}
-		// absent status means we should schedule it again
-		if st.State == SchedulerStatusAbsent {
-			newInferiors = append(newInferiors, inf)
+		newInferiors := b.addInferiorCache[:batch]
+		b.addInferiorCache = b.addInferiorCache[batch:]
+		if len(b.addInferiorCache) == 0 {
+			// release for GC
+			b.addInferiorCache = nil
 		}
-		return true
-	})
-	// Build add inferior tasks.
-	if len(newInferiors) > 0 {
 		captureIDs := make([]model.CaptureID, 0, len(aliveCaptures))
 		for captureID := range aliveCaptures {
 			captureIDs = append(captureIDs, captureID)
 		}
-
 		if len(captureIDs) == 0 {
 			// this should never happen, if no server can be found
 			// for a cluster with n captures, n should be at least 2
@@ -76,28 +83,38 @@ func (b *BasicScheduler) Schedule(
 			return tasks
 		}
 		tasks = append(tasks, newBurstAddInferiors(newInferiors, captureIDs)...)
-		if len(newInferiors) >= b.batchSize {
+		if len(newInferiors) >= batchSize {
 			return tasks
 		}
 	}
 
 	// Build remove inferior tasks.
-	// For most of the time, remove inferiors are unlikely to happen.
-	//
-	// Fast path for check whether two sets are identical
-	if !lenEqual || !allFind {
+	if b.needRemoveInferior {
 		// The two sets are not identical. We need to build a map to find removed inferiors.
-		rmInferiorIDs := make([]InferiorID, 0)
+		b.removeInferiorCache = make([]InferiorID, 0, batchSize)
 		stateMachines.Ascend(func(key InferiorID, value *StateMachine) bool {
 			ok := allInferiors.Has(key)
 			if !ok {
-				rmInferiorIDs = append(rmInferiorIDs, key)
+				b.removeInferiorCache = append(b.removeInferiorCache, key)
 			}
 			return true
 		})
-		removeInferiorTasks := newBurstRemoveInferiors(rmInferiorIDs, stateMachines)
-		if removeInferiorTasks != nil {
-			tasks = append(tasks, removeInferiorTasks...)
+		b.needRemoveInferior = false
+	}
+	if len(b.removeInferiorCache) > 0 {
+		batch := batchSize - len(tasks)
+		if batch > len(b.removeInferiorCache) {
+			batch = len(b.removeInferiorCache)
+		}
+		rmInferiors := b.removeInferiorCache[:batch]
+		b.removeInferiorCache = b.removeInferiorCache[batch:]
+		if len(b.removeInferiorCache) == 0 {
+			// release for GC
+			b.removeInferiorCache = nil
+		}
+		tasks = append(tasks, newBurstRemoveInferiors(rmInferiors, stateMachines)...)
+		if len(rmInferiors) >= batchSize {
+			return tasks
 		}
 	}
 	return tasks
@@ -128,19 +145,20 @@ func newBurstAddInferiors(newInferiors []InferiorID, captureIDs []model.CaptureI
 	return addInferiorTasks
 }
 
+// TODO: the remove task may not need a captureID.
func newBurstRemoveInferiors( rmInferiors []InferiorID, stateMachines utils.Map[InferiorID, *StateMachine], ) []*ScheduleTask { removeTasks := make([]*ScheduleTask, 0, len(rmInferiors)) for _, id := range rmInferiors { - ccf, _ := stateMachines.Get(id) + state, _ := stateMachines.Get(id) var captureID string - for server := range ccf.Servers { + for server := range state.Servers { captureID = server break } - if ccf.Primary == "" { + if state.Primary == "" { log.Warn("primary or secondary not found for removed inferior,"+ "this may happen if the server shutdown", zap.Any("ID", id.String())) diff --git a/scheduler/state_machine_test.go b/scheduler/state_machine_test.go index 44a1ef58f..72254b9cd 100644 --- a/scheduler/state_machine_test.go +++ b/scheduler/state_machine_test.go @@ -76,7 +76,7 @@ func TestInferiorStoppedWhenWorking(t *testing.T) { func(id model.CaptureID) rpc.Message { return &messaging.TargetMessage{} }) - scheduler := NewCombineScheduler(NewBasicScheduler(1000), + scheduler := NewCombineScheduler(NewBasicScheduler(), NewBalanceScheduler(time.Minute, 1000)) allInferior := []InferiorID{cfID1} diff --git a/utils/threadpool/task_scheduler_instance.go b/utils/threadpool/task_scheduler_instance.go index 96d0e7201..5144526c8 100644 --- a/utils/threadpool/task_scheduler_instance.go +++ b/utils/threadpool/task_scheduler_instance.go @@ -21,12 +21,12 @@ import ( TaskSchedulerInstance is a singleton instance. It contains the necessary task schedulers for one instance. */ type TaskSchedulerInstance struct { - WorkerTaskScheduler *TaskScheduler - EventDispatcherTaskScheduler *TaskScheduler - SinkTaskScheduler *TaskScheduler - HeartbeatTaskScheduler *TaskScheduler - EventServiceTaskScheduler *TaskScheduler - MaintainerTaskScheduler *TaskScheduler + // WorkerTaskScheduler *TaskScheduler + // EventDispatcherTaskScheduler *TaskScheduler + // SinkTaskScheduler *TaskScheduler + // HeartbeatTaskScheduler *TaskScheduler + // EventServiceTaskScheduler *TaskScheduler + // MaintainerTaskScheduler *TaskScheduler } var TaskSchedulers *TaskSchedulerInstance @@ -36,12 +36,12 @@ func GetTaskSchedulerInstance() *TaskSchedulerInstance { if TaskSchedulers == nil { once.Do(func() { TaskSchedulers = &TaskSchedulerInstance{ - WorkerTaskScheduler: NewTaskSchedulerDefault("WorkerTask"), - EventDispatcherTaskScheduler: NewTaskSchedulerDefault("EventDispatcherTask"), - SinkTaskScheduler: NewTaskSchedulerDefault("SinkTask"), - HeartbeatTaskScheduler: NewTaskSchedulerDefault("HeartbeatTask"), - EventServiceTaskScheduler: NewTaskSchedulerDefault("EventServiceTask"), - MaintainerTaskScheduler: NewTaskSchedulerDefault("MaintainerTask"), + // WorkerTaskScheduler: NewTaskSchedulerDefault("WorkerTask"), + // EventDispatcherTaskScheduler: NewTaskSchedulerDefault("EventDispatcherTask"), + // SinkTaskScheduler: NewTaskSchedulerDefault("SinkTask"), + // HeartbeatTaskScheduler: NewTaskSchedulerDefault("HeartbeatTask"), + // EventServiceTaskScheduler: NewTaskSchedulerDefault("EventServiceTask"), + // MaintainerTaskScheduler: NewTaskSchedulerDefault("MaintainerTask"), } }) }
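
Review note: the scheduling flow after this patch, as a minimal self-contained Go sketch. Only the control flow is taken from the patch: the coordinator or maintainer marks the basic scheduler dirty via MarkNeedAddInferior/MarkNeedRemoveInferior (with a periodic fallback inside Schedule), every scheduler receives a batch budget of maxTaskConcurrency minus the running-task count, and the basic scheduler caches the scan result and drains it batch by batch. The sketchSupervisor and basicSketch types, and the plain string/map stand-ins for utils.Map, InferiorID, and StateMachine, are hypothetical simplifications, not the real API.

package main

import (
	"fmt"
	"time"
)

// scheduleTask stands in for the real *ScheduleTask.
type scheduleTask struct{ id string }

// sketchScheduler mirrors the patched Scheduler interface: batchSize is the
// remaining task budget handed down by the supervisor.
type sketchScheduler interface {
	schedule(all []string, hasStateMachine map[string]bool, batchSize int) []scheduleTask
}

// basicSketch mirrors BasicScheduler: it rescans the inferior set only when
// marked dirty, caches the result, and drains the cache batch by batch.
type basicSketch struct {
	needAdd    bool
	needRemove bool
	addCache   []string
}

func (b *basicSketch) schedule(all []string, hasStateMachine map[string]bool, batchSize int) []scheduleTask {
	if b.needAdd {
		// Rebuild the cache only when marked dirty; later rounds drain it.
		b.addCache = b.addCache[:0]
		for _, id := range all {
			if !hasStateMachine[id] {
				b.addCache = append(b.addCache, id)
			}
		}
		b.needAdd = false
	}
	// Never emit more than the remaining budget.
	batch := batchSize
	if batch > len(b.addCache) {
		batch = len(b.addCache)
	}
	tasks := make([]scheduleTask, 0, batch)
	for _, id := range b.addCache[:batch] {
		tasks = append(tasks, scheduleTask{id: "add-" + id})
	}
	b.addCache = b.addCache[batch:]
	return tasks
}

// sketchSupervisor mirrors Supervisor.Schedule: compute the batch budget from
// the concurrency headroom and hand it to each scheduler in turn.
type sketchSupervisor struct {
	schedulers         []sketchScheduler
	maxTaskConcurrency int
	runningTasks       int
	lastScheduleTime   time.Time
}

func (s *sketchSupervisor) schedule(all []string, hasStateMachine map[string]bool) []scheduleTask {
	// Fallback from the patch: periodically re-mark the basic scheduler so
	// absent inferiors are eventually rescheduled.
	if time.Since(s.lastScheduleTime) > 120*time.Second {
		basic := s.schedulers[0].(*basicSketch)
		basic.needAdd = true
		basic.needRemove = true
	}
	batchSize := s.maxTaskConcurrency - s.runningTasks
	if batchSize <= 0 {
		return nil // too many running tasks; skip this round
	}
	for _, sched := range s.schedulers {
		if tasks := sched.schedule(all, hasStateMachine, batchSize); len(tasks) != 0 {
			s.lastScheduleTime = time.Now()
			return tasks
		}
	}
	return nil
}

func main() {
	sup := &sketchSupervisor{
		schedulers:         []sketchScheduler{&basicSketch{needAdd: true}},
		maxTaskConcurrency: 2,
		lastScheduleTime:   time.Now(),
	}
	all := []string{"cf-1", "cf-2", "cf-3"}
	none := map[string]bool{}
	fmt.Println(sup.schedule(all, none)) // [{add-cf-1} {add-cf-2}]: capped by the budget
	fmt.Println(sup.schedule(all, none)) // [{add-cf-3}]: drained from the cache
}

The second call drains the cache built by the first call without rescanning all inferiors; that is the point of the needAddInferior flag plus addInferiorCache pair introduced by this patch.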
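Review note: with the thread pool disabled, Maintainer.Run drives itself with a goroutine that polls Execute on a 500ms ticker and exits once the task reports Done. A minimal sketch of that contract, assuming simplified types: the pollable interface, runPolling helper, and countdown task are hypothetical, and the local running/done constants stand in for threadpool.TaskStatus values.

package main

import (
	"fmt"
	"time"
)

// taskStatus stands in for threadpool.TaskStatus.
type taskStatus int

const (
	running taskStatus = iota
	done
)

// pollable mirrors the Execute contract used by Maintainer: do one round of
// work and report whether the task has finished.
type pollable interface {
	Execute() (taskStatus, time.Time)
}

// runPolling is the pattern Run adopts in this patch: a goroutine owns the
// task and re-executes it on a fixed tick until it reports done.
func runPolling(p pollable, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			if status, _ := p.Execute(); status == done {
				return // task finished; stop polling and release the goroutine
			}
		}
	}()
}

// countdown is a toy task that finishes after a fixed number of rounds.
type countdown struct{ rounds int }

func (c *countdown) Execute() (taskStatus, time.Time) {
	c.rounds--
	fmt.Println("executed one round, remaining:", c.rounds)
	if c.rounds <= 0 {
		return done, time.Now()
	}
	return running, time.Now()
}

func main() {
	runPolling(&countdown{rounds: 3}, 10*time.Millisecond)
	time.Sleep(100 * time.Millisecond) // give the poller time to finish
	fmt.Println("poller exited after the task reported done")
}

Compared with the shared thread pool, this trades central scheduling for simplicity: each maintainer costs one dedicated goroutine, but, per the note in Run, nothing can silently discard a running task anymore.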