From 9971d6a4e03892fe8ad635f70b19d93a71f6c187 Mon Sep 17 00:00:00 2001 From: Ramil Ibrayev Date: Fri, 24 Jan 2025 13:32:20 +0100 Subject: [PATCH 1/4] feat: Add Apache Kafka (KRaft) builder (#1347) --- .../ApacheKafkaBuilder.cs | 99 +++++++++++++++++++ .../ApacheKafkaContainerTest.cs | 53 ++++++++++ 2 files changed, 152 insertions(+) create mode 100644 src/Testcontainers.Kafka/ApacheKafkaBuilder.cs create mode 100644 tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs diff --git a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs new file mode 100644 index 000000000..78272b6d3 --- /dev/null +++ b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs @@ -0,0 +1,99 @@ +using Testcontainers.Kafka; + +namespace Testcontainers.ApacheKafka; + +/// +[PublicAPI] +public sealed class ApacheKafkaBuilder : ContainerBuilder +{ + public const ushort KafkaPort = 9092; + public const ushort BrokerPort = 9093; + public const ushort ControllerPort = 9094; + + private const string KafkaImage = "apache/kafka:3.9.0"; + private const string StarterScript = "/testcontainers.sh"; + private const string KafkaNodeId = "1"; + + /// + /// Initializes a new instance of the class. + /// + public ApacheKafkaBuilder() : this(new KafkaConfiguration()) + { + DockerResourceConfiguration = Init().DockerResourceConfiguration; + } + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration) + { + DockerResourceConfiguration = resourceConfiguration; + } + + /// + public override KafkaContainer Build() + { + Validate(); + return new KafkaContainer(DockerResourceConfiguration); + } + + /// + protected override KafkaConfiguration DockerResourceConfiguration { get; } + + /// + protected override ApacheKafkaBuilder Init() + { + return base.Init() + .WithImage(KafkaImage) + .WithPortBinding(KafkaPort, true) + .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://:{KafkaPort},BROKER://:{BrokerPort},CONTROLLER://:{ControllerPort}") + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") + .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") + .WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller") + .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER") + .WithEnvironment("KAFKA_NODE_ID", KafkaNodeId) + .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", $"{KafkaNodeId}@localhost:{ControllerPort}") + .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") + .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") + .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") + .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString()) + .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") + .WithEntrypoint("/bin/sh", "-c") + .WithCommand($"while [ ! -f {StarterScript} ]; do sleep 0.1; done; {StarterScript}") + .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(".*Transitioning from RECOVERY to RUNNING.*")) + .WithStartupCallback((container, ct) => + { + const char lf = '\n'; + var startupScript = new StringBuilder(); + startupScript.Append("#!/bin/bash"); + startupScript.Append(lf); + + var brokerListener = $"BROKER://{container.IpAddress}:{BrokerPort}"; + var listener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaPort)}"; + startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS={listener},{brokerListener}"); + startupScript.Append(lf); + startupScript.Append("exec /etc/kafka/docker/run"); + return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StarterScript, Unix.FileMode755, ct); + }); + } + + /// + protected override ApacheKafkaBuilder Clone(IResourceConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); + } + + /// + protected override ApacheKafkaBuilder Clone(IContainerConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); + } + + /// + protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue) + { + return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue)); + } +} \ No newline at end of file diff --git a/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs b/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs new file mode 100644 index 000000000..9919da584 --- /dev/null +++ b/tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs @@ -0,0 +1,53 @@ +using Testcontainers.ApacheKafka; + +namespace Testcontainers.Kafka; + +public sealed class ApacheKafkaContainerTest : IAsyncLifetime +{ + private readonly KafkaContainer _kafkaContainer = new ApacheKafkaBuilder().Build(); + + public Task InitializeAsync() + { + return _kafkaContainer.StartAsync(); + } + + public Task DisposeAsync() + { + return _kafkaContainer.DisposeAsync().AsTask(); + } + + [Fact] + [Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] + public async Task ConsumerReturnsProducerMessage() + { + // Given + const string topic = "sample"; + + var bootstrapServer = _kafkaContainer.GetBootstrapAddress(); + + var producerConfig = new ProducerConfig(); + producerConfig.BootstrapServers = bootstrapServer; + + var consumerConfig = new ConsumerConfig(); + consumerConfig.BootstrapServers = bootstrapServer; + consumerConfig.GroupId = "sample-consumer"; + consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest; + + var message = new Message(); + message.Value = Guid.NewGuid().ToString("D"); + + // When + using var producer = new ProducerBuilder(producerConfig).Build(); + _ = await producer.ProduceAsync(topic, message) + .ConfigureAwait(true); + + using var consumer = new ConsumerBuilder(consumerConfig).Build(); + consumer.Subscribe(topic); + + var result = consumer.Consume(TimeSpan.FromSeconds(15)); + + // Then + Assert.NotNull(result); + Assert.Equal(message.Value, result.Message.Value); + } +} \ No newline at end of file From 6f19aa7fccbf19c5cb517757786a43de029ef90d Mon Sep 17 00:00:00 2001 From: Ramil Ibrayev Date: Mon, 27 Jan 2025 14:24:41 +0100 Subject: [PATCH 2/4] Moved common logic to BaseKafkaBuilder.cs --- .../ApacheKafkaBuilder.cs | 56 +++---- src/Testcontainers.Kafka/BaseKafkaBuilder.cs | 113 ++++++++++++++ src/Testcontainers.Kafka/KafkaBuilder.cs | 138 ++++-------------- .../KafkaConfiguration.cs | 4 + src/Testcontainers.Kafka/KafkaContainer.cs | 2 +- 5 files changed, 161 insertions(+), 152 deletions(-) create mode 100644 src/Testcontainers.Kafka/BaseKafkaBuilder.cs diff --git a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs index 78272b6d3..0b01ad5ca 100644 --- a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs +++ b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs @@ -4,15 +4,10 @@ namespace Testcontainers.ApacheKafka; /// [PublicAPI] -public sealed class ApacheKafkaBuilder : ContainerBuilder +public sealed class ApacheKafkaBuilder : BaseKafkaBuilder { - public const ushort KafkaPort = 9092; - public const ushort BrokerPort = 9093; - public const ushort ControllerPort = 9094; - - private const string KafkaImage = "apache/kafka:3.9.0"; - private const string StarterScript = "/testcontainers.sh"; - private const string KafkaNodeId = "1"; + public override string KafkaImage => "apache/kafka:3.9.0"; + protected override string ReadyMessage => ".*Transitioning from RECOVERY to RUNNING.*"; /// /// Initializes a new instance of the class. @@ -37,7 +32,7 @@ public override KafkaContainer Build() Validate(); return new KafkaContainer(DockerResourceConfiguration); } - + /// protected override KafkaConfiguration DockerResourceConfiguration { get; } @@ -45,38 +40,21 @@ public override KafkaContainer Build() protected override ApacheKafkaBuilder Init() { return base.Init() - .WithImage(KafkaImage) - .WithPortBinding(KafkaPort, true) - .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://:{KafkaPort},BROKER://:{BrokerPort},CONTROLLER://:{ControllerPort}") - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") - .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") .WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller") - .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER") - .WithEnvironment("KAFKA_NODE_ID", KafkaNodeId) - .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", $"{KafkaNodeId}@localhost:{ControllerPort}") - .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") - .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") - .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") - .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") - .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString()) - .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") - .WithEntrypoint("/bin/sh", "-c") - .WithCommand($"while [ ! -f {StarterScript} ]; do sleep 0.1; done; {StarterScript}") - .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(".*Transitioning from RECOVERY to RUNNING.*")) - .WithStartupCallback((container, ct) => - { - const char lf = '\n'; - var startupScript = new StringBuilder(); - startupScript.Append("#!/bin/bash"); - startupScript.Append(lf); + .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER"); + } - var brokerListener = $"BROKER://{container.IpAddress}:{BrokerPort}"; - var listener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaPort)}"; - startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS={listener},{brokerListener}"); - startupScript.Append(lf); - startupScript.Append("exec /etc/kafka/docker/run"); - return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StarterScript, Unix.FileMode755, ct); - }); + protected override string GetStartupScript(KafkaContainer container) + { + var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty()); + var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}"; + var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}"; + return $""" + #!/bin/bash + export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners} + echo '' > /etc/confluent/docker/ensure + exec /etc/confluent/docker/run + """; } /// diff --git a/src/Testcontainers.Kafka/BaseKafkaBuilder.cs b/src/Testcontainers.Kafka/BaseKafkaBuilder.cs new file mode 100644 index 000000000..ffb2ea8b7 --- /dev/null +++ b/src/Testcontainers.Kafka/BaseKafkaBuilder.cs @@ -0,0 +1,113 @@ +namespace Testcontainers.Kafka; + +/// +[PublicAPI] +public abstract class BaseKafkaBuilder : ContainerBuilder + where TBuilderEntity : BaseKafkaBuilder +{ + public abstract string KafkaImage { get; } + public const string StartupScriptPath = "/testcontainers.sh"; + + protected abstract string ReadyMessage { get; } + + protected abstract string GetStartupScript(KafkaContainer container); + + private const string ProtocolPrefix = "TC"; + + /// + /// Initializes a new instance of the class. + /// + /// The Docker resource configuration. + protected BaseKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration) + { + } + + /// + public override KafkaContainer Build() + { + Validate(); + return new KafkaContainer(DockerResourceConfiguration); + } + + /// + /// Adds a listener to the Kafka configuration in the format host:port. + /// + /// + /// The host will be included as a network alias, allowing additional connections + /// to the Kafka broker within the same container network. + /// + /// This method is useful for registering custom listeners beyond the default ones, + /// enabling specific connection points for Kafka brokers. + /// + /// Default listeners include: + /// - PLAINTEXT://:9092 + /// - BROKER://:9093 + /// - CONTROLLER://:9094 + /// + /// The MsSql database. + /// A configured instance of . + public TBuilderEntity WithListener(string kafka) + { + var index = DockerResourceConfiguration.Listeners?.Count() ?? 0; + var protocol = $"{ProtocolPrefix}-{index}"; + var listener = $"{protocol}://{kafka}"; + var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; + + var listeners = new[] { listener }; + var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap }; + + var host = kafka.Split(':')[0]; + + var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] + .Split(',') + .Concat(listeners); + + var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] + .Split(',') + .Concat(listenersSecurityProtocolMap); + + return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners)) + .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners)) + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap)) + .WithNetworkAliases(host); + } + + /// + protected override TBuilderEntity Init() + { + return base.Init() + .WithImage(KafkaImage) + .WithPortBinding(KafkaConfiguration.KafkaPort, true) + .WithPortBinding(KafkaConfiguration.BrokerPort, true) + .WithEnvironment("KAFKA_LISTENERS", + $"PLAINTEXT://:{KafkaConfiguration.KafkaPort},BROKER://:{KafkaConfiguration.BrokerPort},CONTROLLER://:{KafkaConfiguration.ControllerPort}") + .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") + .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") + .WithEnvironment("KAFKA_BROKER_ID", "1") + .WithEnvironment("KAFKA_NODE_ID", "1") + .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + KafkaConfiguration.ControllerPort) + .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") + .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") + .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") + .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString()) + .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") + .WithEntrypoint("/bin/sh", "-c") + .WithCommand("while [ ! -f " + StartupScriptPath + " ]; do sleep 0.1; done; " + StartupScriptPath) + .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(ReadyMessage)) + .WithStartupCallback((container, ct) => + container.CopyAsync(Encoding.Default.GetBytes(GetStartupScript(container)), StartupScriptPath, Unix.FileMode755, ct)); + } + + /// + protected override TBuilderEntity Clone(IResourceConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); + } + + /// + protected override TBuilderEntity Clone(IContainerConfiguration resourceConfiguration) + { + return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); + } +} \ No newline at end of file diff --git a/src/Testcontainers.Kafka/KafkaBuilder.cs b/src/Testcontainers.Kafka/KafkaBuilder.cs index efa81c439..6e2240fd4 100644 --- a/src/Testcontainers.Kafka/KafkaBuilder.cs +++ b/src/Testcontainers.Kafka/KafkaBuilder.cs @@ -2,27 +2,26 @@ namespace Testcontainers.Kafka; /// [PublicAPI] -public sealed class KafkaBuilder : ContainerBuilder +public sealed class KafkaBuilder : BaseKafkaBuilder { - public const string KafkaImage = "confluentinc/cp-kafka:6.1.9"; - - public const ushort KafkaPort = 9092; - + [Obsolete("This constant will be removed. Please use KafkaConfiguration.BrokerPort instead")] public const ushort BrokerPort = 9093; + [Obsolete("This constant will be removed. Please use KafkaConfiguration.ControllerPort instead")] public const ushort ControllerPort = 9094; - public const ushort ZookeeperPort = 2181; - + [Obsolete("This constant will be removed. Please use BaseKafkaBuilder.StartupScriptPath instead")] public const string StartupScriptFilePath = "/testcontainers.sh"; + + public const ushort ZookeeperPort = 2181; + public override string KafkaImage => "confluentinc/cp-kafka:6.1.9"; - private const string ProtocolPrefix = "TC"; + protected override string ReadyMessage => @"\[KafkaServer id=\d+\] started"; /// /// Initializes a new instance of the class. /// - public KafkaBuilder() - : this(new KafkaConfiguration()) + public KafkaBuilder() : this(new KafkaConfiguration()) { DockerResourceConfiguration = Init().DockerResourceConfiguration; } @@ -39,117 +38,32 @@ private KafkaBuilder(KafkaConfiguration resourceConfiguration) /// protected override KafkaConfiguration DockerResourceConfiguration { get; } - - /// - public override KafkaContainer Build() - { - Validate(); - return new KafkaContainer(DockerResourceConfiguration); - } - - /// - /// Adds a listener to the Kafka configuration in the format host:port. - /// - /// - /// The host will be included as a network alias, allowing additional connections - /// to the Kafka broker within the same container network. - /// - /// This method is useful for registering custom listeners beyond the default ones, - /// enabling specific connection points for Kafka brokers. - /// - /// Default listeners include: - /// - PLAINTEXT://0.0.0.0:9092 - /// - BROKER://0.0.0.0:9093 - /// - CONTROLLER://0.0.0.0:9094 - /// - /// The MsSql database. - /// A configured instance of . - public KafkaBuilder WithListener(string kafka) - { - var index = DockerResourceConfiguration.Listeners?.Count() ?? 0; - var protocol = $"{ProtocolPrefix}-{index}"; - var listener = $"{protocol}://{kafka}"; - var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; - - var listeners = new[] { listener }; - var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap }; - - var host = kafka.Split(':')[0]; - - var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] - .Split(',') - .Concat(listeners); - - var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] - .Split(',') - .Concat(listenersSecurityProtocolMap); - - return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners)) - .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners)) - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap)) - .WithNetworkAliases(host); - } - + /// protected override KafkaBuilder Init() { return base.Init() - .WithImage(KafkaImage) - .WithPortBinding(KafkaPort, true) - .WithPortBinding(BrokerPort, true) .WithPortBinding(ZookeeperPort, true) - .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}") - .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") - .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") - .WithEnvironment("KAFKA_BROKER_ID", "1") - .WithEnvironment("KAFKA_NODE_ID", "1") - .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort) - .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") - .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") - .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") - .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") - .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString()) - .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") - .WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort) - .WithEntrypoint("/bin/sh", "-c") - .WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath) - .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started")) - .WithStartupCallback((container, ct) => - { - const char lf = '\n'; - var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty()); - var startupScript = new StringBuilder(); - startupScript.Append("#!/bin/bash"); - startupScript.Append(lf); - startupScript.Append("echo 'clientPort=" + ZookeeperPort + "' > zookeeper.properties"); - startupScript.Append(lf); - startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties"); - startupScript.Append(lf); - startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties"); - startupScript.Append(lf); - startupScript.Append("zookeeper-server-start zookeeper.properties &"); - startupScript.Append(lf); - startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners); - startupScript.Append(lf); - startupScript.Append("echo '' > /etc/confluent/docker/ensure"); - startupScript.Append(lf); - startupScript.Append("exec /etc/confluent/docker/run"); - return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StartupScriptFilePath, Unix.FileMode755, ct); - }); + .WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort); } - /// - protected override KafkaBuilder Clone(IResourceConfiguration resourceConfiguration) - { - return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); - } - - /// - protected override KafkaBuilder Clone(IContainerConfiguration resourceConfiguration) + protected override string GetStartupScript(KafkaContainer container) { - return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); + var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty()); + var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}"; + var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}"; + return $""" + #!/bin/bash + echo 'clientPort={ZookeeperPort}' > zookeeper.properties + echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties + echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties + zookeeper-server-start zookeeper.properties & + export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners} + echo '' > /etc/confluent/docker/ensure + exec /etc/confluent/docker/run + """; } - + /// protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue) { diff --git a/src/Testcontainers.Kafka/KafkaConfiguration.cs b/src/Testcontainers.Kafka/KafkaConfiguration.cs index 7299e3488..0047efddf 100644 --- a/src/Testcontainers.Kafka/KafkaConfiguration.cs +++ b/src/Testcontainers.Kafka/KafkaConfiguration.cs @@ -4,6 +4,10 @@ namespace Testcontainers.Kafka; [PublicAPI] public sealed class KafkaConfiguration : ContainerConfiguration { + public const ushort KafkaPort = 9092; + public const ushort BrokerPort = 9093; + public const ushort ControllerPort = 9094; + /// /// Initializes a new instance of the class. /// diff --git a/src/Testcontainers.Kafka/KafkaContainer.cs b/src/Testcontainers.Kafka/KafkaContainer.cs index 89a20db5b..3aca18fab 100644 --- a/src/Testcontainers.Kafka/KafkaContainer.cs +++ b/src/Testcontainers.Kafka/KafkaContainer.cs @@ -22,7 +22,7 @@ public KafkaContainer(KafkaConfiguration configuration) /// The broker address. public string GetBootstrapAddress() { - return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString(); + return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaConfiguration.KafkaPort)).ToString(); } /// From 3a4c4a74f03bc987b4b739bce5500684ef6c1d7a Mon Sep 17 00:00:00 2001 From: Ramil Ibrayev Date: Mon, 27 Jan 2025 15:05:11 +0100 Subject: [PATCH 3/4] Fixed ApacheKafkaBuilder.cs StartupScript --- src/Testcontainers.Kafka/ApacheKafkaBuilder.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs index 0b01ad5ca..ed202cb93 100644 --- a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs +++ b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs @@ -52,8 +52,8 @@ protected override string GetStartupScript(KafkaContainer container) return $""" #!/bin/bash export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners} - echo '' > /etc/confluent/docker/ensure - exec /etc/confluent/docker/run + echo '' > /etc/kafka/docker/ensure + exec /etc/kafka/docker/run """; } From e0c0dad226b703ca6a1905fd7589870841c325e9 Mon Sep 17 00:00:00 2001 From: Ramil Ibrayev Date: Tue, 28 Jan 2025 09:00:06 +0100 Subject: [PATCH 4/4] Removed redundant overridings --- .../ApacheKafkaBuilder.cs | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs index ed202cb93..704f87fd1 100644 --- a/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs +++ b/src/Testcontainers.Kafka/ApacheKafkaBuilder.cs @@ -25,14 +25,7 @@ private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(reso { DockerResourceConfiguration = resourceConfiguration; } - - /// - public override KafkaContainer Build() - { - Validate(); - return new KafkaContainer(DockerResourceConfiguration); - } - + /// protected override KafkaConfiguration DockerResourceConfiguration { get; } @@ -56,19 +49,7 @@ protected override string GetStartupScript(KafkaContainer container) exec /etc/kafka/docker/run """; } - - /// - protected override ApacheKafkaBuilder Clone(IResourceConfiguration resourceConfiguration) - { - return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); - } - - /// - protected override ApacheKafkaBuilder Clone(IContainerConfiguration resourceConfiguration) - { - return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); - } - + /// protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue) {