Skip to content

Commit

Permalink
Merge branch 'main' into INS-33141-redis-to-valkey-part-1
Browse files Browse the repository at this point in the history
  • Loading branch information
ronycsdu authored Nov 19, 2024
2 parents 2b5540c + 3f76b54 commit 414e04b
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 1 deletion.
37 changes: 37 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.9.0-debian-12-r3'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093"
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_BROKER_RACK: "rack1"
ALLOW_PLAINTEXT_LISTENER: "yes"
# Required for high availability
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2

# This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a
# new consumer group by avoiding constant rebalances as each initial consumer joins.
# See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka3:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.5
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092"
KAFKA_CFG_NODE_ID: 3
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka4:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.6
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092"
KAFKA_CFG_NODE_ID: 4
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka5:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.7
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092"
KAFKA_CFG_NODE_ID: 5
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
38 changes: 38 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,17 @@ The connection to the client has been closed."#
return Err(anyhow!(
"Client sent ControlledShutdown request.
Shotover cannot handle this request as it is not appropriate for shotover to shutdown.
The connection to the client has been closed."
));
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::UnregisterBroker(_),
..
})) => {
// This message types is a replacement for ControlledShutdown so the same reasoning applies.
return Err(anyhow!(
"Client sent UnregisterBroker request.
Shotover cannot handle this request as it is not appropriate for shotover to shutdown.
The connection to the client has been closed."
));
}
Expand Down Expand Up @@ -3077,6 +3088,33 @@ The connection to the client has been closed."
self.rewrite_describe_cluster_response(describe_cluster)?;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ApiVersions(api_versions),
..
})) => {
let original_size = api_versions.api_keys.len();

// List of keys that shotover doesnt support and so should be removed from supported keys list
let disable_keys = [
// This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION
// So its not clear at all how to implement this and its not even possible to test it.
// Instead lets just ask the client to not send it at all.
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
ApiKey::DescribeTopicPartitionsKey as i16,
// This message type is part of the new consumer group API, we should implement support for it in the future.
// I've disabled it for now to keep the scope down for kafka 3.9 support.
ApiKey::ConsumerGroupDescribeKey as i16,
];
api_versions
.api_keys
.retain(|x| !disable_keys.contains(&x.api_key));

if original_size != api_versions.api_keys.len() {
// only invalidate the cache if we actually removed anything
response.invalidate_cache();
}
}
_ => {}
}

Expand Down
7 changes: 6 additions & 1 deletion test-helpers/src/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose {
docker_compose("tests/transforms/docker-compose-moto.yaml")
}

pub static IMAGE_WAITERS: [Image; 11] = [
pub static IMAGE_WAITERS: [Image; 12] = [
Image {
name: "motoserver/moto",
log_regex_to_wait_for: r"Press CTRL\+C to quit",
Expand Down Expand Up @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.9.0-debian-12-r3",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "opensearchproject/opensearch:2.9.0",
log_regex_to_wait_for: r"Node started",
Expand Down

0 comments on commit 414e04b

Please sign in to comment.