Skip to content

Commit

Permalink
Release 1.1.0 (#17)
Browse files Browse the repository at this point in the history
configurable range + strategy (#16)

configure strategy + range duration with options.

With this commit, the time duration for range queries can also be
configured. This allows for more fine grained control over the number
of data points on which the strategies is to be applied.

For backwards compatibility the earlier WithStrategy(...) function
can still be used. However, note that this results in the default
setting of range duration (as exposed by the strategy) being used.
If the time duration needs to be configured, then WithStrategyOptions(...)
should now be used.

Retrofitted cpushares task ranking strategy to now parse either a matrix
or a vector depending on whether a range query is used.

Fixed the test code.

Added sample code to readme.
  • Loading branch information
pradykaushik authored Jul 10, 2020
1 parent 9ba66ca commit 713ff5c
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 63 deletions.
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,32 @@ tRanker, err = New(
WithStrategy("cpushares", []*query.LabelMatcher{
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.NotEqual, Value: ""},
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
}, &dummyTaskRanksReceiver{}))
}, new(dummyTaskRanksReceiver), 1*time.Second))
```

You can now also configure the strategies using initialization [options](./strategies/strategy.go). This allows for
configuring the time duration of range queries, enabling fine-grained control over the number of data points
over which the strategy is applied. See below example for strategy configuration using options.
```go
type dummyTaskRanksReceiver struct{}

func (r *dummyTaskRanksReceiver) Receive(rankedTasks entities.RankedTasks) {
log.Println(rankedTasks)
}

prometheusDataFetcher, err = prometheus.NewDataFetcher(
prometheus.WithPrometheusEndpoint("http://localhost:9090"))

tRanker, err = New(
WithDataFetcher(prometheusDataFetcher),
WithSchedule("?/5 * * * * *"),
WithStrategyOptions("cpuutil",
strategies.WithLabelMatchers([]*query.LabelMatcher{
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.NotEqual, Value: ""},
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}),
strategies.WithTaskRanksReceiver(new(dummyTaskRanksReceiver)),
strategies.WithPrometheusScrapeInterval(1*time.Second),
strategies.WithRange(query.Seconds, 5)))
```

##### Dedicated Label Matchers
Expand Down
31 changes: 29 additions & 2 deletions ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func WithDataFetcher(dataFetcher df.Interface) Option {
}
}

// WithStrategy builds the task ranking strategy associated with the given name using the provided information.
// For backwards compatibility, strategies that use range queries will use the default duration. If the time
// duration for the range query needs to be configured, then use WithStrategyOptions(...) to configure the strategy
// and provide the WithRange(...) option.
func WithStrategy(
strategy string,
labelMatchers []*query.LabelMatcher,
Expand All @@ -79,12 +83,35 @@ func WithStrategy(
return errors.New("invalid strategy")
}

// TODO validate arguments.
if s, err := factory.GetTaskRankStrategy(strategy); err != nil {
return err
} else {
tRanker.Strategy = s
err := strategies.Build(tRanker.Strategy, labelMatchers, receiver, prometheusScrapeInterval)
err := strategies.Build(s,
strategies.WithLabelMatchers(labelMatchers),
strategies.WithTaskRanksReceiver(receiver),
strategies.WithPrometheusScrapeInterval(prometheusScrapeInterval))
if err != nil {
return errors.Wrap(err, "failed to build strategy")
}
tRanker.DataFetcher.SetStrategy(s)
}
return nil
}
}

// WithStrategyOptions builds the strategy associated with the given name using the provided initialization options.
func WithStrategyOptions(strategy string, strategyOptions ...strategies.Option) Option {
return func(tRanker *TaskRanker) error {
if strategy == "" {
return errors.New("invalid strategy")
}

if s, err := factory.GetTaskRankStrategy(strategy); err != nil {
return err
} else {
tRanker.Strategy = s
err := strategies.Build(s, strategyOptions...)
if err != nil {
return errors.Wrap(err, "failed to build strategy")
}
Expand Down
37 changes: 33 additions & 4 deletions ranker_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pradykaushik/task-ranker/datafetcher/prometheus"
"github.com/pradykaushik/task-ranker/entities"
"github.com/pradykaushik/task-ranker/query"
"github.com/pradykaushik/task-ranker/strategies"
"github.com/stretchr/testify/assert"
"testing"
"time"
Expand Down Expand Up @@ -58,18 +59,46 @@ func initTaskRanker(strategy string) (*TaskRanker, error) {
return tRanker, err
}

func initTaskRankerOptions(strategy string) (*TaskRanker, error) {
var prometheusDataFetcher datafetcher.Interface
var err error
var tRanker *TaskRanker

prometheusDataFetcher, err = prometheus.NewDataFetcher(
prometheus.WithPrometheusEndpoint("http://localhost:9090"))
if err != nil {
return nil, err
}

dummyReceiver = new(dummyTaskRanksReceiver)
tRanker, err = New(
WithDataFetcher(prometheusDataFetcher),
WithSchedule("?/5 * * * * *"),
WithStrategyOptions(strategy,
strategies.WithLabelMatchers([]*query.LabelMatcher{
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"}}),
strategies.WithTaskRanksReceiver(dummyReceiver),
strategies.WithPrometheusScrapeInterval(1*time.Second),
strategies.WithRange(query.Seconds, 5)))

return tRanker, err

}

// Test the cpushares task ranking strategy.
func TestTaskRanker_CpuSharesRanking(t *testing.T) {
testStrategy(t, "cpushares")
tRanker, initErr := initTaskRanker("cpushares")
testStrategy(t, tRanker, initErr)
}

// Test the cpuutil task ranking strategy.
func TestTaskRanker_CpuUtilRanking(t *testing.T) {
testStrategy(t, "cpuutil")
tRanker, initErr := initTaskRankerOptions("cpuutil")
testStrategy(t, tRanker, initErr)
}

func testStrategy(t *testing.T, strategy string) {
tRanker, initErr := initTaskRanker(strategy)
func testStrategy(t *testing.T, tRanker *TaskRanker, initErr error) {
assert.NoError(t, initErr)
assert.NotNil(t, tRanker)
tRanker.Start()
Expand Down
44 changes: 29 additions & 15 deletions ranker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,33 @@ import (
)

func TestNew(t *testing.T) {
tRanker, initErr := initTaskRanker("cpushares")
assert.NoError(t, initErr)
assert.NotNil(t, tRanker)
assert.NotNil(t, tRanker.DataFetcher)
assert.NotNil(t, tRanker.Strategy)
assert.Equal(t, "http://localhost:9090",
tRanker.DataFetcher.(*prometheus.DataFetcher).GetEndpoint())
assert.ElementsMatch(t, []*query.LabelMatcher{
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
}, tRanker.Strategy.(*strategies.TaskRankCpuSharesStrategy).GetLabelMatchers())
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
tRankerSchedule, err := parser.Parse("?/5 * * * * *")
assert.NoError(t, err)
assert.Equal(t, tRankerSchedule, tRanker.Schedule)
test := func(tRanker *TaskRanker, initErr error) {
assert.NoError(t, initErr)
assert.NotNil(t, tRanker)
assert.NotNil(t, tRanker.DataFetcher)
assert.NotNil(t, tRanker.Strategy)
assert.Equal(t, "http://localhost:9090",
tRanker.DataFetcher.(*prometheus.DataFetcher).GetEndpoint())
assert.ElementsMatch(t, []*query.LabelMatcher{
{Type: query.TaskID, Label: "container_label_task_id", Operator: query.EqualRegex, Value: "hello_.*"},
{Type: query.TaskHostname, Label: "container_label_task_host", Operator: query.Equal, Value: "localhost"},
}, tRanker.Strategy.(*strategies.TaskRankCpuSharesStrategy).GetLabelMatchers())
parser := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
tRankerSchedule, err := parser.Parse("?/5 * * * * *")
assert.NoError(t, err)
assert.Equal(t, tRankerSchedule, tRanker.Schedule)
}

t.Run("using WithStrategy", func(t *testing.T) {
tRanker, initErr := initTaskRanker("cpushares")
test(tRanker, initErr)
})

t.Run("using WithStrategyOptions", func(t *testing.T) {
tRanker, initErr := initTaskRankerOptions("cpushares")
test(tRanker, initErr)
timeUnit, qty := tRanker.Strategy.GetRange()
assert.Equal(t, query.Seconds, timeUnit)
assert.Equal(t, uint(5), qty)
})
}
71 changes: 56 additions & 15 deletions strategies/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
)

type Interface interface {
// Init initializes the task ranking strategy, if needed.
// The Prometheus scrape interval is also provided.
Init(prometheusScrapeInterval time.Duration)
// Initialize any other internal data structure and perform any further setup operations.
Init()
// SetPrometheusScrapeInterval sets the prometheus scrape interval.
SetPrometheusScrapeInterval(time.Duration)
// SetTaskRanksReceiver registers a receiver of the task ranking results.
// This receiver is a callback and is used to pass the result of applying
// the strategy to rank tasks.
Expand All @@ -43,24 +44,64 @@ type Interface interface {
// Range returns the duration specifying how far back in time data needs to be fetched.
// Returns the unit of time along with an integer quantifying the duration.
GetRange() (query.TimeUnit, uint)
// SetRange sets the time duration for the range query.
SetRange(query.TimeUnit, uint)
}

// Build the strategy object.
func Build(
s Interface,
labelMatchers []*query.LabelMatcher,
receiver TaskRanksReceiver,
prometheusScrapeInterval time.Duration) error {
func Build(s Interface, options ...Option) error {
s.Init()
for _, opt := range options {
if err := opt(s); err != nil {
return errors.Wrap(err, "failed to build strategy")
}
}
return nil
}

if receiver == nil {
return errors.New("nil receiver provided")
// Options for configuring strategies.
type Option func(Interface) error

// WithLabelMatchers returns an option that initializes the label matchers to be used by the strategy.
func WithLabelMatchers(labelMatchers []*query.LabelMatcher) Option {
return func(strategy Interface) error {
if labelMatchers == nil {
return errors.New("invalid label matchers: nil provided")
}
return strategy.SetLabelMatchers(labelMatchers)
}
}

s.Init(prometheusScrapeInterval)
// WithRange returns an option that initializes the time unit and duration, if using range queries.
func WithRange(timeUnit query.TimeUnit, qty uint) Option {
return func(strategy Interface) error {
if !timeUnit.IsValid() {
return errors.New("invalid time unit provided for range")
}
if qty == 0 {
return errors.New("time duration cannot be 0")
}
strategy.SetRange(timeUnit, qty)
return nil
}
}

s.SetTaskRanksReceiver(receiver)
if err := s.SetLabelMatchers(labelMatchers); err != nil {
return errors.Wrap(err, "invalid label matchers for strategy")
// WithTaskRanksReceiver returns an option that initializes the receiver to which the task ranking results
// are submitted.
func WithTaskRanksReceiver(receiver TaskRanksReceiver) Option {
return func(strategy Interface) error {
if receiver == nil {
return errors.New("nil receiver provided")
}
strategy.SetTaskRanksReceiver(receiver)
return nil
}
}

// WithPrometheusScrapeInterval returns an option that initializes the prometheus scrape interval.
func WithPrometheusScrapeInterval(prometheusScrapeInterval time.Duration) Option {
return func(strategy Interface) error {
strategy.SetPrometheusScrapeInterval(prometheusScrapeInterval)
return nil
}
return nil
}
56 changes: 43 additions & 13 deletions strategies/taskRankCpuSharesStrategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ type TaskRankCpuSharesStrategy struct {
// dedicatedLabelNameTaskHostname is the dedicated label to use when filtering metrics on a hostname basis.
// Storing this quick access instead of performing another O(n) search through labels.
dedicatedLabelNameTaskHostname model.LabelName
// Time duration for range query.
rangeTimeUnit query.TimeUnit
rangeQty uint
}

func (s *TaskRankCpuSharesStrategy) Init(time.Duration) {}
func (s *TaskRankCpuSharesStrategy) Init() {
s.rangeTimeUnit = query.None // By default cpu-shares ranking does not require a range query.
s.rangeQty = 0
}

func (s *TaskRankCpuSharesStrategy) SetPrometheusScrapeInterval(_ time.Duration) {}

// SetTaskRanksReceiver sets the receiver of the results of task ranking.
func (s *TaskRankCpuSharesStrategy) SetTaskRanksReceiver(receiver TaskRanksReceiver) {
Expand All @@ -49,36 +57,40 @@ func (s *TaskRankCpuSharesStrategy) SetTaskRanksReceiver(receiver TaskRanksRecei
func (s *TaskRankCpuSharesStrategy) Execute(data model.Value) {
valueT := data.Type()
var matrix model.Matrix
// Safety check to make sure that we cast to matrix only if value type is matrix.
var vector model.Vector
// Safety check to make sure that we cast to matrix/vector based on valueT.
// Note, however, that as the strategy decides the metric and the range for fetching
// data, it can assume the value type.
// For example, if a range is provided, then the value type would
// be a matrix.
// For example, if a range is provided, then the value type would be a matrix.
// If no range is provided, then the value type would be a vector.
switch valueT {
case model.ValMatrix:
matrix = data.(model.Matrix)
case model.ValVector:
vector = data.(model.Vector)
default:
// invalid value type.
// TODO do not ignore this. maybe log it?
}

// Initializing tasks to rank.
var tasks = make(entities.RankedTasks)
for _, sampleStream := range matrix {
if hostname, ok := sampleStream.Metric[s.dedicatedLabelNameTaskHostname]; ok {
addEntryForTask := func(metric model.Metric, weight float64) {
// Fetching hostname and adding entry for host and task.
if hostname, ok := metric[s.dedicatedLabelNameTaskHostname]; ok {
// Adding entry for host if needed.
if _, ok := tasks[entities.Hostname(hostname)]; !ok {
tasks[entities.Hostname(hostname)] = make([]entities.Task, 0)
}
// Fetching the task id.
if taskID, ok := sampleStream.Metric[s.dedicatedLabelNameTaskID]; ok {

// Fetching task id and adding entry for task.
if taskID, ok := metric[s.dedicatedLabelNameTaskID]; ok {
tasks[entities.Hostname(hostname)] = append(tasks[entities.Hostname(hostname)],
entities.Task{
Metric: sampleStream.Metric,
Metric: metric,
ID: string(taskID),
Hostname: string(hostname),
// As cpu shares allocated to a container can be updated for docker containers,
// taking the average of allocated cpu shares.
Weight: s.avgCpuShare(sampleStream.Values),
Weight: weight,
})
} else {
// SHOULD NOT BE HERE.
Expand All @@ -88,6 +100,18 @@ func (s *TaskRankCpuSharesStrategy) Execute(data model.Value) {
}
}

if matrix != nil {
for _, sampleStream := range matrix {
// As cpu shares allocated to a container can be updated for docker containers,
// taking the average of allocated cpu shares.
addEntryForTask(sampleStream.Metric, s.avgCpuShare(sampleStream.Values))
}
} else if vector != nil {
for _, sample := range vector {
addEntryForTask(sample.Metric, float64(sample.Value))
}
}

// Sorting colocated tasks in non-increasing order of cpu shares.
for _, colocatedTasks := range tasks {
sort.SliceStable(colocatedTasks, func(i, j int) bool {
Expand Down Expand Up @@ -151,5 +175,11 @@ func (s TaskRankCpuSharesStrategy) GetLabelMatchers() []*query.LabelMatcher {

// GetRange returns the time unit and duration for how far back values need to be fetched.
func (s TaskRankCpuSharesStrategy) GetRange() (query.TimeUnit, uint) {
return query.Seconds, 1
return s.rangeTimeUnit, s.rangeQty
}

// SetRange sets the time duration for the range query.
func (s *TaskRankCpuSharesStrategy) SetRange(timeUnit query.TimeUnit, qty uint) {
s.rangeTimeUnit = timeUnit
s.rangeQty = qty
}
Loading

0 comments on commit 713ff5c

Please sign in to comment.