10 个MQ高频业务场景深度解析

2026-02-23 07:22:36

在分布式系统开发中,很多开发者都会疑惑:“明明直接调用接口就能实现功能,为什么还要引入MQ这个‘中间商’?”其实,当系统面临高并发、紧耦合、数据一致性等问题时,MQ的价值才会真正凸显。本文结合实际业务场景,从问题背景、解决方案、代码实现到技术要点,全面拆解MQ的10种核心用法,帮你避开常见陷阱,真正用好消息队列。

一、MQ的核心价值:为什么需要它?

在引入具体场景前,先明确MQ解决的核心问题。系统间直接调用会面临四大痛点:

紧耦合:上游服务需感知所有下游服务,新增或移除下游需修改上游代码。

性能瓶颈:同步调用会累积所有下游服务的响应时间,导致接口耗时过长。

单点故障:任一下游服务宕机,会导致上游服务调用失败,影响核心流程。

扩展困难:下游服务扩容时,上游服务可能需要同步调整调用逻辑。

而引入MQ后,能实现五大核心能力:

解耦:上游只需发送消息,无需关心下游如何处理。

异步化:非核心流程异步执行,减少主流程响应时间。

削峰填谷:缓冲突发流量,避免数据库或服务被瞬时请求压垮。

冗余存储:消息持久化,避免数据丢失。

一致性保障:通过事务消息、重试机制,保证分布式系统数据一致性。

二、10个实战场景:从问题到解决方案

场景一:系统解耦——打破服务间强依赖

背景痛点

电商项目中,订单创建后需同步调用库存、积分、邮件、数据分析等多个服务(代码如下)。这种设计下,新增一个下游服务(如物流通知)需修改订单服务代码,且任一下游服务故障(如邮件服务超时)会导致订单创建失败。

// 紧耦合代码示例

@Service

public class OrderService {

// 依赖多个下游服务

@Autowired private InventoryService inventoryService;

@Autowired private PointsService pointsService;

@Autowired private EmailService emailService;

public void createOrder(Order order) {

// 1. 保存订单(核心流程)

orderDao.save(order);

// 2. 同步调用库存服务(非核心,但故障会阻塞订单)

inventoryService.updateInventory(order);

// 3. 同步调用积分服务(非核心)

pointsService.addPoints(order.getUserId(), order.getAmount());

// 4. 同步发送邮件(非核心,超时风险高)

emailService.sendConfirmation(order);

}

}

MQ解决方案

引入MQ后,订单服务只需发送“订单创建”消息到MQ,下游服务各自订阅消息并处理,服务间彻底解耦。架构如下:

订单服务 → 发送消息 → MQ → 库存服务/积分服务/邮件服务(各自消费)

代码实现

// 1. 订单服务(生产者):只关注核心流程,发送消息

@Service

public class OrderService {

@Autowired private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 核心:保存订单

orderDao.save(order);

// 发送消息到MQ,参数:交换机、路由键、消息体

OrderCreatedEvent event = new OrderCreatedEvent(

order.getId(), order.getUserId(), order.getAmount()

);

rabbitTemplate.convertAndSend(

"order.exchange", // 交换机名称

"order.created", // 路由键(匹配下游队列)

event

);

}

}

// 2. 库存服务(消费者):订阅消息,独立处理

@Component

public class InventoryConsumer {

@Autowired private InventoryService inventoryService;

// 监听指定队列

@RabbitListener(queues = "inventory.queue")

public void handleOrderCreated(OrderCreatedEvent event) {

// 独立处理库存更新,失败不影响订单服务

inventoryService.updateInventory(event.getOrderId());

}

}

// 3. 邮件服务(消费者):同理订阅消息

@Component

public class EmailConsumer {

@Autowired private EmailService emailService;

@RabbitListener(queues = "email.queue")

public void handleOrderCreated(OrderCreatedEvent event) {

emailService.sendConfirmation(event.getUserId(), event.getOrderId());

}

}

技术要点

交换机与队列绑定:通过路由键(如order.created)将交换机与下游队列绑定,实现消息精准投递。

错误处理:下游服务失败时,通过重试机制(如RabbitMQ的重新入队)或死信队列兜底,避免消息丢失。

消息格式:使用JSON或Protobuf定义消息结构,保证跨服务、跨语言兼容性。

场景二:异步处理——提升核心流程响应速度

背景痛点

用户上传视频后,需执行转码、生成缩略图、内容审核等耗时操作(单步可能需几秒)。若同步处理,用户需等待操作完成才能收到反馈,体验极差。

MQ解决方案

视频服务接收上传后,立即保存原始视频并返回结果,同时发送“视频处理”消息到MQ;后台消费者异步执行转码、审核等操作,处理完成后通知用户。流程如下:

用户 → 视频服务(保存视频+发送消息)→ 返回上传成功 → MQ → 处理服务(转码/审核)→ 通知用户

代码实现

// 1. 视频服务(生产者):快速响应用户

@Service

public class VideoService {

@Autowired private KafkaTemplate kafkaTemplate;

public UploadResponse uploadVideo(MultipartFile file, String userId) {

// 1. 保存原始视频,生成唯一视频ID

String videoId = saveOriginalVideo(file);

// 2. 发送处理消息到Kafka(异步执行,不阻塞返回)

VideoProcessingEvent event = new VideoProcessingEvent(videoId, userId);

kafkaTemplate.send("video-processing-topic", event);

// 3. 立即返回,用户无需等待

return new UploadResponse(videoId, "上传成功,处理中");

}

}

// 2. 视频处理服务(消费者):后台异步处理

@Service

public class VideoProcessingConsumer {

@Autowired private VideoProcessor videoProcessor;

@Autowired private NotificationService notificationService;

// 监听Kafka主题

@KafkaListener(topics = "video-processing-topic")

public void processVideo(VideoProcessingEvent event) {

try {

// 耗时操作:转码、生成缩略图、内容审核

videoProcessor.transcode(event.getVideoId());

videoProcessor.generateThumbnails(event.getVideoId());

contentAuditService.check(event.getVideoId());

// 处理完成,通知用户

notificationService.notifyUser(event.getUserId(), "视频处理完成");

} catch (Exception e) {

log.error("视频处理失败,videoId:{}", event.getVideoId(), e);

// 失败重试:可发送到重试队列

}

}

}

技术要点

MQ选型:处理大量视频消息时,优先选Kafka(高吞吐、持久化能力强)。

响应设计:返回给用户“处理中”状态,并提供查询接口(如通过videoId查询进度)。

弹性扩展:若处理压力大,可增加消费者实例,MQ自动负载均衡消息。

场景三:流量削峰——应对秒杀等突发流量

背景痛点

电商秒杀活动中,瞬时请求量可能达到平时的100倍(如1秒10万请求)。若直接转发到数据库,会导致数据库连接耗尽、查询超时,甚至宕机。

MQ解决方案

通过MQ缓冲请求,将“瞬时高并发”转化为“匀速消费”:

网关层限流:拒绝超出承载能力的请求(如每秒只允许1万请求进入)。

MQ缓冲:通过队列存储秒杀请求,避免直接冲击数据库。

消费者匀速处理:控制消费者数量,确保数据库按承载能力处理订单(如每秒处理500单)。

代码实现

// 1. 秒杀服务(生产者):预减库存+发送消息

@Service

public class SecKillService {

@Autowired private RedisTemplate redisTemplate;

@Autowired private RabbitTemplate rabbitTemplate;

public SecKillResponse secKill(SecKillRequest request) {

String userId = request.getUserId();

String itemId = request.getItemId();

// 1. 校验用户资格(如是否已秒杀过)

if (Boolean.TRUE.equals(redisTemplate.hasKey("sec_kill:user:" + userId + ":" + itemId))) {

return SecKillResponse.failed("已参与秒杀");

}

// 2. Redis预减库存(原子操作,避免超卖)

String stockKey = "sec_kill:stock:" + itemId;

Long remainingStock = redisTemplate.opsForValue().decrement(stockKey);

if (remainingStock == null || remainingStock < 0) {

// 库存不足,恢复Redis计数

redisTemplate.opsForValue().increment(stockKey);

return SecKillResponse.failed("库存已空");

}

// 3. 发送秒杀成功消息到MQ,异步创建订单

SecKillSuccessEvent event = new SecKillSuccessEvent(userId, itemId);

rabbitTemplate.convertAndSend("sec_kill.exchange", "sec_kill.success", event);

// 4. 标记用户已参与秒杀

redisTemplate.opsForValue().set("sec_kill:user:" + userId + ":" + itemId, "1", 24, TimeUnit.HOURS);

return SecKillResponse.success("秒杀成功,订单创建中");

}

}

// 2. 订单处理服务(消费者):匀速创建订单

@Component

public class SecKillOrderConsumer {

@Autowired private OrderService orderService;

// 单消费者实例控制处理速度,或通过线程池调节

@RabbitListener(queues = "sec_kill.order.queue")

public void createSecKillOrder(SecKillSuccessEvent event) {

try {

// 匀速处理:创建订单(数据库操作)

orderService.createSecKillOrder(event.getUserId(), event.getItemId());

} catch (Exception e) {

log.error("创建秒杀订单失败,userId:{}, itemId:{}", event.getUserId(), event.getItemId(), e);

// 失败处理:恢复库存+通知用户

restoreStock(event.getItemId());

notificationService.notifyUser(event.getUserId(), "秒杀订单创建失败");

}

}

// 恢复库存(补偿逻辑)

private void restoreStock(String itemId) {

redisTemplate.opsForValue().increment("sec_kill:stock:" + itemId);

}

}

技术要点

超卖防护:用Redis原子操作(decrement)预减库存,避免多线程下超卖。

限流控制:网关层用Nginx或Sentinel限流,避免MQ被过多请求压满。

库存一致性:秒杀结束后,需对比Redis库存与数据库库存,修正差异(如Redis漏减)。

场景四:数据同步——保证微服务数据一致性

背景痛点

微服务架构中,每个服务有独立数据库(如用户服务用user_db,订单服务用order_db)。当用户状态变更(如禁用账号)时,订单服务的本地用户缓存需同步更新,否则会出现“用户已禁用但仍能下单”的问题。

MQ解决方案

用户服务数据变更时,发送“用户更新”消息到MQ;订单、支付等依赖用户数据的服务,订阅消息并更新本地缓存或数据库,实现数据最终一致性。

代码实现

// 1. 用户服务(生产者):事务内发送消息,保证数据与消息一致

@Service

public class UserService {

@Autowired private RocketMQTemplate rocketMQTemplate;

// 本地事务:更新数据库+发送消息

@Transactional

public void updateUserStatus(String userId, String status) {

// 1. 更新用户状态(数据库事务)

User user = userDao.findById(userId).orElseThrow(() -> new RuntimeException("用户不存在"));

user.setStatus(status);

userDao.save(user);

// 2. 发送事务消息(RocketMQ支持,确保消息与数据库操作同成功/同失败)

UserStatusEvent event = new UserStatusEvent(userId, status);

rocketMQTemplate.sendMessageInTransaction(

"user-status-topic", // 主题

MessageBuilder.withPayload(event).build(),

null // 事务参数(可选)

);

}

}

// 2. 订单服务(消费者):同步更新本地缓存

@Component

@RocketMQMessageListener(

topic = "user-status-topic",

consumerGroup = "order-service-group" // 消费者组,同一组内负载均衡

)

public class UserStatusConsumer implements RocketMQListener {

@Autowired private LocalCacheManager localCacheManager;

@Override

public void onMessage(UserStatusEvent event) {

// 更新本地用户缓存(如禁用用户,后续下单会校验)

localCacheManager.updateUserStatus(event.getUserId(), event.getStatus());

// 可选:标记用户相关订单为“不可操作”

orderService.markOrderByUserStatus(event.getUserId(), event.getStatus());

}

}

技术要点

事务消息:用RocketMQ的事务消息机制,确保“数据库更新成功”与“消息发送成功”原子性(避免数据更新但消息未发,或消息发了但数据未更)。

幂等消费:消费者需实现幂等(如通过消息ID去重),避免重复更新缓存。

延迟处理:若下游服务依赖数据未准备好,可通过延迟队列重试(如3秒后再更新缓存)。

场景五:日志收集——分布式日志集中处理

背景痛点

分布式系统中,日志分散在多台服务器的本地文件中(如应用节点1的日志在/var/log/app1/,节点2在/var/log/app2/),排查问题时需逐个登录服务器查看,效率极低。

MQ解决方案

通过MQ实现日志“集中收集-分发处理”:

应用节点:将日志发送到MQ(如Kafka)。

消费服务:从MQ获取日志,分别写入Elasticsearch(用于查询)、HDFS(用于归档)、监控系统(用于告警)。

代码实现

// 1. 日志收集组件(生产者):嵌入应用,发送日志

@Component

public class LogCollector {

@Autowired private KafkaTemplate kafkaTemplate;

// 收集日志并发送到Kafka

public void collect(String appId, String level, String message, Map context) {

// 构建日志对象

LogEvent logEvent = new LogEvent();

logEvent.setAppId(appId);

logEvent.setLevel(level);

logEvent.setMessage(message);

logEvent.setContext(context);

logEvent.setTimestamp(System.currentTimeMillis());

// 发送到Kafka,按appId分区(便于按应用筛选)

kafkaTemplate.send(

"app-logs-topic",

appId, // 分区键:同一应用的日志进入同一分区,保证顺序

JsonUtils.toJson(logEvent)

);

}

}

// 2. 日志处理服务(消费者1):写入Elasticsearch,支持查询

@Service

public class EsLogConsumer {

@Autowired private ElasticsearchRestTemplate esTemplate;

@KafkaListener(topics = "app-logs-topic", groupId = "log-es-group")

public void writeToEs(String logJson) {

LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);

// 写入Elasticsearch,索引按日期拆分(如app-logs-20251017)

String index = "app-logs-" + DateUtils.format(new Date(), "yyyyMMdd");

esTemplate.save(logEvent, index);

}

}

// 3. 日志告警服务(消费者2):异常日志触发告警

@Service

public class AlertLogConsumer {

@Autowired private AlertService alertService;

@KafkaListener(topics = "app-logs-topic", groupId = "log-alert-group")

public void checkAndAlert(String logJson) {

LogEvent logEvent = JsonUtils.fromJson(logJson, LogEvent.class);

// ERROR级日志触发告警

if ("ERROR".equals(logEvent.getLevel())) {

AlertMessage alert = new AlertMessage();

alert.setTitle("应用异常日志");

alert.setContent("appId:" + logEvent.getAppId() + ", 消息:" + logEvent.getMessage());

alertService.sendAlert(alert); // 发送邮件/短信告警

}

}

}

技术要点

MQ选型:优先选Kafka(高吞吐,支持海量日志收集)。

日志分区:按应用ID或服务名分区,避免单分区日志过多导致处理缓慢。

压缩传输:开启Kafka消息压缩(如GZIP),减少网络带宽消耗。

场景六:消息广播——配置更新实时同步

背景痛点

系统配置(如限流阈值、功能开关)更新后,需通知所有服务节点(如10台应用服务器)更新本地缓存,否则部分节点仍使用旧配置,导致行为不一致。

MQ解决方案

配置服务更新配置后,发送“配置更新”广播消息;所有服务节点订阅消息,实时更新本地配置缓存。常用MQ:RabbitMQ的Fanout交换机(广播)或Redis的Pub/Sub(轻量级广播)。

代码实现(Redis Pub/Sub)

// 1. 配置服务(发布者):广播配置更新

@Service

public class ConfigService {

@Autowired private RedisTemplate redisTemplate;

public void updateConfig(String configKey, String configValue) {

// 1. 保存配置到数据库/Redis

configDao.updateConfig(configKey, configValue);

// 2. 广播配置更新消息(Redis Pub/Sub)

ConfigUpdateEvent event = new ConfigUpdateEvent(configKey, configValue);

redisTemplate.convertAndSend("config-update-channel", event);

}

}

// 2. 应用节点(订阅者):接收消息并更新本地缓存

@Component

public class ConfigSubscriber {

@Autowired private LocalConfigCache localConfigCache;

// 订阅Redis频道

@RedisMessageListener(channel = "config-update-channel")

public void onConfigUpdate(ConfigUpdateEvent event) {

// 更新本地配置缓存(内存缓存,如Caffeine)

localConfigCache.update(event.getConfigKey(), event.getConfigValue());

log.info("配置更新:{}={}", event.getConfigKey(), event.getConfigValue());

}

}

技术要点

可靠性选择:Redis Pub/Sub不保证消息持久化(节点离线会丢消息),若需可靠广播,用RabbitMQ的Fanout交换机+持久化队列。

配置版本:消息中携带配置版本号,避免旧消息覆盖新配置(如节点离线后收到旧消息)。

场景七:顺序消息——保证业务流程有序性

背景痛点

订单状态变更需按“创建→支付→发货→完成”的顺序处理,若消息乱序(如“发货”消息先于“支付”消息到达),会导致业务逻辑错误(如未支付就发货)。

MQ解决方案

通过“分区+顺序消费”保证消息有序:

发送端:同一订单的消息,按订单ID哈希到同一MQ分区(如Kafka的分区、RocketMQ的队列),确保同一订单的消息在同一分区内有序。

消费端:每个分区用单线程消费,避免多线程乱序。

代码实现(RocketMQ)

// 1. 订单服务(生产者):按订单ID分区,保证同一订单消息有序

@Service

public class OrderStateService {

@Autowired private RocketMQTemplate rocketMQTemplate;

public void changeOrderState(String orderId, String oldState, String newState) {

OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);

// 发送顺序消息:第三个参数为shardingKey(订单ID),同一ID进入同一队列

rocketMQTemplate.syncSendOrderly(

"order-state-topic", // 主题

event,

orderId // 分片键:保证同一订单消息有序

);

}

}

// 2. 订单处理服务(消费者):顺序消费

@Service

@RocketMQMessageListener(

topic = "order-state-topic",

consumerGroup = "order-state-group",

consumeMode = ConsumeMode.ORDERLY // 顺序消费模式(单线程处理每个队列)

)

public class OrderStateConsumer implements RocketMQListener {

@Autowired private OrderService orderService;

@Override

public void onMessage(OrderStateEvent event) {

// 按顺序处理订单状态变更(如“支付”完成后才处理“发货”)

orderService.processStateChange(event.getOrderId(), event.getOldState(), event.getNewState());

}

}

技术要点

分区数量:分区数需合理(如按订单量设置10-20个分区),避免单分区消息堆积。

故障处理:若某分区消费失败,需暂停该分区消费(避免阻塞其他分区),排查问题后重试。

场景八:延迟消息——实现定时任务

背景痛点

订单超时未支付需自动取消(如30分钟后),传统方案如“数据库轮询”会导致数据库压力大、实时性差(如每分钟轮询一次,最多延迟1分钟)。

MQ解决方案

使用MQ的延迟队列:订单创建时发送“延迟30分钟”的消息,消费者在30分钟后收到消息,检查订单支付状态,未支付则取消。

代码实现(RabbitMQ)

// 1. 订单服务(生产者):发送延迟消息

@Service

public class OrderService {

@Autowired private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 1. 保存订单(状态:未支付)

order.setStatus("UNPAID");

orderDao.save(order);

// 2. 发送延迟30分钟的消息

OrderCreateEvent event = new OrderCreateEvent(order.getId());

rabbitTemplate.convertAndSend(

"order.delay.exchange", // 延迟交换机

"order.delay.routing", // 延迟路由键

event,

message -> {

// 设置延迟时间:30分钟(单位:毫秒)

message.getMessageProperties().setDelay(30 * 60 * 1000);

return message;

}

);

}

}

// 2. 订单超时处理服务(消费者):30分钟后检查

@Component

public class OrderTimeoutConsumer {

@Autowired private OrderService orderService;

@RabbitListener(queues = "order.delay.queue")

public void checkOrderPayment(OrderCreateEvent event) {

// 查询订单当前状态

Order order = orderDao.findById(event.getOrderId()).orElse(null);

if (order != null && "UNPAID".equals(order.getStatus())) {

// 超时未支付,取消订单

orderService.cancelOrder(order.getId(), "超时未支付");

// 恢复库存

inventoryService.restoreStock(order.getItems());

}

}

}

技术要点

延迟精度:RabbitMQ的延迟队列精度约为秒级,若需更高精度(如毫秒级),可使用RocketMQ的定时消息。

消息持久化:开启队列持久化,避免MQ重启后延迟消息丢失。

替代方案对比: 方案 优点 缺点 数据库轮询 实现简单 数据库压力大,实时性差 MQ延迟队列 实时性好,无轮询压力 依赖MQ,需处理消息堆积 定时任务(如XXL-Job) 可控性强 分布式协调复杂,适合批量任务

场景九:消息重试——保证临时故障可恢复

背景痛点

消费者处理消息时,可能遇到临时故障(如数据库连接超时、下游服务临时不可用),若直接丢弃消息,会导致业务数据丢失。

MQ解决方案

通过“重试机制+死信队列”处理:

临时故障:消息重新入队,重试几次(如3次)。

永久故障:重试失败后,消息进入死信队列(DLQ),后续人工排查处理。

代码实现(RabbitMQ)

@Service

@Slf4j

public class RetryableConsumer {

@Autowired private BusinessService businessService;

// 监听业务队列,手动ACK(控制消息确认)

@RabbitListener(queues = "business.queue")

public void processMessage(Message message, Channel channel) throws IOException {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

// 获取重试次数(自定义消息头)

Integer retryCount = message.getMessageProperties().getHeader("retry_count");

if (retryCount == null) retryCount = 0;

try {

// 1. 解析消息

String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);

BusinessEvent event = JsonUtils.fromJson(msgBody, BusinessEvent.class);

// 2. 业务处理

businessService.process(event);

// 3. 处理成功,手动ACK(消息从队列删除)

channel.basicAck(deliveryTag, false);

} catch (TemporaryException e) {

// 临时故障(如数据库超时),重试(最多3次)

if (retryCount < 3) {

log.warn("临时故障,重试第{}次,消息:{}", retryCount + 1, msgBody, e);

// 设置重试次数,重新入队

message.getMessageProperties().setHeader("retry_count", retryCount + 1);

channel.basicNack(deliveryTag, false, true); // requeue=true:重新入队

} else {

// 重试超过3次,进入死信队列

log.error("重试3次失败,进入死信队列,消息:{}", msgBody, e);

channel.basicNack(deliveryTag, false, false); // requeue=false:不重新入队

}

} catch (PermanentException e) {

// 永久故障(如消息格式错误),直接进入死信队列

log.error("永久故障,进入死信队列,消息:{}", msgBody, e);

channel.basicNack(deliveryTag, false, false);

}

}

}

技术要点

手动ACK:关闭自动ACK(acknowledge-mode: MANUAL),确保业务处理成功后再确认消息。

重试间隔:可通过“延迟队列+重试次数”实现渐进式重试(如第1次间隔1秒,第2次3秒,第3次5秒)。

死信队列处理:定期监控死信队列,分析失败原因(如消息格式错误、业务逻辑bug),处理后可重新投递到业务队列。

场景十:事务消息——解决分布式事务问题

背景痛点

分布式系统中,“创建订单”和“扣减库存”需保证原子性(要么都成功,要么都失败)。若直接调用:

先创建订单,再扣减库存:库存扣减失败,订单已创建,导致超卖。

先扣减库存,再创建订单:订单创建失败,库存未恢复,导致库存锁定。

MQ解决方案

使用RocketMQ的事务消息,实现“本地事务+消息发送”的原子性:

发送半消息:消息进入MQ,但标记为“不可消费”。

执行本地事务:创建订单(数据库操作)。

确认事务:本地事务成功则提交消息(下游可消费),失败则回滚消息(消息删除)。

事务回查:若MQ未收到确认,定期回查本地事务状态,确保最终一致性。

代码实现

// 1. 订单服务(生产者):发送事务消息

@Service

public class OrderTransactionService {

@Autowired private RocketMQTemplate rocketMQTemplate;

@Autowired private OrderDao orderDao;

// 事务方法:创建订单+发送消息

@Transactional

public void createOrderWithTransaction(Order order) {

// 1. 发送半消息(不可消费)

TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(

"order-tx-topic", // 事务消息主题

MessageBuilder.withPayload(new OrderCreatedEvent(order.getId()))

.setHeader("orderId", order.getId()) // 携带订单ID,用于回查

.build(),

order // 事务参数(传递给本地事务方法)

);

// 2. 检查半消息发送结果

if (!result.getLocalTransactionState().equals(LocalTransactionState.UNKNOWN)) {

throw new RuntimeException("事务消息发送失败,订单创建中止");

}

}

// 2. 本地事务执行器(RocketMQ回调)

@RocketMQTransactionListener

public class OrderTransactionListener implements RocketMQLocalTransactionListener {

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

try {

// arg为事务参数(订单对象)

Order order = (Order) arg;

// 执行本地事务:创建订单

orderDao.save(order);

// 本地事务成功,提交消息(下游可消费)

return RocketMQLocalTransactionState.COMMIT_MESSAGE;

} catch (Exception e) {

// 本地事务失败,回滚消息(消息删除)

return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;

}

}

// 3. 事务回查(MQ未收到确认时调用)

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

// 从消息头获取订单ID

String orderId = msg.getHeaders().get("orderId", String.class);

// 回查本地事务状态:订单是否存在

Order order = orderDao.findById(orderId).orElse(null);

if (order != null) {

// 订单存在,提交消息

return RocketMQLocalTransactionState.COMMIT_MESSAGE;

} else {

// 订单不存在,回滚消息

return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;

}

}

}

}

// 4. 库存服务(消费者):消费事务消息,扣减库存

@Component

@RocketMQMessageListener(

topic = "order-tx-topic",

consumerGroup = "inventory-service-group"

)

public class InventoryTxConsumer implements RocketMQListener {

@Autowired private InventoryService inventoryService;

@Override

public void onMessage(OrderCreatedEvent event) {

// 扣减库存(需实现幂等,避免重复扣减)

inventoryService.deductStock(event.getOrderId());

}

}

技术要点

幂等消费:库存扣减需通过“订单ID”去重(如扣减前检查是否已处理),避免MQ重试导致重复扣减。

事务回查:确保回查逻辑可靠(如通过订单ID查询数据库),避免因回查失败导致消息状态不确定。

补偿逻辑:若库存扣减失败,需触发补偿(如恢复订单状态、通知用户),或发送到死信队列人工处理。

三、MQ选型与最佳实践

1. MQ选型建议

场景需求

推荐MQ

原因

高吞吐(如日志)

Kafka

单机吞吐量10万+/秒,持久化能力强

事务消息

RocketMQ

原生支持事务消息,回查机制完善

复杂路由(如广播)

RabbitMQ

支持Fanout/Direct/Topic交换机,路由灵活

延迟消息

RabbitMQ/RocketMQ

RabbitMQ支持延迟队列,RocketMQ支持定时消息

轻量级(如配置同步)

Redis Pub/Sub

无需额外部署MQ,适合简单广播场景

2. 最佳实践

消息幂等:所有消费者必须实现幂等(如通过消息ID、业务唯一键去重),避免重复处理。

死信队列:为每个业务队列配置死信队列,兜底处理失败消息,避免消息丢失。

监控告警:监控MQ的消息堆积量、消费延迟、死信数量,超过阈值触发告警(如短信/邮件)。

性能优化:

开启消息压缩(如Kafka的GZIP),减少网络传输。

合理设置分区/队列数量,避免单分区堆积。

消费者线程池与队列数量匹配,避免线程空闲或过载。

四、总结

MQ不是“银弹”,但在分布式系统中,它能有效解决解耦、异步、削峰、一致性等核心问题。关键在于根据业务场景选择合适的MQ和方案,同时做好消息可靠性、幂等性、监控告警等基础保障。

除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接本文链接:https://www.lifengdi.com/zhong-jian-jian/4528

相关文章深度解析 Kafka Rebalance:从原理到实战,彻底解决消息积压、重复与丢失Redis 不只是缓存:8 大实战场景 + 深度避坑指南,从入门到架构师级应用Kafka 为什么要抛弃 Zookeeper?Redis的主从同步及Redis Cluster(集群)下的高可用为什么同样是分布式架构的Kafka需要Leader而Redis不需要?

口袋妖怪火红版可生蛋的地点
苹果芯片五年进化史:从M1到M4,AI时代芯片革命的启示