Skip to content

Commit

Permalink
Issue 69: Added ability to write and read watermarks (#70)
Browse files Browse the repository at this point in the history
Pravega Benchmark now allows users to exercise watermarks on readers and writers (event-based or transactional).

Signed-off-by: Claudio Fahey <claudio.fahey@dell.com>
  • Loading branch information
Claudio Fahey authored and RaulGracia committed Nov 7, 2019
1 parent e30cfb8 commit 18a5ec8
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 65 deletions.
75 changes: 46 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,52 @@ Running Pravega benchmark tool locally:
```
<dir>/pravega-benchmark$ ./run/pravega-benchmark/bin/pravega-benchmark -help
usage: pravega-benchmark
-consumers <arg> Number of consumers
-controller <arg> Controller URI
-enableConnectionPooling <arg> Set to false to disable connection
pooling
-events <arg> Number of events/records if 'time' not
specified;
otherwise, Maximum events per second by
producer(s) and/or Number of events per
consumer
-flush <arg> Each producer calls flush after writing
<arg> number of of events/records; Not
applicable, if both producers and
consumers are specified
-help Help message
-producers <arg> Number of producers
-readcsv <arg> CSV file to record read latencies
-recreate <arg> If the stream is already existing,
delete and recreate the same
-scope <arg> Scope name
-segments <arg> Number of segments
-size <arg> Size of each message (event or record)
-stream <arg> Stream name
-throughput <arg> if > 0 , throughput in MB/s
if 0 , writes 'events'
if -1, get the maximum throughput
-time <arg> Number of seconds the code runs
-transactionspercommit <arg> Number of events before a transaction is
committed
-writecsv <arg> CSV file to record write latencies
-consumers <arg> Number of consumers
-controller <arg> Controller URI
-enableConnectionPooling <arg> Set to false to disable connection
pooling
-events <arg> Number of events/records if 'time'
not specified;
otherwise, Maximum events per second
by producer(s) and/or Number of
events per consumer
-flush <arg> Each producer calls flush after
writing <arg> number of of
events/records; Not applicable, if
both producers and consumers are
specified
-help Help message
-producers <arg> Number of producers
-readcsv <arg> CSV file to record read latencies
-readWatermarkPeriodMillis <arg> If -1 (default), watermarks will not
be read.
If >0, watermarks will be read with a
period of this many milliseconds.
-recreate <arg> If the stream is already existing,
delete and recreate the same
-scope <arg> Scope name
-segments <arg> Number of segments
-size <arg> Size of each message (event or
record)
-stream <arg> Stream name
-throughput <arg> if > 0 , throughput in MB/s
if 0 , writes 'events'
if -1, get the maximum throughput
-time <arg> Number of seconds the code runs
-transactionspercommit <arg> Number of events before a transaction
is committed
-writecsv <arg> CSV file to record write latencies
-writeWatermarkPeriodMillis <arg> If -1 (default), watermarks will not
be written.
If 0 and not using transactions,
watermarks will be written after
every event.
If >0 and not using transactions,
watermarks will be written with a
period of this many milliseconds.
If >= 0 and using transactions,
watermarks will be written on each
commit.
## Running Performance benchmarking
Expand Down
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ buildscript {
mavenLocal()
jcenter()
mavenCentral()
maven {
url "https://oss.jfrog.org/jfrog-dependencies"
}
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@

commonsCLIVersion=1.3.1
commonsCSVVersion=1.5
pravegaVersion=0.5.0
pravegaVersion=0.6.0-50.fb3423a-SNAPSHOT
slf4jSimpleVersion=1.7.14
54 changes: 35 additions & 19 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,32 @@
package io.pravega.perf;

import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.client.stream.impl.ControllerImplConfig;
import io.pravega.client.stream.impl.ClientFactoryImpl;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import java.io.IOException;
import java.net.URI;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.IntStream;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
* Performance benchmark for Pravega.
Expand Down Expand Up @@ -81,6 +79,14 @@ public static void main(String[] args) {
options.addOption("writecsv", true, "CSV file to record write latencies");
options.addOption("readcsv", true, "CSV file to record read latencies");
options.addOption("enableConnectionPooling", true, "Set to false to disable connection pooling");
options.addOption("writeWatermarkPeriodMillis", true,
"If -1 (default), watermarks will not be written.\n" +
"If 0 and not using transactions, watermarks will be written after every event.\n" +
"If >0 and not using transactions, watermarks will be written with a period of this many milliseconds.\n" +
"If >= 0 and using transactions, watermarks will be written on each commit.");
options.addOption("readWatermarkPeriodMillis", true,
"If -1 (default), watermarks will not be read.\n" +
"If >0, watermarks will be read with a period of this many milliseconds.");

options.addOption("help", false, "Help message");

Expand Down Expand Up @@ -196,6 +202,8 @@ static private abstract class Test {
final PerfStats consumeStats;
final long startTime;
final boolean enableConnectionPooling;
final long writeWatermarkPeriodMillis;
final long readWatermarkPeriodMillis;

Test(long startTime, CommandLine commandline) throws IllegalArgumentException {
this.startTime = startTime;
Expand Down Expand Up @@ -297,6 +305,9 @@ static private abstract class Test {

enableConnectionPooling = Boolean.parseBoolean(commandline.getOptionValue("enableConnectionPooling", "true"));

writeWatermarkPeriodMillis = Long.parseLong(commandline.getOptionValue("writeWatermarkPeriodMillis", "-1"));
readWatermarkPeriodMillis = Long.parseLong(commandline.getOptionValue("readWatermarkPeriodMillis", "-1"));

if (controllerUri == null) {
throw new IllegalArgumentException("Error: Must specify Controller IP address");
}
Expand Down Expand Up @@ -390,7 +401,7 @@ public void shutdown(long endTime) {

static private class PravegaTest extends Test {
final PravegaStreamHandler streamHandle;
final ClientFactory factory;
final EventStreamClientFactory factory;
final ReaderGroup readerGroup;

PravegaTest(long startTime, CommandLine commandline) throws
Expand Down Expand Up @@ -428,22 +439,25 @@ public List<WriterWorker> getProducers() {

if (producerCount > 0) {
if (transactionPerCommit > 0) {
final boolean enableWatermark = writeWatermarkPeriodMillis >= 0;
writers = IntStream.range(0, producerCount)
.boxed()
.map(i -> new PravegaTransactionWriterWorker(i, eventsPerProducer,
runtimeSec, false,
messageSize, startTime,
produceStats, streamName,
eventsPerSec, writeAndRead, factory,
transactionPerCommit, enableConnectionPooling))
transactionPerCommit, enableConnectionPooling,
enableWatermark))
.collect(Collectors.toList());
} else {
writers = IntStream.range(0, producerCount)
.boxed()
.map(i -> new PravegaWriterWorker(i, eventsPerProducer,
EventsPerFlush, runtimeSec, false,
messageSize, startTime, produceStats,
streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling))
streamName, eventsPerSec, writeAndRead, factory, enableConnectionPooling,
writeWatermarkPeriodMillis))
.collect(Collectors.toList());
}
} else {
Expand All @@ -460,7 +474,9 @@ public List<ReaderWorker> getConsumers() throws URISyntaxException {
.boxed()
.map(i -> new PravegaReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
rdGrpName, TIMEOUT, writeAndRead, factory))
rdGrpName, TIMEOUT, writeAndRead, factory,
io.pravega.client.stream.Stream.of(scopeName, streamName),
readWatermarkPeriodMillis))
.collect(Collectors.toList());
} else {
readers = null;
Expand Down
40 changes: 37 additions & 3 deletions src/main/java/io/pravega/perf/PravegaReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,49 @@

package io.pravega.perf;

import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.TimeWindow;
import io.pravega.client.stream.impl.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Class for Pravega reader/consumer.
*/
public class PravegaReaderWorker extends ReaderWorker {
private static Logger log = LoggerFactory.getLogger(PravegaReaderWorker.class);

private final EventStreamReader<byte[]> reader;
private final Stream stream;
private final ScheduledExecutorService watermarkExecutor = Executors.newScheduledThreadPool(1);

/**
*
* @param readWatermarkPeriodMillis If >0, watermarks will be read with a period of this many milliseconds.
*/
PravegaReaderWorker(int readerId, int events, int secondsToRun,
long start, PerfStats stats, String readergrp,
int timeout, boolean writeAndRead, ClientFactory factory) {
int timeout, boolean writeAndRead, EventStreamClientFactory factory,
Stream stream, long readWatermarkPeriodMillis) {
super(readerId, events, secondsToRun, start, stats, readergrp, timeout, writeAndRead);

final String readerSt = Integer.toString(readerId);
reader = factory.createReader(
readerSt, readergrp, new ByteArraySerializer(), ReaderConfig.builder().build());
this.stream = stream;

if (readWatermarkPeriodMillis > 0) {
watermarkExecutor.scheduleAtFixedRate(this::readWatermark, readWatermarkPeriodMillis, readWatermarkPeriodMillis,
TimeUnit.MILLISECONDS);
}
}

@Override
Expand All @@ -41,8 +64,19 @@ public byte[] readData() {
}
}

private void readWatermark() {
TimeWindow currentTimeWindow = reader.getCurrentTimeWindow(stream);
log.debug("readWatermark: currentTimeWindow={}", currentTimeWindow);
}

@Override
public void close() {
watermarkExecutor.shutdown();
try {
watermarkExecutor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
reader.close();
}
}
Loading

0 comments on commit 18a5ec8

Please sign in to comment.