*: Add local docker (#142)
* add local dockerfile

* fix

* disable thread pool

* fix scheduler

* fix
CharlesCheung96 authored Jul 26, 2024
1 parent f6dd748 commit 7ac5f85
Showing 9 changed files with 128 additions and 61 deletions.
4 changes: 3 additions & 1 deletion coordinator/coordinator.go
@@ -67,7 +67,7 @@ func NewCoordinator(capture *model.CaptureInfo, version int64) server.Coordinato
c.supervisor = scheduler.NewSupervisor(
scheduler.ChangefeedID(model.DefaultChangeFeedID("coordinator")),
c.newChangefeed, c.newBootstrapMessage,
scheduler.NewBasicScheduler(1000),
scheduler.NewBasicScheduler(),
scheduler.NewBalanceScheduler(time.Minute, 1000),
)

@@ -205,6 +205,8 @@ func (c *coordinator) scheduleMaintainer(state *orchestrator.GlobalReactorState)
allChangefeeds.ReplaceOrInsert(scheduler.ChangefeedID(id), cf)
}
}
c.supervisor.MarkNeedAddInferior()
c.supervisor.MarkNeedRemoveInferior()
tasks := c.supervisor.Schedule(
allChangefeeds,
c.supervisor.GetAllCaptures(),
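NewBasicScheduler no longer takes a batch size here; instead the coordinator flags the basic scheduler whenever the changefeed set may have changed, and the per-round batch is computed inside Supervisor.Schedule. A minimal sketch of that calling pattern, assuming the tigate import paths follow the layout seen elsewhere in this diff (scheduleAll is a hypothetical helper; the real logic stays inside coordinator.scheduleMaintainer):

```go
package example

import (
	"github.com/flowbehappy/tigate/scheduler"
	"github.com/flowbehappy/tigate/utils"
)

// scheduleAll shows the new calling pattern: flag the basic scheduler so it
// rebuilds its add/remove caches, then let the supervisor derive its own
// per-round batch from the free task slots.
func scheduleAll(
	sup *scheduler.Supervisor,
	all utils.Map[scheduler.InferiorID, scheduler.Inferior],
	stateMachines utils.Map[scheduler.InferiorID, *scheduler.StateMachine],
) []*scheduler.ScheduleTask {
	sup.MarkNeedAddInferior()    // the inferior set may have grown
	sup.MarkNeedRemoveInferior() // ... or shrunk
	return sup.Schedule(all, sup.GetAllCaptures(), stateMachines)
}
```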
7 changes: 7 additions & 0 deletions dockerfile-local
@@ -0,0 +1,7 @@
# make cdc && docker build -f ./dockerfile-local . -t <target> --platform linux/amd64
FROM alpine:3.15
RUN apk add --no-cache tzdata bash curl socat
EXPOSE 8301
EXPOSE 8300
COPY ./bin/cdc /cdc
CMD [ "/cdc" ]
26 changes: 25 additions & 1 deletion maintainer/maintainer.go
@@ -140,13 +140,30 @@ func NewMaintainer(cfID model.ChangeFeedID,
}
m.supervisor = scheduler.NewSupervisor(scheduler.ChangefeedID(cfID),
m.getReplicaSet, m.getNewBootstrapFn(),
scheduler.NewBasicScheduler(1000),
scheduler.NewBasicScheduler(),
scheduler.NewBalanceScheduler(time.Minute, 1000),
)
log.Info("create maintainer", zap.String("id", cfID.String()))
return m
}

func (m *Maintainer) Run() {
// Note: currently the threadPool may discard some running tasks; fix this later.
// threadpool.GetTaskSchedulerInstance().MaintainerTaskScheduler.
// Submit(m, threadpool.CPUTask, time.Now())
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
newStatus, _ := m.Execute()
if newStatus == threadpool.Done {
log.Warn("maintainer is done", zap.String("id", m.id.String()))
return
}
}
}()
}

func (m *Maintainer) cleanupMetrics() {
metrics.ChangefeedCheckpointTsGauge.DeleteLabelValues(m.id.Namespace, m.id.ID)
metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(m.id.Namespace, m.id.ID)
@@ -158,6 +175,10 @@ func (m *Maintainer) cleanupMetrics() {

func (m *Maintainer) Execute() (taskStatus threadpool.TaskStatus, tick time.Time) {
log.Info("maintainer execute", zap.String("id", m.id.String()))
defer func() {
log.Info("maintainer execute done", zap.String("id", m.id.String()),
zap.Int("status", int(taskStatus)), zap.Time("tickTime", tick))
}()
m.updateMetrics()
if m.removed.Load() {
// removed, cancel the task
@@ -347,6 +368,7 @@ func (m *Maintainer) initChangefeed() error {
replicaSet := NewReplicaSet(m.id, tableSpan, m.checkpointTs.Load()).(*ReplicaSet)
m.tableSpans.ReplaceOrInsert(tableSpan, replicaSet)
}
m.supervisor.MarkNeedAddInferior()
return err
}

@@ -483,6 +505,8 @@ func (m *Maintainer) closeChangefeed() {
m.state = heartbeatpb.ComponentState_Stopping
m.statusChanged.Store(true)
m.tableSpans = utils.NewBtreeMap[scheduler.InferiorID, scheduler.Inferior]()
m.supervisor.MarkNeedAddInferior()
m.supervisor.MarkNeedRemoveInferior()
}
}

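Run now drives the maintainer from a plain goroutine instead of the shared thread pool, because the current thread pool can drop running tasks (see the note in Run above). The pattern is simply "poll Execute on a ticker until it reports Done". A stripped-down, self-contained sketch of that loop; the status type and poll helper below are illustrative stand-ins, not the real threadpool API:

```go
package main

import (
	"fmt"
	"time"
)

// status mirrors the idea of threadpool.TaskStatus: a task reports whether it
// wants to keep running or is finished.
type status int

const (
	running status = iota
	done
)

// poll drives task() on a fixed interval until it reports done — the same
// shape as Maintainer.Run, minus logging and metrics.
func poll(interval time.Duration, task func() status) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			if task() == done {
				return
			}
		}
	}()
}

func main() {
	n := 0
	poll(100*time.Millisecond, func() status {
		n++
		fmt.Println("tick", n)
		if n == 3 {
			return done
		}
		return running
	})
	time.Sleep(500 * time.Millisecond)
}
```

The trade-off accepted here is one goroutine per maintainer that wakes every 500 ms even when idle, until the thread-pool task loss is fixed.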
4 changes: 1 addition & 3 deletions maintainer/maintainer_manager.go
@@ -22,7 +22,6 @@ import (
"github.com/flowbehappy/tigate/heartbeatpb"
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/utils/threadpool"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
@@ -188,8 +187,7 @@ func (m *Manager) onDispatchMaintainerRequest(
cf = NewMaintainer(cfID, req.IsSecondary,
cfConfig, req.CheckpointTs, m.pdEndpoints)
m.maintainers.Store(cfID, cf)
threadpool.GetTaskSchedulerInstance().MaintainerTaskScheduler.Submit(cf.(*Maintainer),
threadpool.CPUTask, time.Now())
cf.(*Maintainer).Run()
}
cf.(*Maintainer).isSecondary.Store(req.IsSecondary)
}
24 changes: 19 additions & 5 deletions scheduler/scheduler.go
@@ -29,6 +29,7 @@ type Scheduler interface {
allInferiors utils.Map[InferiorID, Inferior],
aliveCaptures map[model.CaptureID]*CaptureStatus,
stateMachines utils.Map[InferiorID, *StateMachine],
batchSize int,
) []*ScheduleTask
}

@@ -38,10 +39,13 @@ func (s *Supervisor) Schedule(
aliveCaptures map[model.CaptureID]*CaptureStatus,
stateMachines utils.Map[InferiorID, *StateMachine],
) []*ScheduleTask {
if allInferiors.Len() == stateMachines.Len() && time.Since(s.lastScheduleTime) < 60*time.Second {
return nil
if time.Since(s.lastScheduleTime) > 120*time.Second {
s.MarkNeedAddInferior()
s.MarkNeedRemoveInferior()
}
if s.RunningTasks.Len() >= s.maxTaskConcurrency {

batchSize := s.maxTaskConcurrency - s.RunningTasks.Len()
if batchSize <= 0 {
log.Warn("Skip scheduling since there are too many running task",
zap.String("id", s.ID.String()),
zap.Int("totalInferiors", allInferiors.Len()),
@@ -51,16 +55,26 @@
)
return nil
}
s.lastScheduleTime = time.Now()
for _, sched := range s.schedulers {
tasks := sched.Schedule(allInferiors, aliveCaptures, stateMachines)
tasks := sched.Schedule(allInferiors, aliveCaptures, stateMachines, batchSize)
if len(tasks) != 0 {
s.lastScheduleTime = time.Now()
return tasks
}
}
return nil
}

func (s *Supervisor) MarkNeedAddInferior() {
basicScheduler := s.schedulers[0].(*BasicScheduler)
basicScheduler.needAddInferior = true
}

func (s *Supervisor) MarkNeedRemoveInferior() {
basicScheduler := s.schedulers[0].(*BasicScheduler)
basicScheduler.needRemoveInferior = true
}

func (s *Supervisor) Name() string {
return "combine-scheduler"
}
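Supervisor.Schedule now (1) forces a full rescan if nothing has been scheduled for two minutes, (2) derives the per-round budget from the free task slots, and (3) lets the first scheduler that produces tasks win the round. A condensed sketch of that control flow, with the map and task types replaced by plain ints for brevity (an illustration, not the real method):

```go
package example

import "time"

// scheduleOnce mirrors the gating logic of Supervisor.Schedule; each
// scheduler callback returns how many tasks it emitted for the given batch.
func scheduleOnce(
	lastScheduleTime *time.Time,
	maxTaskConcurrency, runningTasks int,
	markFullRescan func(), // stands in for MarkNeedAddInferior + MarkNeedRemoveInferior
	schedulers []func(batch int) int,
) int {
	// Safety net: if no scheduler has produced work for two minutes,
	// force the basic scheduler to rescan the whole inferior set.
	if time.Since(*lastScheduleTime) > 120*time.Second {
		markFullRescan()
	}
	// The per-round budget is whatever task concurrency is still free.
	batchSize := maxTaskConcurrency - runningTasks
	if batchSize <= 0 {
		return 0 // too many running tasks; skip this round
	}
	// The first scheduler that produces tasks wins the round.
	for _, sched := range schedulers {
		if n := sched(batchSize); n > 0 {
			*lastScheduleTime = time.Now()
			return n
		}
	}
	return 0
}
```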
6 changes: 5 additions & 1 deletion scheduler/scheduler_balance.go
@@ -59,7 +59,11 @@ func (b *balanceScheduler) Schedule(
allInferiors utils.Map[InferiorID, Inferior],
aliveCaptures map[model.CaptureID]*CaptureStatus,
stateMachines utils.Map[InferiorID, *StateMachine],
batchSize int,
) []*ScheduleTask {
if b.maxTaskConcurrency < batchSize {
batchSize = b.maxTaskConcurrency
}
if !b.forceBalance {
now := time.Now()
if now.Sub(b.lastRebalanceTime) < b.checkBalanceInterval {
@@ -70,7 +74,7 @@ }
}

tasks := buildBalanceMoveTables(
b.random, aliveCaptures, stateMachines, b.maxTaskConcurrency)
b.random, aliveCaptures, stateMachines, batchSize)
b.forceBalance = len(tasks) != 0
return tasks
}
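The balance scheduler now receives the supervisor's per-round batchSize and clamps it to its own maxTaskConcurrency, so a single rebalance pass can exceed neither limit. The clamp is just a minimum, sketched here as a standalone helper (illustrative only; the real code inlines it in balanceScheduler.Schedule):

```go
// effectiveBatch returns how many move-table tasks one balance pass may emit:
// the smaller of the supervisor's remaining budget and the balance
// scheduler's own concurrency cap.
func effectiveBatch(batchSize, maxTaskConcurrency int) int {
	if maxTaskConcurrency < batchSize {
		return maxTaskConcurrency
	}
	return batchSize
}
```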
92 changes: 55 additions & 37 deletions scheduler/scheduler_basic.go
@@ -21,11 +21,15 @@ import (
)

type BasicScheduler struct {
batchSize int
needAddInferior bool
addInferiorCache []InferiorID

needRemoveInferior bool
removeInferiorCache []InferiorID
}

func NewBasicScheduler(batchSize int) *BasicScheduler {
return &BasicScheduler{batchSize: batchSize}
func NewBasicScheduler() *BasicScheduler {
return &BasicScheduler{}
}

func (b *BasicScheduler) Name() string {
@@ -36,37 +40,40 @@ func (b *BasicScheduler) Schedule(
allInferiors utils.Map[InferiorID, Inferior],
aliveCaptures map[model.CaptureID]*CaptureStatus,
stateMachines utils.Map[InferiorID, *StateMachine],
batchSize int,
) []*ScheduleTask {
tasks := make([]*ScheduleTask, 0)
lenEqual := allInferiors.Len() == stateMachines.Len()
allFind := true
newInferiors := make([]InferiorID, 0)
allInferiors.Ascend(func(inf InferiorID, value Inferior) bool {
if len(newInferiors) >= b.batchSize {
return false
}
st := value.GetStateMachine()
if st == nil {
newInferiors = append(newInferiors, inf)
// The inferior ID is not in the state machine means the two sets are
// not identical.
allFind = false
// Build add inferior tasks.
if b.needAddInferior {
b.addInferiorCache = make([]InferiorID, 0, batchSize)
allInferiors.Ascend(func(inf InferiorID, value Inferior) bool {
st := value.GetStateMachine()
if st == nil || st.State == SchedulerStatusAbsent {
// add case 1: schedule a new inferior
// add case 2: reschedule an absent inferior. Currently, we only reschedule every 2 minutes.
// TODO: store absent inferiors in a separate map to trigger reschedule quickly.
b.addInferiorCache = append(b.addInferiorCache, inf)
}
return true
})
b.needAddInferior = false
}
if len(b.addInferiorCache) > 0 {
batch := batchSize
if batchSize > len(b.addInferiorCache) {
batch = len(b.addInferiorCache)
}
// absent status means we should schedule it again
if st.State == SchedulerStatusAbsent {
newInferiors = append(newInferiors, inf)
newInferiors := b.addInferiorCache[:batch]
b.addInferiorCache = b.addInferiorCache[batch:]
if len(b.addInferiorCache) == 0 {
// release for GC
b.addInferiorCache = nil
}
return true
})

// Build add inferior tasks.
if len(newInferiors) > 0 {
captureIDs := make([]model.CaptureID, 0, len(aliveCaptures))
for captureID := range aliveCaptures {
captureIDs = append(captureIDs, captureID)
}

if len(captureIDs) == 0 {
// this should never happen, if no server can be found
// for a cluster with n captures, n should be at least 2
@@ -76,28 +83,38 @@
return tasks
}
tasks = append(tasks, newBurstAddInferiors(newInferiors, captureIDs)...)
if len(newInferiors) >= b.batchSize {
if len(newInferiors) >= batchSize {
return tasks
}
}

// Build remove inferior tasks.
// Most of the time, inferior removals are unlikely to happen.
//
// Fast path for check whether two sets are identical
if !lenEqual || !allFind {
if b.needRemoveInferior {
// The two sets are not identical. We need to build a map to find removed inferiors.
rmInferiorIDs := make([]InferiorID, 0)
b.removeInferiorCache = make([]InferiorID, 0, batchSize)
stateMachines.Ascend(func(key InferiorID, value *StateMachine) bool {
ok := allInferiors.Has(key)
if !ok {
rmInferiorIDs = append(rmInferiorIDs, key)
b.removeInferiorCache = append(b.removeInferiorCache, key)
}
return true
})
removeInferiorTasks := newBurstRemoveInferiors(rmInferiorIDs, stateMachines)
if removeInferiorTasks != nil {
tasks = append(tasks, removeInferiorTasks...)
b.needRemoveInferior = false
}
if len(b.removeInferiorCache) > 0 {
batch := batchSize - len(tasks)
if batch > len(b.removeInferiorCache) {
batch = len(b.removeInferiorCache)
}
rmInferiors := b.removeInferiorCache[:batch]
b.removeInferiorCache = b.removeInferiorCache[batch:]
if len(b.removeInferiorCache) == 0 {
// release for GC
b.removeInferiorCache = nil
}
tasks = append(tasks, newBurstRemoveInferiors(rmInferiors, stateMachines)...)
if len(rmInferiors) >= batchSize {
return tasks
}
}
return tasks
@@ -128,19 +145,20 @@ func newBurstAddInferiors(newInferiors []InferiorID, captureIDs []model.CaptureI
return addInferiorTasks
}

// TODO: maybe remove task does not need captureID.
func newBurstRemoveInferiors(
rmInferiors []InferiorID,
stateMachines utils.Map[InferiorID, *StateMachine],
) []*ScheduleTask {
removeTasks := make([]*ScheduleTask, 0, len(rmInferiors))
for _, id := range rmInferiors {
ccf, _ := stateMachines.Get(id)
state, _ := stateMachines.Get(id)
var captureID string
for server := range ccf.Servers {
for server := range state.Servers {
captureID = server
break
}
if ccf.Primary == "" {
if state.Primary == "" {
log.Warn("primary or secondary not found for removed inferior,"+
"this may happen if the server shutdown",
zap.Any("ID", id.String()))
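BasicScheduler now works off cached ID lists instead of rescanning every inferior on each call: the caches are rebuilt only when needAddInferior / needRemoveInferior are set (by the coordinator, the maintainer, or the two-minute fallback in Supervisor.Schedule), and are then drained batchSize items at a time. A minimal sketch of that cache-and-drain step, using strings in place of InferiorID (illustrative, not the real types):

```go
// drain takes up to batch items from cache and returns them together with
// the remaining cache; the empty remainder is dropped, mirroring the
// "release for GC" step on addInferiorCache / removeInferiorCache.
func drain(cache []string, batch int) (taken, rest []string) {
	if batch > len(cache) {
		batch = len(cache)
	}
	taken, rest = cache[:batch], cache[batch:]
	if len(rest) == 0 {
		rest = nil
	}
	return taken, rest
}
```

For example, draining a five-element cache with batch = 2 yields two IDs per call and empties the cache on the third call. Absent inferiors are only picked up again when the flags are next set, which is why the two-minute rescan in Supervisor.Schedule (and the TODO about a dedicated absent map) matters.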
2 changes: 1 addition & 1 deletion scheduler/state_machine_test.go
@@ -76,7 +76,7 @@ func TestInferiorStoppedWhenWorking(t *testing.T) {
func(id model.CaptureID) rpc.Message {
return &messaging.TargetMessage{}
})
scheduler := NewCombineScheduler(NewBasicScheduler(1000),
scheduler := NewCombineScheduler(NewBasicScheduler(),
NewBalanceScheduler(time.Minute, 1000))
allInferior := []InferiorID{cfID1}

24 changes: 12 additions & 12 deletions utils/threadpool/task_scheduler_instance.go
@@ -21,12 +21,12 @@ import (
TaskSchedulerInstance is a singleton instance. It contains the necessary task schedulers for one instance.
*/
type TaskSchedulerInstance struct {
WorkerTaskScheduler *TaskScheduler
EventDispatcherTaskScheduler *TaskScheduler
SinkTaskScheduler *TaskScheduler
HeartbeatTaskScheduler *TaskScheduler
EventServiceTaskScheduler *TaskScheduler
MaintainerTaskScheduler *TaskScheduler
// WorkerTaskScheduler *TaskScheduler
// EventDispatcherTaskScheduler *TaskScheduler
// SinkTaskScheduler *TaskScheduler
// HeartbeatTaskScheduler *TaskScheduler
// EventServiceTaskScheduler *TaskScheduler
// MaintainerTaskScheduler *TaskScheduler
}

var TaskSchedulers *TaskSchedulerInstance
@@ -36,12 +36,12 @@ func GetTaskSchedulerInstance() *TaskSchedulerInstance {
if TaskSchedulers == nil {
once.Do(func() {
TaskSchedulers = &TaskSchedulerInstance{
WorkerTaskScheduler: NewTaskSchedulerDefault("WorkerTask"),
EventDispatcherTaskScheduler: NewTaskSchedulerDefault("EventDispatcherTask"),
SinkTaskScheduler: NewTaskSchedulerDefault("SinkTask"),
HeartbeatTaskScheduler: NewTaskSchedulerDefault("HeartbeatTask"),
EventServiceTaskScheduler: NewTaskSchedulerDefault("EventServiceTask"),
MaintainerTaskScheduler: NewTaskSchedulerDefault("MaintainerTask"),
// WorkerTaskScheduler: NewTaskSchedulerDefault("WorkerTask"),
// EventDispatcherTaskScheduler: NewTaskSchedulerDefault("EventDispatcherTask"),
// SinkTaskScheduler: NewTaskSchedulerDefault("SinkTask"),
// HeartbeatTaskScheduler: NewTaskSchedulerDefault("HeartbeatTask"),
// EventServiceTaskScheduler: NewTaskSchedulerDefault("EventServiceTask"),
// MaintainerTaskScheduler: NewTaskSchedulerDefault("MaintainerTask"),
}
})
}
