From 28b57e09d4291af7d206cf93a3d9d436ad7ed3a8 Mon Sep 17 00:00:00 2001 From: yunfan123 Date: Mon, 6 Mar 2017 18:36:19 +0800 Subject: [PATCH] Support jstorm error reporter and cluster error manager plugin. Change-Id: I56a3417d8f5939ef83666126f8cd560e004fb292 --- .../jstorm/client/ConfigExtension.java | 18 ++ .../com/alibaba/jstorm/cluster/Cluster.java | 7 +- .../jstorm/cluster/IStormErrorManager.java | 42 +++ .../jstorm/cluster/StormZkClusterState.java | 129 ++------- .../jstorm/cluster/StormZkErrorManager.java | 256 ++++++++++++++++++ .../jstorm/daemon/supervisor/Supervisor.java | 7 +- .../daemon/supervisor/SyncProcessEvent.java | 4 +- .../daemon/worker/IWorkerReportError.java | 15 + .../jstorm/daemon/worker/WorkerData.java | 2 +- .../daemon/worker/WorkerReportError.java | 11 +- 10 files changed, 373 insertions(+), 118 deletions(-) create mode 100644 jstorm-core/src/main/java/com/alibaba/jstorm/cluster/IStormErrorManager.java create mode 100644 jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkErrorManager.java create mode 100644 jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/IWorkerReportError.java diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java index 40fbb1846..0c535b4e8 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java @@ -18,6 +18,7 @@ package com.alibaba.jstorm.client; import backtype.storm.Config; +import backtype.storm.ConfigValidation; import backtype.storm.utils.Utils; import com.alibaba.jstorm.config.DefaultConfigUpdateHandler; import com.alibaba.jstorm.utils.JStormUtils; @@ -1310,4 +1311,21 @@ public static Boolean getTopologyAccurateMetric(Map conf) { public static Integer getTopologyHistogramSize(Map conf) { return JStormUtils.parseInt(conf.get(TOPOLOGY_HISTOGRAM_SIZE), 256); } + + public static final String WORKER_ERROR_REPORT_PLUGIN = "worker.error.reporter.plugin.class"; + + public static String getWorkerErrorReportPluginClass(Map conf) { + if (conf.containsKey(WORKER_ERROR_REPORT_PLUGIN)) { + return (String)conf.get(WORKER_ERROR_REPORT_PLUGIN); + } + return "com.alibaba.jstorm.daemon.worker.WorkerReportError"; + } + + public static final String CLUSTER_ERROR_MANAGER_PLUGIN = "cluster.error.manager.plugin"; + public static final String getClusterErrorManagerPlugin(Map conf) { + if (conf.containsKey(CLUSTER_ERROR_MANAGER_PLUGIN)) { + return (String)conf.get(CLUSTER_ERROR_MANAGER_PLUGIN); + } + return "com.alibaba.jstorm.cluster.StormZkErrorManager"; + } } \ No newline at end of file diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java index 82ce3ab23..5805e8815 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java @@ -153,11 +153,12 @@ public static String blacklist_path(String key) { @SuppressWarnings("rawtypes") public static StormClusterState mk_storm_cluster_state(Map cluster_state_spec) throws Exception { - return new StormZkClusterState(cluster_state_spec); + return new StormZkClusterState(cluster_state_spec, cluster_state_spec); } - public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec) throws Exception { - return new StormZkClusterState(cluster_state_spec); + public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec, + Map conf) throws Exception { + return new StormZkClusterState(cluster_state_spec, conf); } public static Map get_all_taskInfo(StormClusterState zkCluster, String topologyId) throws Exception { diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/IStormErrorManager.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/IStormErrorManager.java new file mode 100644 index 000000000..905302366 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/IStormErrorManager.java @@ -0,0 +1,42 @@ +package com.alibaba.jstorm.cluster; + +import com.alibaba.jstorm.task.error.TaskError; + +import java.util.List; +import java.util.Map; + +/** + * Created by yunfan on 2017/2/28. + */ +public interface IStormErrorManager { + + public void init(Map conf, ClusterState obj); + + public List task_error_storms() throws Exception; + + public List task_error_ids(String topologyId) throws Exception; + + public void report_task_error(String topology_id, int task_id, Throwable error) throws Exception; + + public void report_task_error(String topology_id, int task_id, String error) throws Exception; + + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code) throws Exception; + + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration) throws Exception; + + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration, String tag) throws Exception; + + public Map topo_lastErr_time(String topologyId) throws Exception; + + public void remove_lastErr_time(String topologyId) throws Exception; + + public List task_errors(String topology_id, int task_id) throws Exception; + + public void remove_task_error(String topologyId, int taskId) throws Exception; + + public List task_error_time(String topologyId, int taskId) throws Exception; + + public TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception; + + public void teardown_task_errors(String topology_id) throws Exception; +} diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java index ca12f2661..8b28b16d8 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java @@ -23,6 +23,7 @@ import com.alibaba.jstorm.cache.JStormCache; import com.alibaba.jstorm.callback.ClusterStateCallback; import com.alibaba.jstorm.callback.RunnableCallback; +import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.AssignmentBak; @@ -46,6 +47,8 @@ public class StormZkClusterState implements StormClusterState { private ClusterState cluster_state; + private IStormErrorManager stormErrorManager; + private ConcurrentHashMap assignment_info_callback; private AtomicReference supervisors_callback; private AtomicReference assignments_callback; @@ -57,17 +60,20 @@ public class StormZkClusterState implements StormClusterState { private boolean solo; - public StormZkClusterState(Object cluster_state_spec) throws Exception { + public StormZkClusterState(Object cluster_state_spec, Map conf) throws Exception { if (cluster_state_spec instanceof ClusterState) { solo = false; cluster_state = (ClusterState) cluster_state_spec; } else { - solo = true; cluster_state = new DistributedClusterState((Map) cluster_state_spec); } + String errorPluginClass = ConfigExtension.getClusterErrorManagerPlugin(conf); + stormErrorManager = (IStormErrorManager)Utils.newInstance(errorPluginClass); + stormErrorManager.init(conf, cluster_state); + assignment_info_callback = new ConcurrentHashMap(); supervisors_callback = new AtomicReference(null); assignments_callback = new AtomicReference(null); @@ -364,74 +370,32 @@ public void teardown_heartbeats(String topologyId) { @Override public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception { - report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error), + stormErrorManager.report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error), ErrorConstants.FATAL, ErrorConstants.CODE_USER); } @Override public void report_task_error(String topology_id, int task_id, String error) throws Exception { // we use this interface only in user level error - report_task_error(topology_id, task_id, error, ErrorConstants.FATAL, ErrorConstants.CODE_USER); + stormErrorManager.report_task_error(topology_id, task_id, error, ErrorConstants.FATAL, ErrorConstants.CODE_USER); } @Override public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code) throws Exception { - report_task_error(topology_id, task_id, error, error_level, error_code, ErrorConstants.DURATION_SECS_DEFAULT); + stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, ErrorConstants.DURATION_SECS_DEFAULT); } @Override public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration_secs) throws Exception { - report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null); + stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null); } @Override public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration_secs, String tag) throws Exception { - boolean found = false; - String path = Cluster.taskerror_path(topology_id, task_id); - cluster_state.mkdirs(path); - - List children = new ArrayList(); - - int timeSecs = TimeUtils.current_time_secs(); - String timestampPath = path + Cluster.ZK_SEPERATOR + timeSecs; - TaskError taskError = new TaskError(error, error_level, error_code, timeSecs, duration_secs); - - for (String str : cluster_state.get_children(path, false)) { - String errorPath = path + Cluster.ZK_SEPERATOR + str; - Object obj = getObject(errorPath, false); - if (obj == null){ - deleteObject(errorPath); - continue; - } - - TaskError errorInfo = (TaskError) obj; - - // replace the old one if needed - if (errorInfo.getError().equals(error) - || (tag != null && errorInfo.getError().startsWith(tag))) { - cluster_state.delete_node(errorPath); - setObject(timestampPath, taskError); - removeLastErrInfoDuration(topology_id, taskError.getDurationSecs()); - found = true; - break; - } - - children.add(Integer.parseInt(str)); - } - - if (!found) { - Collections.sort(children); - - while (children.size() >= 3) { - deleteObject(path + Cluster.ZK_SEPERATOR + children.remove(0)); - } - - setObject(timestampPath, taskError); - } - setLastErrInfo(topology_id, duration_secs, timeSecs); + stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, tag); } private static final String TASK_IS_DEAD = "is dead on"; // Full string is @@ -477,40 +441,32 @@ private void setLastErrInfo(String topologyId, int duration, int timeStamp) thro @Override public void remove_task_error(String topologyId, int taskId) throws Exception { - String path = Cluster.taskerror_path(topologyId, taskId); - cluster_state.delete_node(path); + stormErrorManager.remove_task_error(topologyId, taskId); } @Override public Map topo_lastErr_time(String topologyId) throws Exception { - String path = Cluster.lasterror_path(topologyId); - - return (Map) getObject(path, false); + return stormErrorManager.topo_lastErr_time(topologyId); } @Override public void remove_lastErr_time(String topologyId) throws Exception { - String path = Cluster.lasterror_path(topologyId); - deleteObject(path); + stormErrorManager.remove_lastErr_time(topologyId); } @Override public List task_error_storms() throws Exception { - return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false); + return stormErrorManager.task_error_storms(); } @Override public List task_error_ids(String topologyId) throws Exception { - return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false); + return stormErrorManager.task_error_ids(topologyId); } @Override public List task_error_time(String topologyId, int taskId) throws Exception { - String path = Cluster.taskerror_path(topologyId, taskId); - if (cluster_state.node_existed(path, false) == false) { - return new ArrayList(); - } - return cluster_state.get_children(path, false); + return stormErrorManager.task_error_time(topologyId, taskId); } @Override @@ -529,56 +485,17 @@ public void remove_task(String topologyId, Set taskIds) throws Exceptio @Override public TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception { - String path = Cluster.taskerror_path(topologyId, taskId); - path = path + "/" + timeStamp; - return (TaskError) getObject(path, false); + return stormErrorManager.task_error_info(topologyId, taskId, timeStamp); } @Override public List task_errors(String topologyId, int taskId) throws Exception { - List errors = new ArrayList(); - String path = Cluster.taskerror_path(topologyId, taskId); - if (cluster_state.node_existed(path, false) == false) { - return errors; - } - - List children = cluster_state.get_children(path, false); - - - for (String str : children) { - Object obj = getObject(path + Cluster.ZK_SEPERATOR + str, false); - if (obj != null) { - TaskError error = (TaskError) obj; - errors.add(error); - } - } - - Collections.sort(errors, new Comparator() { - - @Override - public int compare(TaskError o1, TaskError o2) { - if (o1.getTimSecs() > o2.getTimSecs()) { - return 1; - } - if (o1.getTimSecs() < o2.getTimSecs()) { - return -1; - } - return 0; - } - }); - - return errors; - + return stormErrorManager.task_errors(topologyId, taskId); } @Override - public void teardown_task_errors(String topologyId) { - try { - String taskerrPath = Cluster.taskerror_storm_root(topologyId); - deleteObject(taskerrPath); - } catch (Exception e) { - LOG.error("Could not teardown errors for " + topologyId, e); - } + public void teardown_task_errors(String topologyId) throws Exception { + stormErrorManager.teardown_task_errors(topologyId); } @Override diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkErrorManager.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkErrorManager.java new file mode 100644 index 000000000..1160087fb --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkErrorManager.java @@ -0,0 +1,256 @@ +package com.alibaba.jstorm.cluster; + +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.task.error.ErrorConstants; +import com.alibaba.jstorm.task.error.TaskError; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Created by yunfan on 2017/2/28. + */ + +public class StormZkErrorManager implements IStormErrorManager { + + private static Logger LOG = LoggerFactory.getLogger(StormZkErrorManager.class); + + private ClusterState cluster_state; + + @Override + public void init(Map conf, ClusterState cluster_state) { + this.cluster_state = cluster_state; + } + + @Override + public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception { + report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error), + ErrorConstants.FATAL, ErrorConstants.CODE_USER); + } + + @Override + public void report_task_error(String topology_id, int task_id, String error) throws Exception { + // we use this interface only in user level error + report_task_error(topology_id, task_id, error, ErrorConstants.FATAL, ErrorConstants.CODE_USER); + } + + @Override + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code) + throws Exception { + report_task_error(topology_id, task_id, error, error_level, error_code, ErrorConstants.DURATION_SECS_DEFAULT); + } + + @Override + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, + int duration_secs) throws Exception { + report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null); + } + + @Override + public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, + int duration_secs, String tag) throws Exception { + boolean found = false; + String path = Cluster.taskerror_path(topology_id, task_id); + cluster_state.mkdirs(path); + + List children = new ArrayList(); + + int timeSecs = TimeUtils.current_time_secs(); + String timestampPath = path + Cluster.ZK_SEPERATOR + timeSecs; + TaskError taskError = new TaskError(error, error_level, error_code, timeSecs, duration_secs); + + for (String str : cluster_state.get_children(path, false)) { + String errorPath = path + Cluster.ZK_SEPERATOR + str; + Object obj = getObject(errorPath, false); + if (obj == null){ + deleteObject(errorPath); + continue; + } + + TaskError errorInfo = (TaskError) obj; + + // replace the old one if needed + if (errorInfo.getError().equals(error) + || (tag != null && errorInfo.getError().startsWith(tag))) { + cluster_state.delete_node(errorPath); + setObject(timestampPath, taskError); + found = true; + break; + } + + children.add(Integer.parseInt(str)); + } + + if (!found) { + Collections.sort(children); + + while (children.size() >= 3) { + deleteObject(path + Cluster.ZK_SEPERATOR + children.remove(0)); + } + + setObject(timestampPath, taskError); + } + setLastErrInfo(topology_id, duration_secs, timeSecs); + } + + private static final String TASK_IS_DEAD = "is dead on"; // Full string is + // "task-id is dead on hostname:port" + + private void setLastErrInfo(String topologyId, int duration, int timeStamp) throws Exception { + // Set error information in task error topology patch + // Last Error information format in ZK: map + // report_duration means only the errors will presented in web ui if the + // error happens within this duration. + // Currently, the duration for "queue full" error is 180sec(3min) while + // the duration for other errors is 1800sec(30min). + String lastErrTopoPath = Cluster.lasterror_path(topologyId); + Map lastErrInfo = null; + try { + lastErrInfo = (Map) getObject(lastErrTopoPath, false); + + } catch (Exception e) { + LOG.error("Failed to get last error time. Remove the corrupt node for " + topologyId, e); + remove_lastErr_time(topologyId); + lastErrInfo = null; + } + if (lastErrInfo == null) + lastErrInfo = new HashMap(); + + // The error time is used to indicate how long the error info is present + // in UI + lastErrInfo.put(duration, timeStamp + ""); + setObject(lastErrTopoPath, lastErrInfo); + } + + @Override + public void remove_task_error(String topologyId, int taskId) throws Exception { + String path = Cluster.taskerror_path(topologyId, taskId); + cluster_state.delete_node(path); + } + + @Override + public Map topo_lastErr_time(String topologyId) throws Exception { + String path = Cluster.lasterror_path(topologyId); + + return (Map) getObject(path, false); + } + + @Override + public void remove_lastErr_time(String topologyId) throws Exception { + String path = Cluster.lasterror_path(topologyId); + deleteObject(path); + } + + @Override + public List task_error_storms() throws Exception { + return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false); + } + + @Override + public List task_error_ids(String topologyId) throws Exception { + return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false); + } + + @Override + public List task_error_time(String topologyId, int taskId) throws Exception { + String path = Cluster.taskerror_path(topologyId, taskId); + if (cluster_state.node_existed(path, false) == false) { + return new ArrayList(); + } + return cluster_state.get_children(path, false); + } + + public Object getObject(String path, boolean callback) throws Exception { + byte[] data = cluster_state.get_data(path, callback); + + return Utils.maybe_deserialize(data); + } + + public Object getObjectSync(String path, boolean callback) throws Exception { + byte[] data = cluster_state.get_data_sync(path, callback); + + return Utils.maybe_deserialize(data); + } + + public String getString(String path, boolean callback) throws Exception { + byte[] data = cluster_state.get_data(path, callback); + + return new String(data); + } + + public void deleteObject(String path) { + try { + cluster_state.delete_node(path); + } catch (Exception e) { + LOG.warn("Failed to delete node " + path); + } + } + + public void setObject(String path, Object obj) throws Exception { + if (obj instanceof byte[]) { + cluster_state.set_data(path, (byte[]) obj); + } else if (obj instanceof String) { + cluster_state.set_data(path, ((String) obj).getBytes()); + } else { + cluster_state.set_data(path, Utils.serialize(obj)); + } + } + + @Override + public TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception { + String path = Cluster.taskerror_path(topologyId, taskId); + path = path + "/" + timeStamp; + return (TaskError) getObject(path, false); + } + + @Override + public List task_errors(String topologyId, int taskId) throws Exception { + List errors = new ArrayList(); + String path = Cluster.taskerror_path(topologyId, taskId); + if (cluster_state.node_existed(path, false) == false) { + return errors; + } + + List children = cluster_state.get_children(path, false); + + + for (String str : children) { + Object obj = getObject(path + Cluster.ZK_SEPERATOR + str, false); + if (obj != null) { + TaskError error = (TaskError) obj; + errors.add(error); + } + } + + Collections.sort(errors, new Comparator() { + + @Override + public int compare(TaskError o1, TaskError o2) { + if (o1.getTimSecs() > o2.getTimSecs()) { + return 1; + } + if (o1.getTimSecs() < o2.getTimSecs()) { + return -1; + } + return 0; + } + }); + + return errors; + + } + + @Override + public void teardown_task_errors(String topologyId) { + try { + String taskerrPath = Cluster.taskerror_storm_root(topologyId); + deleteObject(taskerrPath); + } catch (Exception e) { + LOG.error("Could not teardown errors for " + topologyId, e); + } + } + +} diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java index 76efc561c..23eaa61e5 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java @@ -18,7 +18,7 @@ package com.alibaba.jstorm.daemon.supervisor; import com.alibaba.jstorm.config.SupervisorRefreshConfig; -import com.alibaba.jstorm.metric.JStormMetricsReporter; +import com.alibaba.jstorm.daemon.worker.IWorkerReportError; import java.io.File; import java.util.Map; import java.util.UUID; @@ -103,7 +103,10 @@ public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) throws Ex StormClusterState stormClusterState = Cluster.mk_storm_cluster_state(conf); String hostName = JStormServerUtils.getHostName(conf); - WorkerReportError workerReportError = new WorkerReportError(stormClusterState, hostName); + String errorReporterClass = ConfigExtension.getWorkerErrorReportPluginClass(conf); + + IWorkerReportError workerReportError = (IWorkerReportError)Utils.newInstance(errorReporterClass); + workerReportError.init(conf, stormClusterState); /** * Step 3, create LocalStat (a simple KV store) diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java index ee2005f14..28994292c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java @@ -94,11 +94,11 @@ class SyncProcessEvent extends ShutdownWork { // private Supervisor supervisor; private int lastTime; - private WorkerReportError workerReportError; + private IWorkerReportError workerReportError; public SyncProcessEvent(String supervisorId, Map conf, LocalState localState, ConcurrentHashMap workerThreadPids, - IContext sharedContext, WorkerReportError workerReportError) { + IContext sharedContext, IWorkerReportError workerReportError) { this.supervisorId = supervisorId; diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/IWorkerReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/IWorkerReportError.java new file mode 100644 index 000000000..10b4a7084 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/IWorkerReportError.java @@ -0,0 +1,15 @@ +package com.alibaba.jstorm.daemon.worker; + +import java.util.Map; +import java.util.Set; + +/** + * Created by yunfan on 2017/2/28. + */ +public interface IWorkerReportError { + + public void init(Map conf, Object obj); + + public void report(String topology_id, Integer worker_port, + Set tasks, String error, int errorCode); +} diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java index ab2e506f6..36a5a2bd0 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java @@ -206,7 +206,7 @@ public WorkerData(Map conf, IContext context, String topology_id, String supervi // create zk interface this.zkClusterstate = ZkTool.mk_distributed_cluster_state(conf); - this.zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate); + this.zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate, conf); Map rawConf = StormConfig.read_supervisor_topology_conf(conf, topology_id); this.stormConf = new HashMap(); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java index 816d9ed5c..5146bea90 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java @@ -2,24 +2,27 @@ import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.task.error.ErrorConstants; +import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.TimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; +import java.util.Map; import java.util.Set; /** * @author xiaojian.fxj */ -public class WorkerReportError { +public class WorkerReportError implements IWorkerReportError { private static Logger LOG = LoggerFactory.getLogger(WorkerReportError.class); private StormClusterState zkCluster; private String hostName; - public WorkerReportError(StormClusterState stormClusterState, String hostName) { - this.zkCluster = stormClusterState; - this.hostName = hostName; + @Override + public void init(Map conf, Object obj) { + this.hostName = JStormServerUtils.getHostName(conf); + this.zkCluster = (StormClusterState)obj; } public void report(String topologyId, Integer workerPort,