From 7f8aff8036f03feea030997e9079c52bcbc9d30c Mon Sep 17 00:00:00 2001 From: fxjwind Date: Fri, 16 Aug 2013 13:21:56 +0800 Subject: [PATCH 1/3] fix wrong comment --- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..3316a4c55 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -473,7 +473,7 @@ {tid (SchedulerAssignmentImpl. tid executor->slot)}))) (defn- read-all-supervisor-details [nimbus all-scheduling-slots supervisor->dead-ports] - "return a map: {topology-id SupervisorDetails}" + "return a map: {supervisor-id SupervisorDetails}" (let [storm-cluster-state (:storm-cluster-state nimbus) supervisor-infos (all-supervisor-info storm-cluster-state) nonexistent-supervisor-slots (apply dissoc all-scheduling-slots (keys supervisor-infos)) From 9d038053a8bf9abab2951aaf1d97c5c8a3b4b4d7 Mon Sep 17 00:00:00 2001 From: fxjwind Date: Wed, 4 Sep 2013 11:36:22 +0800 Subject: [PATCH 2/3] improve sort-slots to sort-by the number of supervisor used slots --- .../storm/scheduler/EvenScheduler.clj | 38 +++++++-- storm-core/src/clj/backtype/storm/util.clj | 24 ++++-- .../clj/backtype/storm/scheduler_test.clj | 77 ++++++++++++++++++- 3 files changed, 127 insertions(+), 12 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj index b5d78f6f8..56a309d69 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj +++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj @@ -6,10 +6,28 @@ (:gen-class :implements [backtype.storm.scheduler.IScheduler])) -(defn sort-slots [all-slots] - (let [split-up (vals (group-by first all-slots))] - (apply interleave-all split-up) - )) +(defn sort-slots + "used-slots, 
{supervisorid count(used-slots)} + all-slots, ((supervisorid slot)......)" + [all-slots used-slots] + (loop [result [] all-slots (coll-to-map all-slots) used-slots used-slots] + (let [all-slots (filter-val (complement empty?) all-slots) + sorted-used (->> used-slots + (sort-by val) + (into {}))] + (if (empty? all-slots) + result + (let [slot (loop [all-slots all-slots sorted sorted-used] + (let [idle-supervisor (key (first sorted)) + ports (get all-slots idle-supervisor nil)] + (if ports + [idle-supervisor (first ports)] + (recur all-slots (dissoc sorted idle-supervisor))))) + sv (first slot) + inc-used (assoc sorted-used sv (inc (sorted-used sv))) + update-all (assoc all-slots sv (rest (all-slots sv)))] + (recur (conj result slot) update-all inc-used)) + )))) (defn get-alive-assigned-node+port->executors [cluster topology-id] (let [existing-assignment (.getAssignmentById cluster topology-id) @@ -23,6 +41,16 @@ alive-assigned (reverse-map executor->node+port)] alive-assigned)) +(defn get-supervisors-used-slots + "return {supervisor-id count(used-slots)}" + [cluster] + (->> cluster + (.getSupervisors) + (map-val #(.getUsedPorts cluster %)) + (map-val count) + ) + ) + (defn- schedule-topology [^TopologyDetails topology ^Cluster cluster] (let [topology-id (.getId topology) available-slots (->> (.getAvailableSlots cluster) @@ -35,7 +63,7 @@ total-slots-to-use (min (.getNumWorkers topology) (+ (count available-slots) (count alive-assigned))) reassign-slots (take (- total-slots-to-use (count alive-assigned)) - (sort-slots available-slots)) + (sort-slots available-slots (get-supervisors-used-slots cluster))) reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned))))) reassignment (into {} (map vector diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index ecc87ef77..d22cc6569 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ 
-320,6 +320,17 @@ (assoc m v (conj existing k)))) {} amap)) +(defn coll-to-map + "([:a 1][:a 2][:b 1][:c 2]) -> {:a [1 2] :b [1] :c [2]}" + [acoll] + (reduce (fn [m item] + (let [existing (get m (first item) [])] + (assoc m (first item) (conj existing (second item))) + ) + ) + {} acoll) + ) + (defmacro print-vars [& vars] (let [prints (for [v vars] `(println ~(str v) ~v))] `(do ~@prints))) @@ -600,15 +611,16 @@ (Collections/shuffle state rand)) (.get state (.get curr))) -; this can be rewritten to be tail recursive +;;'(1 2 3 4) '(5 6) '( 7 8 9 10) '() '(11) --> (1 5 7 11 2 6 8 3 9 4 10) (defn interleave-all [& colls] (if (empty? colls) [] - (let [colls (filter (complement empty?) colls) - my-elems (map first colls) - rest-elems (apply interleave-all (map rest colls))] - (concat my-elems rest-elems) - ))) + (loop [result [] colls colls] + (let [colls (filter (complement empty?) colls)] + (if (empty? colls) + (apply concat result) + (recur (conj result (map first colls)) (map rest colls)) + ))))) (defn update [m k afn] (assoc m k (afn (get m k)))) diff --git a/storm-core/test/clj/backtype/storm/scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler_test.clj index cfa3efdb5..5b09048e5 100644 --- a/storm-core/test/clj/backtype/storm/scheduler_test.clj +++ b/storm-core/test/clj/backtype/storm/scheduler_test.clj @@ -1,7 +1,9 @@ (ns backtype.storm.scheduler-test (:use [clojure test]) - (:use [backtype.storm bootstrap config testing]) + (:use [backtype.storm bootstrap config testing util]) (:require [backtype.storm.daemon [nimbus :as nimbus]]) + (:require [backtype.storm.scheduler [EvenScheduler :as EvenScheduler]]) + (:require [backtype.storm.scheduler [DefaultScheduler :as DefaultScheduler]]) (:import [backtype.storm.generated StormTopology]) (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails SchedulerAssignmentImpl Topologies TopologyDetails])) @@ -244,3 +246,76 @@ (is (= false (.isSlotOccupied cluster (WorkerSlot. 
"supervisor1" (int 3))))) (is (= false (.isSlotOccupied cluster (WorkerSlot. "supervisor1" (int 5))))) )) + +(deftest test-use-resources-evenly + (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9))) + supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10))) + supervisor3 (SupervisorDetails. "supervisor3" "192.168.0.3" (list ) (map int (list 11 13 15 17 19))) + supervisor4 (SupervisorDetails. "supervisor4" "192.168.0.4" (list ) (map int (list 12 14 16 18 20))) + executor1 (ExecutorDetails. (int 1001) (int 1001)) + executor2 (ExecutorDetails. (int 1002) (int 1002)) + executor3 (ExecutorDetails. (int 1003) (int 1003)) + executor4 (ExecutorDetails. (int 1004) (int 1004)) + executor5 (ExecutorDetails. (int 1005) (int 1005)) + executor6 (ExecutorDetails. (int 1006) (int 1006)) + executor7 (ExecutorDetails. (int 1007) (int 1007)) + executor8 (ExecutorDetails. (int 1008) (int 1008)) + executor9 (ExecutorDetails. (int 1009) (int 1009)) + executor10 (ExecutorDetails. (int 1010) (int 1010)) + executor11 (ExecutorDetails. (int 1011) (int 1011)) + executor12 (ExecutorDetails. (int 1012) (int 1012)) + executor13 (ExecutorDetails. (int 1013) (int 1013)) + topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} + (StormTopology.) + 4 + {executor1 "spout1" + executor2 "bolt1" + executor3 "bolt2" + executor4 "bolt3" + executor5 "bolt4" + executor6 "bolt5"}) + topology2 (TopologyDetails. "topology2" {TOPOLOGY-NAME "topology-name-2"} + (StormTopology.) + 3 + {executor11 "spout11" + executor12 "bolt11" + executor13 "bolt12"}) + topology3 (TopologyDetails. "topology3" {TOPOLOGY-NAME "topology-name-3"} + (StormTopology.) + 4 + {executor7 "spout7" + executor8 "bolt8" + executor9 "bolt9" + executor10 "bolt10"}) + topologies (Topologies. {"topology1" topology1 "topology2" topology2 "topology3" topology3}) + executor->slot1 {executor1 (WorkerSlot. 
"supervisor1" (int 1)) + executor2 (WorkerSlot. "supervisor1" (int 3)) + executor3 (WorkerSlot. "supervisor1" (int 5)) + executor4 (WorkerSlot. "supervisor2" (int 2)) + executor5 (WorkerSlot. "supervisor3" (int 11)) + executor6 (WorkerSlot. "supervisor4" (int 12)) + } + assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1) + + executor->slot2 {executor11 (WorkerSlot. "supervisor1" (int 7)) + executor12 (WorkerSlot. "supervisor2" (int 4)) + executor13 (WorkerSlot. "supervisor3" (int 13)) + } + assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2) + + cluster (Cluster. (nimbus/standalone-nimbus) + {"supervisor1" supervisor1 "supervisor2" supervisor2 "supervisor3" supervisor3 "supervisor4" supervisor4} + {"topology1" assignment1 "topology2" assignment2})] + + (is (= false (.needsScheduling cluster topology1))) + (is (= false (.needsScheduling cluster topology2))) + (is (= true (.needsScheduling cluster topology3))) + + (DefaultScheduler/default-schedule topologies cluster) + ;(EvenScheduler/schedule-topologies-evenly topologies cluster) + + (is (= 4 (count (set (.getUsedPorts cluster supervisor1))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor2))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor3))))) + (is (= 3 (count (set (.getUsedPorts cluster supervisor4))))) + )) From 0b95d295e77736fe66577819384f7fe490786a99 Mon Sep 17 00:00:00 2001 From: fxjwind Date: Tue, 8 Oct 2013 17:45:38 +0800 Subject: [PATCH 3/3] updated according to comments --- .../storm/scheduler/EvenScheduler.clj | 22 +++++++++---------- storm-core/src/clj/backtype/storm/util.clj | 13 +++++------ .../clj/backtype/storm/scheduler_test.clj | 2 +- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj index 56a309d69..2ffe7dc57 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj +++ 
b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj @@ -8,25 +8,25 @@ (defn sort-slots "used-slots, {supervisorid count(used-slots)} - all-slots, ((supervisorid slot)......)" - [all-slots used-slots] - (loop [result [] all-slots (coll-to-map all-slots) used-slots used-slots] - (let [all-slots (filter-val (complement empty?) all-slots) + available-slots, ((supervisorid slot)......)" + [available-slots used-slots] + (loop [result [] available-slots (coll-to-map available-slots) used-slots used-slots] + (let [available-slots (filter-val (complement empty?) available-slots) sorted-used (->> used-slots (sort-by val) (into {}))] - (if (empty? all-slots) + (if (empty? available-slots) result - (let [slot (loop [all-slots all-slots sorted sorted-used] - (let [idle-supervisor (key (first sorted)) - ports (get all-slots idle-supervisor nil)] + (let [slot (loop [available-slots available-slots sorted-used sorted-used] + (let [idle-supervisor (key (first sorted-used)) + ports (get available-slots idle-supervisor nil)] (if ports [idle-supervisor (first ports)] - (recur all-slots (dissoc sorted idle-supervisor))))) + (recur available-slots (dissoc sorted-used idle-supervisor))))) sv (first slot) inc-used (assoc sorted-used sv (inc (sorted-used sv))) - update-all (assoc all-slots sv (rest (all-slots sv)))] - (recur (conj result slot) update-all inc-used)) + update-available (assoc available-slots sv (rest (available-slots sv)))] + (recur (conj result slot) update-available inc-used)) )))) (defn get-alive-assigned-node+port->executors [cluster topology-id] diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index d22cc6569..20e44322d 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -611,16 +611,15 @@ (Collections/shuffle state rand)) (.get state (.get curr))) -;;'(1 2 3 4) '(5 6) '( 7 8 9 10) '() '(11) --> (1 5 7 11 2 6 8 3 9 4 10) +; this can be rewritten to be 
tail recursive (defn interleave-all [& colls] (if (empty? colls) [] - (loop [result [] colls colls] - (let [colls (filter (complement empty?) colls)] - (if (empty? colls) - (apply concat result) - (recur (conj result (map first colls)) (map rest colls)) - ))))) + (let [colls (filter (complement empty?) colls) + my-elems (map first colls) + rest-elems (apply interleave-all (map rest colls))] + (concat my-elems rest-elems) + ))) (defn update [m k afn] (assoc m k (afn (get m k)))) diff --git a/storm-core/test/clj/backtype/storm/scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler_test.clj index 5b09048e5..67b10310c 100644 --- a/storm-core/test/clj/backtype/storm/scheduler_test.clj +++ b/storm-core/test/clj/backtype/storm/scheduler_test.clj @@ -267,7 +267,7 @@ executor13 (ExecutorDetails. (int 1013) (int 1013)) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) - 4 + 6 {executor1 "spout1" executor2 "bolt1" executor3 "bolt2"