Skip to content

Commit

Permalink
KafkaSinkCluster: route DescribeGroups request (#1816)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 18, 2024
1 parent cb8cf1b commit e50d528
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 33 deletions.
65 changes: 53 additions & 12 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition, TransactionDescription,
ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
TransactionDescription,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -1664,15 +1665,29 @@ async fn list_offsets(admin: &KafkaAdmin) {
assert_eq!(results, expected);
}

async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let mut consumer = connection_builder
async fn list_and_describe_groups(connection_builder: &KafkaConnectionBuilder) {
// create consumers
let mut consumer1 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
.with_group("list_groups_test"),
.with_group("list_groups_test1"),
)
.await;
consumer
consumer1
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: "partitions1".to_owned(),
offset: Some(0),
})
.await;
let mut consumer2 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
.with_group("list_groups_test2"),
)
.await;
consumer2
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
Expand All @@ -1681,10 +1696,36 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
})
.await;

// observe consumers
let admin = connection_builder.connect_admin().await;
let actual_results = admin.list_groups().await;
if !actual_results.contains(&"list_groups_test".to_owned()) {
panic!("Expected to find \"list_groups_test\" in {actual_results:?} but was missing")
if !actual_results.contains(&"list_groups_test1".to_owned()) {
panic!("Expected to find \"list_groups_test1\" in {actual_results:?} but was missing")
}
if !actual_results.contains(&"list_groups_test2".to_owned()) {
panic!("Expected to find \"list_groups_test2\" in {actual_results:?} but was missing")
}

let result = admin
.describe_groups(&["list_groups_test1", "list_groups_test2"])
.await;
assert_eq!(
result,
HashMap::from([
(
"list_groups_test1".to_owned(),
ConsumerGroupDescription {
is_simple_consumer: false
}
),
(
"list_groups_test2".to_owned(),
ConsumerGroupDescription {
is_simple_consumer: false
}
),
])
)
}

async fn list_and_describe_transactions(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -1784,7 +1825,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
// rdkafka-rs doesnt support these methods
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_and_describe_groups(connection_builder).await;
list_and_describe_transactions(connection_builder).await;
create_and_list_partition_reassignments(connection_builder).await;
}
Expand Down
85 changes: 72 additions & 13 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest,
DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -56,10 +56,11 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter,
DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -755,6 +756,14 @@ impl KafkaSinkCluster {
})) => {
self.store_group(&mut groups, offset_delete.group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeGroups(offset_delete),
..
})) => {
for group_id in &offset_delete.groups {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::InitProducerId(InitProducerIdRequest {
Expand Down Expand Up @@ -989,6 +998,12 @@ impl KafkaSinkCluster {
})) => {
self.split_and_route_request::<DeleteGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeGroups(_),
..
})) => {
self.split_and_route_request::<DescribeGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
Expand Down Expand Up @@ -1672,6 +1687,29 @@ The connection to the client has been closed."
result
}

/// This method removes all groups from the DescribeGroups request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_describe_groups_request_by_destination(
&mut self,
body: &mut DescribeGroupsRequest,
) -> HashMap<BrokerId, Vec<GroupId>> {
let mut result: HashMap<BrokerId, Vec<GroupId>> = Default::default();

for group in body.groups.drain(..) {
if let Some(destination) = self.group_to_coordinator_broker.get(&group) {
let dest_groups = result.entry(*destination).or_default();
dest_groups.push(group);
} else {
tracing::warn!("no known coordinator for group {group:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_groups = result.entry(destination).or_default();
dest_groups.push(group);
}
}

result
}

/// This method removes all topics from the list offsets request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
Expand Down Expand Up @@ -2291,6 +2329,10 @@ The connection to the client has been closed."
body: ResponseBody::ListTransactions(base),
..
})) => Self::combine_list_transactions(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeGroups(base),
..
})) => Self::combine_describe_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2655,6 +2697,23 @@ The connection to the client has been closed."
Ok(())
}

fn combine_describe_groups(
base: &mut DescribeGroupsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeGroups(next),
..
})) = next.frame()
{
base.groups.extend(std::mem::take(&mut next.groups));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
Expand Down
34 changes: 31 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
ProduceRequest, TopicName, TransactionalId,
DescribeGroupsRequest, DescribeProducersRequest, DescribeTransactionsRequest, GroupId,
ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName, TransactionalId,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -336,3 +336,31 @@ impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
request.groups = item;
}
}

pub struct DescribeGroupsSplitAndRouter;

impl RequestSplitAndRouter for DescribeGroupsSplitAndRouter {
type Request = DescribeGroupsRequest;
type SubRequests = Vec<GroupId>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_describe_groups_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeGroups(request),
..
})) => request,
_ => unreachable!(),
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.groups = item;
}
}
37 changes: 32 additions & 5 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete,
ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
TopicPartitionInfo, TransactionDescription,
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ConsumerGroupDescription,
ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
OffsetAndMetadata, OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
TopicPartition, TopicPartitionInfo, TransactionDescription,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -730,6 +730,33 @@ impl KafkaAdminJava {
results
}

pub async fn describe_groups(
&self,
group_ids: &[&str],
) -> HashMap<String, ConsumerGroupDescription> {
let group_ids = group_ids.iter().map(|x| self.jvm.new_string(x)).collect();
let group_ids = self.jvm.new_list("java.lang.String", group_ids);

let java_results = self
.admin
.call("describeConsumerGroups", vec![group_ids])
.call_async("all", vec![])
.await;
map_iterator(java_results)
.map(|(group_id, consumer_group_description)| {
(
group_id.into_rust(),
ConsumerGroupDescription {
is_simple_consumer: consumer_group_description
.cast("org.apache.kafka.clients.admin.ConsumerGroupDescription")
.call("isSimpleConsumerGroup", vec![])
.into_rust(),
},
)
})
.collect()
}

pub async fn list_groups(&self) -> Vec<String> {
let java_results = self
.admin
Expand Down
15 changes: 15 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,16 @@ impl KafkaAdmin {
Self::Java(java) => java.list_offsets(topic_partitions).await,
}
}
pub async fn describe_groups(
&self,
group_ids: &[&str],
) -> HashMap<String, ConsumerGroupDescription> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support describe_groups"),
Self::Java(java) => java.describe_groups(group_ids).await,
}
}

pub async fn list_groups(&self) -> Vec<String> {
match self {
Expand Down Expand Up @@ -750,6 +760,11 @@ impl IsolationLevel {
}
}

#[derive(PartialEq, Debug)]
pub struct ConsumerGroupDescription {
pub is_simple_consumer: bool,
}

#[derive(PartialEq, Debug)]
pub struct OffsetAndMetadata {
pub offset: i64,
Expand Down

0 comments on commit e50d528

Please sign in to comment.