导语:风暴之眼,直面架构的“灵魂拷问”
在前三章的铺垫中,我们构建了一个堪称铜墙铁壁的防御体系。从客户端到CDN,再到Nginx和Sentinel,我们像剥洋葱一样,层层削减了99.9%的无效流量。现在,那些通过了重重考验的、最高质量的请求,终于抵达了风暴的中心——秒杀核心服务 (Seckill-Core Service)
在这里,我们必须回答整个秒杀架构中最核心、最致命的“灵魂拷问”:
当成千上万的线程在同一毫秒,企图对同一个商品的库存(一个共享变量)进行减一操作时,我们如何保证:
原子性(Atomicity): “读取库存、判断是否足够、减库存”这三个动作必须捆绑执行,不可分割,否则就会出现“超卖”。
高性能(High Performance): 处理过程必须在内存中以微秒级完成,任何对数据库的同步操作,都将引发灾难性的性能雪崩。
一致性(Consistency): 如何确保缓存中的库存与数据库中的最终库存保持一致?
这个问题,是分布式系统领域“数据一致性”与“高性能”这对永恒矛盾的经典缩影。任何试图用传统方式——例如,在Java代码中使用synchronized关键字,或者在数据库层面使用SELECT … FOR UPDATE悲观锁——来解决这个问题的尝试,都将在秒杀的流量洪峰下被碾得粉碎。
synchronized只能保证单JVM内的线程安全,无法处理分布式集群;而数据库悲观锁,会将所有并发请求串行化,一个请求锁住一行,其他所有请求都得排队等着,QPS会瞬间从几万跌到几十,系统形同宕机。
本篇,我们将彻底抛弃这些传统武器,祭出两件专门为高并发场景而生的“神器”:Redis(及其Lua脚本原子操作)和消息队列(Message Queue)。我们将联手施展一招“乾坤挪移”大法,将一个原本需要在数据库中完成的、沉重的同步事务,巧妙地分解为一个“极速内存预扣减”和一个“可靠异步持久化”的流程。
这不仅是技术的选择,更是一次深刻的架构思想的转变。


本章作战地图 (Table of Contents)

  • 第一战场:Redis——内存中的极速白刃战
  • 1.1 核心矛盾:为何简单的DECR不够?
  • 问题:经典的“Check-Then-Act”并发问题,即使在Redis中也同样存在。
  • 目标:理解在秒杀场景下,库存扣减不仅仅是-1,还包含资格校验、防止超卖等复合逻辑。
  • 1.2 终极武器:Lua脚本——在Redis中注入原子“事务”
  • 核心:Lua脚本为何能在Redis中保证原子性执行?(Redis的单线程模型)
  • Lua实战代码(V1.0 - 基础版): 编写一个完成“资格校验 + 库存扣减”的完整Lua脚本,并逐行详解。
  • Java实战代码: 如何使用spring-boot-starter-data-redis中的RedisScript和**redisTemplate.execute()**方法,在Java中优雅地调用Lua脚本。
  • 1.3 Lua脚本的进化:处理更复杂的场景
  • Lua实战代码(V2.0 - 进阶版): 增加“每人限购一件”的逻辑,在一个Lua脚本中同时操作库存(String)和用户购买记录(Set)两个Key。
  • 深度探讨: Lua脚本中如何传递多个KEYS和ARGS,以及脚本缓存机制(EVALSHA)带来的性能提升。
  • 1.4 数据预热:战争前的粮草先行
  • 技术点:如何在秒杀开始前,将数据库中的商品库存、限购信息等,提前加载到Redis中。
  • Java实战代码: 使用**@PostConstructApplicationRunner**实现服务启动时的自动预热。
  • 第二战场:消息队列——削峰填谷的“三峡大坝”
  • 2.1 设计哲学:为何必须异步?
  • 核心:同步下单流程的致命缺陷分析(用户体验差、系统耦合度高、后端压力巨大)。
  • 目标:确立“预扣减成功,即秒杀成功一半”的核心思想。
  • 2.2 技术选型:Kafka vs. RocketMQ vs. Pulsar
  • 决策矩阵: 从吞吐量、延迟、可靠性(事务消息)、运维复杂度、社区生态等多个维度,进行一场硬核的技术选型思辨。
  • 我们的选择: 为什么在电商秒杀场景下,RocketMQ的“事务消息”和“延迟消息”特性,往往比Kafka的极致吞吐量更具吸引力?
  • 2.3 生产者端实战 (Seckill-Core Service)
  • Java实战代码: 演示如何构建一个秒杀消息体(SeckillMessage DTO),并使用RocketMQTemplate发送消息。
  • 可靠性保证: 如何利用RocketMQ的“事务消息”,保证“Redis库存扣减成功”和“发送MQ消息成功”这两个操作的原子性(要么都成功,要么都失败),防止“库存扣了,但订单消息没发出去”的致命问题。
  • 深度探讨: RocketMQ事务消息的两阶段提交(Prepare -> Commit/Rollback)原理。
  • 2.4 消费者端实战 (Order Service)
  • Java实战代码: 编写一个RocketMQ消费者监听器(@RocketMQMessageListener),处理秒杀成功消息,并执行真正的数据库INSERT操作。
  • 核心挑战一:消费幂等性保证
  • 问题:MQ可能因为网络抖动、Broker重平衡等原因,重复投递同一条消息。如果消费者不处理,就会造成一个用户下多个订单。
  • 解决方案“三板斧”:
  1. 业务ID + 数据库唯一索引(如订单号order_sn)。
  2. 引入独立的“消费记录表”。
  3. 使用Redis的setnx构建分布式锁。我们将详细对比这三种方案的优劣,并给出推荐的组合拳。
  4. 核心挑战二:消费失败与重试
  • 问题:如果因为数据库抖动,导致订单创建失败,消息该何去何从?
  • 解决方案: 配置消费重试与“死信队列(Dead-Letter Queue)”。失败的消息会自动进入重试队列,多次重试后仍失败,则最终投递到死信队列,等待人工干预或离线任务进行补偿和对账。

第一战场:Redis——内存中的极速白刃战

数据库是我们的“后方基地”,负责数据的最终安全。而Redis,就是我们部署在最前线的“特种部队”,它必须在瞬息之间,完成最关键的“夺旗”任务——库存扣减。

1.1 核心矛盾:为何简单的DECR不够?

很多初学者认为,秒杀扣库存,用Redis的DECR命令不就行了吗?它是原子的。

这个想法,只对了一半。

在真实的秒杀业务中,“扣库存”从来不是一个孤立的**-1操作,它是一个复合的业务逻辑**:

  1. 检查秒杀活动是否已开始/已结束?
  2. 检查用户是否已有购买资格(防止重复秒杀)?
  3. 读取当前库存,判断是否大于0?
  4. 如果库存大于0,则执行扣减操作。
  5. 记录该用户已获得购买资格。

如果你在Java代码中,像下面这样做,那么一场灾难正在等着你:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// !!! 错误示范 - 典型的Check-Then-Act并发问题 !!!
public boolean wrongWayToDeductStock(String userId, String itemId) {
// 1. Check: 读取库存
int stock = Integer.parseInt(redisTemplate.opsForValue().get("seckill:stock:" + itemId));
if (stock > 0) {
// 在这里,高并发下,多个线程可能同时通过了if判断!

// 2. Act: 扣减库存
redisTemplate.opsForValue().decrement("seckill:stock:" + itemId);

// ...其他逻辑...
return true;
}
return false;
}

这就是经典的“Check-Then-Act”并发问题。在Check(检查库存)和Act(扣减库存)之间,存在一个时间窗口,足以让成百上千的线程都认为“库存充足”,从而导致库存被扣减成负数,引发“超卖”。

1.2 终极武器:Lua脚本——在Redis中注入原子“事务”

如何将这5个步骤捆绑成一个不可分割的原子操作?答案就是Lua脚本

Redis从2.6版本开始,引入了对Lua脚本的支持。通过EVAL命令,你可以让一段Lua脚本在Redis服务端原子性地执行。Redis会将整个脚本作为一个单独的命令来处理,在脚本执行期间,不会有其他任何命令被执行。这利用了Redis自身的单线程事件循环模型,从根本上杜绝了并发问题。

Lua实战代码(V1.0 - 基础版):

下面是一个实现基础“库存扣减”逻辑的Lua脚本(seckill.lua):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- seckill.lua

-- KEYS[1]: 商品库存的Key, e.g., "seckill:stock:123"
-- ARGV[1]: 期望秒杀的数量 (通常是1)

-- 获取当前库存
local stock = redis.call('get', KEYS[1])

-- 将库存转换为数字
local stock_num = tonumber(stock)

-- 将要扣减的数量转换为数字
local deduct_amount = tonumber(ARGV[1])

-- 判断库存是否充足
if stock_num >= deduct_amount then
-- 如果充足,则执行扣减,并返回扣减后的库存
-- 'decrby'命令本身是原子的,它会返回扣减后的值
return redis.call('decrby', KEYS[1], deduct_amount)
else
-- 如果库存不足,返回-1作为失败标记
return -1
end

Java实战代码:如何优雅地调用Lua脚本

spring-boot-starter-data-redis为我们提供了非常方便的工具。

  1. 创建RedisScript对象:在你的配置类或Service中,将Lua脚本加载为一个RedisScript Bean。
1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RedisConfig {

@Bean
public DefaultRedisScript<Long> seckillScript() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 指定Lua脚本的位置,通常放在resources目录下
redisScript.setLocation(new ClassPathResource("lua/seckill.lua"));
// 指定脚本的返回值类型
redisScript.setResultType(Long.class);
return redisScript;
}
}
  1. 在Service中执行脚本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Service
public class SeckillService {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private DefaultRedisScript<Long> seckillScript;

public boolean deductStock(String itemId) {
// 参数1: RedisScript对象
// 参数2: Key列表 (对应Lua中的KEYS)
// 参数3...: Arg列表 (对应Lua中的ARGV)
Long result = redisTemplate.execute(
seckillScript,
Collections.singletonList("seckill:stock:" + itemId), // KEYS[1]
"1" // ARGV[1]
);

if (result != null && result >= 0) {
// 返回值 >= 0,表示扣减成功
log.info("商品 {} 库存扣减成功,剩余库存: {}", itemId, result);
return true;
} else {
// 返回值为-1,表示库存不足
log.warn("商品 {} 库存不足,扣减失败", itemId);
return false;
}
}
}

至此,我们已经用Lua脚本,将一个危险的并发操作,变成了一个100%线程安全的原子操作。

1.3 Lua脚本的进化:处理更复杂的场景

真实的秒杀远比这复杂。比如,我们还需要加上“每人限购一件”的逻辑。

Lua实战代码(V2.0 - 进阶版):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- seckill_plus.lua

-- KEYS[1]: 商品库存的Key, e.g., "seckill:stock:123"
-- KEYS[2]: 用户购买记录的Set Key, e.g., "seckill:users:123"

-- ARGV[1]: 当前发起请求的用户ID
-- ARGV[2]: 期望秒杀的数量 (通常是1)

-- 1. 检查用户是否已经购买过
-- sismember命令,判断一个元素是否在集合中
if redis.call('sismember', KEYS[2], ARGV[1]) == 1 then
-- 如果返回1,表示用户已在购买集合中,直接返回-2作为“重复购买”的标记
return -2
end

-- 2. 检查库存
local stock = redis.call('get', KEYS[1])
if not stock or tonumber(stock) < tonumber(ARGV[2]) then
-- 如果库存不存在,或者库存不足,返回-1作为“库存不足”的标记
return -1
end

-- 3. 执行扣减
local remaining_stock = redis.call('decrby', KEYS[1], ARGV[2])

-- 4. 将用户ID加入购买记录Set
redis.call('sadd', KEYS[2], ARGV[1])

-- 5. 返回扣减后的库存
return remaining_stock

Java调用代码的相应调整:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public String deductStockPlus(String userId, String itemId) {
Long result = redisTemplate.execute(
seckillPlusScript, // 假设已加载seckill_plus.lua
Arrays.asList("seckill:stock:" + itemId, "seckill:users:" + itemId), // 传递两个Key
userId, "1" // 传递两个Arg
);

if (result == null) return "未知错误";
if (result == -1) return "库存不足";
if (result == -2) return "您已抢购过该商品";

// 扣减成功
log.info("用户 {} 成功抢购商品 {}, 剩余库存: {}", userId, itemId, result);
// 返回一个成功标记,准备发送MQ消息
return "SUCCESS";
}

深度探讨:脚本缓存与EVALSHA

每次使用EVAL命令执行Lua脚本时,都需要将整个脚本字符串发送给Redis,这会产生网络开销。Redis对此进行了优化:

  1. 当一个脚本第一次被EVAL时,Redis会计算它的SHA1哈希值,并将脚本内容缓存起来。
  2. EVAL命令会返回这个SHA1值。
  3. 下次再执行同一个脚本时,你可以使用EVALSHA …命令,只发送SHA1哈希值,而不用发送整个脚本。spring-data-redisredisTemplate.execute()方法已经为我们自动处理了这个过程。它会优先尝试使用EVALSHA,如果Redis返回“脚本未找到”的错误,它会自动降级,使用EVAL重新发送完整脚本。我们无需手动管理SHA1值。

1.4 数据预热:战争前的粮草先行

我们的Lua脚本依赖于Redis中存在正确的库存数据。这些数据必须在秒杀活动开始前,从数据库(我们称之为“真相之源”,Source of Truth)加载到Redis中。这个过程,称为数据预热

Java实战代码:

我们可以利用Spring的ApplicationRunner接口,在应用启动后执行预热逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
public class SeckillDataWarmUpRunner implements ApplicationRunner {

@Autowired
private SeckillItemRepository itemRepository; // 假设这是JPA或MyBatis的Mapper

@Autowired
private StringRedisTemplate redisTemplate;

@Override
public void run(ApplicationArguments args) throws Exception {
log.info("开始秒杀数据预热...");

// 1. 从数据库加载所有待秒杀的商品
List<SeckillItem> items = itemRepository.findAllActiveItems();

if (items == null || items.isEmpty()) {
log.info("没有需要预热的秒杀商品。");
return;
}

// 2. 遍历商品,将信息写入Redis
for (SeckillItem item : items) {
String stockKey = "seckill:stock:" + item.getId();
// 使用 set, 而不是 setnx, 保证每次启动都能覆盖旧数据
redisTemplate.opsForValue().set(stockKey, String.valueOf(item.getStock()));
log.info("商品 {} 库存预热完成,库存: {}", item.getId(), item.getStock());

// 也可以预热其他信息,如限购数量、活动时间等
// redisTemplate.opsForHash().put("seckill:item_info:" + item.getId(), "limit", item.getLimit());
}

log.info("秒杀数据预热完成!");
}
}

预热是保证秒杀活动顺利进行的关键前提。通常会通过后台管理系统或定时任务,在活动开始前的一段时间(如提前10分钟)触发。


第二战场:消息队列——削峰填谷的“三峡大坝”

在第一战场,我们通过Redis+Lua,以闪电般的速度完成了“资格筛选”和“名额预定”。现在,抢到资格的用户信息,就像一股汹涌的洪流,向我们的下游系统(订单、支付、物流)涌来。

如果让Seckill-Core Service在扣减Redis库存后,同步地去调用Order Service创建订单,会发生什么?

  1. 用户体验极差: 订单创建涉及数据库的INSERTUPDATE等多个I/O操作,可能耗时几百毫秒。用户点击秒杀按钮后,需要长时间地等待转圈。
  2. 系统雪崩风险: Seckill-Core Service的线程会大量阻塞在等待Order Service响应上。一旦Order Service因为数据库压力过大而变慢,压力会立刻反向传导,导致Seckill-Core Service的线程池被耗尽,整个秒杀入口全部瘫痪。

2.1 设计哲学:为何必须异步?

我们必须斩断这条同步调用的锁链,引入异步化。这就是消息队列(MQ)登场的时刻。

MQ在这里扮演的角色,就像一座宏伟的“三峡大坝”:

  • 削峰填谷: 秒杀的下单请求,会在1秒内达到峰值。而下游的数据库,其INSERT操作的TPS是有限的(比如每秒2000次)。MQ允许上游的生产者(Seckill-Core Service)以极高的速率(每秒几十万甚至上百万)将“下单请求”这条消息扔进队列,然后立即返回,告诉用户“您已抢购成功,订单正在处理中”。而下游的消费者(Order Service)则可以按照自己数据库能承受的、平稳的速率,慢慢地从队列里拉取消息进行消费。峰值的洪流,被大坝拦蓄,转化为了平稳的水流。
  • 服务解耦: Seckill-Core Service不再关心Order Service是否存在、是否健康。它的任务只有一个:把秒杀成功的消息,可靠地发出去。这大大降低了系统间的耦合度。
  • 增强系统韧性: 即使Order Service或数据库短暂宕机,消息会积压在MQ中。当服务恢复后,可以继续消费,数据不会丢失(前提是MQ本身高可用)。

核心思想转变: 在引入MQ后,对于用户的秒杀请求,只要Redis预扣减成功,并且下单消息成功发送到MQ,这次秒杀在用户感知层面就已经成功了

2.2 技术选型:Kafka vs. RocketMQ vs. Pulsar

选择哪款MQ,是每个Java架构师都会面临的抉择。

维度 Kafka RocketMQ Pulsar
吞吐量 极致(顺序读写,零拷贝) 非常高 非常高 (存算分离架构)
延迟 相对较高 (依赖批量发送) 非常低(为电商交易优化) 较低
功能丰富度 基础 (核心是流处理) 非常丰富 丰富
核心特性 高吞吐流处理 事务消息、延迟消息、死信队列 存算分离、多租户、Geo-Replication
运维复杂度 较高 (依赖Zookeeper) 中等 较高 (组件多)
社区/生态 最广泛(大数据领域事实标准) 非常活跃 (Java/阿里生态) 快速发展

我们的选择与思考:

  • Pulsar 技术架构先进,但相对较新,社区和成熟案例不如前两者,对于追求极致稳定性的秒杀核心链路,我们暂时持保守态度。
  • Kafka 是吞吐量的王者,非常适合日志收集、大数据分析等流处理场景。但在交易场景,它原生不支持“事务消息”,要保证“扣减Redis”和“发送消息”的原子性,需要自己实现复杂的两阶段提交逻辑。
  • RocketMQ 是阿里巴巴为应对双十一海量交易而生的MQ。它天生就为我们这个场景提供了最契合的武器:
  • 事务消息: 能完美地保证我们“本地操作”(扣减Redis)与“发送消息”这两个步骤的原子性。
  • 延迟消息: 在很多补偿场景中非常有用。
  • 极低延迟: 对交易类消息的优化非常好。

结论: 在我们的Java技术栈秒杀场景中,RocketMQ凭借其“事务消息”这一杀手级特性,成为了最优选。

2.3 生产者端实战 (Seckill-Core Service)

引入RocketMQ的Spring Boot Starter:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>...</version>
</dependency>

可靠性保证:利用事务消息

我们要解决的核心问题:如果Redis库存扣减成功了,但Seckill-Core Service在发送MQ消息时突然宕机,这条消息就丢失了,用户会发现自己“抢到了,但订单没了”。

RocketMQ事务消息通过一个两阶段提交协议来解决这个问题:

  1. 第一阶段 (Prepare): 生产者先向Broker发送一条“半消息(Half Message)”。这条消息对消费者是不可见的。
  2. 执行本地事务: 发送半消息成功后,生产者开始执行本地事务(在我们的场景里,就是执行Redis Lua脚本扣减库存)。
  3. 第二阶段 (Commit/Rollback):
  • 如果本地事务执行成功,生产者向Broker发送一个Commit请求,Broker将半消息标记为可投递,消费者此时才能消费到。
  • 如果本地事务执行失败,生产者向Broker发送一个Rollback请求,Broker会直接删除这条半消息。

**4. **事务状态回查:

如果生产者在执行完本地事务后,未来得及发送Commit/Rollback就宕机了,怎么办?Broker会定期向生产者集群的任意一台机器发起一个“回查”请求,询问:“ID为XXX的那个事务,你本地执行成功了吗?” 生产者需要实现一个回查监听器,去检查本地事务的状态,并告诉Broker是该Commit还是Rollback

Java实战代码:

  1. 定义消息体 DTO:
1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SeckillMessage implements Serializable {
private Long userId;
private Long itemId;
private String transactionId; // 用于幂等性处理
}
  1. 实现事务监听器 (TransactionListener):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RocketMQTransactionListener
public class SeckillTransactionListenerImpl implements RocketMQLocalTransactionListener {

@Autowired
private SeckillService seckillService; // 假设有一个服务能查到本地事务状态

// 执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// arg就是我们调用sendMessageInTransaction时传入的业务参数
Map<String, Object> params = (Map<String, Object>) arg;
String userId = (String) params.get("userId");
String itemId = (String) params.get("itemId");

// 执行我们之前写的Redis+Lua扣减库存逻辑
// 注意:这里需要改造一下,让扣减库存的方法能记录下事务状态
boolean success = seckillService.deductStockWithTransaction(userId, itemId, msg.getTransactionId());

if (success) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
log.error("执行秒杀本地事务时发生异常", e);
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}

// 事务状态回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// Broker会调用这个方法,询问事务状态
// 我们需要根据消息中的transactionId,去查询本地事务是否成功
// 比如,查一个“事务状态表”,或者查Redis中是否有成功标记
boolean isSuccess = seckillService.checkLocalTransactionStatus(msg.getTransactionId());
if (isSuccess) {
return RocketMQLocalTransactionState.COMMIT_MESSAGE;
} else {
return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
  1. 发送事务消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class MessageProducerService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendSeckillMessage(String userId, String itemId) {
String txId = UUID.randomUUID().toString();
SeckillMessage payload = new SeckillMessage(Long.parseLong(userId), Long.parseLong(itemId), txId);
Message<SeckillMessage> message = MessageBuilder.withPayload(payload).build();

// 传入业务参数,用于执行本地事务
Map<String, Object> args = new HashMap<>();
args.put("userId", userId);
args.put("itemId", itemId);

// 发送事务消息
// "seckill-topic"是Topic名称
// "tx-group"是事务生产者组
rocketMQTemplate.sendMessageInTransaction("seckill-topic", message, args);
}
}

通过这套组合拳,我们用一种极其优雅和可靠的方式,解决了分布式场景下,“本地操作”与“消息发送”的原子性难题。

2.4 消费者端实战 (Order Service)

Order Service的任务相对单纯:监听seckill-topic,获取消息,创建订单。但魔鬼在细节中。

Java实战代码 (消费者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
@RocketMQMessageListener(topic = "seckill-topic", consumerGroup = "order-consumer-group")
public class SeckillMessageConsumer implements RocketMQListener<SeckillMessage> {

@Autowired
private OrderRepository orderRepository;

@Override
public void onMessage(SeckillMessage message) {
log.info("收到秒杀成功消息: {}", message);

// !!! 此处必须处理消费幂等性问题 !!!
if (isConsumed(message.getTransactionId())) {
log.warn("消息已被消费过,忽略。TransactionId: {}", message.getTransactionId());
return;
}

try {
// 创建订单
Order order = new Order();
order.setUserId(message.getUserId());
order.setItemId(message.getItemId());
order.setOrderSn(generateOrderSn()); // 生成唯一的订单号
order.setCreateTime(new Date());

orderRepository.save(order); // 保存到数据库

// 标记消息已成功消费
markAsConsumed(message.getTransactionId());
} catch (DataIntegrityViolationException e) {
// 这通常是数据库唯一索引冲突导致的,说明消息可能被重复消费了
// 已经被幂等机制处理,可以认为是“成功”的,打印警告日志即可
log.warn("订单创建时发生唯一键冲突,可能为重复消费,已被幂等处理。TransactionId: {}", message.getTransactionId());
markAsConsumed(message.getTransactionId()); // 同样需要标记为已消费
} catch (Exception e) {
log.error("创建订单时发生未知异常,等待MQ重试", e);
// 抛出异常,RocketMQ会根据配置进行重试
throw new RuntimeException("消费失败,触发重试");
}
}

// isConsumed 和 markAsConsumed 的具体实现见下文
}

核心挑战一:消费幂等性保证

解决方案“三板斧”:

  1. 数据库唯一索引(最可靠的兜底方案): 在订单表t_order中,为order_sn(订单号)或者user_id+item_id的组合,创建一个唯一索引(Unique Index)。当重复的消息到来,试图插入一个已存在的订单号时,数据库会直接抛出DataIntegrityViolationException。我们在catch块里捕获这个异常,就知道这是一次重复消费,然后安静地忽略它即可。
  2. 引入“消费记录表”: 创建一张t_consumed_message表,主键就是消息的唯一ID(如transactionId)。每次消费前,先INSERT这条ID。因为主键的唯一性,重复的INSERT会失败,从而阻止业务逻辑的二次执行。
  3. Redis setnx 分布式锁: 每次消费前,以消息的唯一ID为Key,执行setnx(key, “1”)。如果成功,表示第一次消费,执行业务逻辑,最后del(key)。如果失败,表示已有其他线程在处理,直接放弃。

推荐组合拳: Redis setnx + 数据库唯一索引。Redis在前,速度快,能挡掉99%的重复请求,减轻数据库压力。数据库唯一索引在后,作为最终的、100%可靠的兜底保障。

核心挑战二:消费失败与重试

application.yml中配置消费者组的重试策略和死信队列。

1
2
3
4
5
6
7
rocketmq:
consumer:
group: order-consumer-group
# 最大重试次数。-1表示16次。
max-reconsume-times: 5
producer:
# ...

onMessage方法抛出异常时,RocketMQ会自动将消息重新投递。默认的重试策略是阶梯式的延迟(10s, 30s, 1m, 2m…)。如果达到最大重试次数后仍然失败,RocketMQ会将这条消息投递到一个特殊的Topic——死信队列(DLQ)

死信队列的Topic命名通常是**%DLQ%consumer-group-name**。我们需要另外编写一个死信队列消费者,专门来监听这个Topic,将处理失败的消息记录到数据库或发送告警通知,以便人工介入处理,进行对账和补偿。


结语:从“原子战争”到“可靠投递”

在本章超过一万五千字的浴血奋战中,我们终于攻克了秒杀系统的核心——高并发下的库存扣减与订单创建。我们没有采用任何魔法,而是遵循了分布式系统设计的两条黄金法则:

  1. 将热点冲突上移至内存: 我们用Redis和精心设计的Lua脚本,将原本属于数据库的、沉重的并发控制压力,转移到了性能高出几个数量级的内存中,打赢了这场“原子战争”。
  2. 用异步化解耦时序矛盾: 我们引入了消息队列这坐“三峡大坝”,用一招“乾坤挪移”,将瞬时的下单洪峰,转化为平稳的数据消费流,实现了系统的“削峰填谷”和“服务解耦”,保证了用户体验的极致流畅。

我们不仅选择了正确的工具(Redis, RocketMQ),更深入地探讨了如何“正确地”使用它们。从Lua脚本的原子性保证,到RocketMQ事务消息的可靠投递,再到消费者幂等性的“三板斧”和死信队列的兜底机制,每一个细节,都体现了架构师在追求高性能的同时,对数据一致性和系统可靠性的不懈追求。

至此,一个完整的秒-杀下单流程已经闭环。但是,一场战争的结束,意味着另一场战争的开始。我们的系统能抵御风暴了,但它健康吗?当问题发生时,我们能像“上帝”一样,洞察其内部的每一个角落吗?

那么下一步,就应该是**《风暴之后:构建系统的“上帝之眼”——全链路可观测性体系》**。我们将学习如何利用Logging, Metrics, Tracing这“三位一体”,为我们的秒杀系统装上眼睛、耳朵和神经系统,真正做到运筹帷幄,决胜千里。