Skip to content

Commit

Permalink
Issue 87: Add Throughput Recorder (#88)
Browse files Browse the repository at this point in the history
Adds an extra performance recorder to record throughput while the test is running.

Signed-off-by: David Maddison <david.maddison@dell.com>
  • Loading branch information
maddisondavid authored Feb 17, 2020
1 parent 8ea4bd1 commit 3d48804
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 22 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ buildscript {
compile "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
"commons-cli:commons-cli:${commonsCLIVersion}",
"org.apache.commons:commons-csv:${commonsCSVVersion}"
"org.apache.commons:commons-csv:${commonsCSVVersion}",
"org.apache.commons:commons-math3:${commonsMathVersion}",
"org.apache.commons:commons-io:${commonsIOVersion}"

runtime "org.slf4j:slf4j-simple:${slf4jSimpleVersion}",
"io.pravega:pravega-keycloak-client:${pravegaKeycloakVersion}"
Expand Down
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

commonsCLIVersion=1.3.1
commonsCSVVersion=1.5
commonsMathVersion=3.6.1
commonsIOVersion=1.3.2
pravegaVersion=0.7.0-50.ef40b45-SNAPSHOT
pravegaKeycloakVersion=0.6.0-6.1a4bae7-SNAPSHOT
slf4jSimpleVersion=1.7.14
104 changes: 87 additions & 17 deletions src/main/java/io/pravega/perf/PerfStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.math3.util.Precision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,8 +38,11 @@
public class PerfStats {
private static Logger log = LoggerFactory.getLogger(PerfStats.class);

private static final double[] PERCENTILES = {0.5, 0.75, 0.95, 0.99, 0.999, 0.9999};

final private String action;
final private String csvFile;
final private String throughputCsvFile;
final private int messageSize;
final private int windowInterval;
final private ConcurrentLinkedQueue<TimeStamp> queue;
Expand Down Expand Up @@ -70,11 +74,12 @@ private boolean isEnd() {
}
}

public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
public PerfStats(String action, int reportingInterval, int messageSize, String csvFile, String throughputCsvFile) {
this.action = action;
this.messageSize = messageSize;
this.windowInterval = reportingInterval;
this.csvFile = csvFile;
this.throughputCsvFile = throughputCsvFile;
this.queue = new ConcurrentLinkedQueue<>();
this.executor = new ForkJoinPool(1);
this.ret = null;
Expand All @@ -89,20 +94,24 @@ final private class QueueProcessor implements Callable {
final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS;
final private static int PARK_NS = NS_PER_MICRO;
final private long startTime;
final TimeWindow window;
CSVThroughputWriter throughputRecorder;

private QueueProcessor(long startTime) {
private QueueProcessor(long startTime){
this.startTime = startTime;
window = new TimeWindow(action, startTime);
}

public Void call() throws IOException {
final TimeWindow window = new TimeWindow(action, startTime);
final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) :
new CSVLatencyWriter(action, messageSize, startTime, csvFile);
final int minWaitTimeMS = windowInterval / 50;
final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS;
boolean doWork = true;
long time = startTime;
long idleCount = 0;

throughputRecorder = throughputCsvFile != null ? new CSVThroughputWriter(action, throughputCsvFile) : null;
TimeStamp t;

while (doWork) {
Expand All @@ -113,12 +122,12 @@ public Void call() throws IOException {
} else {
final int latency = (int) (t.endTime - t.startTime);
window.record(t.bytes, latency);

latencyRecorder.record(t.startTime, t.bytes, latency);
}
time = t.endTime;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
resetWindow(time);
}
} else {
LockSupport.parkNanos(PARK_NS);
Expand All @@ -127,15 +136,29 @@ public Void call() throws IOException {
time = System.currentTimeMillis();
idleCount = 0;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
resetWindow(time);
}
}
}
}

latencyRecorder.printTotal(time);
if (throughputRecorder != null) {
throughputRecorder.close();
}
return null;
}

private final void resetWindow(long time) {
window.lastTime(time);

if (throughputRecorder != null) {
throughputRecorder.record(window.startTime, window.lastTime, window.bytes, window.getMiBPerSecond(), window.count, window.getEventsPerSecond());
}

window.print();
window.reset(time);
}
}

/**
Expand All @@ -150,6 +173,7 @@ final static private class TimeWindow {
private long bytes;
private int maxLatency;
private long totalLatency;
private double elapsedSeconds = 0;

private TimeWindow(String action, long start) {
this.action = action;
Expand All @@ -163,6 +187,7 @@ private void reset(long start) {
this.bytes = 0;
this.maxLatency = 0;
this.totalLatency = 0;
this.elapsedSeconds = 0;
}

/**
Expand All @@ -178,15 +203,32 @@ private void record(long bytes, int latency) {
this.maxLatency = Math.max(this.maxLatency, latency);
}

public void lastTime(long time) {
this.lastTime = time;

assert this.lastTime > this.startTime : "Invalid Start and EndTime";
this.elapsedSeconds = (this.lastTime - this.startTime) / 1000.0;
}

public double getMiBPerSecond() {
assert this.elapsedSeconds > 0 : "Elapsed Seconds cannot be zero";

return (this.bytes / (1024.0 * 1024.0)) / elapsedSeconds;
}

public double getEventsPerSecond() {
assert this.elapsedSeconds > 0 : "Elapsed Seconds cannot be zero";

return count/elapsedSeconds;
}

/**
* Print the window statistics
*/
private void print(long time) {
this.lastTime = time;
private void print() {
assert this.lastTime > this.startTime : "Invalid Start and EndTime";
final double elapsed = (this.lastTime - this.startTime) / 1000.0;
final double recsPerSec = count / elapsed;
final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed;
final double recsPerSec = count / elapsedSeconds;
final double mbPerSec = getMiBPerSecond();

log.info(String.format("%8d records %s, %9.1f records/sec, %6.2f MiB/sec, %7.1f ms avg latency, %7.1f ms max latency",
count, action, recsPerSec, mbPerSec, totalLatency / (double) count, (double) maxLatency));
Expand All @@ -202,12 +244,40 @@ private long windowTimeMS(long time) {
}
}

/**
* Perf Recorder that records the number of bytes every second. Percentiles are reported in human form, i.e. MB/s.
*/
@NotThreadSafe
static private class CSVThroughputWriter {
final private CSVPrinter csvPrinter;

CSVThroughputWriter(String action, String csvFile) throws IOException {
csvPrinter = new CSVPrinter(Files.newBufferedWriter(Paths.get(csvFile)), CSVFormat.DEFAULT
.withHeader("Start", "End", "Events", action + " Events Throughput", "Bytes", action + " MiB Throughput"));
}

public void record(long startTime, long lastTime, long bytes, double mbPerSecond, long count, double eventsPerSecond) {
try {
csvPrinter.printRecord(startTime, lastTime, count, Precision.round(eventsPerSecond, 2), bytes, Precision.round(mbPerSecond, 2));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

public void close() {
try {
csvPrinter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

@NotThreadSafe
static private class LatencyWriter {
final static int MS_PER_SEC = 1000;
final static int MS_PER_MIN = MS_PER_SEC * 60;
final static int MS_PER_HR = MS_PER_MIN * 60;
final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999, 0.9999};
final String action;
final int messageSize;
final long startTime;
Expand Down Expand Up @@ -244,12 +314,12 @@ private void countLatencies() {
}

private int[] getPercentiles() {
int[] percentileIds = new int[percentiles.length];
int[] percentileIds = new int[PERCENTILES.length];
int[] values = new int[percentileIds.length];
int index = 0;

for (int i = 0; i < percentiles.length; i++) {
percentileIds[i] = (int) (count * percentiles[i]);
for (int i = 0; i < PERCENTILES.length; i++) {
percentileIds[i] = (int) (count * PERCENTILES[i]);
}

for (int[] lr : latencyRanges) {
Expand Down Expand Up @@ -371,4 +441,4 @@ public synchronized void shutdown(long endTime) throws ExecutionException, Inter
public void recordTime(long startTime, long endTime, int bytes) {
queue.add(new TimeStamp(startTime, endTime, bytes));
}
}
}
29 changes: 26 additions & 3 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public static void main(String[] args) {
"if -1, get the maximum throughput");
options.addOption("writecsv", true, "CSV file to record write latencies");
options.addOption("readcsv", true, "CSV file to record read latencies");
options.addOption("writethroughputcsv", true, "CSV file to record write throughput");
options.addOption("readthroughputcsv", true, "CSV file to record read throughput");
options.addOption("enableConnectionPooling", true, "Set to false to disable connection pooling");
options.addOption("writeWatermarkPeriodMillis", true,
"If -1 (default), watermarks will not be written.\n" +
Expand All @@ -94,6 +96,7 @@ public static void main(String[] args) {
options.addOption("readWatermarkPeriodMillis", true,
"If -1 (default), watermarks will not be read.\n" +
"If >0, watermarks will be read with a period of this many milliseconds.");
options.addOption("reportingIntervalMillis", true, "period (in milliseconds) in which performance will be reported");
options.addOption("createScope", true, "attempt to create Pravega scope(true by default)");

options.addOption("help", false, "Help message");
Expand Down Expand Up @@ -182,7 +185,7 @@ public static Test createTest(long startTime, CommandLine commandline, Options o

static private abstract class Test {
static final int MAXTIME = 60 * 60 * 24;
static final int REPORTINGINTERVAL = 5000;
static final int DEFAULT_REPORTING_INTERVAL = 5000;
static final int TIMEOUT = 1000;
static final String SCOPE = "Scope";

Expand All @@ -209,12 +212,15 @@ static private abstract class Test {
final double throughput;
final String writeFile;
final String readFile;
final String writeThroughputFile;
final String readThroughputFile;
final PerfStats produceStats;
final PerfStats consumeStats;
final long startTime;
final boolean enableConnectionPooling;
final long writeWatermarkPeriodMillis;
final long readWatermarkPeriodMillis;
final int reportingInterval;
final boolean createScope;

Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
Expand Down Expand Up @@ -268,6 +274,23 @@ static private abstract class Test {
writeFile = parseStringOption(commandline, "writecsv", null);
readFile = parseStringOption(commandline, "readcsv", null);

if (commandline.hasOption("writethroughputcsv")) {
writeThroughputFile = commandline.getOptionValue("writethroughputcsv");
} else {
writeThroughputFile = null;
}
if (commandline.hasOption("readthroughputcsv")) {
readThroughputFile = commandline.getOptionValue("readthroughputcsv");
} else {
readThroughputFile = null;
}

if (commandline.hasOption("reportingIntervalMillis")) {
reportingInterval = Integer.parseInt(commandline.getOptionValue("reportingIntervalMillis"));
} else {
reportingInterval = DEFAULT_REPORTING_INTERVAL;
}

enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true"));

writeWatermarkPeriodMillis = Long.parseLong(commandline.getOptionValue("writeWatermarkPeriodMillis", "-1"));
Expand Down Expand Up @@ -309,7 +332,7 @@ static private abstract class Test {
if (writeAndRead) {
produceStats = null;
} else {
produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile);
produceStats = new PerfStats("Writing", reportingInterval, messageSize, writeFile, writeThroughputFile);
}

eventsPerProducer = (events + producerCount - 1) / producerCount;
Expand All @@ -334,7 +357,7 @@ static private abstract class Test {
} else {
action = "Reading";
}
consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile);
consumeStats = new PerfStats(action, reportingInterval, messageSize, readFile, readThroughputFile);
eventsPerConsumer = events / consumerCount;
} else {
consumeStats = null;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/pravega/perf/PravegaWriterWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ public void flush() {
public synchronized void close() {
producer.close();
}
}
}

0 comments on commit 3d48804

Please sign in to comment.