Skip to content

Commit

Permalink
Merge pull request #92 from keis/task_attribute_constraints
Browse files Browse the repository at this point in the history
Task attribute constraints
  • Loading branch information
keis committed Jun 9, 2016
2 parents dc5aa8c + f008aca commit 5535d36
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 53 deletions.
23 changes: 21 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: all test docker publish-docker
.PHONY: all test test-server test-docker docker docker-clean publish-docker

VERSION?=$(shell git describe HEAD | sed s/^v//)
DATE?=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
Expand All @@ -11,6 +11,9 @@ TOOLS=${GOPATH}/bin/go-bindata \
SRC=$(shell find . -name '*.go')
STATIC=$(shell find static templates)

DOCKER_GO_SRC_PATH=/go/src/github.com/klarna/eremetic
DOCKER_GOLANG_RUN_CMD=docker run --rm -v "$(PWD)":$(DOCKER_GO_SRC_PATH) -w $(DOCKER_GO_SRC_PATH) golang:1.6 bash -c

all: test

${TOOLS}:
Expand All @@ -24,6 +27,10 @@ test: eremetic
test-server: ${TOOLS}
${GOPATH}/bin/goconvey

# Run tests cleanly in a docker container.
test-docker:
$(DOCKER_GOLANG_RUN_CMD) "make test"

assets/assets.go: generate.go ${STATIC}
go generate

Expand All @@ -40,7 +47,19 @@ docker/eremetic: ${SRC}
docker: docker/eremetic docker/Dockerfile docker/marathon.sh
docker build -t ${DOCKERTAG} docker

publish-docker: docker
docker-clean: docker/Dockerfile docker/marathon.sh
# Create the docker/eremetic binary in the Docker container using the
# golang docker image. This ensures a completely clean build.
$(DOCKER_GOLANG_RUN_CMD) "make docker/eremetic"
docker build -t ${DOCKERTAG} docker

publish-docker:
ifeq ($(strip $(shell docker images --format "{{.Repository}}:{{.Tag}}" $(DOCKERTAG))),)
$(warning Docker tag does not exist:)
$(warning ${DOCKERTAG})
$(warning )
$(error Cannot publish the docker image. Please run `make docker` or `make docker-clean` first.)
endif
docker push ${DOCKERTAG}
git describe HEAD --exact 2>/dev/null && \
docker tag ${DOCKERTAG} ${DOCKERNAME}:latest && \
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ JSON format:
"uris": [
"http://server.local/resource"
],
// Constraints for which slave the task can run on (beyond cpu/memory).
// Matching is strict and only attributes are currently supported. If
// multiple constraints exist, they are evaluated using AND (ie: all or none).
"slave_constraints": [
{
"attribute_name": "aws-region",
"attribute_value": "us-west-2"
}
],
// String, URL to post a callback to. Callback message has format:
// {"time":1451398320,"status":"TASK_FAILED","task_id":"eremetic-task.79feb50d-3d36-47cf-98ff-a52ef2bc0eb5"}
"callback_uri": "http://callback.local"
Expand Down
24 changes: 24 additions & 0 deletions examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,27 @@ arbitrary dockerfiles by url.
]
}
```

## Running a task with certain attributes

This configures a task to run the `busybox` image with a basic loop outputting
the time on any Mesos Slave with the attribute "role" set to "build".

```json
{
"docker_image": "busybox",
"command": "for i in $(seq 1 5); do echo \"`date` $i\"; sleep 5; done",
"task_cpus": 0.1,
"task_mem": 100.0,
"slave_constraints": [
{
"attribute_name": "role",
"attribute_value": "build"
}
]
}
```

Mesos slaves can be configured with arbitrary attributes. See the
[documentation](https://open.mesosphere.com/reference/mesos-slave/) for more
information on how to configure attributes.
1 change: 1 addition & 0 deletions handler/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func makeMap(task types.EremeticTask) map[string]interface{} {
data["Hostname"] = task.Hostname
data["Name"] = task.Name
data["SlaveID"] = task.SlaveId
data["SlaveConstraints"] = task.SlaveConstraints
data["Status"] = task.Status
data["CPU"] = fmt.Sprintf("%.2f", task.TaskCPUs)
data["Memory"] = fmt.Sprintf("%.2f", task.TaskMem)
Expand Down
51 changes: 50 additions & 1 deletion scheduler/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"errors"
"fmt"
"strings"

"github.com/Sirupsen/logrus"
ogle "github.com/jacobsa/oglematchers"
Expand All @@ -15,6 +16,10 @@ type resourceMatcher struct {
value float64
}

type attributeMatcher struct {
SlaveConstraints []types.SlaveConstraint
}

func (m *resourceMatcher) Matches(o interface{}) error {
offer := o.(*mesos.Offer)
err := errors.New("")
Expand Down Expand Up @@ -47,10 +52,54 @@ func MemoryAvailable(v float64) ogle.Matcher {
return &resourceMatcher{"mem", v}
}

func (m *attributeMatcher) Matches(o interface{}) (err error) {
offer := o.(*mesos.Offer)
matched := int(0)

for _, constraint := range m.SlaveConstraints {
for _, attr := range offer.Attributes {
if attr.GetName() == constraint.AttributeName {
if attr.GetType() != mesos.Value_TEXT ||
attr.Text.GetValue() != constraint.AttributeValue {
err = errors.New("")

// Match all constraints, not just one.
return
}
matched += 1
}
}
}

if matched != len(m.SlaveConstraints) {
err = errors.New("")
}
return
}

func (m *attributeMatcher) Description() string {
descriptions := []string{}
for _, constraint := range m.SlaveConstraints {
descriptions = append(descriptions,
fmt.Sprintf("slave attribute constraint %s=%s",
constraint.AttributeName,
constraint.AttributeValue,
),
)
}
return strings.Join(descriptions, ", ")
}

func AttributeMatch(slaveConstraints []types.SlaveConstraint) ogle.Matcher {
return &attributeMatcher{slaveConstraints}
}

func createMatcher(task types.EremeticTask) ogle.Matcher {
return ogle.AllOf(
CPUAvailable(task.TaskCPUs),
MemoryAvailable(task.TaskMem))
MemoryAvailable(task.TaskMem),
AttributeMatch(task.SlaveConstraints),
)
}

func matches(matcher ogle.Matcher, o interface{}) bool {
Expand Down
184 changes: 154 additions & 30 deletions scheduler/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
. "github.com/smartystreets/goconvey/convey"
)

func offer(id string, cpu float64, mem float64) *mesos.Offer {
// Optional attributes can be added.
func offer(id string, cpu float64, mem float64, attributes ...*mesos.Attribute) *mesos.Offer {
return &mesos.Offer{
Id: &mesos.OfferID{
Value: proto.String(id),
Expand All @@ -26,12 +27,36 @@ func offer(id string, cpu float64, mem float64) *mesos.Offer {
mesosutil.NewScalarResource("cpus", cpu),
mesosutil.NewScalarResource("mem", mem),
},
Attributes: attributes,
}
}

func TestMatch(t *testing.T) {
offerA := offer("offer-a", 0.6, 200.0)
offerB := offer("offer-b", 1.8, 512.0)
offerA := offer("offer-a", 0.6, 200.0,
&mesos.Attribute{
Name: proto.String("role"),
Type: mesos.Value_TEXT.Enum(),
Text: &mesos.Value_Text{
Value: proto.String("badassmofo"),
},
},
&mesos.Attribute{
Name: proto.String("node_name"),
Type: mesos.Value_TEXT.Enum(),
Text: &mesos.Value_Text{
Value: proto.String("node1"),
},
},
)
offerB := offer("offer-b", 1.8, 512.0,
&mesos.Attribute{
Name: proto.String("node_name"),
Type: mesos.Value_TEXT.Enum(),
Text: &mesos.Value_Text{
Value: proto.String("node2"),
},
},
)

Convey("CPUAvailable", t, func() {
Convey("Above", func() {
Expand All @@ -49,6 +74,7 @@ func TestMatch(t *testing.T) {

Convey("MemoryAvailable", t, func() {
Convey("Above", func() {

m := MemoryAvailable(128.0)
err := m.Matches(offerA)
So(err, ShouldBeNil)
Expand All @@ -61,39 +87,137 @@ func TestMatch(t *testing.T) {
})
})

Convey("matchOffer", t, func() {
Convey("Match", func() {
task := types.EremeticTask{
TaskCPUs: 0.8,
TaskMem: 128.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldEqual, offerB)
So(others, ShouldHaveLength, 1)
So(others, ShouldContain, offerA)
Convey("AttributeMatch", t, func() {
Convey("Does match", func() {
m := AttributeMatch([]types.SlaveConstraint{
types.SlaveConstraint{
AttributeName: "node_name",
AttributeValue: "node1",
},
})
err := m.Matches(offerA)
So(err, ShouldBeNil)
})
Convey("Does not match", func() {
m := AttributeMatch([]types.SlaveConstraint{
types.SlaveConstraint{
AttributeName: "node_name",
AttributeValue: "node2",
},
})
err := m.Matches(offerA)
So(err, ShouldNotBeNil)
})
})

Convey("No match CPU", func() {
task := types.EremeticTask{
TaskCPUs: 2.0,
TaskMem: 128.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})
Convey("matchOffer", t, func() {
Convey("Tasks without SlaveConstraints", func() {
Convey("Match", func() {
task := types.EremeticTask{
TaskCPUs: 0.8,
TaskMem: 128.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldEqual, offerB)
So(others, ShouldHaveLength, 1)
So(others, ShouldContain, offerA)
})

Convey("No match CPU", func() {
task := types.EremeticTask{
TaskCPUs: 2.0,
TaskMem: 128.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldBeNil)
So(others, ShouldHaveLength, 2)
})

Convey("No match MEM", func() {
task := types.EremeticTask{
TaskCPUs: 0.2,
TaskMem: 712.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldBeNil)
So(others, ShouldHaveLength, 2)
So(offer, ShouldBeNil)
So(others, ShouldHaveLength, 2)
})
})

Convey("No match MEM", func() {
task := types.EremeticTask{
TaskCPUs: 0.2,
TaskMem: 712.0,
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})
Convey("Tasks with SlaveConstraints", func() {
Convey("Match slave with attribute", func() {
// Use task/mem constraints which match both offers.
task := types.EremeticTask{
TaskCPUs: 0.5,
TaskMem: 128.0,
SlaveConstraints: []types.SlaveConstraint{
types.SlaveConstraint{
AttributeName: "node_name",
AttributeValue: "node2",
},
},
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldEqual, offerB)
So(others, ShouldHaveLength, 1)
So(others, ShouldContain, offerA)
})

Convey("No matching slave with attribute", func() {
// Use task/mem constraints which match both offers.
task := types.EremeticTask{
TaskCPUs: 0.5,
TaskMem: 128.0,
SlaveConstraints: []types.SlaveConstraint{
types.SlaveConstraint{
AttributeName: "node_name",
AttributeValue: "sherah",
},
},
}
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerB})

So(offer, ShouldBeNil)
So(others, ShouldHaveLength, 2)
})

Convey("Match slave with mulitple attributes", func() {
// Build two new offers, both with the same role as offerA.
offerC := offer("offer-c", 0.6, 200.0,
&mesos.Attribute{Name: proto.String("role"), Type: mesos.Value_TEXT.Enum(), Text: &mesos.Value_Text{Value: proto.String("badassmofo")}},
&mesos.Attribute{Name: proto.String("node_name"), Type: mesos.Value_TEXT.Enum(), Text: &mesos.Value_Text{Value: proto.String("node3")}},
)
offerD := offer("offer-d", 0.6, 200.0,
&mesos.Attribute{Name: proto.String("role"), Type: mesos.Value_TEXT.Enum(), Text: &mesos.Value_Text{Value: proto.String("badassmofo")}},
)

task := types.EremeticTask{
TaskCPUs: 0.5,
TaskMem: 128.0,
SlaveConstraints: []types.SlaveConstraint{
types.SlaveConstraint{
AttributeName: "role",
AttributeValue: "badassmofo",
},
types.SlaveConstraint{
AttributeName: "node_name",
AttributeValue: "node3",
},
},
}
// Specifically add C last, our expected, so that we ensure
// the other mocks do not match first.
offer, others := matchOffer(task, []*mesos.Offer{offerA, offerD, offerC})

So(offer, ShouldBeNil)
So(others, ShouldHaveLength, 2)
So(offer, ShouldEqual, offerC)
So(others, ShouldHaveLength, 2)
So(others, ShouldContain, offerA)
So(others, ShouldContain, offerD)
})
})
})
}
Loading

0 comments on commit 5535d36

Please sign in to comment.