This repository was archived by the owner on Jun 16, 2023. It is now read-only.
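Every hunk in this change applies one theme: Guava's collection factory helpers (`Lists.newArrayList`, `Lists.newArrayListWithCapacity`, `Lists.newLinkedList`, `Sets.newHashSet`) are replaced with the plain JDK constructors that the diamond operator has made equally concise since Java 7. A minimal standalone sketch of the correspondences used below; class and variable names here are illustrative, not from the patch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

public class GuavaToJdkSketch {
    public static void main(String[] args) {
        // Lists.newArrayList()              -> new ArrayList<>()
        List<String> empty = new ArrayList<>();

        // Lists.newArrayList(collection)    -> new ArrayList<>(collection)
        List<String> copy = new ArrayList<>(Arrays.asList("a", "b"));

        // Lists.newArrayListWithCapacity(n) -> new ArrayList<>(n)
        List<String> sized = new ArrayList<>(16);

        // Lists.newLinkedList()             -> new LinkedList<>()
        List<String> linked = new LinkedList<>();

        // Sets.newHashSet(collection)       -> new HashSet<>(collection)
        Set<String> unique = new HashSet<>(copy);

        System.out.println(empty + " " + copy + " " + sized + " " + linked + " " + unique);
    }
}
```

Note that both `newArrayListWithCapacity(n)` and `new ArrayList<>(n)` set an initial capacity, not a size: the list still starts empty.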
==== File 1 ====
@@ -34,7 +34,6 @@
 import com.alibaba.jstorm.window.Time;
 import com.alibaba.jstorm.window.TimeWindow;
 import com.alibaba.starter.utils.JStormHelper;
-import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -44,6 +43,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+
 /**
  * WordCount but the spout does not stop, and the bolts are implemented in java.
  * This can show how fast the word count can run.
@@ -203,15 +204,15 @@ public void execute(Tuple tuple, Object state, TimeWindow window) {
     public void purgeWindow(Object state, TimeWindow window) {
         LOG.info("purging window: {}", window);
         final HashMap<String, Integer> counts = (HashMap<String, Integer>) state;
-        List<String> keys = Lists.newArrayList(counts.keySet());
+        List<String> keys = new ArrayList<>(counts.keySet());
         Collections.sort(keys, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
                 return -counts.get(o1).compareTo(counts.get(o2));
             }
         });
 
-        List<Object> pairs = Lists.newArrayListWithCapacity(n);
+        List<Object> pairs = new ArrayList<>(n);
         for (int i = 0; i < n; i++) {
             pairs.add(new Pair<>(keys.get(i), counts.get(keys.get(i))));
         }
@@ -264,7 +265,7 @@ public Object initWindowState(TimeWindow window) {
     @Override
     public void purgeWindow(Object state, TimeWindow window) {
         final HashMap<String, Integer> counts = (HashMap<String, Integer>) state;
-        List<String> keys = Lists.newArrayList(counts.keySet());
+        List<String> keys = new ArrayList<>(counts.keySet());
         Collections.sort(keys, new Comparator<String>() {
             @Override
             public int compare(String o1, String o2) {
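The `purgeWindow` hunks above sort words by descending count and then emit the top `n` pairs. A compact standalone rendering of that idiom; the names and the `n` guard are illustrative, not from the patch:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopNSketch {
    public static void main(String[] args) {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("storm", 7);
        counts.put("word", 3);
        counts.put("count", 5);

        List<String> keys = new ArrayList<>(counts.keySet());
        // Negating compareTo flips the order: highest count first,
        // exactly as in purgeWindow above.
        Collections.sort(keys, new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return -counts.get(o1).compareTo(counts.get(o2));
            }
        });

        int n = Math.min(2, keys.size()); // guard n against small maps
        for (int i = 0; i < n; i++) {
            System.out.println(keys.get(i) + " -> " + counts.get(keys.get(i)));
        }
    }
}
```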
==== File 2: RankableObjectWithFields.java ====
@@ -18,11 +18,11 @@
 package org.apache.storm.starter.tools;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import backtype.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 /**
  * This class wraps an objects and its associated count, including any
@@ -67,7 +67,7 @@ public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
   * @return new instance based on the provided tuple
   */
  public static RankableObjectWithFields from(Tuple tuple) {
-    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
+    List<Object> otherFields = new ArrayList<>(tuple.getValues());
     Object obj = otherFields.remove(0);
     Long count = (Long) otherFields.remove(0);
     return new RankableObjectWithFields(obj, count, otherFields.toArray());
==== File 3: Rankings.java ====
@@ -18,10 +18,11 @@
 package org.apache.storm.starter.tools;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 public class Rankings implements Serializable {
@@ -30,7 +31,7 @@ public class Rankings implements Serializable {
   private static final int DEFAULT_COUNT = 10;
 
   private final int maxSize;
-  private final List<Rankable> rankedItems = Lists.newArrayList();
+  private final List<Rankable> rankedItems = new ArrayList<>();
 
   public Rankings() {
     this(DEFAULT_COUNT);
@@ -80,7 +81,7 @@ public int size() {
   * @return a somewhat defensive copy of ranked items
   */
  public List<Rankable> getRankings() {
-    List<Rankable> copy = Lists.newLinkedList();
+    List<Rankable> copy = new LinkedList<>();
    for (Rankable r : rankedItems) {
      copy.add(r.copy());
    }
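`getRankings` above is the one place the patch introduces a `LinkedList`, matching the `Lists.newLinkedList()` it replaces; since the copy is only appended to and returned, an `ArrayList` would behave the same here. A sketch of the element-wise defensive copy against a stand-in `Rankable`, reduced to the one method used (hypothetical, for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class DefensiveCopySketch {
    // Stand-in for the real Rankable interface.
    interface Rankable {
        Rankable copy();
    }

    private final List<Rankable> rankedItems = new ArrayList<>();

    // Mirrors Rankings.getRankings(): copy the list AND its elements,
    // so callers can neither reorder the original nor mutate its items.
    public List<Rankable> getRankings() {
        List<Rankable> copy = new LinkedList<>();
        for (Rankable r : rankedItems) {
            copy.add(r.copy());
        }
        return copy;
    }
}
```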
==== File 4 ====
@@ -28,7 +28,6 @@
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.utils.PathUtils;
-import com.google.common.collect.Sets;
 
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -65,7 +64,7 @@ public static void migrateOldTopologyFiles() throws Exception {
         init();
         String stormRoot = StormConfig.masterStormdistRoot(conf);
         List<String> topologies = PathUtils.read_dir_contents(stormRoot);
-        Set<String> activeKeys = Sets.newHashSet(blobStore.listKeys());
+        Set<String> activeKeys = new HashSet<>(Arrays.asList(blobStore.listKeys()));
 
         for (String topologyId : topologies) {
             try {
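Several hunks in this patch, starting with the one above, rewrite `Sets.newHashSet(x)` as `new HashSet<>(Arrays.asList(x))`. That bridge is only right when `x` is an array: Guava's `Sets.newHashSet` has overloads for varargs/arrays, `Iterable`, and `Iterator`, while the `HashSet` copy constructor accepts only a `Collection`. The signatures of methods like `listKeys()` are not visible in this diff, so whether each call site needs the bridge cannot be confirmed here. A sketch of the three cases, with hypothetical sources:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class HashSetCopySketch {
    public static void main(String[] args) {
        // Case 1: the source is an array -> the Arrays.asList bridge is needed.
        String[] keyArray = {"k1", "k2"};
        Set<String> fromArray = new HashSet<>(Arrays.asList(keyArray));

        // Case 2: the source is already a Collection -> copy it directly.
        // Wrapping a List in Arrays.asList would yield a List<List<String>>.
        List<String> keyList = Arrays.asList("k1", "k2", "k1");
        Set<String> fromList = new HashSet<>(keyList);

        // Case 3: the source is an Iterator -> drain it by hand
        // (Guava's Sets.newHashSet(Iterator) did this internally).
        Iterator<String> it = keyList.iterator();
        Set<String> fromIterator = new HashSet<>();
        while (it.hasNext()) {
            fromIterator.add(it.next());
        }

        System.out.println(fromArray + " " + fromList + " " + fromIterator);
    }
}
```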
==== File 5 ====
@@ -23,7 +23,6 @@
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +34,8 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.StringUtils;
 
+import java.util.ArrayList;
+
 /**
  * gray upgrade a topology
  *
@@ -128,7 +129,7 @@ public static void main(String[] args) throws Exception {
         if (commandLine.hasOption("w")) {
             String w = commandLine.getOptionValue("w");
             if (!StringUtils.isBlank(w)) {
-                workers = Lists.newArrayList();
+                workers = new ArrayList<>();
                 String[] parts = w.split(",");
                 for (String part : parts) {
                     if (part.split(":").length == 2) {
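The `-w` option above takes a comma-separated list of `host:port` entries; the loop body is truncated in this view, but the visible check admits only well-formed pairs. A standalone sketch of that parsing step, with an assumed input string:

```java
import java.util.ArrayList;
import java.util.List;

public class WorkerOptionSketch {
    public static void main(String[] args) {
        String w = "10.0.0.1:6800,badentry,10.0.0.2:6801";

        List<String> workers = new ArrayList<>();
        for (String part : w.split(",")) {
            // Keep only entries shaped like host:port, as in the patch.
            if (part.split(":").length == 2) {
                workers.add(part);
            }
        }
        System.out.println(workers); // [10.0.0.1:6800, 10.0.0.2:6801]
    }
}
```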
==== File 6 ====
@@ -30,8 +30,6 @@
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.net.URL;
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.io.FileUtils;
@@ -341,7 +339,7 @@ public static void downloadResourcesAsSupervisor(String key, String localFile, C
         CuratorFramework zkClient = null;
         try {
             zkClient = BlobStoreUtils.createZKClient(conf);
-            nimbusInfos = Lists.newArrayList(BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key));
+            nimbusInfos = new ArrayList<>(BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key));
             Collections.shuffle(nimbusInfos);
         } catch (Exception e) {
             LOG.error("get available nimbus for blob key:{} error", e);
@@ -481,7 +479,7 @@ public String filter(String key) {
                 return null;
             }
         };
-        return Sets.newHashSet(filterAndListKeys(iterator, keyFilter));
+        return new HashSet<>(Arrays.asList(filterAndListKeys(iterator, keyFilter)));
     }
 
     // remove blob information in zk for the blobkey
==== File 7 ====
@@ -19,7 +19,6 @@
 
 import backtype.storm.utils.Utils;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
 import org.rocksdb.*;
@@ -104,7 +103,7 @@ dbOptions, rocksDbDir, columnFamilyDescriptors, columnFamilyHandles, getTtlValue
     }
 
     private static List<Integer> getTtlValues(int ttlSec, List<ColumnFamilyDescriptor> descriptors) {
-        List<Integer> ttlValues = Lists.newArrayList();
+        List<Integer> ttlValues = new ArrayList<>();
         for (ColumnFamilyDescriptor descriptor : descriptors) {
             ttlValues.add(ttlSec);
         }
@@ -113,7 +112,7 @@ private static List<Integer> getTtlValues(int ttlSec, List<ColumnFamilyDescripto
 
     private static List<ColumnFamilyDescriptor> getExistingColumnFamilyDesc(Map conf, String dbPath) throws IOException {
         try {
-            List<byte[]> families = Lists.newArrayList();
+            List<byte[]> families = new ArrayList<>();
             List<byte[]> existingFamilies = RocksDB.listColumnFamilies(getOptions(conf), dbPath);
             if (existingFamilies != null) {
                 families.addAll(existingFamilies);
==== File 8 ====
@@ -27,11 +27,13 @@
 import com.alibaba.jstorm.daemon.nimbus.NimbusData;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
 import com.alibaba.jstorm.task.upgrade.GrayUpgradeConfig;
-import com.google.common.collect.Sets;
 import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashSet;
+
 /**
  * @author wange
  * @since 24/02/2017
@@ -74,7 +76,7 @@ public <T> Object execute(T... args) {
         upgradeConfig.setRollback(true);
         stormClusterState.set_gray_upgrade_conf(topologyId, upgradeConfig);
 
-        Set<String> upgradedWorkers = Sets.newHashSet(stormClusterState.get_upgraded_workers(topologyId));
+        Set<String> upgradedWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgraded_workers(topologyId)));
         if (upgradedWorkers.size() > 0) {
             LOG.info("Setting rollback workers:{}", upgradedWorkers);
             for (String upgradedWorker : upgradedWorkers) {
==== File 9 ====
@@ -32,7 +32,6 @@
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -844,7 +843,7 @@ public List<String> get_blacklist() throws Exception {
     public List<String> get_upgrading_topologies() throws Exception {
         List<String> ret = cluster_state.get_children(Cluster.GRAY_UPGRADE_SUBTREE, false);
         if (ret == null) {
-            ret = Lists.newArrayList();
+            ret = new ArrayList<>();
         }
         return ret;
     }
@@ -905,4 +904,4 @@ public void remove_gray_upgrade_info(String topologyId) throws Exception {
             cluster_state.delete_node(Cluster.gray_upgrade_upgraded_worker_path(topologyId, upgradedWorker));
         }
     }
-}
+}
==== File 10: AsmMetric.java ====
@@ -47,7 +47,7 @@ public abstract class AsmMetric<T extends Metric> {
 
     protected static int minWindow = getMinWindow(windowSeconds);
     private static final int FLUSH_INTERVAL_BIAS = 5;
-    protected static final List<Integer> EMPTY_WIN = Lists.newArrayListWithCapacity(0);
+    protected static final List<Integer> EMPTY_WIN = new ArrayList<>(0);
 
     /**
      * sample rate for meter, histogram and timer, note that counter & gauge are not sampled.
==== File 11 ====
@@ -28,11 +28,12 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+
 /**
  * yarn config black list, note that ONLY plain K-V is supported, list/map values are not supported!!!
  * so if you intend to set list values, put the values on a single line like ['xx', 'xx'].
@@ -100,7 +101,7 @@ public String filterConfigIfNecessary(String confData) {
 
         StringBuilder sb = new StringBuilder(4096);
         Iterable<String> lines = splitLines(confData);
-        List<String> lineArray = Lists.newArrayList(lines);
+        List<String> lineArray = new ArrayList<>(lines);
         for (int i = 0; i < lineArray.size(); i++) {
             String trimmedLine = lineArray.get(i).trim();
             if (!trimmedLine.startsWith("#") && trimmedLine.contains(":")) {
@@ -138,7 +139,7 @@ public String getRetainedConfig(String confData) {
 
         StringBuilder sb = new StringBuilder();
         Iterable<String> lines = splitLines(confData);
-        List<String> lineArray = Lists.newArrayList(lines);
+        List<String> lineArray = new ArrayList<>(lines);
         for (int i = 0; i < lineArray.size(); i++) {
             String trimmedLine = lineArray.get(i).trim();
             if (!trimmedLine.startsWith("#") && trimmedLine.contains(":")) {
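One caveat in this file: `new ArrayList<>(lines)` compiles only if `lines` is a `Collection`, yet the unchanged context line declares it as `Iterable<String>`. The `ArrayList` copy constructor has no `Iterable` overload; covering that gap is exactly what `Lists.newArrayList(Iterable)` did. If `splitLines` really returns a bare `Iterable`, a manual drain keeps the JDK-only goal, as in this sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IterableCopySketch {
    public static void main(String[] args) {
        Iterable<String> lines = Arrays.asList("a: 1", "# comment", "b: 2");

        // ArrayList(Collection) does not accept an Iterable, so copy explicitly.
        List<String> lineArray = new ArrayList<>();
        for (String line : lines) {
            lineArray.add(line);
        }
        System.out.println(lineArray);
    }
}
```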
==== File 12 ====
@@ -52,14 +52,7 @@
 import com.google.common.collect.Sets;
 
 import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -314,11 +307,11 @@ public static void cleanupCorruptTopologies(NimbusData data) throws Exception {
         BlobStore blobStore = data.getBlobStore();
 
         // we have only topology relative files , so we don't need filter
-        Set<String> code_ids = Sets.newHashSet(BlobStoreUtils.code_ids(blobStore.listKeys()));
+        Set<String> code_ids = new HashSet<>(Arrays.asList(BlobStoreUtils.code_ids(blobStore.listKeys())));
         // get topology in ZK /storms
-        Set<String> active_ids = Sets.newHashSet(data.getStormClusterState().active_storms());
+        Set<String> active_ids = new HashSet<>(Arrays.asList(data.getStormClusterState().active_storms()));
         //get topology in zk by blobs
-        Set<String> blobsIdsOnZk = Sets.newHashSet(data.getStormClusterState().blobstore(null));
+        Set<String> blobsIdsOnZk = new HashSet<>(Arrays.asList(data.getStormClusterState().blobstore(null)));
         Set<String> topologyIdsOnZkbyBlobs = BlobStoreUtils.code_ids(blobsIdsOnZk.iterator());
 
         Set<String> corrupt_ids = Sets.difference(active_ids, code_ids);
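`Sets.difference(active_ids, code_ids)` survives as unchanged context in the hunk above, so this file still depends on Guava. If the aim were to remove that too, the JDK spelling is a copy plus `removeAll`; note it materializes a snapshot, whereas Guava's `difference` returns a live view. A sketch:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DifferenceSketch {
    public static void main(String[] args) {
        Set<String> activeIds = new HashSet<>(Arrays.asList("t1", "t2", "t3"));
        Set<String> codeIds = new HashSet<>(Arrays.asList("t2", "t3"));

        // JDK equivalent of Sets.difference(activeIds, codeIds),
        // materialized as an independent set rather than a view.
        Set<String> corruptIds = new HashSet<>(activeIds);
        corruptIds.removeAll(codeIds);

        System.out.println(corruptIds); // [t1]
    }
}
```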
==== File 13 ====
@@ -25,7 +25,6 @@
 import com.alibaba.jstorm.utils.LoadConf;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -35,11 +34,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
@@ -2011,7 +2006,7 @@ public void grayUpgrade(String topologyName, String component, List<String> work
             throw new NotAliveException(topologyName);
         }
 
-        Set<String> workerSet = (workers == null) ? Sets.<String>newHashSet() : Sets.<String>newHashSet(workers);
+        Set<String> workerSet = (workers == null) ? new HashSet<>() : new HashSet<>(workers);
         try {
             grayUpgrade(topologyId, null, null, Maps.newHashMap(), component, workerSet, workerNum);
         } catch (Exception ex) {
@@ -2079,8 +2074,8 @@ private String grayUpgrade(String topologyId, String uploadedLocation,
         Assignment assignment = stormClusterState.assignment_info(topologyId, null);
         if (upgradeConfig != null) {
             LOG.info("Got existing gray upgrade config:{}", upgradeConfig);
-            Set<String> upgradingWorkers = Sets.newHashSet(stormClusterState.get_upgrading_workers(topologyId));
-            Set<String> upgradedWorkers = Sets.newHashSet(stormClusterState.get_upgraded_workers(topologyId));
+            Set<String> upgradingWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgrading_workers(topologyId)));
+            Set<String> upgradedWorkers = new HashSet<>(Arrays.asList(stormClusterState.get_upgraded_workers(topologyId)));
             int upgradingWorkerNum = upgradingWorkers.size();
             int upgradedWorkerNum = upgradedWorkers.size();
             int totalWorkerNum = assignment.getWorkers().size() - 1;
==== File 14 ====
@@ -21,11 +21,7 @@
 import com.alibaba.jstorm.utils.Pair;
 import com.alibaba.jstorm.utils.TimeUtils;
 import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +43,6 @@
 import com.alibaba.jstorm.metric.TopologyMetricContext;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
-import com.google.common.collect.Sets;
 
 /**
  * Sync meta <String, Long> from cache and remote
@@ -108,7 +103,7 @@ private void doRefreshTopologies() {
                 //there's no need to consider sample rate when cluster metrics merge
                 conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0);
             }
-            Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
+            Set<ResourceWorkerSlot> workerSlot = new HashSet<>(Arrays.asList(new ResourceWorkerSlot()));
             TopologyMetricContext metricContext = new TopologyMetricContext(topology, workerSlot, conf);
             context.getTopologyMetricContexts().putIfAbsent(topology, metricContext);
             syncMetaFromCache(topology, context.getTopologyMetricContexts().get(topology));
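For the single-element set built above, `new HashSet<>(Arrays.asList(...))` is a faithful replacement for `Sets.newHashSet(oneElement)`. If the set were never mutated afterwards (not verifiable from this hunk), `Collections.singleton` would be enough. A sketch with a placeholder element type:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SingletonSetSketch {
    public static void main(String[] args) {
        String slot = "worker-slot"; // stands in for a ResourceWorkerSlot

        // Mutable single-element set, as in the patch:
        Set<String> mutable = new HashSet<>(Arrays.asList(slot));
        // Slightly leaner mutable variant:
        Set<String> alsoMutable = new HashSet<>(Collections.singletonList(slot));
        // Immutable variant, only safe if the set is never added to later:
        Set<String> immutable = Collections.singleton(slot);

        System.out.println(mutable + " " + alsoMutable + " " + immutable);
    }
}
```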