Skip to content

Commit

Permalink
Merge pull request #436 from kmgowda/kmg-perl-sleep-1
Browse files Browse the repository at this point in the history
Implement option 'millisecsleep' in SBK command line parameters

Signed-off-by: Keshava Munegowda <keshava.gowda@gmail.com>
  • Loading branch information
kmgowda authored Aug 16, 2024
2 parents e6cacd9 + e631399 commit 93e3f45
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 33 deletions.
46 changes: 46 additions & 0 deletions perl/src/main/java/io/perl/api/PerformanceRecorder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

package io.perl.api;
import io.time.Time;

import javax.annotation.Nonnull;

abstract public class PerformanceRecorder {
final protected int windowIntervalMS;
final protected Time time;
final protected PeriodicRecorder periodicRecorder;
final protected Channel[] channels;

/**
* Constructor to initialize values.
*
* @param periodicRecorder PeriodicRecorder
* @param channels Channel[]
* @param time Time
* @param reportingIntervalMS int
*/
public PerformanceRecorder(PeriodicRecorder periodicRecorder, @Nonnull Channel[] channels, Time time,
int reportingIntervalMS) {
this.periodicRecorder = periodicRecorder;
this.channels = channels.clone();
this.time = time;
this.windowIntervalMS = reportingIntervalMS;
}

/**
* Method run.
*
* @param secondsToRun final long.
* @param totalRecords final long.
*/
abstract public void run(final long secondsToRun, final long totalRecords);

}
25 changes: 15 additions & 10 deletions perl/src/main/java/io/perl/api/impl/CQueuePerl.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.perl.api.impl;

import io.perl.api.Channel;
import io.perl.api.PerformanceRecorder;
import io.perl.api.PeriodicRecorder;
import io.perl.api.Perl;
import io.perl.api.PerlChannel;
Expand Down Expand Up @@ -51,11 +52,11 @@ final public class CQueuePerl implements Perl {
/**
* Constructor CQueuePerl initialize all values.
*
* @param perlConfig NotNull PerlConfig
* @param periodicRecorder PeriodicRecorder
* @param reportingIntervalMS int
* @param time Time
* @param executor ExecutorService
* @param perlConfig NotNull PerlConfig
* @param periodicRecorder PeriodicRecorder
* @param reportingIntervalMS int
* @param time Time
* @param executor ExecutorService
*/
public CQueuePerl(@NotNull PerlConfig perlConfig, PeriodicRecorder periodicRecorder,
int reportingIntervalMS, Time time, ExecutorService executor) {
Expand All @@ -75,8 +76,13 @@ public CQueuePerl(@NotNull PerlConfig perlConfig, PeriodicRecorder periodicRecor
for (int i = 0; i < channels.length; i++) {
channels[i] = new CQueueChannel(maxQs, new OnError());
}
this.perlReceiver = new PerformanceRecorder(periodicRecorder, channels, time, reportingIntervalMS,
Math.max(PerlConfig.MIN_IDLE_NS, perlConfig.idleNS));
if (perlConfig.sleepMS > 0) {
this.perlReceiver = new PerformanceRecorderIdleSleep(periodicRecorder, channels, time, reportingIntervalMS,
Math.min(perlConfig.sleepMS, reportingIntervalMS));
} else {
this.perlReceiver = new PerformanceRecorderIdleBusyWait(periodicRecorder, channels, time, reportingIntervalMS,
Math.max(PerlConfig.MIN_IDLE_NS, perlConfig.idleNS));
}
}


Expand Down Expand Up @@ -153,8 +159,7 @@ public CompletableFuture<Void> run(long secondsToRun, long recordsCount) {
}

/**
* Stop the CQ Perl.
*
* Stop the CQ Perl.
*/
@Override
public void stop() {
Expand All @@ -168,7 +173,7 @@ interface Throw {

@NotThreadSafe
static final class CQueueChannel extends ConcurrentLinkedQueueArray<TimeStamp> implements Channel {
final private int maxQs;
final private int maxQs;
final private Throw eThrow;
private int rIndex;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.perl.api.impl;

import io.perl.api.Channel;
import io.perl.api.PerformanceRecorder;
import io.perl.api.PeriodicRecorder;
import io.perl.config.PerlConfig;
import io.perl.system.PerlPrinter;
Expand All @@ -23,12 +24,8 @@
* Class for Performance Recording.
*/
@NotThreadSafe
public final class PerformanceRecorder {
final private int windowIntervalMS;
public final class PerformanceRecorderIdleBusyWait extends PerformanceRecorder {
final private int idleNS;
final private Time time;
final private PeriodicRecorder periodicRecorder;
final private Channel[] channels;

/**
* Constructor to initialize values.
Expand All @@ -39,12 +36,9 @@ public final class PerformanceRecorder {
* @param reportingIntervalMS int
* @param idleNS int
*/
public PerformanceRecorder(PeriodicRecorder periodicRecorder, @Nonnull Channel[] channels, Time time,
int reportingIntervalMS, int idleNS) {
this.periodicRecorder = periodicRecorder;
this.channels = channels.clone();
this.time = time;
this.windowIntervalMS = reportingIntervalMS;
public PerformanceRecorderIdleBusyWait(PeriodicRecorder periodicRecorder, @Nonnull Channel[] channels, Time time,
int reportingIntervalMS, int idleNS) {
super(periodicRecorder, channels, time, reportingIntervalMS);
this.idleNS = idleNS;
}

Expand All @@ -64,7 +58,7 @@ public void run(final long secondsToRun, final long totalRecords) {
long recordsCnt = 0;
boolean notFound;
TimeStamp t;
PerlPrinter.log.info("Performance Recorder Started");
PerlPrinter.log.info("PerformanceRecorderIdleBusyWait Started");
periodicRecorder.start(startTime);
periodicRecorder.startWindow(startTime);
while (doWork) {
Expand Down Expand Up @@ -115,7 +109,7 @@ public void run(final long secondsToRun, final long totalRecords) {
}
}
periodicRecorder.stop(ctime);
PerlPrinter.log.info("Performance Recorder Exited");
PerlPrinter.log.info("PerformanceRecorderIdleBusyWait Exited");
}

}
104 changes: 104 additions & 0 deletions perl/src/main/java/io/perl/api/impl/PerformanceRecorderIdleSleep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright (c) KMG. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.perl.api.impl;

import io.perl.api.Channel;
import io.perl.api.PerformanceRecorder;
import io.perl.api.PeriodicRecorder;
import io.perl.api.TimeStamp;
import io.perl.system.PerlPrinter;
import io.time.Time;

import javax.annotation.Nonnull;

public class PerformanceRecorderIdleSleep extends PerformanceRecorder {
final private int sleepMS;

/**
* Constructor to initialize values.
*
* @param periodicRecorder PeriodicRecorder
* @param channels Channel[]
* @param time Time
* @param reportingIntervalMS int
* @param sleepMS int
*/
public PerformanceRecorderIdleSleep(PeriodicRecorder periodicRecorder, @Nonnull Channel[] channels, Time time,
int reportingIntervalMS, int sleepMS) {
super(periodicRecorder, channels, time, reportingIntervalMS);
this.sleepMS = sleepMS;
}

/**
* Method run.
*
* @param secondsToRun final long.
* @param totalRecords final long.
*/
public void run(final long secondsToRun, final long totalRecords) {
final long msToRun = secondsToRun * Time.MS_PER_SEC;
final long startTime = time.getCurrentTime();
boolean doWork = true;
long ctime = startTime;
long recordsCnt = 0;
boolean notFound;
TimeStamp t;
PerlPrinter.log.info("PerformanceRecorderIdleSleep Started");
periodicRecorder.start(startTime);
periodicRecorder.startWindow(startTime);
while (doWork) {
notFound = true;
for (int i = 0; doWork && (i < channels.length); i++) {
t = channels[i].receive(windowIntervalMS);
if (t != null) {
notFound = false;
ctime = t.endTime;
if (t.isEnd()) {
doWork = false;
} else {
recordsCnt += t.records;
periodicRecorder.record(t.startTime, t.endTime, t.records, t.bytes);
if (msToRun > 0) {
if (time.elapsedMilliSeconds(ctime, startTime) >= msToRun) {
doWork = false;
}
} else if (totalRecords > 0 && recordsCnt >= totalRecords) {
doWork = false;
}
}
if (periodicRecorder.elapsedMilliSecondsWindow(ctime) > windowIntervalMS) {
periodicRecorder.stopWindow(ctime);
periodicRecorder.startWindow(ctime);
}
}
}
if (doWork) {
if (notFound) {
try {
Thread.sleep(this.sleepMS);
} catch (InterruptedException e) {
PerlPrinter.log.warn("PerformanceRecorderIdleSleep : {}", e.getMessage());
}
ctime = time.getCurrentTime();
final long diffTime = periodicRecorder.elapsedMilliSecondsWindow(ctime);
if (diffTime > windowIntervalMS) {
periodicRecorder.stopWindow(ctime);
periodicRecorder.startWindow(ctime);
}
}
if (msToRun > 0 && time.elapsedMilliSeconds(ctime, startTime) >= msToRun) {
doWork = false;
}
}
}
periodicRecorder.stop(ctime);
PerlPrinter.log.info("PerformanceRecorderIdleSleep Exited");
}
}
5 changes: 5 additions & 0 deletions perl/src/main/java/io/perl/config/PerlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ final public class PerlConfig extends LatencyConfig {
*/
public int idleNS;

/**
* <code>int sleepMS</code>.
*/
public int sleepMS;

/**
* <code>int maxQs</code>.
*/
Expand Down
4 changes: 4 additions & 0 deletions perl/src/main/resources/perl.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ maxQs=0
# Maximum Idle delay in Nano seconds to read the benchmark Data. Minimum value is 1000 Nano seconds (1 Micro second).
idleNS=1000000

# Maximum sleep time in Milli seconds to read the benchmark data. Initial default value is 0.
# if this value is more than 1 then idleNS will be discarded.
sleepMS=0

#Max Latency Array Size
maxArraySizeMB=64

Expand Down
39 changes: 29 additions & 10 deletions perl/src/test/java/io/perl/test/PerlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.perl.api.Perl;
import io.perl.api.PerlChannel;
import io.perl.api.impl.PerlBuilder;
import io.perl.config.PerlConfig;
import io.perl.logger.impl.DefaultLogger;
import io.perl.system.PerlPrinter;
import org.junit.Assert;
Expand All @@ -28,12 +29,13 @@
/**
* Class for PerL validation.
*/
public class PerlTest {
public class PerlTest {
public final static int PERL_THREADS = 2;
public final static int PERL_TOTAL_RECORDS = 100;
public final static int PERL_RECORDS_PER_THREAD = PERL_TOTAL_RECORDS / PERL_THREADS;
public final static int PERL_RECORD_SIZE = 10;
public final static int PERL_TIMEOUT_SECONDS = 5;
public final static int PERL_SLEEP_MS = 100;

public static class TestLogger extends DefaultLogger {
public final AtomicLong latencyReporterCnt;
Expand Down Expand Up @@ -75,15 +77,14 @@ public void recordLatency(long startTime, int events, int bytes, long latency) {
}
}

@Test
public void testPerlRecords() throws IOException, ExecutionException, InterruptedException, TimeoutException {
TestLogger logger = new TestLogger();
Perl perl = PerlBuilder.build( logger, logger, null, null, null);

private void runPerlRecords(final TestLogger logger, final Perl perl) throws IOException, ExecutionException,
InterruptedException, TimeoutException {
PerlChannel[] channels = new PerlChannel[PERL_THREADS];
for (int i = 0; i < PERL_THREADS; i++) {
channels[i] = perl.getPerlChannel();
}
CompletableFuture<Void> ret = perl.run(0, PERL_TOTAL_RECORDS);
CompletableFuture<Void> ret = perl.run(0, PERL_TOTAL_RECORDS);

int records = PERL_TOTAL_RECORDS;
int ch = 0;
Expand All @@ -99,16 +100,34 @@ public void testPerlRecords() throws IOException, ExecutionException, Interrupte
}
ret.get(PERL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (logger.latencyReporterCnt.get() != PERL_TOTAL_RECORDS) {
Assert.fail("Latency Reporter Count Failed! Latency Reporter Count : "+ logger.latencyReporterCnt.get() +
" , Expected : " + PERL_TOTAL_RECORDS);
Assert.fail("Latency Reporter Count Failed! Latency Reporter Count : " + logger.latencyReporterCnt.get() +
" , Expected : " + PERL_TOTAL_RECORDS);
}
if (logger.printCnt.get() != PERL_TOTAL_RECORDS) {
Assert.fail("Print Count Failed! Latency Reporter Count : "+ logger.latencyReporterCnt.get() +
Assert.fail("Print Count Failed! Latency Reporter Count : " + logger.printCnt.get() +
" , Expected : " + PERL_TOTAL_RECORDS);
}
if (logger.totalPrintCnt.get() != PERL_TOTAL_RECORDS) {
Assert.fail("Total Print Count Failed! Latency Reporter Count : " + logger.latencyReporterCnt.get() +
Assert.fail("Total Print Count Failed! Latency Reporter Count : " + logger.totalPrintCnt.get() +
" , Expected : " + PERL_TOTAL_RECORDS);
}
}

@Test
public void testPerlRecordsIdleNS() throws IOException, ExecutionException, InterruptedException, TimeoutException {
TestLogger logger = new TestLogger();
Perl perl = PerlBuilder.build(logger, logger, null, null, null);
runPerlRecords(logger, perl);
}

@Test
public void testPerlRecordsSleepMS() throws IOException, ExecutionException,
InterruptedException, TimeoutException {
TestLogger logger = new TestLogger();
PerlConfig config = PerlConfig.build();
config.sleepMS = PERL_SLEEP_MS;
Perl perl = PerlBuilder.build(logger, logger, null, config, null);
runPerlRecords(logger, perl);
}

}
2 changes: 2 additions & 0 deletions sbk-api/src/main/java/io/sbk/api/impl/SbkBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public SbkBenchmark(ParameterOptions params, Storage<Object> storage,
if (params.getWritersCount() > 0 && params.getAction() == Action.Writing) {
PerlConfig wConfig = PerlConfig.build(SbkBenchmark.class.getClassLoader().getResourceAsStream(CONFIGFILE));
wConfig.workers = params.getWritersCount();
wConfig.sleepMS = params.getIdleSleepMilliSeconds();
wConfig.csv = false;
writePerl = PerlBuilder.build(rwLogger, rwLogger, this.time, wConfig, executor);
} else {
Expand All @@ -100,6 +101,7 @@ public SbkBenchmark(ParameterOptions params, Storage<Object> storage,
if (params.getReadersCount() > 0) {
PerlConfig rConfig = PerlConfig.build(SbkBenchmark.class.getClassLoader().getResourceAsStream(CONFIGFILE));
rConfig.workers = params.getReadersCount();
rConfig.sleepMS = params.getIdleSleepMilliSeconds();
rConfig.csv = false;
readPerl = PerlBuilder.build(rwLogger, rwLogger, this.time, rConfig, executor);
} else {
Expand Down
Loading

0 comments on commit 93e3f45

Please sign in to comment.