Skip to content

Commit

Permalink
unifying "--topic" opt
Browse files Browse the repository at this point in the history
  • Loading branch information
andrus committed May 11, 2024
1 parent acf9ea1 commit aa36388
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 42 deletions.
24 changes: 9 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ OPTIONS
-p, --producer
Starts an interactive Kafka producer for the specified topic
--topic=topic_name
Kafka topic name
--topic=topic_name
-t topic_name, --topic=topic_name
Kafka topic name
```

Expand All @@ -79,16 +76,13 @@ java -jar target/bootique-kafka-examples-3.0.jar --producer \
This starts an interactive console app that allows you to type messages, one line at a time, that are sent to Kafka:

```
INFO [2024-05-11 22:22:39,684] main i.b.k.c.p.DefaultKafkaProducerBuilder: Creating producer. Cluster: 127.0.0.1:9092.
INFO [2024-05-11 22:22:39,713] main o.a.k.c.t.i.KafkaMetricsCollector: initializing Kafka metrics collector
INFO [2024-05-11 22:22:39,773] main o.a.k.c.u.AppInfoParser: Kafka version: 3.7.0
INFO [2024-05-11 22:22:39,773] main o.a.k.c.u.AppInfoParser: Kafka commitId: 2ae524ed625438c5
INFO [2024-05-11 22:22:39,773] main o.a.k.c.u.AppInfoParser: Kafka startTimeMs: 1715466159773
Start typing messages below. Type '\q' to exit.
bq-kafka-example > Hi!
bq-kafka-example > Hi again!
INFO main i.b.k.c.p.DefaultKafkaProducerBuilder: Creating producer. Cluster: 127.0.0.1:9092.
INFO main o.a.k.c.t.i.KafkaMetricsCollector: initializing Kafka metrics collector
INFO main o.a.k.c.u.AppInfoParser: Kafka version: 3.7.0
INFO main o.a.k.c.u.AppInfoParser: Kafka commitId: 2ae524ed625438c5
INFO main o.a.k.c.u.AppInfoParser: Kafka startTimeMs: 1715466159773
(producer) bq-kafka-example > Hi!
(producer) bq-kafka-example > Hi again!
```

You can read these messages from the topic using Kafka provided console consumer, but let's start our own consumer.
Expand All @@ -101,4 +95,4 @@ java -jar target/bootique-kafka-examples-3.0.jar --consumer \
--topic=bq-kafka-example
```
Now, return to the running producer window, and type more messages. All of them should be mirrored in the consumer
command output.
command output. You can stop either producer or consumer with `Ctrl-c`.
10 changes: 9 additions & 1 deletion src/main/java/io/bootique/examples/kafka/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import io.bootique.BQModule;
import io.bootique.Bootique;
import io.bootique.di.Binder;
import io.bootique.meta.application.OptionMetadata;

public class App implements BQModule {

public static final String TOPIC_OPT = "topic";
public static final String KAFKA_CLUSTER = "c1";

public static void main(String[] args) {
Expand All @@ -19,8 +21,14 @@ public static void main(String[] args) {

@Override
public void configure(Binder binder) {
OptionMetadata topicOpt = OptionMetadata
.builder(TOPIC_OPT)
.description("Kafka topic name")
.valueRequired("topic_name").build();

BQCoreModule.extend(binder)
.addCommand(ProducerCommand.class)
.addCommand(ConsumerCommand.class);
.addCommand(ConsumerCommand.class)
.addOption(topicOpt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.bootique.kafka.client.consumer.KafkaConsumerFactory;
import io.bootique.kafka.client.consumer.KafkaPollingTracker;
import io.bootique.meta.application.CommandMetadata;
import io.bootique.meta.application.OptionMetadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

Expand All @@ -16,16 +15,12 @@

public class ConsumerCommand extends CommandWithMetadata {

private static final String TOPIC_OPT = "topic";

private final Provider<KafkaConsumerFactory> consumerProvider;

private static CommandMetadata metadata() {
OptionMetadata omd = OptionMetadata.builder(TOPIC_OPT).description("Kafka topic name").valueRequired("topic_name").build();
return CommandMetadata
.builder(ConsumerCommand.class)
.description("Starts a Kafka consumer for the specified topic")
.addOption(omd)
.build();
}

Expand All @@ -38,7 +33,7 @@ public ConsumerCommand(Provider<KafkaConsumerFactory> consumerProvider) {
@Override
public CommandOutcome run(Cli cli) {

String topic = cli.optionString(TOPIC_OPT);
String topic = cli.optionString(App.TOPIC_OPT);
if (topic == null) {
return CommandOutcome.failed(-1, "No '--topic' specified");
}
Expand Down
25 changes: 5 additions & 20 deletions src/main/java/io/bootique/examples/kafka/ProducerCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.bootique.command.CommandWithMetadata;
import io.bootique.kafka.client.producer.KafkaProducerFactory;
import io.bootique.meta.application.CommandMetadata;
import io.bootique.meta.application.OptionMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Expand All @@ -20,17 +19,12 @@
*/
public class ProducerCommand extends CommandWithMetadata {

private static final String TOPIC_OPT = "topic";
private static final String QUIT_COMMAND = "\\q";

private final Provider<KafkaProducerFactory> producerProvider;

private static CommandMetadata metadata() {
OptionMetadata omd = OptionMetadata.builder(TOPIC_OPT).description("Kafka topic name").valueRequired("topic_name").build();
return CommandMetadata
.builder(ProducerCommand.class)
.description("Starts an interactive Kafka producer for the specified topic")
.addOption(omd)
.build();
}

Expand All @@ -43,7 +37,7 @@ public ProducerCommand(Provider<KafkaProducerFactory> producerProvider) {
@Override
public CommandOutcome run(Cli cli) {

String topic = cli.optionString(TOPIC_OPT);
String topic = cli.optionString(App.TOPIC_OPT);
if (topic == null) {
return CommandOutcome.failed(-1, "No '--topic' specified");
}
Expand All @@ -62,10 +56,6 @@ public CommandOutcome run(Cli cli) {

private CommandOutcome runConsole(String topic, Producer<byte[], String> producer) {

System.out.println();
System.out.println(" Start typing messages below. Type '\\q' to exit.");
System.out.println();

try (BufferedReader stdinReader = new BufferedReader(new InputStreamReader(System.in))) {
readAndPost(stdinReader, topic, producer);
return CommandOutcome.succeeded();
Expand All @@ -76,19 +66,14 @@ private CommandOutcome runConsole(String topic, Producer<byte[], String> produce

private void readAndPost(BufferedReader reader, String topic, Producer<byte[], String> producer) throws IOException {

System.out.print(topic + " > ");
String message;
while ((message = reader.readLine()) != null) {

if (QUIT_COMMAND.equals(message)) {
break;
}
String message = "";

do {
if (!"".equals(message)) {
producer.send(new ProducerRecord<>(topic, message));
}

System.out.print(topic + " > ");
}
System.out.print("(producer) " + topic + " > ");
} while ((message = reader.readLine()) != null);
}
}

0 comments on commit aa36388

Please sign in to comment.