Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(groups): check permissions on cluster/resource for actions on consumer groups (#1936) #1937

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/org/akhq/controllers/GroupController.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public GroupController(
@Get
@Operation(tags = {"consumer group"}, summary = "List all consumer groups")
public ResultPagedList<ConsumerGroup> list(HttpRequest<?> request, String cluster, Optional<String> search, Optional<Integer> page) throws ExecutionException, InterruptedException {
checkIfClusterAllowed(cluster);

URIBuilder uri = URIBuilder.fromURI(request.getUri());
Pagination pagination = new Pagination(pageSize, uri, page.orElse(1));

Expand All @@ -70,30 +72,39 @@ public ResultPagedList<ConsumerGroup> list(HttpRequest<?> request, String cluste
@Get("{groupName}")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group")
public ConsumerGroup home(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster));
}

@Get("{groupName}/offsets")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group offsets")
public List<TopicPartition.ConsumerGroupOffset> offsets(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getOffsets();
}

@Get("{groupName}/members")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group members")
public List<Consumer> members(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return this.consumerGroupRepository.findByName(cluster, groupName, buildUserBasedResourceFilters(cluster)).getMembers();
}

@Get("{groupName}/acls")
@Operation(tags = {"consumer group"}, summary = "Retrieve a consumer group acls")
public List<AccessControl> acls(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

return aclRepository.findByResourceType(cluster, ResourceType.GROUP, groupName);
}

@Get("topics")
@Operation(tags = {"consumer group"}, summary = "Retrieve consumer group for list of topics")
public List filterByTopics(String cluster, Optional<List<String>> topics) {
checkIfClusterAllowed(cluster);

return topics.map(
topicsName -> {
Expand All @@ -115,6 +126,8 @@ public HttpResponse<?> offsets(
String groupName,
@Body List<OffsetsUpdate> offsets
) {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.consumerGroupRepository.updateOffsets(
cluster,
groupName,
Expand All @@ -135,6 +148,8 @@ public HttpResponse<?> offsets(
@Get("{groupName}/offsets/start")
@Operation(tags = {"consumer group"}, summary = "Retrive consumer group offsets by timestamp")
public List<RecordRepository.TimeOffset> offsetsStart(String cluster, String groupName, Instant timestamp) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

ConsumerGroup group = this.consumerGroupRepository.findByName(
cluster, groupName, buildUserBasedResourceFilters(cluster));

Expand All @@ -152,6 +167,8 @@ public List<RecordRepository.TimeOffset> offsetsStart(String cluster, String gro
@Delete("{groupName}")
@Operation(tags = {"consumer group"}, summary = "Delete a consumer group")
public HttpResponse<?> delete(String cluster, String groupName) throws ExecutionException, InterruptedException {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.kafkaWrapper.deleteConsumerGroups(cluster, groupName);

return HttpResponse.noContent();
Expand All @@ -161,6 +178,8 @@ public HttpResponse<?> delete(String cluster, String groupName) throws Execution
@Delete("{groupName}/topic/{topicName}")
@Operation(tags = {"consumer group"}, summary = "Delete group offsets of given topic")
public HttpResponse<?> deleteConsumerGroupOffsets(String cluster, String groupName, String topicName) throws ExecutionException {
checkIfClusterAndResourceAllowed(cluster, groupName);

this.kafkaWrapper.deleteConsumerGroupOffsets(cluster, groupName, topicName);
return HttpResponse.noContent();
}
Expand Down