10 个MQ高频业务场景深度解析
2026-02-23 07:22:36
在分布式系统开发中,很多开发者都会疑惑:“明明直接调用接口就能实现功能,为什么还要引入MQ这个‘中间商’?”其实,当系统面临高并发、紧耦合、数据一致性等问题时,MQ的价值才会真正凸显。本文结合实际业务场景,从问题背景、解决方案、代码实现到技术要点,全面拆解MQ的10种核心用法,帮你避开常见陷阱,真正用好消息队列。
一、MQ的核心价值:为什么需要它?
在引入具体场景前,先明确MQ解决的核心问题。系统间直接调用会面临四大痛点:
紧耦合:上游服务需感知所有下游服务,新增或移除下游需修改上游代码。
性能瓶颈:同步调用会累积所有下游服务的响应时间,导致接口耗时过长。
单点故障:任一下游服务宕机,会导致上游服务调用失败,影响核心流程。
扩展困难:下游服务扩容时,上游服务可能需要同步调整调用逻辑。
而引入MQ后,能实现五大核心能力:
解耦:上游只需发送消息,无需关心下游如何处理。
异步化:非核心流程异步执行,减少主流程响应时间。
削峰填谷:缓冲突发流量,避免数据库或服务被瞬时请求压垮。
冗余存储:消息持久化,避免数据丢失。
一致性保障:通过事务消息、重试机制,保证分布式系统数据一致性。
二、10个实战场景:从问题到解决方案
场景一:系统解耦——打破服务间强依赖
背景痛点
电商项目中,订单创建后需同步调用库存、积分、邮件、数据分析等多个服务(代码如下)。这种设计下,新增一个下游服务(如物流通知)需修改订单服务代码,且任一下游服务故障(如邮件服务超时)会导致订单创建失败。
// 紧耦合代码示例
@Service
public class OrderService {
// 依赖多个下游服务
@Autowired private InventoryService inventoryService;
@Autowired private PointsService pointsService;
@Autowired private EmailService emailService;
public void createOrder(Order order) {
// 1. 保存订单(核心流程)
orderDao.save(order);
// 2. 同步调用库存服务(非核心,但故障会阻塞订单)
inventoryService.updateInventory(order);
// 3. 同步调用积分服务(非核心)
pointsService.addPoints(order.getUserId(), order.getAmount());
// 4. 同步发送邮件(非核心,超时风险高)
emailService.sendConfirmation(order);
}
}
MQ解决方案
引入MQ后,订单服务只需发送“订单创建”消息到MQ,下游服务各自订阅消息并处理,服务间彻底解耦。架构如下:
订单服务 → 发送消息 → MQ → 库存服务/积分服务/邮件服务(各自消费)
代码实现
// 1. 订单服务(生产者):只关注核心流程,发送消息
@Service
public class OrderService {
@Autowired private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 核心:保存订单
orderDao.save(order);
// 发送消息到MQ,参数:交换机、路由键、消息体
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(), order.getUserId(), order.getAmount()
);
rabbitTemplate.convertAndSend(
"order.exchange", // 交换机名称
"order.created", // 路由键(匹配下游队列)
event
);
}
}
// 2. 库存服务(消费者):订阅消息,独立处理
@Component
public class InventoryConsumer {
@Autowired private InventoryService inventoryService;
// 监听指定队列
@RabbitListener(queues = "inventory.queue")
public void handleOrderCreated(OrderCreatedEvent event) {
// 独立处理库存更新,失败不影响订单服务
inventoryService.updateInventory(event.getOrderId());
}
}
// 3. 邮件服务(消费者):同理订阅消息
@Component
public class EmailConsumer {
@Autowired private EmailService emailService;
@RabbitListener(queues = "email.queue")
public void handleOrderCreated(OrderCreatedEvent event) {
emailService.sendConfirmation(event.getUserId(), event.getOrderId());
}
}
技术要点
交换机与队列绑定:通过路由键(如order.created)将交换机与下游队列绑定,实现消息精准投递。
错误处理:下游服务失败时,通过重试机制(如RabbitMQ的重新入队)或死信队列兜底,避免消息丢失。
消息格式:使用JSON或Protobuf定义消息结构,保证跨服务、跨语言兼容性。
场景二:异步处理——提升核心流程响应速度
背景痛点
用户上传视频后,需执行转码、生成缩略图、内容审核等耗时操作(单步可能需几秒)。若同步处理,用户需等待操作完成才能收到反馈,体验极差。
MQ解决方案
视频服务接收上传后,立即保存原始视频并返回结果,同时发送“视频处理”消息到MQ;后台消费者异步执行转码、审核等操作,处理完成后通知用户。流程如下:
用户 → 视频服务(保存视频+发送消息)→ 返回上传成功 → MQ → 处理服务(转码/审核)→ 通知用户
代码实现
// 1. 视频服务(生产者):快速响应用户
@Service
public class VideoService {
@Autowired private KafkaTemplate
public UploadResponse uploadVideo(MultipartFile file, String userId) {
// 1. 保存原始视频,生成唯一视频ID
String videoId = saveOriginalVideo(file);
// 2. 发送处理消息到Kafka(异步执行,不阻塞返回)
VideoProcessingEvent event = new VideoProcessingEvent(videoId, userId);
kafkaTemplate.send("video-processing-topic", event);
// 3. 立即返回,用户无需等待
return new UploadResponse(videoId, "上传成功,处理中");
}
}
// 2. 视频处理服务(消费者):后台异步处理
@Service
public class VideoProcessingConsumer {
@Autowired private VideoProcessor videoProcessor;
@Autowired private NotificationService notificationService;
// 监听Kafka主题
@KafkaListener(topics = "video-processing-topic")
public void processVideo(VideoProcessingEvent event) {
try {
// 耗时操作:转码、生成缩略图、内容审核
videoProcessor.transcode(event.getVideoId());
videoProcessor.generateThumbnails(event.getVideoId());
contentAuditService.check(event.getVideoId());
// 处理完成,通知用户
notificationService.notifyUser(event.getUserId(), "视频处理完成");
} catch (Exception e) {
log.error("视频处理失败,videoId:{}", event.getVideoId(), e);
// 失败重试:可发送到重试队列
}
}
}
技术要点
MQ选型:处理大量视频消息时,优先选Kafka(高吞吐、持久化能力强)。
响应设计:返回给用户“处理中”状态,并提供查询接口(如通过videoId查询进度)。
弹性扩展:若处理压力大,可增加消费者实例,MQ自动负载均衡消息。
场景三:流量削峰——应对秒杀等突发流量
背景痛点
电商秒杀活动中,瞬时请求量可能达到平时的100倍(如1秒10万请求)。若直接转发到数据库,会导致数据库连接耗尽、查询超时,甚至宕机。
MQ解决方案
通过MQ缓冲请求,将“瞬时高并发”转化为“匀速消费”:
网关层限流:拒绝超出承载能力的请求(如每秒只允许1万请求进入)。
MQ缓冲:通过队列存储秒杀请求,避免直接冲击数据库。
消费者匀速处理:控制消费者数量,确保数据库按承载能力处理订单(如每秒处理500单)。
代码实现
// 1. 秒杀服务(生产者):预减库存+发送消息
@Service
public class SecKillService {
@Autowired private RedisTemplate
@Autowired private RabbitTemplate rabbitTemplate;
public SecKillResponse secKill(SecKillRequest request) {
String userId = request.getUserId();
String itemId = request.getItemId();
// 1. 校验用户资格(如是否已秒杀过)
if (Boolean.TRUE.equals(redisTemplate.hasKey("sec_kill:user:" + userId + ":" + itemId))) {
return SecKillResponse.failed("已参与秒杀");
}
// 2. Redis预减库存(原子操作,避免超卖)
String stockKey = "sec_kill:stock:" + itemId;
Long remainingStock = redisTemplate.opsForValue().decrement(stockKey);
if (remainingStock == null || remainingStock < 0) {
// 库存不足,恢复Redis计数
redisTemplate.opsForValue().increment(stockKey);
return SecKillResponse.failed("库存已空");
}
// 3. 发送秒杀成功消息到MQ,异步创建订单
SecKillSuccessEvent event = new SecKillSuccessEvent(userId, itemId);
rabbitTemplate.convertAndSend("sec_kill.exchange", "sec_kill.success", event);
// 4. 标记用户已参与秒杀
redisTemplate.opsForValue().set("sec_kill:user:" + userId + ":" + itemId, "1", 24, TimeUnit.HOURS);
return SecKillResponse.success("秒杀成功,订单创建中");
}
}
// 2. 订单处理服务(消费者):匀速创建订单
@Component
public class SecKillOrderConsumer {
@Autowired private OrderService orderService;
// 单消费者实例控制处理速度,或通过线程池调节
@RabbitListener(queues = "sec_kill.order.queue")
public void createSecKillOrder(SecKillSuccessEvent event) {
try {
// 匀速处理:创建订单(数据库操作)
orderService.createSecKillOrder(event.getUserId(), event.getItemId());
} catch (Exception e) {
log.error("创建秒杀订单失败,userId:{}, itemId:{}", event.getUserId(), event.getItemId(), e);
// 失败处理:恢复库存+通知用户
restoreStock(event.getItemId());
notificationService.notifyUser(event.getUserId(), "秒杀订单创建失败");
}
}
// 恢复库存(补偿逻辑)
private void restoreStock(String itemId) {
redisTemplate.opsForValue().increment("sec_kill:stock:" + itemId);
}
}
技术要点
超卖防护:用Redis原子操作(decrement)预减库存,避免多线程下超卖。
限流控制:网关层用Nginx或Sentinel限流,避免MQ被过多请求压满。
库存一致性:秒杀结束后,需对比Redis库存与数据库库存,修正差异(如Redis漏减)。
场景四:数据同步——保证微服务数据一致性
背景痛点
微服务架构中,每个服务有独立数据库(如用户服务用user_db,订单服务用order_db)。当用户状态变更(如禁用账号)时,订单服务的本地用户缓存需同步更新,否则会出现“用户已禁用但仍能下单”的问题。
MQ解决方案
用户服务数据变更时,发送“用户更新”消息到MQ;订单、支付等依赖用户数据的服务,订阅消息并更新本地缓存或数据库,实现数据最终一致性。
代码实现
// 1. 用户服务(生产者):事务内发送消息,保证数据与消息一致
@Service
public class UserService {
@Autowired private RocketMQTemplate rocketMQTemplate;
// 本地事务:更新数据库+发送消息
@Transactional
public void updateUserStatus(String userId, String status) {
// 1. 更新用户状态(数据库事务)
User user = userDao.findById(userId).orElseThrow(() -> new RuntimeException("用户不存在"));
user.setStatus(status);
userDao.save(user);
// 2. 发送事务消息(RocketMQ支持,确保消息与数据库操作同成功/同失败)
UserStatusEvent event = new UserStatusEvent(userId, status);
rocketMQTemplate.sendMessageInTransaction(
"user-status-topic", // 主题
MessageBuilder.withPayload(event).build(),
null // 事务参数(可选)
);
}
}
// 2. 订单服务(消费者):同步更新本地缓存
@Component
@RocketMQMessageListener(
topic = "user-status-topic",
consumerGroup = "order-service-group" // 消费者组,同一组内负载均衡
)
public class UserStatusConsumer implements RocketMQListener
@Autowired private LocalCacheManager localCacheManager;
@Override
public void onMessage(UserStatusEvent event) {
// 更新本地用户缓存(如禁用用户,后续下单会校验)
localCacheManager.updateUserStatus(event.getUserId(), event.getStatus());
// 可选:标记用户相关订单为“不可操作”
orderService.markOrderByUserStatus(event.getUserId(), event.getStatus());
}
}
技术要点
事务消息:用RocketMQ的事务消息机制,确保“数据库更新成功”与“消息发送成功”原子性(避免数据更新但消息未发,或消息发了但数据未更)。
幂等消费:消费者需实现幂等(如通过消息ID去重),避免重复更新缓存。
延迟处理:若下游服务依赖数据未准备好,可通过延迟队列重试(如3秒后再更新缓存)。
场景五:日志收集——分布式日志集中处理
背景痛点
分布式系统中,日志分散在多台服务器的本地文件中(如应用节点1的日志在/var/log/app1/,节点2在/var/log/app2/),排查问题时需逐个登录服务器查看,效率极低。
MQ解决方案
通过MQ实现日志“集中收集-分发处理”:
应用节点:将日志发送到MQ(如Kafka)。
消费服务:从MQ获取日志,分别写入Elasticsearch(用于查询)、HDFS(用于归档)、监控系统(用于告警)。
代码实现
// 1. 日志收集组件(生产者):嵌入应用,发送日志
@Component
public class LogCollector {
@Autowired private KafkaTemplate
// 收集日志并发送到Kafka
public void collect(String appId, String level, String message, Map
// 构建日志对象
LogEvent logEvent = new LogEvent();
logEvent.setAppId(appId);
logEvent.setLevel(level);
logEvent.setMessage(message);
logEvent.setContext(context);
logEvent.setTimestamp(System.currentTimeMillis());
// 发送到Kafka,按appId分区(便于按应用筛选)
kafkaTemplate.send(
"app-logs-topic",
appId, // 分区键:同一应用的日志进入同一分区,保证顺序
JsonUtils.toJson(logEvent)
);
}
}
// 2. 日志处理服务(消费者1):写入Elasticsearch,支持查询
@Service
public class EsLogConsumer {
@Autowired private ElasticsearchRestTemplate esTemplate;
@KafkaListener(topics = "app-logs-topic", groupId = "log-es-group")
public void writeToEs(String logJson) {
LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);
// 写入Elasticsearch,索引按日期拆分(如app-logs-20251017)
String index = "app-logs-" + DateUtils.format(new Date(), "yyyyMMdd");
esTemplate.save(logEvent, index);
}
}
// 3. 日志告警服务(消费者2):异常日志触发告警
@Service
public class AlertLogConsumer {
@Autowired private AlertService alertService;
@KafkaListener(topics = "app-logs-topic", groupId = "log-alert-group")
public void checkAndAlert(String logJson) {
LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);
// ERROR级日志触发告警
if ("ERROR".equals(logEvent.getLevel())) {
AlertMessage alert = new AlertMessage();
alert.setTitle("应用异常日志");
alert.setContent("appId:" + logEvent.getAppId() + ", 消息:" + logEvent.getMessage());
alertService.sendAlert(alert); // 发送邮件/短信告警
}
}
}
技术要点
MQ选型:优先选Kafka(高吞吐,支持海量日志收集)。
日志分区:按应用ID或服务名分区,避免单分区日志过多导致处理缓慢。
压缩传输:开启Kafka消息压缩(如GZIP),减少网络带宽消耗。
场景六:消息广播——配置更新实时同步
背景痛点
系统配置(如限流阈值、功能开关)更新后,需通知所有服务节点(如10台应用服务器)更新本地缓存,否则部分节点仍使用旧配置,导致行为不一致。
MQ解决方案
配置服务更新配置后,发送“配置更新”广播消息;所有服务节点订阅消息,实时更新本地配置缓存。常用MQ:RabbitMQ的Fanout交换机(广播)或Redis的Pub/Sub(轻量级广播)。
代码实现(Redis Pub/Sub)
// 1. 配置服务(发布者):广播配置更新
@Service
public class ConfigService {
@Autowired private RedisTemplate
public void updateConfig(String configKey, String configValue) {
// 1. 保存配置到数据库/Redis
configDao.updateConfig(configKey, configValue);
// 2. 广播配置更新消息(Redis Pub/Sub)
ConfigUpdateEvent event = new ConfigUpdateEvent(configKey, configValue);
redisTemplate.convertAndSend("config-update-channel", event);
}
}
// 2. 应用节点(订阅者):接收消息并更新本地缓存
@Component
public class ConfigSubscriber {
@Autowired private LocalConfigCache localConfigCache;
// 订阅Redis频道
@RedisMessageListener(channel = "config-update-channel")
public void onConfigUpdate(ConfigUpdateEvent event) {
// 更新本地配置缓存(内存缓存,如Caffeine)
localConfigCache.update(event.getConfigKey(), event.getConfigValue());
log.info("配置更新:{}={}", event.getConfigKey(), event.getConfigValue());
}
}
技术要点
可靠性选择:Redis Pub/Sub不保证消息持久化(节点离线会丢消息),若需可靠广播,用RabbitMQ的Fanout交换机+持久化队列。
配置版本:消息中携带配置版本号,避免旧消息覆盖新配置(如节点离线后收到旧消息)。
场景七:顺序消息——保证业务流程有序性
背景痛点
订单状态变更需按“创建→支付→发货→完成”的顺序处理,若消息乱序(如“发货”消息先于“支付”消息到达),会导致业务逻辑错误(如未支付就发货)。
MQ解决方案
通过“分区+顺序消费”保证消息有序:
发送端:同一订单的消息,按订单ID哈希到同一MQ分区(如Kafka的分区、RocketMQ的队列),确保同一订单的消息在同一分区内有序。
消费端:每个分区用单线程消费,避免多线程乱序。
代码实现(RocketMQ)
// 1. 订单服务(生产者):按订单ID分区,保证同一订单消息有序
@Service
public class OrderStateService {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void changeOrderState(String orderId, String oldState, String newState) {
OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);
// 发送顺序消息:第三个参数为shardingKey(订单ID),同一ID进入同一队列
rocketMQTemplate.syncSendOrderly(
"order-state-topic", // 主题
event,
orderId // 分片键:保证同一订单消息有序
);
}
}
// 2. 订单处理服务(消费者):顺序消费
@Service
@RocketMQMessageListener(
topic = "order-state-topic",
consumerGroup = "order-state-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费模式(单线程处理每个队列)
)
public class OrderStateConsumer implements RocketMQListener
@Autowired private OrderService orderService;
@Override
public void onMessage(OrderStateEvent event) {
// 按顺序处理订单状态变更(如“支付”完成后才处理“发货”)
orderService.processStateChange(event.getOrderId(), event.getOldState(), event.getNewState());
}
}
技术要点
分区数量:分区数需合理(如按订单量设置10-20个分区),避免单分区消息堆积。
故障处理:若某分区消费失败,需暂停该分区消费(避免阻塞其他分区),排查问题后重试。
场景八:延迟消息——实现定时任务
背景痛点
订单超时未支付需自动取消(如30分钟后),传统方案如“数据库轮询”会导致数据库压力大、实时性差(如每分钟轮询一次,最多延迟1分钟)。
MQ解决方案
使用MQ的延迟队列:订单创建时发送“延迟30分钟”的消息,消费者在30分钟后收到消息,检查订单支付状态,未支付则取消。
代码实现(RabbitMQ)
// 1. 订单服务(生产者):发送延迟消息
@Service
public class OrderService {
@Autowired private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 1. 保存订单(状态:未支付)
order.setStatus("UNPAID");
orderDao.save(order);
// 2. 发送延迟30分钟的消息
OrderCreateEvent event = new OrderCreateEvent(order.getId());
rabbitTemplate.convertAndSend(
"order.delay.exchange", // 延迟交换机
"order.delay.routing", // 延迟路由键
event,
message -> {
// 设置延迟时间:30分钟(单位:毫秒)
message.getMessageProperties().setDelay(30 * 60 * 1000);
return message;
}
);
}
}
// 2. 订单超时处理服务(消费者):30分钟后检查
@Component
public class OrderTimeoutConsumer {
@Autowired private OrderService orderService;
@RabbitListener(queues = "order.delay.queue")
public void checkOrderPayment(OrderCreateEvent event) {
// 查询订单当前状态
Order order = orderDao.findById(event.getOrderId()).orElse(null);
if (order != null && "UNPAID".equals(order.getStatus())) {
// 超时未支付,取消订单
orderService.cancelOrder(order.getId(), "超时未支付");
// 恢复库存
inventoryService.restoreStock(order.getItems());
}
}
}
技术要点
延迟精度:RabbitMQ的延迟队列精度约为秒级,若需更高精度(如毫秒级),可使用RocketMQ的定时消息。
消息持久化:开启队列持久化,避免MQ重启后延迟消息丢失。
替代方案对比: 方案 优点 缺点 数据库轮询 实现简单 数据库压力大,实时性差 MQ延迟队列 实时性好,无轮询压力 依赖MQ,需处理消息堆积 定时任务(如XXL-Job) 可控性强 分布式协调复杂,适合批量任务
场景九:消息重试——保证临时故障可恢复
背景痛点
消费者处理消息时,可能遇到临时故障(如数据库连接超时、下游服务临时不可用),若直接丢弃消息,会导致业务数据丢失。
MQ解决方案
通过“重试机制+死信队列”处理:
临时故障:消息重新入队,重试几次(如3次)。
永久故障:重试失败后,消息进入死信队列(DLQ),后续人工排查处理。
代码实现(RabbitMQ)
@Service
@Slf4j
public class RetryableConsumer {
@Autowired private BusinessService businessService;
// 监听业务队列,手动ACK(控制消息确认)
@RabbitListener(queues = "business.queue")
public void processMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 获取重试次数(自定义消息头)
Integer retryCount = message.getMessageProperties().getHeader("retry_count");
if (retryCount == null) retryCount = 0;
try {
// 1. 解析消息
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
BusinessEvent event = JsonUtils.fromJson(msgBody, BusinessEvent.class);
// 2. 业务处理
businessService.process(event);
// 3. 处理成功,手动ACK(消息从队列删除)
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) {
// 临时故障(如数据库超时),重试(最多3次)
if (retryCount < 3) {
log.warn("临时故障,重试第{}次,消息:{}", retryCount + 1, msgBody, e);
// 设置重试次数,重新入队
message.getMessageProperties().setHeader("retry_count", retryCount + 1);
channel.basicNack(deliveryTag, false, true); // requeue=true:重新入队
} else {
// 重试超过3次,进入死信队列
log.error("重试3次失败,进入死信队列,消息:{}", msgBody, e);
channel.basicNack(deliveryTag, false, false); // requeue=false:不重新入队
}
} catch (PermanentException e) {
// 永久故障(如消息格式错误),直接进入死信队列
log.error("永久故障,进入死信队列,消息:{}", msgBody, e);
channel.basicNack(deliveryTag, false, false);
}
}
}
技术要点
手动ACK:关闭自动ACK(acknowledge-mode: MANUAL),确保业务处理成功后再确认消息。
重试间隔:可通过“延迟队列+重试次数”实现渐进式重试(如第1次间隔1秒,第2次3秒,第3次5秒)。
死信队列处理:定期监控死信队列,分析失败原因(如消息格式错误、业务逻辑bug),处理后可重新投递到业务队列。
场景十:事务消息——解决分布式事务问题
背景痛点
分布式系统中,“创建订单”和“扣减库存”需保证原子性(要么都成功,要么都失败)。若直接调用:
先创建订单,再扣减库存:库存扣减失败,订单已创建,导致超卖。
先扣减库存,再创建订单:订单创建失败,库存未恢复,导致库存锁定。
MQ解决方案
使用RocketMQ的事务消息,实现“本地事务+消息发送”的原子性:
发送半消息:消息进入MQ,但标记为“不可消费”。
执行本地事务:创建订单(数据库操作)。
确认事务:本地事务成功则提交消息(下游可消费),失败则回滚消息(消息删除)。
事务回查:若MQ未收到确认,定期回查本地事务状态,确保最终一致性。
代码实现
// 1. 订单服务(生产者):发送事务消息
@Service
public class OrderTransactionService {
@Autowired private RocketMQTemplate rocketMQTemplate;
@Autowired private OrderDao orderDao;
// 事务方法:创建订单+发送消息
@Transactional
public void createOrderWithTransaction(Order order) {
// 1. 发送半消息(不可消费)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-tx-topic", // 事务消息主题
MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))
.setHeader("orderId", order.getId()) // 携带订单ID,用于回查
.build(),
order // 事务参数(传递给本地事务方法)
);
// 2. 检查半消息发送结果
if (!result.getLocalTransactionState().equals(LocalTransactionState.UNKNOWN)) {
throw new RuntimeException("事务消息发送失败,订单创建中止");
}
}
// 2. 本地事务执行器(RocketMQ回调)
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// arg为事务参数(订单对象)
Order order = (Order) arg;
// 执行本地事务:创建订单
orderDao.save(order);
// 本地事务成功,提交消息(下游可消费)
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息(消息删除)
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 3. 事务回查(MQ未收到确认时调用)
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 从消息头获取订单ID
String orderId = msg.getHeaders().get("orderId", String.class);
// 回查本地事务状态:订单是否存在
Order order = orderDao.findById(orderId).orElse(null);
if (order != null) {
// 订单存在,提交消息
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
// 订单不存在,回滚消息
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
}
// 4. 库存服务(消费者):消费事务消息,扣减库存
@Component
@RocketMQMessageListener(
topic = "order-tx-topic",
consumerGroup = "inventory-service-group"
)
public class InventoryTxConsumer implements RocketMQListener
@Autowired private InventoryService inventoryService;
@Override
public void onMessage(OrderCreatedEvent event) {
// 扣减库存(需实现幂等,避免重复扣减)
inventoryService.deductStock(event.getOrderId());
}
}
技术要点
幂等消费:库存扣减需通过“订单ID”去重(如扣减前检查是否已处理),避免MQ重试导致重复扣减。
事务回查:确保回查逻辑可靠(如通过订单ID查询数据库),避免因回查失败导致消息状态不确定。
补偿逻辑:若库存扣减失败,需触发补偿(如恢复订单状态、通知用户),或发送到死信队列人工处理。
三、MQ选型与最佳实践
1. MQ选型建议
场景需求
推荐MQ
原因
高吞吐(如日志)
Kafka
单机吞吐量10万+/秒,持久化能力强
事务消息
RocketMQ
原生支持事务消息,回查机制完善
复杂路由(如广播)
RabbitMQ
支持Fanout/Direct/Topic交换机,路由灵活
延迟消息
RabbitMQ/RocketMQ
RabbitMQ支持延迟队列,RocketMQ支持定时消息
轻量级(如配置同步)
Redis Pub/Sub
无需额外部署MQ,适合简单广播场景
2. 最佳实践
消息幂等:所有消费者必须实现幂等(如通过消息ID、业务唯一键去重),避免重复处理。
死信队列:为每个业务队列配置死信队列,兜底处理失败消息,避免消息丢失。
监控告警:监控MQ的消息堆积量、消费延迟、死信数量,超过阈值触发告警(如短信/邮件)。
性能优化:
开启消息压缩(如Kafka的GZIP),减少网络传输。
合理设置分区/队列数量,避免单分区堆积。
消费者线程池与队列数量匹配,避免线程空闲或过载。
四、总结
MQ不是“银弹”,但在分布式系统中,它能有效解决解耦、异步、削峰、一致性等核心问题。关键在于根据业务场景选择合适的MQ和方案,同时做好消息可靠性、幂等性、监控告警等基础保障。
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接本文链接:https://www.lifengdi.com/zhong-jian-jian/4528
相关文章深度解析 Kafka Rebalance:从原理到实战,彻底解决消息积压、重复与丢失Redis 不只是缓存:8 大实战场景 + 深度避坑指南,从入门到架构师级应用Kafka 为什么要抛弃 Zookeeper?Redis的主从同步及Redis Cluster(集群)下的高可用为什么同样是分布式架构的Kafka需要Leader而Redis不需要?