var kafkaMessageBusOptions = context.Configuration.GetSection("kafka").Get<KafkaMessageBusOptions>();
services.AddKafkaMessageBus(kafkaMessageBusOptions);
[TopicAttribute(Name = "BusinessMessage")]
public class BusinessMessage
{
[RouteKeyAttribute]
public string MessageId { get; set; }
public string Content { get; set; }
public DateTime CreateTime { get; set; }
}
var messageData = new BusinessMessage
{
MessageId = "1",
Content = $"我是内容",
CreateTime = DateTime.Now
};
await _messageBus.PublishAsync(messageData);
await _messageBus.SubscribeAsync<BusinessMessage>(async (message) =>
{
var current = Interlocked.Increment(ref Count);
_logger.LogInformation($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff")}消费--1--数据:MessageId={message.MessageId},Content= {message.Content},count={current}");
await Task.CompletedTask;
});
//或者
//订阅配置可以灵活的增加参数 支持参数如下
SubscribeOptions subscribeOptions = new SubscribeOptions();
subscribeOptions.GroupId = "group2";
subscribeOptions.ConsumerThreadCount = 2;
await _messageBus.SubscribeAsync<BusinessMessage>(async (message) =>
{
var current = Interlocked.Increment(ref Count);
_logger.LogInformation($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff")}消费--2--数据:MessageId={message.MessageId},Content={message.Content},count={current}");
await Task.CompletedTask;
}, subscribeOptions, cancellationToken);