Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBE: add Messaging & Limits/Interest stream examples #16

Merged
merged 5 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions nats-by-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@
* [RequestReply.java](src/main/java/io/nats/RequestReply.java)
* [Nats By Example - Request-Reply in Messaging](https://natsbyexample.com/examples/messaging/request-reply/java)

* JSON
* [Json.java](src/main/java/io/nats/Json.java)
* [Nats By Example - JSON for Message Payloads in Messaging](https://natsbyexample.com/examples/messaging/json/java)

* Concurrent
* [Concurrent.java](src/main/java/io/nats/Concurrent.java)
* [Nats By Example - Concurrent Message Processing in Messaging](https://natsbyexample.com/examples/messaging/concurrent/java)

* Iterating Multiple Subscriptions
* [IteratingMultipleSubscriptions.java](src/main/java/io/nats/IteratingMultipleSubscriptions.java)
* [Nats By Example - Iterating Over Multiple Subscriptions in Messaging](https://natsbyexample.com/examples/messaging/iterating-multiple-subscriptions/java)

* Limits Stream
* [LimitsStream.java](src/main/java/io/nats/LimitsStream.java)
* [Nats By Example - Limits-based Stream in JetStream](https://natsbyexample.com/examples/jetstream/limits-stream/java)

* Interest Stream
* [InterestStream.java](src/main/java/io/nats/InterestStream.java)
* [Nats By Example - Interest-based Stream in JetStream](https://natsbyexample.com/examples/jetstream/interest-stream/java)

* Simplification Fetch Example
* [SimplificationFetchExample.java](src/main/java/io/nats/SimplificationFetchExample.java)
* [Nats By Example - Consumer - Fetch Messages in JetStream](https://natsbyexample.com/examples/jetstream/consumer-fetch-messages/java)
Expand Down
1 change: 1 addition & 0 deletions nats-by-example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ repositories {

dependencies {
implementation 'io.nats:jnats:2.19.2-SNAPSHOT'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2'

testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2'
}
Expand Down
49 changes: 49 additions & 0 deletions nats-by-example/src/main/java/io/nats/Concurrent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class Concurrent {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}

// Initialize a connection to the server. The connection is AutoCloseable
// on exit.
try (Connection nc = Nats.connect(natsURL)) {

int total = 50;
CountDownLatch latch = new CountDownLatch(total);

// Create message dispatchers with queue groups for handling messages in
// separate threads.
for (int i = 0; i < 4; i++) {
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
System.out.printf("Received %s\n",
new String(msg.getData(), StandardCharsets.UTF_8));
latch.countDown();
});

dispatcher.subscribe("greet", "queue");
}

for (int i = 0; i < total; i++) {
nc.publish("greet", String.format("hello %s", i).getBytes(StandardCharsets.UTF_8));
}

// Await the dispatcher threads to have received all the messages before the program quits.
latch.await();

} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
180 changes: 180 additions & 0 deletions nats-by-example/src/main/java/io/nats/InterestStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package io.nats;

import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.NatsJetStreamMetaData;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class InterestStream {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}

// Initialize a connection to the server. The connection is AutoCloseable
// on exit.
try (Connection nc = Nats.connect(natsURL)) {

// Create `JetStream` and `JetStreamManagement` to use the NATS JetStream API.
// It allows creating and managing streams and consumers as well as
// publishing to streams and consuming messages from streams.
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

// ### Creating the stream
// Define the stream configuration, specifying `Interest` for retention, and
// create the stream.
String streamName = "EVENTS";
StreamConfiguration config = StreamConfiguration.builder()
.name(streamName)
.subjects("events.>")
.retentionPolicy(RetentionPolicy.Interest)
.build();

StreamInfo stream = jsm.addStream(config);
System.out.println("Created the stream.");

// To demonstrate the base case behavior of the stream without any consumers, we
// will publish a few messages to the stream.
js.publish("events.page_loaded", null);
js.publish("events.mouse_clicked", null);
PublishAck ack = js.publish("events.input_focused", null);
System.out.println("Published 3 messages.");

// We confirm that all three messages were published and the last message sequence
// is 3.
System.out.printf("Last message seq: %d\n", ack.getSeqno());

// Checking out the stream info, notice how zero messages are present in
// the stream, but the `last_seq` is 3 which matches the last ack'ed
// publish sequence above. Also notice that the `first_seq` is one greater
// which behaves as a sentinel value indicating the stream is empty. This
// sequence has not been assigned to a message yet, but can be interpreted
// as _no messages available_ in this context.
System.out.println("# Stream info without any consumers");
printStreamState(jsm, streamName);

// ### Adding a consumer
// Now let's add a pull consumer and publish a few
// more messages. Also note that we are _only_ creating the consumer and
// have not yet started consuming the messages. This is only to point out
// that it is not _required_ to be actively consuming messages to show
// _interest_, but it is the presence of a consumer which the stream cares
// about to determine retention of messages. [pull]:
// /examples/jetstream/pull-consumer/java
StreamContext streamContext = js.getStreamContext(streamName);
ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder()
.durable("processor-1")
.ackPolicy(AckPolicy.Explicit)
.build());

js.publish("events.mouse_clicked", null);
js.publish("events.input_focused", null);

// If we inspect the stream info again, we will notice a few differences.
// It shows two messages (which we expect) and the first and last sequences
// corresponding to the two messages we just published. We also see that
// the `consumer_count` is now one.
System.out.println("\n# Stream info with one consumer");
printStreamState(jsm, streamName);

// Now that the consumer is there and showing _interest_ in the messages, we know they
// will remain until we process the messages. Let's fetch the two messages and ack them.
try (FetchConsumer fc = consumerContext.fetchMessages(2)) {
Message m = fc.nextMessage();
while (m != null) {
m.ackSync(Duration.ofSeconds(5));
m = fc.nextMessage();
}
}

// What do we expect in the stream? No messages and the `first_seq` has been set to
// the _next_ sequence number like in the base case.
// ☝️ As a quick aside on that second ack, We are using `ackSync` here for this
// example to ensure the stream state has been synced up for this subsequent
// retrieval.
System.out.println("\n# Stream info with one consumer and acked messages");
printStreamState(jsm, streamName);

// ### Two or more consumers
// Since each consumer represents a separate _view_ over a stream, we would expect
// that if messages were processed by one consumer, but not the other, the messages
// would be retained. This is indeed the case.
ConsumerContext consumerContext2 = streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder()
.durable("processor-2")
.ackPolicy(AckPolicy.Explicit)
.build());

js.publish("events.input_focused", null);
js.publish("events.mouse_clicked", null);

// Here we fetch 2 messages for `processor-2`. There are two observations to
// make here. First the fetched messages are the latest two messages that
// were published just above and not any prior messages since these were
// already deleted from the stream. This should be apparent now, but this
// reinforces that a _late_ consumer cannot retroactively show interest. The
// second point is that the stream info shows that the latest two messages
// are still present in the stream. This is also expected since the first
// consumer had not yet processed them.
List<NatsJetStreamMetaData> messagesMetadata = new ArrayList<>();
try (FetchConsumer fc = consumerContext2.fetchMessages(2)) {
Message m = fc.nextMessage();
while (m != null) {
m.ackSync(Duration.ofSeconds(5));
messagesMetadata.add(m.metaData());
m = fc.nextMessage();
}
}

System.out.printf("Msg seqs %d and %d\n", messagesMetadata.get(0).streamSequence(), messagesMetadata.get(1).streamSequence());

System.out.println("\n# Stream info with two consumers, but only one set of acked messages");
printStreamState(jsm, streamName);

// Fetching and ack'ing from the first consumer subscription will result in the messages
// being deleted.
try (FetchConsumer fc = consumerContext.fetchMessages(2)) {
Message m = fc.nextMessage();
while (m != null) {
m.ackSync(Duration.ofSeconds(5));
m = fc.nextMessage();
}
}

System.out.println("\n# Stream info with two consumers having both acked");
printStreamState(jsm, streamName);

// A final callout is that _interest_ respects the `FilterSubject` on a consumer.
// For example, if a consumer defines a filter only for `events.mouse_clicked` events
// then it won't be considered _interested_ in events such as `events.input_focused`.
streamContext.createOrUpdateConsumer(ConsumerConfiguration.builder()
.durable("processor-3")
.ackPolicy(AckPolicy.Explicit)
.filterSubject("events.mouse_clicked")
.build());

js.publish("events.input_focused", null);

// Fetch and term (also works) and ack from the first consumers that _do_ have interest.
Message m = consumerContext.next();
m.term();
m = consumerContext2.next();
m.ackSync(Duration.ofSeconds(5));

System.out.println("\n# Stream info with three consumers with interest from two");
printStreamState(jsm, streamName);
} catch (Exception e) {
e.printStackTrace();
}
}

private static void printStreamState(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
StreamInfo streamInfo = jsm.getStreamInfo(streamName);
System.out.println(streamInfo.getStreamState());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class IteratingMultipleSubscriptions {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}

// Initialize a connection to the server. The connection is AutoCloseable
// on exit.
try (Connection nc = Nats.connect(natsURL)) {

int total = 80;
CountDownLatch latch = new CountDownLatch(total);

// Create a message dispatcher. A dispatcher is a process that runs on
// its own thread, receives incoming messages via a FIFO queue,
// for subjects registered on it. For each message it takes from
// the queue, it makes a blocking call to the MessageHandler
// passed to the createDispatcher call.
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
System.out.printf("Received %s: %s\n",
msg.getSubject(),
new String(msg.getData(), StandardCharsets.UTF_8));
latch.countDown();
});

// Subscribe directly on the dispatcher for multiple subjects.
dispatcher.subscribe("s1");
dispatcher.subscribe("s2");
dispatcher.subscribe("s3");
dispatcher.subscribe("s4");

for (int i = 0; i < total / 4; i++) {
nc.publish("s1", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
nc.publish("s2", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
nc.publish("s3", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
nc.publish("s4", String.valueOf(i).getBytes(StandardCharsets.UTF_8));
Thread.sleep(100);
}

// Await the dispatcher thread to have received all the messages before the program quits.
latch.await();

} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
Loading
Loading