diff --git a/README.md b/README.md index cb5182f..50d152e 100644 --- a/README.md +++ b/README.md @@ -74,31 +74,19 @@ tRanker, err = New( {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}, }, new(dummyTaskRanksReceiver), 1*time.Second)) ``` +**The task ranker schedule (in seconds) SHOULD be a positive multiple of the prometheus scrape interval. This simplifies the calculation +of the time difference between data points fetched from successive query executions.** -You can now also configure the strategies using initialization [options](./strategies/strategy.go). This allows for +You can now also configure the strategies using initialization [options](./strategies/strategy.go). This also allows for configuring the time duration of range queries, enabling fine-grained control over the number of data points -over which the strategy is applied. See below example for strategy configuration using options. +over which the strategy is applied. See the code snippet below for strategy configuration using options. ```go -type dummyTaskRanksReceiver struct{} - -func (r *dummyTaskRanksReceiver) Receive(rankedTasks entities.RankedTasks) { - log.Println(rankedTasks) -} - -prometheusDataFetcher, err = prometheus.NewDataFetcher( - prometheus.WithPrometheusEndpoint("http://localhost:9090")) - -tRanker, err = New( - WithDataFetcher(prometheusDataFetcher), - WithSchedule("?/5 * * * * *"), - WithStrategyOptions("cpuutil", - strategies.WithLabelMatchers([]*query.LabelMatcher{ - {Type: query.TaskID, Label: "container_label_task_id", Operator: query.NotEqual, Value: ""}, - {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}), - strategies.WithTaskRanksReceiver(new(dummyTaskRanksReceiver)), - strategies.WithPrometheusScrapeInterval(1*time.Second), - strategies.WithRange(query.Seconds, 5))) +WithStrategyOptions("dummyStrategy", + strategies.WithLabelMatchers([]*query.LabelMatcher{...}), + strategies.WithTaskRanksReceiver(new(testTaskRanksReceiver)), + strategies.WithRange(query.Seconds, 5))) ``` +_Note: Currently, none of the strategies implemented (**cpushares** and **cpuutil**) support range queries._ ##### Dedicated Label Matchers Dedicated Label Matchers can be used to retrieve the task ID and host information from data retrieved @@ -160,3 +148,13 @@ HOST = localhost #### Tear-Down Once finished testing, tear down the test environment by running [`./tear_down_test_env`](./tear_down_test_env). + +### Logs +Task Ranker uses [logrus](https://github.com/sirupsen/logrus) for logging. To prevent Task Ranker logs from +mixing with the logs of the application that uses it, console logging is disabled. +There are [two types of logs](./logger/logger.go), as described below. +1. Task Ranker logs - These logs are Task Ranker specific and correspond to the functioning of the library. + These logs are written to a file named **_task\_ranker\_logs\_\<timestamp\>.log_**. +2. Task Ranking Results logs - These are the results of task ranking using one of the task ranking strategies. + These logs are written to a file named **_task\_ranking\_results\_\<timestamp\>.log_**. To simplify parsing, + these logs are written in JSON format.
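For a fuller end-to-end picture of the updated API, here is a construction sketch that mirrors the revised e2e test in this patch. It assumes the repository's import paths, that the root package is imported as `taskranker`, and a Prometheus endpoint at `http://localhost:9090`; the receiver type is illustrative rather than part of the patch.

```go
package main

import (
	"fmt"
	"time"

	taskranker "github.com/pradykaushik/task-ranker"
	"github.com/pradykaushik/task-ranker/datafetcher/prometheus"
	"github.com/pradykaushik/task-ranker/entities"
	"github.com/pradykaushik/task-ranker/query"
	"github.com/pradykaushik/task-ranker/strategies"
)

// logReceiver is an illustrative TaskRanksReceiver that just prints the results.
type logReceiver struct{}

func (r *logReceiver) Receive(rankedTasks entities.RankedTasks) {
	fmt.Println(rankedTasks)
}

func main() {
	prometheusDataFetcher, err := prometheus.NewDataFetcher(
		prometheus.WithPrometheusEndpoint("http://localhost:9090"))
	if err != nil {
		panic(err)
	}
	// The schedule fires every 5 seconds, a positive multiple of the 1-second
	// Prometheus scrape interval, so the validation added to New() passes.
	tRanker, err := taskranker.New(
		taskranker.WithDataFetcher(prometheusDataFetcher),
		taskranker.WithSchedule("?/5 * * * * *"),
		taskranker.WithPrometheusScrapeInterval(1*time.Second),
		taskranker.WithStrategyOptions("cpuutil",
			strategies.WithLabelMatchers([]*query.LabelMatcher{
				{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
				{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}),
			strategies.WithTaskRanksReceiver(new(logReceiver))))
	if err != nil {
		panic(err)
	}
	tRanker.Start()
	<-time.After(13 * time.Second) // Enough time for at least one round of ranking.
	tRanker.Stop()
}
```

Start() kicks off the cron job and Stop() shuts it down, closing the log files via logger.Done().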
diff --git a/datafetcher/prometheus/dataFetcher.go b/datafetcher/prometheus/dataFetcher.go index 4e37acf..54077d6 100644 --- a/datafetcher/prometheus/dataFetcher.go +++ b/datafetcher/prometheus/dataFetcher.go @@ -18,11 +18,13 @@ import ( "context" "github.com/pkg/errors" "github.com/pradykaushik/task-ranker/datafetcher" + "github.com/pradykaushik/task-ranker/logger" "github.com/pradykaushik/task-ranker/query" "github.com/pradykaushik/task-ranker/strategies" "github.com/prometheus/client_golang/api" "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" "net/http" "time" ) @@ -94,7 +96,7 @@ func (f *DataFetcher) Fetch() (result model.Value, err error) { var client api.Client client, err = api.NewClient(api.Config{Address: f.endpoint}) if err != nil { - err = errors.Wrap(err, "failed to fetch data from prometheus") + err = errors.Wrap(err, "failed to create prometheus client") return } v1Api := v1.NewAPI(client) @@ -102,5 +104,12 @@ func (f *DataFetcher) Fetch() (result model.Value, err error) { defer cancel() // TODO do not ignore warnings. maybe log them? result, _, err = v1Api.Query(ctx, queryString, time.Now()) + if err == nil { + logger.WithFields(logrus.Fields{ + "stage": "data-fetcher", + "query": queryString, + "query_result": result, + }).Log(logrus.InfoLevel, "data fetched") + } return } diff --git a/go.mod b/go.mod index 914db45..3ca7303 100644 --- a/go.mod +++ b/go.mod @@ -9,5 +9,6 @@ require ( github.com/prometheus/client_golang v1.6.0 github.com/prometheus/common v0.9.1 github.com/robfig/cron/v3 v3.0.0 + github.com/sirupsen/logrus v1.6.0 github.com/stretchr/testify v1.6.0 ) diff --git a/go.sum b/go.sum index 00b3ca9..94e5b63 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,7 @@ github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGn github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -70,6 +71,8 @@ github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= +github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -89,6 +92,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/logger/hook.go b/logger/hook.go new file mode 100644 index 0000000..660fa29 --- /dev/null +++ b/logger/hook.go @@ -0,0 +1,81 @@ +// Copyright 2020 Pradyumna Kaushik +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logger + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" +) + +// WriterHook is a hook that writes logs, that contain at least one of the specified set of topics +// as keys in its fields, to the specified Writer. The logs are formatted using the specified formatter. +type WriterHook struct { + formatter logrus.Formatter + writer io.Writer + topics map[string]struct{} +} + +// newWriterHook instantiates and returns a new WriterHook. +func newWriterHook(formatter logrus.Formatter, writer io.Writer, topics ...string) logrus.Hook { + hook := &WriterHook{ + formatter: formatter, + writer: writer, + topics: make(map[string]struct{}), + } + for _, topic := range topics { + hook.topics[topic] = struct{}{} + } + + return hook +} + +// Levels return the list of levels for which this hook will be fired. +func (h WriterHook) Levels() []logrus.Level { + // We do not want debug and trace level logs to be persisted as they are typically temporary. + return []logrus.Level{ + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + logrus.InfoLevel, + } +} + +// Fire checks whether the fields in the provided entry contain at least one of the specified +// topics and if yes, formats the entry using the specified formatter and then writes it to the +// specified Writer. +func (h *WriterHook) Fire(entry *logrus.Entry) error { + // Logging only if any of the provided topics are found as keys in fields. 
+ allow := false + for topic := range h.topics { + if _, ok := entry.Data[topic]; ok { + allow = true + break + } + } + + var err error + var formattedLog []byte + if allow { + formattedLog, err = h.formatter.Format(entry) + if err != nil { + err = errors.Wrap(err, "failed to format entry") + } else { + _, err = h.writer.Write(formattedLog) + } + } + return err +} diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..8a536ae --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,131 @@ +// Copyright 2020 Pradyumna Kaushik +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logger + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io/ioutil" + "os" + "time" +) + +const ( + // Using RFC3339Nano as the timestamp format for logs as prometheus scrape interval can be in milliseconds. + timestampFormat = time.RFC3339Nano + // Prefix of the name of the log file to store task ranker logs. + // This will be suffixed with the timestamp, associated with creating the file, to obtain the log filename. + taskRankerLogFilePrefix = "task_ranker_logs" + // Prefix of the name of the log file to store task ranking results. + // This will be suffixed with the timestamp, associated with creating the file, to obtain the log filename. + taskRankingResultsLogFilePrefix = "task_ranking_results" + // Giving everyone read and write permissions to the log files. + logFilePermissions = 0666 +) + +// instantiating a Logger to be used. This instance is configured and maintained locally. +var log = logrus.New() + +var taskRankerLogFile *os.File +var taskRankingResultsLogFile *os.File + +// createTaskRankerLogFile creates the log file to which task ranker logs are persisted. +func createTaskRankerLogFile(now time.Time) error { + var err error + filename := fmt.Sprintf("%s_%v.log", taskRankerLogFilePrefix, now.UnixNano()) + taskRankerLogFile, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, logFilePermissions) + if err != nil { + err = errors.Wrap(err, "failed to create task ranker operations log file") + } + return err +} + +// createTaskRankingResultsLogFile creates the log file to which task ranking results are persisted. +func createTaskRankingResultsLogFile(now time.Time) error { + var err error + filename := fmt.Sprintf("%s_%v.log", taskRankingResultsLogFilePrefix, now.UnixNano()) + taskRankingResultsLogFile, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, logFilePermissions) + if err != nil { + err = errors.Wrap(err, "failed to create task ranker log file") + } + return err +} + +// Configure the logger. To be prevented task ranker logs from mixing with the logs of the application +// that is using it, logging to the console is disabled and instead hooks that redirect logs to corresponding +// log files are attached to the logger. +func Configure() error { + // Disabling log to stdout. + log.SetOutput(ioutil.Discard) + // Setting highest log level. + log.SetLevel(logrus.InfoLevel) + + // Creating the log files. 
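To illustrate where entries end up once Configure (continued below) finishes wiring the hooks: each hook only fires for entries whose fields contain at least one of its topics, so the fields a caller supplies decide which file the entry lands in. A sketch; only the field names and file-name prefixes come from this patch, the surrounding code is assumed.

```go
package example

import (
	"github.com/pradykaushik/task-ranker/entities"
	"github.com/pradykaushik/task-ranker/logger"
	"github.com/sirupsen/logrus"
)

// logExamples shows how the topic-based hooks route entries, assuming
// logger.Configure() has already been called.
func logExamples(rankedTasks entities.RankedTasks) {
	// Fields containing "stage", "query" or "query_result" send the entry,
	// text-formatted, to task_ranker_logs_<timestamp>.log.
	logger.WithFields(logrus.Fields{
		"stage": "data-fetcher",
		"query": "...", // the PromQL string built by the query builder
	}).Log(logrus.InfoLevel, "data fetched")

	// Fields containing "task_ranking_results" send the entry,
	// JSON-formatted, to task_ranking_results_<timestamp>.log.
	logger.WithFields(logrus.Fields{
		"task_ranking_strategy": "cpuutil",
		"task_ranking_results":  rankedTasks,
	}).Log(logrus.InfoLevel, "strategy executed")
}
```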
+ now := time.Now() + var err error + + if err = createTaskRankerLogFile(now); err != nil { + return err + } + if err = createTaskRankingResultsLogFile(now); err != nil { + return err + } + + // Instantiate the hooks. + jsonFormatter := &logrus.JSONFormatter{ + DisableHTMLEscape: true, + TimestampFormat: timestampFormat, + } + + textFormatter := &logrus.TextFormatter{ + DisableColors: true, + FullTimestamp: true, + TimestampFormat: timestampFormat, + } + + log.AddHook(newWriterHook(textFormatter, taskRankerLogFile, "stage", "query", "query_result")) + log.AddHook(newWriterHook(jsonFormatter, taskRankingResultsLogFile, "task_ranking_results")) + + return nil +} + +func Done() error { + var err error + if taskRankerLogFile != nil { + err = taskRankerLogFile.Close() + if err != nil { + err = errors.Wrap(err, "failed to close task ranker log file") + } + } + + if taskRankingResultsLogFile != nil { + err = taskRankingResultsLogFile.Close() + if err != nil { + err = errors.Wrap(err, "failed to close tank ranking results log file") + } + } + return err +} + +// Aliasing logrus functions. +var WithField = log.WithField +var WithFields = log.WithFields +var Info = log.Info +var Infof = log.Infof +var Error = log.Error +var Errorf = log.Errorf +var Warn = log.Warn +var Warnf = log.Warnf diff --git a/query/builder.go b/query/builder.go index d75a235..4352f23 100644 --- a/query/builder.go +++ b/query/builder.go @@ -30,7 +30,7 @@ type Builder struct { // If this field is not initialized, then timeUnit TimeUnit timeDuration uint - // TODO (pkaushi1) support functions. + // TODO (pradykaushik) support functions. } // NewBuilder returns a new Builder by applying all the given options. diff --git a/ranker.go b/ranker.go index 2344bfb..d69bbd6 100644 --- a/ranker.go +++ b/ranker.go @@ -15,14 +15,16 @@ package taskranker import ( + "fmt" "github.com/pkg/errors" df "github.com/pradykaushik/task-ranker/datafetcher" + "github.com/pradykaushik/task-ranker/logger" "github.com/pradykaushik/task-ranker/query" "github.com/pradykaushik/task-ranker/strategies" "github.com/pradykaushik/task-ranker/strategies/factory" "github.com/pradykaushik/task-ranker/util" "github.com/robfig/cron/v3" - "log" + "github.com/sirupsen/logrus" "time" ) @@ -43,6 +45,8 @@ type TaskRanker struct { runner *cron.Cron // termCh is a channel used to signal the task ranker to stop. termCh *util.SignalChannel + // prometheusScrapeInterval corresponds to the time interval between two successive metrics scrapes. + prometheusScrapeInterval time.Duration } func New(options ...Option) (*TaskRanker, error) { @@ -52,8 +56,34 @@ func New(options ...Option) (*TaskRanker, error) { return nil, errors.Wrap(err, "failed to create task ranker") } } + + // checking if schedule provided. + if tRanker.Schedule == nil { + return nil, errors.New("invalid schedule provided for task ranker") + } + + // validate task ranker schedule to be a multiple of prometheus scrape interval. 
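The validation that follows boils down to integer arithmetic on the two intervals. A standalone sketch of the same rule (the helper is hypothetical; the real check is inlined in New() below):

```go
// validSchedule reports whether the task ranker schedule interval (in seconds)
// is a positive multiple of the Prometheus scrape interval (in seconds).
func validSchedule(scheduleSeconds, scrapeIntervalSeconds int) bool {
	return scheduleSeconds >= scrapeIntervalSeconds &&
		scheduleSeconds%scrapeIntervalSeconds == 0
}
```

With the values used in the tests: "?/5 * * * * *" and a 1-second scrape interval give validSchedule(5, 1) == true, while "?/3 * * * * *" with a 5-second scrape interval fails the first condition and "?/7 * * * * *" with a 5-second scrape interval fails the multiple check.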
+ now := time.Unix(50000, 50000) + nextTimeTRankerSchedule := tRanker.Schedule.Next(now) + tRankerScheduleIntervalSeconds := int(nextTimeTRankerSchedule.Sub(now).Seconds()) + if (tRankerScheduleIntervalSeconds < int(tRanker.prometheusScrapeInterval.Seconds())) || + ((tRankerScheduleIntervalSeconds % int(tRanker.prometheusScrapeInterval.Seconds())) != 0) { + return nil, errors.New(fmt.Sprintf("task ranker schedule (%d seconds) should be a multiple of "+ + "prometheus scrape interval (%d seconds)", tRankerScheduleIntervalSeconds, int(tRanker.prometheusScrapeInterval.Seconds()))) + } + // Providing the prometheus scrape interval to the strategy. + tRanker.Strategy.SetPrometheusScrapeInterval(tRanker.prometheusScrapeInterval) tRanker.termCh = util.NewSignalChannel() - return tRanker, nil + + // Configuring logger. + err := logger.Configure() + if err != nil { + err = errors.Wrap(err, "failed to configure logger") + if err = logger.Done(); err != nil { + err = errors.Wrap(err, "failed to shutdown logger") + } + } + return tRanker, err } type Option func(*TaskRanker) error @@ -68,6 +98,17 @@ func WithDataFetcher(dataFetcher df.Interface) Option { } } +// WithPrometheusScrapeInterval returns an option that initializes the prometheus scrape interval. +func WithPrometheusScrapeInterval(prometheusScrapeInterval time.Duration) Option { + return func(tRanker *TaskRanker) error { + if prometheusScrapeInterval == 0 { + return errors.New("invalid prometheus scrape interval: should be > 0") + } + tRanker.prometheusScrapeInterval = prometheusScrapeInterval + return nil + } +} + // WithStrategy builds the task ranking strategy associated with the given name using the provided information. // For backwards compatibility, strategies that use range queries will use the default duration. If the time // duration for the range query needs to be configured, then use WithStrategyOptions(...) 
to configure the strategy @@ -75,8 +116,7 @@ func WithDataFetcher(dataFetcher df.Interface) Option { func WithStrategy( strategy string, labelMatchers []*query.LabelMatcher, - receiver strategies.TaskRanksReceiver, - prometheusScrapeInterval time.Duration) Option { + receiver strategies.TaskRanksReceiver) Option { return func(tRanker *TaskRanker) error { if strategy == "" { @@ -89,8 +129,7 @@ func WithStrategy( tRanker.Strategy = s err := strategies.Build(s, strategies.WithLabelMatchers(labelMatchers), - strategies.WithTaskRanksReceiver(receiver), - strategies.WithPrometheusScrapeInterval(prometheusScrapeInterval)) + strategies.WithTaskRanksReceiver(receiver)) if err != nil { return errors.Wrap(err, "failed to build strategy") } @@ -134,6 +173,9 @@ func WithSchedule(specString string) Option { } func (tRanker *TaskRanker) Start() { + logger.WithFields(logrus.Fields{ + "stage": "task-ranker", + }).Log(logrus.InfoLevel, "starting task ranker cron job") tRanker.runner = cron.New(cron.WithSeconds()) tRanker.runner.Schedule(tRanker.Schedule, tRanker) tRanker.runner.Start() @@ -145,13 +187,22 @@ func (tRanker *TaskRanker) Run() { } result, err := tRanker.DataFetcher.Fetch() if err != nil { - log.Println(err.Error()) + logger.WithFields(logrus.Fields{ + "stage": "data-fetcher", + }).Log(logrus.ErrorLevel, err.Error()) } else { tRanker.Strategy.Execute(result) } } func (tRanker *TaskRanker) Stop() { + logger.WithFields(logrus.Fields{ + "stage": "task-ranker", + }).Log(logrus.InfoLevel, "stopping task ranker cron job") tRanker.termCh.Close() tRanker.runner.Stop() + err := logger.Done() + if err != nil { + fmt.Printf("failed to shutdown logger: %v", err) + } } diff --git a/ranker_e2e_test.go b/ranker_e2e_test.go index 7b4c2a1..4732fac 100644 --- a/ranker_e2e_test.go +++ b/ranker_e2e_test.go @@ -51,10 +51,11 @@ func initTaskRanker(strategy string) (*TaskRanker, error) { tRanker, err = New( WithDataFetcher(prometheusDataFetcher), WithSchedule("?/5 * * * * *"), + WithPrometheusScrapeInterval(1*time.Second), WithStrategy(strategy, []*query.LabelMatcher{ {Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"}, {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}, - }, dummyReceiver, 1*time.Second)) + }, dummyReceiver)) return tRanker, err } @@ -74,13 +75,12 @@ func initTaskRankerOptions(strategy string) (*TaskRanker, error) { tRanker, err = New( WithDataFetcher(prometheusDataFetcher), WithSchedule("?/5 * * * * *"), + WithPrometheusScrapeInterval(1*time.Second), WithStrategyOptions(strategy, strategies.WithLabelMatchers([]*query.LabelMatcher{ {Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"}, {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}), - strategies.WithTaskRanksReceiver(dummyReceiver), - strategies.WithPrometheusScrapeInterval(1*time.Second), - strategies.WithRange(query.Seconds, 5))) + strategies.WithTaskRanksReceiver(dummyReceiver))) return tRanker, err @@ -102,7 +102,7 @@ func testStrategy(t *testing.T, tRanker *TaskRanker, initErr error) { assert.NoError(t, initErr) assert.NotNil(t, tRanker) tRanker.Start() - <-time.After(7 * time.Second) // Enough time for one round of ranking. + <-time.After(13 * time.Second) // Enough time for at least one round of ranking. 
testRanked(t) tRanker.Stop() } diff --git a/ranker_test.go b/ranker_test.go index 2e01bbb..bd6b8fa 100644 --- a/ranker_test.go +++ b/ranker_test.go @@ -15,12 +15,14 @@ package taskranker import ( + "github.com/pradykaushik/task-ranker/datafetcher" "github.com/pradykaushik/task-ranker/datafetcher/prometheus" "github.com/pradykaushik/task-ranker/query" "github.com/pradykaushik/task-ranker/strategies" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "testing" + "time" ) func TestNew(t *testing.T) { @@ -50,7 +52,48 @@ func TestNew(t *testing.T) { tRanker, initErr := initTaskRankerOptions("cpushares") test(tRanker, initErr) timeUnit, qty := tRanker.Strategy.GetRange() - assert.Equal(t, query.Seconds, timeUnit) - assert.Equal(t, uint(5), qty) + assert.Equal(t, query.None, timeUnit) + assert.Equal(t, uint(0), qty) + }) +} + +func TestNew_InvalidSchedule(t *testing.T) { + var prometheusDataFetcher datafetcher.Interface + var err error + var tRanker *TaskRanker + + prometheusDataFetcher, err = prometheus.NewDataFetcher( + prometheus.WithPrometheusEndpoint("http://localhost:9090")) + assert.NoError(t, err, "failed to instantiate data fetcher") + + dummyReceiver = new(dummyTaskRanksReceiver) + t.Run("schedule (in seconds) < prometheus scrape interval", func(t *testing.T) { + // Setting the task ranker schedule to every 3 seconds (< 5 seconds). + tRanker, err = New( + WithDataFetcher(prometheusDataFetcher), + WithSchedule("?/3 * * * * *"), + WithPrometheusScrapeInterval(5*time.Second), + WithStrategyOptions("cpushares", + strategies.WithLabelMatchers([]*query.LabelMatcher{ + {Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"}, + {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}), + strategies.WithTaskRanksReceiver(dummyReceiver))) + assert.Error(t, err, "task ranker schedule validation failed") + assert.Nil(t, tRanker, "task ranker instantiated with invalid schedule") + }) + + t.Run("task ranker schedule (in seconds) is not a positive multiple of prometheus scrape interval", func(t *testing.T) { + // Setting the task ranker schedule to every 7 seconds (not a positive multiple of 5). + tRanker, err = New( + WithDataFetcher(prometheusDataFetcher), + WithSchedule("?/7 * * * * *"), + WithPrometheusScrapeInterval(5*time.Second), + WithStrategyOptions("cpushares", + strategies.WithLabelMatchers([]*query.LabelMatcher{ + {Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"}, + {Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}), + strategies.WithTaskRanksReceiver(dummyReceiver))) + assert.Error(t, err, "task ranker schedule validation failed") + assert.Nil(t, tRanker, "task ranker instantiated with invalid schedule") }) } diff --git a/strategies/strategy.go b/strategies/strategy.go index caba439..ecc0cb3 100644 --- a/strategies/strategy.go +++ b/strategies/strategy.go @@ -97,11 +97,3 @@ func WithTaskRanksReceiver(receiver TaskRanksReceiver) Option { return nil } } - -// WithPrometheusScrapeInterval returns an option that initializes the prometheus scrape interval. 
-func WithPrometheusScrapeInterval(prometheusScrapeInterval time.Duration) Option { - return func(strategy Interface) error { - strategy.SetPrometheusScrapeInterval(prometheusScrapeInterval) - return nil - } -} diff --git a/strategies/taskRankCpuSharesStrategy.go b/strategies/taskRankCpuSharesStrategy.go index a5b149a..03b07b0 100644 --- a/strategies/taskRankCpuSharesStrategy.go +++ b/strategies/taskRankCpuSharesStrategy.go @@ -17,8 +17,10 @@ package strategies import ( "github.com/pkg/errors" "github.com/pradykaushik/task-ranker/entities" + "github.com/pradykaushik/task-ranker/logger" "github.com/pradykaushik/task-ranker/query" "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" "sort" "time" ) @@ -120,6 +122,10 @@ func (s *TaskRankCpuSharesStrategy) Execute(data model.Value) { } // Submitting the ranked tasks to the receiver. + logger.WithFields(logrus.Fields{ + "task_ranking_strategy": "cpushares", + "task_ranking_results": tasks, + }).Log(logrus.InfoLevel, "strategy executed") s.receiver.Receive(tasks) } diff --git a/strategies/taskRankCpuUtilStrategy.go b/strategies/taskRankCpuUtilStrategy.go index 8910889..fdf4a36 100644 --- a/strategies/taskRankCpuUtilStrategy.go +++ b/strategies/taskRankCpuUtilStrategy.go @@ -14,20 +14,23 @@ package strategies import ( + "fmt" "github.com/pkg/errors" "github.com/pradykaushik/task-ranker/entities" + "github.com/pradykaushik/task-ranker/logger" "github.com/pradykaushik/task-ranker/query" "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" "math" "sort" "time" ) // TaskRankCpuUtilStrategy is a task ranking strategy that ranks the tasks -// in non-increasing order based on the cpu utilization (%) in the past 5 intervals of time. +// in non-increasing order based on their cpu utilization (%). // -// For example, if Prometheus scrapes metrics every 1s, then each time interval is 1s long. -// This strategy then would then rank tasks based on their cpu utilization in the past 5 seconds. +// For example, if Prometheus scrapes metrics every 1s, then this strategy would rank tasks based +// on their cpu utilization in the past second. type TaskRankCpuUtilStrategy struct { // receiver of the results of task ranking. receiver TaskRanksReceiver @@ -41,15 +44,45 @@ type TaskRankCpuUtilStrategy struct { dedicatedLabelNameTaskHostname model.LabelName // prometheusScrapeInterval corresponds to the time interval between two successive metrics scrapes. prometheusScrapeInterval time.Duration + // previousTotalCpuUsage stores the sum of cumulative cpu usage seconds for each running task. + previousTotalCpuUsage map[string]map[string]*taskCpuUsageInfo // Time duration for range query. + // Note that there is a caveat in using range queries to retrieve cpu time for containers. + // If the tasks are pinned, then using a range > 1s works as we would always get the necessary data points for each cpu (thread). + // On the other hand, if the tasks are not pinned, then there is no guarantee that the necessary number of data points be available + // as the cpu scheduler can preempt and re-schedule the task on any available cpu. + // Therefore, to avoid confusion, this strategy does not use the range query. 
rangeTimeUnit query.TimeUnit rangeQty uint } +type taskCpuUsageInfo struct { + task *entities.Task + dataPoint *cpuUsageDataPoint +} + +type cpuUsageDataPoint struct { + totalCumulativeCpuUsage float64 + timestamp model.Time +} + +func (d cpuUsageDataPoint) String() string { + return fmt.Sprintf("cumulativeCpuUsage[%f], Timestamp[%d]", d.totalCumulativeCpuUsage, d.timestamp) +} + +func (c taskCpuUsageInfo) String() string { + prev := "nil" + if c.dataPoint != nil { + prev = fmt.Sprintf("%s", c.dataPoint) + } + + return fmt.Sprintf("Weight[%f], Prev[%s]", c.task.Weight, prev) +} + func (s *TaskRankCpuUtilStrategy) Init() { - // By default, rank tasks based on past 5 seconds cpu usage. - s.rangeTimeUnit = query.Seconds - s.rangeQty = 5 + s.rangeTimeUnit = query.None + s.rangeQty = 0 + s.previousTotalCpuUsage = make(map[string]map[string]*taskCpuUsageInfo) } // SetPrometheusScrapeInterval sets the scrape interval of prometheus. @@ -65,70 +98,70 @@ func (s *TaskRankCpuUtilStrategy) SetTaskRanksReceiver(receiver TaskRanksReceive // Execute the strategy using the provided data. func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) { valueT := data.Type() - var matrix model.Matrix - // Safety check to make sure that we cast to matrix only if value type is matrix. + var vector model.Vector + // Safety check to make sure that we cast to vector only if value type is vector. // Note, however, that as the strategy decides the metric and the range for fetching data, - // it can assume the value type. For example, if a range is provided, then the value type would - // be a matrix. + // it can assume the value type. + // For example, if a range is provided, then the value type would be a matrix. + // If no range is provided, the value type would be a vector. switch valueT { - case model.ValMatrix: - matrix = data.(model.Matrix) + case model.ValVector: + vector = data.(model.Vector) default: // invalid value type. // TODO do not ignore this. maybe log it? } - // cpuLabelName is the label name that can be used to fetch the cpu associated with the usage data. - const cpuLabelName model.LabelName = "cpu" - - type hostValueT model.LabelValue - type taskIDValueT model.LabelValue - var ok bool - // allTasksCpuUsageSeconds stores the cpu utilization (%) for all tasks on each cpu on each host. - allTasksCpuUsageSeconds := make(map[hostValueT]map[taskIDValueT]*entities.Task) + // nowTotalCpuUsage stores the total cumulative cpu usage for each running task. + var nowTotalCpuUsage = make(map[string]map[string]*cpuUsageDataPoint) // Parse Prometheus metrics. - // TODO (pkaushi1) make efficient and parallelize parsing of Prometheus metrics. - for _, sampleStream := range matrix { - // Not considering task for ranking if < 2 data points retrieved. - if len(sampleStream.Values) < 2 { - continue - } - + // TODO (pradykaushik) make efficient and parallelize parsing of Prometheus metrics. + for _, sample := range vector { var hostname model.LabelValue - var colocatedTasksCpuUsageSeconds map[taskIDValueT]*entities.Task - if hostname, ok = sampleStream.Metric[s.dedicatedLabelNameTaskHostname]; ok { - if colocatedTasksCpuUsageSeconds, ok = allTasksCpuUsageSeconds[hostValueT(hostname)]; !ok { + if hostname, ok = sample.Metric[s.dedicatedLabelNameTaskHostname]; ok { + if _, ok = s.previousTotalCpuUsage[string(hostname)]; !ok { // First time fetching metrics from this host. 
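Stepping back from the line-by-line changes: for each scrape, the parsing loop below folds the per-cpu samples of the instant query into one cumulative total per task (per host). A standalone sketch of that aggregation, with a hypothetical helper name; the strategy does this inline while also grouping by hostname.

```go
package example

import "github.com/prometheus/common/model"

// totalCpuSecondsByTask sums each task's cumulative cpu usage seconds across
// cpus (e.g. cpu00 and cpu01) from an instant-query result (model.Vector).
func totalCpuSecondsByTask(samples model.Vector, taskIDLabel model.LabelName) map[string]float64 {
	totals := make(map[string]float64)
	for _, sample := range samples {
		if taskID, ok := sample.Metric[taskIDLabel]; ok {
			totals[string(taskID)] += float64(sample.Value)
		}
	}
	return totals
}
```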
- colocatedTasksCpuUsageSeconds = make(map[taskIDValueT]*entities.Task) - allTasksCpuUsageSeconds[hostValueT(hostname)] = colocatedTasksCpuUsageSeconds + s.previousTotalCpuUsage[string(hostname)] = make(map[string]*taskCpuUsageInfo) + } + + if _, ok = nowTotalCpuUsage[string(hostname)]; !ok { + // Creating entry to record current total cumulative cpu usage for colocated tasks. + nowTotalCpuUsage[string(hostname)] = make(map[string]*cpuUsageDataPoint) } var taskID model.LabelValue - var taskCpuUsage *entities.Task - if taskID, ok = sampleStream.Metric[s.dedicatedLabelNameTaskID]; ok { - if taskCpuUsage, ok = colocatedTasksCpuUsageSeconds[taskIDValueT(taskID)]; !ok { + if taskID, ok = sample.Metric[s.dedicatedLabelNameTaskID]; ok { + if _, ok := s.previousTotalCpuUsage[string(hostname)][string(taskID)]; !ok { // First time fetching metrics for task. Recording taskID and hostname to help consolidation. - taskCpuUsage = &entities.Task{ - Metric: sampleStream.Metric, - Weight: 0.0, - ID: string(taskID), - Hostname: string(hostname), + taskTotalCpuUsage := &taskCpuUsageInfo{ + task: &entities.Task{ + Metric: sample.Metric, + Weight: 0.0, + ID: string(taskID), + Hostname: string(hostname), + }, + dataPoint: nil, } - colocatedTasksCpuUsageSeconds[taskIDValueT(taskID)] = taskCpuUsage + s.previousTotalCpuUsage[string(hostname)][string(taskID)] = taskTotalCpuUsage } - if _, ok = sampleStream.Metric[cpuLabelName]; ok { - // Calculating and recording the cpu utilization (%) of this task on this cpu. - // Adding it to an accumulator that will later be used as the cpu utilization of - // this task across all cpus. - taskCpuUsage.Weight += s.cpuUtil(sampleStream.Values) + if _, ok := nowTotalCpuUsage[string(hostname)][string(taskID)]; !ok { + // Recording cumulative cpu usage seconds for task on cpu. + nowTotalCpuUsage[string(hostname)][string(taskID)] = &cpuUsageDataPoint{ + totalCumulativeCpuUsage: float64(sample.Value), + timestamp: sample.Timestamp, + } } else { - // CPU usage data does not correspond to a particular cpu. - // TODO do not ignore this. We should instead log this. + // Adding cumulative cpu usage seconds for task on a cpu. + nowTotalCpuUsage[string(hostname)][string(taskID)].totalCumulativeCpuUsage += float64(sample.Value) } + } else { + // Either taskID not exported along with the metrics, or + // Task ID dedicated label provided is incorrect. + // TODO do not ignore this. We should log this instead. } } else { // Either hostname not exported along with the metrics, or @@ -140,10 +173,23 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) { // Rank colocated tasks in non-increasing order based on their total cpu utilization (%) // on the entire host (all cpus). rankedTasks := make(entities.RankedTasks) - for hostname, colocatedTasksCpuUsageSeconds := range allTasksCpuUsageSeconds { - for _, taskCpuUsage := range colocatedTasksCpuUsageSeconds { - taskCpuUsage.Weight = s.round(taskCpuUsage.Weight) - rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)], *taskCpuUsage) + for hostname, colocatedTasksCpuUsageInfo := range s.previousTotalCpuUsage { + for taskID, prevTotalCpuUsage := range colocatedTasksCpuUsageInfo { + if prevTotalCpuUsage.dataPoint == nil { + prevTotalCpuUsage.dataPoint = new(cpuUsageDataPoint) + } else { + // Calculating the cpu utilization of this task. 
+ prevTotalCpuUsage.task.Weight = s.round(s.cpuUtil( + prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage, + prevTotalCpuUsage.dataPoint.timestamp, + nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage, + nowTotalCpuUsage[hostname][taskID].timestamp, + )) + rankedTasks[entities.Hostname(hostname)] = append(rankedTasks[entities.Hostname(hostname)], *prevTotalCpuUsage.task) + } + // Saving current total cumulative cpu usage seconds to calculate cpu utilization in the next interval. + prevTotalCpuUsage.dataPoint.totalCumulativeCpuUsage = nowTotalCpuUsage[hostname][taskID].totalCumulativeCpuUsage + prevTotalCpuUsage.dataPoint.timestamp = nowTotalCpuUsage[hostname][taskID].timestamp } // Sorting colocated tasks. @@ -154,7 +200,13 @@ func (s *TaskRankCpuUtilStrategy) Execute(data model.Value) { } // Submitting ranked tasks to the receiver. - s.receiver.Receive(rankedTasks) + if len(rankedTasks) > 0 { + logger.WithFields(logrus.Fields{ + "task_ranking_strategy": "cpuutil", + "task_ranking_results": rankedTasks, + }).Log(logrus.InfoLevel, "strategy executed") + s.receiver.Receive(rankedTasks) + } } // cpuUtilPrecision defines the precision for cpu utilization (%) values. @@ -169,12 +221,17 @@ func (s TaskRankCpuUtilStrategy) round(cpuUtil float64) float64 { } // cpuUtil calculates and returns the cpu utilization (%) for the task. -// Elapsed time is calculated as the number of seconds between oldest and newest value by -// factoring in the prometheus scrape interval. -func (s TaskRankCpuUtilStrategy) cpuUtil(values []model.SamplePair) float64 { - n := len(values) - return 100.0 * ((float64(values[n-1].Value) - float64(values[0].Value)) / - (float64(n-1) * s.prometheusScrapeInterval.Seconds())) +// The time difference (in seconds) of the two data points is used as the elapsed time. Note that this okay +// as the task ranker schedule (in seconds) is a multiple of the prometheus scrape interval. +func (s TaskRankCpuUtilStrategy) cpuUtil( + prevTotalCpuUsage float64, + prevTotalCpuUsageTimestamp model.Time, + nowTotalCpuUsage float64, + nowTotalCpuUsageTimestamp model.Time) float64 { + + // timestamps are in milliseconds and therefore dividing by 1000 to convert to seconds. + timeDiffSeconds := float64(nowTotalCpuUsageTimestamp-prevTotalCpuUsageTimestamp) / 1000 + return 100.0 * ((nowTotalCpuUsage - prevTotalCpuUsage) / timeDiffSeconds) } // GetMetric returns the name of the metric to query. @@ -218,19 +275,8 @@ func (s TaskRankCpuUtilStrategy) GetLabelMatchers() []*query.LabelMatcher { // GetRange returns the time unit and duration for how far back (in seconds) values need to be fetched. func (s TaskRankCpuUtilStrategy) GetRange() (query.TimeUnit, uint) { return s.rangeTimeUnit, s.rangeQty - // return query.Seconds, uint(5 * int(s.prometheusScrapeInterval.Seconds())) } // SetRange sets the time duration for the range query. -// For cpu-util ranking strategy the time duration has to be > 1s as you need two data points to calculate cpu utilization. -// If the provided time duration <= 1s, the default duration of 5 intervals of time is used, where each -// interval of time is equal to the prometheus scrape interval. 
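A quick numeric check of the cpuUtil arithmetic above (the figures are invented for illustration): Prometheus timestamps are in milliseconds, so the elapsed time is divided by 1000 before computing the rate. With a 5-second ranking schedule and a task whose total cumulative usage (summed across its cpus) grows from 4.50s at t=1000ms to 6.75s at t=6000ms, cpuUtil returns 100 * (2.25 / 5) = 45%. A hypothetical in-package test capturing this, relying on the imports already present in taskRankCpuUtilStrategy_test.go:

```go
func TestCpuUtilArithmetic(t *testing.T) {
	s := TaskRankCpuUtilStrategy{}
	// 2.25 cpu seconds consumed over a 5 second window -> 45% utilization.
	assert.InDelta(t, 45.0, s.cpuUtil(4.50, model.Time(1000), 6.75, model.Time(6000)), 0.001)
}
```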
-func (s *TaskRankCpuUtilStrategy) SetRange(timeUnit query.TimeUnit, qty uint) { - if !timeUnit.IsValid() || ((timeUnit == query.Seconds) && qty <= 1) { - s.rangeTimeUnit = query.Seconds - s.rangeQty = uint(5 * int(s.prometheusScrapeInterval.Seconds())) - } else { - s.rangeTimeUnit = timeUnit - s.rangeQty = qty - } -} +// As this strategy does not use range queries a call to this method results in a NO-OP. +func (s *TaskRankCpuUtilStrategy) SetRange(_ query.TimeUnit, _ uint) {} diff --git a/strategies/taskRankCpuUtilStrategy_test.go b/strategies/taskRankCpuUtilStrategy_test.go index 5087feb..fb1e770 100644 --- a/strategies/taskRankCpuUtilStrategy_test.go +++ b/strategies/taskRankCpuUtilStrategy_test.go @@ -69,8 +69,8 @@ func TestTaskRankCpuUtilStrategy_GetRange(t *testing.T) { checkRange := func(strategy *TaskRankCpuUtilStrategy) { timeUnit, qty := strategy.GetRange() - assert.Equal(t, query.Seconds, timeUnit) - assert.Equal(t, uint(5), qty) + assert.Equal(t, query.None, timeUnit) + assert.Equal(t, uint(0), qty) } count := 5 @@ -81,30 +81,26 @@ func TestTaskRankCpuUtilStrategy_GetRange(t *testing.T) { } } -// mockCpuUtilData returns a mock of prometheus time series data. +var elapsedTime float64 = 0 + +// mockConstCpuUtilData returns a mock of prometheus time series data. // This mock has the following information. // 1. Three tasks with ids 'test_task_id_{1..3}'. // 2. Hostname for all tasks is localhost. // 3. For each task, cpu usage data is provided for two cpus, 'cpu00' and 'cpu01'. -// 4. task with id 'test_task_id_1' shows cpu utilization of 22.5% on each cpu. -// 5. task with id 'test_task_id_2' shows cpu utilization of 30% on each cpu. -// 6. task with id 'test_task_id_3' shows cpu utilization of 67.5% on each cpu. -func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) model.Value { - now := time.Now() - return model.Value(model.Matrix{ +// 4. task with id 'test_task_id_1' demonstrates cpu utilization of 22.5% on each cpu. +// 5. task with id 'test_task_id_2' demonstrates cpu utilization of 30% on each cpu. +// 6. task with id 'test_task_id_3' demonstrates cpu utilization of 67.5% on each cpu. 
+func mockConstCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { + mockedCpuUtilData = model.Value(model.Vector{ { Metric: map[model.LabelName]model.LabelValue{ dedicatedLabelTaskID: "test_task_id_1", dedicatedLabelTaskHost: "localhost", "cpu": "cpu00", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.5}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.8}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.1}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 1.3}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 1.4}, - }, + Value: model.SampleValue(0.225 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, { Metric: map[model.LabelName]model.LabelValue{ @@ -112,13 +108,8 @@ func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelNam dedicatedLabelTaskHost: "localhost", "cpu": "cpu01", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.5}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.8}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.1}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 1.3}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 1.4}, - }, + Value: model.SampleValue(0.225 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, { Metric: map[model.LabelName]model.LabelValue{ @@ -126,13 +117,8 @@ func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelNam dedicatedLabelTaskHost: "localhost", "cpu": "cpu00", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.5}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.9}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.3}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 1.5}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 1.7}, - }, + Value: model.SampleValue(0.3 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, { Metric: map[model.LabelName]model.LabelValue{ @@ -140,13 +126,8 @@ func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelNam dedicatedLabelTaskHost: "localhost", "cpu": "cpu01", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.5}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.9}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.3}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 1.5}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 1.7}, - }, + Value: model.SampleValue(0.3 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, { Metric: map[model.LabelName]model.LabelValue{ @@ -154,13 +135,8 @@ func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelNam dedicatedLabelTaskHost: "localhost", "cpu": "cpu00", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.3}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.9}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.6}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 2.5}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 3.0}, - }, + Value: model.SampleValue(0.675 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, { 
Metric: map[model.LabelName]model.LabelValue{ @@ -168,15 +144,68 @@ func mockCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelNam dedicatedLabelTaskHost: "localhost", "cpu": "cpu01", }, - Values: []model.SamplePair{ - {Timestamp: model.Time(now.Unix()), Value: 0.3}, - {Timestamp: model.Time(now.Add(1 * time.Second).Unix()), Value: 0.9}, - {Timestamp: model.Time(now.Add(2 * time.Second).Unix()), Value: 1.6}, - {Timestamp: model.Time(now.Add(3 * time.Second).Unix()), Value: 2.5}, - {Timestamp: model.Time(now.Add(4 * time.Second).Unix()), Value: 3.0}, + Value: model.SampleValue(0.675 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), + }, + }) + elapsedTime++ + return +} + +var availableCpus = map[int]model.LabelValue{ + 0: "cpu00", + 1: "cpu01", +} + +// mockVaryingCpuUtilData returns a mock of prometheus time series data. +// This mock has the following information. +// 1. Three tasks with ids 'test_task_id_{1..3}'. +// 2. Hostname for all tasks is localhost. +// 3. For each task, cpu usage data is provided for a subset of the two cpus, 'cpu00' and 'cpu01'. +// 4. task with id 'test_task_id_1' demonstrates total cpu utilization of 45%. +// 5. task with id 'test_task_id_2' demonstrates total cpu utilization of 60%. +// 6. task with id 'test_task_id_3' demonstrates total cpu utilization of 135%. +func mockVaryingCpuUtilData(dedicatedLabelTaskID, dedicatedLabelTaskHost model.LabelName) (mockedCpuUtilData model.Value) { + mockedCpuUtilData = model.Value(model.Vector{ + { + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: "test_task_id_1", + dedicatedLabelTaskHost: "localhost", + "cpu": availableCpus[rand.Intn(2)], + }, + Value: model.SampleValue(0.45 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), + }, + { + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: "test_task_id_2", + dedicatedLabelTaskHost: "localhost", + "cpu": availableCpus[rand.Intn(2)], + }, + Value: model.SampleValue(0.6 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), + }, + { + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: "test_task_id_3", + dedicatedLabelTaskHost: "localhost", + "cpu": "cpu00", }, + Value: model.SampleValue(0.9 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), + }, + { + Metric: map[model.LabelName]model.LabelValue{ + dedicatedLabelTaskID: "test_task_id_3", + dedicatedLabelTaskHost: "localhost", + "cpu": "cpu01", + }, + Value: model.SampleValue(0.45 * (elapsedTime + 1)), + Timestamp: model.Time(1000 * (elapsedTime + 1)), }, }) + elapsedTime++ + return } func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { @@ -193,9 +222,6 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { } s.Init() - data := mockCpuUtilData("container_label_task_id", "container_label_task_host") - s.Execute(data) - expectedRankedTasks := map[entities.Hostname][]entities.Task{ "localhost": { { @@ -206,7 +232,8 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { }, ID: "test_task_id_3", Hostname: "localhost", - Weight: 135.0, // sum of cpu util (%) on cpu00 and cpu01. + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 135.0, }, { Metric: map[model.LabelName]model.LabelValue{ @@ -216,7 +243,8 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { }, ID: "test_task_id_2", Hostname: "localhost", - Weight: 60.0, // sum of cpu util (%) on cpu00 and cpu01. + // Expected sum of cpu util (%) on cpu00 and cpu01. 
+ Weight: 60.0, }, { Metric: map[model.LabelName]model.LabelValue{ @@ -226,16 +254,47 @@ func TestTaskRankCpuUtilStrategy_Execute(t *testing.T) { }, ID: "test_task_id_1", Hostname: "localhost", - Weight: 45.0, // sum of cpu util (%) on cpu00 and cpu01. + // Expected sum of cpu util (%) on cpu00 and cpu01. + Weight: 45.0, }, }, } - assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) + t.Run("tasks demonstrate constant cpu usage and use all cpus", func(t *testing.T) { + for i := 0; i < 5; i++ { + data := mockConstCpuUtilData("container_label_task_id", "container_label_task_host") + s.Execute(data) + + if i == 0 { + // No ranked tasks yet as we only have one second of data. + assert.Empty(t, receiver.rankedTasks) + continue + } + + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) + + _, ok := expectedRankedTasks["localhost"] + _, localhostIsInRankedTasks := receiver.rankedTasks["localhost"] + assert.True(t, ok == localhostIsInRankedTasks) + + assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) + } - _, ok := expectedRankedTasks["localhost"] - _, localhostIsInRankedTasks := receiver.rankedTasks["localhost"] - assert.True(t, ok == localhostIsInRankedTasks) + }) + + t.Run("tasks demonstrate varying cpu usage and do not run on all cpus", func(t *testing.T) { + for i := 0; i < 5; i++ { // Starting from 5 to simulate cumulative cpu usage from previous test. + data := mockVaryingCpuUtilData("container_label_task_id", "container_label_task_host") + s.Execute(data) + + assert.Equal(t, len(expectedRankedTasks), len(receiver.rankedTasks)) - assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) + _, ok := expectedRankedTasks["localhost"] + _, localhostIsInRankedTasks := receiver.rankedTasks["localhost"] + assert.True(t, ok == localhostIsInRankedTasks) + + assert.ElementsMatch(t, expectedRankedTasks["localhost"], receiver.rankedTasks["localhost"]) + } + + }) }