diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..055eca6f5 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -117,6 +117,7 @@ (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) + (remove-assignment! [this storm-id]) (remove-storm! [this storm-id]) (report-error [this storm-id task-id error]) (errors [this storm-id task-id]) @@ -327,6 +328,9 @@ (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)) ) + (remove-assignment! [this storm-id] + (delete-node cluster-state (assignment-path storm-id))) + (remove-storm! [this storm-id] (delete-node cluster-state (assignment-path storm-id)) (remove-storm-base! this storm-id)) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 04731dc8f..7e88760e8 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -642,7 +642,8 @@ ;; for the topology which wants rebalance (specified by the scratch-topology-id) ;; we exclude its assignment, meaning that all the slots occupied by its assignment ;; will be treated as free slot in the scheduler code. - (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id)) + (if (and (not-nil? scratch-topology-id) (= tid scratch-topology-id)) + (.remove-assignment! storm-cluster-state tid) {tid (.assignment-info storm-cluster-state tid nil)}))) ;; make the new assignments for topologies topology->executor->node+port (compute-new-topology->executor->node+port