Skip to content

Commit

Permalink
Merge pull request #35824 from f0nZ/main
Browse files Browse the repository at this point in the history
Add capability to add topic configuration during topic creation
  • Loading branch information
cescoffier authored Dec 4, 2024
2 parents 5006bf3 + 5b0dfd5 commit c5cc765
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ export class QwcKafkaAddTopic extends LitElement {
min="0"
max="99">
</vaadin-integer-field>
<vaadin-text-field
label="Configurations"
placeholder="key1=value1,key2=value2"
value="${this._newTopic.configs}"
step-buttons-visible
@value-changed="${(e) => this._configsChanged(e)}">
</vaadin-text-field>
${this._renderButtons()}`;
}

Expand All @@ -70,10 +77,11 @@ export class QwcKafkaAddTopic extends LitElement {
}

_reset(){
this._newTopic = new Object();
this._newTopic = {};
this._newTopic.name = '';
this._newTopic.partitions = 1;
this._newTopic.replications = 1;
this._newTopic.configs = undefined;
}

_cancel(){
Expand All @@ -89,11 +97,11 @@ export class QwcKafkaAddTopic extends LitElement {

_submit(){
if(this._newTopic.name.trim() !== ''){

this.jsonRpc.createTopic({
topicName: this._newTopic.name,
partitions: parseInt(this._newTopic.partitions),
replications: parseInt(this._newTopic.replications)
replications: parseInt(this._newTopic.replications),
configs: this._newTopic.configs
}).then(jsonRpcResponse => {
this._reset();
const success = new CustomEvent("kafka-topic-added-success", {
Expand All @@ -119,6 +127,17 @@ export class QwcKafkaAddTopic extends LitElement {
_replicationsChanged(e){
this._newTopic.replications = e.detail.value;
}

_configsChanged(e){
this._newTopic.configs = Object.fromEntries(e.detail.value.split(',')
.reduce((configs, item) => {
const split = item.trim().split('=');
if (split.length > 1) {
configs.set(split[0], split[1]);
}
return configs;
}, new Map()));
}
}

customElements.define('qwc-kafka-add-topic', QwcKafkaAddTopic);
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,23 @@ public Collection<ConsumerGroupDescription> getConsumerGroups() throws Interrupt
.values();
}

public boolean deleteTopic(String name) {
public boolean deleteTopic(final String name) {
Collection<String> topics = new ArrayList<>();
topics.add(name);
DeleteTopicsResult dtr = client.deleteTopics(topics);
return dtr.topicNameValues() != null;
}

public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) {
public boolean createTopic(final KafkaCreateTopicRequest kafkaCreateTopicRq) {
var partitions = Optional.ofNullable(kafkaCreateTopicRq.getPartitions()).orElse(1);
var replications = Optional.ofNullable(kafkaCreateTopicRq.getReplications()).orElse((short) 1);
var newTopic = new NewTopic(kafkaCreateTopicRq.getTopicName(), partitions, replications);

newTopic.configs(Optional.ofNullable(kafkaCreateTopicRq.getConfigs()).orElse(Map.of()));
CreateTopicsResult ctr = client.createTopics(List.of(newTopic));
return ctr.values() != null;
}

public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId) {
return client.listConsumerGroupOffsets(groupId);
}

Expand All @@ -96,7 +96,7 @@ public Collection<AclBinding> getAclInfo() throws InterruptedException, Executio
return client.describeAcls(filter, options).values().get();
}

public Map<String, TopicDescription> describeTopics(Collection<String> topicNames)
public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames)
throws InterruptedException, ExecutionException {
return client.describeTopics(topicNames)
.allTopicNames()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,28 @@ public List<KafkaTopic> getTopics() throws InterruptedException, ExecutionExcept
return kafkaUiUtils.getTopics();
}

public List<KafkaTopic> createTopic(String topicName, int partitions, int replications)
public List<KafkaTopic> createTopic(final String topicName, final int partitions, final int replications,
Map<String, String> configs)
throws InterruptedException, ExecutionException {

KafkaCreateTopicRequest createTopicRequest = new KafkaCreateTopicRequest(topicName, partitions, (short) replications);
KafkaCreateTopicRequest createTopicRequest = new KafkaCreateTopicRequest(topicName, partitions, (short) replications,
configs);
boolean created = kafkaAdminClient.createTopic(createTopicRequest);
if (created) {
return kafkaUiUtils.getTopics();
}
throw new RuntimeException("Topic [" + topicName + "] not created");
}

public List<KafkaTopic> deleteTopic(String topicName) throws InterruptedException, ExecutionException {
public List<KafkaTopic> deleteTopic(final String topicName) throws InterruptedException, ExecutionException {
boolean deleted = kafkaAdminClient.deleteTopic(topicName);
if (deleted) {
return kafkaUiUtils.getTopics();
}
throw new RuntimeException("Topic [" + topicName + "] not deleted");
}

public KafkaMessagePage topicMessages(String topicName) throws ExecutionException, InterruptedException {
public KafkaMessagePage topicMessages(final String topicName) throws ExecutionException, InterruptedException {
List<Integer> partitions = getPartitions(topicName);
KafkaOffsetRequest offsetRequest = new KafkaOffsetRequest(topicName, partitions, Order.NEW_FIRST);
Map<Integer, Long> offset = kafkaUiUtils.getOffset(offsetRequest);
Expand All @@ -71,7 +73,7 @@ public KafkaMessagePage createMessage(String topicName, Integer partition, Strin
return topicMessages(topicName);
}

public List<Integer> getPartitions(String topicName) throws ExecutionException, InterruptedException {
public List<Integer> getPartitions(final String topicName) throws ExecutionException, InterruptedException {
return new ArrayList<>(kafkaUiUtils.partitions(topicName));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package io.quarkus.kafka.client.runtime.devui.model.request;

import java.util.Map;

public class KafkaCreateTopicRequest {
private String topicName;
private Integer partitions;
private Short replications;
private Map<String, String> configs;

public KafkaCreateTopicRequest() {
}

public KafkaCreateTopicRequest(String topicName, Integer partitions, Short replications) {
public KafkaCreateTopicRequest(final String topicName, final Integer partitions, final Short replications,
final Map<String, String> configs) {
this.topicName = topicName;
this.partitions = partitions;
this.replications = replications;
this.configs = configs;
}

public String getTopicName() {
Expand All @@ -25,4 +30,9 @@ public Integer getPartitions() {
public Short getReplications() {
return replications;
}

public Map<String, String> getConfigs() {
return configs;
}

}

0 comments on commit c5cc765

Please sign in to comment.