From e803e53957291431ef9a14561d544efeeaa786f6 Mon Sep 17 00:00:00 2001 From: Jonathan Schneider Date: Thu, 5 Aug 2021 20:43:57 +0000 Subject: [PATCH] refactor: Use Java standard library instead of Guava Co-authored-by: Moderne --- .../FastWordCountTopNWindowTopology.java | 9 +++---- .../tools/RankableObjectWithFields.java | 4 ++-- .../apache/storm/starter/tools/Rankings.java | 7 +++--- .../backtype/storm/command/blobstore.java | 3 +-- .../backtype/storm/command/gray_upgrade.java | 5 ++-- .../jstorm/blobstore/BlobStoreUtils.java | 6 ++--- .../jstorm/cache/rocksdb/RocksDbFactory.java | 5 ++-- .../impl/RollbackTransitionCallback.java | 6 +++-- .../jstorm/cluster/StormZkClusterState.java | 5 ++-- .../jstorm/common/metric/AsmMetric.java | 2 +- .../jstorm/config/YarnConfigBlacklist.java | 7 +++--- .../jstorm/daemon/nimbus/NimbusUtils.java | 15 ++++-------- .../jstorm/daemon/nimbus/ServiceHandler.java | 13 ++++------ .../nimbus/metric/refresh/RefreshEvent.java | 9 ++----- .../daemon/supervisor/ShutdownWork.java | 4 ++-- .../supervisor/SyncSupervisorEvent.java | 3 +-- .../metric/DefaultMetricQueryClient.java | 24 +++++++++---------- .../jstorm/metric/JStormMetricCache.java | 3 +-- .../alibaba/jstorm/metric/JStormMetrics.java | 12 ++++------ .../jstorm/schedule/FollowerRunnable.java | 12 +++++----- .../task/master/GrayUpgradeHandler.java | 9 +++---- .../bolt/KvStatefulBoltExecutor.java | 7 +++--- .../jstorm/window/SlidingCountWindows.java | 5 ++-- .../jstorm/window/WindowedBoltExecutor.java | 10 ++------ .../jstorm/elasticsearch/common/EsConfig.java | 8 ++----- jstorm-utility/rocket-mq | 1 - 26 files changed, 83 insertions(+), 111 deletions(-) delete mode 160000 jstorm-utility/rocket-mq diff --git a/example/sequence-split-merge/src/main/java/com/alipay/dw/jstorm/example/newindow/FastWordCountTopNWindowTopology.java b/example/sequence-split-merge/src/main/java/com/alipay/dw/jstorm/example/newindow/FastWordCountTopNWindowTopology.java index 57bca7d40..0f5b5dcf1 100644 --- a/example/sequence-split-merge/src/main/java/com/alipay/dw/jstorm/example/newindow/FastWordCountTopNWindowTopology.java +++ b/example/sequence-split-merge/src/main/java/com/alipay/dw/jstorm/example/newindow/FastWordCountTopNWindowTopology.java @@ -34,7 +34,6 @@ import com.alibaba.jstorm.window.Time; import com.alibaba.jstorm.window.TimeWindow; import com.alibaba.starter.utils.JStormHelper; -import com.google.common.collect.Lists; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -44,6 +43,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; + /** * WordCount but the spout does not stop, and the bolts are implemented in java. * This can show how fast the word count can run. @@ -203,7 +204,7 @@ public void execute(Tuple tuple, Object state, TimeWindow window) { public void purgeWindow(Object state, TimeWindow window) { LOG.info("purging window: {}", window); final HashMap counts = (HashMap) state; - List keys = Lists.newArrayList(counts.keySet()); + List keys = new ArrayList<>(counts.keySet()); Collections.sort(keys, new Comparator() { @Override public int compare(String o1, String o2) { @@ -211,7 +212,7 @@ public int compare(String o1, String o2) { } }); - List pairs = Lists.newArrayListWithCapacity(n); + List pairs = new ArrayList<>(n); for (int i = 0; i < n; i++) { pairs.add(new Pair<>(keys.get(i), counts.get(keys.get(i)))); } @@ -264,7 +265,7 @@ public Object initWindowState(TimeWindow window) { @Override public void purgeWindow(Object state, TimeWindow window) { final HashMap counts = (HashMap) state; - List keys = Lists.newArrayList(counts.keySet()); + List keys = new ArrayList<>(counts.keySet()); Collections.sort(keys, new Comparator() { @Override public int compare(String o1, String o2) { diff --git a/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/RankableObjectWithFields.java b/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/RankableObjectWithFields.java index ca88f90f3..a4bbc524f 100644 --- a/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/RankableObjectWithFields.java +++ b/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/RankableObjectWithFields.java @@ -18,11 +18,11 @@ package org.apache.storm.starter.tools; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import backtype.storm.tuple.Tuple; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; /** * This class wraps an objects and its associated count, including any @@ -67,7 +67,7 @@ public RankableObjectWithFields(Object obj, long count, Object... otherFields) { * @return new instance based on the provided tuple */ public static RankableObjectWithFields from(Tuple tuple) { - List otherFields = Lists.newArrayList(tuple.getValues()); + List otherFields = new ArrayList<>(tuple.getValues()); Object obj = otherFields.remove(0); Long count = (Long) otherFields.remove(0); return new RankableObjectWithFields(obj, count, otherFields.toArray()); diff --git a/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/Rankings.java b/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/Rankings.java index cf0aa39d5..07f5d2a0a 100644 --- a/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/Rankings.java +++ b/example/sequence-split-merge/src/main/java/org/apache/storm/starter/tools/Rankings.java @@ -18,10 +18,11 @@ package org.apache.storm.starter.tools; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; public class Rankings implements Serializable { @@ -30,7 +31,7 @@ public class Rankings implements Serializable { private static final int DEFAULT_COUNT = 10; private final int maxSize; - private final List rankedItems = Lists.newArrayList(); + private final List rankedItems = new ArrayList<>(); public Rankings() { this(DEFAULT_COUNT); @@ -80,7 +81,7 @@ public int size() { * @return a somewhat defensive copy of ranked items */ public List getRankings() { - List copy = Lists.newLinkedList(); + List copy = new LinkedList<>(); for (Rankable r : rankedItems) { copy.add(r.copy()); } diff --git a/jstorm-core/src/main/java/backtype/storm/command/blobstore.java b/jstorm-core/src/main/java/backtype/storm/command/blobstore.java index 0073141fe..dc5e18123 100644 --- a/jstorm-core/src/main/java/backtype/storm/command/blobstore.java +++ b/jstorm-core/src/main/java/backtype/storm/command/blobstore.java @@ -28,7 +28,6 @@ import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.utils.PathUtils; -import com.google.common.collect.Sets; import java.io.FileInputStream; import java.io.IOException; @@ -65,7 +64,7 @@ public static void migrateOldTopologyFiles() throws Exception { init(); String stormRoot = StormConfig.masterStormdistRoot(conf); List topologies = PathUtils.read_dir_contents(stormRoot); - Set activeKeys = Sets.newHashSet(blobStore.listKeys()); + Set activeKeys = new HashSet<>(Arrays.asList(blobStore.listKeys())); for (String topologyId : topologies) { try { diff --git a/jstorm-core/src/main/java/backtype/storm/command/gray_upgrade.java b/jstorm-core/src/main/java/backtype/storm/command/gray_upgrade.java index d3fce4af0..2cee5893e 100644 --- a/jstorm-core/src/main/java/backtype/storm/command/gray_upgrade.java +++ b/jstorm-core/src/main/java/backtype/storm/command/gray_upgrade.java @@ -23,7 +23,6 @@ import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; import com.alibaba.jstorm.client.ConfigExtension; -import com.google.common.collect.Lists; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -35,6 +34,8 @@ import org.apache.commons.cli.Options; import org.apache.commons.lang.StringUtils; +import java.util.ArrayList; + /** * gray upgrade a topology * @@ -128,7 +129,7 @@ public static void main(String[] args) throws Exception { if (commandLine.hasOption("w")) { String w = commandLine.getOptionValue("w"); if (!StringUtils.isBlank(w)) { - workers = Lists.newArrayList(); + workers = new ArrayList<>(); String[] parts = w.split(","); for (String part : parts) { if (part.split(":").length == 2) { diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/blobstore/BlobStoreUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/blobstore/BlobStoreUtils.java index 4410e93b5..8870afd63 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/blobstore/BlobStoreUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/blobstore/BlobStoreUtils.java @@ -30,8 +30,6 @@ import com.alibaba.jstorm.metric.JStormMetrics; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import java.net.URL; import org.apache.commons.io.FileExistsException; import org.apache.commons.io.FileUtils; @@ -341,7 +339,7 @@ public static void downloadResourcesAsSupervisor(String key, String localFile, C CuratorFramework zkClient = null; try { zkClient = BlobStoreUtils.createZKClient(conf); - nimbusInfos = Lists.newArrayList(BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key)); + nimbusInfos = new ArrayList<>(BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key)); Collections.shuffle(nimbusInfos); } catch (Exception e) { LOG.error("get available nimbus for blob key:{} error", e); @@ -481,7 +479,7 @@ public String filter(String key) { return null; } }; - return Sets.newHashSet(filterAndListKeys(iterator, keyFilter)); + return new HashSet<>(Arrays.asList(filterAndListKeys(iterator, keyFilter))); } // remove blob information in zk for the blobkey diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbFactory.java index d24988fdc..f559d245e 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbFactory.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbFactory.java @@ -19,7 +19,6 @@ import backtype.storm.utils.Utils; import com.alibaba.jstorm.client.ConfigExtension; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.rocksdb.*; @@ -104,7 +103,7 @@ dbOptions, rocksDbDir, columnFamilyDescriptors, columnFamilyHandles, getTtlValue } private static List getTtlValues(int ttlSec, List descriptors) { - List ttlValues = Lists.newArrayList(); + List ttlValues = new ArrayList<>(); for (ColumnFamilyDescriptor descriptor : descriptors) { ttlValues.add(ttlSec); } @@ -113,7 +112,7 @@ private static List getTtlValues(int ttlSec, List getExistingColumnFamilyDesc(Map conf, String dbPath) throws IOException { try { - List families = Lists.newArrayList(); + List families = new ArrayList<>(); List existingFamilies = RocksDB.listColumnFamilies(getOptions(conf), dbPath); if (existingFamilies != null) { families.addAll(existingFamilies); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RollbackTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RollbackTransitionCallback.java index b1538181f..68958f414 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RollbackTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RollbackTransitionCallback.java @@ -27,11 +27,13 @@ import com.alibaba.jstorm.daemon.nimbus.NimbusData; import com.alibaba.jstorm.daemon.nimbus.StatusType; import com.alibaba.jstorm.task.upgrade.GrayUpgradeConfig; -import com.google.common.collect.Sets; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.HashSet; + /** * @author wange * @since 24/02/2017 @@ -74,7 +76,7 @@ public Object execute(T... args) { upgradeConfig.setRollback(true); stormClusterState.set_gray_upgrade_conf(topologyId, upgradeConfig); - Set upgradedWorkers = Sets.newHashSet(stormClusterState.get_upgraded_workers(topologyId)); + Set upgradedWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgraded_workers(topologyId))); if (upgradedWorkers.size() > 0) { LOG.info("Setting rollback workers:{}", upgradedWorkers); for (String upgradedWorker : upgradedWorkers) { diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java index 882fe5a3b..98e87156f 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java @@ -32,7 +32,6 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Collections; @@ -844,7 +843,7 @@ public List get_blacklist() throws Exception { public List get_upgrading_topologies() throws Exception { List ret = cluster_state.get_children(Cluster.GRAY_UPGRADE_SUBTREE, false); if (ret == null) { - ret = Lists.newArrayList(); + ret = new ArrayList<>(); } return ret; } @@ -905,4 +904,4 @@ public void remove_gray_upgrade_info(String topologyId) throws Exception { cluster_state.delete_node(Cluster.gray_upgrade_upgraded_worker_path(topologyId, upgradedWorker)); } } -} \ No newline at end of file +} diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java index 8116c762f..ea55863bb 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java @@ -47,7 +47,7 @@ public abstract class AsmMetric { protected static int minWindow = getMinWindow(windowSeconds); private static final int FLUSH_INTERVAL_BIAS = 5; - protected static final List EMPTY_WIN = Lists.newArrayListWithCapacity(0); + protected static final List EMPTY_WIN = new ArrayList<>(0); /** * sample rate for meter, histogram and timer, note that counter & gauge are not sampled. diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/config/YarnConfigBlacklist.java b/jstorm-core/src/main/java/com/alibaba/jstorm/config/YarnConfigBlacklist.java index 6e28801c2..bdb14f2b4 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/config/YarnConfigBlacklist.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/config/YarnConfigBlacklist.java @@ -28,11 +28,12 @@ import java.util.Set; import java.util.regex.Pattern; -import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; + /** * yarn config black list, note that ONLY plain K-V is supported, list/map values are not supported!!! * so if you intend to set list values, put the values on a single line like ['xx', 'xx']. @@ -100,7 +101,7 @@ public String filterConfigIfNecessary(String confData) { StringBuilder sb = new StringBuilder(4096); Iterable lines = splitLines(confData); - List lineArray = Lists.newArrayList(lines); + List lineArray = new ArrayList<>(lines); for (int i = 0; i < lineArray.size(); i++) { String trimmedLine = lineArray.get(i).trim(); if (!trimmedLine.startsWith("#") && trimmedLine.contains(":")) { @@ -138,7 +139,7 @@ public String getRetainedConfig(String confData) { StringBuilder sb = new StringBuilder(); Iterable lines = splitLines(confData); - List lineArray = Lists.newArrayList(lines); + List lineArray = new ArrayList<>(lines); for (int i = 0; i < lineArray.size(); i++) { String trimmedLine = lineArray.get(i).trim(); if (!trimmedLine.startsWith("#") && trimmedLine.contains(":")) { diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java index 2001b5967..957beb7c6 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java @@ -52,14 +52,7 @@ import com.google.common.collect.Sets; import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import java.util.Set; @@ -314,11 +307,11 @@ public static void cleanupCorruptTopologies(NimbusData data) throws Exception { BlobStore blobStore = data.getBlobStore(); // we have only topology relative files , so we don't need filter - Set code_ids = Sets.newHashSet(BlobStoreUtils.code_ids(blobStore.listKeys())); + Set code_ids = new HashSet<>(Arrays.asList(BlobStoreUtils.code_ids(blobStore.listKeys()))); // get topology in ZK /storms - Set active_ids = Sets.newHashSet(data.getStormClusterState().active_storms()); + Set active_ids = new HashSet<>(Arrays.asList(data.getStormClusterState().active_storms())); //get topology in zk by blobs - Set blobsIdsOnZk = Sets.newHashSet(data.getStormClusterState().blobstore(null)); + Set blobsIdsOnZk = new HashSet<>(Arrays.asList(data.getStormClusterState().blobstore(null))); Set topologyIdsOnZkbyBlobs = BlobStoreUtils.code_ids(blobsIdsOnZk.iterator()); Set corrupt_ids = Sets.difference(active_ids, code_ids); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java index 64ecba82b..fcb0a3d99 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/ServiceHandler.java @@ -25,7 +25,6 @@ import com.alibaba.jstorm.utils.LoadConf; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -35,11 +34,7 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -2011,7 +2006,7 @@ public void grayUpgrade(String topologyName, String component, List work throw new NotAliveException(topologyName); } - Set workerSet = (workers == null) ? Sets.newHashSet() : Sets.newHashSet(workers); + Set workerSet = (workers == null) ? new HashSet<>() : new HashSet<>(workers); try { grayUpgrade(topologyId, null, null, Maps.newHashMap(), component, workerSet, workerNum); } catch (Exception ex) { @@ -2079,8 +2074,8 @@ private String grayUpgrade(String topologyId, String uploadedLocation, Assignment assignment = stormClusterState.assignment_info(topologyId, null); if (upgradeConfig != null) { LOG.info("Got existing gray upgrade config:{}", upgradeConfig); - Set upgradingWorkers = Sets.newHashSet(stormClusterState.get_upgrading_workers(topologyId)); - Set upgradedWorkers = Sets.newHashSet(stormClusterState.get_upgraded_workers(topologyId)); + Set upgradingWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgrading_workers(topologyId))); + Set upgradedWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgraded_workers(topologyId))); int upgradingWorkerNum = upgradingWorkers.size(); int upgradedWorkerNum = upgradedWorkers.size(); int totalWorkerNum = assignment.getWorkers().size() - 1; diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/refresh/RefreshEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/refresh/RefreshEvent.java index b0f7834bc..fd31be010 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/refresh/RefreshEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/refresh/RefreshEvent.java @@ -21,11 +21,7 @@ import com.alibaba.jstorm.utils.Pair; import com.alibaba.jstorm.utils.TimeUtils; import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +43,6 @@ import com.alibaba.jstorm.metric.TopologyMetricContext; import com.alibaba.jstorm.schedule.Assignment; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; -import com.google.common.collect.Sets; /** * Sync meta from cache and remote @@ -108,7 +103,7 @@ private void doRefreshTopologies() { //there's no need to consider sample rate when cluster metrics merge conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0); } - Set workerSlot = Sets.newHashSet(new ResourceWorkerSlot()); + Set workerSlot = new HashSet<>(Arrays.asList(new ResourceWorkerSlot())); TopologyMetricContext metricContext = new TopologyMetricContext(topology, workerSlot, conf); context.getTopologyMetricContexts().putIfAbsent(topology, metricContext); syncMetaFromCache(topology, context.getTopologyMetricContexts().get(topology)); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java index 4b9749247..8bc3bd313 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java @@ -17,8 +17,8 @@ */ package com.alibaba.jstorm.daemon.supervisor; -import com.google.common.collect.Lists; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,7 +77,7 @@ public void shutWorker(Map conf, try { pids = getPid(conf, workerId); } catch (IOException e1) { - pids = Lists.newArrayList(); + pids = new ArrayList<>(); LOG.error("Failed to get pid for " + workerId + " of " + topologyId); } workerId2Pids.put(workerId, pids); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java index 2454ca775..c88c46bd9 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncSupervisorEvent.java @@ -33,7 +33,6 @@ import com.alibaba.jstorm.utils.Pair; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; -import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -399,7 +398,7 @@ private Map>> getUpgradeTopologies(StormCluste Map>> ret = new HashMap<>(); try { - Set upgradingTopologies = Sets.newHashSet(stormClusterState.get_upgrading_topologies()); + Set upgradingTopologies = new HashSet<>(Arrays.asList(stormClusterState.get_upgrading_topologies())); for (String topologyId : upgradingTopologies) { List upgradingWorkers = stormClusterState.get_upgrading_workers(topologyId); for (String worker : upgradingWorkers) { diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java index b6df727f3..8e16044b8 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java @@ -20,8 +20,8 @@ import com.alibaba.jstorm.common.metric.MetricMeta; import com.alibaba.jstorm.common.metric.TaskTrack; import com.alibaba.jstorm.common.metric.TopologyHistory; -import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -48,32 +48,32 @@ public String getIdentity(Map conf) { @Override public List getMetricMeta(String clusterName, String topologyId, MetaType type, MetaFilter filter, Object arg) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getMetricMeta(String clusterName, String topologyId, MetaType type) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getWorkerMeta(String clusterName, String topologyId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getNettyMeta(String clusterName, String topologyId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getTaskMeta(String clusterName, String topologyId, int taskId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getComponentMeta(String clusterName, String topologyId, String componentId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override @@ -88,27 +88,27 @@ public MetricMeta getMetricMeta(String key) { @Override public List getMetricData(String metricId, MetricType metricType, int win, long start, long end) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getMetricData(String metricId, MetricType metricType, int win, long start, long end, int size) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getTaskTrack(String clusterName, String topologyId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getTaskTrack(String clusterName, String topologyId, int taskId) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override public List getTopologyHistory(String clusterName, String topologyName, int size) { - return Lists.newArrayList(); + return new ArrayList<>(); } @Override diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java index 047c63bb4..7c95a856c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java @@ -27,7 +27,6 @@ import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.utils.OSInfo; -import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,7 +249,7 @@ public List getMetricData(String topologyId, MetaType metaType) { retMap.put((Long) objects[0], (MetricInfo) objects[1]); } } - List ret = Lists.newArrayList(retMap.values()); + List ret = new ArrayList<>(retMap.values()); int cnt = 0; for (MetricInfo metricInfo : ret) { cnt += metricInfo.get_metrics_size(); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java index 6ce997f20..cd48daf5f 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java @@ -24,8 +24,6 @@ import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot; import com.alibaba.jstorm.utils.NetWorkUtils; import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +48,7 @@ public class JStormMetrics implements Serializable { NIMBUS_METRIC_KEY, CLUSTER_METRIC_KEY, SUPERVISOR_METRIC_KEY }; - public static final Set SYS_TOPOLOGY_SET = Sets.newHashSet(SYS_TOPOLOGIES); + public static final Set SYS_TOPOLOGY_SET = new HashSet<>(Arrays.asList(SYS_TOPOLOGIES)); public static final String DEFAULT_GROUP = "sys"; public static final String NETTY_GROUP = "netty"; @@ -188,11 +186,11 @@ public static List search(String metricName, MetaType metaType, Metri } else if (metaType == MetaType.WORKER) { return search(workerMetrics, metricName, metricType); } - return Lists.newArrayList(); + return new ArrayList<>(); } private static List search(AsmMetricRegistry registry, String metricName, MetricType metricType) { - List ret = Lists.newArrayList(); + List ret = new ArrayList<>(); Map metricMap; if (metricType == MetricType.COUNTER) { @@ -402,7 +400,7 @@ public static MetricInfo computeAllMetrics() { long start = System.currentTimeMillis(); MetricInfo metricInfo = MetricUtils.mkMetricInfo(); - List> entries = Lists.newLinkedList(); + List> entries = new LinkedList<>(); if (enableStreamMetrics) { entries.addAll(streamMetrics.metrics.entrySet()); } @@ -530,7 +528,7 @@ public static void mergeLevelMetricSnapshot(Map assocMetrics = metric.getAssocMetrics(); MetricType metricType = MetricUtils.metricType(metric.getMetricName()); - List asmMetricList = Lists.newLinkedList(assocMetrics); + List asmMetricList = new LinkedList<>(assocMetrics); asmMetricList.add(metric); // snapshot of the root metric: stream/task diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java index 2710cccc9..bb14fb177 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java @@ -134,8 +134,8 @@ public void run() { private void setupBlobstore() throws Exception { BlobStore blobStore = data.getBlobStore(); StormClusterState clusterState = data.getStormClusterState(); - Set localSetOfKeys = Sets.newHashSet(blobStore.listKeys()); - Set allKeys = Sets.newHashSet(clusterState.active_keys()); + Set localSetOfKeys = new HashSet<>(Arrays.asList(blobStore.listKeys())); + Set allKeys = new HashSet<>(Arrays.asList(clusterState.active_keys())); Set localAvailableActiveKeys = Sets.intersection(localSetOfKeys, allKeys); // keys on local but not on zk, we will delete it Set keysToDelete = Sets.difference(localSetOfKeys, allKeys); @@ -221,8 +221,8 @@ private synchronized void blobSync() { try { BlobStore blobStore = data.getBlobStore(); StormClusterState clusterState = data.getStormClusterState(); - Set localKeys = Sets.newHashSet(blobStore.listKeys()); - Set zkKeys = Sets.newHashSet(clusterState.blobstore(blobSyncCallback)); + Set localKeys = new HashSet<>(Arrays.asList(blobStore.listKeys())); + Set zkKeys = new HashSet<>(Arrays.asList(clusterState.blobstore(blobSyncCallback))); BlobSynchronizer blobSynchronizer = new BlobSynchronizer(blobStore, data.getConf()); blobSynchronizer.setNimbusInfo(data.getNimbusHostPortInfo()); blobSynchronizer.setBlobStoreKeySet(localKeys); @@ -305,8 +305,8 @@ private int update_nimbus_detail() throws Exception { // but if we use local blobstore, we should count topologies files int diffCount = 0; if (data.getBlobStore() instanceof LocalFsBlobStore) { - Set keysOnZk = Sets.newHashSet(zkClusterState.active_keys()); - Set keysOnLocal = Sets.newHashSet(data.getBlobStore().listKeys()); + Set keysOnZk = new HashSet<>(Arrays.asList(zkClusterState.active_keys())); + Set keysOnLocal = new HashSet<>(Arrays.asList(data.getBlobStore().listKeys())); // we count number of keys which is on zk but not on local diffCount = Sets.difference(keysOnZk, keysOnLocal).size(); } diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/GrayUpgradeHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/GrayUpgradeHandler.java index b08f2aa17..e26cacfa3 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/GrayUpgradeHandler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/GrayUpgradeHandler.java @@ -23,7 +23,6 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType; import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot; import com.alibaba.jstorm.task.upgrade.GrayUpgradeConfig; -import com.google.common.collect.Sets; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -34,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; + /** * @author wange * @since 2.3.1 @@ -62,7 +63,7 @@ public void init(TopologyMasterContext tmContext) { for (ResourceWorkerSlot workerSlot : tmContext.getWorkerSet().get()) { Set tasks = workerSlot.getTasks(); String hostPort = workerSlot.getHostPort(); - hostPortToTasks.put(hostPort, Sets.newHashSet(tasks)); + hostPortToTasks.put(hostPort, new HashSet<>(tasks)); for (Integer task : tasks) { this.taskToHostPort.put(task, hostPort); @@ -115,7 +116,7 @@ public void run() { } // notify current upgrading workers to upgrade (again) - Set upgradingWorkers = Sets.newHashSet(stormClusterState.get_upgrading_workers(topologyId)); + Set upgradingWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgrading_workers(topologyId))); if (upgradingWorkers.size() > 0) { LOG.info("Following workers are under upgrade:{}", upgradingWorkers); for (String worker : upgradingWorkers) { @@ -124,7 +125,7 @@ public void run() { return; } - Set upgradedWorkers = Sets.newHashSet(stormClusterState.get_upgraded_workers(topologyId)); + Set upgradedWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgraded_workers(topologyId))); if (grayUpgradeConf.isRollback()) { LOG.info("Rollback has completed, removing upgrade info in zk and updating storm status..."); // there's no way back after a rollback diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/transactional/bolt/KvStatefulBoltExecutor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/transactional/bolt/KvStatefulBoltExecutor.java index da5ed23f4..e1093f350 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/transactional/bolt/KvStatefulBoltExecutor.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/transactional/bolt/KvStatefulBoltExecutor.java @@ -24,7 +24,8 @@ import org.slf4j.Logger; -import com.google.common.collect.Lists; +import java.util.ArrayList; + import com.google.common.collect.Maps; import backtype.storm.generated.GlobalStreamId; @@ -81,7 +82,7 @@ private IKvState getSpecifiedKeyRangeState(Tuple input, Fields fields) { if (fields.size() == 1) { key = input.getValueByField(fields.get(0)); } else { - List fieldedValuesTobeHash = Lists.newArrayList(); + List fieldedValuesTobeHash = new ArrayList<>(); for (String field : fields) { fieldedValuesTobeHash.add(input.getValueByField(field)); } @@ -89,4 +90,4 @@ private IKvState getSpecifiedKeyRangeState(Tuple input, Fields fields) { } return keyRangeState.getRangeStateByKey(key); } -} \ No newline at end of file +} diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/window/SlidingCountWindows.java b/jstorm-core/src/main/java/com/alibaba/jstorm/window/SlidingCountWindows.java index b99166a23..2408034af 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/window/SlidingCountWindows.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/window/SlidingCountWindows.java @@ -20,7 +20,8 @@ import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.TupleImpl; -import com.google.common.collect.Lists; + +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -49,7 +50,7 @@ public Collection assignWindows(T element, long timestamp) { start = 0; } - List windows = Lists.newArrayList(); + List windows = new ArrayList<>(); for (long nextStart = start; offset >= nextStart; nextStart += slide) { if (offset < nextStart + size) { windows.add(new TimeWindow(nextStart, nextStart + size)); diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/window/WindowedBoltExecutor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/window/WindowedBoltExecutor.java index 157847f48..2373ce995 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/window/WindowedBoltExecutor.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/window/WindowedBoltExecutor.java @@ -31,13 +31,7 @@ import com.alibaba.jstorm.transactional.state.ITransactionStateOperator; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.TimeUtils; -import com.google.common.collect.Sets; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; @@ -316,7 +310,7 @@ public void execute(Tuple input) { if (WindowAssigner.isSessionTime(windowAssigner)) { // session windows don't slide TimeWindow sessionWindow = windows.iterator().next(); - Set oldWindows = Sets.newHashSet(windowToTriggers.keySet()); + Set oldWindows = new HashSet<>(Arrays.asList(windowToTriggers.keySet())); for (TimeWindow oldWindow : oldWindows) { if (!oldWindow.equals(sessionWindow)) { TimeWindow mergedWindow = mergeSessionWindows(oldWindow, sessionWindow); diff --git a/jstorm-utility/jstorm-elasticsearch/src/main/java/com/alibaba/jstorm/elasticsearch/common/EsConfig.java b/jstorm-utility/jstorm-elasticsearch/src/main/java/com/alibaba/jstorm/elasticsearch/common/EsConfig.java index c1133837a..8c93507ef 100644 --- a/jstorm-utility/jstorm-elasticsearch/src/main/java/com/alibaba/jstorm/elasticsearch/common/EsConfig.java +++ b/jstorm-utility/jstorm-elasticsearch/src/main/java/com/alibaba/jstorm/elasticsearch/common/EsConfig.java @@ -20,17 +20,13 @@ import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; public class EsConfig implements Serializable { @@ -69,7 +65,7 @@ private void checkArguments() { } List getTransportAddresses() throws UnknownHostException { - List transportAddresses = Lists.newArrayList(); + List transportAddresses = new ArrayList<>(); for (String node : nodes) { String[] hostAndPort = node.split(DELIMITER); Preconditions.checkArgument(hostAndPort.length == 2, diff --git a/jstorm-utility/rocket-mq b/jstorm-utility/rocket-mq deleted file mode 160000 index 372e9d876..000000000 --- a/jstorm-utility/rocket-mq +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 372e9d87667272e6da7b5501d6d7dd2bad41ce6f