Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Apache Kafka (KRaft) builder (#1347) #1348

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/Testcontainers.Kafka/ApacheKafkaBuilder.cs
IbrayevRamil marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using Testcontainers.Kafka;

namespace Testcontainers.ApacheKafka;

/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
[PublicAPI]
public sealed class ApacheKafkaBuilder : ContainerBuilder<ApacheKafkaBuilder, KafkaContainer, KafkaConfiguration>
{
public const ushort KafkaPort = 9092;
public const ushort BrokerPort = 9093;
public const ushort ControllerPort = 9094;

private const string KafkaImage = "apache/kafka:3.9.0";
private const string StarterScript = "/testcontainers.sh";
private const string KafkaNodeId = "1";

/// <summary>
/// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class.
/// </summary>
public ApacheKafkaBuilder() : this(new KafkaConfiguration())
{
DockerResourceConfiguration = Init().DockerResourceConfiguration;
}

/// <summary>
/// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class.
/// </summary>
/// <param name="resourceConfiguration">The Docker resource configuration.</param>
private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration)
{
DockerResourceConfiguration = resourceConfiguration;
}

/// <inheritdoc />
public override KafkaContainer Build()
{
Validate();
return new KafkaContainer(DockerResourceConfiguration);
}

/// <inheritdoc />
protected override KafkaConfiguration DockerResourceConfiguration { get; }

/// <inheritdoc />
protected override ApacheKafkaBuilder Init()
{
return base.Init()
.WithImage(KafkaImage)
.WithPortBinding(KafkaPort, true)
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://:{KafkaPort},BROKER://:{BrokerPort},CONTROLLER://:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithEnvironment("KAFKA_NODE_ID", KafkaNodeId)
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", $"{KafkaNodeId}@localhost:{ControllerPort}")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString())
.WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.WithEntrypoint("/bin/sh", "-c")
.WithCommand($"while [ ! -f {StarterScript} ]; do sleep 0.1; done; {StarterScript}")
.WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(".*Transitioning from RECOVERY to RUNNING.*"))
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);

var brokerListener = $"BROKER://{container.IpAddress}:{BrokerPort}";
var listener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaPort)}";
startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS={listener},{brokerListener}");
startupScript.Append(lf);
startupScript.Append("exec /etc/kafka/docker/run");
return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), StarterScript, Unix.FileMode755, ct);
});
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Clone(IContainerConfiguration resourceConfiguration)
{
return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration));
}

/// <inheritdoc />
protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue)
{
return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue));
}
}
53 changes: 53 additions & 0 deletions tests/Testcontainers.Kafka.Tests/ApacheKafkaContainerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Testcontainers.ApacheKafka;

namespace Testcontainers.Kafka;

public sealed class ApacheKafkaContainerTest : IAsyncLifetime
{
private readonly KafkaContainer _kafkaContainer = new ApacheKafkaBuilder().Build();

public Task InitializeAsync()
{
return _kafkaContainer.StartAsync();
}

public Task DisposeAsync()
{
return _kafkaContainer.DisposeAsync().AsTask();
}

[Fact]
[Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))]
public async Task ConsumerReturnsProducerMessage()
{
// Given
const string topic = "sample";

var bootstrapServer = _kafkaContainer.GetBootstrapAddress();

var producerConfig = new ProducerConfig();
producerConfig.BootstrapServers = bootstrapServer;

var consumerConfig = new ConsumerConfig();
consumerConfig.BootstrapServers = bootstrapServer;
consumerConfig.GroupId = "sample-consumer";
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;

var message = new Message<string, string>();
message.Value = Guid.NewGuid().ToString("D");

// When
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
_ = await producer.ProduceAsync(topic, message)
.ConfigureAwait(true);

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe(topic);

var result = consumer.Consume(TimeSpan.FromSeconds(15));

// Then
Assert.NotNull(result);
Assert.Equal(message.Value, result.Message.Value);
}
}