From 73ac04c1741a3972708133a018a776330faa9041 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 19 Nov 2024 10:39:02 +1100 Subject: [PATCH 1/2] KafkaSinkCluster: Add kafka 3.9 integration test (#1821) --- shotover-proxy/tests/kafka_int_tests/mod.rs | 37 +++++++ .../docker-compose-kafka-3.9.yaml | 96 +++++++++++++++++++ .../src/transforms/kafka/sink_cluster/mod.rs | 27 ++++++ test-helpers/src/docker_compose.rs | 7 +- 4 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 3059529c7..7675093b2 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) { + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml"); + + // One shotover instance per rack + let mut shotovers = vec![]; + for i in 1..3 { + shotovers.push( + shotover_process(&format!( + "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml" + )) + .with_config(&format!( + "tests/test-configs/shotover-config/config{i}.yaml" + )) + .with_log_name(&format!("shotover{i}")) + .start() + .await, + ); + } + + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); + test_cases::cluster_test_suite(&connection_builder).await; + + for shotover in shotovers { + tokio::time::timeout( + Duration::from_secs(10), + 
shotover.shutdown_and_then_consume_events(&multi_shotover_events()), + ) + .await + .expect("Shotover did not shutdown within 10s"); + } +} + #[rstest] //#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram #[case::java(KafkaDriver::Java)] diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml new file mode 100644 index 000000000..430d8e1c0 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml @@ -0,0 +1,96 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.9.0-debian-12-r3' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093" + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_BROKER_RACK: "rack1" + ALLOW_PLAINTEXT_LISTENER: "yes" + # Required for high availability + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2 + + # This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a + # new consumer group by avoiding constant rebalances as each initial consumer joins. 
+ # See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance + # + # However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run. + KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092" + KAFKA_CFG_NODE_ID: 3 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka4: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.6 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092" + KAFKA_CFG_NODE_ID: 4 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka5: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.7 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092" + KAFKA_CFG_NODE_ID: 5 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index c684b1e39..5acacd78c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -3077,6 +3077,33 @@ The connection to the client has been closed." 
+ self.rewrite_describe_cluster_response(describe_cluster)?; response.invalidate_cache(); } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ApiVersions(api_versions), + .. + })) => { + let original_size = api_versions.api_keys.len(); + + // List of keys that shotover doesn't support and so should be removed from supported keys list + let disable_keys = [ + // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION + // So it's not clear at all how to implement this and it's not even possible to test it. + // Instead let's just ask the client to not send it at all. + // We can consider supporting it when kafka itself starts to support it but we will need to be very + // careful to correctly implement the pagination/cursor logic. + ApiKey::DescribeTopicPartitionsKey as i16, + // This message type is part of the new consumer group API, we should implement support for it in the future. + // I've disabled it for now to keep the scope down for kafka 3.9 support. 
+ ApiKey::ConsumerGroupDescribeKey as i16, + ]; + api_versions + .api_keys + .retain(|x| !disable_keys.contains(&x.api_key)); + + if original_size != api_versions.api_keys.len() { + // only invalidate the cache if we actually removed anything + response.invalidate_cache(); + } + } _ => {} } diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index e8c2e9378..adb99308b 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose { docker_compose("tests/transforms/docker-compose-moto.yaml") } -pub static IMAGE_WAITERS: [Image; 11] = [ +pub static IMAGE_WAITERS: [Image; 12] = [ Image { name: "motoserver/moto", log_regex_to_wait_for: r"Press CTRL\+C to quit", @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [ log_regex_to_wait_for: r"Kafka Server started", timeout: Duration::from_secs(120), }, + Image { + name: "bitnami/kafka:3.9.0-debian-12-r3", + log_regex_to_wait_for: r"Kafka Server started", + timeout: Duration::from_secs(120), + }, Image { name: "opensearchproject/opensearch:2.9.0", log_regex_to_wait_for: r"Node started", From 3f76b54965f830fac8fb9802ea54b31e7f711e0b Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 19 Nov 2024 11:05:43 +1100 Subject: [PATCH 2/2] Error on UnregisterBroker (#1820) --- shotover/src/transforms/kafka/sink_cluster/mod.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 5acacd78c..6a4c72a71 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1101,6 +1101,17 @@ The connection to the client has been closed."# return Err(anyhow!( "Client sent ControlledShutdown request. Shotover cannot handle this request as it is not appropriate for shotover to shutdown. +The connection to the client has been closed." 
+ )); + } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::UnregisterBroker(_), + .. + })) => { + // This message type is a replacement for ControlledShutdown so the same reasoning applies. + return Err(anyhow!( + "Client sent UnregisterBroker request. +Shotover cannot handle this request as it is not appropriate for shotover to shutdown. The connection to the client has been closed." )); }