Skip to content

Commit

Permalink
Bugfix/cpuutil strategy segfault (#28)
Browse files Browse the repository at this point in the history
Fixed a cpu-util calculation bug that occurred when tasks complete execution.

When tasks complete execution, task-ranker no longer receives
cpu usage data for those retired tasks. Previously, the cpu-util
calculation depended on the previously recorded cpu usage information.

The tasks for which cpu usage data is retrieved can be assumed to be
currently active and therefore, cpu utilization is calculated just for
these if cpu usage was previously recorded for them (we need two data
points to calculate cpu utilization).

Added test case that tests cpu-util ranking strategy when tasks retire
and new tasks are co-located.

To reduce redundant code, moved *model.Sample generation into a function.

Moved getMockDataSample(), uniqueTaskSets, hostname, availableCpus
and elapsedTimeSeconds into strategies/testutil.go.

Added mockDataSampleStream() to be used by cpushares test code.
Refactored cpushares test code to use mockDataSampleStream().

Consolidated test utilities in strategies/testutil.

Added additional tests for cpu-util strategy testing.
1. Test when there is no cpu usage data.
2. Test when the cpu usage information is available for a different
	set of tasks compared to the previous fetch.
  • Loading branch information
pradykaushik authored Nov 24, 2020
1 parent 0f0da6c commit eea6f06
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 159 deletions.
1 change: 0 additions & 1 deletion query/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type Builder struct {
// List of labels to be used to filter the data. This is an optional field.
labelMatchers []*LabelMatcher
// The unit of time to use when performing range queries. This is an optional field.
// If this field is not initialized, then
timeUnit TimeUnit
timeDuration uint
// TODO (pradykaushik) support functions.
Expand Down
32 changes: 32 additions & 0 deletions strategies/strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2020 Pradyumna Kaushik
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package strategies

import (
"github.com/pradykaushik/task-ranker/logger"
"log"
"testing"
)

// TestMain configures the shared logger before running the strategy
// tests and tears it down afterwards. Logger setup/teardown failures
// are reported but do not abort the test run.
func TestMain(m *testing.M) {
	if err := logger.Configure(); err != nil {
		log.Println("could not configure logger for strategy testing")
	}
	m.Run()
	if err := logger.Done(); err != nil {
		log.Println("could not close logger after strategy testing")
	}
}
37 changes: 8 additions & 29 deletions strategies/taskRankCpuSharesStrategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,35 +78,14 @@ func TestTaskRankCpuSharesStrategy_GetRange(t *testing.T) {
// 5. task with id 'test_task_id_3' is allocated a 3072 cpu shares.
// mockCpuSharesData returns mock cpu-shares data for the first set of
// unique tasks, all co-located on the same host:
//   - uniqueTaskSets[0][0] is allocated 1024 cpu shares,
//   - uniqueTaskSets[0][1] is allocated 2048 cpu shares,
//   - uniqueTaskSets[0][2] is allocated 3072 cpu shares.
//
// The provided label names identify the task-id and task-host labels
// attached to each sample stream. Fix over the previous revision: the
// old hand-written matrix literal (which duplicated the sample-stream
// construction three times) is replaced by calls to the shared
// getMockDataRange helper, and the leftover pre-refactor body has been
// removed so the function has a single return statement.
func mockCpuSharesData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) model.Value {
	now := time.Now()
	return model.Matrix{
		getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][0],
			hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 1024.0}),
		getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][1],
			hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 2048.0}),
		getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2],
			hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 3072.0}),
	}
}

func TestTaskRankCpuSharesStrategy_Execute(t *testing.T) {
Expand Down
37 changes: 20 additions & 17 deletions strategies/taskRankCpuUtilStrategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type TaskRankCpuUtilStrategy struct {
// On the other hand, if the tasks are not pinned, then there is no guarantee that the necessary number of data points be available
// as the cpu scheduler can preempt and re-schedule the task on any available cpu.
// Therefore, to avoid confusion, this strategy does not use the range query.
// TODO (pkaushi1) get rid of this eventually.
rangeTimeUnit query.TimeUnit
rangeQty uint
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) {
}

var ok bool
// nowTotalCpuUsage stores the total cumulative cpu usage for each running task.
// Stores the total cumulative cpu usage for each running task.
var nowTotalCpuUsage = make(map[string]map[string]*cpuUsageDataPoint)

// Parse Prometheus metrics.
Expand Down Expand Up @@ -170,29 +171,31 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) {
}
}

// Rank colocated tasks in non-increasing order based on their total cpu utilization (%)
// on the entire host (all cpus).
// Rank colocated tasks in non-increasing order based on their total cpu utilization (%) on the entire host (all cpus).
// TODO (pradykaushik) Periodically drain previousTotalCpuUsage to prevent it from growing indefinitely.
rankedTasks := make(entities.RankedTasks)
for hostname, colocatedTasksCpuUsageInfo := range s.previousTotalCpuUsage {
for taskID, prevTotalCpuUsage := range colocatedTasksCpuUsageInfo {
if prevTotalCpuUsage.dataPoint == nil {
prevTotalCpuUsage.dataPoint = new(cpuUsageDataPoint)
for hostname, cpuUsageColocatedActiveTasks := range nowTotalCpuUsage {
for taskID, totalCpuUsageInfoActiveTask := range cpuUsageColocatedActiveTasks {
// calculating cpu utilization if cpu usage information previously recorded.
prevRecordedTotalCpuUsage := s.previousTotalCpuUsage[hostname][taskID]
if prevRecordedTotalCpuUsage.dataPoint == nil {
prevRecordedTotalCpuUsage.dataPoint = new(cpuUsageDataPoint)
} else {
// Calculating the cpu utilization of this task.
prevTotalCpuUsage.task.Weight = s.round(s.cpuUtil(
prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage,
prevTotalCpuUsage.dataPoint.timestamp,
nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage,
nowTotalCpuUsage[hostname][taskID].timestamp,
prevRecordedTotalCpuUsage.task.Weight = s.round(s.cpuUtil(
prevRecordedTotalCpuUsage.dataPoint.totalCumulativeCpuUsage,
prevRecordedTotalCpuUsage.dataPoint.timestamp,
totalCpuUsageInfoActiveTask.totalCumulativeCpuUsage,
totalCpuUsageInfoActiveTask.timestamp,
))
rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)], *prevTotalCpuUsage.task)
rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)],
*prevRecordedTotalCpuUsage.task)
}
// Saving current total cumulative cpu usage seconds to calculate cpu utilization in the next interval.
prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage = nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage
prevTotalCpuUsage.dataPoint.timestamp = nowTotalCpuUsage[hostname][taskID].timestamp
prevRecordedTotalCpuUsage.dataPoint.totalCumulativeCpuUsage = totalCpuUsageInfoActiveTask.totalCumulativeCpuUsage
prevRecordedTotalCpuUsage.dataPoint.timestamp = totalCpuUsageInfoActiveTask.timestamp
}

// Sorting colocated tasks.
// Sorting co-located tasks.
sort.SliceStable(rankedTasks[entities.Hostname(hostname)], func(i, j int) bool {
return rankedTasks[entities.Hostname(hostname)][i].Weight >=
rankedTasks[entities.Hostname(hostname)][j].Weight
Expand Down
Loading

0 comments on commit eea6f06

Please sign in to comment.