From f2fd0bafcbea77c69602f89ff08073f07b0c5bf8 Mon Sep 17 00:00:00 2001 From: Neeraj Date: Sat, 21 Oct 2023 19:53:53 +0530 Subject: [PATCH 1/3] feat(api): Add increase topic partition api --- .../java/org/akhq/controllers/TopicController.java | 10 ++++++++++ .../java/org/akhq/modules/AbstractKafkaWrapper.java | 12 ++++++++++++ .../java/org/akhq/repositories/TopicRepository.java | 4 ++++ .../org/akhq/controllers/TopicControllerTest.java | 7 +++++++ .../org/akhq/repositories/TopicRepositoryTest.java | 11 +++++++++++ 5 files changed, 44 insertions(+) diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java index 7b54bdec5..cbd8b2bb0 100644 --- a/src/main/java/org/akhq/controllers/TopicController.java +++ b/src/main/java/org/akhq/controllers/TopicController.java @@ -322,6 +322,16 @@ public List updateConfig(String cluster, String topicName, Map increasePartition(String cluster, String topicName, Map config) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, topicName); + this.topicRepository.increasePartition(cluster, topicName, config.get("partition")); + + return HttpResponse.accepted(); + } + @AKHQSecured(resource = Role.Resource.TOPIC_DATA, action = Role.Action.DELETE) @Delete("api/{cluster}/topic/{topicName}/data/empty") @Operation(tags = {"topic data"}, summary = "Empty data from a topic") diff --git a/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java b/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java index 27d06631b..42f5fd466 100644 --- a/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java +++ b/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java @@ -107,6 +107,18 @@ public void createTopics(String clusterId, String name, int partitions, short re listTopics = new HashMap<>(); } + public void alterTopicPartition(String clusterId, String name, int partitions) throws ExecutionException { + Map newPartitionMap = new HashMap<>(); + newPartitionMap.put(name, NewPartitions.increaseTo(partitions)); + + Logger.call(kafkaModule + .getAdminClient(clusterId) + .createPartitions(newPartitionMap).all(), + "Increase Topic partition", + Collections.singletonList(name) + ); + } + public void deleteTopics(String clusterId, String name) throws ExecutionException { Logger.call(kafkaModule.getAdminClient(clusterId) .deleteTopics(Collections.singleton(name)) diff --git a/src/main/java/org/akhq/repositories/TopicRepository.java b/src/main/java/org/akhq/repositories/TopicRepository.java index 19377c9c1..264eb8f68 100644 --- a/src/main/java/org/akhq/repositories/TopicRepository.java +++ b/src/main/java/org/akhq/repositories/TopicRepository.java @@ -126,6 +126,10 @@ public void delete(String clusterId, String name) throws ExecutionException, Int kafkaWrapper.deleteTopics(clusterId, name); } + public void increasePartition(String clusterId, String name, int partitions) throws ExecutionException, InterruptedException { + kafkaWrapper.alterTopicPartition(clusterId, name, partitions); + } + @Retryable( includes = { UnknownTopicOrPartitionException.class diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java index 861625f14..05ecf21eb 100644 --- a/src/test/java/org/akhq/controllers/TopicControllerTest.java +++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java @@ -259,6 +259,13 @@ void produceMultipleMessages() { assertTrue(response.get(2).getValue().contains("key3_{\"test_1\":3}")); } + @Test + @Order(7) + void increasePartitionApi() { + this.exchange(HttpRequest.POST(TOPIC_URL + "/partitions", + ImmutableMap.of("partition", 4))); + } + @Test @Order(8) void delete() { diff --git a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java index a53cf2f8f..9693387dc 100644 --- a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java @@ -172,6 +172,17 @@ void partition() throws ExecutionException, InterruptedException { assertEquals(3, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COMPACTED).getPartitions().size()); } + @Test + void increasePartition() throws ExecutionException, InterruptedException { + topicRepository.create(KafkaTestCluster.CLUSTER_ID, "increasePartition", 8, (short) 1, Collections.emptyList() + ); + topicRepository.increasePartition(KafkaTestCluster.CLUSTER_ID, "increasePartition", 9); + + assertEquals(9, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "increasePartition").getPartitions().size()); + + topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "increasePartition"); + } + private void mockApplicationContext() { Authentication auth = new ServerAuthentication("test", List.of(), Map.of()); DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class); From 60c2599041b218a329a991c602ae9d19d411e46b Mon Sep 17 00:00:00 2001 From: Neeraj Date: Sat, 21 Oct 2023 21:03:59 +0530 Subject: [PATCH 2/3] feat(api): Change topic name in test case --- src/test/java/org/akhq/controllers/TopicControllerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/akhq/controllers/TopicControllerTest.java b/src/test/java/org/akhq/controllers/TopicControllerTest.java index 05ecf21eb..c9a79f253 100644 --- a/src/test/java/org/akhq/controllers/TopicControllerTest.java +++ b/src/test/java/org/akhq/controllers/TopicControllerTest.java @@ -262,7 +262,7 @@ void produceMultipleMessages() { @Test @Order(7) void increasePartitionApi() { - this.exchange(HttpRequest.POST(TOPIC_URL + "/partitions", + this.exchange(HttpRequest.POST(CREATE_TOPIC_URL + "/partitions", ImmutableMap.of("partition", 4))); } From 974381e32a60d994fd56408c89084147dacb4248 Mon Sep 17 00:00:00 2001 From: Neeraj Date: Tue, 24 Oct 2023 01:11:47 +0530 Subject: [PATCH 3/3] feat(api): Add Increase partition page --- client/src/components/Root/Root.jsx | 1 + client/src/containers/Topic/Topic/Topic.jsx | 11 +++ .../TopicPartitions/TopicIncreaseParition.jsx | 75 +++++++++++++++++++ client/src/utils/Routes.js | 9 +++ client/src/utils/api.js | 3 + client/src/utils/endpoints.js | 4 + 6 files changed, 103 insertions(+) create mode 100644 client/src/containers/Topic/Topic/TopicPartitions/TopicIncreaseParition.jsx diff --git a/client/src/components/Root/Root.jsx b/client/src/components/Root/Root.jsx index 317275801..341489089 100644 --- a/client/src/components/Root/Root.jsx +++ b/client/src/components/Root/Root.jsx @@ -42,6 +42,7 @@ class Root extends Component { buildConfig() { let config = new Map(); config.cancelToken = this.cancel.token; + config.validateStatus = () => true; if (localStorage.getItem('jwtToken')) { config.headers = {}; diff --git a/client/src/containers/Topic/Topic/Topic.jsx b/client/src/containers/Topic/Topic/Topic.jsx index dcb843036..870d67a7f 100644 --- a/client/src/containers/Topic/Topic/Topic.jsx +++ b/client/src/containers/Topic/Topic/Topic.jsx @@ -363,6 +363,17 @@ class Topic extends Root { )} + {selectedTab === 'partitions' && + roles.TOPIC_DATA && + roles.TOPIC_DATA.includes('CREATE') && ( + + Increase Partition + + )} + {roles.TOPIC_DATA && roles.TOPIC_DATA.includes('CREATE') && ( Produce to topic diff --git a/client/src/containers/Topic/Topic/TopicPartitions/TopicIncreaseParition.jsx b/client/src/containers/Topic/Topic/TopicPartitions/TopicIncreaseParition.jsx new file mode 100644 index 000000000..e26269389 --- /dev/null +++ b/client/src/containers/Topic/Topic/TopicPartitions/TopicIncreaseParition.jsx @@ -0,0 +1,75 @@ +import React from 'react'; +import Joi from 'joi-browser'; +import { withRouter } from 'react-router-dom'; +import Form from '../../../../components/Form/Form'; +import Header from '../../../Header'; +import { uriTopicIncreasePartition } from '../../../../utils/endpoints'; +import { toast } from 'react-toastify'; + +class TopicIncreasePartition extends Form { + state = { + formData: { + partition: 1 + }, + selectedCluster: this.props.match.params.clusterId, + selectedTopic: this.props.match.params.topicId, + errors: {} + }; + + componentDidMount() { + this.getTopicsPartitions(); + } + + async getTopicsPartitions() { + const { selectedCluster, selectedTopic } = this.state; + + let partitions = await this.getApi(uriTopicIncreasePartition(selectedCluster, selectedTopic)); + let form = {}; + form.partition = partitions.data.length; + this.setState({ formData: form }); + } + + schema = { + partition: Joi.number().min(1).label('Partition').required() + }; + + async doSubmit() { + const { formData, selectedCluster, selectedTopic } = this.state; + const partitionData = { + partition: formData.partition + }; + + this.postApi(uriTopicIncreasePartition(selectedCluster, selectedTopic), partitionData) + .then(() => { + this.props.history.push({ + pathname: `/ui/${selectedCluster}/topic` + }); + toast.success('Topic partition updated'); + }) + .catch(error => toast.error(error.data.message)); + } + render() { + return ( +
+
this.doSubmit()} + > +
+ {this.renderInput('partition', 'Partition', 'Partition', 'number')} + {this.renderButton( + 'Update', + () => { + this.doSubmit(); + }, + undefined, + 'button' + )} + +
+ ); + } +} + +export default withRouter(TopicIncreasePartition); diff --git a/client/src/utils/Routes.js b/client/src/utils/Routes.js index 9ee1fffdb..bbecaa534 100644 --- a/client/src/utils/Routes.js +++ b/client/src/utils/Routes.js @@ -13,6 +13,7 @@ import ConnectCreate from '../containers/Connect/ConnectCreate/ConnectCreate'; import Connect from '../containers/Connect/ConnectDetail/Connect'; import TopicCreate from '../containers/Topic/TopicCreate/TopicCreate'; import TopicProduce from '../containers/Topic/TopicProduce'; +import TopicIncreaseParition from '../containers/Topic/Topic/TopicPartitions/TopicIncreaseParition'; import TopicCopy from '../containers/Topic/TopicCopy'; import Loading from '../containers/Loading'; import ConsumerGroupList from '../containers/ConsumerGroup/ConsumerGroupList'; @@ -166,6 +167,14 @@ class Routes extends Root { /> )} + {roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && ( + + )} + {roles && roles.TOPIC && roles.TOPIC_DATA.includes('CREATE') && ( )} diff --git a/client/src/utils/api.js b/client/src/utils/api.js index 435079f70..cdfafaa70 100644 --- a/client/src/utils/api.js +++ b/client/src/utils/api.js @@ -79,6 +79,9 @@ export const post = (url, body, config) => axios .post(url, body, { ...configs, ...config }) .then(res => { + if (res.status >= 400) { + reject(res); + } resolve(res); }) .catch(err => { diff --git a/client/src/utils/endpoints.js b/client/src/utils/endpoints.js index 30fa3a16f..a8aeca360 100644 --- a/client/src/utils/endpoints.js +++ b/client/src/utils/endpoints.js @@ -52,6 +52,9 @@ export const uriTopicsInfo = (clusterId, topicId) => `${apiUrl}/${clusterId}/top export const uriTopicsCreate = clusterId => `${apiUrl}/${clusterId}/topic`; +export const uriTopicIncreasePartition = (clusterId, topicId) => + `${apiUrl}/${clusterId}/topic/${topicId}/partitions`; + export const uriTopicsProduce = (clusterId, topicName) => `${apiUrl}/${clusterId}/topic/${topicName}/data`; @@ -380,5 +383,6 @@ export default { uriLiveTail, uriTopicDataSearch, uriTopicDataDelete, + uriTopicIncreasePartition, uriDeleteGroupOffsets };