在Kafka中,你如何处理消息的重复消费和消息丢失问题?请结合实际场景说明
什么是消息重复消费?
消费者订阅MQ消费时,收到多条消息内容一致的数据进行处理,对业务造成重复处理。
重复消费的原因有哪些?
生产者重试发送
消费者提交偏移量失败导致重复拉取
如何解决重复消费?
唯一ID
每条消息的消息体内添加一个全局唯一ID字段(订单号、流水号),在消费数据时查看数据是否被处理过。
数据库唯一索引
对业务关键字段创建唯一索引,重复插入会报错。
业务流程状态机
业务流程通过状态机流转,每个操作只允许在特定的状态下进行操作。如果之前处理过流转到下一个状态,出现重复消息时因为状态不匹配不处理。
场景:购物支付,即时收到多次支付请求也只扣款一次。
什么是消息丢失?
生产者生产的消息无法正确让消费者消费,中间过程出现消息丢失的现象。
消息丢失的原因有哪些?
生产者丢失
- 如果ack设置为0,不等待broker响应确认,消息发送失败
- 网络抖动
Broker丢失
- Leader宕机,消息未及时同步到Follower
- 消息未及时刷盘。pageCache -> 硬盘
消费者丢失
先提交偏移量,后消费时出现异常
如何解决消息丢失?
生产者端
- ack=all,等待所有的ISR副本同步完成
- retries消息重试机制,考虑消息幂等
Broker端
- 设置副本大于等于2
- ISR最小副本数合理配置
消费者端
手动提交偏移量
1
enable.auto.commit=false
先处理消息再提交偏移量
场景:订单消息创建成功,必须保证发送和消费成功。