diff --git a/query/builder.go b/query/builder.go index 4352f23..5d45846 100644 --- a/query/builder.go +++ b/query/builder.go @@ -27,7 +27,6 @@ type Builder struct { // List of labels to be used to filter the data. This is an optional field. labelMatchers []*LabelMatcher // The unit of time to use when performing range queries. This is an optional field. - // If this field is not initialized, then timeUnit TimeUnit timeDuration uint // TODO (pradykaushik) support functions. diff --git a/strategies/strategy_test.go b/strategies/strategy_test.go new file mode 100644 index 0000000..a1583ae --- /dev/null +++ b/strategies/strategy_test.go @@ -0,0 +1,32 @@ +// Copyright 2020 Pradyumna Kaushik +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package strategies + +import ( + "github.com/pradykaushik/task-ranker/logger" + "log" + "testing" +) + +func TestMain(m *testing.M) { + err := logger.Configure() + if err != nil { + log.Println("could not configure logger for strategy testing") + } + m.Run() + err = logger.Done() + if err != nil { + log.Println("could not close logger after strategy testing") + } +} diff --git a/strategies/taskRankCpuSharesStrategy_test.go b/strategies/taskRankCpuSharesStrategy_test.go index a53a5f2..9f4702b 100644 --- a/strategies/taskRankCpuSharesStrategy_test.go +++ b/strategies/taskRankCpuSharesStrategy_test.go @@ -78,35 +78,14 @@ func TestTaskRankCpuSharesStrategy_GetRange(t *testing.T) { // 5. task with id 'test_task_id_3' is allocated a 3072 cpu shares. func mockCpuSharesData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) model.Value { now := time.Now() - return model.Value(model.Matrix{ - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_1", - dedicatedLabelTaskHost: "localhost", - }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Second()), Value: 1024.0}, - }, - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_2", - dedicatedLabelTaskHost: "localhost", - }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Second()), Value: 2048.0}, - }, - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_3", - dedicatedLabelTaskHost: "localhost", - }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Second()), Value: 3072.0}, - }, - }, - }) + return model.Matrix{ + getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][0], + hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 1024.0}), + getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][1], + hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 2048.0}), + getMockDataRange(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2], + hostname, model.SamplePair{Timestamp: model.Time(now.Second()), Value: 3072.0}), + } } func TestTaskRankCpuSharesStrategy_Execute(t *testing.T) { diff --git a/strategies/taskRankCpuUtilStrategy.go b/strategies/taskRankCpuUtilStrategy.go index fdf4a36..2dba4c8 100644 --- a/strategies/taskRankCpuUtilStrategy.go +++ b/strategies/taskRankCpuUtilStrategy.go @@ -52,6 +52,7 @@ type TaskRankCpuUtilStrategy struct { // On the other hand, if the tasks are not pinned, then there is no guarantee that the necessary number of data points be available // as the cpu scheduler can preempt and re-schedule the task on any available cpu. // Therefore, to avoid confusion, this strategy does not use the range query. + // TODO (pkaushi1) get rid of this eventually. rangeTimeUnit query.TimeUnit rangeQty uint } @@ -113,7 +114,7 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) { } var ok bool - // nowTotalCpuUsage stores the total cumulative cpu usage for each running task. + // Stores the total cumulative cpu usage for each running task. var nowTotalCpuUsage = make(map[string]map[string]*cpuUsageDataPoint) // Parse Prometheus metrics. @@ -170,29 +171,31 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) { } } - // Rank colocated tasks in non-increasing order based on their total cpu utilization (%) - // on the entire host (all cpus). + // Rank colocated tasks in non-increasing order based on their total cpu utilization (%) on the entire host (all cpus). + // TODO (pradykaushik) Periodically drain previousTotalCpuUsage to prevent it from growing indefinitely. rankedTasks := make(entities.RankedTasks) - for hostname, colocatedTasksCpuUsageInfo := range s.previousTotalCpuUsage { - for taskID, prevTotalCpuUsage := range colocatedTasksCpuUsageInfo { - if prevTotalCpuUsage.dataPoint == nil { - prevTotalCpuUsage.dataPoint = new(cpuUsageDataPoint) + for hostname, cpuUsageColocatedActiveTasks := range nowTotalCpuUsage { + for taskID, totalCpuUsageInfoActiveTask := range cpuUsageColocatedActiveTasks { + // calculating cpu utilization if cpu usage information previously recorded. + prevRecordedTotalCpuUsage := s.previousTotalCpuUsage[hostname][taskID] + if prevRecordedTotalCpuUsage.dataPoint == nil { + prevRecordedTotalCpuUsage.dataPoint = new(cpuUsageDataPoint) } else { - // Calculating the cpu utilization of this task. - prevTotalCpuUsage.task.Weight = s.round(s.cpuUtil( - prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage, - prevTotalCpuUsage.dataPoint.timestamp, - nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage, - nowTotalCpuUsage[hostname][taskID].timestamp, + prevRecordedTotalCpuUsage.task.Weight = s.round(s.cpuUtil( + prevRecordedTotalCpuUsage.dataPoint.totalCumulativeCpuUsage, + prevRecordedTotalCpuUsage.dataPoint.timestamp, + totalCpuUsageInfoActiveTask.totalCumulativeCpuUsage, + totalCpuUsageInfoActiveTask.timestamp, )) - rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)], *prevTotalCpuUsage.task) + rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)], + *prevRecordedTotalCpuUsage.task) } // Saving current total cumulative cpu usage seconds to calculate cpu utilization in the next interval. - prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage = nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage - prevTotalCpuUsage.dataPoint.timestamp = nowTotalCpuUsage[hostname][taskID].timestamp + prevRecordedTotalCpuUsage.dataPoint.totalCumulativeCpuUsage = totalCpuUsageInfoActiveTask.totalCumulativeCpuUsage + prevRecordedTotalCpuUsage.dataPoint.timestamp = totalCpuUsageInfoActiveTask.timestamp } - // Sorting colocated tasks. + // Sorting co-located tasks. sort.SliceStable(rankedTasks[entities.Hostname(hostname)], func(i, j int) bool { return rankedTasks[entities.Hostname(hostname)][i].Weight >= rankedTasks[entities.Hostname(hostname)][j].Weight diff --git a/strategies/taskRankCpuUtilStrategy_test.go b/strategies/taskRankCpuUtilStrategy_test.go index fb1e770..a638b67 100644 --- a/strategies/taskRankCpuUtilStrategy_test.go +++ b/strategies/taskRankCpuUtilStrategy_test.go @@ -81,83 +81,39 @@ func TestTaskRankCpuUtilStrategy_GetRange(t *testing.T) { } } -var elapsedTime float64 = 0 - -// mockConstCpuUtilData returns a mock of prometheus time series data. +// mockCpuUtilDataAlwaysUsingAllCpus returns a mock of prometheus time series data. +// This mock is useful to test scenarios where tasks are N-level parallel (N >= #cpus) and use up all the cpus all the time. +// // This mock has the following information. // 1. Three tasks with ids 'test_task_id_{1..3}'. // 2. Hostname for all tasks is localhost. -// 3. For each task, cpu usage data is provided for two cpus, 'cpu00' and 'cpu01'. +// 3. For each task, cpu usage data is provided for both cpus, 'cpu00' and 'cpu01'. // 4. task with id 'test_task_id_1' demonstrates cpu utilization of 22.5% on each cpu. // 5. task with id 'test_task_id_2' demonstrates cpu utilization of 30% on each cpu. // 6. task with id 'test_task_id_3' demonstrates cpu utilization of 67.5% on each cpu. -func mockConstCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { - mockedCpuUtilData = model.Value(model.Vector{ - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_1", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu00", - }, - Value: model.SampleValue(0.225 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_1", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu01", - }, - Value: model.SampleValue(0.225 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_2", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu00", - }, - Value: model.SampleValue(0.3 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_2", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu01", - }, - Value: model.SampleValue(0.3 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_3", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu00", - }, - Value: model.SampleValue(0.675 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_3", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu01", - }, - Value: model.SampleValue(0.675 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - }) - elapsedTime++ +func mockCpuUtilDataAlwaysUsingAllCpus(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { + mockedCpuUtilData = model.Vector{ + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][0], hostname, availableCpus[0], + 0.225*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][0], hostname, availableCpus[1], + 0.225*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][1], hostname, availableCpus[0], + 0.3*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][1], hostname, availableCpus[1], + 0.3*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2], hostname, availableCpus[0], + 0.675*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2], hostname, availableCpus[1], + 0.675*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + } + elapsedTimeSeconds++ return } -var availableCpus = map[int]model.LabelValue{ - 0: "cpu00", - 1: "cpu01", -} - -// mockVaryingCpuUtilData returns a mock of prometheus time series data. +// mockCpuUtilDataUsingOnlySomeCpus returns a mock of prometheus time series data. +// This mock is useful to test scenarios where tasks are using only some of the available cpus. +// In addition, this mock mimics real behavior of the OS cpu scheduler assigning threads to any available cpu. +// // This mock has the following information. // 1. Three tasks with ids 'test_task_id_{1..3}'. // 2. Hostname for all tasks is localhost. @@ -165,49 +121,25 @@ var availableCpus = map[int]model.LabelValue{ // 4. task with id 'test_task_id_1' demonstrates total cpu utilization of 45%. // 5. task with id 'test_task_id_2' demonstrates total cpu utilization of 60%. // 6. task with id 'test_task_id_3' demonstrates total cpu utilization of 135%. -func mockVaryingCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { - mockedCpuUtilData = model.Value(model.Vector{ - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_1", - dedicatedLabelTaskHost: "localhost", - "cpu": availableCpus[rand.Intn(2)], - }, - Value: model.SampleValue(0.45 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_2", - dedicatedLabelTaskHost: "localhost", - "cpu": availableCpus[rand.Intn(2)], - }, - Value: model.SampleValue(0.6 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_3", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu00", - }, - Value: model.SampleValue(0.9 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - { - Metric: map[model.LabelName]model.LabelValue{ - dedicatedLabelTaskID: "test_task_id_3", - dedicatedLabelTaskHost: "localhost", - "cpu": "cpu01", - }, - Value: model.SampleValue(0.45 * (elapsedTime + 1)), - Timestamp: model.Time(1000 * (elapsedTime + 1)), - }, - }) - elapsedTime++ +func mockCpuUtilDataUsingOnlySomeCpus(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { + mockedCpuUtilData = model.Vector{ + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][0], hostname, availableCpus[0], + 0.45*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][1], hostname, availableCpus[0], + 0.6*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2], hostname, availableCpus[0], + 0.9*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + getMockDataSample(dedicatedLabelTaskID, dedicatedLabelTaskHost, uniqueTaskSets[0][2], hostname, availableCpus[1], + 0.45*(elapsedTimeSeconds+1), 1000*(elapsedTimeSeconds+1)), + } + elapsedTimeSeconds++ return } +func mockEmptyTaskSetCpuUtilData() model.Value { + return model.Vector{} +} + func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { receiver := &cpuUtilRanksReceiver{} s := &TaskRankCpuUtilStrategy{ @@ -260,9 +192,21 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { }, } + t.Run("no data retrieved from prometheus", func(t *testing.T) { + for i := 0; i < 5; i++ { // Just testing multiple times. + data := mockEmptyTaskSetCpuUtilData() + s.Execute(data) + + assert.Empty(t, receiver.rankedTasks) + } + }) + t.Run("tasks demonstrate constant cpu usage and use all cpus", func(t *testing.T) { + s.Init() // re-initializing. + elapsedTimeSeconds = 0 + receiver.rankedTasks = make(entities.RankedTasks) for i := 0; i < 5; i++ { - data := mockConstCpuUtilData("container_label_task_id", "container_label_task_host") + data := mockCpuUtilDataAlwaysUsingAllCpus("container_label_task_id", "container_label_task_host") s.Execute(data) if i == 0 { @@ -279,14 +223,22 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) } - }) t.Run("tasks demonstrate varying cpu usage and do not run on all cpus", func(t *testing.T) { - for i := 0; i < 5; i++ { // Starting from 5 to simulate cumulative cpu usage from previous test. - data := mockVaryingCpuUtilData("container_label_task_id", "container_label_task_host") + s.Init() // re-initializing. + elapsedTimeSeconds = 0 + receiver.rankedTasks = make(entities.RankedTasks) + for i := 0; i < 5; i++ { + data := mockCpuUtilDataUsingOnlySomeCpus("container_label_task_id", "container_label_task_host") s.Execute(data) + if i == 0 { + // No ranked tasks yet as we only have one second of data. + assert.Empty(t, receiver.rankedTasks) + continue + } + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) _, ok := expectedRankedTasks["localhost"] @@ -295,6 +247,220 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) } + }) + + t.Run("cpu usage data received for different subsets of cpus", func(t *testing.T) { + s.Init() // re-initializing. + receiver.rankedTasks = make(entities.RankedTasks) + ///////////////////////////////////////////////// ROUND 1 ///////////////////////////////////////////////// + // CPU usage data received for three active tasks. + data := model.Vector{ + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[0], 0.45, 1000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[0], 0.30, 1000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[0], 0.675, 1000), + } + + s.Execute(data) + // No ranked tasks yet as we only have one second of data. + assert.Empty(t, receiver.rankedTasks) + + ///////////////////////////////////////////////// ROUND 2 ///////////////////////////////////////////////// + // Changing the cpu for which cpu usage information is received for a task. + data = model.Vector{ + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[1], 0.90, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[0], 0.30, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[1], 0.60, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[0], 1.35, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[1], 0.675, 2000), + } + + expectedRankedTasks = map[entities.Hostname][]entities.Task{ + "localhost": { + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_3", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_3", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 135.0, + }, + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_2", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_2", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 60.0, + }, + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_1", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_1", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 45.0, + }, + }, + } + + s.Execute(data) + + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) + + _, ok := expectedRankedTasks["localhost"] + _, localhostIsInRankedTasks := receiver.rankedTasks["localhost"] + assert.True(t, ok == localhostIsInRankedTasks) + + assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) + }) + + t.Run("cpu usage information received after some tasks complete execution new tasks added", func(t *testing.T) { + s.Init() // re-initializing. + receiver.rankedTasks = make(entities.RankedTasks) + ///////////////////////////////////////////////// ROUND 1 ///////////////////////////////////////////////// + // CPU usage information received for three tasks. + data := model.Vector{ + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[0], 0.45, 1000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[0], 0.30, 1000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[0], 0.675, 1000), + } + + s.Execute(data) + // No ranked tasks yet as we only have one second of data. + assert.Empty(t, receiver.rankedTasks) + + ///////////////////////////////////////////////// ROUND 2 ///////////////////////////////////////////////// + // Changing the cpu for which cpu usage information is received for some tasks. + data = model.Vector{ + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[1], 0.90, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[0], 0.30, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[1], 0.60, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[0], 1.35, 2000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][2], + hostname, availableCpus[1], 0.675, 2000), + } + + expectedRankedTasks = map[entities.Hostname][]entities.Task{ + "localhost": { + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_3", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_3", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 135.0, + }, + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_2", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_2", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 60.0, + }, + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": "test_task_id_1", + "container_label_task_host": "localhost", + "cpu": "cpu00", + }, + ID: "test_task_id_1", + Hostname: "localhost", + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 45.0, + }, + }, + } + + s.Execute(data) + + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) + + _, ok := expectedRankedTasks["localhost"] + _, localhostIsInRankedTasks := receiver.rankedTasks["localhost"] + assert.True(t, ok == localhostIsInRankedTasks) + + assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) + + ///////////////////////////////////////////////// ROUND 3 ///////////////////////////////////////////////// + // Assuming test_task_id_3 completed execution and a new task was co-located with the still active tasks. + data = model.Vector{ + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[0], 1.35, 3000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][0], + hostname, availableCpus[1], 0.45, 3000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[0][1], + hostname, availableCpus[1], 1.5, 3000), + getMockDataSample("container_label_task_id", "container_label_task_host", uniqueTaskSets[1][0], + hostname, availableCpus[1], 0.70, 3000), + } + + // We should only expect cpu utilization data for the still active tasks, excluding the new task. + expectedRankedTasks = map[entities.Hostname][]entities.Task{ + "localhost": { + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": uniqueTaskSets[0][0], + "container_label_task_host": hostname, + "cpu": availableCpus[0], + }, + ID: string(uniqueTaskSets[0][0]), + Hostname: string(hostname), + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 90.0, + }, + { + Metric: map[model.LabelName]model.LabelValue{ + "container_label_task_id": uniqueTaskSets[0][1], + "container_label_task_host": hostname, + "cpu": availableCpus[0], + }, + ID: string(uniqueTaskSets[0][1]), + Hostname: string(hostname), + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 60.0, + }, + }, + } + + s.Execute(data) + + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) + + _, ok = expectedRankedTasks["localhost"] + _, localhostIsInRankedTasks = receiver.rankedTasks["localhost"] + assert.True(t, ok == localhostIsInRankedTasks) + assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) }) } diff --git a/strategies/testutil.go b/strategies/testutil.go new file mode 100644 index 0000000..2349c4b --- /dev/null +++ b/strategies/testutil.go @@ -0,0 +1,67 @@ +// Copyright 2020 Pradyumna Kaushik +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package strategies + +import "github.com/prometheus/common/model" + +var elapsedTimeSeconds float64 = 0 + +const hostname model.LabelValue = "localhost" + +// Task IDs to create mocks. +var uniqueTaskSets = map[int][]model.LabelValue{ + 0: {"test_task_id_1", "test_task_id_2", "test_task_id_3"}, + 1: {"test_task_id_4", "test_task_id_5", "test_task_id_6"}, + 2: {"test_task_id_7", "test_task_id_8", "test_task_id_9"}, +} + +var availableCpus = map[int]model.LabelValue{ + 0: "cpu00", + 1: "cpu01", +} + +// getMockDataSample returns a data sample mimicking cpu_usage_seconds information +// retrieved from prometheus for a single task. +func getMockDataSample( + dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName, + taskID, hostname, cpu model.LabelValue, + cumulativeCpuUsageSeconds float64, + timestamp float64) *model.Sample { + + return &model.Sample{ + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: taskID, + dedicatedLabelTaskHost: hostname, + "cpu": cpu, + }, + Value: model.SampleValue(cumulativeCpuUsageSeconds), + Timestamp: model.Time(timestamp), + } +} + +// getMockDataRange returns a data sample stream mimicking cpu_usage_seconds information +// retrieved as a result of a range query from prometheus for a single task. +func getMockDataRange( + dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName, + taskID, hostname model.LabelValue, + values ...model.SamplePair) *model.SampleStream { + + return &model.SampleStream{ + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: taskID, + dedicatedLabelTaskHost: hostname, + }, + Values: values, + } +}