Skip to content

Commit

Permalink
fix(groups): check permissions on cluster/resource for actions on con…
Browse files Browse the repository at this point in the history
…sumer groups (tchiotludo#1937)

Fixes tchiotludo#1936.
  • Loading branch information
alwibrm authored and jamfor352 committed Oct 29, 2024
1 parent 524d39a commit 1b2feb6
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/main/java/org/akhq/controllers/GroupController.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public GroupController(
@Get
@Operation(tags = {"consumer group"}, summary = "List all consumer groups")
public ResultPagedList<ConsumerGroup> list(HttpRequest<?> request, String cluster, Optional<String> search, Optional<Integer> page) throws ExecutionException, InterruptedException {
checkIfClusterAllowed(cluster);

URIBuilder uri = URIBuilder.fromURI(request.getUri());
Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));

Expand All @@ -70,30 +72,39 @@ public ResultPagedList<ConsumerGroup> list(HttpRequest<?> request, String cluste
@Get("{groupName}")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group")
public ConsumerGroup home(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster));
}

@Get("{groupName}/offsets")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group offsets")
public List<TopicPartition.ConsumerGroupOffset> offsets(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getOffsets();
}

@Get("{groupName}/members")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group members")
public List<Consumer> members(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getMembers();
}

@Get("{groupName}/acls")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group acls")
public List<AccessControl> acls(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return aclRepository.findByResourceType(cluster, ResourceType.GROUP, groupName);
}

@Get("topics")
@Operation(tags = {"consumer group"}, summary = "Retrieve consumer group for list of topics")
public List filterByTopics(String cluster, Optional<List<String>> topics) {
checkIfClusterAllowed(cluster);

return topics.map(
topicsName -> {
Expand All @@ -115,6 +126,8 @@ public HttpResponse<?> offsets(
String groupName,
@Body List<OffsetsUpdate> offsets
) {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.consumerGroupRepository.updateOffsets(
cluster,
groupName,
Expand All @@ -135,6 +148,8 @@ public HttpResponse<?> offsets(
@Get("{groupName}/offsets/start")
@Operation(tags = {"consumer group"}, summary = "Retrive consumer group offsets by timestamp")
public List<RecordRepository.TimeOffset> offsetsStart(String cluster, String groupName, Instant timestamp) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

ConsumerGroup group = this.consumerGroupRepository.findByName(
cluster, groupName, buildUserBasedResourceFilters(cluster));

Expand All @@ -152,6 +167,8 @@ public List<RecordRepository.TimeOffset> offsetsStart(String cluster, String gro
@Delete("{groupName}")
@Operation(tags = {"consumer group"}, summary = "Delete a consumer group")
public HttpResponse<?> delete(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.kafkaWrapper.deleteConsumerGroups(cluster, groupName);

return HttpResponse.noContent();
Expand All @@ -161,6 +178,8 @@ public HttpResponse<?> delete(String cluster, String groupName) throws Execution
@Delete("{groupName}/topic/{topicName}")
@Operation(tags = {"consumer group"}, summary = "Delete group offsets of given topic")
public HttpResponse<?> deleteConsumerGroupOffsets(String cluster, String groupName, String topicName) throws ExecutionException {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.kafkaWrapper.deleteConsumerGroupOffsets(cluster, groupName, topicName);
return HttpResponse.noContent();
}
Expand Down

0 comments on commit 1b2feb6

Please sign in to comment.