# 消息队列 (10_MQ)

最后更新: 2024-09-20


# 📚 概述

DarkM 消息队列模块支持消息队列功能,用于异步处理、削峰填谷、系统解耦等场景。

源代码位置: DarkM/src/Framework/MQ


# 🏗️ 模块架构

# 完整目录结构

MQ/
└── MQ.RabbitMQ/
    ├── RabbitMQConfig.cs                  # RabbitMQ 配置
    ├── RabbitMQDeclareSettings.cs         # RabbitMQ 声明设置
    ├── DefaultExchange.cs                 # 默认交换机
    ├── RabbitMQClient.cs                  # RabbitMQ 客户端
    ├── Consumer.cs                        # 消费者
    └── ServiceCollectionExtensions.cs     # DI 扩展
1
2
3
4
5
6
7
8

# 🔧 支持的消息队列

提供器 说明 适用场景
RabbitMQ AMQP 协议,功能丰富 一般业务场景
Kafka 高吞吐量,适合日志处理 日志收集、大数据
Azure Service Bus Azure 云服务 Azure 云环境
Memory 内存队列,用于测试 开发测试

# 💡 配置说明

# appsettings.json 配置

{
  "MQ": {
    // 消息队列提供器
    "Provider": "RabbitMQ",
    
    // RabbitMQ 配置
    "RabbitMQ": {
      // 主机地址
      "HostName": "localhost",
      // 端口
      "Port": 5672,
      // 用户名
      "UserName": "guest",
      // 密码
      "Password": "guest",
      // 虚拟主机
      "VirtualHost": "/"
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 配置项详解

配置项 类型 默认值 说明
Provider string RabbitMQ 消息队列提供器
RabbitMQ.HostName string localhost RabbitMQ 主机地址
RabbitMQ.Port int 5672 RabbitMQ 端口
RabbitMQ.UserName string guest 用户名
RabbitMQ.Password string guest 密码
RabbitMQ.VirtualHost string / 虚拟主机

# 🔧 核心接口

# 1. IMessagePublisher(消息发布器)

public interface IMessagePublisher
{
    /// <summary>
    /// 发布消息
    /// </summary>
    /// <typeparam name="T">消息类型</typeparam>
    /// <param name="message">消息内容</param>
    Task PublishAsync<T>(T message) where T : class;
    
    /// <summary>
    /// 发布消息到指定队列
    /// </summary>
    /// <typeparam name="T">消息类型</typeparam>
    /// <param name="queue">队列名称</param>
    /// <param name="message">消息内容</param>
    Task PublishAsync<T>(string queue, T message) where T : class;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2. IMessageSubscriber(消息订阅器)

public interface IMessageSubscriber
{
    /// <summary>
    /// 订阅消息
    /// </summary>
    /// <typeparam name="T">消息类型</typeparam>
    /// <param name="handler">消息处理器</param>
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
    
    /// <summary>
    /// 订阅消息(指定队列)
    /// </summary>
    /// <typeparam name="T">消息类型</typeparam>
    /// <param name="queue">队列名称</param>
    /// <param name="handler">消息处理器</param>
    Task SubscribeAsync<T>(string queue, Func<T, Task> handler) where T : class;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 3. IMessageHandler(消息处理器)

public interface IMessageHandler<T> where T : class
{
    /// <summary>
    /// 处理消息
    /// </summary>
    /// <param name="message">消息内容</param>
    Task HandleAsync(T message);
}
1
2
3
4
5
6
7
8

# 💡 使用示例

# 1. 定义消息

/// <summary>
/// 订单创建事件
/// </summary>
public class OrderCreatedEvent
{
    /// <summary>
    /// 订单 ID
    /// </summary>
    public int OrderId { get; set; }
    
    /// <summary>
    /// 用户 ID
    /// </summary>
    public int UserId { get; set; }
    
    /// <summary>
    /// 订单金额
    /// </summary>
    public decimal Amount { get; set; }
    
    /// <summary>
    /// 创建时间
    /// </summary>
    public DateTime CreateTime { get; set; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 2. 发布消息

using DarkM.Lib.MQ.Abstractions;

public class OrderService : IOrderService
{
    private readonly IMessagePublisher _publisher;
    
    public OrderService(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }
    
    public async Task CreateOrderAsync(OrderCreateModel model)
    {
        // 创建订单
        var order = await _repository.AddAsync(model);
        
        // 发布订单创建事件
        await _publisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            UserId = order.UserId,
            Amount = order.Amount,
            CreateTime = order.CreateTime
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 3. 订阅消息

using DarkM.Lib.MQ.Abstractions;

public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    
    public EmailNotificationHandler(IEmailService emailService)
    {
        _emailService = emailService;
    }
    
    public async Task HandleAsync(OrderCreatedEvent message)
    {
        // 发送订单创建通知邮件
        await _emailService.SendOrderNotificationAsync(message.OrderId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4. 注册消息处理器

// 自动扫描并注册所有 IMessageHandler 实现
services.AddMessageHandlers(Assembly.GetExecutingAssembly());
1
2

# 📚 使用场景

# 1. 异步处理

// 同步返回,异步处理
public async Task<IResultModel> PlaceOrder(OrderModel model)
{
    // 创建订单(同步)
    var order = await _orderService.CreateAsync(model);
    
    // 异步发送通知
    await _publisher.PublishAsync(new OrderCreatedEvent { OrderId = order.Id });
    
    // 异步生成发票
    await _publisher.PublishAsync(new GenerateInvoiceEvent { OrderId = order.Id });
    
    return ResultModel.Success(order);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

特点: 快速响应用户,后台异步处理耗时操作。


# 2. 系统解耦

// 订单模块
await _publisher.PublishAsync(new OrderCreatedEvent { OrderId = orderId });

// 库存模块(独立订阅)
public class InventoryHandler : IMessageHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent message)
    {
        await _inventoryService.DeductAsync(message.OrderId);
    }
}

// 物流模块(独立订阅)
public class ShippingHandler : IMessageHandler<OrderCreatedEvent>
{
    public async Task HandleAsync(OrderCreatedEvent message)
    {
        await _shippingService.CreateShippingAsync(message.OrderId);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

特点: 模块间通过消息通信,降低耦合度。


# 3. 削峰填谷

// 秒杀场景
public async Task<IResultModel> Seckill(int productId)
{
    // 快速返回
    await _publisher.PublishAsync(new SeckillRequestEvent
    {
        ProductId = productId,
        UserId = _loginInfo.AccountId
    });
    
    return ResultModel.Success("请求已接收,处理结果将稍后通知");
}

// 异步处理秒杀请求
public class SeckillHandler : IMessageHandler<SeckillRequestEvent>
{
    public async Task HandleAsync(SeckillRequestEvent message)
    {
        // 限流处理
        // 库存扣减
        // 订单创建
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

特点: 高峰期消息队列缓冲,后台按能力处理。


# 📚 最佳实践

# 1. 消息命名

使用有意义的命名,表明事件类型:

// ✅ 推荐
OrderCreatedEvent
PaymentCompletedEvent
UserRegisteredEvent

// ❌ 不推荐
OrderMessage
Event1
Data
1
2
3
4
5
6
7
8
9

# 2. 消息幂等性

确保消息可以重复消费:

public async Task HandleAsync(OrderCreatedEvent message)
{
    // 检查是否已处理
    if (await _processedRepository.ExistsAsync(message.OrderId))
        return;
    
    // 处理业务
    await ProcessAsync(message);
    
    // 标记已处理
    await _processedRepository.AddAsync(message.OrderId);
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3. 错误处理

public async Task HandleAsync(OrderCreatedEvent message)
{
    try
    {
        await ProcessAsync(message);
    }
    catch (Exception ex)
    {
        _logger.Error("处理订单创建事件失败", ex);
        
        // 重试或发送到死信队列
        await _publisher.PublishAsync("dead_letter", message);
        
        throw;  // 重新抛出,让 MQ 重试
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 4. 消息持久化

重要消息需要持久化:

await _publisher.PublishAsync(message, new PublishOptions
{
    Persistent = true,  // 持久化
    Priority = 1        // 优先级
});
1
2
3
4
5

# 5. 消息确认

启用 ACK 机制确保消息可靠投递:

public async Task HandleAsync(OrderCreatedEvent message)
{
    try
    {
        await ProcessAsync(message);
        // 手动确认
        await _subscriber.AckAsync(message);
    }
    catch
    {
        // 拒绝消息,重新入队
        await _subscriber.NackAsync(message, requeue: true);
        throw;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 🔍 常见问题

# Q1: 消息丢失如何解决?

解决方案:

  1. 启用消息持久化
await _publisher.PublishAsync(message, new PublishOptions
{
    Persistent = true
});
1
2
3
4
  1. 启用确认机制(ACK)
// 手动确认
await _subscriber.AckAsync(message);
1
2
  1. 使用事务
using var transaction = _publisher.CreateTransaction();
await _publisher.PublishAsync(message);
transaction.Commit();
1
2
3

# Q2: 重复消费如何处理?

解决方案:

  1. 实现幂等性
if (await _processedRepository.ExistsAsync(message.OrderId))
    return;
1
2
  1. 使用唯一消息 ID
public class MessageBase
{
    public string MessageId { get; set; } = Guid.NewGuid().ToString();
}
1
2
3
4
  1. 记录已处理的消息
await _processedRepository.AddAsync(message.MessageId);
1

# Q3: 消息积压如何解决?

解决方案:

  1. 增加消费者数量
# 扩展消费者实例
docker scale --replicas=5 consumer
1
2
  1. 优化处理逻辑
// 批量处理
public async Task HandleAsync(List<OrderCreatedEvent> messages)
{
    foreach (var message in messages)
    {
        await ProcessAsync(message);
    }
}
1
2
3
4
5
6
7
8
  1. 使用并发处理
await Task.WhenAll(messages.Select(m => ProcessAsync(m)));
1

# Q4: 死信队列如何使用?

解决方案:

  1. 配置死信队列
services.AddRabbitMQ(options =>
{
    options.DeadLetterExchange = "dlx.exchange";
    options.DeadLetterQueue = "dlx.queue";
});
1
2
3
4
5
  1. 发送消息到死信队列
await _publisher.PublishAsync("dead_letter", failedMessage);
1
  1. 监控死信队列
var deadLetterCount = await _subscriber.GetQueueCountAsync("dlx.queue");
if (deadLetterCount > 0)
{
    _logger.LogWarning($"死信队列积压:{deadLetterCount}");
}
1
2
3
4
5

# Q5: 如何保证消息顺序?

解决方案:

  1. 使用消息分区
await _publisher.PublishAsync($"order_{orderId}", message);
1
  1. 单线程处理
services.AddMessageConsumer<OrderCreatedEvent>(options =>
{
    options.ConcurrentConsumers = 1;  // 单消费者
});
1
2
3
4
  1. 使用有序队列
// Kafka 等支持有序消息的队列
1

# 📚 相关文档


# 🔗 参考链接


最后更新: 2024-09-20