Высокоуровневая библиотека для работы с kafka. Базируется на KafkaFlow, которая в свою очередь основана на Confluent Kafka Client.
- Создаем топик с которыми будет работать приложение, используя конвенцию именования: "{проект}.{сущность}.{событие с ней произошедшее}"
Пример: composer_assistant.entity.creation
Если прямо сейчас у вас нет времени погружаться в настройки топиков:
| Настройка | Описание | Значение |
|---|---|---|
| partitions | Количество партиций | 9 |
| replication-factor | Количество реплик каждой из партиций | 3 |
| min.insync.replicas | Число реплик, которые должны быть синхронизированы, чтобы можно было продолжить запись | 2 |
| retention.ms | Определяет максимальный возраст сообщения, после превышения которого следует его удалить | Время разбора инцидента поддержкой |
| retry.backoff.ms | Задержка (в миллисекундах) между повторными попытками отправки сообщений в случае возникновения ошибки. | 1000 |
- Устанавливаем пакет
dotnet add package Byndyusoft.Net.Kafka- Инициализируем настройки kafka в appsettings.json
{
"KafkaSettings" : {
"Hosts": [
"some-host:9092"
],
"Username" : "username",
"Password" : "password"
}- Используем расширение AddKafkaBus, которое регистрирует все необходимые зависимости для работы библиотеки, в т.ч:
- отправители сообщений, которые являются потомками
KafkaMessageProducerBaseи помечены атрибутомKafkaMessageProducerAttribute; - обработчики входящих сообщений, которые являются потомками
KafkaMessageHandlerBaseи помечены атрибутомKafkaMessageProducerAttribute
public void ConfigureServices(IServiceCollection services)
{
services.AddKafkaBus(_configuration.GetSection(nameof(KafkaSettings)).Get<KafkaSettings>());
}Поиск отправителей и обработчиков выполняется автоматически из всех используемых сборок, имя которых имеет общий префикс с вызывающей, префиксом считается последовательность символов имени сборки до первой точки.
Пример: вызывающая сборка называется MusicalityLabs.ComposerAssistant.Storage.Api, поэтому поиск будет выполняться во всех сборках, название которых начинается с MusicalityLabs.
- Запускаем работу с kafka, после запуска обработка сообщений будет осуществляться до момента остановки приложения
public static void Main(string[] args)
{
CreateHostBuilder(args)
.Build()
.StartKafkaProcessing()
.Run();
}- Добавляем телеметрию kafka
.AddOpenTelemetryTracing(
builder => builder
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName))
.AddKafkaInstrumentation()
.AddAspNetCoreInstrumentation()
)- Ожидает подтверждения от всех синхронизированных реплик, прежде, чем считать сообщение успешно отправленным.
- Идемпотентность: cемантика exactly-once с сохранением порядка доставки для каждого раздела.
- Если сообщение не удалось отправить, producer будет повторять попытку до 3 раз с задержкой в 1 секунду между каждой попыткой
| Настройка | Описание | Значение |
|---|---|---|
| enable.idempotence | Гарантирует запись только одного сообщения в конкретную партицию одного топика | true |
| max.in.flight.requests.per.connection | Общее число неподтверждённых брокером запросов для одного клиента | 1 |
| acks | После скольких acknowledge лидеру кластера необходимо считать сообщение успешно записанным | all (значение из min.insync.replicas) |
Конвенция наименования: "{проект}.{сервис}.{топик}".ToSnakeCase(), где "проект" опрееляется как второй сегмент имени сборки, "сервис", как все то, что идет после него. При этом исходим из того, что название сборки формируется по принципу "{НазваниеОрганизации}.{НазваниеПроекта}.{Название.Сервиса}".
Пример: для сборки MusicalityLabs.ComposerAssistant.Storage.Api и топика composer_assistant.entity.creation "проект" будет равен composer_assistant, "сервис" будет равен storage_api, итоговое имя будет равно composer_assistant.storage_api.entity_creation.
[KafkaMessageProducer(topic: "composer_assistant.entity.creation")]
public class EntityCreationEventMessageProducer : KafkaMessageProducerBase<EntityCreation>
{
public EntityCreationEventMessageProducer(IKafkaMessageSender messageSender) : base(messageSender)
{
}
protected override string KeyGenerator(EntityCreation entityCreation)
=> entityCreation.Id.ToString();
}- При первом запуске потребителя, если не существует сохраненного смещения (offset), то потребитель будет начинать чтение сообщений с самого начала топика, т.е потребитель будет читать все сообщения, начиная с самого раннего доступного смещения (offset) в топике.
- Если сообщение не удалось обработать, consumer будет повторять попытку до 3 раз с задержкой равной 2^retryNumber
Конвенция наименования: "{проект}.{сервис}.{топик}".ToSnakeCase(), где "проект" опрееляется как второй сегмент имени сборки, "сервис", как все то, что идет после него. При этом исходим из того, что название сборки формируется по принципу "{НазваниеОрганизации}.{НазваниеПроекта}.{Название.Сервиса}".
Пример: для сборки MusicalityLabs.ComposerAssistant.Storage.Api и топика composer_assistant.entity.creation "проект" будет равен composer_assistant, "сервис" будет равен storage_api, итоговое имя будет равно composer_assistant.storage_api.entity_creation.
[KafkaMessageHandler(topic: "composer_assistant.entity.creation")]
public class EntityCreationMessageHandler : KafkaMessageHandlerBase<EntityCreation>
{
private readonly ILogger<EntityCreationMessageHandler> _logger;
public EntityCreationMessageHandler(ILogger<EntityCreationMessageHandler> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override Task Handle(EntityCreation someEvent)
{
_logger.LogInformation("Message: {EntityText}", someEvent.Text);
return Task.FromResult(someEvent);
}
}