Commit 1b0b3f9

Merge pull request #11 from seilc1/feature/kafka-channels
feat: add kafka channel implementation
2 parents 777bdbc + f935ffe commit 1b0b3f9

39 files changed: +947 −128 lines

.github/workflows/releasing.yml

+23
@@ -8,6 +8,7 @@ on:
 env:
   project: "src/EnterpriseIntegration"
   rabbitmq_project: "src/EnterpriseIntegration.RabbitMQ"
+  kafka_project: "src/EnterpriseIntegration.Kafka"
   latest_dotnet_version: "7.x"

 jobs:
@@ -79,3 +80,25 @@ jobs:
         run: dotnet nuget push $rabbitmq_project/bin/Release/*.nupkg -k $NUGET_AUTH_TOKEN -s https://api.nuget.org/v3/index.json
         env:
           NUGET_AUTH_TOKEN: ${{ secrets.NUGET_TOKEN }}
+
+  release_kafka:
+    name: Create NuGET Package (Kafka)
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: '0'
+      - name: Setup .NET Core SDK ${{ env.latest_dotnet_version }}
+        uses: actions/setup-dotnet@v2
+        with:
+          dotnet-version: ${{ env.latest_dotnet_version }}
+      - name: Install dependencies
+        run: dotnet restore $kafka_project
+      - name: Build
+        run: dotnet build $kafka_project --no-restore --configuration Release
+      - name: Create nuget package
+        run: dotnet pack --configuration Release $kafka_project
+      - name: Publish the package to nuget.org
+        run: dotnet nuget push $kafka_project/bin/Release/*.nupkg -k $NUGET_AUTH_TOKEN -s https://api.nuget.org/v3/index.json
+        env:
+          NUGET_AUTH_TOKEN: ${{ secrets.NUGET_TOKEN }}

.github/workflows/testing.yml

+25
@@ -10,6 +10,7 @@ env:
   unit_test_project: "tests/EnterpriseIntegration.Tests"
   integration_test_project: "tests/EnterpriseIntegration.IntegrationTests"
   rabbitmq_test_project: "tests/EnterpriseIntegration.RabbitMQ.Tests"
+  kafka_test_project: "tests/EnterpriseIntegration.Kafka.Tests"
   latest_dotnet_version: "7.x"

 jobs:
@@ -77,6 +78,30 @@ jobs:
         run: dotnet build $rabbitmq_test_project --no-restore --configuration Release
       - name: Test
        run: dotnet test $rabbitmq_test_project --no-restore --logger:trx --results-directory "TestResults"
+      - name: Upload dotnet test results
+        uses: actions/upload-artifact@v3
+        with:
+          name: dotnet-results
+          path: "TestResults"
+        if: ${{ always() }}
+
+  kafka_test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          fetch-depth: '0'
+      - name: Setup .NET Core SDK ${{ env.latest_dotnet_version }}
+        uses: actions/setup-dotnet@v2
+        with:
+          dotnet-version: ${{ env.latest_dotnet_version }}
+      - name: Install dependencies
+        run: dotnet restore $kafka_test_project
+      - name: Build
+        run: dotnet build $kafka_test_project --no-restore --configuration Release
+      - name: Test
+        run: dotnet test $kafka_test_project --no-restore --logger:trx --results-directory "TestResults"
       - name: Upload dotnet test results
         uses: actions/upload-artifact@v3
         with:

EnterpriseIntegration.sln

+13
@@ -32,6 +32,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "CICD", "CICD", "{42B78836-C
 		.github\workflows\testing.yml = .github\workflows\testing.yml
 	EndProjectSection
 EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EnterpriseIntegration.Kafka", "src\EnterpriseIntegration.Kafka\EnterpriseIntegration.Kafka.csproj", "{BB3AE5DE-4CC9-4247-84FB-742C3919D55B}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EnterpriseIntegration.Kafka.Tests", "tests\EnterpriseIntegration.Kafka.Tests\EnterpriseIntegration.Kafka.Tests.csproj", "{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -66,6 +70,14 @@ Global
 		{23D63925-F6F2-4751-AE6E-45693B67D097}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{23D63925-F6F2-4751-AE6E-45693B67D097}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{23D63925-F6F2-4751-AE6E-45693B67D097}.Release|Any CPU.Build.0 = Release|Any CPU
+		{BB3AE5DE-4CC9-4247-84FB-742C3919D55B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{BB3AE5DE-4CC9-4247-84FB-742C3919D55B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{BB3AE5DE-4CC9-4247-84FB-742C3919D55B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{BB3AE5DE-4CC9-4247-84FB-742C3919D55B}.Release|Any CPU.Build.0 = Release|Any CPU
+		{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -76,6 +88,7 @@ Global
 		{A8796EB2-FE2E-424D-A261-E0CCB128219F} = {7107F12F-6777-4FA4-B4E4-8EC6D7D122C9}
 		{D40DB93C-E1C5-41A8-8ABF-52CBF68FE0A1} = {7107F12F-6777-4FA4-B4E4-8EC6D7D122C9}
 		{23D63925-F6F2-4751-AE6E-45693B67D097} = {7107F12F-6777-4FA4-B4E4-8EC6D7D122C9}
+		{1AF465CF-1AC8-44E2-A3D2-9ED4CED34975} = {7107F12F-6777-4FA4-B4E4-8EC6D7D122C9}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {C0C48387-434F-40A0-A87A-87B3D1653E60}

README.md

+52
@@ -285,6 +285,58 @@ public class Startup

 The Registration of a RabbitMQChannel can be configured with additional parameters of the registration method.

+### KafkaChannel
+
+Provides a simple channel implementation using [Kafka](https://kafka.apache.org/).
+
+Registration of Kafka:
+```C#
+using EnterpriseIntegration.Kafka;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Xunit.DependencyInjection.Logging;
+
+namespace EnterpriseIntegration.Kafka.Tests;
+public class Startup
+{
+    public static void ConfigureServices(IServiceCollection services)
+    {
+        IConfigurationBuilder configBuilder = new ConfigurationBuilder()
+            .SetBasePath(Directory.GetCurrentDirectory())
+            .AddJsonFile("appsettings.json");
+        IConfiguration config = configBuilder.Build();
+
+        services
+            .AddSingleton<ServiceActivatorFlow001>()
+            // Enable Kafka messaging based on the JSON config
+            .WithKafkaMessaging(config)
+            // Provide the channel "001_world" via Kafka and try to create the topic if it does not exist already
+            .WithKafkaChannel("001_world", c => c.EnsureCreated = true)
+            .UseWireTap()
+            .UseEnterpriseIntegration();
+    }
+}
+```
+
+The registration of a KafkaChannel can be configured with additional parameters of the registration method.
+
+Example of the configuration for Kafka (it leverages the underlying Kafka client configuration, so all Kafka client settings are possible).
+```JSON
+{
+  "EnterpriseIntegration": {
+    "Kafka": {
+      "ConsumerConfig": {
+        "BootstrapServers": "127.0.0.1:29092",
+        "GroupId": "integration-test"
+      },
+      "ProducerConfig": {
+        "BootstrapServers": "127.0.0.1:29092"
+      }
+    }
+  }
+}
+```
+
 # Errors

 ## Immediate Error Handling
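A minimal sketch of that "additional parameters" hook, assuming only the members visible in this commit (KafkaChannelSettings.EnsureCreated and EnsureQualityOfService) and reusing the registration style from the README snippet above; the channel name "002_orders" is purely illustrative:

```C#
using EnterpriseIntegration.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public static void ConfigureServices(IServiceCollection services)
    {
        // Same configuration bootstrap as in the README example above.
        IConfiguration config = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json")
            .Build();

        services
            .WithKafkaMessaging(config)
            // Hypothetical channel "002_orders": create the topic if it is missing and
            // only acknowledge messages after the subscriber has processed them.
            .WithKafkaChannel("002_orders", c =>
            {
                c.EnsureCreated = true;
                c.EnsureQualityOfService = true;
            })
            .UseEnterpriseIntegration();
    }
}
```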
src/EnterpriseIntegration.Kafka/EnterpriseIntegration.Kafka.csproj

+19
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net7.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="2.1.0" />
+    <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="7.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\EnterpriseIntegration\EnterpriseIntegration.csproj" />
+  </ItemGroup>
+
+</Project>
src/EnterpriseIntegration.Kafka/IKafkaConnectionProvider.cs

+11
@@ -0,0 +1,11 @@
+using Confluent.Kafka;
+namespace EnterpriseIntegration.Kafka;
+
+public interface IKafkaConnectionProvider
+{
+    public IProducer<string, byte[]> CreateProducer();
+
+    public IConsumer<string, byte[]> CreateConsumer();
+
+    public IAdminClient CreateAdminClient();
+}
src/EnterpriseIntegration.Kafka/KafkaChannel.cs

+143
@@ -0,0 +1,143 @@
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using EnterpriseIntegration.Channels;
+using EnterpriseIntegration.Message;
+using Microsoft.Extensions.Logging;
+
+namespace EnterpriseIntegration.Kafka;
+
+public class KafkaChannel : IMessagingChannel, IDisposable
+{
+    /// <summary>
+    /// Kafka is a highly distributed system; creating a subscription takes time to synchronize across all nodes.
+    /// </summary>
+    private const int KafkaSubscriptionDelayInMs = 3_000;
+
+    private readonly CancellationTokenSource _cancellationTokenSource = new();
+
+    private readonly Lazy<IProducer<string, byte[]>> _producer;
+
+    private readonly Lazy<IConsumer<string, byte[]>> _consumer;
+
+    private readonly KafkaChannelSettings _settings;
+
+    private readonly IMessageMapper<ConsumeResult<string, byte[]>, KafkaMessage> _mapper;
+
+    private readonly ILogger<KafkaChannel> _logger;
+
+    private readonly Task? _topicCreationTask;
+
+    private bool _topicCreated = true;
+
+    public ChannelId ChannelId { get; }
+
+    public KafkaChannel(ChannelId id, IKafkaConnectionProvider connectionProvider, KafkaChannelSettings settings, IMessageMapper<ConsumeResult<string, byte[]>, KafkaMessage> mapper, ILogger<KafkaChannel> logger)
+    {
+        ChannelId = id;
+        _settings = settings;
+        _mapper = mapper;
+        _logger = logger;
+        _producer = new Lazy<IProducer<string, byte[]>>(connectionProvider.CreateProducer);
+        _consumer = new Lazy<IConsumer<string, byte[]>>(connectionProvider.CreateConsumer);
+
+        if (settings.EnsureCreated)
+        {
+            _topicCreated = false;
+            _topicCreationTask = EnsureTopicCreationAsync(connectionProvider);
+        }
+    }
+
+    private async Task EnsureTopicCreationAsync(IKafkaConnectionProvider connectionProvider)
+    {
+        using var adminClient = connectionProvider.CreateAdminClient();
+
+        var metaData = adminClient.GetMetadata(TimeSpan.FromSeconds(30));
+        if (!metaData.Topics.Any(t => t.Topic.Equals(_settings.TopicName, StringComparison.OrdinalIgnoreCase)))
+        {
+            await adminClient.CreateTopicsAsync(new List<TopicSpecification> { new TopicSpecification { Name = _settings.TopicName, NumPartitions = 1, ReplicationFactor = 1, } });
+            _topicCreated = true;
+        }
+    }
+
+    public async Task Send(IMessage message)
+    {
+        if (!_topicCreated && _topicCreationTask != null)
+        {
+            await _topicCreationTask;
+        }
+
+        _logger.LogInformation("Produce Message(id:{id}) for Channel:{channelId} to topic:{topic}", message.MessageHeaders.Id, ChannelId, _settings.TopicName);
+        await _producer.Value.ProduceAsync(_settings.TopicName, await _mapper.Map(message));
+    }
+
+    /// <summary>
+    /// Reads messages from a Kafka topic.
+    /// </summary>
+    /// <typeparam name="T">Payload type of the consumed messages.</typeparam>
+    /// <param name="subscriber">Callback invoked for every consumed message.</param>
+    /// <returns>A task that completes once the subscription has been set up.</returns>
+    public async Task Subscribe<T>(Func<IMessage<T>, Task> subscriber)
+    {
+        if (!_topicCreated && _topicCreationTask != null)
+        {
+            await _topicCreationTask;
+        }
+
+        _logger.LogInformation("Created subscription for channel:{channelId} on topic:{topic}", ChannelId, _settings.TopicName);
+        _consumer.Value.Subscribe(_settings.TopicName);
+        ThreadPool.QueueUserWorkItem(async _ => await ContinuousConsume(subscriber));
+        await Task.Delay(KafkaSubscriptionDelayInMs);
+    }
+
+    public void Dispose()
+    {
+        GC.SuppressFinalize(this);
+        if (_consumer.IsValueCreated)
+        {
+            _consumer.Value.Unsubscribe();
+            _consumer.Value.Close();
+            _consumer.Value.Dispose();
+        }
+
+        if (_producer.IsValueCreated)
+        {
+            _producer.Value.Dispose();
+        }
+
+        _cancellationTokenSource.Cancel();
+    }
+
+    private async Task ContinuousConsume<T>(Func<IMessage<T>, Task> subscriber)
+    {
+        while (!_cancellationTokenSource.Token.IsCancellationRequested)
+        {
+            try
+            {
+                _logger.LogInformation("Consumer starting.");
+                IMessage<T> message = await _mapper.Map<T>(_consumer.Value.Consume(_cancellationTokenSource.Token));
+                _logger.LogInformation("Received Message(id:{id}) for Channel:{channelId} from topic:{topic}", message.MessageHeaders.Id, ChannelId, _settings.TopicName);
+
+                await subscriber(message);
+                _logger.LogInformation("Processed Message(id:{id})", message.MessageHeaders.Id);
+            }
+            catch (ObjectDisposedException)
+            {
+                if (!_cancellationTokenSource.Token.IsCancellationRequested)
+                {
+                    return;
+                }
+
+                throw;
+            }
+            catch (OperationCanceledException)
+            {
+                if (!_cancellationTokenSource.Token.IsCancellationRequested)
+                {
+                    return;
+                }
+
+                throw;
+            }
+        }
+    }
+}
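As a usage sketch against the Subscribe<T>/Send signatures above: the channel instance is assumed to be constructed or resolved elsewhere (normally via the WithKafkaChannel registration), the payload type string is arbitrary, and only MessageHeaders.Id is touched because that is the only IMessage member visible in this diff:

```C#
using EnterpriseIntegration.Kafka;
using EnterpriseIntegration.Message;

public static class KafkaChannelUsageSketch
{
    // channel is assumed to come from DI / the channel registration; this helper is not part of the commit.
    public static async Task WireUpAsync(KafkaChannel channel)
    {
        // Subscribe<T> starts a background consume loop on the thread pool and then waits
        // KafkaSubscriptionDelayInMs so the Kafka subscription has time to propagate.
        await channel.Subscribe<string>(message =>
        {
            Console.WriteLine($"handled message {message.MessageHeaders.Id}");
            return Task.CompletedTask;
        });
    }
}
```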
src/EnterpriseIntegration.Kafka/KafkaChannelSettings.cs

+19
@@ -0,0 +1,19 @@
+namespace EnterpriseIntegration.Kafka;
+
+public class KafkaChannelSettings
+{
+    public required string TopicName { get; set; }
+
+    /// <summary>
+    /// Flag indicating whether the topic should be created if it does not exist.
+    /// <para>This is only suggested for testing purposes, as a properly defined channel will work a lot better.</para>
+    /// </summary>
+    public bool EnsureCreated { get; set; }
+
+    /// <summary>
+    /// Flag indicating whether the channel should ensure the quality of service (e.g. at-least-once delivery).
+    /// The channel will only acknowledge the message after the subscriber has processed it.
+    /// <para>If this is set to false, the channel will auto-acknowledge.</para>
+    /// </summary>
+    public bool EnsureQualityOfService { get; set; } = true;
+}
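The EnsureQualityOfService flag is only documented here; its enforcement is not part of the files shown. As a hedged illustration of the Confluent.Kafka mechanics such a flag typically maps to, the sketch below disables auto-commit and commits the offset only after the handler succeeds (broker address, group id and topic are taken from the README example; HandleAsync is a placeholder):

```C#
using Confluent.Kafka;

// At-least-once consumption: auto-commit off, commit only after successful processing.
var config = new ConsumerConfig
{
    BootstrapServers = "127.0.0.1:29092",
    GroupId = "integration-test",
    EnableAutoCommit = false,
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
consumer.Subscribe("001_world");

while (true)
{
    ConsumeResult<string, byte[]> result = consumer.Consume();
    await HandleAsync(result);   // placeholder for the subscriber's work
    consumer.Commit(result);     // acknowledge only after the subscriber has processed the message
}

static Task HandleAsync(ConsumeResult<string, byte[]> result)
{
    Console.WriteLine($"consumed {result.Message.Value.Length} bytes from topic {result.Topic}");
    return Task.CompletedTask;
}
```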
src/EnterpriseIntegration.Kafka/KafkaConnectionProvider.cs

+23
@@ -0,0 +1,23 @@
+using Microsoft.Extensions.Options;
+using Confluent.Kafka;
+
+namespace EnterpriseIntegration.Kafka;
+
+public class KafkaConnectionProvider : IKafkaConnectionProvider
+{
+    private readonly KafkaSettings _settings;
+
+    public KafkaConnectionProvider(IOptions<KafkaSettings> settings)
+    {
+        _settings = settings.Value;
+    }
+
+    public IConsumer<string, byte[]> CreateConsumer()
+        => new ConsumerBuilder<string, byte[]>(_settings.ConsumerConfig).Build();
+
+    public IProducer<string, byte[]> CreateProducer()
+        => new ProducerBuilder<string, byte[]>(_settings.ProducerConfig).Build();
+
+    public IAdminClient CreateAdminClient()
+        => new AdminClientBuilder(_settings.ProducerConfig).Build();
+}
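KafkaSettings itself is not among the files shown in this excerpt; judging from the builders above and the README's JSON section, it presumably looks roughly like this sketch (the exact shape and the options binding are assumptions):

```C#
using Confluent.Kafka;

namespace EnterpriseIntegration.Kafka;

// Assumed shape of the options class bound from the "EnterpriseIntegration:Kafka" configuration section.
public class KafkaSettings
{
    // Confluent.Kafka config objects; ConsumerConfig/ProducerConfig accept any Kafka client setting.
    public ConsumerConfig ConsumerConfig { get; set; } = new();

    public ProducerConfig ProducerConfig { get; set; } = new();
}
```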
src/EnterpriseIntegration.Kafka/KafkaMessage.cs

+5
@@ -0,0 +1,5 @@
+using Confluent.Kafka;
+
+namespace EnterpriseIntegration.Kafka;
+
+public class KafkaMessage : Message<string, byte[]> { }
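KafkaMessage simply reuses Confluent.Kafka's Message<TKey, TValue>, so it carries the usual Key, Value and Headers members. A small sketch of filling one by hand (key, payload and header name are made up; in the library this is presumably the job of the IMessageMapper):

```C#
using System.Text;
using Confluent.Kafka;
using EnterpriseIntegration.Kafka;

// Illustrative only: the inherited Confluent.Kafka members of KafkaMessage.
var kafkaMessage = new KafkaMessage
{
    Key = "001_world",                                        // e.g. a channel or partition key
    Value = Encoding.UTF8.GetBytes("{\"hello\":\"world\"}"),  // serialized payload
    Headers = new Headers { { "message-id", Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()) } },
};

Console.WriteLine($"prepared {kafkaMessage.Value.Length} payload bytes for key {kafkaMessage.Key}");
```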
