Skip to content

Commit

Permalink
Ability to accept message counts of long instead of int for Multi Tool
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 9, 2024
1 parent 69f7042 commit 4ac187d
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 103 deletions.
145 changes: 68 additions & 77 deletions js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import io.nats.jsmulti.settings.Action;
import io.nats.jsmulti.settings.Arguments;
import io.nats.jsmulti.settings.Context;
import io.nats.jsmulti.shared.ActionRunner;
import io.nats.jsmulti.shared.OptionsFactory;
import io.nats.jsmulti.shared.Stats;
import io.nats.jsmulti.shared.Utils;
import io.nats.jsmulti.shared.*;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -114,67 +111,58 @@ private static void cleanupConsumers(Context ctx) {
}

private static ActionRunner getRunner(final Context ctx) {
try {
if (!ctx.action.isQueue() || ctx.threads > 1) {

switch (ctx.action) {
case CUSTOM:
return ctx.customActionRunner;

case PUB_SYNC:
return JsMulti::pubSync;
case PUB_ASYNC:
return JsMulti::pubAsync;
case PUB_CORE:
return JsMulti::pubCore;
case PUB:
return JsMulti::pub;

case RTT:
return JsMulti::rtt;

case REQUEST:
return JsMulti::request;
case REPLY:
return JsMulti::reply;

case SUB_CORE:
return JsMulti::subCore;

case SUB_PUSH:
case SUB_QUEUE:
return JsMulti::subPush;

case SUB_PULL:
case SUB_PULL_READ:
case SUB_PULL_QUEUE:
case SUB_PULL_READ_QUEUE:
return JsMulti::subPull;

case SUB_ITERATE:
case SUB_FETCH:
case SUB_ITERATE_QUEUE:
case SUB_FETCH_QUEUE:
return JsMulti::subSimple;
}
if (!ctx.action.isQueue() || ctx.threads > 1) {
switch (ctx.action) {
case CUSTOM:
return ctx.customActionRunner;

case PUB_SYNC:
return JsMulti::pubSync;
case PUB_ASYNC:
return JsMulti::pubAsync;
case PUB_CORE:
return JsMulti::pubCore;
case PUB:
return JsMulti::pub;

case RTT:
return JsMulti::rtt;

case REQUEST:
return JsMulti::request;
case REPLY:
return JsMulti::reply;

case SUB_CORE:
return JsMulti::subCore;

case SUB_PUSH:
case SUB_QUEUE:
return JsMulti::subPush;

case SUB_PULL:
case SUB_PULL_READ:
case SUB_PULL_QUEUE:
case SUB_PULL_READ_QUEUE:
return JsMulti::subPull;

case SUB_ITERATE:
case SUB_FETCH:
case SUB_ITERATE_QUEUE:
case SUB_FETCH_QUEUE:
return JsMulti::subSimple;
}
throw new Exception("Invalid Action");
}
catch (Exception e) {
//noinspection CallToPrintStackTrace
e.printStackTrace();
System.exit(-1);
return null;
}
throw new TerminalException("Invalid Action");
}

// ----------------------------------------------------------------------------------------------------
// RTT
// ----------------------------------------------------------------------------------------------------
private static void rtt(Context ctx, Connection nc, Stats stats, int id) throws Exception {
int pubTarget = ctx.getPubCount(id);
int published = 0;
int unReported = 0;
long pubTarget = ctx.getPubCount(id);
long published = 0;
long unReported = 0;
report(ctx, published, "Begin RTT");
while (published < pubTarget) {
jitter(ctx);
Expand Down Expand Up @@ -251,7 +239,7 @@ private static void pubCore(Context ctx, final Connection nc, Stats stats, int i
jsm = nc.jetStreamManagement(ctx.getJetStreamOptions());
List<String> streamNames = jsm.getStreamNames(ctx.subject);
if (streamNames.size() != 1) {
throw new RuntimeException("JetStream subject does not exist for latency run [" + ctx.subject + "]");
throw new TerminalException("JetStream subject does not exist for latency run [" + ctx.subject + "]");
}
streamName = streamNames.get(0);
StreamInfo si = jsm.getStreamInfo(streamName, StreamInfoOptions.filterSubjects(ctx.subject));
Expand Down Expand Up @@ -280,9 +268,9 @@ private static void pubCore(Context ctx, final Connection nc, Stats stats, int i

private static <T> void _pub(Context ctx, Stats stats, int id, Publisher<T> p, ResultHandler<T> rh) throws Exception {
int retriesAvailable = ctx.maxPubRetries;
int pubTarget = ctx.getPubCount(id);
int published = 0;
int unReported = 0;
long pubTarget = ctx.getPubCount(id);
long published = 0;
long unReported = 0;
report(ctx, published, "Begin Publishing");
while (published < pubTarget) {
jitter(ctx);
Expand All @@ -294,7 +282,10 @@ private static <T> void _pub(Context ctx, Stats stats, int id, Publisher<T> p, R
unReported = reportAndTrackMaybe(ctx, ++published, ++unReported, "Published", stats);
}
catch (IOException ioe) {
if (!isRegularTimeout(ioe) || --retriesAvailable == 0) { throw ioe; }
if (!isRegularTimeout(ioe) || --retriesAvailable == 0) {
report(ctx, published, "Publishing ended due exception " + ioe.getMessage());
throw ioe;
}
}
}
report(ctx, published, "Completed Publishing");
Expand All @@ -312,9 +303,9 @@ private static void pubAsync(Context ctx, Connection nc, Stats stats, int id) th

List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int roundCount = 0;
int pubTarget = ctx.getPubCount(id);
int published = 0;
int unReported = 0;
long pubTarget = ctx.getPubCount(id);
long published = 0;
long unReported = 0;
report(ctx, published, "Begin Publishing");
while (published < pubTarget) {
if (++roundCount >= ctx.roundSize) {
Expand Down Expand Up @@ -392,8 +383,8 @@ private static void subCore(Context ctx, Connection nc, Stats stats, int id) thr
}

private static void _coreReadLikePush(Context ctx, Stats stats, String subName, SyncConsumer syncConsumer, ResultHandler<Message> rh) throws Exception {
int rcvd = 0;
int unReported = 0;
long rcvd = 0;
long unReported = 0;
long noMessageTotalElapsed = 0;
AtomicLong counter = ctx.getSubscribeCounter(subName);
report(ctx, rcvd, "Begin Reading");
Expand Down Expand Up @@ -452,10 +443,10 @@ private static void subPush(Context ctx, Connection nc, Stats stats, int id) thr
}

private static void _jsSyncConsume(Context ctx, Stats stats, String durable, SyncConsumer syncConsumer) throws Exception {
int rcvd = 0;
long rcvd = 0;
Message lastUnAcked = null;
int unAckedCount = 0;
int unReported = 0;
long unAckedCount = 0;
long unReported = 0;
long noMessageTotalElapsed = 0;
AtomicLong counter = ctx.getSubscribeCounter(durable);
report(ctx, rcvd, "Begin Reading");
Expand Down Expand Up @@ -532,7 +523,7 @@ else if (ctx.action == Action.SUB_ITERATE || ctx.action == Action.SUB_ITERATE_QU
// TODO
// }
else {
throw new Exception("Action Not Implemented: " + ctx.action.getLabel());
throw new TerminalException("Action Not Implemented: " + ctx.action.getLabel());
}
}

Expand Down Expand Up @@ -562,15 +553,15 @@ else if (ctx.action == Action.SUB_PULL_READ) {
_subPullRead(ctx, stats, sub, durable);
}
else {
throw new Exception("Action Not Implemented: " + ctx.action.getLabel());
throw new TerminalException("Action Not Implemented: " + ctx.action.getLabel());
}
}

private static void _subPullFetch(Context ctx, Stats stats, JetStreamSubscription sub, String durable) {
int rcvd = 0;
long rcvd = 0;
Message lastUnAcked = null;
int unAckedCount = 0;
int unReported = 0;
long unAckedCount = 0;
long unReported = 0;
AtomicLong counter = ctx.getSubscribeCounter(durable);
report(ctx, rcvd, "Begin Reading");
while (counter.get() < ctx.messageCount) {
Expand Down Expand Up @@ -607,7 +598,7 @@ private static void _subPullRead(Context ctx, Stats stats, JetStreamSubscription
// ----------------------------------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------------------------------
private static void acceptHoldOnceStarted(Stats stats, int rcvd, long hold, Context ctx) {
private static void acceptHoldOnceStarted(Stats stats, long rcvd, long hold, Context ctx) {
if (rcvd == 0) {
ctx.app.report("Waiting for first message.");
}
Expand All @@ -622,7 +613,7 @@ private static boolean isRegularTimeout(IOException ioe) {
}

// This method returns null if message is acked or policy is None
private static Message ackMaybe(Context ctx, Stats stats, Message m, int unAckedCount) {
private static Message ackMaybe(Context ctx, Stats stats, Message m, long unAckedCount) {
if (ctx.ackPolicy == AckPolicy.Explicit) {
_ack(stats, m);
return null;
Expand Down
Loading

0 comments on commit 4ac187d

Please sign in to comment.