diff --git a/src/main/java/org/akhq/controllers/GroupController.java b/src/main/java/org/akhq/controllers/GroupController.java index 6d1a6aabf..893534850 100644 --- a/src/main/java/org/akhq/controllers/GroupController.java +++ b/src/main/java/org/akhq/controllers/GroupController.java @@ -61,6 +61,8 @@ public GroupController( @Get @Operation(tags = {"consumer group"}, summary = "List all consumer groups") public ResultPagedList list(HttpRequest request, String cluster, Optional search, Optional page) throws ExecutionException, InterruptedException { + checkIfClusterAllowed(cluster); + URIBuilder uri = URIBuilder.fromURI(request.getUri()); Pagination pagination = new Pagination(pageSize, uri, page.orElse(1)); @@ -70,30 +72,39 @@ public ResultPagedList list(HttpRequest request, String cluste @Get("{groupName}") @Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group") public ConsumerGroup home(String cluster, String groupName) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)); } @Get("{groupName}/offsets") @Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group offsets") public List offsets(String cluster, String groupName) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getOffsets(); } @Get("{groupName}/members") @Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group members") public List members(String cluster, String groupName) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getMembers(); } @Get("{groupName}/acls") @Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group acls") public List acls(String cluster, String groupName) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + return aclRepository.findByResourceType(cluster, ResourceType.GROUP, groupName); } @Get("topics") @Operation(tags = {"consumer group"}, summary = "Retrieve consumer group for list of topics") public List filterByTopics(String cluster, Optional> topics) { + checkIfClusterAllowed(cluster); return topics.map( topicsName -> { @@ -115,6 +126,8 @@ public HttpResponse offsets( String groupName, @Body List offsets ) { + checkIfClusterAndResourceAllowed(cluster, groupName); + this.consumerGroupRepository.updateOffsets( cluster, groupName, @@ -135,6 +148,8 @@ public HttpResponse offsets( @Get("{groupName}/offsets/start") @Operation(tags = {"consumer group"}, summary = "Retrive consumer group offsets by timestamp") public List offsetsStart(String cluster, String groupName, Instant timestamp) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + ConsumerGroup group = this.consumerGroupRepository.findByName( cluster, groupName, buildUserBasedResourceFilters(cluster)); @@ -152,6 +167,8 @@ public List offsetsStart(String cluster, String gro @Delete("{groupName}") @Operation(tags = {"consumer group"}, summary = "Delete a consumer group") public HttpResponse delete(String cluster, String groupName) throws ExecutionException, InterruptedException { + checkIfClusterAndResourceAllowed(cluster, groupName); + this.kafkaWrapper.deleteConsumerGroups(cluster, groupName); return HttpResponse.noContent(); @@ -161,6 +178,8 @@ public HttpResponse delete(String cluster, String groupName) throws Execution @Delete("{groupName}/topic/{topicName}") @Operation(tags = {"consumer group"}, summary = "Delete group offsets of given topic") public HttpResponse deleteConsumerGroupOffsets(String cluster, String groupName, String topicName) throws ExecutionException { + checkIfClusterAndResourceAllowed(cluster, groupName); + this.kafkaWrapper.deleteConsumerGroupOffsets(cluster, groupName, topicName); return HttpResponse.noContent(); }