Skip to content

Commit

Permalink
Final Code for Taufer's course
Browse files Browse the repository at this point in the history
Here we present the final code we used for the end of the course.

It includes the feature to measure the reaction time when changing the
cgroups value according to the number of containers created. These
experiments were created to explain the results of the presentation
  • Loading branch information
josemonsalve2 committed Jun 23, 2015
1 parent 47c6f71 commit 7ec5aca
Showing 1 changed file with 169 additions and 69 deletions.
238 changes: 169 additions & 69 deletions api/client/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -2840,7 +2841,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
}
// do a quick pause so that any failed connections for containers that do not exist are able to be
// evicted before we display the initial or default values.
time.Sleep(500 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
var errs []string
for _, c := range cStats {
c.mu.Lock()
Expand Down Expand Up @@ -2908,6 +2909,7 @@ func (cli *DockerCli) CmdOda(args ...string) error {
controlStep = 2000
timeSlotsControl = false
StaticAssignment = false
ReactionTime = false

//FOR CONFLICTING OPTIONS
ErrConflictControlPolicies = fmt.Errorf("Conflicting options: -ts, -tsv, --observe and -st")
Expand All @@ -2918,20 +2920,18 @@ func (cli *DockerCli) CmdOda(args ...string) error {
flObserveOnly = cmd.Bool([]string{"ob", "-observe"}, false, "Do not adapt, only observe dumping the info every -obv ms. Default 2000ms (incompatible with -d)")
flObserveOnlyValue = cmd.Int([]string{"obv", "-observe_value"}, 2000, "Setting the observe only value. implies --observe (incompatible with -d)")
flTimeSlots = cmd.Bool([]string{"ts", "-time_slots"}, false, "Create timeslots, use -slot_size=val_ms to define time slot. Default 2000ms (incompatible with -d)")
flReactionTime = cmd.Bool([]string{"rt", "-reaction_time"}, false, "Measure how long it takes since the moment the action to modify cgroups is sent to the moment it takes place (Incompatible with any other policy)")
flTimeSlotsValue = cmd.Int([]string{"tsv", "-time_slots_value"}, 2000, "Timeslots value in ms. -tsv=time_in_ms. Implies --time_slots (incompatible with -d)")
flStaticAssignment = cmd.String([]string{"st", "-static"}, "", "Static Assignment of cpu shares. container_name1=share, container_name2=share . (incompatible with -d)")
flFileName = cmd.String([]string{"fd", "-file_dump"}, "", "--file_dump fileName. If defined, the output would ve in a CVS file. Else it would be std output.")
)
//Parsing the flags.
utils.ParseFlags(cmd, args, true)
if *flObserveOnly || *flTimeSlots {
fmt.Fprintln(cli.out, "I got the flag")
}

//FLAG FOR OBSERVE ONLY
if *flObserveOnly || cmd.IsSet("obv") || cmd.IsSet("-observe_value") {
//check if other control policy is active
if *flTimeSlots || cmd.IsSet("-static") || cmd.IsSet("st") || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") {
if *flTimeSlots || cmd.IsSet("-static") || cmd.IsSet("st") || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") || *flReactionTime {
return ErrConflictControlPolicies
}
//Get the user defined value of clock for the observe
Expand All @@ -2945,7 +2945,7 @@ func (cli *DockerCli) CmdOda(args ...string) error {
//FLAG FOR TIME SLOTS
if *flTimeSlots || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") {
//check if other control policy is active
if *flObserveOnly || cmd.IsSet("-static") || cmd.IsSet("st") || cmd.IsSet("obv") || cmd.IsSet("-observe_value") {
if *flObserveOnly || cmd.IsSet("-static") || cmd.IsSet("st") || cmd.IsSet("obv") || cmd.IsSet("-observe_value") || *flReactionTime {
return ErrConflictControlPolicies
}
//Get the user defined value of the clock for time Slots
Expand All @@ -2961,13 +2961,23 @@ func (cli *DockerCli) CmdOda(args ...string) error {
//FLAG FOR STATIC ASSIGNMENT PER CONTAINER
if cmd.IsSet("-static") || cmd.IsSet("st") {
//check if other control policy is active
if *flTimeSlots || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") || cmd.IsSet("obv") || cmd.IsSet("-observe_value") {
if *flTimeSlots || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") || cmd.IsSet("obv") || cmd.IsSet("-observe_value") || *flReactionTime {
return ErrConflictControlPolicies
}
StaticAssignment = true
return fmt.Errorf("Under Construction")
}

//FLAG FOR REACTION TIME
if *flReactionTime || cmd.IsSet("-reaction_time") || cmd.IsSet("rt") {
//check if other control policy is active
if *flTimeSlots || *flObserveOnly || cmd.IsSet("-static") || cmd.IsSet("st") || cmd.IsSet("tsv") || cmd.IsSet("-time_slots_value") || cmd.IsSet("obv") || cmd.IsSet("-observe_value") {
return ErrConflictControlPolicies
}
controlStep = 500 //by default observation is every 10 ms for this experiment
ReactionTime = true
}

//Check if the flag for file dump was declared in the command line. If so assign the name
if cmd.IsSet("fd") || cmd.IsSet("-file_dump") {
fmt.Fprintln(cli.out, "File "+*flFileName+" created. Filling the information")
Expand All @@ -2981,45 +2991,6 @@ func (cli *DockerCli) CmdOda(args ...string) error {
fmt.Fprintln(w, "TIME(ms)\tCONTAINER\tCPU ")
}

//GETTING ALL CONTAINERS NAMES TODO: We need to do this process every other time for getting when new containers are created
//We receive the information of all the containers
body, _, err := readBody(cli.call("GET", "/containers/json?"+v.Encode(), nil, nil))
if err != nil {
return err
}
//Get a table with all the information of the containers. No filters.
outs := engine.NewTable("Created", 0)
if _, err := outs.ReadListFrom(body); err != nil {
return err
}
//Function to stripNamePrefix
stripNamePrefix := func(ss []string) []string {
for i, s := range ss {
ss[i] = s[1:]
}

return ss
}
//For each of the entries (Each container), get the name and print it.
for _, out := range outs.Data {
var (
outNames = stripNamePrefix(out.GetList("Names"))
)
for _, name := range outNames {
if len(strings.Split(name, "/")) == 1 {
outNames = []string{name}

break
}
}
//Current name. Print it and get the container Stats
name = strings.Join(outNames, ",")
fmt.Fprintln(w, "--", strings.Join(outNames, ","))
s := &containerStats{Name: name}
cStats = append(cStats, s)
go s.Collect(cli)
}

//CHECKING IF WE HAVE A FILE NAME. IF SO WE NEED TO CREATE IT. ALSO WE ONLY NEED TO SHOW THE HEADERS ONCE
if fileName != "" {
var Ferr error
Expand All @@ -3034,45 +3005,147 @@ func (cli *DockerCli) CmdOda(args ...string) error {
}()
header := []byte("TIME(ms)\tCONTAINER\tCPU\n")
fo.Write(header)
} else {
} else if !ReactionTime {
printHeader()
w.Flush()
}

// CONNECTION CHECK FOR STATS
// do a quick pause so that any failed connections for containers that do not exist are able to be
// evicted before we display the initial or default values.
time.Sleep(2000 * time.Millisecond)
var errs []string
for _, c := range cStats {
c.mu.Lock()
if c.err != nil {
errs = append(errs, fmt.Sprintf("%s: %s", c.Name, c.err.Error()))
}
c.mu.Unlock()
}
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ", "))
}

//ODA LOOP SECTION
timeStamp = controlStep
clock_mult := 0
shares := 0

//MEASURING REACTION TIME
started := false
stoped := false
var rollingAverages = make([]float64, len(cStats))
rollAvgWinSize := 100
//Initial State for REACTION TIME
if ReactionTime {
var Ferr error
fo, Ferr = os.Create("ReactTimeResults.txt")
if Ferr != nil {
fmt.Fprintln(cli.out, "ERROR! Couldn't create the file")
}
defer func() {
if err := fo.Close(); err != nil {
fmt.Fprintln(cli.out, "ERROR! While closing the file")
}
}()
header := []byte("TIME(ms)\tCONTAINER\tCPU\n")
fo.Write(header)
tsError := cli.TimeSlots(cStats, 0)
if tsError != nil {
return tsError
}
}

//FOR ADDING NEW CONTAINERS
go func() error {
for {
time.Sleep(1 * time.Second)
//GETTING ALL CONTAINERS NAMES TODO: We need to do this process every other time for getting when new containers are created
//We receive the information of all the containers
wait := false
body, _, err := readBody(cli.call("GET", "/containers/json?"+v.Encode(), nil, nil))
if err != nil {
return err
}
//Get a table with all the information of the containers. No filters.
outs := engine.NewTable("Created", 0)
if _, err := outs.ReadListFrom(body); err != nil {
return err
}
//Function to stripNamePrefix
stripNamePrefix := func(ss []string) []string {
for i, s := range ss {
ss[i] = s[1:]
}
return ss
}
//For each of the entries (Each container), get the name and print it.
visited := make([]bool, len(cStats))
for _, out := range outs.Data {
var (
outNames = stripNamePrefix(out.GetList("Names"))
)
for _, name := range outNames {
if len(strings.Split(name, "/")) == 1 {
outNames = []string{name}
break
}
}
//Current name. Print it and get the container Stats
exist := false
name = strings.Join(outNames, ",")
for i, s := range cStats {
if s.Name == name {
exist = true
visited[i] = true
}
}
if !exist {
wait = true
s := &containerStats{Name: name}
cStats = append(cStats, s)
go s.Collect(cli)
}
}
//DELETE FROM STATS
for i := len(visited) - 1; i >= 0; i-- {
if !visited[i] {
cStats = append(cStats[:i], cStats[i+1:]...)
}
}

// CONNECTION CHECK FOR STATS
// do a quick pause so that any failed connections for containers that do not exist are able to be
// evicted before we display the initial or default values.
if wait {
time.Sleep(500 * time.Millisecond)
var errs []string
for _, c := range cStats {
c.mu.Lock()
if c.err != nil {
errs = append(errs, fmt.Sprintf("%s: %s", c.Name, c.err.Error()))
}
c.mu.Unlock()
}
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ", "))
}
}
}
}()
for _ = range time.Tick(time.Duration(controlStep) / 50 * time.Millisecond) {
//For decide and act 50 times slower
clock_mult = clock_mult + 1
//OBSERVE should be 50 times faster than decide to have a good resolution
timeStamp = timeStamp + controlStep
timeStamp = timeStamp + controlStep/50

toRemove := []int{}
for i, s := range cStats {
s.mu.RLock()
if s.err != nil {
toRemove = append(toRemove, i)
fmt.Fprintf(w, "-----------%d\t%s\t%.2f--------\n",
timeStamp,
s.Name,
s.CpuPercentage)
}

if fileName != "" {
fo.WriteString(fmt.Sprintf("%d\t%s\t%.2f\n", timeStamp, s.Name, s.CpuPercentage))
} else if ReactionTime {
if started && !stoped {
if math.Abs(s.CpuPercentage-rollingAverages[i]) > 100 {
stoped = true
fmt.Fprintf(w, "%d\n", timeStamp)
}
}
//For rolling average
rollingAverages[i] = (rollingAverages[i]*(float64(rollAvgWinSize-1)) + s.CpuPercentage) / float64(rollAvgWinSize)
fo.WriteString(fmt.Sprintf("%d\t%s\t%.2f\n", timeStamp, s.Name, s.CpuPercentage))
} else {
fmt.Fprintf(w, "%d\t%s\t%.2f\n",
timeStamp,
Expand All @@ -3085,18 +3158,18 @@ func (cli *DockerCli) CmdOda(args ...string) error {
i := toRemove[j]
cStats = append(cStats[:i], cStats[i+1:]...)
}
if len(cStats) == 0 {
//TODO Should close to connection ??
return nil
}
//if len(cStats) == 0 {
//TODO Should close to connection ??
// return nil
//}
//if there is no file name, print the information on stdio
if fileName == "" {
w.Flush()
}
//DECIDE AND ACT
if clock_mult == 50 {
clock_mult = 0
if timeSlotsControl {
clock_mult = 0
if shares == len(cStats) {
shares = 0
}
Expand All @@ -3107,6 +3180,33 @@ func (cli *DockerCli) CmdOda(args ...string) error {
shares = shares + 1
} else if StaticAssignment {
fmt.Fprintln(cli.out, "UnderConstruction"+*flStaticAssignment)
} else if ReactionTime {
if timeStamp < 100 {
tsError := cli.TimeSlots(cStats, 0)
if tsError != nil {
return tsError
}
}
if timeStamp >= 5000 && !started {
started = true
fmt.Fprintf(cli.out, "%d:", timeStamp)
tsError := cli.TimeSlots(cStats, 1)
if tsError != nil {
return tsError
}
}
}
//REACTION TIME FINISHED
if stoped {
for _, s := range cStats {
time.Sleep(1000 * time.Millisecond)
_, _, err := readBody(cli.call("POST", "/containers/"+s.Name+"/kill?"+v.Encode(), nil, nil))
if err != nil {
fmt.Fprintf(cli.err, "%s\n", err)
return fmt.Errorf("Error: failed to stop one or more containers")
}
}
return nil
}
}

Expand Down

0 comments on commit 7ec5aca

Please sign in to comment.