From b3fe7f494e6d8fc09c6424c6446acbc9c3cbf916 Mon Sep 17 00:00:00 2001 From: desmorto Date: Wed, 16 Aug 2017 01:22:53 -0300 Subject: [PATCH] (refactor) some opportunities to use lambda functions --- .../jvm/backtype/storm/drpc/DRPCSpout.java | 8 ++++---- .../storm/generated/ClusterSummary.java | 5 ++--- .../backtype/storm/generated/Grouping.java | 5 ++--- .../backtype/storm/generated/JavaObject.java | 5 ++--- .../backtype/storm/generated/StreamInfo.java | 5 ++--- .../storm/generated/TopologyInfo.java | 5 ++--- .../jvm/backtype/storm/scheduler/Cluster.java | 20 +++++++++---------- .../scheduler/SchedulerAssignmentImpl.java | 12 +++++------ .../storm/scheduler/SupervisorDetails.java | 4 ++-- .../storm/scheduler/TopologyDetails.java | 6 +++--- .../src/jvm/backtype/storm/tuple/Fields.java | 4 ++-- .../jvm/backtype/storm/utils/RotatingMap.java | 6 ++---- .../backtype/storm/utils/TimeCacheMap.java | 4 +--- storm-core/src/jvm/storm/trident/Stream.java | 8 +++----- .../spout/RichSpoutBatchTriggerer.java | 8 +++++--- .../storm/trident/state/map/CachedMap.java | 4 ++-- .../state/map/MapCombinerAggStateUpdater.java | 6 ++++-- .../state/map/MapReducerAggStateUpdater.java | 4 ++-- .../storm/trident/state/map/OpaqueMap.java | 4 ++-- .../trident/testing/FeederBatchSpout.java | 4 ++-- .../trident/testing/MemoryBackingMap.java | 4 ++-- .../storm/trident/testing/MemoryMapState.java | 12 +++++------ .../storm/trident/testing/TuplifyArgs.java | 5 ++--- .../storm/messaging/netty/Context.java | 4 ++-- 24 files changed, 71 insertions(+), 81 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java index da3242662..28a25dbc2 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java +++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java @@ -79,9 +79,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect throw new RuntimeException("No DRPC servers configured for topology"); } if(numTasks < servers.size()) { - for(String s: servers) { + servers.forEach(s -> { _clients.add(new DRPCInvocationsClient(s, port)); - } + }); } else { int i = index % servers.size(); _clients.add(new DRPCInvocationsClient(servers.get(i), port)); @@ -92,9 +92,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect @Override public void close() { - for(DRPCInvocationsClient client: _clients) { + _clients.forEach(client -> { client.close(); - } + }); } @Override diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index 902cd511f..5474d1209 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -537,10 +537,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.topologies.size())); - for (TopologySummary _iter44 : this.topologies) - { + this.topologies.forEach(_iter44 -> { _iter44.write(oprot); - } + }); oprot.writeListEnd(); } oprot.writeFieldEnd(); diff --git a/storm-core/src/jvm/backtype/storm/generated/Grouping.java b/storm-core/src/jvm/backtype/storm/generated/Grouping.java index 7c8ef1365..864f4163f 100644 --- a/storm-core/src/jvm/backtype/storm/generated/Grouping.java +++ b/storm-core/src/jvm/backtype/storm/generated/Grouping.java @@ -379,10 +379,9 @@ protected void writeValue(org.apache.thrift7.protocol.TProtocol oprot) throws or List fields = (List)value_; { oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRING, fields.size())); - for (String _iter7 : fields) - { + fields.forEach(_iter7 -> { oprot.writeString(_iter7); - } + }); oprot.writeListEnd(); } return; diff --git a/storm-core/src/jvm/backtype/storm/generated/JavaObject.java b/storm-core/src/jvm/backtype/storm/generated/JavaObject.java index 0db2620ea..b4b61fae1 100644 --- a/storm-core/src/jvm/backtype/storm/generated/JavaObject.java +++ b/storm-core/src/jvm/backtype/storm/generated/JavaObject.java @@ -412,10 +412,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeFieldBegin(ARGS_LIST_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.args_list.size())); - for (JavaObjectArg _iter3 : this.args_list) - { + this.args_list.forEach(_iter3 -> { _iter3.write(oprot); - } + }); oprot.writeListEnd(); } oprot.writeFieldEnd(); diff --git a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java index 9d6d5c7bb..b2c2ac8f6 100644 --- a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java +++ b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java @@ -410,10 +410,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeFieldBegin(OUTPUT_FIELDS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRING, this.output_fields.size())); - for (String _iter11 : this.output_fields) - { + this.output_fields.forEach(_iter11 -> { oprot.writeString(_iter11); - } + }); oprot.writeListEnd(); } oprot.writeFieldEnd(); diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java index c675663fd..927e79179 100644 --- a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java +++ b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java @@ -796,10 +796,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache oprot.writeFieldBegin(EXECUTORS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.executors.size())); - for (ExecutorSummary _iter155 : this.executors) - { + this.executors.forEach(_iter155 -> { _iter155.write(oprot); - } + }); oprot.writeListEnd(); } oprot.writeFieldEnd(); diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index 5e0b5af76..1b5531336 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -218,24 +218,24 @@ public List getAvailableSlots(SupervisorDetails supervisor) { Set ports = this.getAvailablePorts(supervisor); List slots = new ArrayList(ports.size()); - for (Integer port : ports) { + ports.forEach(port -> { slots.add(new WorkerSlot(supervisor.getId(), port)); - } + }); return slots; } - + public List getAssignableSlots(SupervisorDetails supervisor) { Set ports = this.getAssignablePorts(supervisor); List slots = new ArrayList(ports.size()); - for (Integer port : ports) { + ports.forEach(port -> { slots.add(new WorkerSlot(supervisor.getId(), port)); - } + }); return slots; } - + /** * get the unassigned executors of the topology. */ @@ -342,9 +342,9 @@ public void freeSlot(WorkerSlot slot) { */ public void freeSlots(Collection slots) { if(slots!=null) { - for (WorkerSlot slot : slots) { + slots.forEach(slot -> { this.freeSlot(slot); - } + }); } } @@ -405,9 +405,9 @@ public List getSupervisorsByHost(String host) { List ret = new ArrayList(); if (nodeIds != null) { - for (String nodeId : nodeIds) { + nodeIds.forEach(nodeId -> { ret.add(this.getSupervisorById(nodeId)); - } + }); } return ret; diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java index 08af4b704..7fa849260 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java @@ -55,11 +55,11 @@ public Set getSlots() { * @param executors */ public void assign(WorkerSlot slot, Collection executors) { - for (ExecutorDetails executor : executors) { + executors.forEach(executor -> { this.executorToSlot.put(executor, slot); - } + }); } - + /** * Release the slot occupied by this assignment. * @param slot @@ -74,9 +74,9 @@ public void unassignBySlot(WorkerSlot slot) { } // remove - for (ExecutorDetails executor : executors) { + executors.forEach(executor -> { this.executorToSlot.remove(executor); - } + }); } /** @@ -107,4 +107,4 @@ public Map getExecutorToSlot() { public Set getExecutors() { return this.executorToSlot.keySet(); } -} \ No newline at end of file +} diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java index 7497f26ff..91e3a2873 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/SupervisorDetails.java @@ -61,9 +61,9 @@ public SupervisorDetails(String id, String host, Object schedulerMeta, Collectio private void setAllPorts(Collection allPorts) { this.allPorts = new HashSet(); if(allPorts!=null) { - for(Number n: allPorts) { + allPorts.forEach(n -> { this.allPorts.add(n.intValue()); - } + }); } } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java index 6daf4edae..7bc544e48 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java @@ -73,16 +73,16 @@ public Map getExecutorToComponent() { public Map selectExecutorToComponent(Collection executors) { Map ret = new HashMap(executors.size()); - for (ExecutorDetails executor : executors) { + executors.forEach(executor -> { String compId = this.executorToComponent.get(executor); if (compId != null) { ret.put(executor, compId); } - } + }); return ret; } - + public Collection getExecutors() { return this.executorToComponent.keySet(); } diff --git a/storm-core/src/jvm/backtype/storm/tuple/Fields.java b/storm-core/src/jvm/backtype/storm/tuple/Fields.java index 9805ba6bc..a3ff29798 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/Fields.java +++ b/storm-core/src/jvm/backtype/storm/tuple/Fields.java @@ -47,9 +47,9 @@ public Fields(List fields) { public List select(Fields selector, List tuple) { List ret = new ArrayList(selector.size()); - for(String s: selector) { + selector.forEach(s -> { ret.add(tuple.get(_index.get(s))); - } + }); return ret; } diff --git a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java index aca8db694..5beaff962 100644 --- a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java +++ b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java @@ -116,9 +116,7 @@ public Object remove(K key) { public int size() { int size = 0; - for(HashMap bucket: _buckets) { - size+=bucket.size(); - } + _buckets.stream().map(bucket -> bucket.size()).reduce(size, Integer::sum); return size; - } + } } diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java index 36d1baeb2..e6aabdc0a 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java +++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java @@ -148,9 +148,7 @@ public Object remove(K key) { public int size() { synchronized(_lock) { int size = 0; - for(HashMap bucket: _buckets) { - size+=bucket.size(); - } + _buckets.stream().map(bucket -> bucket.size()).reduce(size, Integer::sum); return size; } } diff --git a/storm-core/src/jvm/storm/trident/Stream.java b/storm-core/src/jvm/storm/trident/Stream.java index e847eee87..77aabb7a4 100644 --- a/storm-core/src/jvm/storm/trident/Stream.java +++ b/storm-core/src/jvm/storm/trident/Stream.java @@ -365,10 +365,8 @@ private void projectionValidation(Fields projFields) { } Fields allFields = this.getOutputFields(); - for (String field : projFields) { - if (!allFields.contains(field)) { - throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields fields: <" + allFields + ">"); - } - } + projFields.stream().filter(field -> !allFields.contains(field)).forEach(field -> { + throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields fields: <" + allFields + ">"); + }); } } diff --git a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java index f5d37e6bb..56bb1af4d 100644 --- a/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java +++ b/storm-core/src/jvm/storm/trident/spout/RichSpoutBatchTriggerer.java @@ -150,15 +150,17 @@ public List emit(String ignore, List values, Object msgId) { finish.msgId = msgId; List tasks = _collector.emit(_stream, new ConsList(batchId, values)); Set outTasksSet = new HashSet(tasks); - for(Integer t: _outputTasks) { + _outputTasks.stream().map(t -> { int count = 0; - if(outTasksSet.contains(t)) { + if (outTasksSet.contains(t)) { count = 1; } long r = _rand.nextLong(); _collector.emitDirect(t, _coordStream, new Values(batchId, count), r); + return r; + }).forEach(r -> { finish.vals.add(r); - } + }); _finishConditions.put(batchIdVal, finish); return tasks; } diff --git a/storm-core/src/jvm/storm/trident/state/map/CachedMap.java b/storm-core/src/jvm/storm/trident/state/map/CachedMap.java index ecc495d6d..14ae27c4c 100644 --- a/storm-core/src/jvm/storm/trident/state/map/CachedMap.java +++ b/storm-core/src/jvm/storm/trident/state/map/CachedMap.java @@ -58,9 +58,9 @@ public List multiGet(List> keys) { } List ret = new ArrayList(keys.size()); - for(List key: keys) { + keys.forEach(key -> { ret.add(results.get(key)); - } + }); return ret; } diff --git a/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java b/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java index 80a5ffe36..03e7d4afc 100644 --- a/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java +++ b/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java @@ -57,10 +57,12 @@ public void updateState(MapState map, List tuples, TridentCollecto List> groups = new ArrayList>(tuples.size()); List updaters = new ArrayList(tuples.size()); - for(TridentTuple t: tuples) { + tuples.stream().map(t -> { groups.add(_groupFactory.create(t)); + return t; + }).forEach(t -> { updaters.add(new CombinerValueUpdater(_agg,_inputFactory.create(t).getValue(0))); - } + }); List newVals = map.multiUpdate(groups, updaters); for(int i=0; i tuples, TridentCollecto } List> uniqueGroups = new ArrayList(grouped.keySet()); List updaters = new ArrayList(uniqueGroups.size()); - for(List group: uniqueGroups) { + uniqueGroups.forEach(group -> { updaters.add(new ReducerValueUpdater(_agg, grouped.get(group))); - } + }); List results = map.multiUpdate(uniqueGroups, updaters); for(int i=0; i multiUpdate(List> keys, List updaters) @Override public void multiPut(List> keys, List vals) { List updaters = new ArrayList(vals.size()); - for(T val: vals) { + vals.forEach(val -> { updaters.add(new ReplaceUpdater(val)); - } + }); multiUpdate(keys, updaters); } diff --git a/storm-core/src/jvm/storm/trident/testing/FeederBatchSpout.java b/storm-core/src/jvm/storm/trident/testing/FeederBatchSpout.java index 03357a8b5..26d2e9b8f 100644 --- a/storm-core/src/jvm/storm/trident/testing/FeederBatchSpout.java +++ b/storm-core/src/jvm/storm/trident/testing/FeederBatchSpout.java @@ -140,9 +140,9 @@ public FeederEmitter(int index) { public void emitBatch(TransactionAttempt tx, Map>> coordinatorMeta, TridentCollector collector) { List> tuples = coordinatorMeta.get(_index); if(tuples!=null) { - for(List t: tuples) { + tuples.forEach(t -> { collector.emit(t); - } + }); } } diff --git a/storm-core/src/jvm/storm/trident/testing/MemoryBackingMap.java b/storm-core/src/jvm/storm/trident/testing/MemoryBackingMap.java index 25207ff97..8ed90940e 100644 --- a/storm-core/src/jvm/storm/trident/testing/MemoryBackingMap.java +++ b/storm-core/src/jvm/storm/trident/testing/MemoryBackingMap.java @@ -30,9 +30,9 @@ public class MemoryBackingMap implements IBackingMap { @Override public List multiGet(List> keys) { List ret = new ArrayList(); - for(List key: keys) { + keys.forEach(key -> { ret.add(_vals.get(key)); - } + }); return ret; } diff --git a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java index 5df99f781..fc58a5642 100644 --- a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java +++ b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java @@ -107,15 +107,13 @@ public MemoryMapStateBacking(String id) { } @Override - public List multiGet(List> keys) { + public List multiGet(List> keys) { List ret = new ArrayList(); - for (List key : keys) { - ret.add(db.get(key)); - } + keys.forEach(key -> { +ret.add(db.get(key)); +}); return ret; - } - - @Override + }@Override public void multiPut(List> keys, List vals) { for (int i = 0; i < keys.size(); i++) { List key = keys.get(i); diff --git a/storm-core/src/jvm/storm/trident/testing/TuplifyArgs.java b/storm-core/src/jvm/storm/trident/testing/TuplifyArgs.java index e53025bb5..48b4d09da 100644 --- a/storm-core/src/jvm/storm/trident/testing/TuplifyArgs.java +++ b/storm-core/src/jvm/storm/trident/testing/TuplifyArgs.java @@ -29,9 +29,8 @@ public class TuplifyArgs extends BaseFunction { public void execute(TridentTuple input, TridentCollector collector) { String args = input.getString(0); List> tuples = (List) JSONValue.parse(args); - for(List tuple: tuples) { + tuples.forEach(tuple -> { collector.emit(tuple); - } + }); } - } diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java index d7a72016e..6f28706eb 100644 --- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java +++ b/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java @@ -59,9 +59,9 @@ public IConnection connect(String storm_id, String host, int port) { * terminate this context */ public void term() { - for (IConnection conn : connections) { + connections.forEach(conn -> { conn.close(); - } + }); connections = null; } }