Skip to content

Commit

Permalink
Issues 37, 41, 43: Closing ReaderGroups, writers and adding "-flush" …
Browse files Browse the repository at this point in the history
…parameter to Benchmark tool (#42)

Ensure that reader groups and writers are closed always. Added new "flush" parameter to control the frequency of flush calls on writers.

Signed-off-by: Keshava Munegowda <keshava.munegowda@dell.com>
  • Loading branch information
kmgowda authored and RaulGracia committed May 29, 2019
1 parent 169fa2b commit cbe548c
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 203 deletions.
45 changes: 24 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ You may obtain a copy of the License at
The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster.
This tool performs the throughput and latency analysis for the multi producers/writers and consumers/readers of pravega.
it also validates the end to end latency. The write and/or read latencies can be stored in a CSV file for later analysis.
At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th and 99.9th latency percentiles.
At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th, 99.9th and 99.99th latency percentiles.


### Prerequisites
Expand Down Expand Up @@ -42,34 +42,37 @@ untar the Pravega benchmark tool to local folder
tar -xvf ./build/distributions/pravega-benchmark.tar -C ./run
```

Running Pravega bencmark tool locally:
Running Pravega benchmark tool locally:

```
<dir>/pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help
usage: pravega-benchmark
-consumers <arg> number of consumers
-controller <arg> controller URI
-events <arg> number of events/records if 'time' not
-consumers <arg> Number of consumers
-controller <arg> Controller URI
-events <arg> Number of events/records if 'time' not
specified;
otherwise, maximum events per second by
producer(s) and/or number of events per
otherwise, Maximum events per second by
producer(s) and/or Number of events per
consumer
-flush <arg> Each producer calls flush after writing
<arg> number of of events/records; Not
applicable, if both producers and
consumers are specified
-help Help message
-producers <arg> number of producers
-readcsv <arg> csv file to record read latencies
-producers <arg> Number of producers
-readcsv <arg> CSV file to record read latencies
-recreate <arg> If the stream is already existing, delete
it and recreate it
and recreate the same
-segments <arg> Number of segments
-size <arg> Size of each message (event or record)
-stream <arg> Stream name
-throughput <arg> if > 0 , throughput in MB/s
if 0 , writes 'events'
if -1, get the maximum throughput
-time <arg> number of seconds the code runs
-transaction <arg> Producers use transactions or not
-time <arg> Number of seconds the code runs
-transactionspercommit <arg> Number of events before a transaction is
committed
-writecsv <arg> csv file to record write latencies
-writecsv <arg> CSV file to record write latencies
```

## Running Performance benchmarking
Expand All @@ -87,8 +90,8 @@ The Pravega benchmark tool can be executed in the following modes:
```

### 1 - Burst Mode
In this mode, the Pravega benchmark tool pushes/pulls the messages to/from the pravega client as much as possible.
This mode is used to find the maximum and throughput that can be obtained from the pravega cluster.
In this mode, the Pravega benchmark tool pushes/pulls the messages to/from the Pravega client as much as possible.
This mode is used to find the maximum and throughput that can be obtained from the Pravega cluster.
This mode can be used for both producers and consumers.

```
Expand All @@ -109,8 +112,8 @@ in the case you want to write/read the certain number of events use the -events
```

### 2 - Throughput Mode
In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum throughput in terms of Mega Bytes/second (MB/s).
This mode is used to find the least latency that can be obtained from the pravega cluster for given throughput.
In this mode, the Pravega benchmark tool pushes the messages to the Pravega client with specified approximate maximum throughput in terms of Mega Bytes/second (MB/s).
This mode is used to find the least latency that can be obtained from the Pravega cluster for given throughput.
This mode is used only for write operation.

```
Expand All @@ -135,8 +138,8 @@ in the case you want to write/read the certain number of events use the -events

### 3 - OPS Mode or Events Rate / Rate Limiter Mode
This mode is another form of controlling writers throughput by limiting the number of events per second.
In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum events per sec.
This mode is used to find the least latency that can be obtained from the pravega cluster for events rate.
In this mode, the Pravega benchmark tool pushes the messages to the Pravega client with specified approximate maximum events per sec.
This mode is used to find the least latency that can be obtained from the Pravega cluster for events rate.
This mode is used only for write operation.

```
Expand All @@ -152,7 +155,7 @@ Note that in this mode, there is 'NO total number of events' to specify hence us
```

### 4 - End to End Latency Mode
In this mode, the Pravega benchmark tool writes and read the messages to the pravega cluster and records the end to end latency.
In this mode, the Pravega benchmark tool writes and read the messages to the Pravega cluster and records the end to end latency.
End to end latency means the time duration between the beginning of the writing event/record to stream and the time after reading the event/record.
in this mode user must specify both the number of producers and consumers.
The -throughput option (Throughput mode) or -events (late limiter) can used to limit the writers throughput or events rate.
Expand All @@ -166,5 +169,5 @@ The -throughput -1 specifies the writes tries to write the events at the maximum
```

### Recording the latencies to CSV files
user can use the options "-writecsv <file name>" to record the latencies of writers and "-readcsv <file name>" for readers.
User can use the options "-writecsv <file name>" to record the latencies of writers and "-readcsv <file name>" for readers.
in case of End to End latency mode, if the user can supply only -readcsv to get the end to end latency in to the csv file.
34 changes: 25 additions & 9 deletions src/main/java/io/pravega/perf/PerfStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public PerfStats(String action, int reportingInterval, int messageSize, String c
* Private class for start and end time.
*/
final private class QueueProcessor implements Callable {
final private static int NS_PER_MICRO = 1000;
final private static int MICROS_PER_MS = 1000;
final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS;
final private static int PARK_NS = NS_PER_MICRO;
final private long startTime;

private QueueProcessor(long startTime) {
Expand All @@ -90,8 +94,11 @@ public Void call() throws IOException {
final TimeWindow window = new TimeWindow(action, startTime);
final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) :
new CSVLatencyWriter(action, messageSize, startTime, csvFile);
final int minWaitTimeMS = windowInterval / 50;
final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS;
boolean doWork = true;
long time = startTime;
long idleCount = 0;
TimeStamp t;

while (doWork) {
Expand All @@ -105,13 +112,21 @@ public Void call() throws IOException {
latencyRecorder.record(t.startTime, t.bytes, latency);
}
time = t.endTime;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
}
} else {
LockSupport.parkNanos(500);
time = System.currentTimeMillis();
}
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
LockSupport.parkNanos(PARK_NS);
idleCount++;
if (idleCount > totalIdleCount) {
time = System.currentTimeMillis();
idleCount = 0;
if (window.windowTimeMS(time) > windowInterval) {
window.print(time);
window.reset(time);
}
}
}
}
latencyRecorder.printTotal(time);
Expand Down Expand Up @@ -188,7 +203,7 @@ static private class LatencyWriter {
final static int MS_PER_SEC = 1000;
final static int MS_PER_MIN = MS_PER_SEC * 60;
final static int MS_PER_HR = MS_PER_MIN * 60;
final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999};
final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999, 0.9999};
final String action;
final int messageSize;
final long startTime;
Expand Down Expand Up @@ -255,6 +270,7 @@ private int[] getPercentiles() {
}

public void record(int bytes, int latency) {
assert latency < latencies.length : "Invalid latency";
totalBytes += bytes;
latencies[latency]++;
}
Expand All @@ -272,9 +288,9 @@ public void printTotal(long endTime) {

System.out.printf(
"%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency" +
", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th\n",
", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th, %d ms 99.99th.\n",
count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency,
percs[0], percs[1], percs[2], percs[3], percs[4]);
percs[0], percs[1], percs[2], percs[3], percs[4], percs[5]);
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/main/java/io/pravega/perf/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,5 @@
import java.io.IOException;

public interface Performance {
int TIME_HEADER_SIZE = 14;
String TIME_HEADER_FORMAT = "%0" + TIME_HEADER_SIZE + "d";

void benchmark() throws InterruptedException, IOException;
void benchmark() throws InterruptedException, IOException;
}
Loading

0 comments on commit cbe548c

Please sign in to comment.