diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..02fae73cb 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -173,9 +173,9 @@ (cb id)) )) -(defn- maybe-deserialize [ser] +(defn- maybe-deserialize-clj-bytes [ser] (when ser - (Utils/deserialize ser))) + (deserialize-clj-bytes ser))) (defstruct TaskError :error :time-secs) @@ -230,7 +230,7 @@ (assignment-info [this storm-id callback] (when callback (swap! assignment-info-callback assoc storm-id callback)) - (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))) + (maybe-deserialize-clj-bytes (get-data cluster-state (assignment-path storm-id) (not-nil? callback))) ) (active-storms [this] @@ -248,7 +248,7 @@ (get-worker-heartbeat [this storm-id node port] (-> cluster-state (get-data (workerbeat-path storm-id node port) false) - maybe-deserialize)) + maybe-deserialize-clj-bytes)) (executor-beats [this storm-id executor->node+port] ;; need to take executor->node+port in explicitly so that we don't run into a situation where a @@ -269,11 +269,11 @@ ) (supervisor-info [this supervisor-id] - (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)) + (maybe-deserialize-clj-bytes (get-data cluster-state (supervisor-path supervisor-id) false)) ) (worker-heartbeat! [this storm-id node port info] - (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info))) + (set-data cluster-state (workerbeat-path storm-id node port) (serialize-clj-bytes info))) (remove-worker-heartbeat! [this storm-id node port] (delete-node cluster-state (workerbeat-path storm-id node port)) @@ -297,11 +297,11 @@ ))) (supervisor-heartbeat! 
[this supervisor-id info] - (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)) + (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (serialize-clj-bytes info)) ) (activate-storm! [this storm-id storm-base] - (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)) + (set-data cluster-state (storm-path storm-id) (serialize-clj-bytes storm-base)) ) (update-storm! [this storm-id new-elems] @@ -311,12 +311,12 @@ (set-data cluster-state (storm-path storm-id) (-> base (merge new-elems) - Utils/serialize)))) + serialize-clj-bytes)))) (storm-base [this storm-id callback] (when callback (swap! storm-base-callback assoc storm-id callback)) - (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))) + (maybe-deserialize-clj-bytes (get-data cluster-state (storm-path storm-id) (not-nil? callback))) ) (remove-storm-base! [this storm-id] @@ -324,7 +324,7 @@ ) (set-assignment! [this storm-id info] - (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)) + (set-data cluster-state (assignment-path storm-id) (serialize-clj-bytes info)) ) (remove-storm! 
[this storm-id] @@ -335,7 +335,7 @@ (let [path (error-path storm-id component-id) data {:time-secs (current-time-secs) :error (stringify-error error)} _ (mkdirs cluster-state path) - _ (create-sequential cluster-state (str path "/e") (Utils/serialize data)) + _ (create-sequential cluster-state (str path "/e") (serialize-clj-bytes data)) to-kill (->> (get-children cluster-state path false) (sort-by parse-error-path) reverse @@ -349,7 +349,7 @@ children (get-children cluster-state path false) errors (dofor [c children] (let [data (-> (get-data cluster-state (str path "/" c) false) - maybe-deserialize)] + maybe-deserialize-clj-bytes)] (when data (struct TaskError (:error data) (:time-secs data)) ))) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index c11074eb1..cedc5b7bc 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -4,6 +4,7 @@ (:import [backtype.storm.utils Utils LocalState]) (:import [org.apache.commons.io FileUtils]) (:require [clojure [string :as str]]) + (:use [backtype.storm.utils localstate-serializer]) (:use [backtype.storm util]) ) @@ -168,19 +169,20 @@ (str stormroot "/" RESOURCES-SUBDIR)) (defn ^LocalState supervisor-state [conf] - (LocalState. (str (supervisor-local-dir conf) "/localstate"))) + (LocalState. (str (supervisor-local-dir conf) "/localstate") + (localstate-serializer))) (defn read-supervisor-storm-conf [conf storm-id] (let [stormroot (supervisor-stormdist-root conf storm-id) conf-path (supervisor-stormconf-path stormroot) topology-path (supervisor-stormcode-path stormroot)] - (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path)))) + (merge conf (deserialize-clj-bytes (FileUtils/readFileToByteArray (File. 
conf-path)))) )) (defn read-supervisor-topology [conf storm-id] (let [stormroot (supervisor-stormdist-root conf storm-id) topology-path (supervisor-stormcode-path stormroot)] - (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path))) + (Utils/deserializeTopology (FileUtils/readFileToByteArray (File. topology-path))) )) (defn worker-root @@ -204,4 +206,5 @@ ;; if supervisor stops receiving heartbeat, it kills and restarts the process ;; in local mode, keep a global map of ids to threads for simulating process management (defn ^LocalState worker-state [conf id] - (LocalState. (worker-heartbeats-root conf id))) + (LocalState. (worker-heartbeats-root conf id) (localstate-serializer))) + diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index dd8b12f85..72f7bffba 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -40,12 +40,6 @@ (defprotocol DaemonCommon (waiting? 
[this])) -(def LS-WORKER-HEARTBEAT "worker-heartbeat") - -;; LocalState constants -(def LS-ID "supervisor-id") -(def LS-LOCAL-ASSIGNMENTS "local-assignments") -(def LS-APPROVED-WORKERS "approved-workers") @@ -321,7 +315,7 @@ (supervisor-storm-resources-path (supervisor-stormdist-root (:conf worker) (:storm-id worker))) (worker-pids-root (:conf worker) (:worker-id worker)) - (:port worker) + (int (:port worker)) (:task-ids worker) (:default-shared-resources worker) (:user-shared-resources worker) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..1da3691de 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -68,7 +68,7 @@ (defn- read-storm-conf [conf storm-id] (let [stormroot (master-stormdist-root conf storm-id)] (merge conf - (Utils/deserialize + (deserialize-clj-bytes (FileUtils/readFileToByteArray (File. (master-stormconf-path stormroot)) ))))) @@ -295,13 +295,13 @@ (FileUtils/forceMkdir (File. stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) - (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) - (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) + (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serializeTopology topology)) + (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (serialize-clj-bytes storm-conf)) )) (defn- read-storm-topology [conf storm-id] (let [stormroot (master-stormdist-root conf storm-id)] - (Utils/deserialize + (Utils/deserializeTopology (FileUtils/readFileToByteArray (File. 
(master-stormcode-path stormroot)) )))) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..2fc6e7de0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,7 +1,9 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) + (:import [backtype.storm Constants]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) + (:use [backtype.storm.utils localstate-serializer]) (:require [backtype.storm.daemon [worker :as worker]]) (:gen-class :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]])) @@ -59,7 +61,7 @@ (defn read-worker-heartbeat [conf id] (let [local-state (worker-state conf id)] - (.get local-state LS-WORKER-HEARTBEAT) + (.get local-state Constants/LS_WORKER_HEARTBEAT) )) @@ -89,7 +91,7 @@ (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) id->heartbeat (read-worker-heartbeats conf) - approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] + approved-ids (set (keys (.get local-state Constants/LS_APPROVED_WORKERS)))] (into {} (dofor [[id hb] id->heartbeat] @@ -110,9 +112,9 @@ ))) (defn- wait-for-worker-launch [conf id start-time] - (let [state (worker-state conf id)] + (let [state (worker-state conf id)] (loop [] - (let [hb (.get state LS-WORKER-HEARTBEAT)] + (let [hb (.get state Constants/LS_WORKER_HEARTBEAT)] (when (and (not hb) (< @@ -123,7 +125,7 @@ (Time/sleep 500) (recur) ))) - (when-not (.get state LS-WORKER-HEARTBEAT) + (when-not (.get state Constants/LS_WORKER_HEARTBEAT) (log-message "Worker " id " failed to start") ))) @@ -184,7 +186,7 @@ (defn sync-processes [supervisor] (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) - assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {}) + assigned-executors (defaulted (.get 
local-state Constants/LS_LOCAL_ASSIGNMENTS) {}) now (current-time-secs) allocated (read-allocated-workers supervisor assigned-executors now) keepers (filter-val @@ -221,11 +223,11 @@ )) (doseq [id (vals new-worker-ids)] (local-mkdirs (worker-pids-root conf id))) - (.put local-state LS-APPROVED-WORKERS + (.put local-state Constants/LS_APPROVED_WORKERS (merge - (select-keys (.get local-state LS-APPROVED-WORKERS) + (select-keys (.get local-state Constants/LS_APPROVED_WORKERS) (keys keepers)) (zipmap (vals new-worker-ids) (keys new-worker-ids)) )) (wait-for-workers-launch conf @@ -269,7 +271,7 @@ new-assignment (->> all-assignment (filter-key #(.confirmAssigned isupervisor %))) assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) - existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] + existing-assignment (.get local-state Constants/LS_LOCAL_ASSIGNMENTS)] (log-debug "Synchronizing supervisor") (log-debug "Storm code map: " storm-code-map) (log-debug "Downloaded storm ids: " downloaded-storm-ids) @@ -301,7 +303,7 @@ (.killedWorker isupervisor (int p))) (.assigned isupervisor (keys new-assignment)) (.put local-state - LS-LOCAL-ASSIGNMENTS + Constants/LS_LOCAL_ASSIGNMENTS new-assignment) (reset! (:curr-assignment supervisor) new-assignment) ;; remove any downloaded code that's no longer assigned or active @@ -481,11 +483,11 @@ (reify ISupervisor (prepare [this conf local-dir] (reset! conf-atom conf) - (let [state (LocalState. local-dir) - curr-id (if-let [id (.get state LS-ID)] + (let [state (LocalState. local-dir (localstate-serializer)) + curr-id (if-let [id (.get state Constants/LS_ID)] id (generate-supervisor-id))] - (.put state LS-ID curr-id) + (.put state Constants/LS_ID curr-id) (reset! 
id-atom curr-id)) ) (confirmAssigned [this port] diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj index a517e6f43..77937ddbc 100644 --- a/storm-core/src/clj/backtype/storm/daemon/task.clj +++ b/storm-core/src/clj/backtype/storm/daemon/task.clj @@ -24,7 +24,7 @@ (supervisor-stormdist-root conf (:storm-id worker))) (worker-pids-root conf (:worker-id worker)) (int %) - (:port worker) + (int (:port worker)) (:task-ids worker) (:default-shared-resources worker) (:user-shared-resources worker) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 5182027c2..2252229a6 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -5,6 +5,7 @@ (:import [java.util.concurrent Executors]) (:import [backtype.storm.messaging TransportFactory]) (:import [backtype.storm.messaging IContext IConnection]) + (:import [backtype.storm Constants]) (:gen-class)) (bootstrap) @@ -49,13 +50,12 @@ (log-debug "Doing heartbeat " (pr-str hb)) ;; do the local-file-system heartbeat. (.put state - LS-WORKER-HEARTBEAT + Constants/LS_WORKER_HEARTBEAT hb false ) (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up. 
; it shouldn't take supervisor 120 seconds between listing dir and reading it - )) (defn worker-outbound-tasks diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 2eb92d814..d90b08d78 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -11,6 +11,7 @@ (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) (:import [java.util.concurrent ConcurrentHashMap]) + (:import [backtype.storm Constants]) (:import [backtype.storm.utils Time Utils RegisteredGlobalState]) (:import [backtype.storm.tuple Fields Tuple TupleImpl]) (:import [backtype.storm.task TopologyContext]) @@ -258,13 +259,13 @@ (defn find-worker-id [supervisor-conf port] (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)] + worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS)] (first ((reverse-map worker->port) port)) )) (defn find-worker-port [supervisor-conf worker-id] (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (.get supervisor-state common/LS-APPROVED-WORKERS) + worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS) ] (worker->port worker-id) )) diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index ecc87ef77..383653a71 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -844,3 +844,16 @@ (meta form)) (list form x))) ([x form & more] `(-<> (-<> ~x ~form) ~@more))) + +(def ^:const def-ser-enc "UTF-8") + +(defn serialize-clj-bytes + "serializes Clojure form to UTF-8 byte array" + [form] + (.getBytes (pr-str form) def-ser-enc)) + +(defn deserialize-clj-bytes + "deserializes Clojure form from UTF-8 byte array; uses read-string, so only pass trusted bytes" + [form] + (read-string (String. 
form def-ser-enc))) + diff --git a/storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj b/storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj new file mode 100644 index 000000000..ed5822b76 --- /dev/null +++ b/storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj @@ -0,0 +1,34 @@ +;; Used to serialize LocalState. We assume that LocalState is a k/v store that +;; uses java.util.HashMap to store the following keys +;; LS_ID : String +;; LS_WORKER_HEARTBEAT : backtype.storm.daemon.common.WorkerHeartbeat +;; LS_LOCAL_ASSIGNMENTS : clojure.lang.PersistentArrayMap +;; LS_APPROVED_WORKERS : clojure.lang.PersistentArrayMap + +(ns backtype.storm.utils.localstate-serializer + (:import [backtype.storm.utils Utils]) + (:import [backtype.storm Constants]) + (:use [backtype.storm util]) + ) + +; java.util.HashMap -> byte[] +(defn serialize-localstate [form] + (serialize-clj-bytes (into {} form))) + +; byte[] -> java.util.HashMap +(defn deserialize-localstate [form] + (let [newm (java.util.HashMap.)] + (.putAll newm (deserialize-clj-bytes form)) + newm)) + +(defn localstate-serializer [] + (reify + backtype.storm.utils.StateSerializer + (serializeState [this val] (serialize-localstate val)) + (deserializeState [this ser] (deserialize-localstate ser)))) + +(defn localstate-default-serializer [] + (reify + backtype.storm.utils.StateSerializer + (serializeState [this val] (Utils/serialize val)) + (deserializeState [this ser] (Utils/deserialize ser)))) \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java index a8ade3c53..ab63db0dc 100644 --- a/storm-core/src/jvm/backtype/storm/Constants.java +++ b/storm-core/src/jvm/backtype/storm/Constants.java @@ -14,5 +14,11 @@ public class Constants { public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; public static final String METRICS_STREAM_ID = "__metrics"; public static final String 
METRICS_TICK_STREAM_ID = "__metrics_tick"; + + // LocalState constants + public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat"; + public static final String LS_ID = "supervisor-id"; + public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments"; + public static final String LS_APPROVED_WORKERS = "approved-workers"; } - \ No newline at end of file + diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index 73560392c..439625408 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -14,9 +14,11 @@ */ public class LocalState { private VersionedStore _vs; - - public LocalState(String backingDir) throws IOException { + private StateSerializer _stSer; + + public LocalState (String backingDir, StateSerializer stSer) throws IOException { _vs = new VersionedStore(backingDir); + _stSer = stSer; } public synchronized Map snapshot() throws IOException { @@ -25,7 +27,7 @@ public synchronized Map snapshot() throws IOException { String latestPath = _vs.mostRecentVersionPath(); if(latestPath==null) return new HashMap(); try { - return (Map) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath))); + return (Map) _stSer.deserializeState(FileUtils.readFileToByteArray(new File(latestPath))); } catch(IOException e) { attempts++; if(attempts >= 10) { @@ -64,10 +66,10 @@ public synchronized void cleanup(int keepVersions) throws IOException { } private void persist(Map val, boolean cleanup) throws IOException { - byte[] toWrite = Utils.serialize(val); + byte[] toWrite = _stSer.serializeState(val); String newPath = _vs.createVersion(); FileUtils.writeByteArrayToFile(new File(newPath), toWrite); _vs.succeedVersion(newPath); if(cleanup) _vs.cleanup(4); } -} \ No newline at end of file +} diff --git a/storm-core/src/jvm/backtype/storm/utils/StateSerializer.java 
b/storm-core/src/jvm/backtype/storm/utils/StateSerializer.java new file mode 100644 index 000000000..ad111359a --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/StateSerializer.java @@ -0,0 +1,12 @@ +package backtype.storm.utils; + +import java.util.Map; + +/** + * Interface for serializing state, for example, when the `Supervisor` + * serializes `LocalState`. + */ +public interface StateSerializer { + public byte[] serializeState (Map val); + public Map deserializeState (byte[] ser); +} \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index a31402e5a..57875aad2 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -30,6 +30,9 @@ import java.util.UUID; import org.apache.commons.lang.StringUtils; import org.apache.thrift7.TException; +import org.apache.thrift7.TSerializer; +import org.apache.thrift7.TDeserializer; +import org.apache.thrift7.protocol.TBinaryProtocol; import org.json.simple.JSONValue; import org.yaml.snakeyaml.Yaml; @@ -71,6 +74,30 @@ public static Object deserialize(byte[] serialized) { } } + public static byte[] serializeTopology(StormTopology topo) { + // TSerializer not threadsafe; lock or new instance every time + TSerializer serializer = + new TSerializer(new TBinaryProtocol.Factory()); + try { + return serializer.serialize(topo); + } catch(TException te) { + throw new RuntimeException(te); + } + } + + public static StormTopology deserializeTopology(byte[] bytes) { + // TDeserializer not threadsafe; lock or new instance every time + TDeserializer deserializer = + new TDeserializer(new TBinaryProtocol.Factory()); + StormTopology topo = new StormTopology(); + try { + deserializer.deserialize(topo, bytes); + return topo; + } catch(TException te) { + throw new RuntimeException(te); + } + } + public static String join(Iterable coll, String sep) { Iterator it = coll.iterator(); 
String ret = ""; diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj index 71e73631d..a7c08ff37 100644 --- a/storm-core/test/clj/backtype/storm/local_state_test.clj +++ b/storm-core/test/clj/backtype/storm/local_state_test.clj @@ -1,12 +1,13 @@ (ns backtype.storm.local-state-test (:use [clojure test]) (:use [backtype.storm testing]) + (:use [backtype.storm.utils localstate-serializer]) (:import [backtype.storm.utils LocalState])) (deftest test-local-state (with-local-tmp [dir1 dir2] - (let [ls1 (LocalState. dir1) - ls2 (LocalState. dir2)] + (let [ls1 (LocalState. dir1 (localstate-serializer)) + ls2 (LocalState. dir2 (localstate-serializer))] (is (= {} (.snapshot ls1))) (.put ls1 "a" 1) (.put ls1 "b" 2) @@ -15,7 +16,8 @@ (is (= 1 (.get ls1 "a"))) (is (= nil (.get ls1 "c"))) (is (= 2 (.get ls1 "b"))) - (is (= {"a" 1 "b" 2} (.snapshot (LocalState. dir1)))) + (is (= {"a" 1 "b" 2} (.snapshot (LocalState. + dir1 (localstate-serializer))))) (.put ls2 "b" 1) (.put ls2 "b" 2) (.put ls2 "b" 3)