Skip to content

Commit 6619dd1

Browse files
committed
Release 0.5.0 (#17)
configurable range + strategy (#16) configure strategy + range duration with options. With this commit, the time duration for range queries can also be configured. This allows for more fine grained control over the number of data points on which the strategies is to be applied. For backwards compatibility the earlier WithStrategy(...) function can still be used. However, note that this results in the default setting of range duration (as exposed by the strategy) being used. If the time duration needs to be configured, then WithStrategyOptions(...) should now be used. Retrofitted cpushares task ranking strategy to now parse either a matrix or a vector depending on whether a range query is used. Fixed the test code. Added sample code to readme.
1 parent 9ba66ca commit 6619dd1

9 files changed

+268
-63
lines changed

README.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,32 @@ tRanker, err = New(
7272
WithStrategy("cpushares", []*query.LabelMatcher{
7373
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.NotEqual, Value: ""},
7474
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
75-
}, &dummyTaskRanksReceiver{}))
75+
}, new(dummyTaskRanksReceiver), 1*time.Second))
76+
```
77+
78+
You can now also configure the strategies using initialization [options](./strategies/strategy.go). This allows for
79+
configuring the time duration of range queries, enabling fine-grained control over the number of data points
80+
over which the strategy is applied. See below example for strategy configuration using options.
81+
```go
82+
type dummyTaskRanksReceiver struct{}
83+
84+
func (r *dummyTaskRanksReceiver) Receive(rankedTasks entities.RankedTasks) {
85+
log.Println(rankedTasks)
86+
}
87+
88+
prometheusDataFetcher, err = prometheus.NewDataFetcher(
89+
prometheus.WithPrometheusEndpoint("http://localhost:9090"))
90+
91+
tRanker, err = New(
92+
WithDataFetcher(prometheusDataFetcher),
93+
WithSchedule("?/5 * * * * *"),
94+
WithStrategyOptions("cpuutil",
95+
strategies.WithLabelMatchers([]*query.LabelMatcher{
96+
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.NotEqual, Value: ""},
97+
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}),
98+
strategies.WithTaskRanksReceiver(new(dummyTaskRanksReceiver)),
99+
strategies.WithPrometheusScrapeInterval(1*time.Second),
100+
strategies.WithRange(query.Seconds, 5)))
76101
```
77102

78103
##### Dedicated Label Matchers

ranker.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func WithDataFetcher(dataFetcher df.Interface) Option {
6868
}
6969
}
7070

71+
// WithStrategy builds the task ranking strategy associated with the given name using the provided information.
72+
// For backwards compatibility, strategies that use range queries will use the default duration. If the time
73+
// duration for the range query needs to be configured, then use WithStrategyOptions(...) to configure the strategy
74+
// and provide the WithRange(...) option.
7175
func WithStrategy(
7276
strategy string,
7377
labelMatchers []*query.LabelMatcher,
@@ -79,12 +83,35 @@ func WithStrategy(
7983
return errors.New("invalid strategy")
8084
}
8185

82-
// TODO validate arguments.
8386
if s, err := factory.GetTaskRankStrategy(strategy); err != nil {
8487
return err
8588
} else {
8689
tRanker.Strategy = s
87-
err := strategies.Build(tRanker.Strategy, labelMatchers, receiver, prometheusScrapeInterval)
90+
err := strategies.Build(s,
91+
strategies.WithLabelMatchers(labelMatchers),
92+
strategies.WithTaskRanksReceiver(receiver),
93+
strategies.WithPrometheusScrapeInterval(prometheusScrapeInterval))
94+
if err != nil {
95+
return errors.Wrap(err, "failed to build strategy")
96+
}
97+
tRanker.DataFetcher.SetStrategy(s)
98+
}
99+
return nil
100+
}
101+
}
102+
103+
// WithStrategyOptions builds the strategy associated with the given name using the provided initialization options.
104+
func WithStrategyOptions(strategy string, strategyOptions ...strategies.Option) Option {
105+
return func(tRanker *TaskRanker) error {
106+
if strategy == "" {
107+
return errors.New("invalid strategy")
108+
}
109+
110+
if s, err := factory.GetTaskRankStrategy(strategy); err != nil {
111+
return err
112+
} else {
113+
tRanker.Strategy = s
114+
err := strategies.Build(s, strategyOptions...)
88115
if err != nil {
89116
return errors.Wrap(err, "failed to build strategy")
90117
}

ranker_e2e_test.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/pradykaushik/task-ranker/datafetcher/prometheus"
2020
"github.com/pradykaushik/task-ranker/entities"
2121
"github.com/pradykaushik/task-ranker/query"
22+
"github.com/pradykaushik/task-ranker/strategies"
2223
"github.com/stretchr/testify/assert"
2324
"testing"
2425
"time"
@@ -58,18 +59,46 @@ func initTaskRanker(strategy string) (*TaskRanker, error) {
5859
return tRanker, err
5960
}
6061

62+
func initTaskRankerOptions(strategy string) (*TaskRanker, error) {
63+
var prometheusDataFetcher datafetcher.Interface
64+
var err error
65+
var tRanker *TaskRanker
66+
67+
prometheusDataFetcher, err = prometheus.NewDataFetcher(
68+
prometheus.WithPrometheusEndpoint("http://localhost:9090"))
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
dummyReceiver = new(dummyTaskRanksReceiver)
74+
tRanker, err = New(
75+
WithDataFetcher(prometheusDataFetcher),
76+
WithSchedule("?/5 * * * * *"),
77+
WithStrategyOptions(strategy,
78+
strategies.WithLabelMatchers([]*query.LabelMatcher{
79+
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
80+
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}),
81+
strategies.WithTaskRanksReceiver(dummyReceiver),
82+
strategies.WithPrometheusScrapeInterval(1*time.Second),
83+
strategies.WithRange(query.Seconds, 5)))
84+
85+
return tRanker, err
86+
87+
}
88+
6189
// Test the cpushares task ranking strategy.
6290
func TestTaskRanker_CpuSharesRanking(t *testing.T) {
63-
testStrategy(t, "cpushares")
91+
tRanker, initErr := initTaskRanker("cpushares")
92+
testStrategy(t, tRanker, initErr)
6493
}
6594

6695
// Test the cpuutil task ranking strategy.
6796
func TestTaskRanker_CpuUtilRanking(t *testing.T) {
68-
testStrategy(t, "cpuutil")
97+
tRanker, initErr := initTaskRankerOptions("cpuutil")
98+
testStrategy(t, tRanker, initErr)
6999
}
70100

71-
func testStrategy(t *testing.T, strategy string) {
72-
tRanker, initErr := initTaskRanker(strategy)
101+
func testStrategy(t *testing.T, tRanker *TaskRanker, initErr error) {
73102
assert.NoError(t, initErr)
74103
assert.NotNil(t, tRanker)
75104
tRanker.Start()

ranker_test.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,33 @@ import (
2424
)
2525

2626
func TestNew(t *testing.T) {
27-
tRanker, initErr := initTaskRanker("cpushares")
28-
assert.NoError(t, initErr)
29-
assert.NotNil(t, tRanker)
30-
assert.NotNil(t, tRanker.DataFetcher)
31-
assert.NotNil(t, tRanker.Strategy)
32-
assert.Equal(t, "http://localhost:9090",
33-
tRanker.DataFetcher.(*prometheus.DataFetcher).GetEndpoint())
34-
assert.ElementsMatch(t, []*query.LabelMatcher{
35-
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
36-
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
37-
}, tRanker.Strategy.(*strategies.TaskRankCpuSharesStrategy).GetLabelMatchers())
38-
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
39-
tRankerSchedule, err := parser.Parse("?/5 * * * * *")
40-
assert.NoError(t, err)
41-
assert.Equal(t, tRankerSchedule, tRanker.Schedule)
27+
test := func(tRanker *TaskRanker, initErr error) {
28+
assert.NoError(t, initErr)
29+
assert.NotNil(t, tRanker)
30+
assert.NotNil(t, tRanker.DataFetcher)
31+
assert.NotNil(t, tRanker.Strategy)
32+
assert.Equal(t, "http://localhost:9090",
33+
tRanker.DataFetcher.(*prometheus.DataFetcher).GetEndpoint())
34+
assert.ElementsMatch(t, []*query.LabelMatcher{
35+
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
36+
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
37+
}, tRanker.Strategy.(*strategies.TaskRankCpuSharesStrategy).GetLabelMatchers())
38+
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
39+
tRankerSchedule, err := parser.Parse("?/5 * * * * *")
40+
assert.NoError(t, err)
41+
assert.Equal(t, tRankerSchedule, tRanker.Schedule)
42+
}
43+
44+
t.Run("using WithStrategy", func(t *testing.T) {
45+
tRanker, initErr := initTaskRanker("cpushares")
46+
test(tRanker, initErr)
47+
})
48+
49+
t.Run("using WithStrategyOptions", func(t *testing.T) {
50+
tRanker, initErr := initTaskRankerOptions("cpushares")
51+
test(tRanker, initErr)
52+
timeUnit, qty := tRanker.Strategy.GetRange()
53+
assert.Equal(t, query.Seconds, timeUnit)
54+
assert.Equal(t, uint(5), qty)
55+
})
4256
}

strategies/strategy.go

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ import (
2222
)
2323

2424
type Interface interface {
25-
// Init initializes the task ranking strategy, if needed.
26-
// The Prometheus scrape interval is also provided.
27-
Init(prometheusScrapeInterval time.Duration)
25+
// Initialize any other internal data structure and perform any further setup operations.
26+
Init()
27+
// SetPrometheusScrapeInterval sets the prometheus scrape interval.
28+
SetPrometheusScrapeInterval(time.Duration)
2829
// SetTaskRanksReceiver registers a receiver of the task ranking results.
2930
// This receiver is a callback and is used to pass the result of applying
3031
// the strategy to rank tasks.
@@ -43,24 +44,64 @@ type Interface interface {
4344
// Range returns the duration specifying how far back in time data needs to be fetched.
4445
// Returns the unit of time along with an integer quantifying the duration.
4546
GetRange() (query.TimeUnit, uint)
47+
// SetRange sets the time duration for the range query.
48+
SetRange(query.TimeUnit, uint)
4649
}
4750

4851
// Build the strategy object.
49-
func Build(
50-
s Interface,
51-
labelMatchers []*query.LabelMatcher,
52-
receiver TaskRanksReceiver,
53-
prometheusScrapeInterval time.Duration) error {
52+
func Build(s Interface, options ...Option) error {
53+
s.Init()
54+
for _, opt := range options {
55+
if err := opt(s); err != nil {
56+
return errors.Wrap(err, "failed to build strategy")
57+
}
58+
}
59+
return nil
60+
}
5461

55-
if receiver == nil {
56-
return errors.New("nil receiver provided")
62+
// Options for configuring strategies.
63+
type Option func(Interface) error
64+
65+
// WithLabelMatchers returns an option that initializes the label matchers to be used by the strategy.
66+
func WithLabelMatchers(labelMatchers []*query.LabelMatcher) Option {
67+
return func(strategy Interface) error {
68+
if labelMatchers == nil {
69+
return errors.New("invalid label matchers: nil provided")
70+
}
71+
return strategy.SetLabelMatchers(labelMatchers)
5772
}
73+
}
5874

59-
s.Init(prometheusScrapeInterval)
75+
// WithRange returns an option that initializes the time unit and duration, if using range queries.
76+
func WithRange(timeUnit query.TimeUnit, qty uint) Option {
77+
return func(strategy Interface) error {
78+
if !timeUnit.IsValid() {
79+
return errors.New("invalid time unit provided for range")
80+
}
81+
if qty == 0 {
82+
return errors.New("time duration cannot be 0")
83+
}
84+
strategy.SetRange(timeUnit, qty)
85+
return nil
86+
}
87+
}
6088

61-
s.SetTaskRanksReceiver(receiver)
62-
if err := s.SetLabelMatchers(labelMatchers); err != nil {
63-
return errors.Wrap(err, "invalid label matchers for strategy")
89+
// WithTaskRanksReceiver returns an option that initializes the receiver to which the task ranking results
90+
// are submitted.
91+
func WithTaskRanksReceiver(receiver TaskRanksReceiver) Option {
92+
return func(strategy Interface) error {
93+
if receiver == nil {
94+
return errors.New("nil receiver provided")
95+
}
96+
strategy.SetTaskRanksReceiver(receiver)
97+
return nil
98+
}
99+
}
100+
101+
// WithPrometheusScrapeInterval returns an option that initializes the prometheus scrape interval.
102+
func WithPrometheusScrapeInterval(prometheusScrapeInterval time.Duration) Option {
103+
return func(strategy Interface) error {
104+
strategy.SetPrometheusScrapeInterval(prometheusScrapeInterval)
105+
return nil
64106
}
65-
return nil
66107
}

strategies/taskRankCpuSharesStrategy.go

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,17 @@ type TaskRankCpuSharesStrategy struct {
3636
// dedicatedLabelNameTaskHostname is the dedicated label to use when filtering metrics on a hostname basis.
3737
// Storing this quick access instead of performing another O(n) search through labels.
3838
dedicatedLabelNameTaskHostname model.LabelName
39+
// Time duration for range query.
40+
rangeTimeUnit query.TimeUnit
41+
rangeQty uint
3942
}
4043

41-
func (s *TaskRankCpuSharesStrategy) Init(time.Duration) {}
44+
func (s *TaskRankCpuSharesStrategy) Init() {
45+
s.rangeTimeUnit = query.None // By default cpu-shares ranking does not require a range query.
46+
s.rangeQty = 0
47+
}
48+
49+
func (s *TaskRankCpuSharesStrategy) SetPrometheusScrapeInterval(_ time.Duration) {}
4250

4351
// SetTaskRanksReceiver sets the receiver of the results of task ranking.
4452
func (s *TaskRankCpuSharesStrategy) SetTaskRanksReceiver(receiver TaskRanksReceiver) {
@@ -49,36 +57,40 @@ func (s *TaskRankCpuSharesStrategy) SetTaskRanksReceiver(receiver TaskRanksRecei
4957
func (s *TaskRankCpuSharesStrategy) Execute(data model.Value) {
5058
valueT := data.Type()
5159
var matrix model.Matrix
52-
// Safety check to make sure that we cast to matrix only if value type is matrix.
60+
var vector model.Vector
61+
// Safety check to make sure that we cast to matrix/vector based on valueT.
5362
// Note, however, that as the strategy decides the metric and the range for fetching
5463
// data, it can assume the value type.
55-
// For example, if a range is provided, then the value type would
56-
// be a matrix.
64+
// For example, if a range is provided, then the value type would be a matrix.
65+
// If no range is provided, then the value type would be a vector.
5766
switch valueT {
5867
case model.ValMatrix:
5968
matrix = data.(model.Matrix)
69+
case model.ValVector:
70+
vector = data.(model.Vector)
6071
default:
6172
// invalid value type.
6273
// TODO do not ignore this. maybe log it?
6374
}
6475

6576
// Initializing tasks to rank.
6677
var tasks = make(entities.RankedTasks)
67-
for _, sampleStream := range matrix {
68-
if hostname, ok := sampleStream.Metric[s.dedicatedLabelNameTaskHostname]; ok {
78+
addEntryForTask := func(metric model.Metric, weight float64) {
79+
// Fetching hostname and adding entry for host and task.
80+
if hostname, ok := metric[s.dedicatedLabelNameTaskHostname]; ok {
81+
// Adding entry for host if needed.
6982
if _, ok := tasks[entities.Hostname(hostname)]; !ok {
7083
tasks[entities.Hostname(hostname)] = make([]entities.Task, 0)
7184
}
72-
// Fetching the task id.
73-
if taskID, ok := sampleStream.Metric[s.dedicatedLabelNameTaskID]; ok {
85+
86+
// Fetching task id and adding entry for task.
87+
if taskID, ok := metric[s.dedicatedLabelNameTaskID]; ok {
7488
tasks[entities.Hostname(hostname)] = append(tasks[entities.Hostname(hostname)],
7589
entities.Task{
76-
Metric: sampleStream.Metric,
90+
Metric: metric,
7791
ID: string(taskID),
7892
Hostname: string(hostname),
79-
// As cpu shares allocated to a container can be updated for docker containers,
80-
// taking the average of allocated cpu shares.
81-
Weight: s.avgCpuShare(sampleStream.Values),
93+
Weight: weight,
8294
})
8395
} else {
8496
// SHOULD NOT BE HERE.
@@ -88,6 +100,18 @@ func (s *TaskRankCpuSharesStrategy) Execute(data model.Value) {
88100
}
89101
}
90102

103+
if matrix != nil {
104+
for _, sampleStream := range matrix {
105+
// As cpu shares allocated to a container can be updated for docker containers,
106+
// taking the average of allocated cpu shares.
107+
addEntryForTask(sampleStream.Metric, s.avgCpuShare(sampleStream.Values))
108+
}
109+
} else if vector != nil {
110+
for _, sample := range vector {
111+
addEntryForTask(sample.Metric, float64(sample.Value))
112+
}
113+
}
114+
91115
// Sorting colocated tasks in non-increasing order of cpu shares.
92116
for _, colocatedTasks := range tasks {
93117
sort.SliceStable(colocatedTasks, func(i, j int) bool {
@@ -151,5 +175,11 @@ func (s TaskRankCpuSharesStrategy) GetLabelMatchers() []*query.LabelMatcher {
151175

152176
// GetRange returns the time unit and duration for how far back values need to be fetched.
153177
func (s TaskRankCpuSharesStrategy) GetRange() (query.TimeUnit, uint) {
154-
return query.Seconds, 1
178+
return s.rangeTimeUnit, s.rangeQty
179+
}
180+
181+
// SetRange sets the time duration for the range query.
182+
func (s *TaskRankCpuSharesStrategy) SetRange(timeUnit query.TimeUnit, qty uint) {
183+
s.rangeTimeUnit = timeUnit
184+
s.rangeQty = qty
155185
}

0 commit comments

Comments
 (0)