Skip to content

Commit

Permalink
[ISSUE-183] Support Async Shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
CrazyMountain authored Sep 28, 2023
1 parent 2e9732d commit a2e1ad3
Show file tree
Hide file tree
Showing 79 changed files with 1,819 additions and 865 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ public class ExecutionConfigKeys implements Serializable {
.defaultValue(600000)
.description("shuffle fetch timeout in milliseconds");

/**
 * Config key {@code geaflow.shuffle.fetch.queue.size}: size of the shuffle fetch queue.
 * Defaults to 1.
 */
public static final ConfigKey SHUFFLE_FETCH_QUEUE_SIZE = ConfigKeys
.key("geaflow.shuffle.fetch.queue.size")
.defaultValue(1)
.description("size of shuffle fetch queue");

/** shuffle write config. */

public static final ConfigKey SHUFFLE_SPILL_RECORDS = ConfigKeys
Expand All @@ -396,15 +401,25 @@ public class ExecutionConfigKeys implements Serializable {
.defaultValue(1610612736L) // 1.5G
.description("max size of each spill per slice in Bytes");

public static final ConfigKey SHUFFLE_WRITE_BUFFER_SIZE = ConfigKeys
.key("geaflow.shuffle.write.buffer.size")
.defaultValue(15360)
public static final ConfigKey SHUFFLE_WRITE_BUFFER_SIZE_BYTES = ConfigKeys
.key("geaflow.shuffle.write.buffer.size.bytes")
.defaultValue(128 * 1024)
.description("size of shuffle write buffer");

public static final ConfigKey SHUFFLE_FLUSH_BUFFER_TIMEOUT = ConfigKeys
.key("geaflow.shuffle.flush.buffer.timeout")
/**
 * Config key {@code geaflow.shuffle.emit.buffer.size}: size of the shuffle emit buffer of
 * java objects. Defaults to 1024.
 */
public static final ConfigKey SHUFFLE_EMIT_BUFFER_SIZE = ConfigKeys
.key("geaflow.shuffle.emit.buffer.size")
.defaultValue(1024)
.description("size of shuffle emit buffer of java object");

/**
 * Config key {@code geaflow.shuffle.emit.queue.size}: size of the shuffle emit queue.
 * Defaults to 1.
 */
public static final ConfigKey SHUFFLE_EMIT_QUEUE_SIZE = ConfigKeys
.key("geaflow.shuffle.emit.queue.size")
.defaultValue(1)
.description("size of shuffle emit queue");

public static final ConfigKey SHUFFLE_FLUSH_BUFFER_TIMEOUT_MS = ConfigKeys
.key("geaflow.shuffle.flush.buffer.timeout.ms")
.defaultValue(100)
.description("shuffle flush buffer timeout");
.description("shuffle flush buffer timeout ms");

public static final ConfigKey SHUFFLE_CACHE_SPILL_THRESHOLD = ConfigKeys
.key("geaflow.shuffle.cache.spill.threshold")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,34 @@ public void setOutputKb(long outputKb) {
this.outputKb = outputKb;
}

/**
 * Assembles a {@link CycleMetrics} from totals aggregated over the tasks of one cycle.
 *
 * <p>Execute time and GC time are averaged over {@code taskNum} using integer division.
 * Input and output byte totals are converted to KB by integer division by 1024.
 * When {@code taskNum} is 0 both averages are reported as 0 instead of failing with an
 * {@link ArithmeticException}.
 *
 * @param metricName             passed to the {@code CycleMetrics} constructor
 * @param pipelineName           passed to the {@code CycleMetrics} constructor
 * @param opName                 passed to the {@code CycleMetrics} constructor
 * @param taskNum                number of tasks; stored as total tasks and used as the divisor
 *                               for the averages
 * @param slowestTask            value stored via {@code setSlowestTask}
 * @param startTime              value stored via {@code setStartTime}
 * @param duration               value stored via {@code setDuration}
 * @param totalExecuteTime       execute time summed over all tasks
 * @param totalGcTime            GC time summed over all tasks
 * @param slowestTaskExecuteTime value stored via {@code setSlowestTaskExecuteTime}
 * @param totalInputRecords      value stored via {@code setInputRecords}
 * @param totalInputBytes        input size in bytes; stored as KB
 * @param totalOutputRecords     value stored via {@code setOutputRecords}
 * @param totalOutputBytes       output size in bytes; stored as KB
 * @return the populated metrics object
 */
public static CycleMetrics build(String metricName,
                                 String pipelineName,
                                 String opName,
                                 int taskNum,
                                 int slowestTask,
                                 long startTime,
                                 long duration,
                                 long totalExecuteTime,
                                 long totalGcTime,
                                 long slowestTaskExecuteTime,
                                 long totalInputRecords,
                                 long totalInputBytes,
                                 long totalOutputRecords,
                                 long totalOutputBytes) {
    // Dividing by a zero task count would throw ArithmeticException and abort metric
    // reporting; fall back to 0 for the averages in that case.
    final long avgExecuteTime = taskNum != 0 ? totalExecuteTime / taskNum : 0L;
    final long avgGcTime = taskNum != 0 ? totalGcTime / taskNum : 0L;

    CycleMetrics cycleMetrics = new CycleMetrics(metricName, pipelineName, opName);
    cycleMetrics.setStartTime(startTime);
    cycleMetrics.setTotalTasks(taskNum);
    cycleMetrics.setSlowestTask(slowestTask);
    cycleMetrics.setDuration(duration);
    cycleMetrics.setAvgExecuteTime(avgExecuteTime);
    cycleMetrics.setAvgGcTime(avgGcTime);
    cycleMetrics.setSlowestTaskExecuteTime(slowestTaskExecuteTime);
    cycleMetrics.setInputRecords(totalInputRecords);
    cycleMetrics.setInputKb(totalInputBytes / 1024);
    cycleMetrics.setOutputRecords(totalOutputRecords);
    cycleMetrics.setOutputKb(totalOutputBytes / 1024);
    return cycleMetrics;
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,114 +14,188 @@

package com.antgroup.geaflow.common.metric;

import com.antgroup.geaflow.common.utils.GcUtil;
import java.io.Serializable;

public class EventMetrics implements Serializable {

/**
* execute compute event time cost.
* Meta.
*/
private long executeTime;
private final int vertexId;
private final int parallelism;
private final int index;

/**
* gc time in total during current task execution.
*/
private long gcTime;

/**
* finish time of process certain windowId.
* Execution.
*/
private long startTime;
private long finishTime;
private long executeCostMs;
private long processCostMs;
private long gcCostMs;

/**
* total input records.
*/
private long inputRecords;
private long inputBytes;

/**
* total output records.
* Shuffle.
*/
private long outputRecords;
private long outputBytes;
private long shuffleReadRecords;
private long shuffleReadBytes;
private long shuffleReadCostMs;

private transient long startTs;
private long shuffleWriteRecords;
private long shuffleWriteBytes;
private long shuffleWriteCostMs;

private transient long startGcTs;

public EventMetrics() {
public EventMetrics(int vertexId, int parallelism, int index) {
this.vertexId = vertexId;
this.parallelism = parallelism;
this.index = index;
this.startTime = System.currentTimeMillis();
this.startGcTs = GcUtil.computeCurrentTotalGcTime();
}

public long getExecuteTime() {
return executeTime;
public int getVertexId() {
return this.vertexId;
}

public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
public int getParallelism() {
return this.parallelism;
}

public long getGcTime() {
return gcTime;
public int getIndex() {
return this.index;
}

public void setGcTime(long gcTime) {
this.gcTime = gcTime;
public long getStartTime() {
return this.startTime;
}

public long getStartTs() {
return startTs;
public void setStartTime(long startTime) {
this.startTime = startTime;
}

public void setStartTs(long startTs) {
this.startTs = startTs;
public long getFinishTime() {
return this.finishTime;
}

public long getStartGcTs() {
return startGcTs;
public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
this.executeCostMs = this.finishTime - this.startTime;
}

public void setStartGcTs(long startGcTs) {
this.startGcTs = startGcTs;
public long getExecuteCostMs() {
return this.executeCostMs;
}

public long getInputRecords() {
return inputRecords;
public long getProcessCostMs() {
return this.processCostMs;
}

public void setInputRecords(long inputRecords) {
this.inputRecords = inputRecords;
public void setProcessCostMs(long processCostMs) {
this.processCostMs = processCostMs;
}

public long getFinishTime() {
return finishTime;
public void addProcessCostMs(long processCostMs) {
this.processCostMs += processCostMs;
}

public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
public long getGcCostMs() {
return this.gcCostMs;
}

/**
 * Returns the shuffle-read record count held by this metrics object.
 *
 * @return current value of {@code shuffleReadRecords}
 */
public long getShuffleReadRecords() {
    return shuffleReadRecords;
}

/**
 * Overwrites the shuffle-read record count.
 *
 * @param shuffleReadRecords new value for the {@code shuffleReadRecords} field
 */
public void setShuffleReadRecords(long shuffleReadRecords) {
this.shuffleReadRecords = shuffleReadRecords;
}

/**
 * Increments the shuffle-read record count.
 *
 * @param shuffleReadRecords amount added to the current {@code shuffleReadRecords} value
 */
public void addShuffleReadRecords(long shuffleReadRecords) {
    this.shuffleReadRecords = this.shuffleReadRecords + shuffleReadRecords;
}

/**
 * Returns the shuffle-read byte count held by this metrics object.
 *
 * @return current value of {@code shuffleReadBytes}
 */
public long getShuffleReadBytes() {
    return shuffleReadBytes;
}

/**
 * Overwrites the shuffle-read byte count.
 *
 * @param shuffleReadBytes new value for the {@code shuffleReadBytes} field
 */
public void setShuffleReadBytes(long shuffleReadBytes) {
this.shuffleReadBytes = shuffleReadBytes;
}

/**
 * Increments the shuffle-read byte count.
 *
 * @param shuffleReadBytes amount added to the current {@code shuffleReadBytes} value
 */
public void addShuffleReadBytes(long shuffleReadBytes) {
    this.shuffleReadBytes = this.shuffleReadBytes + shuffleReadBytes;
}

/**
 * Returns the accumulated shuffle-read cost held by this metrics object.
 *
 * @return current value of {@code shuffleReadCostMs}
 */
public long getShuffleReadCostMs() {
    return shuffleReadCostMs;
}

/**
 * Overwrites the shuffle-read cost.
 *
 * @param shuffleReadCostMs new value for the {@code shuffleReadCostMs} field
 */
public void setShuffleReadCostMs(long shuffleReadCostMs) {
this.shuffleReadCostMs = shuffleReadCostMs;
}

/**
 * Increments the accumulated shuffle-read cost.
 *
 * @param shuffleReadCostMs amount added to the current {@code shuffleReadCostMs} value
 */
public void addShuffleReadCostMs(long shuffleReadCostMs) {
    this.shuffleReadCostMs = this.shuffleReadCostMs + shuffleReadCostMs;
}

public long getOutputRecords() {
return outputRecords;
public long getShuffleWriteRecords() {
return this.shuffleWriteRecords;
}

public void setOutputRecords(long outputRecords) {
this.outputRecords = outputRecords;
public void setShuffleWriteRecords(long shuffleWriteRecords) {
this.shuffleWriteRecords = shuffleWriteRecords;
}

public long getInputBytes() {
return inputBytes;
public void addShuffleWriteRecords(long shuffleWriteRecords) {
this.shuffleWriteRecords += shuffleWriteRecords;
}

public void setInputBytes(long inputBytes) {
this.inputBytes = inputBytes;
public long getShuffleWriteBytes() {
return this.shuffleWriteBytes;
}

public long getOutputBytes() {
return outputBytes;
public void setShuffleWriteBytes(long shuffleWriteBytes) {
this.shuffleWriteBytes = shuffleWriteBytes;
}

/**
 * Increments the shuffle-write byte count.
 *
 * @param shuffleWriteBytes amount added to the current {@code shuffleWriteBytes} value
 */
public void addShuffleWriteBytes(long shuffleWriteBytes) {
    this.shuffleWriteBytes = this.shuffleWriteBytes + shuffleWriteBytes;
}

/**
 * Returns the accumulated shuffle-write cost held by this metrics object.
 *
 * @return current value of {@code shuffleWriteCostMs}
 */
public long getShuffleWriteCostMs() {
    return shuffleWriteCostMs;
}

/**
 * Increments the accumulated shuffle-write cost.
 *
 * @param shuffleWriteCostMs amount added to the current {@code shuffleWriteCostMs} value
 */
public void addShuffleWriteCostMs(long shuffleWriteCostMs) {
    this.shuffleWriteCostMs = this.shuffleWriteCostMs + shuffleWriteCostMs;
}

/**
 * Overwrites the GC baseline recorded for this metrics object.
 *
 * <p>The stored value is what {@code setFinishGcTs} subtracts from its argument to
 * derive {@code gcCostMs}.
 *
 * @param startGcTs new value for the {@code startGcTs} field
 */
public void setStartGcTs(long startGcTs) {
this.startGcTs = startGcTs;
}

public void setOutputBytes(long outputBytes) {
this.outputBytes = outputBytes;
public void setFinishGcTs(long finishGcTs) {
this.gcCostMs = finishGcTs - this.startGcTs;
}

/**
 * Renders the execution and shuffle counters as {@code EventMetrics{name=value, ...}}.
 *
 * <p>The meta fields ({@code vertexId}, {@code parallelism}, {@code index}) are not part
 * of the output.
 *
 * @return a single-line textual snapshot of the counters
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("EventMetrics{");
    text.append("startTime=").append(startTime);
    text.append(", finishTime=").append(finishTime);
    text.append(", executeCostMs=").append(executeCostMs);
    text.append(", processCostMs=").append(processCostMs);
    text.append(", gcCostMs=").append(gcCostMs);
    text.append(", shuffleReadRecords=").append(shuffleReadRecords);
    text.append(", shuffleReadBytes=").append(shuffleReadBytes);
    text.append(", shuffleReadCostMs=").append(shuffleReadCostMs);
    text.append(", shuffleWriteRecords=").append(shuffleWriteRecords);
    text.append(", shuffleWriteBytes=").append(shuffleWriteBytes);
    text.append(", shuffleWriteCostMs=").append(shuffleWriteCostMs);
    text.append('}');
    return text.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public void setDecodeBytes(long decodeBytes) {
this.decodeBytes = decodeBytes;
}

/**
 * Increments the decoded byte count.
 *
 * @param decodeBytes amount added to the current {@code decodeBytes} value
 */
public void increaseDecodeBytes(long decodeBytes) {
    this.decodeBytes = this.decodeBytes + decodeBytes;
}


public long getFetchWaitMs() {
return fetchWaitMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ protected Kryo initialValue() {

private void registerClass(Kryo kryo, String className, int kryoId) {
try {
LOGGER.info("register class:{} id:{}", className, kryoId);
LOGGER.debug("register class:{} id:{}", className, kryoId);
Class<?> clazz = ClassUtil.classForName(className);
kryo.register(clazz, kryoId);
} catch (GeaflowRuntimeException e) {
Expand All @@ -211,7 +211,7 @@ private void registerClass(Kryo kryo, String className, int kryoId) {

private void registerClass(Kryo kryo, String className, String serializerClassName, int kryoId) {
try {
LOGGER.info("register class:{} id:{}", className, kryoId);
LOGGER.debug("register class:{} id:{}", className, kryoId);
Class<?> clazz = ClassUtil.classForName(className);
Class<?> serializerClazz = ClassUtil.classForName(serializerClassName);
Serializer serializer = (Serializer) serializerClazz.newInstance();
Expand Down
Loading

0 comments on commit a2e1ad3

Please sign in to comment.