Skip to content

Commit

Permalink
Issues 46 and 48: Add the "-scope" parameter and fix the reader-group…
Browse files Browse the repository at this point in the history
…s issue (#50)

Add new "scope" parameter for ReaderGroups and switched to byte serialization for event payloads.

Signed-off-by: Keshava Munegowda <keshava.munegowda@dell.com>
  • Loading branch information
kmgowda authored and RaulGracia committed Aug 16, 2019
1 parent cbe548c commit 668de71
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 74 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ usage: pravega-benchmark
-readcsv <arg> CSV file to record read latencies
-recreate <arg> If the stream is already existing, delete
and recreate the same
-scope <arg> Scope name
-segments <arg> Number of segments
-size <arg> Size of each message (event or record)
-stream <arg> Stream name
Expand Down
22 changes: 5 additions & 17 deletions src/main/java/io/pravega/perf/PerfStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,19 +212,7 @@ static private class LatencyWriter {
long totalLatency;
long maxLatency;
long totalBytes;
ArrayList<LatencyRange> latencyRanges;

private class LatencyRange {
private final int latency;
private final int start;
private final int end;

private LatencyRange(int latency, int start, int end) {
this.latency = latency;
this.start = start;
this.end = end;
}
}
ArrayList<int[]> latencyRanges;

LatencyWriter(String action, int messageSize, long startTime) {
this.action = action;
Expand All @@ -242,7 +230,7 @@ private void countLatencies() {
latencyRanges = new ArrayList<>();
for (int i = 0, cur = 0; i < latencies.length; i++) {
if (latencies[i] > 0) {
latencyRanges.add(new LatencyRange(i, cur, cur + latencies[i]));
latencyRanges.add(new int[]{cur, cur + latencies[i], i});
cur += latencies[i] + 1;
totalLatency += i * latencies[i];
count += latencies[i];
Expand All @@ -260,10 +248,10 @@ private int[] getPercentiles() {
percentileIds[i] = (int) (count * percentiles[i]);
}

for (LatencyRange lr : latencyRanges) {
for (int[] lr : latencyRanges) {
while ((index < percentileIds.length) &&
(lr.start <= percentileIds[index]) && (percentileIds[index] <= lr.end)) {
values[index++] = lr.latency;
(lr[0] <= percentileIds[index]) && (percentileIds[index] <= lr[1])) {
values[index++] = lr[2];
}
}
return values;
Expand Down
28 changes: 22 additions & 6 deletions src/main/java/io/pravega/perf/PravegaPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static void main(String[] args) {
final long startTime = System.currentTimeMillis();

options.addOption("controller", true, "Controller URI");
options.addOption("scope", true, "Scope name");
options.addOption("stream", true, "Stream name");
options.addOption("producers", true, "Number of producers");
options.addOption("consumers", true, "Number of consumers");
Expand Down Expand Up @@ -125,6 +126,7 @@ public void run() {
if (producers != null) {
producers.forEach(WriterWorker::close);
}
perfTest.closeReaderGroup();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
Expand All @@ -141,6 +143,7 @@ public void run() {
if (producers != null) {
producers.forEach(WriterWorker::close);
}
perfTest.closeReaderGroup();
} catch (Exception ex) {
ex.printStackTrace();
}
Expand Down Expand Up @@ -171,6 +174,7 @@ static private abstract class Test {
final String controllerUri;
final int messageSize;
final String streamName;
final String rdGrpName;
final String scopeName;
final boolean recreate;
final boolean writeAndRead;
Expand Down Expand Up @@ -248,6 +252,12 @@ static private abstract class Test {
streamName = null;
}

if (commandline.hasOption("scope")) {
scopeName = commandline.getOptionValue("scope");
} else {
scopeName = SCOPE;
}

if (commandline.hasOption("transactionspercommit")) {
transactionPerCommit = Integer.parseInt(commandline.getOptionValue("transactionspercommit"));
} else {
Expand Down Expand Up @@ -283,8 +293,6 @@ static private abstract class Test {
readFile = null;
}

scopeName = SCOPE;

if (controllerUri == null) {
throw new IllegalArgumentException("Error: Must specify Controller IP address");
}
Expand All @@ -297,6 +305,12 @@ static private abstract class Test {
throw new IllegalArgumentException("Error: Must specify the number of producers or Consumers");
}

if (recreate) {
rdGrpName = streamName + startTime;
} else {
rdGrpName = streamName + "RdGrp";
}

if (producerCount > 0) {
if (messageSize == 0) {
throw new IllegalArgumentException("Error: Must specify the event 'size'");
Expand Down Expand Up @@ -362,6 +376,8 @@ public void shutdown(long endTime) {
}
}

public abstract void closeReaderGroup();

public abstract List<WriterWorker> getProducers();

public abstract List<ReaderWorker> getConsumers() throws URISyntaxException;
Expand All @@ -383,7 +399,7 @@ static private class PravegaTest extends Test {
.maxBackoffMillis(5000).build(),
bgExecutor);

streamHandle = new PravegaStreamHandler(scopeName, streamName, controllerUri,
streamHandle = new PravegaStreamHandler(scopeName, streamName, rdGrpName, controllerUri,
segmentCount, TIMEOUT, controller,
bgExecutor);

Expand Down Expand Up @@ -440,7 +456,7 @@ public List<ReaderWorker> getConsumers() throws URISyntaxException {
.boxed()
.map(i -> new PravegaReaderWorker(i, eventsPerConsumer,
runtimeSec, startTime, consumeStats,
streamName, TIMEOUT, writeAndRead, factory))
rdGrpName, TIMEOUT, writeAndRead, factory))
.collect(Collectors.toList());
} else {
readers = null;
Expand All @@ -449,11 +465,11 @@ public List<ReaderWorker> getConsumers() throws URISyntaxException {
}

@Override
public void shutdown(long endTime) {
public void closeReaderGroup() {
if (readerGroup != null) {
readerGroup.close();
}
super.shutdown(endTime);
}

}
}
8 changes: 4 additions & 4 deletions src/main/java/io/pravega/perf/PravegaReaderWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@

import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReinitializationRequiredException;

/**
* Class for Pravega reader/consumer.
*/
public class PravegaReaderWorker extends ReaderWorker {
private final EventStreamReader<String> reader;
private final EventStreamReader<byte[]> reader;

PravegaReaderWorker(int readerId, int events, int secondsToRun,
long start, PerfStats stats, String readergrp,
Expand All @@ -29,11 +29,11 @@ public class PravegaReaderWorker extends ReaderWorker {

final String readerSt = Integer.toString(readerId);
reader = factory.createReader(
readerSt, readergrp, new UTF8StringSerializer(), ReaderConfig.builder().build());
readerSt, readergrp, new ByteArraySerializer(), ReaderConfig.builder().build());
}

@Override
public String readData() {
public byte[] readData() {
try {
return reader.readNextEvent(timeout).getEvent();
} catch (ReinitializationRequiredException e) {
Expand Down
27 changes: 21 additions & 6 deletions src/main/java/io/pravega/perf/PravegaStreamHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,25 @@
public class PravegaStreamHandler {
final String scope;
final String stream;
final String rdGrpName;
final String controllerUri;
final ControllerImpl controller;
final StreamManager streamManager;
final StreamConfiguration streamconfig;
final ScheduledExecutorService bgexecutor;
final int segCount;
final int timeout;
ReaderGroupManager readerGroupManager;
ReaderGroupConfig rdGrpConfig;

PravegaStreamHandler(String scope, String stream,
String rdGrpName,
String uri, int segs,
int timeout, ControllerImpl contrl,
ScheduledExecutorService bgexecutor) throws Exception {
this.scope = scope;
this.stream = stream;
this.rdGrpName = rdGrpName;
this.controllerUri = uri;
this.controller = contrl;
this.segCount = segs;
Expand Down Expand Up @@ -135,15 +140,25 @@ void recreate() throws InterruptedException, ExecutionException, TimeoutExceptio
}

ReaderGroup createReaderGroup(boolean reset) throws URISyntaxException {
final ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope,
ClientConfig.builder().controllerURI(new URI(controllerUri)).build());
final ReaderGroupConfig rdGrpConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, stream)).build();
readerGroupManager.createReaderGroup(stream, rdGrpConfig);
final ReaderGroup rdGroup = readerGroupManager.getReaderGroup(stream);
if (readerGroupManager == null) {
readerGroupManager = ReaderGroupManager.withScope(scope,
ClientConfig.builder().controllerURI(new URI(controllerUri)).build());
rdGrpConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, stream)).build();
}
readerGroupManager.createReaderGroup(rdGrpName, rdGrpConfig);
final ReaderGroup rdGroup = readerGroupManager.getReaderGroup(rdGrpName);
if (reset) {
rdGroup.resetReaderGroup(rdGrpConfig);
}
return rdGroup;
}

void deleteReaderGroup() {
try {
readerGroupManager.deleteReaderGroup(rdGrpName);
} catch (RuntimeException e) {
System.out.println("Cannot delete reader group " + rdGrpName + " because it is already deleted");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
private int eventCount;

@GuardedBy("this")
private Transaction<String> transaction;
private Transaction<byte[]> transaction;

PravegaTransactionWriterWorker(int sensorId, int events,
int secondsToRun, boolean isRandomKey,
Expand All @@ -40,7 +40,7 @@ public class PravegaTransactionWriterWorker extends PravegaWriterWorker {
}

@Override
public long recordWrite(String data, TriConsumer record) {
public long recordWrite(byte[] data, TriConsumer record) {
long time = 0;
try {
synchronized (this) {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/pravega/perf/PravegaWriterWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.client.stream.EventWriterConfig;

/**
* Class for Pravega writer/producer.
*/
public class PravegaWriterWorker extends WriterWorker {
final EventStreamWriter<String> producer;
final EventStreamWriter<byte[]> producer;

PravegaWriterWorker(int sensorId, int events, int EventsPerFlush, int secondsToRun,
boolean isRandomKey, int messageSize, long start,
Expand All @@ -33,23 +33,23 @@ public class PravegaWriterWorker extends WriterWorker {
stats, streamName, eventsPerSec, writeAndRead);

this.producer = factory.createEventWriter(streamName,
new UTF8StringSerializer(),
new ByteArraySerializer(),
EventWriterConfig.builder().build());
}

@Override
public long recordWrite(String data, TriConsumer record) {
public long recordWrite(byte[] data, TriConsumer record) {
CompletableFuture ret;
final long time = System.currentTimeMillis();
ret = producer.writeEvent(data);
ret.thenAccept(d -> {
record.accept(time, System.currentTimeMillis(), data.length());
record.accept(time, System.currentTimeMillis(), data.length);
});
return time;
}

@Override
public void writeData(String data) {
public void writeData(byte[] data) {
producer.writeEvent(data);
}

Expand Down
Loading

0 comments on commit 668de71

Please sign in to comment.