Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(refactor) some opportunities to use lambda functions #786

Open
wants to merge 1 commit into
base: moved-to-apache
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
throw new RuntimeException("No DRPC servers configured for topology");
}
if(numTasks < servers.size()) {
for(String s: servers) {
servers.forEach(s -> {
_clients.add(new DRPCInvocationsClient(s, port));
}
});
} else {
int i = index % servers.size();
_clients.add(new DRPCInvocationsClient(servers.get(i), port));
Expand All @@ -92,9 +92,9 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect

@Override
public void close() {
for(DRPCInvocationsClient client: _clients) {
_clients.forEach(client -> {
client.close();
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache
oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.topologies.size()));
for (TopologySummary _iter44 : this.topologies)
{
this.topologies.forEach(_iter44 -> {
_iter44.write(oprot);
}
});
oprot.writeListEnd();
}
oprot.writeFieldEnd();
Expand Down
5 changes: 2 additions & 3 deletions storm-core/src/jvm/backtype/storm/generated/Grouping.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,9 @@ protected void writeValue(org.apache.thrift7.protocol.TProtocol oprot) throws or
List<String> fields = (List<String>)value_;
{
oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRING, fields.size()));
for (String _iter7 : fields)
{
fields.forEach(_iter7 -> {
oprot.writeString(_iter7);
}
});
oprot.writeListEnd();
}
return;
Expand Down
5 changes: 2 additions & 3 deletions storm-core/src/jvm/backtype/storm/generated/JavaObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache
oprot.writeFieldBegin(ARGS_LIST_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.args_list.size()));
for (JavaObjectArg _iter3 : this.args_list)
{
this.args_list.forEach(_iter3 -> {
_iter3.write(oprot);
}
});
oprot.writeListEnd();
}
oprot.writeFieldEnd();
Expand Down
5 changes: 2 additions & 3 deletions storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache
oprot.writeFieldBegin(OUTPUT_FIELDS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRING, this.output_fields.size()));
for (String _iter11 : this.output_fields)
{
this.output_fields.forEach(_iter11 -> {
oprot.writeString(_iter11);
}
});
oprot.writeListEnd();
}
oprot.writeFieldEnd();
Expand Down
5 changes: 2 additions & 3 deletions storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -796,10 +796,9 @@ public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache
oprot.writeFieldBegin(EXECUTORS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.executors.size()));
for (ExecutorSummary _iter155 : this.executors)
{
this.executors.forEach(_iter155 -> {
_iter155.write(oprot);
}
});
oprot.writeListEnd();
}
oprot.writeFieldEnd();
Expand Down
20 changes: 10 additions & 10 deletions storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,24 +218,24 @@ public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
Set<Integer> ports = this.getAvailablePorts(supervisor);
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());

for (Integer port : ports) {
ports.forEach(port -> {
slots.add(new WorkerSlot(supervisor.getId(), port));
}
});

return slots;
}

public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
Set<Integer> ports = this.getAssignablePorts(supervisor);
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());

for (Integer port : ports) {
ports.forEach(port -> {
slots.add(new WorkerSlot(supervisor.getId(), port));
}
});

return slots;
}

/**
* get the unassigned executors of the topology.
*/
Expand Down Expand Up @@ -342,9 +342,9 @@ public void freeSlot(WorkerSlot slot) {
*/
public void freeSlots(Collection<WorkerSlot> slots) {
if(slots!=null) {
for (WorkerSlot slot : slots) {
slots.forEach(slot -> {
this.freeSlot(slot);
}
});
}
}

Expand Down Expand Up @@ -405,9 +405,9 @@ public List<SupervisorDetails> getSupervisorsByHost(String host) {
List<SupervisorDetails> ret = new ArrayList<SupervisorDetails>();

if (nodeIds != null) {
for (String nodeId : nodeIds) {
nodeIds.forEach(nodeId -> {
ret.add(this.getSupervisorById(nodeId));
}
});
}

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public Set<WorkerSlot> getSlots() {
* @param executors
*/
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
for (ExecutorDetails executor : executors) {
executors.forEach(executor -> {
this.executorToSlot.put(executor, slot);
}
});
}

/**
* Release the slot occupied by this assignment.
* @param slot
Expand All @@ -74,9 +74,9 @@ public void unassignBySlot(WorkerSlot slot) {
}

// remove
for (ExecutorDetails executor : executors) {
executors.forEach(executor -> {
this.executorToSlot.remove(executor);
}
});
}

/**
Expand Down Expand Up @@ -107,4 +107,4 @@ public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
public Set<ExecutorDetails> getExecutors() {
return this.executorToSlot.keySet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public SupervisorDetails(String id, String host, Object schedulerMeta, Collectio
private void setAllPorts(Collection<Number> allPorts) {
this.allPorts = new HashSet<Integer>();
if(allPorts!=null) {
for(Number n: allPorts) {
allPorts.forEach(n -> {
this.allPorts.add(n.intValue());
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ public Map<ExecutorDetails, String> getExecutorToComponent() {

public Map<ExecutorDetails, String> selectExecutorToComponent(Collection<ExecutorDetails> executors) {
Map<ExecutorDetails, String> ret = new HashMap<ExecutorDetails, String>(executors.size());
for (ExecutorDetails executor : executors) {
executors.forEach(executor -> {
String compId = this.executorToComponent.get(executor);
if (compId != null) {
ret.put(executor, compId);
}
}
});

return ret;
}

public Collection<ExecutorDetails> getExecutors() {
return this.executorToComponent.keySet();
}
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/jvm/backtype/storm/tuple/Fields.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public Fields(List<String> fields) {

public List<Object> select(Fields selector, List<Object> tuple) {
List<Object> ret = new ArrayList<Object>(selector.size());
for(String s: selector) {
selector.forEach(s -> {
ret.add(tuple.get(_index.get(s)));
}
});
return ret;
}

Expand Down
6 changes: 2 additions & 4 deletions storm-core/src/jvm/backtype/storm/utils/RotatingMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ public Object remove(K key) {

public int size() {
int size = 0;
for(HashMap<K, V> bucket: _buckets) {
size+=bucket.size();
}
_buckets.stream().map(bucket -> bucket.size()).reduce(size, Integer::sum);
return size;
}
}
}
4 changes: 1 addition & 3 deletions storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ public Object remove(K key) {
public int size() {
synchronized(_lock) {
int size = 0;
for(HashMap<K, V> bucket: _buckets) {
size+=bucket.size();
}
_buckets.stream().map(bucket -> bucket.size()).reduce(size, Integer::sum);
return size;
}
}
Expand Down
8 changes: 3 additions & 5 deletions storm-core/src/jvm/storm/trident/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,8 @@ private void projectionValidation(Fields projFields) {
}

Fields allFields = this.getOutputFields();
for (String field : projFields) {
if (!allFields.contains(field)) {
throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields fields: <" + allFields + ">");
}
}
projFields.stream().filter(field -> !allFields.contains(field)).forEach(field -> {
throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields fields: <" + allFields + ">");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,17 @@ public List<Integer> emit(String ignore, List<Object> values, Object msgId) {
finish.msgId = msgId;
List<Integer> tasks = _collector.emit(_stream, new ConsList(batchId, values));
Set<Integer> outTasksSet = new HashSet<Integer>(tasks);
for(Integer t: _outputTasks) {
_outputTasks.stream().map(t -> {
int count = 0;
if(outTasksSet.contains(t)) {
if (outTasksSet.contains(t)) {
count = 1;
}
long r = _rand.nextLong();
_collector.emitDirect(t, _coordStream, new Values(batchId, count), r);
return r;
}).forEach(r -> {
finish.vals.add(r);
}
});
_finishConditions.put(batchIdVal, finish);
return tasks;
}
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/jvm/storm/trident/state/map/CachedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public List<T> multiGet(List<List<Object>> keys) {
}

List<T> ret = new ArrayList<T>(keys.size());
for(List<Object> key: keys) {
keys.forEach(key -> {
ret.add(results.get(key));
}
});
return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ public void updateState(MapState map, List<TridentTuple> tuples, TridentCollecto
List<List<Object>> groups = new ArrayList<List<Object>>(tuples.size());
List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(tuples.size());

for(TridentTuple t: tuples) {
tuples.stream().map(t -> {
groups.add(_groupFactory.create(t));
return t;
}).forEach(t -> {
updaters.add(new CombinerValueUpdater(_agg,_inputFactory.create(t).getValue(0)));
}
});
List<Object> newVals = map.multiUpdate(groups, updaters);

for(int i=0; i<tuples.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public void updateState(MapState map, List<TridentTuple> tuples, TridentCollecto
}
List<List<Object>> uniqueGroups = new ArrayList(grouped.keySet());
List<ValueUpdater> updaters = new ArrayList(uniqueGroups.size());
for(List<Object> group: uniqueGroups) {
uniqueGroups.forEach(group -> {
updaters.add(new ReducerValueUpdater(_agg, grouped.get(group)));
}
});
List<Object> results = map.multiUpdate(uniqueGroups, updaters);

for(int i=0; i<uniqueGroups.size(); i++) {
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters)
@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
List<ValueUpdater> updaters = new ArrayList<ValueUpdater>(vals.size());
for(T val: vals) {
vals.forEach(val -> {
updaters.add(new ReplaceUpdater<T>(val));
}
});
multiUpdate(keys, updaters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ public FeederEmitter(int index) {
public void emitBatch(TransactionAttempt tx, Map<Integer, List<List<Object>>> coordinatorMeta, TridentCollector collector) {
List<List<Object>> tuples = coordinatorMeta.get(_index);
if(tuples!=null) {
for(List<Object> t: tuples) {
tuples.forEach(t -> {
collector.emit(t);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class MemoryBackingMap implements IBackingMap<Object> {
@Override
public List<Object> multiGet(List<List<Object>> keys) {
List ret = new ArrayList();
for(List key: keys) {
keys.forEach(key -> {
ret.add(_vals.get(key));
}
});
return ret;
}

Expand Down
12 changes: 5 additions & 7 deletions storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ public MemoryMapStateBacking(String id) {
}

@Override
public List<T> multiGet(List<List<Object>> keys) {
public List<T> multiGet(List<List<Object>> keys) {
List<T> ret = new ArrayList();
for (List<Object> key : keys) {
ret.add(db.get(key));
}
keys.forEach(key -> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please format the all code.

ret.add(db.get(key));
});
return ret;
}

@Override
}@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
for (int i = 0; i < keys.size(); i++) {
List<Object> key = keys.get(i);
Expand Down
5 changes: 2 additions & 3 deletions storm-core/src/jvm/storm/trident/testing/TuplifyArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public class TuplifyArgs extends BaseFunction {
public void execute(TridentTuple input, TridentCollector collector) {
String args = input.getString(0);
List<List<Object>> tuples = (List) JSONValue.parse(args);
for(List<Object> tuple: tuples) {
tuples.forEach(tuple -> {
collector.emit(tuple);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public IConnection connect(String storm_id, String host, int port) {
* terminate this context
*/
public void term() {
for (IConnection conn : connections) {
connections.forEach(conn -> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@desmorto, You can remove unnecessary brackets or use a method reference.

connections.forEach(conn -> conn.close());

conn.close();
}
});
connections = null;
}
}