diff --git a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs
new file mode 100644
index 000000000..704f87fd1
--- /dev/null
+++ b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs
@@ -0,0 +1,58 @@
+using Testcontainers.Kafka;
+
+namespace Testcontainers.ApacheKafka;
+
+///
+[PublicAPI]
+public sealed class ApacheKafkaBuilder : BaseKafkaBuilder
+{
+ public override string KafkaImage => "apache/kafka:3.9.0";
+ protected override string ReadyMessage => ".*Transitioning from RECOVERY to RUNNING.*";
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public ApacheKafkaBuilder() : this(new KafkaConfiguration())
+ {
+ DockerResourceConfiguration = Init().DockerResourceConfiguration;
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration)
+ {
+ DockerResourceConfiguration = resourceConfiguration;
+ }
+
+ ///
+ protected override KafkaConfiguration DockerResourceConfiguration { get; }
+
+ ///
+ protected override ApacheKafkaBuilder Init()
+ {
+ return base.Init()
+ .WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
+ .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
+ }
+
+ protected override string GetStartupScript(KafkaContainer container)
+ {
+ var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty());
+ var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}";
+ var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}";
+ return $"""
+ #!/bin/bash
+ export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners}
+ echo '' > /etc/kafka/docker/ensure
+ exec /etc/kafka/docker/run
+ """;
+ }
+
+ ///
+ protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
+ {
+ return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue));
+ }
+}
\ No newline at end of file
diff --git a/src/Testcontainers.Kafka/BaseKafkaBuilder.cs b/src/Testcontainers.Kafka/BaseKafkaBuilder.cs
new file mode 100644
index 000000000..ffb2ea8b7
--- /dev/null
+++ b/src/Testcontainers.Kafka/BaseKafkaBuilder.cs
@@ -0,0 +1,113 @@
+namespace Testcontainers.Kafka;
+
+///
+[PublicAPI]
+public abstract class BaseKafkaBuilder : ContainerBuilder
+ where TBuilderEntity : BaseKafkaBuilder
+{
+ public abstract string KafkaImage { get; }
+ public const string StartupScriptPath = "/testcontainers.sh";
+
+ protected abstract string ReadyMessage { get; }
+
+ protected abstract string GetStartupScript(KafkaContainer container);
+
+ private const string ProtocolPrefix = "TC";
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ protected BaseKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration)
+ {
+ }
+
+ ///
+ public override KafkaContainer Build()
+ {
+ Validate();
+ return new KafkaContainer(DockerResourceConfiguration);
+ }
+
+ ///
+ /// Adds a listener to the Kafka configuration in the format host:port.
+ ///
+ ///
+ /// The host will be included as a network alias, allowing additional connections
+ /// to the Kafka broker within the same container network.
+ ///
+ /// This method is useful for registering custom listeners beyond the default ones,
+ /// enabling specific connection points for Kafka brokers.
+ ///
+ /// Default listeners include:
+ /// - PLAINTEXT://:9092
+ /// - BROKER://:9093
+ /// - CONTROLLER://:9094
+ ///
+ /// The MsSql database.
+ /// A configured instance of .
+ public TBuilderEntity WithListener(string kafka)
+ {
+ var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
+ var protocol = $"{ProtocolPrefix}-{index}";
+ var listener = $"{protocol}://{kafka}";
+ var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";
+
+ var listeners = new[] { listener };
+ var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };
+
+ var host = kafka.Split(':')[0];
+
+ var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
+ .Split(',')
+ .Concat(listeners);
+
+ var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
+ .Split(',')
+ .Concat(listenersSecurityProtocolMap);
+
+ return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
+ .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
+ .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
+ .WithNetworkAliases(host);
+ }
+
+ ///
+ protected override TBuilderEntity Init()
+ {
+ return base.Init()
+ .WithImage(KafkaImage)
+ .WithPortBinding(KafkaConfiguration.KafkaPort, true)
+ .WithPortBinding(KafkaConfiguration.BrokerPort, true)
+ .WithEnvironment("KAFKA_LISTENERS",
+ $"PLAINTEXT://:{KafkaConfiguration.KafkaPort},BROKER://:{KafkaConfiguration.BrokerPort},CONTROLLER://:{KafkaConfiguration.ControllerPort}")
+ .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+ .WithEnvironment("KAFKA_BROKER_ID", "1")
+ .WithEnvironment("KAFKA_NODE_ID", "1")
+ .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + KafkaConfiguration.ControllerPort)
+ .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+ .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
+ .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+ .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+ .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
+ .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+ .WithEntrypoint("/bin/sh", "-c")
+ .WithCommand("while [ ! -f " + StartupScriptPath + " ]; do sleep 0.1; done; " + StartupScriptPath)
+ .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(ReadyMessage))
+ .WithStartupCallback((container, ct) =>
+ container.CopyAsync(Encoding.Default.GetBytes(GetStartupScript(container)), StartupScriptPath, Unix.FileMode755, ct));
+ }
+
+ ///
+ protected override TBuilderEntity Clone(IResourceConfiguration resourceConfiguration)
+ {
+ return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
+ }
+
+ ///
+ protected override TBuilderEntity Clone(IContainerConfiguration resourceConfiguration)
+ {
+ return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
+ }
+}
\ No newline at end of file
diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs
index efa81c439..6e2240fd4 100644
--- a/src/Testcontainers.Kafka/KafkaBuilder.cs
+++ b/src/Testcontainers.Kafka/KafkaBuilder.cs
@@ -2,27 +2,26 @@ namespace Testcontainers.Kafka;
///
[PublicAPI]
-public sealed class KafkaBuilder : ContainerBuilder
+public sealed class KafkaBuilder : BaseKafkaBuilder
{
- public const string KafkaImage = "confluentinc/cp-kafka:6.1.9";
-
- public const ushort KafkaPort = 9092;
-
+ [Obsolete("This constant will be removed. Please use KafkaConfiguration.BrokerPort instead")]
public const ushort BrokerPort = 9093;
+ [Obsolete("This constant will be removed. Please use KafkaConfiguration.ControllerPort instead")]
public const ushort ControllerPort = 9094;
- public const ushort ZookeeperPort = 2181;
-
+ [Obsolete("This constant will be removed. Please use BaseKafkaBuilder.StartupScriptPath instead")]
public const string StartupScriptFilePath = "/testcontainers.sh";
+
+ public const ushort ZookeeperPort = 2181;
+ public override string KafkaImage => "confluentinc/cp-kafka:6.1.9";
- private const string ProtocolPrefix = "TC";
+ protected override string ReadyMessage => @"\[KafkaServer id=\d+\] started";
///
/// Initializes a new instance of the class.
///
- public KafkaBuilder()
- : this(new KafkaConfiguration())
+ public KafkaBuilder() : this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}
@@ -39,117 +38,32 @@ private KafkaBuilder(KafkaConfiguration resourceConfiguration)
///
protected override KafkaConfiguration DockerResourceConfiguration { get; }
-
- ///
- public override KafkaContainer Build()
- {
- Validate();
- return new KafkaContainer(DockerResourceConfiguration);
- }
-
- ///
- /// Adds a listener to the Kafka configuration in the format host:port.
- ///
- ///
- /// The host will be included as a network alias, allowing additional connections
- /// to the Kafka broker within the same container network.
- ///
- /// This method is useful for registering custom listeners beyond the default ones,
- /// enabling specific connection points for Kafka brokers.
- ///
- /// Default listeners include:
- /// - PLAINTEXT://0.0.0.0:9092
- /// - BROKER://0.0.0.0:9093
- /// - CONTROLLER://0.0.0.0:9094
- ///
- /// The MsSql database.
- /// A configured instance of .
- public KafkaBuilder WithListener(string kafka)
- {
- var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
- var protocol = $"{ProtocolPrefix}-{index}";
- var listener = $"{protocol}://{kafka}";
- var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";
-
- var listeners = new[] { listener };
- var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };
-
- var host = kafka.Split(':')[0];
-
- var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
- .Split(',')
- .Concat(listeners);
-
- var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
- .Split(',')
- .Concat(listenersSecurityProtocolMap);
-
- return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
- .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
- .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
- .WithNetworkAliases(host);
- }
-
+
///
protected override KafkaBuilder Init()
{
return base.Init()
- .WithImage(KafkaImage)
- .WithPortBinding(KafkaPort, true)
- .WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
- .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
- .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
- .WithEnvironment("KAFKA_BROKER_ID", "1")
- .WithEnvironment("KAFKA_NODE_ID", "1")
- .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
- .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
- .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
- .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
- .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
- .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
- .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
- .WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort)
- .WithEntrypoint("/bin/sh", "-c")
- .WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
- .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started"))
- .WithStartupCallback((container, ct) =>
- {
- const char lf = '\n';
- var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty());
- var startupScript = new StringBuilder();
- startupScript.Append("#!/bin/bash");
- startupScript.Append(lf);
- startupScript.Append("echo 'clientPort=" + ZookeeperPort + "' > zookeeper.properties");
- startupScript.Append(lf);
- startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
- startupScript.Append(lf);
- startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
- startupScript.Append(lf);
- startupScript.Append("zookeeper-server-start zookeeper.properties &");
- startupScript.Append(lf);
- startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
- startupScript.Append(lf);
- startupScript.Append("echo '' > /etc/confluent/docker/ensure");
- startupScript.Append(lf);
- startupScript.Append("exec /etc/confluent/docker/run");
- return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StartupScriptFilePath, Unix.FileMode755, ct);
- });
+ .WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort);
}
- ///
- protected override KafkaBuilder Clone(IResourceConfiguration resourceConfiguration)
- {
- return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
- }
-
- ///
- protected override KafkaBuilder Clone(IContainerConfiguration resourceConfiguration)
+ protected override string GetStartupScript(KafkaContainer container)
{
- return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
+ var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty());
+ var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}";
+ var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}";
+ return $"""
+ #!/bin/bash
+ echo 'clientPort={ZookeeperPort}' > zookeeper.properties
+ echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
+ echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
+ zookeeper-server-start zookeeper.properties &
+ export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners}
+ echo '' > /etc/confluent/docker/ensure
+ exec /etc/confluent/docker/run
+ """;
}
-
+
///
protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
diff --git a/src/Testcontainers.Kafka/KafkaConfiguration.cs b/src/Testcontainers.Kafka/KafkaConfiguration.cs
index 7299e3488..0047efddf 100644
--- a/src/Testcontainers.Kafka/KafkaConfiguration.cs
+++ b/src/Testcontainers.Kafka/KafkaConfiguration.cs
@@ -4,6 +4,10 @@ namespace Testcontainers.Kafka;
[PublicAPI]
public sealed class KafkaConfiguration : ContainerConfiguration
{
+ public const ushort KafkaPort = 9092;
+ public const ushort BrokerPort = 9093;
+ public const ushort ControllerPort = 9094;
+
///
/// Initializes a new instance of the class.
///
diff --git a/src/Testcontainers.Kafka/KafkaContainer.cs b/src/Testcontainers.Kafka/KafkaContainer.cs
index 89a20db5b..3aca18fab 100644
--- a/src/Testcontainers.Kafka/KafkaContainer.cs
+++ b/src/Testcontainers.Kafka/KafkaContainer.cs
@@ -22,7 +22,7 @@ public KafkaContainer(KafkaConfiguration configuration)
/// The broker address.
public string GetBootstrapAddress()
{
- return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
+ return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaConfiguration.KafkaPort)).ToString();
}
///
diff --git a/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs b/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs
new file mode 100644
index 000000000..9919da584
--- /dev/null
+++ b/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs
@@ -0,0 +1,53 @@
+using Testcontainers.ApacheKafka;
+
+namespace Testcontainers.Kafka;
+
+public sealed class ApacheKafkaContainerTest : IAsyncLifetime
+{
+ private readonly KafkaContainer _kafkaContainer = new ApacheKafkaBuilder().Build();
+
+ public Task InitializeAsync()
+ {
+ return _kafkaContainer.StartAsync();
+ }
+
+ public Task DisposeAsync()
+ {
+ return _kafkaContainer.DisposeAsync().AsTask();
+ }
+
+ [Fact]
+ [Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))]
+ public async Task ConsumerReturnsProducerMessage()
+ {
+ // Given
+ const string topic = "sample";
+
+ var bootstrapServer = _kafkaContainer.GetBootstrapAddress();
+
+ var producerConfig = new ProducerConfig();
+ producerConfig.BootstrapServers = bootstrapServer;
+
+ var consumerConfig = new ConsumerConfig();
+ consumerConfig.BootstrapServers = bootstrapServer;
+ consumerConfig.GroupId = "sample-consumer";
+ consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
+
+ var message = new Message();
+ message.Value = Guid.NewGuid().ToString("D");
+
+ // When
+ using var producer = new ProducerBuilder(producerConfig).Build();
+ _ = await producer.ProduceAsync(topic, message)
+ .ConfigureAwait(true);
+
+ using var consumer = new ConsumerBuilder(consumerConfig).Build();
+ consumer.Subscribe(topic);
+
+ var result = consumer.Consume(TimeSpan.FromSeconds(15));
+
+ // Then
+ Assert.NotNull(result);
+ Assert.Equal(message.Value, result.Message.Value);
+ }
+}
\ No newline at end of file