# 消息队列 (10_MQ)
最后更新: 2024-09-20
# 📚 概述
DarkM 消息队列模块支持消息队列功能,用于异步处理、削峰填谷、系统解耦等场景。
源代码位置: DarkM/src/Framework/MQ
# 🏗️ 模块架构
# 完整目录结构
MQ/
└── MQ.RabbitMQ/
├── RabbitMQConfig.cs # RabbitMQ 配置
├── RabbitMQDeclareSettings.cs # RabbitMQ 声明设置
├── DefaultExchange.cs # 默认交换机
├── RabbitMQClient.cs # RabbitMQ 客户端
├── Consumer.cs # 消费者
└── ServiceCollectionExtensions.cs # DI 扩展
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 🔧 支持的消息队列
| 提供器 | 说明 | 适用场景 |
|---|---|---|
| RabbitMQ | AMQP 协议,功能丰富 | 一般业务场景 |
| Kafka | 高吞吐量,适合日志处理 | 日志收集、大数据 |
| Azure Service Bus | Azure 云服务 | Azure 云环境 |
| Memory | 内存队列,用于测试 | 开发测试 |
# 💡 配置说明
# appsettings.json 配置
{
"MQ": {
// 消息队列提供器
"Provider": "RabbitMQ",
// RabbitMQ 配置
"RabbitMQ": {
// 主机地址
"HostName": "localhost",
// 端口
"Port": 5672,
// 用户名
"UserName": "guest",
// 密码
"Password": "guest",
// 虚拟主机
"VirtualHost": "/"
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 配置项详解
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| Provider | string | RabbitMQ | 消息队列提供器 |
| RabbitMQ.HostName | string | localhost | RabbitMQ 主机地址 |
| RabbitMQ.Port | int | 5672 | RabbitMQ 端口 |
| RabbitMQ.UserName | string | guest | 用户名 |
| RabbitMQ.Password | string | guest | 密码 |
| RabbitMQ.VirtualHost | string | / | 虚拟主机 |
# 🔧 核心接口
# 1. IMessagePublisher(消息发布器)
public interface IMessagePublisher
{
/// <summary>
/// 发布消息
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="message">消息内容</param>
Task PublishAsync<T>(T message) where T : class;
/// <summary>
/// 发布消息到指定队列
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="queue">队列名称</param>
/// <param name="message">消息内容</param>
Task PublishAsync<T>(string queue, T message) where T : class;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 2. IMessageSubscriber(消息订阅器)
public interface IMessageSubscriber
{
/// <summary>
/// 订阅消息
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="handler">消息处理器</param>
Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
/// <summary>
/// 订阅消息(指定队列)
/// </summary>
/// <typeparam name="T">消息类型</typeparam>
/// <param name="queue">队列名称</param>
/// <param name="handler">消息处理器</param>
Task SubscribeAsync<T>(string queue, Func<T, Task> handler) where T : class;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 3. IMessageHandler(消息处理器)
public interface IMessageHandler<T> where T : class
{
/// <summary>
/// 处理消息
/// </summary>
/// <param name="message">消息内容</param>
Task HandleAsync(T message);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 💡 使用示例
# 1. 定义消息
/// <summary>
/// 订单创建事件
/// </summary>
public class OrderCreatedEvent
{
/// <summary>
/// 订单 ID
/// </summary>
public int OrderId { get; set; }
/// <summary>
/// 用户 ID
/// </summary>
public int UserId { get; set; }
/// <summary>
/// 订单金额
/// </summary>
public decimal Amount { get; set; }
/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 2. 发布消息
using DarkM.Lib.MQ.Abstractions;
public class OrderService : IOrderService
{
private readonly IMessagePublisher _publisher;
public OrderService(IMessagePublisher publisher)
{
_publisher = publisher;
}
public async Task CreateOrderAsync(OrderCreateModel model)
{
// 创建订单
var order = await _repository.AddAsync(model);
// 发布订单创建事件
await _publisher.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
UserId = order.UserId,
Amount = order.Amount,
CreateTime = order.CreateTime
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 3. 订阅消息
using DarkM.Lib.MQ.Abstractions;
public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
private readonly IEmailService _emailService;
public EmailNotificationHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task HandleAsync(OrderCreatedEvent message)
{
// 发送订单创建通知邮件
await _emailService.SendOrderNotificationAsync(message.OrderId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 4. 注册消息处理器
// 自动扫描并注册所有 IMessageHandler 实现
services.AddMessageHandlers(Assembly.GetExecutingAssembly());
1
2
2
# 📚 使用场景
# 1. 异步处理
// 同步返回,异步处理
public async Task<IResultModel> PlaceOrder(OrderModel model)
{
// 创建订单(同步)
var order = await _orderService.CreateAsync(model);
// 异步发送通知
await _publisher.PublishAsync(new OrderCreatedEvent { OrderId = order.Id });
// 异步生成发票
await _publisher.PublishAsync(new GenerateInvoiceEvent { OrderId = order.Id });
return ResultModel.Success(order);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
特点: 快速响应用户,后台异步处理耗时操作。
# 2. 系统解耦
// 订单模块
await _publisher.PublishAsync(new OrderCreatedEvent { OrderId = orderId });
// 库存模块(独立订阅)
public class InventoryHandler : IMessageHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent message)
{
await _inventoryService.DeductAsync(message.OrderId);
}
}
// 物流模块(独立订阅)
public class ShippingHandler : IMessageHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent message)
{
await _shippingService.CreateShippingAsync(message.OrderId);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
特点: 模块间通过消息通信,降低耦合度。
# 3. 削峰填谷
// 秒杀场景
public async Task<IResultModel> Seckill(int productId)
{
// 快速返回
await _publisher.PublishAsync(new SeckillRequestEvent
{
ProductId = productId,
UserId = _loginInfo.AccountId
});
return ResultModel.Success("请求已接收,处理结果将稍后通知");
}
// 异步处理秒杀请求
public class SeckillHandler : IMessageHandler<SeckillRequestEvent>
{
public async Task HandleAsync(SeckillRequestEvent message)
{
// 限流处理
// 库存扣减
// 订单创建
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
特点: 高峰期消息队列缓冲,后台按能力处理。
# 📚 最佳实践
# 1. 消息命名
使用有意义的命名,表明事件类型:
// ✅ 推荐
OrderCreatedEvent
PaymentCompletedEvent
UserRegisteredEvent
// ❌ 不推荐
OrderMessage
Event1
Data
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 2. 消息幂等性
确保消息可以重复消费:
public async Task HandleAsync(OrderCreatedEvent message)
{
// 检查是否已处理
if (await _processedRepository.ExistsAsync(message.OrderId))
return;
// 处理业务
await ProcessAsync(message);
// 标记已处理
await _processedRepository.AddAsync(message.OrderId);
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 3. 错误处理
public async Task HandleAsync(OrderCreatedEvent message)
{
try
{
await ProcessAsync(message);
}
catch (Exception ex)
{
_logger.Error("处理订单创建事件失败", ex);
// 重试或发送到死信队列
await _publisher.PublishAsync("dead_letter", message);
throw; // 重新抛出,让 MQ 重试
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 4. 消息持久化
重要消息需要持久化:
await _publisher.PublishAsync(message, new PublishOptions
{
Persistent = true, // 持久化
Priority = 1 // 优先级
});
1
2
3
4
5
2
3
4
5
# 5. 消息确认
启用 ACK 机制确保消息可靠投递:
public async Task HandleAsync(OrderCreatedEvent message)
{
try
{
await ProcessAsync(message);
// 手动确认
await _subscriber.AckAsync(message);
}
catch
{
// 拒绝消息,重新入队
await _subscriber.NackAsync(message, requeue: true);
throw;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 🔍 常见问题
# Q1: 消息丢失如何解决?
解决方案:
- 启用消息持久化
await _publisher.PublishAsync(message, new PublishOptions
{
Persistent = true
});
1
2
3
4
2
3
4
- 启用确认机制(ACK)
// 手动确认
await _subscriber.AckAsync(message);
1
2
2
- 使用事务
using var transaction = _publisher.CreateTransaction();
await _publisher.PublishAsync(message);
transaction.Commit();
1
2
3
2
3
# Q2: 重复消费如何处理?
解决方案:
- 实现幂等性
if (await _processedRepository.ExistsAsync(message.OrderId))
return;
1
2
2
- 使用唯一消息 ID
public class MessageBase
{
public string MessageId { get; set; } = Guid.NewGuid().ToString();
}
1
2
3
4
2
3
4
- 记录已处理的消息
await _processedRepository.AddAsync(message.MessageId);
1
# Q3: 消息积压如何解决?
解决方案:
- 增加消费者数量
# 扩展消费者实例
docker scale --replicas=5 consumer
1
2
2
- 优化处理逻辑
// 批量处理
public async Task HandleAsync(List<OrderCreatedEvent> messages)
{
foreach (var message in messages)
{
await ProcessAsync(message);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
- 使用并发处理
await Task.WhenAll(messages.Select(m => ProcessAsync(m)));
1
# Q4: 死信队列如何使用?
解决方案:
- 配置死信队列
services.AddRabbitMQ(options =>
{
options.DeadLetterExchange = "dlx.exchange";
options.DeadLetterQueue = "dlx.queue";
});
1
2
3
4
5
2
3
4
5
- 发送消息到死信队列
await _publisher.PublishAsync("dead_letter", failedMessage);
1
- 监控死信队列
var deadLetterCount = await _subscriber.GetQueueCountAsync("dlx.queue");
if (deadLetterCount > 0)
{
_logger.LogWarning($"死信队列积压:{deadLetterCount}");
}
1
2
3
4
5
2
3
4
5
# Q5: 如何保证消息顺序?
解决方案:
- 使用消息分区
await _publisher.PublishAsync($"order_{orderId}", message);
1
- 单线程处理
services.AddMessageConsumer<OrderCreatedEvent>(options =>
{
options.ConcurrentConsumers = 1; // 单消费者
});
1
2
3
4
2
3
4
- 使用有序队列
// Kafka 等支持有序消息的队列
1
# 📚 相关文档
# 🔗 参考链接
- 源代码 (opens new window) -
src/Framework/MQ - RabbitMQ 官方文档 (opens new window)
- Kafka 官方文档 (opens new window)
最后更新: 2024-09-20