消息队列|Kafka重复消费

2025/4/14 后端消息队列

# 重复消费

高并发场景下怎么保证消息不会重复消费

# 布隆过滤器

一种数据结构,hash运算,可能存在假阳性

另一种类似布隆过滤器的方案bit array

# 重复消费可能的原因

两种:

  1. 生产者重复发送,可能是因为收到了超时响应,重试导致同一消息发送了多次
  2. 消费者重复消费,消息处理完毕准备提交,但是系统宕机,等恢复之后,会再次消费同一个消息

综上,要将消费逻辑设计成幂等。

# 面试准备

  • 你负责的业务里面有没有接口或消费者是要求幂等的?如果有,你是如何解决幂等的?如果你依赖于唯一索引来解决幂等,那么这部分的写流量有多大?
  • 如果你依赖于唯一索引来解决幂等,那么你是怎么保证业务操作和将数据插入到唯一索引是同时成功,或者同时失败的?
  • 你或者你的同事有没有因为没有设计幂等,或者幂等方案有问题而引起线上事故?如果有,你是怎么解决的?
  • 你的业务是否使用过布隆过滤器,如果有,当时用来解决什么问题?

# 基本思路

利用唯一索引。

# 本地事务将数据插入到唯一索引

这个时候你应该使用事务。也就是说,你收到消息之后就开启一个本地事务。

在这个本地事务里面你会同时完成业务操作和将数据插入到唯一索引这两个操作,然后提交。

当然,很多情况下这个插入唯一索引的操作本身就是你业务操作的一部分。

# 非本地事务将数据插入到唯一索引

依赖于第三方来检测。

  1. 把数据插入到唯一索引,但是这个时候状态被标记为初始状态。注意这一步一定要先执行,这是避免重复处理的关键。
  2. 执行业务操作。
  3. 将唯一索引对应的数据标记为完成状态。

需要异步检测,判断唯一索引和业务数据是否一致。

# 亮点方案

一个高并发的幂等方案。

这个幂等方案需要用到布隆过滤器、 Redis 和唯一索引。

方案的基本思路是,如果依赖于数据库唯一索引来判断幂等,那么数据库撑不住高并发。

所以我们就想办法在使用唯一索引之前,尽可能先削减流量。

这个场景就非常适合使用布隆过滤器。而为了避免假阳性的问题,进一步降低发送到数据库的流量,在布隆过滤器和数据库之间再引入一个Redis。

基本流程是这样的:

首先,一个请求过来的时候,我们会利用布隆过滤器来判断它有没有被处理过。如果布隆过滤器说没有处理过,那么就确实没有被处理过,可以直接处理。如果布隆过滤器说处理过(可能是假阳性),那么就要执行下一步。

第二步就是利用Redis存储近期处理过的key。如果Redis里面有这个key,说明它的确被处理过了,直接返回,否则进入第三步。这一步的关键就是key的过期时间应该是多长。

第三步则是利用唯一索引,如果唯一索引冲突了,那么就代表已经处理过了。这个唯一索引一般就是业务的唯一索引,并不需要额外创建一个索引。

# 更新顺序

先更新数据库的唯一索引。

因为不管什么原因导致的,只要到了插入唯一索引,都可以确保不会重复处理消息或者清居前

# redis key的过期时间

重试如果10min,设置过期时间11min,1min当作余量,削减流量

# 简化方案

重试间隔大,就只可以省略redis。

数据库性能差,redis+唯一索引。

# 本地布隆过滤器

bit array等等替代

# 伪代码

以下是一个基于布隆过滤器、Redis 和数据库唯一索引的高并发幂等方案的 Go 伪代码实现,包含详细注释和关键设计说明:

package main

import (
 "context"
 "errors"
 "fmt"
 "github.com/bits-and-blooms/bloom/v3" // 布隆过滤器库
 "github.com/go-redis/redis/v8"       // Redis客户端
 "gorm.io/gorm"                       // 数据库ORM(示例用GORM)
)

// 幂等校验器结构体
type IdempotencyChecker struct {
 bloomFilter  *bloom.BloomFilter  // 布隆过滤器(内存或分布式)
 redisClient  *redis.Client       // Redis客户端
 db           *gorm.DB            // 数据库连接
 redisKeyTTL  time.Duration       // Redis键过期时间(根据业务设置)
}

// 初始化幂等校验器
func NewIdempotencyChecker(redisTTL time.Duration) *IdempotencyChecker {
 // 布隆过滤器参数:预计1000万元素,误判率0.1%
 bf := bloom.NewWithEstimates(10_000_000, 0.001) 
 
 // Redis连接(示例配置)
 rdb := redis.NewClient(&redis.Options{
  Addr:     "localhost:6379",
  Password: "",
  DB:       0,
 })
 
 // 数据库连接(示例用GORM)
 db := ConnectDB() 
 
 return &IdempotencyChecker{
  bloomFilter: bf,
  redisClient: rdb,
  db:          db,
  redisKeyTTL: redisTTL,
 }
}

// 幂等校验主入口
func (c *IdempotencyChecker) CheckAndProcess(ctx context.Context, requestID string, data interface{}) error {
 // Step 1: 布隆过滤器快速判断
 if !c.bloomFilter.TestString(requestID) {
  // 布隆过滤器认为不存在,直接处理
  return c.processNewRequest(ctx, requestID, data)
 }
 
 // Step 2: 查询Redis确认是否处理过
 exists, err := c.redisClient.Exists(ctx, requestID).Result()
 if err != nil {
  // Redis查询失败,降级到数据库检查
  return c.fallbackToDBCheck(ctx, requestID, data)
 }
 if exists == 1 {
  // Redis中存在记录,直接返回幂等
  return ErrDuplicateRequest
 }
 
 // Step 3: 最终检查数据库唯一索引
 return c.fallbackToDBCheck(ctx, requestID, data)
}

// 处理新请求(核心逻辑)
func (c *IdempotencyChecker) processNewRequest(ctx context.Context, requestID string, data interface{}) error {
 // 尝试插入数据库(唯一索引是最终防线)
 err := c.db.Create(&YourBusinessModel{
  RequestID: requestID, // 业务唯一键
  Data:      data,
 }).Error
 
 if err != nil {
  // 唯一键冲突,说明已处理过
  if IsDuplicateKeyError(err) {
   // 异步更新布隆过滤器和Redis(补偿机制)
   go c.asyncUpdateCache(requestID)
   return ErrDuplicateRequest
  }
  return err
 }
 
 // 插入成功,更新缓存
 c.bloomFilter.AddString(requestID)
 c.redisClient.SetEX(ctx, requestID, "1", c.redisKeyTTL)
 return nil
}

// 降级到数据库检查
func (c *IdempotencyChecker) fallbackToDBCheck(ctx context.Context, requestID string, data interface{}) error {
 // 查询数据库是否存在记录
 var count int64
 if err := c.db.Model(&YourBusinessModel{}).
  Where("request_id = ?", requestID).
  Count(&count).Error; err != nil {
  return err
 }
 
 if count > 0 {
  // 数据库中存在记录,更新缓存后返回幂等
  c.bloomFilter.AddString(requestID)
  c.redisClient.SetEX(ctx, requestID, "1", c.redisKeyTTL)
  return ErrDuplicateRequest
 }
 
 // 不存在则继续处理(可能是布隆过滤器误判+Redis过期)
 return c.processNewRequest(ctx, requestID, data)
}

// 异步更新缓存(防止主流程阻塞)
func (c *IdempotencyChecker) asyncUpdateCache(requestID string) {
 c.bloomFilter.AddString(requestID)
 c.redisClient.SetEX(context.Background(), requestID, "1", c.redisKeyTTL)
}

// 判断是否为数据库唯一键冲突错误(需根据具体数据库驱动实现)
func IsDuplicateKeyError(err error) bool {
 // 示例:MySQL的Duplicate entry错误码为1062
 // return strings.Contains(err.Error(), "Error 1062")
 return errors.Is(err, gorm.ErrDuplicatedKey)
}

var (
 ErrDuplicateRequest = errors.New("request already processed")
)

// 业务数据模型(示例)
type YourBusinessModel struct {
 RequestID string `gorm:"uniqueIndex"` // 唯一索引
 Data      interface{}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

# 关键设计说明

  1. 三级过滤机制

    • 第一层:布隆过滤器
      快速拦截绝对不存在的情况(无假阴性),内存级操作,TPS可达百万级。
    • 第二层:Redis
      解决布隆过滤器的假阳性问题,存储近期处理的请求ID,过期时间根据业务设置(如订单类业务可设7天)。
    • 第三层:数据库唯一索引
      最终一致性保障,处理极端情况(如Redis故障或缓存过期)。
  2. 性能优化点

    • 布隆过滤器内存优化
      通过NewWithEstimates预设容量和误判率,避免频繁扩容。
    • Redis Pipeline批量操作
      高并发场景下可将多个SETEX合并为Pipeline执行(伪代码未体现,可扩展)。
    • 异步更新缓存
      数据库插入成功后,异步更新布隆过滤器和Redis,降低主流程延迟。
  3. 容错与补偿

    • 降级策略
      Redis查询失败时直接降级到数据库检查,避免级联故障。
    • 异步补偿
      唯一键冲突时异步更新缓存,解决并发请求穿透问题。
  4. 防雪崩设计

    • Redis过期时间分散
      实际可对redisKeyTTL添加随机扰动(如TTL + rand(60)),避免大量Key同时过期。
    • 布隆过滤器动态扩容
      当元素数量超过预估时,可动态创建新布隆过滤器(需额外设计分布策略)。

# 适用场景示例

假设电商系统处理订单支付回调:

  1. 支付平台可能重复推送同一笔订单的支付结果。
  2. 请求ID = 订单号 + 支付流水号(唯一组合)。
  3. Redis过期时间设为7天(覆盖支付对账周期)。
  4. 数据库唯一索引为(order_id, payment_id)

# 压测建议

  1. 基准测试

    • 纯布隆过滤器:TPS可达 100万+/秒。
    • 布隆+Redis:TPS约 5万~10万/秒(依赖Redis性能)。
    • 全链路(含DB):TPS取决于数据库写入能力(需分库分表优化)。
  2. 监控指标

    • 布隆过滤器误判率(bloomFilter.ApproximatedSize() / bloomFilter.Cap()
    • Redis命中率(INFO stats中的keyspace_hits/keyspace_misses
    • 数据库唯一键冲突率(反映实际重复请求比例)

通过这种三级防御机制,可以在高并发场景下将数据库的幂等校验压力降低 99% 以上(假设布隆过滤器误判率0.1%,Redis命中率90%)。

Last Updated: 2025/4/15 10:54:47