diff --git a/README.md b/README.md index da64ce4a2..c5e0e62ec 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ You may obtain a copy of the License at The Pravega benchmark tool used for the performance benchmarking of pravega streaming storage cluster. This tool performs the throughput and latency analysis for the multi producers/writers and consumers/readers of pravega. it also validates the end to end latency. The write and/or read latencies can be stored in a CSV file for later analysis. -At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th and 99.9th latency percentiles. +At the end of the performance benchmarking, this tool outputs the 50th, 75th, 95th , 99th, 99.9th and 99.99th latency percentiles. ### Prerequisites @@ -42,34 +42,37 @@ untar the Pravega benchmark tool to local folder tar -xvf ./build/distributions/pravega-benchmark.tar -C ./run ``` -Running Pravega bencmark tool locally: +Running Pravega benchmark tool locally: ``` /pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help usage: pravega-benchmark - -consumers number of consumers - -controller controller URI - -events number of events/records if 'time' not + -consumers Number of consumers + -controller Controller URI + -events Number of events/records if 'time' not specified; - otherwise, maximum events per second by - producer(s) and/or number of events per + otherwise, Maximum events per second by + producer(s) and/or Number of events per consumer + -flush Each producer calls flush after writing + number of of events/records; Not + applicable, if both producers and + consumers are specified -help Help message - -producers number of producers - -readcsv csv file to record read latencies + -producers Number of producers + -readcsv CSV file to record read latencies -recreate If the stream is already existing, delete - it and recreate it + and recreate the same -segments Number of segments -size Size of each message (event or record) -stream Stream name -throughput if > 0 , throughput in MB/s if 0 , writes 'events' if -1, get the maximum throughput - -time number of seconds the code runs - -transaction Producers use transactions or not + -time Number of seconds the code runs -transactionspercommit Number of events before a transaction is committed - -writecsv csv file to record write latencies + -writecsv CSV file to record write latencies ``` ## Running Performance benchmarking @@ -87,8 +90,8 @@ The Pravega benchmark tool can be executed in the following modes: ``` ### 1 - Burst Mode -In this mode, the Pravega benchmark tool pushes/pulls the messages to/from the pravega client as much as possible. -This mode is used to find the maximum and throughput that can be obtained from the pravega cluster. +In this mode, the Pravega benchmark tool pushes/pulls the messages to/from the Pravega client as much as possible. +This mode is used to find the maximum and throughput that can be obtained from the Pravega cluster. This mode can be used for both producers and consumers. ``` @@ -109,8 +112,8 @@ in the case you want to write/read the certain number of events use the -events ``` ### 2 - Throughput Mode -In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum throughput in terms of Mega Bytes/second (MB/s). -This mode is used to find the least latency that can be obtained from the pravega cluster for given throughput. +In this mode, the Pravega benchmark tool pushes the messages to the Pravega client with specified approximate maximum throughput in terms of Mega Bytes/second (MB/s). +This mode is used to find the least latency that can be obtained from the Pravega cluster for given throughput. This mode is used only for write operation. ``` @@ -135,8 +138,8 @@ in the case you want to write/read the certain number of events use the -events ### 3 - OPS Mode or Events Rate / Rate Limiter Mode This mode is another form of controlling writers throughput by limiting the number of events per second. -In this mode, the Pravega benchmark tool pushes the messages to the pravega client with specified approximate maximum events per sec. -This mode is used to find the least latency that can be obtained from the pravega cluster for events rate. +In this mode, the Pravega benchmark tool pushes the messages to the Pravega client with specified approximate maximum events per sec. +This mode is used to find the least latency that can be obtained from the Pravega cluster for events rate. This mode is used only for write operation. ``` @@ -152,7 +155,7 @@ Note that in this mode, there is 'NO total number of events' to specify hence us ``` ### 4 - End to End Latency Mode -In this mode, the Pravega benchmark tool writes and read the messages to the pravega cluster and records the end to end latency. +In this mode, the Pravega benchmark tool writes and read the messages to the Pravega cluster and records the end to end latency. End to end latency means the time duration between the beginning of the writing event/record to stream and the time after reading the event/record. in this mode user must specify both the number of producers and consumers. The -throughput option (Throughput mode) or -events (late limiter) can used to limit the writers throughput or events rate. @@ -166,5 +169,5 @@ The -throughput -1 specifies the writes tries to write the events at the maximum ``` ### Recording the latencies to CSV files -user can use the options "-writecsv " to record the latencies of writers and "-readcsv " for readers. +User can use the options "-writecsv " to record the latencies of writers and "-readcsv " for readers. in case of End to End latency mode, if the user can supply only -readcsv to get the end to end latency in to the csv file. diff --git a/src/main/java/io/pravega/perf/PerfStats.java b/src/main/java/io/pravega/perf/PerfStats.java index 3b8950ff9..5f5400983 100644 --- a/src/main/java/io/pravega/perf/PerfStats.java +++ b/src/main/java/io/pravega/perf/PerfStats.java @@ -80,6 +80,10 @@ public PerfStats(String action, int reportingInterval, int messageSize, String c * Private class for start and end time. */ final private class QueueProcessor implements Callable { + final private static int NS_PER_MICRO = 1000; + final private static int MICROS_PER_MS = 1000; + final private static int NS_PER_MS = NS_PER_MICRO * MICROS_PER_MS; + final private static int PARK_NS = NS_PER_MICRO; final private long startTime; private QueueProcessor(long startTime) { @@ -90,8 +94,11 @@ public Void call() throws IOException { final TimeWindow window = new TimeWindow(action, startTime); final LatencyWriter latencyRecorder = csvFile == null ? new LatencyWriter(action, messageSize, startTime) : new CSVLatencyWriter(action, messageSize, startTime, csvFile); + final int minWaitTimeMS = windowInterval / 50; + final long totalIdleCount = (NS_PER_MS / PARK_NS) * minWaitTimeMS; boolean doWork = true; long time = startTime; + long idleCount = 0; TimeStamp t; while (doWork) { @@ -105,13 +112,21 @@ public Void call() throws IOException { latencyRecorder.record(t.startTime, t.bytes, latency); } time = t.endTime; + if (window.windowTimeMS(time) > windowInterval) { + window.print(time); + window.reset(time); + } } else { - LockSupport.parkNanos(500); - time = System.currentTimeMillis(); - } - if (window.windowTimeMS(time) > windowInterval) { - window.print(time); - window.reset(time); + LockSupport.parkNanos(PARK_NS); + idleCount++; + if (idleCount > totalIdleCount) { + time = System.currentTimeMillis(); + idleCount = 0; + if (window.windowTimeMS(time) > windowInterval) { + window.print(time); + window.reset(time); + } + } } } latencyRecorder.printTotal(time); @@ -188,7 +203,7 @@ static private class LatencyWriter { final static int MS_PER_SEC = 1000; final static int MS_PER_MIN = MS_PER_SEC * 60; final static int MS_PER_HR = MS_PER_MIN * 60; - final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999}; + final double[] percentiles = {0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}; final String action; final int messageSize; final long startTime; @@ -255,6 +270,7 @@ private int[] getPercentiles() { } public void record(int bytes, int latency) { + assert latency < latencies.length : "Invalid latency"; totalBytes += bytes; latencies[latency]++; } @@ -272,9 +288,9 @@ public void printTotal(long endTime) { System.out.printf( "%d records %s, %.3f records/sec, %d bytes record size, %.2f MB/sec, %.1f ms avg latency, %.1f ms max latency" + - ", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th\n", + ", %d ms 50th, %d ms 75th, %d ms 95th, %d ms 99th, %d ms 99.9th, %d ms 99.99th.\n", count, action, recsPerSec, messageSize, mbPerSec, totalLatency / ((double) count), (double) maxLatency, - percs[0], percs[1], percs[2], percs[3], percs[4]); + percs[0], percs[1], percs[2], percs[3], percs[4], percs[5]); } } diff --git a/src/main/java/io/pravega/perf/Performance.java b/src/main/java/io/pravega/perf/Performance.java index b99519833..5e837e9fb 100644 --- a/src/main/java/io/pravega/perf/Performance.java +++ b/src/main/java/io/pravega/perf/Performance.java @@ -13,8 +13,5 @@ import java.io.IOException; public interface Performance { - int TIME_HEADER_SIZE = 14; - String TIME_HEADER_FORMAT = "%0" + TIME_HEADER_SIZE + "d"; - - void benchmark() throws InterruptedException, IOException; + void benchmark() throws InterruptedException, IOException; } diff --git a/src/main/java/io/pravega/perf/PravegaPerfTest.java b/src/main/java/io/pravega/perf/PravegaPerfTest.java index 0c8919edb..9a49ce57f 100644 --- a/src/main/java/io/pravega/perf/PravegaPerfTest.java +++ b/src/main/java/io/pravega/perf/PravegaPerfTest.java @@ -55,33 +55,30 @@ public static void main(String[] args) { Option opt = null; final long startTime = System.currentTimeMillis(); - opt = new Option("controller", true, "controller URI"); - opt.setRequired(true); - options.addOption(opt); - opt = new Option("stream", true, "Stream name"); - opt.setRequired(true); - options.addOption(opt); - - options.addOption("producers", true, "number of producers"); - options.addOption("consumers", true, "number of consumers"); + options.addOption("controller", true, "Controller URI"); + options.addOption("stream", true, "Stream name"); + options.addOption("producers", true, "Number of producers"); + options.addOption("consumers", true, "Number of consumers"); options.addOption("events", true, - "number of events/records if 'time' not specified;\n" + - "otherwise, maximum events per second by producer(s) " + - "and/or number of events per consumer"); - options.addOption("time", true, "number of seconds the code runs"); - options.addOption("transaction", true, "Producers use transactions or not"); + "Number of events/records if 'time' not specified;\n" + + "otherwise, Maximum events per second by producer(s) " + + "and/or Number of events per consumer"); + options.addOption("flush", true, + "Each producer calls flush after writing number of of events/records; " + + "Not applicable, if both producers and consumers are specified"); + options.addOption("time", true, "Number of seconds the code runs"); options.addOption("transactionspercommit", true, "Number of events before a transaction is committed"); options.addOption("segments", true, "Number of segments"); options.addOption("size", true, "Size of each message (event or record)"); options.addOption("recreate", true, - "If the stream is already existing, delete it and recreate it"); + "If the stream is already existing, delete and recreate the same"); options.addOption("throughput", true, "if > 0 , throughput in MB/s\n" + "if 0 , writes 'events'\n" + "if -1, get the maximum throughput"); - options.addOption("writecsv", true, "csv file to record write latencies"); - options.addOption("readcsv", true, "csv file to record read latencies"); + options.addOption("writecsv", true, "CSV file to record write latencies"); + options.addOption("readcsv", true, "CSV file to record read latencies"); options.addOption("help", false, "Help message"); @@ -107,10 +104,10 @@ public static void main(String[] args) { final ForkJoinPool executor = new ForkJoinPool(); try { - final List> producers = perfTest.getProducers(); - final List> consumers = perfTest.getConsumers(); + final List producers = perfTest.getProducers(); + final List consumers = perfTest.getConsumers(); - final List> workers = Stream.of(producers, consumers) + final List> workers = Stream.of(consumers, producers) .filter(x -> x != null) .flatMap(x -> x.stream()) .collect(Collectors.toList()); @@ -122,6 +119,12 @@ public void run() { executor.shutdown(); executor.awaitTermination(1, TimeUnit.SECONDS); perfTest.shutdown(System.currentTimeMillis()); + if (consumers != null) { + consumers.forEach(ReaderWorker::close); + } + if (producers != null) { + producers.forEach(WriterWorker::close); + } } catch (InterruptedException ex) { ex.printStackTrace(); } @@ -132,6 +135,12 @@ public void run() { executor.shutdown(); executor.awaitTermination(1, TimeUnit.SECONDS); perfTest.shutdown(System.currentTimeMillis()); + if (consumers != null) { + consumers.forEach(ReaderWorker::close); + } + if (producers != null) { + producers.forEach(WriterWorker::close); + } } catch (Exception ex) { ex.printStackTrace(); } @@ -156,7 +165,7 @@ public static Test createTest(long startTime, CommandLine commandline, Options o static private abstract class Test { static final int MAXTIME = 60 * 60 * 24; static final int REPORTINGINTERVAL = 5000; - static final int TIMEOUT = 10; + static final int TIMEOUT = 1000; static final String SCOPE = "Scope"; final String controllerUri; @@ -172,6 +181,7 @@ static private abstract class Test { final int eventsPerSec; final int eventsPerProducer; final int eventsPerConsumer; + final int EventsPerFlush; final int transactionPerCommit; final int runtimeSec; final double throughput; @@ -207,6 +217,17 @@ static private abstract class Test { events = 0; } + if (commandline.hasOption("flush")) { + int flushEvents = Integer.parseInt(commandline.getOptionValue("flush")); + if (flushEvents > 0) { + EventsPerFlush = flushEvents; + } else { + EventsPerFlush = Integer.MAX_VALUE; + } + } else { + EventsPerFlush = Integer.MAX_VALUE; + } + if (commandline.hasOption("time")) { runtimeSec = Integer.parseInt(commandline.getOptionValue("time")); } else if (events > 0) { @@ -263,6 +284,15 @@ static private abstract class Test { } scopeName = SCOPE; + + if (controllerUri == null) { + throw new IllegalArgumentException("Error: Must specify Controller IP address"); + } + + if (streamName == null) { + throw new IllegalArgumentException("Error: Must specify stream Name"); + } + if (producerCount == 0 && consumerCount == 0) { throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers"); } @@ -279,6 +309,7 @@ static private abstract class Test { } else { produceStats = new PerfStats("Writing", REPORTINGINTERVAL, messageSize, writeFile); } + eventsPerProducer = (events + producerCount - 1) / producerCount; if (throughput < 0 && runtimeSec > 0) { eventsPerSec = events / producerCount; @@ -307,11 +338,10 @@ static private abstract class Test { consumeStats = null; eventsPerConsumer = 0; } - } - private void start(long startTime) throws IOException { - if (produceStats != null && consumeStats == null) { + public void start(long startTime) throws IOException { + if (produceStats != null && !writeAndRead) { produceStats.start(startTime); } if (consumeStats != null) { @@ -319,9 +349,9 @@ private void start(long startTime) throws IOException { } } - private void shutdown(long endTime) { + public void shutdown(long endTime) { try { - if (produceStats != null && consumeStats == null) { + if (produceStats != null && !writeAndRead) { produceStats.shutdown(endTime); } if (consumeStats != null) { @@ -332,15 +362,16 @@ private void shutdown(long endTime) { } } - public abstract List> getProducers(); + public abstract List getProducers(); - public abstract List> getConsumers() throws URISyntaxException; + public abstract List getConsumers() throws URISyntaxException; } static private class PravegaTest extends Test { final PravegaStreamHandler streamHandle; final ClientFactory factory; + final ReaderGroup readerGroup; PravegaTest(long startTime, CommandLine commandline) throws IllegalArgumentException, URISyntaxException, InterruptedException, Exception { @@ -363,12 +394,17 @@ static private class PravegaTest extends Test { streamHandle.scale(); } } + if (consumerCount > 0) { + readerGroup = streamHandle.createReaderGroup(!writeAndRead); + } else { + readerGroup = null; + } factory = new ClientFactoryImpl(scopeName, controller); } - public List> getProducers() { - final List> writers; + public List getProducers() { + final List writers; if (producerCount > 0) { if (transactionPerCommit > 0) { @@ -385,10 +421,9 @@ public List> getProducers() { writers = IntStream.range(0, producerCount) .boxed() .map(i -> new PravegaWriterWorker(i, eventsPerProducer, - runtimeSec, false, - messageSize, startTime, - produceStats, streamName, - eventsPerSec, writeAndRead, factory)) + EventsPerFlush, runtimeSec, false, + messageSize, startTime, produceStats, + streamName, eventsPerSec, writeAndRead, factory)) .collect(Collectors.toList()); } } else { @@ -398,10 +433,9 @@ public List> getProducers() { return writers; } - public List> getConsumers() throws URISyntaxException { - final List> readers; + public List getConsumers() throws URISyntaxException { + final List readers; if (consumerCount > 0) { - final ReaderGroup readerGroup = streamHandle.createReaderGroup(); readers = IntStream.range(0, consumerCount) .boxed() .map(i -> new PravegaReaderWorker(i, eventsPerConsumer, @@ -413,5 +447,13 @@ public List> getConsumers() throws URISyntaxException { } return readers; } + + @Override + public void shutdown(long endTime) { + if (readerGroup != null) { + readerGroup.close(); + } + super.shutdown(endTime); + } } } diff --git a/src/main/java/io/pravega/perf/PravegaStreamHandler.java b/src/main/java/io/pravega/perf/PravegaStreamHandler.java index 501f4af3e..f9b9fa55e 100644 --- a/src/main/java/io/pravega/perf/PravegaStreamHandler.java +++ b/src/main/java/io/pravega/perf/PravegaStreamHandler.java @@ -36,7 +36,7 @@ import io.pravega.client.stream.Stream; /** - * Class for Pravega stream and segments. + * Class for Pravega stream and segments. */ public class PravegaStreamHandler { final String scope; @@ -134,14 +134,16 @@ void recreate() throws InterruptedException, ExecutionException, TimeoutExceptio } } - ReaderGroup createReaderGroup() throws URISyntaxException { - ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, - ClientConfig.builder() - .controllerURI(new URI(controllerUri)).build()); - readerGroupManager.createReaderGroup(stream, - ReaderGroupConfig.builder() - .stream(Stream.of(scope, stream)) - .build()); - return readerGroupManager.getReaderGroup(stream); + ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException { + final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, + ClientConfig.builder().controllerURI(new URI(controllerUri)).build()); + final ReaderGroupConfig rdGrpConfig = ReaderGroupConfig.builder() + .stream(Stream.of(scope, stream)).build(); + readerGroupManager.createReaderGroup(stream, rdGrpConfig); + final ReaderGroup rdGroup = readerGroupManager.getReaderGroup(stream); + if (reset) { + rdGroup.resetReaderGroup(rdGrpConfig); + } + return rdGroup; } } diff --git a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java index 6bfab39e5..66bf500bf 100644 --- a/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaTransactionWriterWorker.java @@ -13,6 +13,7 @@ import io.pravega.client.ClientFactory; import io.pravega.client.stream.Transaction; import io.pravega.client.stream.TxnFailedException; + import javax.annotation.concurrent.GuardedBy; public class PravegaTransactionWriterWorker extends PravegaWriterWorker { @@ -30,7 +31,7 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker { PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead, ClientFactory factory, int transactionsPerCommit) { - super(sensorId, events, secondsToRun, isRandomKey, + super(sensorId, events, Integer.MAX_VALUE, secondsToRun, isRandomKey, messageSize, start, stats, streamName, eventsPerSec, writeAndRead, factory); this.transactionsPerCommit = transactionsPerCommit; diff --git a/src/main/java/io/pravega/perf/PravegaWriterWorker.java b/src/main/java/io/pravega/perf/PravegaWriterWorker.java index 90746b7a8..d4ceab356 100644 --- a/src/main/java/io/pravega/perf/PravegaWriterWorker.java +++ b/src/main/java/io/pravega/perf/PravegaWriterWorker.java @@ -23,13 +23,13 @@ public class PravegaWriterWorker extends WriterWorker { final EventStreamWriter producer; - PravegaWriterWorker(int sensorId, int events, int secondsToRun, + PravegaWriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun, boolean isRandomKey, int messageSize, long start, PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead, ClientFactory factory) { - super(sensorId, events, secondsToRun, - isRandomKey, messageSize, start, + super(sensorId, events, EventsPerFlush, + secondsToRun, isRandomKey, messageSize, start, stats, streamName, eventsPerSec, writeAndRead); this.producer = factory.createEventWriter(streamName, @@ -57,4 +57,9 @@ public void writeData(String data) { public void flush() { producer.flush(); } + + @Override + public synchronized void close() { + producer.close(); + } } \ No newline at end of file diff --git a/src/main/java/io/pravega/perf/ReaderWorker.java b/src/main/java/io/pravega/perf/ReaderWorker.java index f730594ae..dc17d5c9f 100644 --- a/src/main/java/io/pravega/perf/ReaderWorker.java +++ b/src/main/java/io/pravega/perf/ReaderWorker.java @@ -20,17 +20,28 @@ public abstract class ReaderWorker extends Worker implements Callable { final private static int MS_PER_SEC = 1000; final private Performance perf; + final private boolean writeAndRead; ReaderWorker(int readerId, int events, int secondsToRun, long start, - PerfStats stats, String readergrp, int timeout, boolean writeAndRead) { - super(readerId, events, secondsToRun, - 0, start, stats, readergrp, timeout); + PerfStats stats, String readerGrp, int timeout, boolean writeAndRead) { + super(readerId, events, secondsToRun, 0, start, stats, readerGrp, timeout); - perf = secondsToRun > 0 ? (writeAndRead ? new EventsTimeReaderRW() : new EventsTimeReader()) : - (writeAndRead ? new EventsReaderRW() : new EventsReader()); + this.writeAndRead = writeAndRead; + this.perf = createBenchmark(); } + private Performance createBenchmark() { + final Performance perfReader; + if (secondsToRun > 0) { + perfReader = writeAndRead ? this::EventsTimeReaderRW : this::EventsTimeReader; + } else { + perfReader = writeAndRead ? this::EventsReaderRW : this::EventsReader; + } + return perfReader; + } + + /** * read the data. */ @@ -47,81 +58,78 @@ public Void call() throws InterruptedException, ExecutionException, IOException return null; } - private class EventsReader implements Performance { - - public void benchmark() throws IOException { - String ret = null; - try { - for (int i = 0; i < events; i++) { - final long startTime = System.currentTimeMillis(); - ret = readData(); - if (ret != null) { - stats.recordTime(startTime, System.currentTimeMillis(), ret.length()); - } + + public void EventsReader() throws IOException { + String ret = null; + try { + int i = 0; + while (i < events) { + final long startTime = System.currentTimeMillis(); + ret = readData(); + if (ret != null) { + stats.recordTime(startTime, System.currentTimeMillis(), ret.length()); + i++; } - } finally { - close(); } + } finally { + close(); } } - private class EventsReaderRW implements Performance { - public void benchmark() throws IOException { - String ret = null; - try { - for (int i = 0; i < events; i++) { - ret = readData(); - if (ret != null) { - final long endTime = System.currentTimeMillis(); - final long startTime = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE)); - stats.recordTime(startTime, endTime, ret.length()); - } + + public void EventsReaderRW() throws IOException { + String ret = null; + try { + int i = 0; + while (i < events) { + ret = readData(); + if (ret != null) { + final long endTime = System.currentTimeMillis(); + final long start = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE)); + stats.recordTime(start, endTime, ret.length()); + i++; } - } finally { - close(); } + } finally { + close(); } } - private class EventsTimeReader implements Performance { - public void benchmark() throws IOException { - final long msToRun = secondsToRun * MS_PER_SEC; - String ret = null; - long time = System.currentTimeMillis(); + public void EventsTimeReader() throws IOException { + final long msToRun = secondsToRun * MS_PER_SEC; + String ret = null; + long time = System.currentTimeMillis(); - try { - - while ((time - startTime) < msToRun) { - time = System.currentTimeMillis(); - ret = readData(); - if (ret != null) { - stats.recordTime(time, System.currentTimeMillis(), ret.length()); - } + try { + while ((time - startTime) < msToRun) { + time = System.currentTimeMillis(); + ret = readData(); + if (ret != null) { + stats.recordTime(time, System.currentTimeMillis(), ret.length()); } - } finally { - close(); } + } finally { + close(); } } - private class EventsTimeReaderRW implements Performance { - public void benchmark() throws IOException { - final long msToRun = secondsToRun * MS_PER_SEC; - String ret = null; - long time = System.currentTimeMillis(); - try { - while ((time - startTime) < msToRun) { - ret = readData(); - if (ret != null) { - time = System.currentTimeMillis(); - final long startTime = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE)); - stats.recordTime(startTime, time, ret.length()); - } + + public void EventsTimeReaderRW() throws IOException { + final long msToRun = secondsToRun * MS_PER_SEC; + String ret = null; + long time = System.currentTimeMillis(); + try { + while ((time - startTime) < msToRun) { + ret = readData(); + time = System.currentTimeMillis(); + if (ret != null) { + final long start = Long.parseLong(ret.substring(0, TIME_HEADER_SIZE)); + stats.recordTime(startTime, time, ret.length()); } - } finally { - close(); } + } finally { + close(); } } } diff --git a/src/main/java/io/pravega/perf/Worker.java b/src/main/java/io/pravega/perf/Worker.java index f491cd161..2af971dac 100644 --- a/src/main/java/io/pravega/perf/Worker.java +++ b/src/main/java/io/pravega/perf/Worker.java @@ -11,9 +11,12 @@ package io.pravega.perf; /** - * Abstract class for Writers and Readers. + * Abstract class for Writers and Readers. */ public abstract class Worker { + final static int TIME_HEADER_SIZE = 14; + final static String TIME_HEADER_FORMAT = "%0" + TIME_HEADER_SIZE + "d"; + final int workerID; final int events; final int messageSize; diff --git a/src/main/java/io/pravega/perf/WriterWorker.java b/src/main/java/io/pravega/perf/WriterWorker.java index e5e2a0512..adec65357 100644 --- a/src/main/java/io/pravega/perf/WriterWorker.java +++ b/src/main/java/io/pravega/perf/WriterWorker.java @@ -25,18 +25,19 @@ public abstract class WriterWorker extends Worker implements Callable { final private Performance perf; final private String payload; final private int eventsPerSec; + final private int EventsPerFlush; + final private boolean writeAndRead; - WriterWorker(int sensorId, int events, int secondsToRun, + WriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun, boolean isRandomKey, int messageSize, long start, PerfStats stats, String streamName, int eventsPerSec, boolean writeAndRead) { - super(sensorId, events, secondsToRun, - messageSize, start, stats, - streamName, 0); + super(sensorId, events, secondsToRun, messageSize, start, stats, streamName, 0); this.eventsPerSec = eventsPerSec; - perf = secondsToRun > 0 ? (writeAndRead ? new EventsWriterTimeRW() : new EventsWriterTime()) : - (writeAndRead ? new EventsWriterRW() : new EventsWriter()); - payload = createPayload(messageSize); + this.EventsPerFlush = EventsPerFlush; + this.writeAndRead = writeAndRead; + this.payload = createPayload(messageSize); + this.perf = createBenchmark(); } @@ -50,8 +51,35 @@ private String createPayload(int size) { } + private Performance createBenchmark() { + final Performance perfWriter; + if (secondsToRun > 0) { + if (writeAndRead) { + perfWriter = this::EventsWriterTimeRW; + } else { + if (eventsPerSec > 0 || EventsPerFlush < Integer.MAX_VALUE) { + perfWriter = this::EventsWriterTimeSleep; + } else { + perfWriter = this::EventsWriterTime; + } + } + } else { + if (writeAndRead) { + perfWriter = this::EventsWriterRW; + } else { + if (eventsPerSec > 0 || EventsPerFlush < Integer.MAX_VALUE) { + perfWriter = this::EventsWriterSleep; + } else { + perfWriter = this::EventsWriter; + } + } + } + return perfWriter; + } + + /** - * Writes the data and benchmark + * Writes the data and benchmark. * * @param data data to write * @param record to call for benchmarking @@ -60,7 +88,7 @@ private String createPayload(int size) { public abstract long recordWrite(String data, TriConsumer record); /** - * Writes the data and benchmark + * Writes the data and benchmark. * * @param data data to write */ @@ -71,79 +99,117 @@ private String createPayload(int size) { */ public abstract void flush(); + /** + * Flush the producer data. + */ + public abstract void close(); + + @Override public Void call() throws InterruptedException, ExecutionException, IOException { perf.benchmark(); return null; } - private class EventsWriter implements Performance { - public void benchmark() throws InterruptedException, IOException { - final EventsController eCnt = new EventsController(System.currentTimeMillis(), eventsPerSec); - for (int i = 0; i < events; i++) { - recordWrite(payload, stats::recordTime); - eCnt.control(i); - } - flush(); + private void EventsWriter() throws InterruptedException, IOException { + for (int i = 0; i < events; i++) { + recordWrite(payload, stats::recordTime); } + flush(); } - private class EventsWriterRW implements Performance { - - public void benchmark() throws InterruptedException, IOException { - final long time = System.currentTimeMillis(); - final String timeHeader = String.format(TIME_HEADER_FORMAT, time); - final EventsController eCnt = new EventsController(time, eventsPerSec); - final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload); - buffer.setLength(messageSize); - for (int i = 0; i < events; i++) { - final String header = String.format(TIME_HEADER_FORMAT, System.currentTimeMillis()); - final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString(); - writeData(data); - eCnt.control(i); + + private void EventsWriterSleep() throws InterruptedException, IOException { + final EventsController eCnt = new EventsController(System.currentTimeMillis(), eventsPerSec); + int cnt = 0; + while (cnt < events) { + int loopMax = Math.min(EventsPerFlush, events - cnt); + for (int i = 0; i < loopMax; i++) { + eCnt.control(cnt++, recordWrite(payload, stats::recordTime)); } flush(); } } - private class EventsWriterTime implements Performance { - public void benchmark() throws InterruptedException, IOException { - final long msToRun = secondsToRun * MS_PER_SEC; - long time = System.currentTimeMillis(); - final EventsController eCnt = new EventsController(time, eventsPerSec); + private void EventsWriterTime() throws InterruptedException, IOException { + final long msToRun = secondsToRun * MS_PER_SEC; + long time = System.currentTimeMillis(); + while ((time - startTime) < msToRun) { + time = recordWrite(payload, stats::recordTime); + } + flush(); + } + - for (int i = 0; (time - startTime) < msToRun; i++) { + private void EventsWriterTimeSleep() throws InterruptedException, IOException { + final long msToRun = secondsToRun * MS_PER_SEC; + long time = System.currentTimeMillis(); + final EventsController eCnt = new EventsController(time, eventsPerSec); + long msElapsed = time - startTime; + int cnt = 0; + while (msElapsed < msToRun) { + for (int i = 0; (msElapsed < msToRun) && (i < EventsPerFlush); i++) { time = recordWrite(payload, stats::recordTime); - eCnt.control(i); + eCnt.control(cnt++, time); + msElapsed = time - startTime; } flush(); } } - private class EventsWriterTimeRW implements Performance { - - public void benchmark() throws InterruptedException, IOException { - final long msToRun = secondsToRun * MS_PER_SEC; - long time = System.currentTimeMillis(); - final String timeHeader = String.format(TIME_HEADER_FORMAT, time); - final EventsController eCnt = new EventsController(time, eventsPerSec); - final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload); - buffer.setLength(messageSize); - for (int i = 0; (time - startTime) < msToRun; i++) { - time = System.currentTimeMillis(); - final String header = String.format(TIME_HEADER_FORMAT, time); - final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString(); - writeData(data); - eCnt.control(i); - } + + private void EventsWriterRW() throws InterruptedException, IOException { + final long time = System.currentTimeMillis(); + final String timeHeader = String.format(TIME_HEADER_FORMAT, time); + final EventsController eCnt = new EventsController(time, eventsPerSec); + final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload); + buffer.setLength(messageSize); + for (int i = 0; i < events; i++) { + final String header = String.format(TIME_HEADER_FORMAT, System.currentTimeMillis()); + final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString(); + writeData(data); + /* + flush is required here for following reasons: + 1. The writeData is called for End to End latency mode; hence make sure that data is sent. + 2. In case of kafka benchmarking, the buffering makes too many writes; + flushing moderates the kafka producer. + 3. If the flush called after several iterations, then flush may take too much of time. + */ + flush(); + eCnt.control(i); + } + } + + + private void EventsWriterTimeRW() throws InterruptedException, IOException { + final long msToRun = secondsToRun * MS_PER_SEC; + long time = System.currentTimeMillis(); + final String timeHeader = String.format(TIME_HEADER_FORMAT, time); + final EventsController eCnt = new EventsController(time, eventsPerSec); + final StringBuilder buffer = new StringBuilder(timeHeader + ", " + workerID + ", " + payload); + buffer.setLength(messageSize); + for (int i = 0; (time - startTime) < msToRun; i++) { + time = System.currentTimeMillis(); + final String header = String.format(TIME_HEADER_FORMAT, time); + final String data = buffer.replace(0, TIME_HEADER_SIZE, header).toString(); + writeData(data); + /* + flush is required here for following reasons: + 1. The writeData is called for End to End latency mode; hence make sure that data is sent. + 2. In case of kafka benchmarking, the buffering makes too many writes; + flushing moderates the kafka producer. + 3. If the flush called after several iterations, then flush may take too much of time. + */ flush(); + eCnt.control(i); } } + @NotThreadSafe - static private class EventsController { + final static private class EventsController { private static final long NS_PER_MS = 1000000L; private static final long NS_PER_SEC = 1000 * NS_PER_MS; private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; @@ -167,12 +233,28 @@ private EventsController(long start, int eventsPerSec) { * * @param events current events */ - private void control(long events) { + void control(long events) { if (this.eventsPerSec <= 0) { return; } + needSleep(events, System.currentTimeMillis()); + } + + /** + * Blocks for small amounts of time to achieve targetThroughput/events per sec + * + * @param events current events + * @param time current time + */ + void control(long events, long time) { + if (this.eventsPerSec <= 0) { + return; + } + needSleep(events, time); + } - float elapsedSec = (System.currentTimeMillis() - startTime) / 1000.f; + private void needSleep(long events, long time) { + float elapsedSec = (time - startTime) / 1000.f; if ((events / elapsedSec) < this.eventsPerSec) { return;