Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Apache Kafka (KRaft) builder (#1347) #1348

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/Testcontainers.Kafka/ApacheKafkaBuilder.cs
IbrayevRamil marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using Testcontainers.Kafka;

namespace Testcontainers.ApacheKafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public sealed class ApacheKafkaBuilder : BaseKafkaBuilder<ApacheKafkaBuilder>
{
public override string KafkaImage => "apache/kafka:3.9.0";
protected override string ReadyMessage => ".*Transitioning from RECOVERY to RUNNING.*";

/// <summary>
/// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class.
/// </summary>
public ApacheKafkaBuilder() : this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}

/// <summary>
/// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration)
{
DockerResourceConfiguration = resourceConfiguration;
}

/// <inheritdoc />
protected override KafkaConfiguration DockerResourceConfiguration { get; }

/// <inheritdoc />
protected override ApacheKafkaBuilder Init()
{
return base.Init()
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
}

protected override string GetStartupScript(KafkaContainer container)
{
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}";
var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}";
return $"""
#!/bin/bash
export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners}
echo '' > /etc/kafka/docker/ensure
exec /etc/kafka/docker/run
""";
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue));
}
}
113 changes: 113 additions & 0 deletions src/Testcontainers.Kafka/BaseKafkaBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public abstract class BaseKafkaBuilder<TBuilderEntity> : ContainerBuilder<TBuilderEntity, KafkaContainer, KafkaConfiguration>
where TBuilderEntity : BaseKafkaBuilder<TBuilderEntity>
{
public abstract string KafkaImage { get; }
public const string StartupScriptPath = "/testcontainers.sh";

protected abstract string ReadyMessage { get; }

protected abstract string GetStartupScript(KafkaContainer container);

private const string ProtocolPrefix = "TC";

/// <summary>
/// Initializes a new instance of the <see cref="BaseKafkaBuilder{TBuilderEntity}" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
protected BaseKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration)
{
}

/// <inheritdoc />
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration);
}

/// <summary>
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
/// </summary>
/// <remarks>
/// The host will be included as a network alias, allowing additional connections
/// to the Kafka broker within the same container network.
///
/// This method is useful for registering custom listeners beyond the default ones,
/// enabling specific connection points for Kafka brokers.
///
/// Default listeners include:
/// - <c>PLAINTEXT://:9092</c>
/// - <c>BROKER://:9093</c>
/// - <c>CONTROLLER://:9094</c>
/// </remarks>
/// <param name="kafka">The MsSql database.</param>
/// <returns>A configured instance of <see cref="BaseKafkaBuilder{TBuilderEntity}" />.</returns>
public TBuilderEntity WithListener(string kafka)
{
var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
var protocol = $"{ProtocolPrefix}-{index}";
var listener = $"{protocol}://{kafka}";
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var listeners = new[] { listener };
var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };

var host = kafka.Split(':')[0];

var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split(',')
.Concat(listeners);

var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split(',')
.Concat(listenersSecurityProtocolMap);

return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}

/// <inheritdoc />
protected override TBuilderEntity Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaConfiguration.KafkaPort, true)
.WithPortBinding(KafkaConfiguration.BrokerPort, true)
.WithEnvironment("KAFKA_LISTENERS",
$"PLAINTEXT://:{KafkaConfiguration.KafkaPort},BROKER://:{KafkaConfiguration.BrokerPort},CONTROLLER://:{KafkaConfiguration.ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + KafkaConfiguration.ControllerPort)
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StartupScriptPath + " ]; do sleep 0.1; done; " + StartupScriptPath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(ReadyMessage))
.WithStartupCallback((container, ct) =>
container.CopyAsync(Encoding.Default.GetBytes(GetStartupScript(container)), StartupScriptPath, Unix.FileMode755, ct));
}

/// <inheritdoc />
protected override TBuilderEntity Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override TBuilderEntity Clone(IContainerConfiguration resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}
}
138 changes: 26 additions & 112 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@ namespace Testcontainers.Kafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer, KafkaConfiguration>
public sealed class KafkaBuilder : BaseKafkaBuilder<KafkaBuilder>
{
public const string KafkaImage = "confluentinc/cp-kafka:6.1.9";

public const ushort KafkaPort = 9092;

[Obsolete("This constant will be removed. Please use KafkaConfiguration.BrokerPort instead")]
public const ushort BrokerPort = 9093;

[Obsolete("This constant will be removed. Please use KafkaConfiguration.ControllerPort instead")]
public const ushort ControllerPort = 9094;

public const ushort ZookeeperPort = 2181;

[Obsolete("This constant will be removed. Please use BaseKafkaBuilder.StartupScriptPath instead")]
public const string StartupScriptFilePath = "/testcontainers.sh";

public const ushort ZookeeperPort = 2181;
public override string KafkaImage => "confluentinc/cp-kafka:6.1.9";

private const string ProtocolPrefix = "TC";
protected override string ReadyMessage => @"\[KafkaServer id=\d+\] started";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
public KafkaBuilder()
: this(new KafkaConfiguration())
public KafkaBuilder() : this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}
Expand All @@ -39,117 +38,32 @@ private KafkaBuilder(KafkaConfiguration resourceConfiguration)

/// <inheritdoc />
protected override KafkaConfiguration DockerResourceConfiguration { get; }

/// <inheritdoc />
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration);
}

/// <summary>
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
/// </summary>
/// <remarks>
/// The host will be included as a network alias, allowing additional connections
/// to the Kafka broker within the same container network.
///
/// This method is useful for registering custom listeners beyond the default ones,
/// enabling specific connection points for Kafka brokers.
///
/// Default listeners include:
/// - <c>PLAINTEXT://0.0.0.0:9092</c>
/// - <c>BROKER://0.0.0.0:9093</c>
/// - <c>CONTROLLER://0.0.0.0:9094</c>
/// </remarks>
/// <param name="kafka">The MsSql database.</param>
/// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
public KafkaBuilder WithListener(string kafka)
{
var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
var protocol = $"{ProtocolPrefix}-{index}";
var listener = $"{protocol}://{kafka}";
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var listeners = new[] { listener };
var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };

var host = kafka.Split(':')[0];

var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split(',')
.Concat(listeners);

var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split(',')
.Concat(listenersSecurityProtocolMap);

return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}


/// <inheritdoc />
protected override KafkaBuilder Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort)
.WithEntrypoint("/bin/sh", "-c")
.WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("\\[KafkaServer id=\\d+\\] started"))
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
startupScript.Append("echo 'clientPort=" + ZookeeperPort + "' > zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
startupScript.Append("exec /etc/confluent/docker/run");
return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StartupScriptFilePath, Unix.FileMode755, ct);
});
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZookeeperPort);
}

/// <inheritdoc />
protected override KafkaBuilder Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override KafkaBuilder Clone(IContainerConfiguration resourceConfiguration)
protected override string GetStartupScript(KafkaContainer container)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}";
var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}";
return $"""
#!/bin/bash
echo 'clientPort={ZookeeperPort}' > zookeeper.properties
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties
zookeeper-server-start zookeeper.properties &
export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners}
echo '' > /etc/confluent/docker/ensure
exec /etc/confluent/docker/run
""";
}

/// <inheritdoc />
protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
Expand Down
4 changes: 4 additions & 0 deletions src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ namespace Testcontainers.Kafka;
[PublicAPI]
public sealed class KafkaConfiguration : ContainerConfiguration
{
public const ushort KafkaPort = 9092;
public const ushort BrokerPort = 9093;
public const ushort ControllerPort = 9094;

/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public KafkaContainer(KafkaConfiguration configuration)
/// <returns>The broker address.</returns>
public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaConfiguration.KafkaPort)).ToString();
}

/// <summary>
Expand Down
Loading