fix (hatchery:swarm): clean networks & check ratio (#1277)
yesnault authored and sguiheux committed Oct 10, 2017
1 parent 69634bd commit 7570776
Showing 3 changed files with 50 additions and 34 deletions.
48 changes: 30 additions & 18 deletions engine/hatchery/swarm/swarm.go
@@ -121,7 +121,7 @@ var containersCache = struct {
func (h *HatcherySwarm) getContainers() ([]docker.APIContainers, error) {
t := time.Now()

defer log.Debug("getContainers() : %d s", time.Since(t).Seconds())
defer log.Debug("getContainers() : %f s", time.Since(t).Seconds())

containersCache.mu.RLock()
nbServers := len(containersCache.list)
@@ -137,6 +137,11 @@ func (h *HatcherySwarm) getContainers() ([]docker.APIContainers, error) {
containersCache.mu.Lock()
containersCache.list = s
containersCache.mu.Unlock()

log.Debug("getContainers> %d containers on this host", len(s))
for _, v := range s {
log.Debug("getContainers> container ID:%s names:%+v image:%s created:%d state:%s, status:%s", v.ID, v.Names, v.Image, v.Created, v.State, v.Status)
}
//Remove data from the cache after 2 seconds
go func() {
time.Sleep(2 * time.Second)
@@ -193,18 +198,19 @@ func (h *HatcherySwarm) killAndRemove(ID string) {
return
}

network, err := h.dockerClient.NetworkInfo(container.NetworkSettings.NetworkID)
if err != nil {
log.Info("killAndRemove> cannot NetworkInfo: %v", err)
h.killAndRemoveContainer(ID)
return
}

// If we succeed to get the network, kill and remove all the container on the network
if netname, ok := network.Labels["worker_net"]; ok {
log.Info("killAndRemove> Remove network %s", netname)
for id := range network.Containers {
h.killAndRemoveContainer(id)
for _, cnetwork := range container.NetworkSettings.Networks {
network, err := h.dockerClient.NetworkInfo(cnetwork.NetworkID)
if err != nil {
log.Info("killAndRemove> cannot NetworkInfo: %v", err)
h.killAndRemoveContainer(ID)
return
}
// If we succeed to get the network, kill and remove all the container on the network
if netname, ok := network.Labels["worker_net"]; ok {
log.Info("killAndRemove> Remove network %s", netname)
for id := range network.Containers {
h.killAndRemoveContainer(id)
}
}
}
}
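
The "clean networks" half of this commit is the loop above: killAndRemove no longer assumes a single container.NetworkSettings.NetworkID, it walks every network the container is attached to and, for any network labelled worker_net, removes all containers on it. Below is a minimal sketch of that pattern with go-dockerclient; the helper name cleanWorkerNetworks and the kill callback are illustrative, not from the CDS code:

```go
package main

import (
	"log"

	docker "github.com/fsouza/go-dockerclient"
)

// cleanWorkerNetworks inspects a container, walks every network it is attached
// to, and removes every container found on a network labelled "worker_net".
// It mirrors the loop added to killAndRemove in this commit.
func cleanWorkerNetworks(client *docker.Client, containerID string, kill func(id string)) {
	container, err := client.InspectContainer(containerID)
	if err != nil {
		kill(containerID)
		return
	}
	for _, cnet := range container.NetworkSettings.Networks {
		network, err := client.NetworkInfo(cnet.NetworkID)
		if err != nil {
			log.Printf("cannot NetworkInfo: %v", err)
			kill(containerID)
			return
		}
		if netname, ok := network.Labels["worker_net"]; ok {
			log.Printf("removing all containers on worker network %s", netname)
			for id := range network.Containers {
				kill(id)
			}
		}
	}
}

func main() {
	// Assumes a reachable Docker daemon via the standard DOCKER_HOST environment.
	client, err := docker.NewClientFromEnv()
	if err != nil {
		log.Fatal(err)
	}
	kill := func(id string) { log.Printf("would kill and remove container %s", id) }
	cleanWorkerNetworks(client, "some-container-id", kill)
}
```
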
@@ -421,14 +427,20 @@ func (h *HatcherySwarm) CanSpawn(model *sdk.Model, jobID int64, requirements []s
}
}

// hatcherySwarm.ratioService: Percent reserved for spwaning worker with service requirement
// hatcherySwarm.ratioService: Percent reserved for spawning worker with service requirement
// if no link -> we need to check ratioService
if len(links) == 0 && len(cs) > 0 {
percentFree := 100 - (100 * len(cs) / h.Config.MaxContainers)
if percentFree <= h.Config.RatioService {
log.Info("CanSpawn> ratio reached. percentFree:%d ratioService:%d", percentFree, h.Config.RatioService)
if len(links) == 0 {
if h.Config.RatioService >= 100 {
log.Debug("CanSpawn> ratioService 100 by conf - no spawn worker without CDS Service")
return false
}
if len(cs) > 0 {
percentFree := 100 - (100 * len(cs) / h.Config.MaxContainers)
if percentFree <= h.Config.RatioService {
log.Debug("CanSpawn> ratio reached. percentFree:%d ratioService:%d", percentFree, h.Config.RatioService)
return false
}
}
}

log.Debug("CanSpawn> %s need %v", model.Name, links)
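The "check ratio" half of the change lives in CanSpawn: when a job has no service requirement (no links), spawning is refused outright if RatioService is 100 or more, and otherwise refused once the free share of MaxContainers drops to RatioService percent or below. With illustrative numbers (50 max containers, 42 running, 20% reserved), the integer arithmetic gives percentFree = 100 - 100*42/50 = 16, and 16 <= 20, so a plain worker is not spawned. A small self-contained sketch of that check; the function and parameter names are illustrative:

```go
package main

import "fmt"

// canSpawnWithoutService mirrors the ratio check added to CanSpawn: a worker
// with no CDS service requirement is only started while more than
// ratioService percent of maxContainers remains free.
func canSpawnWithoutService(running, maxContainers, ratioService int) bool {
	if ratioService >= 100 {
		return false // whole capacity reserved for workers with services
	}
	if running == 0 {
		return true
	}
	percentFree := 100 - (100*running)/maxContainers // integer arithmetic, as in the diff
	return percentFree > ratioService
}

func main() {
	// 42 of 50 containers used: only 16% free, below a 20% reservation.
	fmt.Println(canSpawnWithoutService(42, 50, 20)) // false
	// 10 of 50 containers used: 80% free, well above the reservation.
	fmt.Println(canSpawnWithoutService(10, 50, 20)) // true
}
```
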
15 changes: 8 additions & 7 deletions sdk/hatchery/hatchery.go
@@ -138,24 +138,25 @@ func receiveJob(h Interface, isWorkflowJob bool, execGroups []sdk.Group, jobID i

atomic.AddInt64(nRoutines, 1)
defer atomic.AddInt64(nRoutines, -1)
if errR := routine(h, isWorkflowJob, models, execGroups, jobID, requirements, hostname, time.Now().Unix()); errR != nil {
isSpawned, errR := routine(h, isWorkflowJob, models, execGroups, jobID, requirements, hostname, time.Now().Unix())
if errR != nil {
log.Warning("Error on routine: %s", errR)
return false
}
return true
return isSpawned
}

func routine(h Interface, isWorkflowJob bool, models []sdk.Model, execGroups []sdk.Group, jobID int64, requirements []sdk.Requirement, hostname string, timestamp int64) error {
func routine(h Interface, isWorkflowJob bool, models []sdk.Model, execGroups []sdk.Group, jobID int64, requirements []sdk.Requirement, hostname string, timestamp int64) (bool, error) {
defer logTime(h, fmt.Sprintf("routine> %d", timestamp), time.Now())
log.Debug("routine> %d enter", timestamp)

if h.Hatchery() == nil || h.Hatchery().ID == 0 {
log.Debug("Create> continue")
return nil
return false, nil
}

if len(models) == 0 {
return fmt.Errorf("routine> %d - No model returned by CDS api", timestamp)
return false, fmt.Errorf("routine> %d - No model returned by CDS api", timestamp)
}
log.Debug("routine> %d - models received: %d", timestamp, len(models))

@@ -205,11 +206,11 @@ func routine(h Interface, isWorkflowJob bool, models []sdk.Model, execGroups []s
if err := h.Client().QueueJobSendSpawnInfo(isWorkflowJob, jobID, infos); err != nil {
log.Warning("routine> %d - cannot client.QueueJobSendSpawnInfo for job %d: %s", timestamp, jobID, err)
}
break // ok for this job
return true, nil // ok for this job
}
}

return nil
return false, nil
}

func provisioning(h Interface, provisionDisabled bool, models []sdk.Model) {
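In sdk/hatchery/hatchery.go, routine now returns (bool, error) instead of just error, so receiveJob can report whether a worker was really spawned rather than returning true for every non-error run. A minimal sketch of the new control flow; routineSketch, models and trySpawn are illustrative stand-ins, not the real signatures:

```go
package main

import (
	"fmt"
	"log"
)

// routineSketch mirrors the exits visible in the diff: an error when no model
// is available, (true, nil) as soon as a model spawns, and (false, nil) when
// nothing spawned but nothing failed either.
func routineSketch(models []string, trySpawn func(model string) bool) (bool, error) {
	if len(models) == 0 {
		return false, fmt.Errorf("no model returned by CDS api")
	}
	for _, m := range models {
		if trySpawn(m) {
			return true, nil // ok for this job
		}
	}
	return false, nil
}

func main() {
	isSpawned, err := routineSketch([]string{"docker-debian"}, func(string) bool { return true })
	if err != nil {
		log.Printf("Error on routine: %s", err)
	}
	// The caller now propagates this boolean instead of a hard-coded true.
	fmt.Println("spawned:", isSpawned)
}
```
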
21 changes: 12 additions & 9 deletions sdk/hatchery/register.go
@@ -72,7 +72,6 @@ func Create(h Interface) {
tickerCountWorkersStarted := time.NewTicker(time.Duration(2 * time.Second))
tickerGetModels := time.NewTicker(time.Duration(3 * time.Second))

var maxWorkersReached bool
var models []sdk.Model

// Call WorkerModel Enabled first
@@ -96,9 +95,6 @@
workersStarted = int64(h.WorkersStarted())
if workersStarted > int64(h.Configuration().Provision.MaxWorker) {
log.Info("max workers reached. current:%d max:%d", workersStarted, int64(h.Configuration().Provision.MaxWorker))
maxWorkersReached = true
} else {
maxWorkersReached = false
}
log.Debug("workers already started:%d", workersStarted)
case <-tickerGetModels.C:
@@ -108,25 +104,32 @@
log.Error("error on h.Client().WorkerModelsEnabled(): %v", errwm)
}
case j := <-pbjobs:
if maxWorkersReached {
log.Debug("maxWorkerReached:%d", workersStarted)
if workersStarted > int64(h.Configuration().Provision.MaxWorker) {
log.Debug("maxWorkersReached:%d", workersStarted)
continue
}
go func(job sdk.PipelineBuildJob) {
atomic.AddInt64(&workersStarted, 1)
if isRun := receiveJob(h, false, job.ExecGroups, job.ID, job.QueuedSeconds, job.BookedBy, job.Job.Action.Requirements, models, &nRoutines, spawnIDs, hostname); isRun {
atomic.AddInt64(&workersStarted, 1)
spawnIDs.SetDefault(string(job.ID), job.ID)
} else {
atomic.AddInt64(&workersStarted, -1)
}
}(j)
case j := <-wjobs:
if maxWorkersReached {
log.Debug("maxWorkerReached:%d", workersStarted)
if workersStarted > int64(h.Configuration().Provision.MaxWorker) {
log.Debug("maxWorkersReached:%d", workersStarted)
continue
}
go func(job sdk.WorkflowNodeJobRun) {
// count +1 here, and remove 1 if the worker is not started
// this avoids spawning too many workers
atomic.AddInt64(&workersStarted, 1)
if isRun := receiveJob(h, true, nil, job.ID, job.QueuedSeconds, job.BookedBy, job.Job.Action.Requirements, models, &nRoutines, spawnIDs, hostname); isRun {
atomic.AddInt64(&workersStarted, 1)
spawnIDs.SetDefault(string(job.ID), job.ID)
} else {
atomic.AddInt64(&workersStarted, -1)
}
}(j)
case err := <-errs:
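register.go drops the maxWorkersReached flag, which was only refreshed by the 2-second ticker, and instead compares workersStarted against Provision.MaxWorker at the moment a job is dispatched. The counter is incremented before receiveJob runs and decremented again if no worker was started, so concurrent goroutines account for in-flight spawns. A small sketch of that reserve-then-rollback pattern with sync/atomic; dispatch, maxWorker and trySpawn are illustrative names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dispatch reserves a worker slot before trying to spawn and releases it if
// the spawn did not happen, mirroring the accounting added in register.go.
func dispatch(workersStarted *int64, maxWorker int64, trySpawn func() bool) {
	if atomic.LoadInt64(workersStarted) > maxWorker {
		return // max workers reached: skip this job for now
	}
	atomic.AddInt64(workersStarted, 1) // count +1 here ...
	if !trySpawn() {
		atomic.AddInt64(workersStarted, -1) // ... and -1 if the worker is not started
	}
}

func main() {
	var started int64
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			dispatch(&started, 5, func() bool { return i%2 == 0 }) // half the spawns "fail"
		}(i)
	}
	wg.Wait()
	fmt.Println("workers started:", atomic.LoadInt64(&started))
}
```
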
