diff --git a/README.md b/README.md
index 2da961a1b..da64ce4a2 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,170 @@
-# pravega-benchmark
-Performance benchmark tool and scripts for Pravega
+
+
+# Pravega Benchmark Tool
+
+The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster.
+This tool performs the throughput and latency analysis for the multi producers/writers and consumers/readers of pravega.
+it also validates the end to end latency. The write and/or read latencies can be stored in a CSV file for later analysis.
+At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th and 99.9th latency percentiles.
+
+
+### Prerequisites
+
+- Java 8+
+- Gradle 4+
+
+### Building
+
+Checkout the source code:
+
+```
+git clone https://github.com/pravega/pravega-benchmark
+cd pravega-benchmark
+```
+
+Build the Pravega benchmark Tool:
+
+```
+./gradlew build
+```
+
+untar the Pravega benchmark tool to local folder
+
+```
+tar -xvf ./build/distributions/pravega-benchmark.tar -C ./run
+```
+
+Running Pravega bencmark tool locally:
+
+```
+
/pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help
+usage: pravega-benchmark
+ -consumers number of consumers
+ -controller controller URI
+ -events number of events/records if 'time' not
+ specified;
+ otherwise, maximum events per second by
+ producer(s) and/or number of events per
+ consumer
+ -help Help message
+ -producers number of producers
+ -readcsv csv file to record read latencies
+ -recreate If the stream is already existing, delete
+ it and recreate it
+ -segments Number of segments
+ -size Size of each message (event or record)
+ -stream Stream name
+ -throughput if > 0 , throughput in MB/s
+ if 0 , writes 'events'
+ if -1, get the maximum throughput
+ -time number of seconds the code runs
+ -transaction Producers use transactions or not
+ -transactionspercommit Number of events before a transaction is
+ committed
+ -writecsv csv file to record write latencies
+```
+
+## Running Performance benchmarking
+
+The Pravega benchmark tool can be executed to
+ - write/read specific amount of events/records to/from the Pravega cluster
+ - write/read the events/records for the specified amount of time
+
+The Pravega benchmark tool can be executed in the following modes:
+```
+1. Burst Mode
+2. Throughput Mode
+3. OPS Mode or Events Rate / Rate limiter Mode
+4. End to End Latency Mode
+```
+
+### 1 - Burst Mode
+In this mode, the Pravega benchmark tool pushes/pulls the messages to/from the pravega client as much as possible.
+This mode is used to find the maximum and throughput that can be obtained from the pravega cluster.
+This mode can be used for both producers and consumers.
+
+```
+For example:
+/run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname1 -segments 1 -producers 1 -size 100 -throughput -1 -time 60
+
+The -throughput -1 indicates the burst mode.
+This test will executed for 60 seconds because option -time 60 is used.
+This test tries to write and read events of size 100 bytes to/from the stream 'streamname1'.
+The option '-controller tcp://127.0.0.1:9090' specifies the pravega controller IP address and port number.
+Note that -producers 1 indicates 1 producer/writers.
+
+in the case you want to write/read the certain number of events use the -events option without -time option as follows
+
+/run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname1 -segments 1 -producers 1 -size 100 -throughput -1 -events 1000000
+
+-events indicates that total of events to write/read
+```
+
+### 2 - Throughput Mode
+In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum throughput in terms of Mega Bytes/second (MB/s).
+This mode is used to find the least latency that can be obtained from the pravega cluster for given throughput.
+This mode is used only for write operation.
+
+```
+For example:
+/run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname5 -segments 5 -producers 5 -size 100 -throughput 10 -time 300
+
+The -throughput indicates the Throughput mode.
+
+This test will be executed with approximate max throughput of 10MB/sec.
+This test will executed for 300 seconds (5 minutes) because option -time 60 is used.
+This test tries to write and read events of size 100 bytes to/from the stream 'streamname5' of 5 segments.
+If the stream 'streamname5' is not existing , then it will be created with the 5 segments.
+if the steam is already existing then it will be scaled up/down to 5 segments.
+Note that -producers 5 indicates 5 producers/writers .
+
+in the case you want to write/read the certain number of events use the -events option without -time option as follows
+
+/run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname5 -segments 5 -producers 1 -size 100 -throughput 10 -events 1000000
+
+-events 1000000 indicates that total 1000000 (1 million) of events will be written at the throughput speed of 10MB/sec
+```
+
+### 3 - OPS Mode or Events Rate / Rate Limiter Mode
+This mode is another form of controlling writers throughput by limiting the number of events per second.
+In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum events per sec.
+This mode is used to find the least latency that can be obtained from the pravega cluster for events rate.
+This mode is used only for write operation.
+
+```
+For example:
+/run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname1 -segments 1 -producers 5 -size 100 -events 1000 -time 60
+
+The -events (1000 ) specifies the events per second to write.
+Note that the option "-throughput" SHOULD NOT supplied for this OPS Mode or Events Rate / Rate limiter Mode.
+
+This test will be executed with approximate 1000 events per second by 6 producers.
+This test will executed for 300 seconds (5 minutes) because option -time 60 is used.
+Note that in this mode, there is 'NO total number of events' to specify hence user must supply the time to run using -time option.
+```
+
+### 4 - End to End Latency Mode
+In this mode, the Pravega benchmark tool writes and read the messages to the pravega cluster and records the end to end latency.
+End to end latency means the time duration between the beginning of the writing event/record to stream and the time after reading the event/record.
+in this mode user must specify both the number of producers and consumers.
+The -throughput option (Throughput mode) or -events (late limiter) can used to limit the writers throughput or events rate.
+
+```
+For example:
+./run/pravega-benchmark/bin/pravega-benchmark -controller tcp://127.0.0.1:9090 -stream streamname3 -segments 1 -producers 1 -consumers 1 -size 100 -throughput -1 -time 60
+
+The user should specify both producers and consumers count for write to read or End to End latency mode. it should be set to true.
+The -throughput -1 specifies the writes tries to write the events at the maximum possible speed.
+```
+
+### Recording the latencies to CSV files
+user can use the options "-writecsv " to record the latencies of writers and "-readcsv " for readers.
+in case of End to End latency mode, if the user can supply only -readcsv to get the end to end latency in to the csv file.
diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java
index 44fa637ec..3b8950ff9 100644
--- a/src/main/java/io/pravega/perf/PerfStats.java
+++ b/src/main/java/io/pravega/perf/PerfStats.java
@@ -10,52 +10,135 @@
package io.pravega.perf;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
-import java.time.Instant;
-import java.time.Duration;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.LockSupport;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
/**
- * class for Performance statistics.
+ * Class for Performance statistics.
*/
public class PerfStats {
- private static final long NANOSEC_PER_SEC = 1000000000L;
- final private int messageSize;
final private String action;
- final private Instant start;
- private long[] latencies;
- private int sampling;
- private int index;
- private long count;
- private long bytes;
- private long maxLatency;
- private long totalLatency;
- final private long windowInterval;
- private timeWindow window;
- final private AtomicLong eventID;
- final private CSVPrinter printer;
+ final private String csvFile;
+ final private int messageSize;
+ final private int windowInterval;
+ final private ConcurrentLinkedQueue queue;
+ final private ForkJoinPool executor;
+
+ @GuardedBy("this")
+ private Future ret;
+
+ /**
+ * Private class for start and end time.
+ */
+ final static private class TimeStamp {
+ final private long startTime;
+ final private long endTime;
+ final private int bytes;
+
+ private TimeStamp(long startTime, long endTime, int bytes) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.bytes = bytes;
+ }
+
+ private TimeStamp(long endTime) {
+ this(-1, endTime, -1);
+ }
+
+ private boolean isEnd() {
+ return this.bytes == -1 && this.startTime == -1;
+ }
+ }
+
+ public PerfStats(String action, int reportingInterval, int messageSize, String csvFile) {
+ this.action = action;
+ this.messageSize = messageSize;
+ this.windowInterval = reportingInterval;
+ this.csvFile = csvFile;
+ this.queue = new ConcurrentLinkedQueue<>();
+ this.executor = new ForkJoinPool(1);
+ this.ret = null;
+ }
+
+ /**
+ * Private class for start and end time.
+ */
+ final private class QueueProcessor implements Callable {
+ final private long startTime;
+
+ private QueueProcessor(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public Void call() throws IOException {
+ final TimeWindow window = new TimeWindow(action, startTime);
+ final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) :
+ new CSVLatencyWriter(action, messageSize, startTime, csvFile);
+ boolean doWork = true;
+ long time = startTime;
+ TimeStamp t;
+ while (doWork) {
+ t = queue.poll();
+ if (t != null) {
+ if (t.isEnd()) {
+ doWork = false;
+ } else {
+ final int latency = (int) (t.endTime - t.startTime);
+ window.record(t.bytes, latency);
+ latencyRecorder.record(t.startTime, t.bytes, latency);
+ }
+ time = t.endTime;
+ } else {
+ LockSupport.parkNanos(500);
+ time = System.currentTimeMillis();
+ }
+ if (window.windowTimeMS(time) > windowInterval) {
+ window.print(time);
+ window.reset(time);
+ }
+ }
+ latencyRecorder.printTotal(time);
+ return null;
+ }
+ }
/**
- * private class for Performance statistics within a given time window.
+ * Private class for Performance statistics within a given time window.
*/
- private class timeWindow {
- final private Instant startTime;
- private Instant lastTime;
+ @NotThreadSafe
+ final static private class TimeWindow {
+ final private String action;
+ private long startTime;
+ private long lastTime;
private long count;
private long bytes;
- private long maxLatency;
+ private int maxLatency;
private long totalLatency;
- public timeWindow() {
- this.startTime = Instant.now();
+ private TimeWindow(String action, long start) {
+ this.action = action;
+ reset(start);
+ }
+
+ private void reset(long start) {
+ this.startTime = start;
this.lastTime = this.startTime;
this.count = 0;
this.bytes = 0;
@@ -64,12 +147,12 @@ public timeWindow() {
}
/**
- * record the latency and bytes
+ * Record the latency and bytes
*
- * @param latency latency in ms.
* @param bytes number of bytes.
+ * @param latency latency in ms.
*/
- public void record(long latency, long bytes) {
+ private void record(long bytes, int latency) {
this.count++;
this.totalLatency += latency;
this.bytes += bytes;
@@ -77,11 +160,12 @@ public void record(long latency, long bytes) {
}
/**
- * print the window statistics
+ * Print the window statistics
*/
- public void print(Instant time) {
+ private void print(long time) {
this.lastTime = time;
- final double elapsed = Duration.between(this.startTime, this.lastTime).toMillis() / 1000.0;
+ assert this.lastTime > this.startTime : "Invalid Start and EndTime";
+ final double elapsed = (this.lastTime - this.startTime) / 1000.0;
final double recsPerSec = count / elapsed;
final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed;
@@ -90,142 +174,193 @@ public void print(Instant time) {
}
/**
- * get the current time duration of this window
+ * Get the current time duration of this window
*
* @param time current time.
*/
- public long windowTimeMS(Instant time) {
- return Duration.between(this.startTime, time).toMillis();
+ private long windowTimeMS(long time) {
+ return time - startTime;
}
+ }
- /**
- * get the time duration of this window
- */
- public long windowTimeMS() {
- return windowTimeMS(Instant.now());
+ @NotThreadSafe
+ static private class LatencyWriter {
+ final static int MS_PER_SEC = 1000;
+ final static int MS_PER_MIN = MS_PER_SEC * 60;
+ final static int MS_PER_HR = MS_PER_MIN * 60;
+ final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999};
+ final String action;
+ final int messageSize;
+ final long startTime;
+ final int[] latencies;
+ long count;
+ long totalLatency;
+ long maxLatency;
+ long totalBytes;
+ ArrayList latencyRanges;
+
+ private class LatencyRange {
+ private final int latency;
+ private final int start;
+ private final int end;
+
+ private LatencyRange(int latency, int start, int end) {
+ this.latency = latency;
+ this.start = start;
+ this.end = end;
+ }
}
- }
- public PerfStats(String action, int reportingInterval, int messageSize, long numRecords, String csvFile) throws IOException {
- this.action = action;
- this.start = Instant.now();
- this.count = 0;
- this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
- this.latencies = new long[(int) (numRecords / this.sampling) + 1];
- this.index = 0;
- this.maxLatency = 0;
- this.totalLatency = 0;
- this.windowInterval = reportingInterval;
- this.messageSize = messageSize;
- this.window = new timeWindow();
- this.eventID = new AtomicLong();
- if (csvFile != null) {
- this.printer = new CSVPrinter(Files.newBufferedWriter(Paths.get(csvFile)), CSVFormat.DEFAULT
- .withHeader("#", "Event ID", "event size (bytes)", "Start Time (Nanoseconds)", action + " Latency (Milliseconds)"));
- } else {
- this.printer = null;
+ LatencyWriter(String action, int messageSize, long startTime) {
+ this.action = action;
+ this.messageSize = messageSize;
+ this.startTime = startTime;
+ this.latencies = new int[MS_PER_HR];
+ this.latencyRanges = null;
+ this.totalLatency = 0;
+ this.maxLatency = 0;
+ this.count = 0;
}
- }
+ private void countLatencies() {
+ count = 0;
+ latencyRanges = new ArrayList<>();
+ for (int i = 0, cur = 0; i < latencies.length; i++) {
+ if (latencies[i] > 0) {
+ latencyRanges.add(new LatencyRange(i, cur, cur + latencies[i]));
+ cur += latencies[i] + 1;
+ totalLatency += i * latencies[i];
+ count += latencies[i];
+ maxLatency = i;
+ }
+ }
+ }
- private synchronized void record(long event, int bytes, Instant startTime, Instant endTime) throws IOException {
+ private int[] getPercentiles() {
+ int[] percentileIds = new int[percentiles.length];
+ int[] values = new int[percentileIds.length];
+ int index = 0;
- final long latency = Duration.between(startTime, endTime).toMillis();
- this.count++;
- this.bytes += bytes;
- this.totalLatency += latency;
- this.maxLatency = Math.max(this.maxLatency, latency);
- window.record(latency, bytes);
+ for (int i = 0; i < percentiles.length; i++) {
+ percentileIds[i] = (int) (count * percentiles[i]);
+ }
- if (this.count % this.sampling == 0) {
- this.latencies[index] = latency;
- this.index++;
+ for (LatencyRange lr : latencyRanges) {
+ while ((index < percentileIds.length) &&
+ (lr.start <= percentileIds[index]) && (percentileIds[index] <= lr.end)) {
+ values[index++] = lr.latency;
+ }
+ }
+ return values;
}
- if (this.printer != null) {
- final long nanotime = startTime.getEpochSecond() * NANOSEC_PER_SEC + startTime.getNano();
- printer.printRecord(count, event, bytes, nanotime, latency);
+ public void record(int bytes, int latency) {
+ totalBytes += bytes;
+ latencies[latency]++;
}
- }
- private static long[] percentiles(long[] latencies, int count, double... percentiles) {
- int size = Math.min(count, latencies.length);
- Arrays.sort(latencies, 0, size);
- long[] values = new long[percentiles.length];
- for (int i = 0; i < percentiles.length; i++) {
- int index = (int) (percentiles[i] * size);
- values[i] = latencies[index];
+ public void record(long start, int bytes, int latency) {
+ this.record(bytes, latency);
+ }
+
+ public void printTotal(long endTime) {
+ countLatencies();
+ final double elapsed = (endTime - startTime) / 1000.0;
+ final double recsPerSec = count / elapsed;
+ final double mbPerSec = (this.totalBytes / (1024.0 * 1024.0)) / elapsed;
+ int[] percs = getPercentiles();
+
+ System.out.printf(
+ "%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency" +
+ ", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th\n",
+ count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency,
+ percs[0], percs[1], percs[2], percs[3], percs[4]);
}
- return values;
}
- /**
- * print the performance statistics of current time window.
- */
- public synchronized void print() throws IOException {
-
- final Instant time = Instant.now();
- if (window.windowTimeMS(time) >= windowInterval) {
- window.print(time);
- this.window = new timeWindow();
- if (printer != null) {
- printer.flush();
+ @NotThreadSafe
+ static private class CSVLatencyWriter extends LatencyWriter {
+ final private String csvFile;
+ final private CSVPrinter csvPrinter;
+
+ CSVLatencyWriter(String action, int messageSize, long start, String csvFile) throws IOException {
+ super(action, messageSize, start);
+ this.csvFile = csvFile;
+ csvPrinter = new CSVPrinter(Files.newBufferedWriter(Paths.get(csvFile)), CSVFormat.DEFAULT
+ .withHeader("Start Time (Milliseconds)", "event size (bytes)", action + " Latency (Milliseconds)"));
+ }
+
+ private void readCSV() {
+ try {
+ CSVParser csvParser = new CSVParser(Files.newBufferedReader(Paths.get(csvFile)), CSVFormat.DEFAULT
+ .withFirstRecordAsHeader().withIgnoreHeaderCase().withTrim());
+
+ for (CSVRecord csvEntry : csvParser) {
+ record(Integer.parseInt(csvEntry.get(1)), Integer.parseInt(csvEntry.get(2)));
+ }
+ csvParser.close();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
}
}
+
+ @Override
+ public void record(long start, int bytes, int latency) {
+ try {
+ csvPrinter.printRecord(start, bytes, latency);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void printTotal(long endTime) {
+ try {
+ csvPrinter.close();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ readCSV();
+ super.printTotal(endTime);
+ }
}
/**
- * print the final performance statistics.
+ * Start the performance statistics.
+ *
+ * @param startTime start time time
*/
- public synchronized void printTotal(Instant endTime) throws IOException {
-
- final double elapsed = Duration.between(start, endTime).toMillis() / 1000.0;
- final double recsPerSec = count / elapsed;
- final double mbPerSec = (this.bytes / (1024.0 * 1024.0)) / elapsed;
-
- long[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
- System.out.printf(
- "%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
- count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency,
- percs[0], percs[1], percs[2], percs[3]);
- if (printer != null) {
- printer.close();
+ public synchronized void start(long startTime) {
+ if (this.ret == null) {
+ this.ret = executor.submit(new QueueProcessor(startTime));
}
}
/**
- * record the data write/read time of given length of data.
+ * End the final performance statistics.
*
- * @param retVal future to wait for.
- * @param startTime starting time
- * @param length length of data read/written
- * @return a completable future for recording the end time.
+ * @param endTime End time
+ * @throws ExecutionException If an exception occurred.
+ * @throws InterruptedException If an exception occurred.
*/
- public CompletableFuture recordTime(CompletableFuture retVal, Instant startTime, int length) throws IOException {
- final Instant time = Instant.now();
- final long event = eventID.incrementAndGet();
- if (retVal == null) {
- record(event, length, startTime, time);
- } else {
- retVal = retVal.thenAccept(d -> {
- final Instant endTime = Instant.now();
- try {
- record(event, length, startTime, endTime);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- });
+ public synchronized void shutdown(long endTime) throws ExecutionException, InterruptedException {
+ if (this.ret != null) {
+ queue.add(new TimeStamp(endTime));
+ ret.get();
+ executor.shutdownNow();
+ queue.clear();
+ this.ret = null;
}
- return retVal;
}
/**
- * get the rate of events.
+ * Record the data write/read time of data.
*
- * @return rate of number of events till now.
- */
- public synchronized int eventsRate() {
- final double elapsed = Duration.between(start, Instant.now()).toMillis() / 1000.0;
- return (int) (count / elapsed);
+ * @param startTime starting time
+ * @param endTime End time
+ * @param bytes number of bytes written or read
+ **/
+ public void recordTime(long startTime, long endTime, int bytes) {
+ queue.add(new TimeStamp(startTime, endTime, bytes));
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/io/pravega/perf/Performance.java b/src/main/java/io/pravega/perf/Performance.java
index 125f39b8d..b99519833 100644
--- a/src/main/java/io/pravega/perf/Performance.java
+++ b/src/main/java/io/pravega/perf/Performance.java
@@ -11,8 +11,10 @@
package io.pravega.perf;
import java.io.IOException;
-import java.util.concurrent.ExecutionException;
public interface Performance {
- void benchmark() throws InterruptedException, ExecutionException, IOException;
+ int TIME_HEADER_SIZE = 14;
+ String TIME_HEADER_FORMAT = "%0" + TIME_HEADER_SIZE + "d";
+
+ void benchmark() throws InterruptedException, IOException;
}
diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java
index 04a14ec78..0c8919edb 100644
--- a/src/main/java/io/pravega/perf/PravegaPerfTest.java
+++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java
@@ -13,154 +13,104 @@
import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.ReaderGroup;
-import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.client.stream.impl.ControllerImplConfig;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import java.io.IOException;
-import java.time.Instant;
+import java.net.URISyntaxException;
-import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import java.net.URI;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
-import java.util.List;
import java.util.stream.Stream;
import java.util.stream.IntStream;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
/**
* Performance benchmark for Pravega.
* Data format is in comma separated format as following: {TimeStamp, Sensor Id, Location, TempValue }.
*/
public class PravegaPerfTest {
-
- private static String controllerUri = "tcp://localhost:9090";
- private static int messageSize = 100;
- private static String streamName = StartLocalService.STREAM_NAME;
- private static String scopeName = StartLocalService.SCOPE;
- private static boolean recreate = false;
- private static int producerCount = 0;
- private static int consumerCount = 0;
- private static int segmentCount = 0;
- private static int events = 3000;
- private static boolean isRandomKey = false;
- private static int transactionPerCommit = 0;
- private static int runtimeSec = 0;
- private static final int reportingInterval = 5000;
- private static ScheduledExecutorService bgexecutor;
- private static ForkJoinPool fjexecutor;
- private static PerfStats produceStats, consumeStats;
- private static double throughput = 0;
- private static ThroughputController tput;
- private static String writeFile = null;
- private static String readFile = null;
+ final static String BENCHMARKNAME = "pravega-benchmark";
public static void main(String[] args) {
- ReaderGroup readerGroup = null;
- final int timeout = 10;
- final ClientFactory factory;
- ControllerImpl controller = null;
+ final Options options = new Options();
+ final HelpFormatter formatter = new HelpFormatter();
+ final CommandLineParser parser;
+ CommandLine commandline = null;
+ Option opt = null;
+ final long startTime = System.currentTimeMillis();
- final List> readers;
- final List> writers;
+ opt = new Option("controller", true, "controller URI");
+ opt.setRequired(true);
+ options.addOption(opt);
+ opt = new Option("stream", true, "Stream name");
+ opt.setRequired(true);
+ options.addOption(opt);
- try {
- parseCmdLine(args);
- } catch (ParseException p) {
- p.printStackTrace();
- System.exit(1);
- }
- if (producerCount == 0 && consumerCount == 0) {
- System.out.println("Error: Must specify the number of producers or Consumers");
- System.exit(1);
- }
+ options.addOption("producers", true, "number of producers");
+ options.addOption("consumers", true, "number of consumers");
+ options.addOption("events", true,
+ "number of events/records if 'time' not specified;\n" +
+ "otherwise, maximum events per second by producer(s) " +
+ "and/or number of events per consumer");
+ options.addOption("time", true, "number of seconds the code runs");
+ options.addOption("transaction", true, "Producers use transactions or not");
+ options.addOption("transactionspercommit", true,
+ "Number of events before a transaction is committed");
+ options.addOption("segments", true, "Number of segments");
+ options.addOption("size", true, "Size of each message (event or record)");
+ options.addOption("recreate", true,
+ "If the stream is already existing, delete it and recreate it");
+ options.addOption("throughput", true,
+ "if > 0 , throughput in MB/s\n" +
+ "if 0 , writes 'events'\n" +
+ "if -1, get the maximum throughput");
+ options.addOption("writecsv", true, "csv file to record write latencies");
+ options.addOption("readcsv", true, "csv file to record read latencies");
- bgexecutor = Executors.newScheduledThreadPool(10);
- fjexecutor = new ForkJoinPool();
+ options.addOption("help", false, "Help message");
+ parser = new DefaultParser();
try {
+ commandline = parser.parse(options, args);
+ } catch (ParseException ex) {
+ ex.printStackTrace();
+ formatter.printHelp(BENCHMARKNAME, options);
+ System.exit(0);
+ }
- controller = new ControllerImpl(ControllerImplConfig.builder()
- .clientConfig(ClientConfig.builder()
- .controllerURI(new URI(controllerUri)).build())
- .maxBackoffMillis(5000).build(),
- bgexecutor);
-
- PravegaStreamHandler streamHandle = new PravegaStreamHandler(scopeName, streamName, controllerUri,
- segmentCount, timeout, controller,
- bgexecutor);
-
- if (producerCount > 0 && !streamHandle.create()) {
- if (recreate) {
- streamHandle.recreate();
- } else {
- streamHandle.scale();
- }
- }
-
- factory = new ClientFactoryImpl(scopeName, controller);
- final Instant StartTime = Instant.now();
-
- if (consumerCount > 0) {
- readerGroup = streamHandle.createReaderGroup();
- consumeStats = new PerfStats("Reading", reportingInterval, messageSize, consumerCount * events * (runtimeSec + 1), readFile);
- readers = IntStream.range(0, consumerCount)
- .boxed()
- .map(i -> new PravegaReaderWorker(i, events,
- runtimeSec, StartTime, consumeStats,
- streamName, timeout, factory))
- .collect(Collectors.toList());
- } else {
- readers = null;
- consumeStats = null;
- }
+ if (commandline.hasOption("help")) {
+ formatter.printHelp(BENCHMARKNAME, options);
+ System.exit(0);
+ }
- if (producerCount > 0) {
+ final Test perfTest = createTest(startTime, commandline, options);
+ if (perfTest == null) {
+ System.exit(0);
+ }
- produceStats = new PerfStats("Writing", reportingInterval, messageSize, producerCount * events * (runtimeSec + 1), writeFile);
- if (throughput == 0 && runtimeSec > 0) {
- tput = new ThroughputController(events);
- } else {
- tput = new ThroughputController(messageSize, throughput);
- }
+ final ForkJoinPool executor = new ForkJoinPool();
- if (transactionPerCommit > 0) {
- writers = IntStream.range(0, producerCount)
- .boxed()
- .map(i -> new PravegaTransactionWriterWorker(i, events,
- runtimeSec, isRandomKey,
- messageSize, StartTime,
- produceStats, streamName,
- tput, factory,
- transactionPerCommit))
- .collect(Collectors.toList());
- } else {
- writers = IntStream.range(0, producerCount)
- .boxed()
- .map(i -> new PravegaWriterWorker(i, events,
- runtimeSec, isRandomKey,
- messageSize, StartTime,
- produceStats, streamName,
- tput, factory))
- .collect(Collectors.toList());
- }
- } else {
- writers = null;
- produceStats = null;
- }
+ try {
+ final List> producers = perfTest.getProducers();
+ final List> consumers = perfTest.getConsumers();
- final List> workers = Stream.of(readers, writers)
+ final List> workers = Stream.of(producers, consumers)
.filter(x -> x != null)
.flatMap(x -> x.stream())
.collect(Collectors.toList());
@@ -169,116 +119,118 @@ public static void main(String[] args) {
public void run() {
try {
System.out.println();
- shutdown();
- } catch (InterruptedException | IOException e) {
- e.printStackTrace();
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ perfTest.shutdown(System.currentTimeMillis());
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
}
}
});
-
- fjexecutor.invokeAll(workers);
- shutdown();
- } catch (Exception e) {
- e.printStackTrace();
+ perfTest.start(System.currentTimeMillis());
+ executor.invokeAll(workers);
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ perfTest.shutdown(System.currentTimeMillis());
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
-
System.exit(0);
}
- private static synchronized void shutdown() throws InterruptedException, IOException {
- final Instant endTime = Instant.now();
- if (fjexecutor == null) {
- return;
- }
- fjexecutor.shutdown();
- fjexecutor.awaitTermination(1, TimeUnit.SECONDS);
- fjexecutor = null;
- if (produceStats != null) {
- produceStats.printTotal(endTime);
- }
-
- if (consumeStats != null) {
- consumeStats.printTotal(endTime);
+ public static Test createTest(long startTime, CommandLine commandline, Options options) {
+ try {
+ return new PravegaTest(startTime, commandline);
+ } catch (IllegalArgumentException ex) {
+ ex.printStackTrace();
+ final HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(BENCHMARKNAME, options);
+ } catch (URISyntaxException | InterruptedException ex) {
+ ex.printStackTrace();
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
+ return null;
}
- private static void parseCmdLine(String[] args) throws ParseException {
- // create Options object
- Options options = new Options();
- final CommandLineParser parser;
- final CommandLine commandline;
-
- options.addOption("controller", true, "controller URI");
- options.addOption("producers", true, "number of producers");
- options.addOption("consumers", true, "number of consumers");
- options.addOption("events", true,
- "number of events/records per producer/consumer if 'time' not specified;\n" +
- "otherwise, maximum events per second by producer(s)" +
- "and/or number of events per consumer");
- options.addOption("time", true, "number of seconds the code runs");
- options.addOption("transaction", true, "Producers use transactions or not");
- options.addOption("size", true, "Size of each message (event or record)");
- options.addOption("stream", true, "Stream name");
- options.addOption("randomkey", true,
- "Set Random key default is one key per producer");
- options.addOption("transactionspercommit", true,
- "Number of events before a transaction is committed");
- options.addOption("segments", true, "Number of segments");
- options.addOption("recreate", true,
- "If the stream is already existing, delete it and recreate it");
-
- options.addOption("throughput", true,
- "if > 0 , throughput in MB/s\n" +
- "if 0 , writes 'events'\n" +
- "if -1, get the maximum throughput");
- options.addOption("writecsv", true, "csv file to record write latencies");
- options.addOption("readcsv", true, "csv file to record read latencies");
-
- options.addOption("help", false, "Help message");
-
- parser = new BasicParser();
- commandline = parser.parse(options, args);
-
- // Since it is command line sample producer, user inputs will be accepted from console
- if (commandline.hasOption("help")) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("pravega-benchmark", options);
- System.exit(0);
- } else {
+ static private abstract class Test {
+ static final int MAXTIME = 60 * 60 * 24;
+ static final int REPORTINGINTERVAL = 5000;
+ static final int TIMEOUT = 10;
+ static final String SCOPE = "Scope";
+ final String controllerUri;
+ final int messageSize;
+ final String streamName;
+ final String scopeName;
+ final boolean recreate;
+ final boolean writeAndRead;
+ final int producerCount;
+ final int consumerCount;
+ final int segmentCount;
+ final int events;
+ final int eventsPerSec;
+ final int eventsPerProducer;
+ final int eventsPerConsumer;
+ final int transactionPerCommit;
+ final int runtimeSec;
+ final double throughput;
+ final String writeFile;
+ final String readFile;
+ final PerfStats produceStats;
+ final PerfStats consumeStats;
+ final long startTime;
+
+ Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
+ this.startTime = startTime;
if (commandline.hasOption("controller")) {
controllerUri = commandline.getOptionValue("controller");
+ } else {
+ controllerUri = null;
}
+
if (commandline.hasOption("producers")) {
producerCount = Integer.parseInt(commandline.getOptionValue("producers"));
+ } else {
+ producerCount = 0;
}
if (commandline.hasOption("consumers")) {
consumerCount = Integer.parseInt(commandline.getOptionValue("consumers"));
+ } else {
+ consumerCount = 0;
}
if (commandline.hasOption("events")) {
events = Integer.parseInt(commandline.getOptionValue("events"));
+ } else {
+ events = 0;
}
if (commandline.hasOption("time")) {
runtimeSec = Integer.parseInt(commandline.getOptionValue("time"));
+ } else if (events > 0) {
+ runtimeSec = 0;
+ } else {
+ runtimeSec = MAXTIME;
}
if (commandline.hasOption("size")) {
messageSize = Integer.parseInt(commandline.getOptionValue("size"));
+ } else {
+ messageSize = 0;
}
if (commandline.hasOption("stream")) {
streamName = commandline.getOptionValue("stream");
- }
-
- if (commandline.hasOption("randomkey")) {
- isRandomKey = Boolean.parseBoolean(commandline.getOptionValue("randomkey"));
+ } else {
+ streamName = null;
}
if (commandline.hasOption("transactionspercommit")) {
transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit"));
+ } else {
+ transactionPerCommit = 0;
}
if (commandline.hasOption("segments")) {
@@ -289,23 +241,177 @@ private static void parseCmdLine(String[] args) throws ParseException {
if (commandline.hasOption("recreate")) {
recreate = Boolean.parseBoolean(commandline.getOptionValue("recreate"));
+ } else {
+ recreate = producerCount > 0 && consumerCount > 0;
}
if (commandline.hasOption("throughput")) {
throughput = Double.parseDouble(commandline.getOptionValue("throughput"));
+ } else {
+ throughput = -1;
}
+
if (commandline.hasOption("writecsv")) {
writeFile = commandline.getOptionValue("writecsv");
+ } else {
+ writeFile = null;
}
if (commandline.hasOption("readcsv")) {
readFile = commandline.getOptionValue("readcsv");
+ } else {
+ readFile = null;
+ }
+
+ scopeName = SCOPE;
+ if (producerCount == 0 && consumerCount == 0) {
+ throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
+ }
+
+ if (producerCount > 0) {
+ if (messageSize == 0) {
+ throw new IllegalArgumentException("Error: Must specify the event 'size'");
+ }
+
+ writeAndRead = consumerCount > 0;
+
+ if (writeAndRead) {
+ produceStats = null;
+ } else {
+ produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile);
+ }
+ eventsPerProducer = (events + producerCount - 1) / producerCount;
+ if (throughput < 0 && runtimeSec > 0) {
+ eventsPerSec = events / producerCount;
+ } else if (throughput > 0) {
+ eventsPerSec = (int) (((throughput * 1024 * 1024) / messageSize) / producerCount);
+ } else {
+ eventsPerSec = 0;
+ }
+ } else {
+ produceStats = null;
+ eventsPerProducer = 0;
+ eventsPerSec = 0;
+ writeAndRead = false;
+ }
+
+ if (consumerCount > 0) {
+ String action;
+ if (writeAndRead) {
+ action = "Write/Reading";
+ } else {
+ action = "Reading";
+ }
+ consumeStats = new PerfStats(action, REPORTINGINTERVAL, messageSize, readFile);
+ eventsPerConsumer = events / consumerCount;
+ } else {
+ consumeStats = null;
+ eventsPerConsumer = 0;
+ }
+
+ }
+
+ private void start(long startTime) throws IOException {
+ if (produceStats != null && consumeStats == null) {
+ produceStats.start(startTime);
+ }
+ if (consumeStats != null) {
+ consumeStats.start(startTime);
}
}
+
+ private void shutdown(long endTime) {
+ try {
+ if (produceStats != null && consumeStats == null) {
+ produceStats.shutdown(endTime);
+ }
+ if (consumeStats != null) {
+ consumeStats.shutdown(endTime);
+ }
+ } catch (ExecutionException | InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ public abstract List> getProducers();
+
+ public abstract List> getConsumers() throws URISyntaxException;
+
}
- private static class StartLocalService {
- static final int PORT = 9090;
- static final String SCOPE = "Scope";
- static final String STREAM_NAME = "aaj";
+ static private class PravegaTest extends Test {
+ final PravegaStreamHandler streamHandle;
+ final ClientFactory factory;
+
+ PravegaTest(long startTime, CommandLine commandline) throws
+ IllegalArgumentException, URISyntaxException, InterruptedException, Exception {
+ super(startTime, commandline);
+ final ScheduledExecutorService bgExecutor = Executors.newScheduledThreadPool(10);
+ final ControllerImpl controller = new ControllerImpl(ControllerImplConfig.builder()
+ .clientConfig(ClientConfig.builder()
+ .controllerURI(new URI(controllerUri)).build())
+ .maxBackoffMillis(5000).build(),
+ bgExecutor);
+
+ streamHandle = new PravegaStreamHandler(scopeName, streamName, controllerUri,
+ segmentCount, TIMEOUT, controller,
+ bgExecutor);
+
+ if (producerCount > 0 && !streamHandle.create()) {
+ if (recreate) {
+ streamHandle.recreate();
+ } else {
+ streamHandle.scale();
+ }
+ }
+
+ factory = new ClientFactoryImpl(scopeName, controller);
+ }
+
+ public List> getProducers() {
+ final List> writers;
+
+ if (producerCount > 0) {
+ if (transactionPerCommit > 0) {
+ writers = IntStream.range(0, producerCount)
+ .boxed()
+ .map(i -> new PravegaTransactionWriterWorker(i, eventsPerProducer,
+ runtimeSec, false,
+ messageSize, startTime,
+ produceStats, streamName,
+ eventsPerSec, writeAndRead, factory,
+ transactionPerCommit))
+ .collect(Collectors.toList());
+ } else {
+ writers = IntStream.range(0, producerCount)
+ .boxed()
+ .map(i -> new PravegaWriterWorker(i, eventsPerProducer,
+ runtimeSec, false,
+ messageSize, startTime,
+ produceStats, streamName,
+ eventsPerSec, writeAndRead, factory))
+ .collect(Collectors.toList());
+ }
+ } else {
+ writers = null;
+ }
+
+ return writers;
+ }
+
+ public List> getConsumers() throws URISyntaxException {
+ final List> readers;
+ if (consumerCount > 0) {
+ final ReaderGroup readerGroup = streamHandle.createReaderGroup();
+ readers = IntStream.range(0, consumerCount)
+ .boxed()
+ .map(i -> new PravegaReaderWorker(i, eventsPerConsumer,
+ runtimeSec, startTime, consumeStats,
+ streamName, TIMEOUT, writeAndRead, factory))
+ .collect(Collectors.toList());
+ } else {
+ readers = null;
+ }
+ return readers;
+ }
}
}
diff --git a/src/main/java/io/pravega/perf/PravegaReaderWorker.java b/src/main/java/io/pravega/perf/PravegaReaderWorker.java
index e33b355d4..df1fe6aa6 100644
--- a/src/main/java/io/pravega/perf/PravegaReaderWorker.java
+++ b/src/main/java/io/pravega/perf/PravegaReaderWorker.java
@@ -10,37 +10,32 @@
package io.pravega.perf;
-import java.time.Instant;
-
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.client.stream.ReaderConfig;
-import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.ReinitializationRequiredException;
/**
- * class for Pravega reader/consumer.
+ * Class for Pravega reader/consumer.
*/
public class PravegaReaderWorker extends ReaderWorker {
private final EventStreamReader reader;
- private final String readerId;
PravegaReaderWorker(int readerId, int events, int secondsToRun,
- Instant start, PerfStats stats, String readergrp,
- int timeout, ClientFactory factory) {
- super(readerId, events, secondsToRun, start, stats, readergrp, timeout);
+ long start, PerfStats stats, String readergrp,
+ int timeout, boolean writeAndRead, ClientFactory factory) {
+ super(readerId, events, secondsToRun, start, stats, readergrp, timeout, writeAndRead);
- this.readerId = Integer.toString(readerId);
+ final String readerSt = Integer.toString(readerId);
reader = factory.createReader(
- this.readerId, readergrp, new UTF8StringSerializer(), ReaderConfig.builder().build());
+ readerSt, readergrp, new UTF8StringSerializer(), ReaderConfig.builder().build());
}
@Override
public String readData() {
try {
- String data = reader.readNextEvent(timeout).getEvent();
- return data;
+ return reader.readNextEvent(timeout).getEvent();
} catch (ReinitializationRequiredException e) {
throw new IllegalStateException(e);
}
diff --git a/src/main/java/io/pravega/perf/PravegaStreamHandler.java b/src/main/java/io/pravega/perf/PravegaStreamHandler.java
index 78044d823..501f4af3e 100644
--- a/src/main/java/io/pravega/perf/PravegaStreamHandler.java
+++ b/src/main/java/io/pravega/perf/PravegaStreamHandler.java
@@ -36,7 +36,7 @@
import io.pravega.client.stream.Stream;
/**
- * class for Pravega stream and segments.
+ * Class for Pravega stream and segments.
*/
public class PravegaStreamHandler {
final String scope;
diff --git a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java
index 4aa286d6a..6bfab39e5 100644
--- a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java
+++ b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java
@@ -10,15 +10,9 @@
package io.pravega.perf;
-import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
-
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
@@ -32,12 +26,12 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
PravegaTransactionWriterWorker(int sensorId, int events,
int secondsToRun, boolean isRandomKey,
- int messageSize, Instant start,
- PerfStats stats, String streamName, ThroughputController tput,
+ int messageSize, long start,
+ PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead,
ClientFactory factory, int transactionsPerCommit) {
super(sensorId, events, secondsToRun, isRandomKey,
- messageSize, start, stats, streamName, tput, factory);
+ messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory);
this.transactionsPerCommit = transactionsPerCommit;
eventCount = 0;
@@ -45,10 +39,13 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
}
@Override
- public CompletableFuture writeData(String key, String data) {
+ public long recordWrite(String data, TriConsumer record) {
+ long time = 0;
try {
synchronized (this) {
- transaction.writeEvent(key, data);
+ time = System.currentTimeMillis();
+ transaction.writeEvent(data);
+ record.accept(time, System.currentTimeMillis(), messageSize);
eventCount++;
if (eventCount >= transactionsPerCommit) {
eventCount = 0;
@@ -59,6 +56,6 @@ public CompletableFuture writeData(String key, String data) {
} catch (TxnFailedException e) {
throw new RuntimeException("Transaction Write data failed ", e);
}
- return null;
+ return time;
}
}
\ No newline at end of file
diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java
index a6f4a4515..90746b7a8 100644
--- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java
+++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java
@@ -10,29 +10,27 @@
package io.pravega.perf;
-import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.ClientFactory;
-import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.client.stream.EventWriterConfig;
-import io.pravega.client.stream.TxnFailedException;
/**
- * class for Pravega writer/producer.
+ * Class for Pravega writer/producer.
*/
public class PravegaWriterWorker extends WriterWorker {
final EventStreamWriter producer;
PravegaWriterWorker(int sensorId, int events, int secondsToRun,
- boolean isRandomKey, int messageSize, Instant start,
- PerfStats stats, String streamName, ThroughputController tput, ClientFactory factory) {
+ boolean isRandomKey, int messageSize, long start,
+ PerfStats stats, String streamName, int eventsPerSec,
+ boolean writeAndRead, ClientFactory factory) {
super(sensorId, events, secondsToRun,
isRandomKey, messageSize, start,
- stats, streamName, tput);
+ stats, streamName, eventsPerSec, writeAndRead);
this.producer = factory.createEventWriter(streamName,
new UTF8StringSerializer(),
@@ -40,8 +38,19 @@ public class PravegaWriterWorker extends WriterWorker {
}
@Override
- public CompletableFuture writeData(String key, String data) {
- return producer.writeEvent(key, data);
+ public long recordWrite(String data, TriConsumer record) {
+ CompletableFuture ret;
+ final long time = System.currentTimeMillis();
+ ret = producer.writeEvent(data);
+ ret.thenAccept(d -> {
+ record.accept(time, System.currentTimeMillis(), data.length());
+ });
+ return time;
+ }
+
+ @Override
+ public void writeData(String data) {
+ producer.writeEvent(data);
}
@Override
diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java
index 2bbcfb58c..f730594ae 100644
--- a/src/main/java/io/pravega/perf/ReaderWorker.java
+++ b/src/main/java/io/pravega/perf/ReaderWorker.java
@@ -11,23 +11,24 @@
package io.pravega.perf;
import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
- * abstract class for Readers.
+ * An Abstract class for Readers.
*/
public abstract class ReaderWorker extends Worker implements Callable {
+ final private static int MS_PER_SEC = 1000;
final private Performance perf;
- ReaderWorker(int readerId, int events, int secondsToRun, Instant start,
- PerfStats stats, String readergrp, int timeout) {
+ ReaderWorker(int readerId, int events, int secondsToRun, long start,
+ PerfStats stats, String readergrp, int timeout, boolean writeAndRead) {
super(readerId, events, secondsToRun,
- false, 0, start,
- stats, readergrp, timeout);
- perf = secondsToRun > 0 ? new EventstimeReader() : new EventsReader();
+ 0, start, stats, readergrp, timeout);
+
+ perf = secondsToRun > 0 ? (writeAndRead ? new EventsTimeReaderRW() : new EventsTimeReader()) :
+ (writeAndRead ? new EventsReaderRW() : new EventsReader());
+
}
/**
@@ -52,12 +53,11 @@ public void benchmark() throws IOException {
String ret = null;
try {
for (int i = 0; i < events; i++) {
- final Instant startTime = Instant.now();
+ final long startTime = System.currentTimeMillis();
ret = readData();
if (ret != null) {
- stats.recordTime(null, startTime, ret.length());
+ stats.recordTime(startTime, System.currentTimeMillis(), ret.length());
}
- stats.print();
}
} finally {
close();
@@ -65,18 +65,59 @@ public void benchmark() throws IOException {
}
}
- private class EventstimeReader implements Performance {
+ private class EventsReaderRW implements Performance {
public void benchmark() throws IOException {
String ret = null;
try {
+ for (int i = 0; i < events; i++) {
+ ret = readData();
+ if (ret != null) {
+ final long endTime = System.currentTimeMillis();
+ final long startTime = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE));
+ stats.recordTime(startTime, endTime, ret.length());
+ }
+ }
+ } finally {
+ close();
+ }
+ }
+ }
+
+
+ private class EventsTimeReader implements Performance {
+ public void benchmark() throws IOException {
+ final long msToRun = secondsToRun * MS_PER_SEC;
+ String ret = null;
+ long time = System.currentTimeMillis();
- for (int i = 0; Duration.between(StartTime, Instant.now()).getSeconds() < secondsToRun; i++) {
- final Instant startTime = Instant.now();
+ try {
+
+ while ((time - startTime) < msToRun) {
+ time = System.currentTimeMillis();
+ ret = readData();
+ if (ret != null) {
+ stats.recordTime(time, System.currentTimeMillis(), ret.length());
+ }
+ }
+ } finally {
+ close();
+ }
+ }
+ }
+
+ private class EventsTimeReaderRW implements Performance {
+ public void benchmark() throws IOException {
+ final long msToRun = secondsToRun * MS_PER_SEC;
+ String ret = null;
+ long time = System.currentTimeMillis();
+ try {
+ while ((time - startTime) < msToRun) {
ret = readData();
if (ret != null) {
- stats.recordTime(null, startTime, ret.length());
+ time = System.currentTimeMillis();
+ final long startTime = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE));
+ stats.recordTime(startTime, time, ret.length());
}
- stats.print();
}
} finally {
close();
diff --git a/src/main/java/io/pravega/perf/ThroughputController.java b/src/main/java/io/pravega/perf/ThroughputController.java
deleted file mode 100644
index a9a795c33..000000000
--- a/src/main/java/io/pravega/perf/ThroughputController.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- */
-
-package io.pravega.perf;
-
-import java.time.Duration;
-import java.time.Instant;
-
-public class ThroughputController {
- private static final long NS_PER_MS = 1000000L;
- private static final long NS_PER_SEC = 1000 * NS_PER_MS;
- private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
- private final long sleepTimeNs;
- private final long eventsPerSec;
- private long timeNs = 0;
-
- /**
- * @param eventsPerSec events per second
- */
- public ThroughputController(int eventsPerSec) {
- this.eventsPerSec = eventsPerSec;
- this.sleepTimeNs = this.eventsPerSec > 0 ?
- NS_PER_SEC / this.eventsPerSec : 0;
- }
-
- /**
- * @param messageSize message size in bytes
- * @param targetThroughput target throughput in MB/s
- */
- public ThroughputController(long messageSize, double targetThroughput) {
- this((int) ((targetThroughput * 1024 * 1024) / messageSize));
- }
-
- /**
- * blocks for small amounts of time to achieve targetThroughput/events per sec
- *
- * @param eventsRate number of events/sec till now
- */
- public synchronized void control(int eventsRate) {
-
- if ((this.eventsPerSec > 0) && (eventsRate > this.eventsPerSec)) {
- // control throughput / number of events by sleeping, on average,
- timeNs += sleepTimeNs;
-
- // If threshold reached, sleep a little
- if (timeNs >= MIN_SLEEP_NS) {
- Instant sleepStart = Instant.now();
- try {
- final long sleepMs = timeNs / NS_PER_MS;
- final long sleepNs = timeNs - sleepMs * NS_PER_MS;
- Thread.sleep(sleepMs, (int) sleepNs);
- } catch (InterruptedException e) {
- // will be taken care in finally block
- } finally {
- // in case of short sleeps or oversleep ;adjust it for next sleep duration
- final long sleptNS = Duration.between(sleepStart, Instant.now()).toNanos();
- if (sleptNS > 0) {
- timeNs -= sleptNS;
- } else {
- timeNs = 0;
- }
- }
- }
- }
- }
-}
diff --git a/src/main/java/io/pravega/perf/TriConsumer.java b/src/main/java/io/pravega/perf/TriConsumer.java
new file mode 100644
index 000000000..6d0b1c23b
--- /dev/null
+++ b/src/main/java/io/pravega/perf/TriConsumer.java
@@ -0,0 +1,15 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.pravega.perf;
+
+public interface TriConsumer {
+ void accept(long a, long b, int c);
+}
diff --git a/src/main/java/io/pravega/perf/Worker.java b/src/main/java/io/pravega/perf/Worker.java
index 85d8efd63..f491cd161 100644
--- a/src/main/java/io/pravega/perf/Worker.java
+++ b/src/main/java/io/pravega/perf/Worker.java
@@ -10,10 +10,8 @@
package io.pravega.perf;
-import java.time.Instant;
-
/**
- * abstract class for Writers and Readers.
+ * Abstract class for Writers and Readers.
*/
public abstract class Worker {
final int workerID;
@@ -21,21 +19,19 @@ public abstract class Worker {
final int messageSize;
final int timeout;
final String streamName;
- final boolean isRandomKey;
- final Instant StartTime;
+ final long startTime;
final PerfStats stats;
final int secondsToRun;
Worker(int sensorId, int events, int secondsToRun,
- boolean isRandomKey, int messageSize, Instant start,
- PerfStats stats, String streamName, int timeout) {
+ int messageSize, long start, PerfStats stats,
+ String streamName, int timeout) {
this.workerID = sensorId;
this.events = events;
this.secondsToRun = secondsToRun;
- this.StartTime = start;
+ this.startTime = start;
this.stats = stats;
this.streamName = streamName;
- this.isRandomKey = isRandomKey;
this.messageSize = messageSize;
this.timeout = timeout;
}
diff --git a/src/main/java/io/pravega/perf/WriterWorker.java b/src/main/java/io/pravega/perf/WriterWorker.java
index 66d066d8f..e5e2a0512 100644
--- a/src/main/java/io/pravega/perf/WriterWorker.java
+++ b/src/main/java/io/pravega/perf/WriterWorker.java
@@ -10,42 +10,64 @@
package io.pravega.perf;
+import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
+import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
- * abstract class for Writers.
+ * Abstract class for Writers.
*/
public abstract class WriterWorker extends Worker implements Callable {
+ final private static int MS_PER_SEC = 1000;
final private Performance perf;
- final private ThroughputController tput;
+ final private String payload;
+ final private int eventsPerSec;
WriterWorker(int sensorId, int events, int secondsToRun,
- boolean isRandomKey, int messageSize, Instant start,
- PerfStats stats, String streamName, ThroughputController tput) {
+ boolean isRandomKey, int messageSize, long start,
+ PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead) {
super(sensorId, events, secondsToRun,
- isRandomKey, messageSize, start,
- stats, streamName, 0);
- this.tput = tput;
- perf = secondsToRun > 0 ? new ThroughputWriter() : new EventsWriter();
+ messageSize, start, stats,
+ streamName, 0);
+ this.eventsPerSec = eventsPerSec;
+ perf = secondsToRun > 0 ? (writeAndRead ? new EventsWriterTimeRW() : new EventsWriterTime()) :
+ (writeAndRead ? new EventsWriterRW() : new EventsWriter());
+ payload = createPayload(messageSize);
}
+
+ private String createPayload(int size) {
+ Random random = new Random();
+ byte[] bytes = new byte[size];
+ for (int i = 0; i < size; ++i) {
+ bytes[i] = (byte) (random.nextInt(26) + 65);
+ }
+ return new String(bytes, StandardCharsets.US_ASCII);
+ }
+
+
+ /**
+ * Writes the data and benchmark
+ *
+ * @param data data to write
+ * @param record to call for benchmarking
+ * @return time return the data sent time
+ */
+ public abstract long recordWrite(String data, TriConsumer record);
+
/**
- * writes the data.
+ * Writes the data and benchmark
*
- * @param key key for data.
* @param data data to write
*/
- public abstract CompletableFuture writeData(String key, String data);
+ public abstract void writeData(String data);
/**
- * flush the producer data.
+ * Flush the producer data.
*/
public abstract void flush();
@@ -57,67 +79,126 @@ public Void call() throws InterruptedException, ExecutionException, IOException
private class EventsWriter implements Performance {
- public void benchmark() throws InterruptedException, ExecutionException, IOException {
- CompletableFuture retFuture = null;
- Random rand = new Random();
+ public void benchmark() throws InterruptedException, IOException {
+ final EventsController eCnt = new EventsController(System.currentTimeMillis(), eventsPerSec);
+ for (int i = 0; i < events; i++) {
+ recordWrite(payload, stats::recordTime);
+ eCnt.control(i);
+ }
+ flush();
+ }
+ }
+
+ private class EventsWriterRW implements Performance {
+ public void benchmark() throws InterruptedException, IOException {
+ final long time = System.currentTimeMillis();
+ final String timeHeader = String.format(TIME_HEADER_FORMAT, time);
+ final EventsController eCnt = new EventsController(time, eventsPerSec);
+ final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload);
+ buffer.setLength(messageSize);
for (int i = 0; i < events; i++) {
+ final String header = String.format(TIME_HEADER_FORMAT, System.currentTimeMillis());
+ final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString();
+ writeData(data);
+ eCnt.control(i);
+ }
+ flush();
+ }
+ }
- // Construct event payload
- String val = System.currentTimeMillis() + ", " + workerID + ", " + (int) (Math.random() * 200);
- String payload = String.format("%-" + messageSize + "s", val);
- String key;
- if (isRandomKey) {
- key = Integer.toString(workerID + rand.nextInt());
- } else {
- key = Integer.toString(workerID);
- }
+ private class EventsWriterTime implements Performance {
- final Instant startTime = Instant.now();
- retFuture = writeData(key, payload);
- // event ingestion
- retFuture = stats.recordTime(retFuture, startTime, payload.length());
- stats.print();
- tput.control(stats.eventsRate());
+ public void benchmark() throws InterruptedException, IOException {
+ final long msToRun = secondsToRun * MS_PER_SEC;
+ long time = System.currentTimeMillis();
+ final EventsController eCnt = new EventsController(time, eventsPerSec);
+ for (int i = 0; (time - startTime) < msToRun; i++) {
+ time = recordWrite(payload, stats::recordTime);
+ eCnt.control(i);
}
-
flush();
+ }
+ }
- //Wait for the last packet to get acked
- retFuture.get();
+ private class EventsWriterTimeRW implements Performance {
+
+ public void benchmark() throws InterruptedException, IOException {
+ final long msToRun = secondsToRun * MS_PER_SEC;
+ long time = System.currentTimeMillis();
+ final String timeHeader = String.format(TIME_HEADER_FORMAT, time);
+ final EventsController eCnt = new EventsController(time, eventsPerSec);
+ final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload);
+ buffer.setLength(messageSize);
+ for (int i = 0; (time - startTime) < msToRun; i++) {
+ time = System.currentTimeMillis();
+ final String header = String.format(TIME_HEADER_FORMAT, time);
+ final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString();
+ writeData(data);
+ eCnt.control(i);
+ }
+ flush();
}
}
- private class ThroughputWriter implements Performance {
-
- public void benchmark() throws InterruptedException, ExecutionException, IOException {
- CompletableFuture retFuture = null;
- Random rand = new Random();
-
- while (Duration.between(StartTime, Instant.now()).getSeconds() < secondsToRun) {
- // Construct event payload
- String val = System.currentTimeMillis() + ", " + workerID + ", " + (int) (Math.random() * 200);
- String payload = String.format("%-" + messageSize + "s", val);
- String key;
- if (isRandomKey) {
- key = Integer.toString(workerID + rand.nextInt());
- } else {
- key = Integer.toString(workerID);
- }
+ @NotThreadSafe
+ static private class EventsController {
+ private static final long NS_PER_MS = 1000000L;
+ private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+ private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+ private final long startTime;
+ private final long sleepTimeNs;
+ private final int eventsPerSec;
+ private long toSleepNs = 0;
+
+ /**
+ * @param eventsPerSec events per second
+ */
+ private EventsController(long start, int eventsPerSec) {
+ this.startTime = start;
+ this.eventsPerSec = eventsPerSec;
+ this.sleepTimeNs = this.eventsPerSec > 0 ?
+ NS_PER_SEC / this.eventsPerSec : 0;
+ }
- final Instant beginTime = Instant.now();
- retFuture = writeData(key, payload);
- // event ingestion
- retFuture = stats.recordTime(retFuture, beginTime, payload.length());
- stats.print();
- tput.control(stats.eventsRate());
+ /**
+ * Blocks for small amounts of time to achieve targetThroughput/events per sec
+ *
+ * @param events current events
+ */
+ private void control(long events) {
+ if (this.eventsPerSec <= 0) {
+ return;
}
- flush();
+ float elapsedSec = (System.currentTimeMillis() - startTime) / 1000.f;
- //Wait for the last packet to get acked
- retFuture.get();
+ if ((events / elapsedSec) < this.eventsPerSec) {
+ return;
+ }
+
+ // control throughput / number of events by sleeping, on average,
+ toSleepNs += sleepTimeNs;
+ // If threshold reached, sleep a little
+ if (toSleepNs >= MIN_SLEEP_NS) {
+ long sleepStart = System.nanoTime();
+ try {
+ final long sleepMs = toSleepNs / NS_PER_MS;
+ final long sleepNs = toSleepNs - sleepMs * NS_PER_MS;
+ Thread.sleep(sleepMs, (int) sleepNs);
+ } catch (InterruptedException e) {
+ // will be taken care in finally block
+ } finally {
+ // in case of short sleeps or oversleep ;adjust it for next sleep duration
+ final long sleptNS = System.nanoTime() - sleepStart;
+ if (sleptNS > 0) {
+ toSleepNs -= sleptNS;
+ } else {
+ toSleepNs = 0;
+ }
+ }
+ }
}
}
}