# Aix.KafkaMessageBus

**Repository Path**: aix01/Aix.KafkaMessageBus

## Basic Information

- **Project Name**: Aix.KafkaMessageBus
- **Description**: Kafka publish/subscribe
- **Primary Language**: C#
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 1
- **Created**: 2021-04-21
- **Last Updated**: 2022-08-15

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Aix.KafkaMessageBus

#### Kafka publish/subscribe

#### Registration: in the ConfigureServices method

```
// Bind the "kafka" configuration section and register the message bus.
// The generic type argument is missing from the source text;
// KafkaMessageBusOptions is assumed from the variable name.
var kafkaMessageBusOptions = context.Configuration.GetSection("kafka").Get<KafkaMessageBusOptions>();
services.AddKafkaMessageBus(kafkaMessageBusOptions);
```

#### Inject IMessageBus _messageBus and define a business entity

```
[TopicAttribute(Name = "BusinessMessage")]
public class BusinessMessage
{
    [RouteKeyAttribute]
    public string MessageId { get; set; }

    public string Content { get; set; }

    public DateTime CreateTime { get; set; }
}
```

#### Producing messages

```
var messageData = new BusinessMessage
{
    MessageId = "1",
    Content = "I am the content",
    CreateTime = DateTime.Now
};
await _messageBus.PublishAsync(messageData);
```

#### Consuming messages

```
// The type argument on SubscribeAsync is missing from the source text;
// BusinessMessage is assumed from the properties the handler reads.
await _messageBus.SubscribeAsync<BusinessMessage>(async (message) =>
{
    var current = Interlocked.Increment(ref Count);
    _logger.LogInformation($"{DateTime.Now:yyyy-MM-dd HH:mm:ss fff} consumed--1--data: MessageId={message.MessageId},Content={message.Content},count={current}");
    await Task.CompletedTask;
});

// Alternatively:
// Subscription options can be extended with additional parameters; the supported ones are shown below.
SubscribeOptions subscribeOptions = new SubscribeOptions();
subscribeOptions.GroupId = "group2";
subscribeOptions.ConsumerThreadCount = 2;
await _messageBus.SubscribeAsync<BusinessMessage>(async (message) =>
{
    var current = Interlocked.Increment(ref Count);
    _logger.LogInformation($"{DateTime.Now:yyyy-MM-dd HH:mm:ss fff} consumed--2--data: MessageId={message.MessageId},Content={message.Content},count={current}");
    await Task.CompletedTask;
}, subscribeOptions, cancellationToken);
```
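
The consumer handlers increment a shared `Count` with `Interlocked.Increment`, which matters once `ConsumerThreadCount` is greater than 1 and the handler may plausibly run on several threads at once. The following is a minimal, library-free sketch of that pattern. `HandlerCounterDemo` and `RunAsync` are hypothetical names used only for illustration and are not part of Aix.KafkaMessageBus. The worker tasks stand in for consumer threads under the assumption that handlers can execute concurrently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for concurrent message handlers; not part of Aix.KafkaMessageBus.
public static class HandlerCounterDemo
{
    private static int _count;

    // Simulates `workers` consumer threads, each handling `messagesPerWorker` messages,
    // and returns the total number of handled messages.
    public static async Task<int> RunAsync(int workers, int messagesPerWorker)
    {
        _count = 0;
        var tasks = new Task[workers];
        for (var i = 0; i < workers; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                for (var n = 0; n < messagesPerWorker; n++)
                {
                    // Atomic increment; a plain `_count++` could lose updates under contention.
                    Interlocked.Increment(ref _count);
                }
            });
        }

        // All increments are complete and visible once every worker task has finished.
        await Task.WhenAll(tasks);
        return _count;
    }
}
```

Calling `await HandlerCounterDemo.RunAsync(2, 100_000)` returns `200000`, because every increment is atomic. Replacing the increment with `_count++` would make the result nondeterministic.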