diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..3316a4c55 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -473,7 +473,7 @@ {tid (SchedulerAssignmentImpl. tid executor->slot)}))) (defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports] - "return a map: {topology-id SupervisorDetails}" + "return a map: {supervisor-id SupervisorDetails}" (let [storm-cluster-state (:storm-cluster-state nimbus) supervisor-infos (all-supervisor-info storm-cluster-state) nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos)) diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj index b5d78f6f8..2ffe7dc57 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj +++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj @@ -6,10 +6,28 @@ (:gen-class :implements [backtype.storm.scheduler.IScheduler])) -(defn sort-slots [all-slots] - (let [split-up (vals (group-by first all-slots))] - (apply interleave-all split-up) - )) +(defn sort-slots + "used-slots, {supervisorid count(used-slots)} + available-slots, ((supervisorid slot)......)" + [available-slots used-slots] + (loop [result [] available-slots (coll-to-map available-slots) used-slots used-slots] + (let [available-slots (filter-val (complement empty?) available-slots) + sorted-used (->> used-slots + (sort-by val) + (into {}))] + (if (empty? 
available-slots) + result + (let [slot (loop [available-slots available-slots sorted-used sorted-used] + (let [idle-supervisor (key (first sorted-used)) + ports (get available-slots idle-supervisor nil)] + (if ports + [idle-supervisor (first ports)] + (recur available-slots (dissoc sorted-used idle-supervisor))))) + sv (first slot) + inc-used (assoc sorted-used sv (inc (sorted-used sv))) + update-available (assoc available-slots sv (rest (available-slots sv)))] + (recur (conj result slot) update-available inc-used)) + )))) (defn get-alive-assigned-node+port->executors [cluster topology-id] (let [existing-assignment (.getAssignmentById cluster topology-id) @@ -23,6 +41,16 @@ alive-assigned (reverse-map executor->node+port)] alive-assigned)) +(defn get-supervisors-used-slots + "return {supervisor-id count(used-slots)}" + [cluster] + (->> cluster + (.getSupervisors) + (map-val #(.getUsedPorts cluster %)) + (map-val count) + ) + ) + (defn- schedule-topology [^TopologyDetails topology ^Cluster cluster] (let [topology-id (.getId topology) available-slots (->> (.getAvailableSlots cluster) @@ -35,7 +63,7 @@ total-slots-to-use (min (.getNumWorkers topology) (+ (count available-slots) (count alive-assigned))) reassign-slots (take (- total-slots-to-use (count alive-assigned)) - (sort-slots available-slots)) + (sort-slots available-slots (get-supervisors-used-slots cluster))) reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned))))) reassignment (into {} (map vector diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index ecc87ef77..20e44322d 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -320,6 +320,17 @@ (assoc m v (conj existing k)))) {} amap)) +(defn coll-to-map + "([:a 1][:a 2][:b 1][:c 2]) -> {:a [1 2] :b [1] :c [2]}" + [acoll] + (reduce (fn [m item] + (let [existing (get m (first item) [])] + (assoc m (first item) (conj 
existing (second item))) + ) + ) + {} acoll) + ) + (defmacro print-vars [& vars] (let [prints (for [v vars] `(println ~(str v) ~v))] `(do ~@prints))) diff --git a/storm-core/test/clj/backtype/storm/scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler_test.clj index cfa3efdb5..67b10310c 100644 --- a/storm-core/test/clj/backtype/storm/scheduler_test.clj +++ b/storm-core/test/clj/backtype/storm/scheduler_test.clj @@ -1,7 +1,9 @@ (ns backtype.storm.scheduler-test (:use [clojure test]) - (:use [backtype.storm bootstrap config testing]) + (:use [backtype.storm bootstrap config testing util]) (:require [backtype.storm.daemon [nimbus :as nimbus]]) + (:require [backtype.storm.scheduler [EvenScheduler :as EvenScheduler]]) + (:require [backtype.storm.scheduler [DefaultScheduler :as DefaultScheduler]]) (:import [backtype.storm.generated StormTopology]) (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails SchedulerAssignmentImpl Topologies TopologyDetails])) @@ -244,3 +246,76 @@ (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 3))))) (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) )) + +(deftest test-use-resources-evenly + (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9))) + supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10))) + supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 11 13 15 17 19))) + supervisor4 (SupervisorDetails. "supervisor4" "192.168.0.4" (list ) (map int (list 12 14 16 18 20))) + executor1 (ExecutorDetails. (int 1001) (int 1001)) + executor2 (ExecutorDetails. (int 1002) (int 1002)) + executor3 (ExecutorDetails. (int 1003) (int 1003)) + executor4 (ExecutorDetails. (int 1004) (int 1004)) + executor5 (ExecutorDetails. (int 1005) (int 1005)) + executor6 (ExecutorDetails. (int 1006) (int 1006)) + executor7 (ExecutorDetails. 
(int 1007) (int 1007)) + executor8 (ExecutorDetails. (int 1008) (int 1008)) + executor9 (ExecutorDetails. (int 1009) (int 1009)) + executor10 (ExecutorDetails. (int 1010) (int 1010)) + executor11 (ExecutorDetails. (int 1011) (int 1011)) + executor12 (ExecutorDetails. (int 1012) (int 1012)) + executor13 (ExecutorDetails. (int 1013) (int 1013)) + topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} + (StormTopology.) + 6 + {executor1 "spout1" + executor2 "bolt1" + executor3 "bolt2" + executor4 "bolt3" + executor5 "bolt4" + executor6 "bolt5"}) + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} + (StormTopology.) + 3 + {executor11 "spout11" + executor12 "bolt11" + executor13 "bolt12"}) + topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"} + (StormTopology.) + 4 + {executor7 "spout7" + executor8 "bolt8" + executor9 "bolt9" + executor10 "bolt10"}) + topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3}) + executor->slot1 {executor1 (WorkerSlot. "supervisor1" (int 1)) + executor2 (WorkerSlot. "supervisor1" (int 3)) + executor3 (WorkerSlot. "supervisor1" (int 5)) + executor4 (WorkerSlot. "supervisor2" (int 2)) + executor5 (WorkerSlot. "supervisor3" (int 11)) + executor6 (WorkerSlot. "supervisor4" (int 12)) + } + assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1) + + executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 7)) + executor12 (WorkerSlot. "supervisor2" (int 4)) + executor13 (WorkerSlot. "supervisor3" (int 13)) + } + assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2) + + cluster (Cluster. 
(nimbus/standalone-nimbus) + {"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3 "supervisor4" supervisor4} + {"topology1" assignment1 "topology2" assignment2})] + + (is (= false (.needsScheduling cluster topology1))) + (is (= false (.needsScheduling cluster topology2))) + (is (= true (.needsScheduling cluster topology3))) + + (DefaultScheduler/default-schedule topologies cluster) + ;(EvenScheduler/schedule-topologies-evenly topologies cluster) + + (is (= 4 (count (set (.getUsedPorts cluster supervisor1))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor2))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor3))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor4))))) + ))