Improve EvenScheduler to schedule executors based on supervisors' used slots #673

Open · wants to merge 3 commits into base: master
Changes from 2 commits
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -473,7 +473,7 @@
{tid (SchedulerAssignmentImpl. tid executor->slot)})))

(defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports]
"return a map: {topology-id SupervisorDetails}"
"return a map: {supervisor-id SupervisorDetails}"
(let [storm-cluster-state (:storm-cluster-state nimbus)
supervisor-infos (all-supervisor-info storm-cluster-state)
nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos))
38 changes: 33 additions & 5 deletions storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -6,10 +6,28 @@
(:gen-class
:implements [backtype.storm.scheduler.IScheduler]))

(defn sort-slots [all-slots]
(let [split-up (vals (group-by first all-slots))]
(apply interleave-all split-up)
))
(defn sort-slots
  "all-slots: a seq of [supervisor-id slot] pairs
   used-slots: a map {supervisor-id used-slot-count}"
  [all-slots used-slots]
Collaborator: all-slots actually just contains the available-slots; better keep the name consistent with its actual usage.

(loop [result [] all-slots (coll-to-map all-slots) used-slots used-slots]
(let [all-slots (filter-val (complement empty?) all-slots)
sorted-used (->> used-slots
(sort-by val)
(into {}))]
(if (empty? all-slots)
result
(let [slot (loop [all-slots all-slots sorted sorted-used]
(let [idle-supervisor (key (first sorted))
ports (get all-slots idle-supervisor nil)]
(if ports
[idle-supervisor (first ports)]
(recur all-slots (dissoc sorted idle-supervisor)))))
sv (first slot)
inc-used (assoc sorted-used sv (inc (sorted-used sv)))
update-all (assoc all-slots sv (rest (all-slots sv)))]
(recur (conj result slot) update-all inc-used))
))))
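For readers less comfortable with Clojure, the new slot-ordering logic above can be sketched in Python (an illustrative translation, not part of this PR; all names are hypothetical): available slots are grouped by supervisor, and each pick goes to the supervisor that still has ports and the fewest used slots, whose count is then incremented.

```python
def sort_slots(all_slots, used_slots):
    """all_slots: list of (supervisor_id, port) pairs.
    used_slots: dict {supervisor_id: used_slot_count}.
    Returns slots ordered so the least-loaded supervisor is drawn from first."""
    # Group available ports by supervisor, preserving their order.
    by_sup = {}
    for sup, port in all_slots:
        by_sup.setdefault(sup, []).append(port)
    counts = dict(used_slots)
    result = []
    while by_sup:
        # Pick the supervisor with remaining ports and the fewest used slots.
        sup = min(by_sup, key=lambda s: counts.get(s, 0))
        result.append((sup, by_sup[sup].pop(0)))
        counts[sup] = counts.get(sup, 0) + 1
        if not by_sup[sup]:
            del by_sup[sup]
    return result
```

Tie-breaking among equally loaded supervisors may differ from the Clojure `sort-by`, but the greedy rule is the same.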

(defn get-alive-assigned-node+port->executors [cluster topology-id]
(let [existing-assignment (.getAssignmentById cluster topology-id)
@@ -23,6 +41,16 @@
alive-assigned (reverse-map executor->node+port)]
alive-assigned))

(defn get-supervisors-used-slots
"return {supervisor-id count(used-slots)}"
[cluster]
(->> cluster
(.getSupervisors)
(map-val #(.getUsedPorts cluster %))
(map-val count)
)
)
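In plain terms, get-supervisors-used-slots maps each supervisor id to the number of ports it currently has in use. An equivalent Python sketch (illustrative only; it takes a plain dict rather than a Cluster object):

```python
def get_supervisors_used_slots(supervisor_used_ports):
    """supervisor_used_ports: dict {supervisor_id: iterable of used ports}.
    Returns {supervisor_id: used-slot count}."""
    return {sup: len(list(ports)) for sup, ports in supervisor_used_ports.items()}
```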

(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
(let [topology-id (.getId topology)
available-slots (->> (.getAvailableSlots cluster)
@@ -35,7 +63,7 @@
total-slots-to-use (min (.getNumWorkers topology)
(+ (count available-slots) (count alive-assigned)))
reassign-slots (take (- total-slots-to-use (count alive-assigned))
(sort-slots available-slots))
(sort-slots available-slots (get-supervisors-used-slots cluster)))
reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
reassignment (into {}
(map vector
24 changes: 18 additions & 6 deletions storm-core/src/clj/backtype/storm/util.clj
@@ -320,6 +320,17 @@
(assoc m v (conj existing k))))
{} amap))

(defn coll-to-map
  "([:a 1] [:a 2] [:b 1] [:c 2]) -> {:a [1 2] :b [1] :c [2]}"
[acoll]
(reduce (fn [m item]
(let [existing (get m (first item) [])]
(assoc m (first item) (conj existing (second item)))
)
)
{} acoll)
)
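coll-to-map groups key/value pairs into a map from key to a vector of values. The same behaviour in Python (an illustrative sketch, not part of the PR):

```python
def coll_to_map(coll):
    """Group pairs by first element:
    [("a", 1), ("a", 2), ("b", 1)] -> {"a": [1, 2], "b": [1]}."""
    result = {}
    for k, v in coll:
        result.setdefault(k, []).append(v)  # start a new list on first sight of k
    return result
```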

(defmacro print-vars [& vars]
(let [prints (for [v vars] `(println ~(str v) ~v))]
`(do ~@prints)))
@@ -600,15 +611,16 @@
(Collections/shuffle state rand))
(.get state (.get curr)))

; this can be rewritten to be tail recursive
;;'(1 2 3 4) '(5 6) '( 7 8 9 10) '() '(11) --> (1 5 7 11 2 6 8 3 9 4 10)
(defn interleave-all [& colls]
(if (empty? colls)
[]
(let [colls (filter (complement empty?) colls)
my-elems (map first colls)
rest-elems (apply interleave-all (map rest colls))]
(concat my-elems rest-elems)
)))
(loop [result [] colls colls]
Collaborator: seems this PR doesn't use the interleave-all function at all; better make it another PR if you really want to change it.

(let [colls (filter (complement empty?) colls)]
(if (empty? colls)
(apply concat result)
(recur (conj result (map first colls)) (map rest colls))
)))))
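The rewritten interleave-all draws one element from each non-empty collection per round, so unequal-length inputs are exhausted gracefully. An equivalent Python sketch (illustrative, using the example from the comment above):

```python
def interleave_all(*colls):
    """Interleave sequences of unequal length, round by round:
    (1 2 3 4), (5 6), (7 8 9 10), (), (11) -> 1 5 7 11 2 6 8 3 9 4 10."""
    result = []
    cols = [list(c) for c in colls if c]  # drop empty inputs up front
    while cols:
        result.extend(c[0] for c in cols)        # take the head of each
        cols = [c[1:] for c in cols if len(c) > 1]  # keep non-empty tails
    return result
```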

(defn update [m k afn]
(assoc m k (afn (get m k))))
77 changes: 76 additions & 1 deletion storm-core/test/clj/backtype/storm/scheduler_test.clj
@@ -1,7 +1,9 @@
(ns backtype.storm.scheduler-test
(:use [clojure test])
(:use [backtype.storm bootstrap config testing])
(:use [backtype.storm bootstrap config testing util])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:require [backtype.storm.scheduler [EvenScheduler :as EvenScheduler]])
(:require [backtype.storm.scheduler [DefaultScheduler :as DefaultScheduler]])
(:import [backtype.storm.generated StormTopology])
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
SchedulerAssignmentImpl Topologies TopologyDetails]))
@@ -244,3 +246,76 @@
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3)))))
(is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5)))))
))

(deftest test-use-resources-evenly
(let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9)))
supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10)))
supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 11 13 15 17 19)))
supervisor4 (SupervisorDetails. "supervisor4" "192.168.0.4" (list ) (map int (list 12 14 16 18 20)))
executor1 (ExecutorDetails. (int 1001) (int 1001))
executor2 (ExecutorDetails. (int 1002) (int 1002))
executor3 (ExecutorDetails. (int 1003) (int 1003))
executor4 (ExecutorDetails. (int 1004) (int 1004))
executor5 (ExecutorDetails. (int 1005) (int 1005))
executor6 (ExecutorDetails. (int 1006) (int 1006))
executor7 (ExecutorDetails. (int 1007) (int 1007))
executor8 (ExecutorDetails. (int 1008) (int 1008))
executor9 (ExecutorDetails. (int 1009) (int 1009))
executor10 (ExecutorDetails. (int 1010) (int 1010))
executor11 (ExecutorDetails. (int 1011) (int 1011))
executor12 (ExecutorDetails. (int 1012) (int 1012))
executor13 (ExecutorDetails. (int 1013) (int 1013))
topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"}
(StormTopology.)
4
{executor1 "spout1"
executor2 "bolt1"
executor3 "bolt2"
executor4 "bolt3"
executor5 "bolt4"
executor6 "bolt5"})
topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"}
(StormTopology.)
3
{executor11 "spout11"
executor12 "bolt11"
executor13 "bolt12"})
topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"}
(StormTopology.)
4
{executor7 "spout7"
executor8 "bolt8"
executor9 "bolt9"
executor10 "bolt10"})
topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3})
executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1))
executor2 (WorkerSlot. "supervisor1" (int 3))
executor3 (WorkerSlot. "supervisor1" (int 5))
executor4 (WorkerSlot. "supervisor2" (int 2))
executor5 (WorkerSlot. "supervisor3" (int 11))
executor6 (WorkerSlot. "supervisor4" (int 12))
Collaborator: the test data here actually violates the worker number constraints; you specified the worker number as 4 for topology1, but here it is assigned 6 different workers.

}
assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1)

executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 7))
executor12 (WorkerSlot. "supervisor2" (int 4))
executor13 (WorkerSlot. "supervisor3" (int 13))
}
assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2)

cluster (Cluster. (nimbus/standalone-nimbus)
{"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3 "supervisor4" supervisor4}
{"topology1" assignment1 "topology2" assignment2})]

(is (= false (.needsScheduling cluster topology1)))
(is (= false (.needsScheduling cluster topology2)))
(is (= true (.needsScheduling cluster topology3)))

(DefaultScheduler/default-schedule topologies cluster)
;(EvenScheduler/schedule-topologies-evenly topologies cluster)

(is (= 4 (count (set (.getUsedPorts cluster supervisor1)))))
(is (= 3 (count (set (.getUsedPorts cluster supervisor2)))))
(is (= 3 (count (set (.getUsedPorts cluster supervisor3)))))
(is (= 3 (count (set (.getUsedPorts cluster supervisor4)))))
))
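The expected counts in the assertions follow from the greedy rule the PR introduces: before scheduling topology3, the used-slot counts are {supervisor1: 4, supervisor2: 2, supervisor3: 2, supervisor4: 1}, and each of topology3's 4 workers goes to the currently least-loaded supervisor. A quick Python check (illustrative only; Python's min breaks ties by insertion order, which may differ from the Clojure sort, but the totals come out the same):

```python
used = {"supervisor1": 4, "supervisor2": 2, "supervisor3": 2, "supervisor4": 1}
for _ in range(4):  # topology3 requests 4 workers
    target = min(used, key=used.get)  # least-loaded supervisor wins each round
    used[target] += 1
print(used)  # -> {'supervisor1': 4, 'supervisor2': 3, 'supervisor3': 3, 'supervisor4': 3}
```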